`

定时JOB,去请求数据,并找出最新数据持久化

 
阅读更多

JOB:

 

/**
 * 同步警情json 到t_alarm。如果服务器down掉几天,会遗漏期间的数据
 * @author wj
 * @date 2016-12-30
 *
 */
public class AlarmJob implements Job {
	
	
	
	public static AtomicReference<Date>  maxBjsj  ;
	
	static{
		if(CacheUtils.get(Contants.jq_maxDateCache, Contants.jq_maxDate_key) == null){
			maxBjsj = new  AtomicReference<Date>();
		}else{
			maxBjsj = (AtomicReference<Date>)CacheUtils.get(Contants.jq_maxDateCache, Contants.jq_maxDate_key);
		}
	}
	
	public static int count = 0;
	
	//可以缓存到内存
//	@Cacheable(value="sysCache" )
	@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {

		String url = Contants.alarm_url+DateUtils.getDate("yyyyMMdd");
		
		try {
			 long t1 = System.currentTimeMillis(); // 排序前取得当前时间  
			 
			if(new HttpClient().get(url,maxBjsj,Contants.TYPE_JQ)){
				
				 long t2 = System.currentTimeMillis(); // 排序后取得当前时间  
				  Calendar c = Calendar.getInstance();  
			        c.setTimeInMillis(t2 - t1);  
			        System.out.println("----第"+ ++count +"次 同步结束----"+"耗时: " + c.get(Calendar.MINUTE) + "分 "  
		                + c.get(Calendar.SECOND) + "秒 " + c.get(Calendar.MILLISECOND)  
		                + " 毫秒");
			}
//		
		} catch (Exception e) {
			e.printStackTrace();
		}
	
	}

	
}

 httpclient:

public boolean get(String url ,AtomicReference<Date> maxBjsj,String type)  throws Exception{
		CloseableHttpClient httpclient = HttpClients.createDefault();
		try {
			// 创建httpget.  
			HttpGet httpget = new HttpGet(url);
//			httpget.setParams(params);
			
			System.out.println("\n-----------------------");
			logger.debug("executing request " + httpget.getURI());
			// 执行get请求.  
			CloseableHttpResponse response = httpclient.execute(httpget);
			try {
				// 获取响应实体  
				HttpEntity entity = response.getEntity();
				
				// 打印响应状态  
				logger.info(response.getStatusLine().toString());	
				
				if(200==(response.getStatusLine().getStatusCode())){
					if (entity != null) {
						String s = EntityUtils.toString(entity, "UTF-8");
						// 打印响应内容长度  
						logger.debug("Response content length: " + entity.getContentLength());
						// 打印响应内容  
						logger.debug("Response content: " + s);
						
						Map<String,List<Map<String,Object> >> map = FastJsonUtil.stringToCollect(s);
						List<Map<String,Object> > data = map.get("data");
						
						data = getCurrentDateData(data, type);
						
						if( maxBjsj.get()==null ){ //只会执行第一次,如果缓存里没有值
							
//							maxBjsj =  new AtomicReference<Date>(getBJSJDate(data.get(0)))   ;
							maxBjsj.set(getBJSJDate(data.get(0),type)); 
						}else{
							data = filterDateGt(maxBjsj.get(), data,type);
						}
						
						
						List<Map<String,Object> > listWithoutDup = data;
						
						String jobName = "";
						String cacheName = "";
						String cacheKeyName = "";
						if(Contants.TYPE_JQ.equals(type)){
							//去重 ,获取下一次最大日期
							listWithoutDup =  withoutDuplicate(data, "JQBH",maxBjsj,type);  //警情去重
							jobName="警情";
							cacheName=Contants.jq_maxDateCache;
							cacheKeyName=Contants.jq_maxDate_key;
						}else if(Contants.TYPE_DPZD.equals(type)){  //没有主键,无法去重
							setMaxDate(data, maxBjsj,type);
							jobName="调派中队";
							cacheName=Contants.dpzd_maxDateCache;
							cacheKeyName=Contants.dpzd_maxDate_key;
						}else if(Contants.TYPE_DPCL.equals(type)){
							setMaxDate(data, maxBjsj,type);
							jobName="调派车辆";
							cacheName=Contants.dpcl_maxDateCache;
							cacheKeyName=Contants.dpcl_maxDate_key;
						}else{
							throw new RuntimeException("不应该到这");
						}
						
					
						
						boolean ret = true;
						if(!CollectionUtils.isEmpty(listWithoutDup)){
							for(Map<String,Object> cur: listWithoutDup){
								
								try{
									
									BeanMapper.setAndPersist(cur,type); //如果这里报错,因为已经在上面有了最大日期,下一次JOB、数据就为空除非有新数据
									
								}catch (org.springframework.dao.DuplicateKeyException e){ // 过滤掉之前日期,不应该到这
//									maxBjsj.set(null);  //清空最大日期,让下一次JOB继续插入(下一次数据也许会变)
									logger.error(e.getMessage()+"--duplicate: "+(String)cur.get("JQBH"));
									ret = false;
								}
								
							}
							if(ret)
							  logger.info("-------------【"+jobName+"】成功导入"+listWithoutDup.size() +"条数据-----------------------");
						}else{
							logger.info("-------------【"+jobName+"】没有最新数据-----------------------");
							ret=  false;
						}
						
						
						// 可以比较一下缓存里的日期和这里的最大日期。不如直接put;比较还得get再比较,然后再put
						//将最大日期缓存磁盘(用于防止server关闭)。如果没有关闭,最大日期值 依然在外面静态变量里
						CacheUtils.put(cacheName, cacheKeyName,maxBjsj);
						//看看有没有持久化成功
						org.springframework.util.Assert.isTrue(comparDate(((AtomicReference<Date>)CacheUtils.get(cacheName, cacheKeyName)).get(), maxBjsj.get())==0   ,"缓存日期和刚才日应该相等");
						
						CacheUtils.flush(cacheName);
						
						
						return ret;
						
						
					}
				}else {
					logger.warn("------请求["+url+"]错误:"+response.getStatusLine().toString());//   HTTP/1.1 404 Not Found
					return false;
					
				}
				
			} finally {
				response.close();
				
			}
		} catch (ClientProtocolException e) {
			e.printStackTrace();
		} catch (ParseException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}catch (SQLIntegrityConstraintViolationException e) {
			logger.error("--------------主键冲突异常:"+e.getMessage()+"-----------------------");
		} finally {
			// 关闭连接,释放资源  
			try {
				httpclient.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return false;
		
	}

 其他私有辅助方法:

/**
	 * 去重,如果重复,只保留第一个。并获得最大日期
	 * @author wj
	 * @param list
	 * @param key 要去重的map的key
	 * @return
	 */
	private List<Map<String,Object>>  withoutDuplicate(List<Map<String,Object>> list,String key,AtomicReference<Date> maxBjsj,String type){
		
			if(!CollectionUtils.isEmpty(list)){
				 List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			        Set<Object> keysSet = new HashSet<Object>();
			        for(Map<String,Object> map : list){
			            Object keys = map.get(key);
			            int beforeSize = keysSet.size();
			            keysSet.add(keys);
			            int afterSize = keysSet.size();
			            if(afterSize == beforeSize + 1){
			                tmpList.add(map);
			                
			                Date date =  getBJSJDate(map,type);
			                if(comparDate(date, maxBjsj.get())==1){
//			                	if(date>maxBjsj){
//			                	maxBjsj =  new AtomicReference<Date>(date)    ;
			                	maxBjsj.set(date);    
			                }
			                
			            }
			        }
			        return tmpList;
				
			}
			return null;
		  
	}
	private void  setMaxDate(List<Map<String,Object>> list,AtomicReference<Date> maxBjsj,String type){
		
		if(!CollectionUtils.isEmpty(list)){
			for(Map<String,Object> map : list){
					Date date =  getBJSJDate(map,type);
					if(comparDate(date, maxBjsj.get())==1){
						maxBjsj.set(date);    
					}
					
			}
			
		}
		
	}
	/**
	 * 查询日期大于{@code date} 的数据
	 * @param date
	 * @param list
	 * @return
	 */
	private List<Map<String,Object>>  filterDateGt(Date date ,List<Map<String,Object>> list,String type){
		
		if(!CollectionUtils.isEmpty(list)){
			List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			for(Map<String,Object> map : list){
				Date date1 = getBJSJDate(map,type);
				if(comparDate(date1, date) == 1){
					tmpList.add(map);
				}
			}
			return tmpList;
			
		}
		return null;
		
	}
	
	private Date getBJSJDate(Map<String,Object> map,String type){
		String BJSJ ="";
		if(Contants.TYPE_JQ.equals(type)){
			 BJSJ = (String)map.get("BJSJ");//报警时间  --是否对应到 ALARM_TIME
		}else if(Contants.TYPE_DPZD.equals(type)){
			 BJSJ = (String)map.get("DPSJ");
		}else if(Contants.TYPE_DPCL.equals(type)){//
			 BJSJ = (String)map.get("DPSJ");
		}else{
			throw new RuntimeException("不应该到这");
		}
		
		
		
		try {
			Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS");//2016/12/29 13:56:22.000
			return BJSJDate;
		} catch (java.text.ParseException e) {
			e.printStackTrace();
		} 
		return null;
	}
	
	public static int comparDate(Date DATE1, Date DATE2) {
        try {
        	if(DATE1 ==null && DATE2==null){
        		return 0;
        	}else if(DATE1 ==null ||  DATE2==null){
        		throw new Exception();
        	}
            if (DATE1.getTime() > DATE2.getTime()) {
//                System.out.println("dt1 在dt2前");
                return 1;
            } else if (DATE1.getTime() < DATE2.getTime()) {
//                System.out.println("dt1在dt2后");
                return -1;
            } else {
                return 0;
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return 0;
    }
	
	
	public static List<Map<String,Object> > getCurrentDateData(List<Map<String,Object> >  list,String type) throws java.text.ParseException{
		
		if(!CollectionUtils.isEmpty(list)){
			List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>();
			for(Map<String,Object> map : list){
				
				if(Contants.TYPE_JQ.equals(type)){
					
					String BJSJ = (String)map.get("BJSJ");//报警时间  --是否对应到 ALARM_TIME
					Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000
					BJSJ = DateUtils.formatDate(BJSJDate);
					
					if(BJSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else if(Contants.TYPE_DPZD.equals(type)){
					
					String DPSJ = (String)map.get("DPSJ");
					Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000
					DPSJ = DateUtils.formatDate(DPSJDate);
					
					if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else if(Contants.TYPE_DPCL.equals(type)){
					
					String DPSJ = (String)map.get("DPSJ");
					Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2017/01/01 08:44:31.000
					DPSJ = DateUtils.formatDate(DPSJDate);
					
					if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){
						tmpList.add(map);
					}
					
				}else{
					throw new RuntimeException("不应该到这");
				}
				
			}
			
			return tmpList;
		
		}
		return null;
		
	}

 用到ehcache缓存最大时间到磁盘,缓存可以看我另一篇博客

0
0
分享到:
评论
1 楼 faradayroger 2017-01-09  
[color=green][color=red][/color][/color]不错不错,没看懂

相关推荐

    quartz job持久化

    实现把job持久化数据库,里面有具体的说明文档 Quartz 1 1 通过quartz创建持久化定时执行任务 3 1.1 首先创建一个基本的web工程,所需jar包 3 1.2 Quartz配置文件quartz.properties 4 1.3 创建job 6 1.3.1 实现org....

    Quartz定时任务持久化数据表

    在Java应用中,Quartz常被用来实现后台任务的自动化,比如定时发送邮件、数据备份等。在分布式环境中,Quartz还支持集群,确保任务的高可用性。 Quartz的定时任务持久化是指将任务的相关信息(如触发器、作业详情)...

    python基于51job数据可视化图表展示源码.zip

    python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts可视化。python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts...

    爬取51job数据并做可视化分析(可视化大屏项目)

    该项目是关于利用Python进行网络爬虫,从51job网站获取相关数据,然后通过Echarts进行数据可视化,并最终构建一个基于Web的可视化大屏展示。在这个过程中,涉及到的关键技术包括Python爬虫、数据库管理(MySQL)、...

    Python爬取分析51Job数据并可视化岗位信息

    在本项目中,我们主要探讨如何使用Python编程语言来实现对51Job网站的网络爬虫,以获取相关的职位信息,包括工作名称、工作描述、公司名称以及薪资范围等,并进一步进行数据分析和可视化。以下是对整个过程的详细...

    oracle定时删除表空间的数据并释放表空间

    "Oracle定时删除表空间的数据并释放表空间" Oracle数据库定时删除表空间的数据并释放表空间是通过创建存储过程和定时任务来实现的。下面详细讲解该知识点: 一、创建存储过程 首先,需要创建一个存储过程来删除...

    springboot2.3集成quartz定时任务持久化数据库,支持集群

    在本文中,我们将深入探讨如何在Spring Boot 2.3版本中集成Quartz定时任务,并实现其持久化到数据库,以便支持集群环境。这个过程的关键在于配置Quartz Scheduler,设置数据库连接,以及确保任务在多节点环境中能够...

    定时调度器 xxl-job

    6. **应用场景**:xxl-job广泛应用于大数据处理、报表生成、数据同步、定时备份等多种定时任务场景。 总结来说,xxl-job是一个强大且灵活的分布式任务调度框架,它简化了定时任务的开发和管理,同时具备高可用、可...

    Oracle 定时删除数据 并释放空间

    Oracle 定时删除数据 并释放空间 ,创建存储过程并使用job完成。

    基于python的51job工作岗位数据分析与可视化-交互式数据可视化期末作业项目.zip

    数据源:基于51job招聘网站爬取的数据 51job招聘网 二、数据分析目标 全国各省的平均月薪情况、岗位分别情况 学历需求情况 工作经验与岗位数量相关情况 福利待遇情况 相关岗位描述 三、数据价值 1.有相关岗位需求的...

    oracle的job定时

    Oracle的Job定时功能是Oracle数据库系统提供的一种自动化任务调度机制,允许用户设定特定的时间点或时间间隔执行数据库操作。在项目开发中,特别是在大数据管理和分析的场景下,定时任务经常被用于更新数据、生成...

    spring的quartz定时任务相关(持久化任务)

    ### Spring中的Quartz定时任务与持久化管理 #### 一、Spring与Quartz简介 Spring框架作为Java领域中广泛使用的轻量级应用框架之一,它提供了丰富的功能支持,包括依赖注入、面向切面编程以及MVC框架等。而Quartz是...

    SSM整合quartzb并持久化到数据库实现动态增删改查

    下面将详细介绍如何将这三个组件与Quartz整合,并将定时任务的配置和状态持久化到数据库,实现动态的增删改查功能。 1. **Spring配置** 在Spring的配置文件中,我们需要引入Quartz的相关依赖。首先,定义一个`...

    爬取51job网站实现数据可视化实验报告书.doc

    《爬取51job网站实现数据可视化实验报告书》 本实验报告主要涉及计算机科学与技术领域的数据获取、预处理及可视化技术,适用于计算机、软件工程、通信工程等相关专业的大学生进行课程设计或毕业设计参考。实验的...

    quartz 持久化数据库表结构sql

    本篇将详细介绍Quartz如何实现数据库持久化,并提供各类型数据库的SQL创建语句。 Quartz的持久化机制依赖于一组特定的数据库表,这些表存储了Job(任务)和Trigger(触发器)的信息。以下是Quartz默认使用的数据库...

    Talend Job - Windows 版本定时任务 简单操作 希望能帮助各位同是初学者的人们

    在数据处理领域,定时任务常用于定期从源系统中抽取数据并将其加载到目标系统中,确保数据的实时性和准确性。 #### 三、准备工作 在开始配置定时任务之前,我们需要做一些基础性的准备工作: 1. **安装Talend ...

    quartz的持久化

    总的来说,理解并正确配置 Quartz 的持久化机制对于确保定时任务的可靠性和可维护性至关重要。开发者需要根据项目需求和现有的数据存储方案选择合适的 JobStore 实现,并确保正确地执行相关的初始化和维护操作。

    某招聘网站数据爬取,51job就业数据爬取数据可视化分析

    某招聘网站51job就业数据爬取数据据分析,pyecharts数据可视化 该资源是一份Python爬虫实战指南,内容涵盖数据采集、处理和分析的全过程。通过该资源,读者可以了解Python爬虫的基本原理、常用库和工具,学习如何...

    quarzt定时任务(包含动态切换数据源)

    因此,你可以在Job实例化时通过JobDataMap传递当前需要的数据源信息。 2. **数据源配置**:在Spring框架中,可以使用`@ConfigurationProperties`注解和`@Profile`来配置多个数据源,每个数据源对应不同的环境或用途...

Global site tag (gtag.js) - Google Analytics