`

JOB调接口大数据 最终版

 
阅读更多
package jiangdu.fire.job.wj.baseDb;

import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.CollectionUtils;

import jiangdu.fire.job.ContextUtil;
import jiangdu.fire.util.Contants;
import jiangdu.fire.util.wj.DateUtils;
import jiangdu.fire.util.wj.HttpClient4;

/**
 * 同步警情json 到t_alarm。如果服务器down掉几天,会遗漏期间的数据
 * @author wj
 * @date 2016-12-30
 *
 */
@SuppressWarnings("unchecked")
//阻止并发,防止maxBjsj混乱
@DisallowConcurrentExecution
public class AlarmJob implements Job {
	
	static JdbcTemplate jdbcTemplate = (JdbcTemplate) ContextUtil
			.getApplicationContext().getBean("jdbcTemplate");
	
	public static  AtomicReference<List<Map<String,Object>>>  cacheData = new  AtomicReference<List<Map<String,Object>>>() ;
	
	public static final HttpClient4 httpClient4 = new HttpClient4();
	
	static{ 
		if(CollectionUtils.isEmpty(cacheData.get()) ){//服务器重启后,从数据库里捞数据
			
			String sql = "select * from t_alarm  where ALARM_TIME  like ?";
			String today = DateUtils.getDate("yyyy-MM-dd");
			List<Map<String, Object>> list = jdbcTemplate.queryForList(sql,today+"%");
			if(!CollectionUtils.isEmpty(list)){
			
				for(Map<String, Object> m : list){ //将数据库的映射为JSON里的
					m.put("JQBH", m.get("ALARM_NO"));
				}
			}
			
			
			cacheData.set(list);
			
			
			
		}
	}
	
	public static int count = 0;
	
	/**
	 *  @DisallowConcurrentExecution 已经阻止job并发了。如果还是并发httpClient4.get是同步方法,一样可以阻止并发 2017-2-18 by wj
	 */
	@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {
//			try {
//				Thread.sleep(2000);  //等待static执行完
//			} catch (InterruptedException e1) {
//				// TODO Auto-generated catch block
//				e1.printStackTrace();
//			}
//
//        // 获取 JobDataMap , 并从中取出参数   
//        JobDataMap data = context.getJobDetail().getJobDataMap();  
//        String favoriteColor = data.getString(FAVORITE_COLOR);  
//        data.put(EXECUTION_COUNT, count);  
//		
		
		String url = Contants.alarm_url+DateUtils.getDate("yyyyMMdd");
		
		try {
			 long t1 = System.currentTimeMillis(); // 排序前取得当前时间  
			 
			if(httpClient4.get(url,cacheData,Contants.TYPE_JQ)){ //也可以是static锁
//				if(new HttpClient().get(url,maxBjsj,Contants.TYPE_JQ)){
				
				 long t2 = System.currentTimeMillis(); // 排序后取得当前时间  
				  Calendar c = Calendar.getInstance();  
			        c.setTimeInMillis(t2 - t1);  
			        System.out.println("----第"+ ++count +"次 同步结束----"+"耗时: " + c.get(Calendar.MINUTE) + "分 "  
		                + c.get(Calendar.SECOND) + "秒 " + c.get(Calendar.MILLISECOND)  
		                + " 毫秒");
			}
//		
		} catch (Exception e) {
			e.printStackTrace();
		}
	
	}

	
}

 

 

 

 

 

 

package jiangdu.fire.util.wj;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.SSLContext;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.ParseException;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.entity.mime.content.StringBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import jiangdu.fire.util.Contants;

/**
 * 过滤调派中队,机构为null的,直到有值,再新增,不用缓存,基于DB的
 * 模拟http请求
 * @author wj
 * @date 2017-2-31
 *
 */
public class HttpClient4 {

	private static Logger logger = LoggerFactory.getLogger(HttpClient4.class);
	
	public static final Integer timeOut = 30000;
	
	
//	public static  Date maxBjsj ;
	@Test
	public void jUnitTest() throws Exception {
//		get("http://221.226.3.57:8888/fw/V_AJXX_20161229");
		
		
	}


	/**
	 * 发送 get请求
	 * @throws java.text.ParseException 
	 */
	@SuppressWarnings({ "unused", "unchecked" })
	public  synchronized  boolean get(String url ,AtomicReference<List<Map<String,Object>>> cacheData,String type)  throws Exception{
		CloseableHttpClient httpclient = HttpClients.createDefault();
		
		try {
			// 创建httpget.  
			HttpGet httpget = new HttpGet(url);
//			httpget.setParams(params);
			
			// set Timeout
//			RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(timeOut)
//					.setConnectTimeout(timeOut).setSocketTimeout(timeOut).build();
//			httpget.setConfig(requestConfig);
			

			System.out.println("\n-----------------------");
			logger.debug("executing request " + httpget.getURI());
			
			// 执行get请求.  
			CloseableHttpResponse response = httpclient.execute(httpget);
			try {
				// 获取响应实体  
				HttpEntity entity = response.getEntity();
				
				// 打印响应状态  
				logger.info(response.getStatusLine().toString());	
				
				if(HttpStatus.SC_OK==(response.getStatusLine().getStatusCode())){
//					if(200==(response.getStatusLine().getStatusCode())){
					if (entity != null) {
						String s = EntityUtils.toString(entity, "UTF-8");
						// 打印响应内容长度  
						logger.debug("Response content length: " + entity.getContentLength());
						// 打印响应内容  
						logger.debug("Response content: " + s);
						
						Map<String,List<Map<String,Object> >> map = FastJsonUtil.stringToCollect(s);
						List<Map<String,Object> > data = map.get("data");
						List<Map<String,Object> >  originData = data;
						int total = data.size();
						//今天的数据
						data = getCurrentDateData(data, type);
						int todaySize = data.size();
						
						
						List<Map<String,Object> > listWithoutDup = data;
						int originSize = listWithoutDup.size();
						
						String jobName = "";
						String cacheName = "";
						String cacheKeyName = "";
						if(Contants.TYPE_JQ.equals(type)){
							//去重 
							listWithoutDup =  withoutDuplicate(data, "JQBH",type);  //警情去重
							jobName="警情";
							cacheName=Contants.jqCache;
							cacheKeyName=Contants.jqCachekey;
						}else if(Contants.TYPE_DPZD.equals(type)){  //不去重,存在“两条同一个警情号,调派中队为空。后来修改中队为不同的值的情况”
							jobName="调派中队";
							cacheName=Contants.dpzdCache;
							cacheKeyName=Contants.dpzdCachekey;
						}else if(Contants.TYPE_DPCL.equals(type)){ //不去重,原因同 调派中队
							jobName="调派车辆";
							cacheName=Contants.dpclCache;
							cacheKeyName=Contants.dpclCachekey;
						}else{
							throw new RuntimeException("不应该到这");
						}
//						
//						if( !CollectionUtils.isEmpty(cacheData.get())){ //只会执行第一次,如果缓存里没有值
//							
//							//必须放在这里,如果放在下面,因为插DB耗时间,导致,当下一次JOB执行时候,cache还没有值,就会使得产生重复待新增数据,导致主键冲突异常
//							cacheData.set(listWithoutDup); //将今天的数据插入缓存,放入全局引用变量里,供运行时使用
//						}
						
						
						//如果一开始缓存无值,就会全部进入forAdd集合里
						Map<String,List<Map<String,Object> >> r = getResult(cacheData.get(), listWithoutDup, type);
						List<Map<String,Object>> forAdd = r.get("forAdd");
						List<Map<String,Object>> forUpdate = r.get("forUpdate");
						List<Map<String,Object>> nullCount = r.get("nullCount");
						
					
						boolean ret = true;
						int count = 0;
						int countForUpdate = 0;
						int pkDupSize = 0;
						if(!CollectionUtils.isEmpty(forAdd)){
							for(Map<String,Object> cur: forAdd){
								try{
									count ++;
									BeanMapper.setAndPersist(cur,type); //如果这里报错,因为已经在上面有了最大日期,下一次JOB、数据就为空除非有新数据
								}catch (org.springframework.dao.DuplicateKeyException e){ // 过滤掉之前日期,不应该到这
									logger.error(e.getMessage()+"--duplicate【"+jobName+"】: "+getPkStr(cur, type));
									count --;
									pkDupSize++;
								}
								
							}
							if(ret){
								String dupInfo = Contants.TYPE_JQ.equals(type)
										? ",JSON里重复警情号数据:" + (originSize - listWithoutDup.size()) + ",与DB主键冲突的数据:"
												+ (pkDupSize)
										: Contants.TYPE_DPZD.equals(type) ? ",与DB主键冲突的数据:" + (pkDupSize)+",调派中队为null的数据:" +(nullCount.get(0).get("nullCount"))
										: ",调派中队为null或者车辆ID为null的数据:" +(nullCount.get(0).get("nullCount"));
								logger.info("-------------【"+jobName+"】成功增加"+count+"条最新数据-----------------------"+" (总:"+total+",今天的数据:"+todaySize+ dupInfo +")");
							}
						}
						
//						Assert.isTrue(CollectionUtils.isEmpty(forUpdate), Contants.MSG_NEVER_UPADATE_NON_NULL_xfjgid);
						if(!CollectionUtils.isEmpty(forUpdate)){ //永远不会有值
							for(Map<String,Object> cur: forUpdate){
								
								logger.info("------不应存在的情况-------【"+jobName+"】修改的:-----------------------"+getPkStr(cur, type));
							}
						}
						
						if(CollectionUtils.isEmpty(forAdd) && CollectionUtils.isEmpty(forUpdate)){
							logger.info("-------------【"+jobName+"】没有最新数据-----------------------");
							ret=  false;
						}
						
						cacheData.set(listWithoutDup); //将今天的数据插入缓存,放入全局引用变量里,供运行时使用。
						
						return ret;
						
					}
				}else {
					logger.warn("------请求["+url+"]错误:"+response.getStatusLine().toString());//   HTTP/1.1 404 Not Found
					return false;
					
				}
				
			} finally {
				response.close();
				
			}
		} catch (ClientProtocolException e) {
			e.printStackTrace();
		} catch (ParseException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}catch (SQLIntegrityConstraintViolationException e) {
			logger.error("--------------主键冲突异常:"+e.getMessage()+"-----------------------");
		} finally {
			// 关闭连接,释放资源  
			try {
				httpclient.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return false;
		
	}
	
	
	
	
	/**
	 * 去重,如果重复,只保留第一个。并获得最大日期
	 * @author wj
	 * @param list
	 * @param key 要去重的map的key
	 * @return
	 */
	public static List<Map<String,Object>>  withoutDuplicate(List<Map<String,Object>> list,String key,String type){
		
			if(!CollectionUtils.isEmpty(list)){
				 List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			        Set<Object> keysSet = new HashSet<Object>();
			        for(Map<String,Object> map : list){
			            Object keys = map.get(key);
			            int beforeSize = keysSet.size();
			            keysSet.add(keys);
			            int afterSize = keysSet.size();
			            if(afterSize == beforeSize + 1){
			                tmpList.add(map);
			                
			            }
			        }
			        return tmpList;
				
			}
			return Collections.EMPTY_LIST;
		  
	}
	
	
	public static List<Map<String,Object> > getCurrentDateData(List<Map<String,Object> >  list,String type) throws java.text.ParseException{
		
		if(!CollectionUtils.isEmpty(list)){
			List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			for(Map<String,Object> map : list){
				
				if(Contants.TYPE_JQ.equals(type)){
					
					String BJSJ = (String)map.get("BJSJ");//报警时间  --是否对应到 ALARM_TIME
					Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000
					BJSJ = DateUtils.formatDate(BJSJDate);
					
					if(BJSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else if(Contants.TYPE_DPZD.equals(type)){
					
					String DPSJ = (String)map.get("DPSJ");
					Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000
					DPSJ = DateUtils.formatDate(DPSJDate);
					
					if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else if(Contants.TYPE_DPCL.equals(type)){
					
					String DPSJ = (String)map.get("DPSJ");
					Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2017/01/01 08:44:31.000
					DPSJ = DateUtils.formatDate(DPSJDate);
					
					if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else{
					throw new RuntimeException("不应该到这");
				}
				
			}
			
			return tmpList;
		
		}
		return Collections.EMPTY_LIST;
		
	}
	
	public static void main(String[] s){
		Date d1 = jiangdu.fire.util.wj.DateUtils.parseDate("2017-11-12 12:11:54");
		Date d2 = jiangdu.fire.util.wj.DateUtils.parseDate("2017-11-12 12:11:44");
		System.out.println(comparDate(d1, d2));
		
	}
	
	/**
	 * 返回 新增的数据和待修改的数据
	 * @param cacheData 缓存
	 * @param newData 新值
	 * @param type
	 * @return
	 */
	public static Map<String,List<Map<String,Object> >>  getResult(List<Map<String,Object> > cacheData,List<Map<String,Object> > newData,String type){
		List<Map<String,Object> > forAdd =Lists.newArrayList();
		List<Map<String,Object> > forUpdate =Lists.newArrayList();
		int nullCount = 0;
		if(!CollectionUtils.isEmpty(newData)){
			for(Map<String,Object> n : newData){
				if(checkNull(n,type)){
					nullCount++;
					continue;
				}
				Map<String, Object> c = contains(cacheData,n,type);
				if(CollectionUtils.isEmpty(c) ){//新增的
					forAdd.add(n);
				}else{//在缓存里面找到了相应的
					
					if(!compare(c,n,type)){//是否修改过
						forUpdate.add(n);
//						throw new RuntimeException(Contants.MSG_NEVER_UPADATE_NON_NULL_xfjgid);
					}
					
				}
			}
		}
		Map<String,List<Map<String,Object> >> r = Maps.newConcurrentMap();
		r.put("forAdd", forAdd);
		r.put("forUpdate", forUpdate);
		
		Map<String,Object>  m = Maps.newHashMap();
		m.put("nullCount",nullCount);
		r.put("nullCount", Lists.newArrayList(m));
		return r;
		
	}


	/**
	 * 用来判断机构是否为null,或者车辆是否为null。警情没有null的字段,不做处理,恒不为null
	 * @param map
	 * @param type
	 * @return
	 */
	private static boolean checkNull(Map<String, Object> map, String type) {
		if(Contants.TYPE_JQ.equals(type)){
			return false;
		}else if(Contants.TYPE_DPZD.equals(type)){
			String XFJGID = (String)map.get("XFJGID");
			return StringUtils.isEmpty(XFJGID);
		}else if(Contants.TYPE_DPCL.equals(type)){//
			String XFJGID = (String)map.get("XFJGID");
			String CLID = (String)map.get("CLID");
			return StringUtils.isEmpty(XFJGID) || StringUtils.isEmpty(CLID);
		}else{
			throw new RuntimeException("不应该到这");
		}
	}




	private static boolean compare(Map<String, Object> c, Map<String, Object> n, String type) {

		if(Contants.TYPE_JQ.equals(type)){ //警情不会修改字段
			
			return true;
			
		}else if(Contants.TYPE_DPZD.equals(type)){
			

			String x =  StringUtils.defaultIfEmpty((String)n.get("XFJGID"), "");
			String y =  StringUtils.defaultIfEmpty((String)c.get("XFJGID"), "");
			return x.equals(y);
			
		}else if(Contants.TYPE_DPCL.equals(type)){

			String x1 =  StringUtils.defaultIfEmpty((String)n.get("XFJGID"), "");
			String x2 =  StringUtils.defaultIfEmpty((String)n.get("CLID"), "");
			String y1 =  StringUtils.defaultIfEmpty((String)c.get("XFJGID"), "");
			String y2 =  StringUtils.defaultIfEmpty((String)c.get("CLID"), "");
			return x1.equals(y1) && x2.equals(y2);
		
			
		
		}else{
			throw new RuntimeException("不应该到这");
		}
	
	}


	private static Map<String, Object> contains(List<Map<String, Object>> cacheData, Map<String, Object> n, String type) {
		if(Contants.TYPE_JQ.equals(type)){
			if(CollectionUtils.isEmpty(cacheData)){
				return Collections.EMPTY_MAP;
			}
			String y1 =  StringUtils.defaultIfEmpty((String)n.get("JQBH"), "");
			
			for(Map<String, Object> c :cacheData ){
				String x1 =  StringUtils.defaultIfEmpty((String)c.get("JQBH"), "");
//				String x1 =  StringUtils.defaultIfEmpty((String)c.get("ALARM_NO"), "");
				if(x1.equals(y1)){
					return c;
				}
			}
			return Collections.EMPTY_MAP;
		}else if(Contants.TYPE_DPZD.equals(type)){
			

			if(CollectionUtils.isEmpty(cacheData)){
				return Collections.EMPTY_MAP;
			}
			String y1 =  StringUtils.defaultIfEmpty((String)n.get("JQID"), "");
			String y2 =  StringUtils.defaultIfEmpty((String)n.get("XFJGID"), "");
			for(Map<String, Object> c :cacheData ){
				String x1 =  StringUtils.defaultIfEmpty((String)c.get("JQID"), "");
				String x2 =  StringUtils.defaultIfEmpty((String)c.get("XFJGID"), "");
				//警情和机构都一样,或者,警情相等、时间相等、机构为Null ,都视为准备修改的记录
				if((x1.equals(y1) && x2.equals(y2)) ){
					return c;
				}
			}
			return Collections.EMPTY_MAP;
		
			
		}else if(Contants.TYPE_DPCL.equals(type)){
			
			

			if(CollectionUtils.isEmpty(cacheData)){
				return Collections.EMPTY_MAP;
			}
			String y1 =  StringUtils.defaultIfEmpty((String)n.get("JQID"), "");
			String y2 =  StringUtils.defaultIfEmpty((String)n.get("XFJGID"), "");
			String y3 =  StringUtils.defaultIfEmpty((String)n.get("CLID"), "");
			for(Map<String, Object> c :cacheData ){
				String x1 =  StringUtils.defaultIfEmpty((String)c.get("JQID"), "");
				String x2 =  StringUtils.defaultIfEmpty((String)c.get("XFJGID"), "");
				String x3 =  StringUtils.defaultIfEmpty((String)c.get("CLID"), "");
				//警情、机构、车辆都一样,或者,警情相等、时间相等、机构为Null、车辆为Null ,都视为准备修改的记录
				if(x1.equals(y1) && x2.equals(y2) && x3.equals(y3) 
						){
					return c;
				}
			}
			return Collections.EMPTY_MAP;
		
			
		
		}else{
			throw new RuntimeException("不应该到这");
		}
	}
	
	
	public static Date getBJSJDate(Map<String,Object> map,String type){
		String BJSJ ="";
		if(Contants.TYPE_JQ.equals(type)){
			 BJSJ = (String)map.get("BJSJ");//报警时间  --是否对应到 ALARM_TIME
		}else if(Contants.TYPE_DPZD.equals(type)){
			 BJSJ = (String)map.get("DPSJ");
		}else if(Contants.TYPE_DPCL.equals(type)){//
			 BJSJ = (String)map.get("DPSJ");
		}else{
			throw new RuntimeException("不应该到这");
		}
		
		
		
		try {
			Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS");//2016/12/29 13:56:22.000
			return BJSJDate;
		} catch (java.text.ParseException e) {
			e.printStackTrace();
		} 
		return null;
	}
	
	public static int comparDate(Date DATE1, Date DATE2) {
        try {
        	if(DATE1 ==null && DATE2==null){
        		return 0;
        	}else if(DATE1 ==null ||  DATE2==null){
        		throw new Exception();
        	}
            if (DATE1.getTime() > DATE2.getTime()) {
//                System.out.println("dt1 在dt2前");
                return 1;
            } else if (DATE1.getTime() < DATE2.getTime()) {
//                System.out.println("dt1在dt2后");
                return -1;
            } else {
                return 0;
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return 0;
    }
	public static String getPkStr(Map<String,Object> map,String type) {
		String pk = "";
		if(Contants.TYPE_JQ.equals(type)){
			pk = "{JQBH:"+(String)map.get("JQBH")+"}";
		}else if(Contants.TYPE_DPZD.equals(type)){
			pk = "{JQID:"+(String)map.get("JQID")+",XFJGID:"+(String)map.get("XFJGID")+"}";
		}else if(Contants.TYPE_DPCL.equals(type)){//
			pk = "{JQID:"+(String)map.get("JQID")+",XFJGID:"+(String)map.get("XFJGID")+",CLID:"+(String)map.get("CLID")+"}";
		}else{
			throw new RuntimeException("不应该到这");
		}
		return pk;
	}
	
}

 

分享到:
评论

相关推荐

    大数据框架整理.pdf

    自定义JavaBean作为MapReduce的数据类型需实现WritableComparable接口,同时可自定义OutputFormat以适应特定的输出需求。MapReduce适用于多种场景,如排序、统计TopN、join操作和寻找共同好友问题。 Hive是一个基于...

    大数据技术之Storm.doc

    ### 大数据技术之Storm知识点解析 #### 一、Storm概述 **1.1 离线计算** 离线计算是指对于数据进行批量处理的过程,主要包括数据的批量获取、批量传输以及周期性的批量计算和展示。典型的离线计算技术包括: - **...

    大数据MapReduce和YARN二次开发.pdf

    大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是Hadoop的大数据处理框架,主要由两个...

    MapReduce开发 大数据 入门 学习

    Hadoop MapReduce处理的数据类型不是标准的Java类型,而是实现了WritableComparable接口的特殊类型,如IntWritable、LongWritable、Text等,这些类型能够被序列化,方便在网络间传输和存储。在WordCount程序中,Map...

    大数据-Hadoop-MapReduce介绍

    MapReduce 的 Java 编程接口主要包括 Mapper 和 Reducer 接口。通过实现这些接口,开发者可以定义如何对输入数据进行映射和归约处理。 ##### Mapper API Mapper 类负责处理输入键值对,并生成中间键值对。Mapper ...

    大数据应用技术介绍.pptx

    MapReduce通过InputFormat、Mapper、Partitioner、Reducer和OutputFormat等扩展接口,允许用户自定义处理逻辑,适应不同的数据处理需求。 Hadoop生态系统还包括多个辅助工具和项目,如Pig和Hive。Pig提供了一种SQL-...

    大数据技术进展与发展趋势.pdf

    **Spark**通过引入弹性分布式数据集(RDD),提供了更丰富的数据操作接口,并允许Job中间结果存于内存,避免了频繁的磁盘读写,极大地提升了迭代计算效率。Spark的**MLLIB**库相比Mahout在实际应用中表现出更高的...

    大数据技术分享 Hadoop运行原理分析 共3页.pdf

    ### 大数据技术分享:Hadoop运行原理分析 #### 一、概论 Hadoop作为一个开源框架,主要用于处理大规模的数据集。它通过提供一个高效、可靠、可扩展的基础架构来支持分布式数据处理任务。Hadoop的核心组件包括HDFS...

    计算机课程毕设:基于spark的地铁大数据客流分析系统.zip

    【标题】中的“基于Spark的地铁大数据客流分析系统”是一个以Apache Spark为核心技术的数据处理与分析项目,旨在解决城市地铁运营中的客流统计、预测及优化问题。Spark作为一个快速、通用的大数据处理框架,能高效地...

    15_尚硅谷大数据之MapReduce入门1

    1. **易于编程**:MapReduce 的编程模型简单,只需要实现 Map 和 Reduce 两个接口,即可实现分布式计算。对于程序员来说,编写 MapReduce 程序就像编写普通的串行程序一样。 2. **良好的扩展性**:MapReduce 可以...

    java代码-大数据 03 实训4-4

    在大数据环境中,`main`方法通常包含驱动程序逻辑,负责初始化数据源、配置处理框架(如Hadoop或Spark)、定义计算任务,并最终执行这些任务。在这个实训中,`main.java`可能包含对大数据处理框架的调用,比如初始化...

    15、MapReduce介绍及wordcount

    - Driver程序是MapReduce作业的启动点,负责创建Job对象,配置输入输出路径,以及提交作业到集群运行。 - 数据在MapReduce中以键值对的形式流动,输入和输出都是kv对,需要根据业务需求定义合适的键值对类型。 3...

    Hadoop入门脚本WordCount

    在规约阶段,所有映射结果会被汇集并合并,得出最终的单词总数。 三、WordCount代码分析 1. **Mapper阶段**:Mapper类通常是实现`org.apache.hadoop.mapreduce.Mapper`接口。在这个阶段,Mapper读取输入数据(通常...

    Python-easydc一个简单易用的分布式计算框架

    `easydc`(Easy Distributed Computing)就是这样一个为开发者提供简单易用接口的分布式计算框架,它使得在Python环境中实现MapReduce模型变得更加简便。本文将深入探讨`easydc`的核心特性、工作原理以及如何在实际...

    map-reduce.rar

    Reduce阶段则接收Map阶段的结果,进行聚合操作,例如汇总、排序等,以产生最终结果。 在标签中,“大数据”和“hadoop”是关键词。大数据指的是处理海量、快速增长的数据集,这些数据集具有高复杂性,传统数据处理...

    Hadoop技术文档

    Hadoop提供了一套丰富的API,包括FileSystem API用于文件操作,InputFormat和OutputFormat接口用于定制数据读写,RecordReader和RecordWriter用于读写单条记录,以及Partitioner和Comparator用于控制数据分发和排序...

    hadoop_the_definitive_guide_3nd_edition.pdf

    ### Hadoop权威指南第三版知识点概述 #### 一、了解Hadoop **1.1 数据!数据存储与分析** - **大数据背景**:随着互联网的发展,数据量急剧增长,传统的关系型数据库管理系统(RDBMS)难以应对海量数据的处理需求...

    MapReduce经典例子WordCount运行详解.pdf

    WordCount的实现涉及到了Hadoop自定义的数据类型,如Text和IntWritable,它们实现了WritableComparable接口,便于数据序列化和比较。Map函数解析输入文本,生成&lt;单词, 1&gt;的键值对,reduce函数则将所有相同单词的计数...

    工信部Spark初级考前辅导.pdf

    Spark是大数据处理领域的一个重要框架,由马泰·扎哈里亚在2009年创建,并最终归入Apache软件基金会。它以其高性能、易用性和广泛的功能集而闻名,旨在简化大规模数据处理的任务。Spark提供了一个高级编程接口,允许...

Global site tag (gtag.js) - Google Analytics