package jiangdu.fire.job.wj.baseDb; import java.util.Calendar; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.util.CollectionUtils; import jiangdu.fire.job.ContextUtil; import jiangdu.fire.util.Contants; import jiangdu.fire.util.wj.DateUtils; import jiangdu.fire.util.wj.HttpClient4; /** * 同步警情json 到t_alarm。如果服务器down掉几天,会遗漏期间的数据 * @author wj * @date 2016-12-30 * */ @SuppressWarnings("unchecked") //阻止并发,防止maxBjsj混乱 @DisallowConcurrentExecution public class AlarmJob implements Job { static JdbcTemplate jdbcTemplate = (JdbcTemplate) ContextUtil .getApplicationContext().getBean("jdbcTemplate"); public static AtomicReference<List<Map<String,Object>>> cacheData = new AtomicReference<List<Map<String,Object>>>() ; public static final HttpClient4 httpClient4 = new HttpClient4(); static{ if(CollectionUtils.isEmpty(cacheData.get()) ){//服务器重启后,从数据库里捞数据 String sql = "select * from t_alarm where ALARM_TIME like ?"; String today = DateUtils.getDate("yyyy-MM-dd"); List<Map<String, Object>> list = jdbcTemplate.queryForList(sql,today+"%"); if(!CollectionUtils.isEmpty(list)){ for(Map<String, Object> m : list){ //将数据库的映射为JSON里的 m.put("JQBH", m.get("ALARM_NO")); } } cacheData.set(list); } } public static int count = 0; /** * @DisallowConcurrentExecution 已经阻止job并发了。如果还是并发httpClient4.get是同步方法,一样可以阻止并发 2017-2-18 by wj */ @Override public void execute(JobExecutionContext context) throws JobExecutionException { // try { // Thread.sleep(2000); //等待static执行完 // } catch (InterruptedException e1) { // // TODO Auto-generated catch block // e1.printStackTrace(); // } // // // 获取 JobDataMap , 并从中取出参数 // JobDataMap data = context.getJobDetail().getJobDataMap(); // String favoriteColor = data.getString(FAVORITE_COLOR); // data.put(EXECUTION_COUNT, count); // String url = Contants.alarm_url+DateUtils.getDate("yyyyMMdd"); try { long t1 = System.currentTimeMillis(); // 排序前取得当前时间 if(httpClient4.get(url,cacheData,Contants.TYPE_JQ)){ //也可以是static锁 // if(new HttpClient().get(url,maxBjsj,Contants.TYPE_JQ)){ long t2 = System.currentTimeMillis(); // 排序后取得当前时间 Calendar c = Calendar.getInstance(); c.setTimeInMillis(t2 - t1); System.out.println("----第"+ ++count +"次 同步结束----"+"耗时: " + c.get(Calendar.MINUTE) + "分 " + c.get(Calendar.SECOND) + "秒 " + c.get(Calendar.MILLISECOND) + " 毫秒"); } // } catch (Exception e) { e.printStackTrace(); } } }
package jiangdu.fire.util.wj; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.sql.SQLIntegrityConstraintViolationException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.ParseException; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.SSLContexts; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.entity.ContentType; import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.entity.mime.content.FileBody; import org.apache.http.entity.mime.content.StringBody; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import jiangdu.fire.util.Contants; /** * 过滤调派中队,机构为null的,直到有值,再新增,不用缓存,基于DB的 * 模拟http请求 * @author wj * @date 2017-2-31 * */ public class HttpClient4 { private static Logger logger = LoggerFactory.getLogger(HttpClient4.class); public static final Integer timeOut = 30000; // public static Date maxBjsj ; @Test public void jUnitTest() throws Exception { // get("http://221.226.3.57:8888/fw/V_AJXX_20161229"); } /** * 发送 get请求 * @throws java.text.ParseException */ @SuppressWarnings({ "unused", "unchecked" }) public synchronized boolean get(String url ,AtomicReference<List<Map<String,Object>>> cacheData,String type) throws Exception{ CloseableHttpClient httpclient = HttpClients.createDefault(); try { // 创建httpget. HttpGet httpget = new HttpGet(url); // httpget.setParams(params); // set Timeout // RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(timeOut) // .setConnectTimeout(timeOut).setSocketTimeout(timeOut).build(); // httpget.setConfig(requestConfig); System.out.println("\n-----------------------"); logger.debug("executing request " + httpget.getURI()); // 执行get请求. CloseableHttpResponse response = httpclient.execute(httpget); try { // 获取响应实体 HttpEntity entity = response.getEntity(); // 打印响应状态 logger.info(response.getStatusLine().toString()); if(HttpStatus.SC_OK==(response.getStatusLine().getStatusCode())){ // if(200==(response.getStatusLine().getStatusCode())){ if (entity != null) { String s = EntityUtils.toString(entity, "UTF-8"); // 打印响应内容长度 logger.debug("Response content length: " + entity.getContentLength()); // 打印响应内容 logger.debug("Response content: " + s); Map<String,List<Map<String,Object> >> map = FastJsonUtil.stringToCollect(s); List<Map<String,Object> > data = map.get("data"); List<Map<String,Object> > originData = data; int total = data.size(); //今天的数据 data = getCurrentDateData(data, type); int todaySize = data.size(); List<Map<String,Object> > listWithoutDup = data; int originSize = listWithoutDup.size(); String jobName = ""; String cacheName = ""; String cacheKeyName = ""; if(Contants.TYPE_JQ.equals(type)){ //去重 listWithoutDup = withoutDuplicate(data, "JQBH",type); //警情去重 jobName="警情"; cacheName=Contants.jqCache; cacheKeyName=Contants.jqCachekey; }else if(Contants.TYPE_DPZD.equals(type)){ //不去重,存在“两条同一个警情号,调派中队为空。后来修改中队为不同的值的情况” jobName="调派中队"; cacheName=Contants.dpzdCache; cacheKeyName=Contants.dpzdCachekey; }else if(Contants.TYPE_DPCL.equals(type)){ //不去重,原因同 调派中队 jobName="调派车辆"; cacheName=Contants.dpclCache; cacheKeyName=Contants.dpclCachekey; }else{ throw new RuntimeException("不应该到这"); } // // if( !CollectionUtils.isEmpty(cacheData.get())){ //只会执行第一次,如果缓存里没有值 // // //必须放在这里,如果放在下面,因为插DB耗时间,导致,当下一次JOB执行时候,cache还没有值,就会使得产生重复待新增数据,导致主键冲突异常 // cacheData.set(listWithoutDup); //将今天的数据插入缓存,放入全局引用变量里,供运行时使用 // } //如果一开始缓存无值,就会全部进入forAdd集合里 Map<String,List<Map<String,Object> >> r = getResult(cacheData.get(), listWithoutDup, type); List<Map<String,Object>> forAdd = r.get("forAdd"); List<Map<String,Object>> forUpdate = r.get("forUpdate"); List<Map<String,Object>> nullCount = r.get("nullCount"); boolean ret = true; int count = 0; int countForUpdate = 0; int pkDupSize = 0; if(!CollectionUtils.isEmpty(forAdd)){ for(Map<String,Object> cur: forAdd){ try{ count ++; BeanMapper.setAndPersist(cur,type); //如果这里报错,因为已经在上面有了最大日期,下一次JOB、数据就为空除非有新数据 }catch (org.springframework.dao.DuplicateKeyException e){ // 过滤掉之前日期,不应该到这 logger.error(e.getMessage()+"--duplicate【"+jobName+"】: "+getPkStr(cur, type)); count --; pkDupSize++; } } if(ret){ String dupInfo = Contants.TYPE_JQ.equals(type) ? ",JSON里重复警情号数据:" + (originSize - listWithoutDup.size()) + ",与DB主键冲突的数据:" + (pkDupSize) : Contants.TYPE_DPZD.equals(type) ? ",与DB主键冲突的数据:" + (pkDupSize)+",调派中队为null的数据:" +(nullCount.get(0).get("nullCount")) : ",调派中队为null或者车辆ID为null的数据:" +(nullCount.get(0).get("nullCount")); logger.info("-------------【"+jobName+"】成功增加"+count+"条最新数据-----------------------"+" (总:"+total+",今天的数据:"+todaySize+ dupInfo +")"); } } // Assert.isTrue(CollectionUtils.isEmpty(forUpdate), Contants.MSG_NEVER_UPADATE_NON_NULL_xfjgid); if(!CollectionUtils.isEmpty(forUpdate)){ //永远不会有值 for(Map<String,Object> cur: forUpdate){ logger.info("------不应存在的情况-------【"+jobName+"】修改的:-----------------------"+getPkStr(cur, type)); } } if(CollectionUtils.isEmpty(forAdd) && CollectionUtils.isEmpty(forUpdate)){ logger.info("-------------【"+jobName+"】没有最新数据-----------------------"); ret= false; } cacheData.set(listWithoutDup); //将今天的数据插入缓存,放入全局引用变量里,供运行时使用。 return ret; } }else { logger.warn("------请求["+url+"]错误:"+response.getStatusLine().toString());// HTTP/1.1 404 Not Found return false; } } finally { response.close(); } } catch (ClientProtocolException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }catch (SQLIntegrityConstraintViolationException e) { logger.error("--------------主键冲突异常:"+e.getMessage()+"-----------------------"); } finally { // 关闭连接,释放资源 try { httpclient.close(); } catch (IOException e) { e.printStackTrace(); } } return false; } /** * 去重,如果重复,只保留第一个。并获得最大日期 * @author wj * @param list * @param key 要去重的map的key * @return */ public static List<Map<String,Object>> withoutDuplicate(List<Map<String,Object>> list,String key,String type){ if(!CollectionUtils.isEmpty(list)){ List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>(); Set<Object> keysSet = new HashSet<Object>(); for(Map<String,Object> map : list){ Object keys = map.get(key); int beforeSize = keysSet.size(); keysSet.add(keys); int afterSize = keysSet.size(); if(afterSize == beforeSize + 1){ tmpList.add(map); } } return tmpList; } return Collections.EMPTY_LIST; } public static List<Map<String,Object> > getCurrentDateData(List<Map<String,Object> > list,String type) throws java.text.ParseException{ if(!CollectionUtils.isEmpty(list)){ List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>(); for(Map<String,Object> map : list){ if(Contants.TYPE_JQ.equals(type)){ String BJSJ = (String)map.get("BJSJ");//报警时间 --是否对应到 ALARM_TIME Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000 BJSJ = DateUtils.formatDate(BJSJDate); if(BJSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){ tmpList.add(map); } }else if(Contants.TYPE_DPZD.equals(type)){ String DPSJ = (String)map.get("DPSJ"); Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000 DPSJ = DateUtils.formatDate(DPSJDate); if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){ tmpList.add(map); } }else if(Contants.TYPE_DPCL.equals(type)){ String DPSJ = (String)map.get("DPSJ"); Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2017/01/01 08:44:31.000 DPSJ = DateUtils.formatDate(DPSJDate); if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){ tmpList.add(map); } }else{ throw new RuntimeException("不应该到这"); } } return tmpList; } return Collections.EMPTY_LIST; } public static void main(String[] s){ Date d1 = jiangdu.fire.util.wj.DateUtils.parseDate("2017-11-12 12:11:54"); Date d2 = jiangdu.fire.util.wj.DateUtils.parseDate("2017-11-12 12:11:44"); System.out.println(comparDate(d1, d2)); } /** * 返回 新增的数据和待修改的数据 * @param cacheData 缓存 * @param newData 新值 * @param type * @return */ public static Map<String,List<Map<String,Object> >> getResult(List<Map<String,Object> > cacheData,List<Map<String,Object> > newData,String type){ List<Map<String,Object> > forAdd =Lists.newArrayList(); List<Map<String,Object> > forUpdate =Lists.newArrayList(); int nullCount = 0; if(!CollectionUtils.isEmpty(newData)){ for(Map<String,Object> n : newData){ if(checkNull(n,type)){ nullCount++; continue; } Map<String, Object> c = contains(cacheData,n,type); if(CollectionUtils.isEmpty(c) ){//新增的 forAdd.add(n); }else{//在缓存里面找到了相应的 if(!compare(c,n,type)){//是否修改过 forUpdate.add(n); // throw new RuntimeException(Contants.MSG_NEVER_UPADATE_NON_NULL_xfjgid); } } } } Map<String,List<Map<String,Object> >> r = Maps.newConcurrentMap(); r.put("forAdd", forAdd); r.put("forUpdate", forUpdate); Map<String,Object> m = Maps.newHashMap(); m.put("nullCount",nullCount); r.put("nullCount", Lists.newArrayList(m)); return r; } /** * 用来判断机构是否为null,或者车辆是否为null。警情没有null的字段,不做处理,恒不为null * @param map * @param type * @return */ private static boolean checkNull(Map<String, Object> map, String type) { if(Contants.TYPE_JQ.equals(type)){ return false; }else if(Contants.TYPE_DPZD.equals(type)){ String XFJGID = (String)map.get("XFJGID"); return StringUtils.isEmpty(XFJGID); }else if(Contants.TYPE_DPCL.equals(type)){// String XFJGID = (String)map.get("XFJGID"); String CLID = (String)map.get("CLID"); return StringUtils.isEmpty(XFJGID) || StringUtils.isEmpty(CLID); }else{ throw new RuntimeException("不应该到这"); } } private static boolean compare(Map<String, Object> c, Map<String, Object> n, String type) { if(Contants.TYPE_JQ.equals(type)){ //警情不会修改字段 return true; }else if(Contants.TYPE_DPZD.equals(type)){ String x = StringUtils.defaultIfEmpty((String)n.get("XFJGID"), ""); String y = StringUtils.defaultIfEmpty((String)c.get("XFJGID"), ""); return x.equals(y); }else if(Contants.TYPE_DPCL.equals(type)){ String x1 = StringUtils.defaultIfEmpty((String)n.get("XFJGID"), ""); String x2 = StringUtils.defaultIfEmpty((String)n.get("CLID"), ""); String y1 = StringUtils.defaultIfEmpty((String)c.get("XFJGID"), ""); String y2 = StringUtils.defaultIfEmpty((String)c.get("CLID"), ""); return x1.equals(y1) && x2.equals(y2); }else{ throw new RuntimeException("不应该到这"); } } private static Map<String, Object> contains(List<Map<String, Object>> cacheData, Map<String, Object> n, String type) { if(Contants.TYPE_JQ.equals(type)){ if(CollectionUtils.isEmpty(cacheData)){ return Collections.EMPTY_MAP; } String y1 = StringUtils.defaultIfEmpty((String)n.get("JQBH"), ""); for(Map<String, Object> c :cacheData ){ String x1 = StringUtils.defaultIfEmpty((String)c.get("JQBH"), ""); // String x1 = StringUtils.defaultIfEmpty((String)c.get("ALARM_NO"), ""); if(x1.equals(y1)){ return c; } } return Collections.EMPTY_MAP; }else if(Contants.TYPE_DPZD.equals(type)){ if(CollectionUtils.isEmpty(cacheData)){ return Collections.EMPTY_MAP; } String y1 = StringUtils.defaultIfEmpty((String)n.get("JQID"), ""); String y2 = StringUtils.defaultIfEmpty((String)n.get("XFJGID"), ""); for(Map<String, Object> c :cacheData ){ String x1 = StringUtils.defaultIfEmpty((String)c.get("JQID"), ""); String x2 = StringUtils.defaultIfEmpty((String)c.get("XFJGID"), ""); //警情和机构都一样,或者,警情相等、时间相等、机构为Null ,都视为准备修改的记录 if((x1.equals(y1) && x2.equals(y2)) ){ return c; } } return Collections.EMPTY_MAP; }else if(Contants.TYPE_DPCL.equals(type)){ if(CollectionUtils.isEmpty(cacheData)){ return Collections.EMPTY_MAP; } String y1 = StringUtils.defaultIfEmpty((String)n.get("JQID"), ""); String y2 = StringUtils.defaultIfEmpty((String)n.get("XFJGID"), ""); String y3 = StringUtils.defaultIfEmpty((String)n.get("CLID"), ""); for(Map<String, Object> c :cacheData ){ String x1 = StringUtils.defaultIfEmpty((String)c.get("JQID"), ""); String x2 = StringUtils.defaultIfEmpty((String)c.get("XFJGID"), ""); String x3 = StringUtils.defaultIfEmpty((String)c.get("CLID"), ""); //警情、机构、车辆都一样,或者,警情相等、时间相等、机构为Null、车辆为Null ,都视为准备修改的记录 if(x1.equals(y1) && x2.equals(y2) && x3.equals(y3) ){ return c; } } return Collections.EMPTY_MAP; }else{ throw new RuntimeException("不应该到这"); } } public static Date getBJSJDate(Map<String,Object> map,String type){ String BJSJ =""; if(Contants.TYPE_JQ.equals(type)){ BJSJ = (String)map.get("BJSJ");//报警时间 --是否对应到 ALARM_TIME }else if(Contants.TYPE_DPZD.equals(type)){ BJSJ = (String)map.get("DPSJ"); }else if(Contants.TYPE_DPCL.equals(type)){// BJSJ = (String)map.get("DPSJ"); }else{ throw new RuntimeException("不应该到这"); } try { Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS");//2016/12/29 13:56:22.000 return BJSJDate; } catch (java.text.ParseException e) { e.printStackTrace(); } return null; } public static int comparDate(Date DATE1, Date DATE2) { try { if(DATE1 ==null && DATE2==null){ return 0; }else if(DATE1 ==null || DATE2==null){ throw new Exception(); } if (DATE1.getTime() > DATE2.getTime()) { // System.out.println("dt1 在dt2前"); return 1; } else if (DATE1.getTime() < DATE2.getTime()) { // System.out.println("dt1在dt2后"); return -1; } else { return 0; } } catch (Exception exception) { exception.printStackTrace(); } return 0; } public static String getPkStr(Map<String,Object> map,String type) { String pk = ""; if(Contants.TYPE_JQ.equals(type)){ pk = "{JQBH:"+(String)map.get("JQBH")+"}"; }else if(Contants.TYPE_DPZD.equals(type)){ pk = "{JQID:"+(String)map.get("JQID")+",XFJGID:"+(String)map.get("XFJGID")+"}"; }else if(Contants.TYPE_DPCL.equals(type)){// pk = "{JQID:"+(String)map.get("JQID")+",XFJGID:"+(String)map.get("XFJGID")+",CLID:"+(String)map.get("CLID")+"}"; }else{ throw new RuntimeException("不应该到这"); } return pk; } }
相关推荐
自定义JavaBean作为MapReduce的数据类型需实现WritableComparable接口,同时可自定义OutputFormat以适应特定的输出需求。MapReduce适用于多种场景,如排序、统计TopN、join操作和寻找共同好友问题。 Hive是一个基于...
### 大数据技术之Storm知识点解析 #### 一、Storm概述 **1.1 离线计算** 离线计算是指对于数据进行批量处理的过程,主要包括数据的批量获取、批量传输以及周期性的批量计算和展示。典型的离线计算技术包括: - **...
大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是Hadoop的大数据处理框架,主要由两个...
Hadoop MapReduce处理的数据类型不是标准的Java类型,而是实现了WritableComparable接口的特殊类型,如IntWritable、LongWritable、Text等,这些类型能够被序列化,方便在网络间传输和存储。在WordCount程序中,Map...
MapReduce 的 Java 编程接口主要包括 Mapper 和 Reducer 接口。通过实现这些接口,开发者可以定义如何对输入数据进行映射和归约处理。 ##### Mapper API Mapper 类负责处理输入键值对,并生成中间键值对。Mapper ...
MapReduce通过InputFormat、Mapper、Partitioner、Reducer和OutputFormat等扩展接口,允许用户自定义处理逻辑,适应不同的数据处理需求。 Hadoop生态系统还包括多个辅助工具和项目,如Pig和Hive。Pig提供了一种SQL-...
**Spark**通过引入弹性分布式数据集(RDD),提供了更丰富的数据操作接口,并允许Job中间结果存于内存,避免了频繁的磁盘读写,极大地提升了迭代计算效率。Spark的**MLLIB**库相比Mahout在实际应用中表现出更高的...
### 大数据技术分享:Hadoop运行原理分析 #### 一、概论 Hadoop作为一个开源框架,主要用于处理大规模的数据集。它通过提供一个高效、可靠、可扩展的基础架构来支持分布式数据处理任务。Hadoop的核心组件包括HDFS...
【标题】中的“基于Spark的地铁大数据客流分析系统”是一个以Apache Spark为核心技术的数据处理与分析项目,旨在解决城市地铁运营中的客流统计、预测及优化问题。Spark作为一个快速、通用的大数据处理框架,能高效地...
1. **易于编程**:MapReduce 的编程模型简单,只需要实现 Map 和 Reduce 两个接口,即可实现分布式计算。对于程序员来说,编写 MapReduce 程序就像编写普通的串行程序一样。 2. **良好的扩展性**:MapReduce 可以...
在大数据环境中,`main`方法通常包含驱动程序逻辑,负责初始化数据源、配置处理框架(如Hadoop或Spark)、定义计算任务,并最终执行这些任务。在这个实训中,`main.java`可能包含对大数据处理框架的调用,比如初始化...
- Driver程序是MapReduce作业的启动点,负责创建Job对象,配置输入输出路径,以及提交作业到集群运行。 - 数据在MapReduce中以键值对的形式流动,输入和输出都是kv对,需要根据业务需求定义合适的键值对类型。 3...
在规约阶段,所有映射结果会被汇集并合并,得出最终的单词总数。 三、WordCount代码分析 1. **Mapper阶段**:Mapper类通常是实现`org.apache.hadoop.mapreduce.Mapper`接口。在这个阶段,Mapper读取输入数据(通常...
`easydc`(Easy Distributed Computing)就是这样一个为开发者提供简单易用接口的分布式计算框架,它使得在Python环境中实现MapReduce模型变得更加简便。本文将深入探讨`easydc`的核心特性、工作原理以及如何在实际...
Reduce阶段则接收Map阶段的结果,进行聚合操作,例如汇总、排序等,以产生最终结果。 在标签中,“大数据”和“hadoop”是关键词。大数据指的是处理海量、快速增长的数据集,这些数据集具有高复杂性,传统数据处理...
Hadoop提供了一套丰富的API,包括FileSystem API用于文件操作,InputFormat和OutputFormat接口用于定制数据读写,RecordReader和RecordWriter用于读写单条记录,以及Partitioner和Comparator用于控制数据分发和排序...
### Hadoop权威指南第三版知识点概述 #### 一、了解Hadoop **1.1 数据!数据存储与分析** - **大数据背景**:随着互联网的发展,数据量急剧增长,传统的关系型数据库管理系统(RDBMS)难以应对海量数据的处理需求...
WordCount的实现涉及到了Hadoop自定义的数据类型,如Text和IntWritable,它们实现了WritableComparable接口,便于数据序列化和比较。Map函数解析输入文本,生成<单词, 1>的键值对,reduce函数则将所有相同单词的计数...
Spark是大数据处理领域的一个重要框架,由马泰·扎哈里亚在2009年创建,并最终归入Apache软件基金会。它以其高性能、易用性和广泛的功能集而闻名,旨在简化大规模数据处理的任务。Spark提供了一个高级编程接口,允许...