`
ahua186186
  • 浏览: 561179 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

单线程锁数据单线程执行任务+单线程锁数据多线程执行任务+quartz

 
阅读更多
注:
这种锁数据的做法虽然比较稳定,但感觉有点弱智,性能也低,以后准备考虑生产数据的时候给数据分配一个机器号(随机算法即可),然后通过一个配置表配置机器号节点,消费数据的应用进程通过竞争机器号消费或处理数据相关逻辑即可。


一:单线程锁数据单线程执行任务(即当包工头和又当工人,即拉皮条又接客)



package xxx.thread;

import java.io.IOException;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import com.xxx.PropritesUtils;
import com.xx.IxxxBiz;

/**
 * <pre>
 * 发送数据
 * </pre>
 * 
 * @author jason
 * 
 */
public class  xxxThread extends Thread implements InitializingBean {

	private static final Logger logger = LoggerFactory
			.getLogger(xxxThread.class);

	private IxxxBiz xxxBiz;

	/**
	 * 是否 运行
	 */
	private boolean running;

	/**
	 * 每次加载数量,最大发送条数
	 */
	private int loadSize = 100;

	/**
	 * 最大发送次数
	 */
	private int maxSendCount = 3;

	/**
	 * 休眠时间
	 */
	private int sleepTime = 2000;

	/**
	 * 重置时间间隔
	 */
	private int resetInterval = 20000;

	/**
	 * 下一次重置时间
	 */
	private long nextResetTime;

	/**
	 * 重置多长时间之前的数据
	 */
	private int beforeTime = 30000;

	/**
	 * 初始化线程,读取配置文件
	 */
	private void initThread() {
		// 下一次重置时间
		nextResetTime = System.currentTimeMillis() + resetInterval;
	}

	@Override
	public void run() {

		// TODO Auto-generated method stub
		// 判断是否启动后台线程
		// 根据server.properties文件中配置的startup-model决定是否启动服务
		// ### 0 服务与后台线程都启动
		// ### 1 只启动服务
		// ### 2 只启动后台线程
		String startup_model = "0";
		try {
			startup_model = PropritesUtils.getValue("startup-model");
		} catch (IOException e1) {
			logger.error("读取配置文件错误!", e1);
		}
		if ("1".equals(startup_model)) {
			logger.error("\n### 不启动线程");
			return;
		}
				
		logger.info("xxxThread 线程启动...");
		this.setName("xxxThread");
		// 初始化参数
		initThread();

		while (running) {

			// 休眠
			sleepTask(0);

			try {
				execute();
			} catch (Exception exception) {
				logger.error("执行任务异常", exception);
			}

		}

		logger.info("xxxThread 线程停止...");

	}

	public void execute() {
		while (running) {

			int size = 0;
			try {
				size = xxxBiz.queryData(loadSize);
			} catch (Exception exception) {
				logger.error("xxxThread error========", exception);
			}

			// 休眠
			sleepTask(size);

			// 重置状态
			resetStatus();
		}
	}

	private void sleepTask(int i) {
		// 如果i >= loadSize,则休眠时间为最短
		// 默认有数据的时候休眠1秒,没数据的时候休眠2秒
		int time = sleepTime;
		if (i >= loadSize) {
			time = sleepTime / 2;
		}

		try {
			sleep(time);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

	/**
	 * 重置状态
	 */
	private void resetStatus() {

		if (System.currentTimeMillis() > nextResetTime) {

			nextResetTime = System.currentTimeMillis() + resetInterval;

			long btime = System.currentTimeMillis() - beforeTime;
			Date bdate = new Date(btime);

			xxxBiz.resetStatus();

		}
	}

	public void afterPropertiesSet() throws Exception {
		// 启动线程
		this.running = true;
		this.start();
	}

	// 注意对象销毁,停止线程
	public void close() {
		this.running = false;
	}

	public IxxxBiz getxxxBiz() {
		return pcustomLaxDataBiz;
	}

	public void setxxxBiz(IxxxBiz xxxBiz) {
		this.xxxBiz = xxxBiz;
	}

	public boolean isRunning() {
		return running;
	}

	public void setRunning(boolean running) {
		this.running = running;
	}

	public int getLoadSize() {
		return loadSize;
	}

	public void setLoadSize(int loadSize) {
		this.loadSize = loadSize;
	}

	public int getMaxSendCount() {
		return maxSendCount;
	}

	public void setMaxSendCount(int maxSendCount) {
		this.maxSendCount = maxSendCount;
	}

	public int getSleepTime() {
		return sleepTime;
	}

	public void setSleepTime(int sleepTime) {
		this.sleepTime = sleepTime;
	}

	public int getResetInterval() {
		return resetInterval;
	}

	public void setResetInterval(int resetInterval) {
		this.resetInterval = resetInterval;
	}

	public long getNextResetTime() {
		return nextResetTime;
	}

	public void setNextResetTime(long nextResetTime) {
		this.nextResetTime = nextResetTime;
	}

	public int getBeforeTime() {
		return beforeTime;
	}

	public void setBeforeTime(int beforeTime) {
		this.beforeTime = beforeTime;
	}

}



BIZ接口:
public interface IxxxBiz {

	/**
	 * 查询数据
	 * 
	 * @param size
	 *            最大条数
	 * 
	 * @return 
	 */
	public int queryData(int size);

	/**
	 * 重置状态
	 */
	public void resetStatus();
}



BIZ实现:

package com.xxx.biz;

import java.util.Date;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 *  biz
 * 
 * @author jason
 * 
 */
public class xxxBiz implements IxxxBiz {

	private static final Logger logger = LoggerFactory
			.getLogger(xxxBiz.class);

	private IxxxDao xxxDao;


	public IxxxDao getxxxDao() {
		return xxxDao;
	}

	public void setxxxDao(
			IxxxDao xxxDao) {
		this.xxxDao = xxxDao;
	}

	public int queryData(int size) {
		List<xxx> list = xxxDao.queryAndLockT(size);
		int reSize = 0;
		if (list == null) {
			return reSize;
		}
		for (xxx  queue : list) {
			doTask();

		}
		return reSize;
	}

	public void resetStatus() {
		xxxDao.reset();
	}
	

}



DAO锁数据,重置数据,更新状态等:
package com.xxx.dao;

import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

import org.hibernate.HibernateException;
import org.hibernate.SQLQuery;
import org.hibernate.Session;
import org.springframework.orm.hibernate3.HibernateCallback;

/**
 * 描述:xxx
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID   DATE            PERSON         REASON
 *  1    2015-11-09        jason         Create
 * ****************************************************************************
 * </pre>
 * 
 * @author jason
 * @since 1.0
 */
public class xxxDao extends BaseDao
		implements IxxxDao {

	private static String localIP = ManagementFactory.getRuntimeMXBean()
			.getName();
	@Override
	public List<xxxQueue> queryAndLockT(final int size) {
		final String sql = "select * from xxx_queue where "
				+ " ifsend ='F' and rownum < ? for update";

		@SuppressWarnings("unchecked")
		List<xxxQueue> list = getHibernateTemplate().executeFind(
				new HibernateCallback() {

					public Object doInHibernate(Session session)
							throws HibernateException, SQLException {

						SQLQuery query = session.createSQLQuery(sql);
						query.setInteger(0, size);
						query.addEntity(xxxQueue.class);
						return query.list();
					}
				});

		if (list == null || list.isEmpty()) {
			return null;
		}

		for (xxxQueue queue : list) {

			queue.setIfsend("I");;// 锁定
			queue.setLockBy(localIP);// 标志
			queue.setLockTime(new Date());// 时间

			getHibernateTemplate().update(queue);

		}

		return list;
	}
	
	@SuppressWarnings("unchecked")
	@Override
	public int reset() {
		//重置逻辑:重置数据生成时间在1天内,10分钟之前被锁定过的数据,直接写死时间,暂时不做系统配置。
		//设计解释:ifsend字段是用来锁数据的,RESULT_FLAG字段是用来重置数据的,
		//分2个字段解决数据重发问题,这里可能有点冗余,主要是考虑以后数据量很多时,方便改成乐观锁,多线程发送,不需要增加字段。
		final String sql = "update xxx_queue set ifsend= 'F' "
				+ " where (RESULT_FLAG ='2' or ifsend='I') "
				+ " and PRODUCTION_TIME > sysdate - interval '1' day "
				+ " and LOCK_TIME < sysdate - interval '600' second";

		return (Integer) getHibernateTemplate().execute(
				new HibernateCallback() {

					public Object doInHibernate(Session session)
							throws HibernateException, SQLException {
						SQLQuery query = session.createSQLQuery(sql);

						return query.executeUpdate();
					}
				});
	}
	@Override
	public void statusUpdate(xxxQueue queue, boolean success) {
		if(success){//更新状态,目标是保证发送失败的数据可被重置
			queue.setConsumeTime(new Date());
			queue.setConsumeFlag("1");
			queue.setResultFlag("1");
			queue.setIfsend("S");
		}else{
			queue.setResultFlag("2");
		}
		getHibernateTemplate().update(queue);
	}


	@Override
	public void saveData(List<xxxQueue> datas) {
		getHibernateTemplate().saveOrUpdateAll(datas);
	}

	
	

}



二:单线程锁数据多线程执行任务( 包工头锁数据工人执行任务)

思路1:直接用quartz(Scheduler调度线程和任务执行线程,默认1:10)

quartz线程模型理论:

在quartz里,有一个scheduler的properties文件,用于配置quartz框架运行时环境,其中有如下配置:


#============================================================================
# Configure ThreadPool
#============================================================================

org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=20
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.makeThreadsDaemons=true

如果threadCount>1,才能保证两个或多个job能并发执行,这说明threadCount确实是并发执行线程数。

思路2:多线程执行结果不需要合并场景:借鉴netty IO线程设计,直接把数据放入队列,然后异步执行即可。

思路3:多线程执行结果需要合并场景:用CountDownLatch控制任务是否执行完,增加子线程:TaskExecutor(持有CountDownLatch对象)





CountDownLatch cdl = new CountDownLatch(data.size());

如:

package com.xxx.thread;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.lang.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public  class TaskExecutor extends Thread
{
  protected static Logger logger = LoggerFactory.getLogger(TaskExecutor.class);
  private List<File> taskList;
  private File bakDir;
  private File errDir;
  private CountDownLatch cdl;

  public List<File> getTaskList()
  {
    return this.taskList;
  }

  public void setTaskList(List<File> paramList)
  {
    this.taskList = paramList;
  }

  public void run()
  {
    try
    {
      execute();
    }
    catch (Exception localException)
    {
      logger.error("TaskExecutor-run", localException);
    }finally{
    	cdl.countDown();
    }
  }



  protected  void execute(){
	  List<File>  fileList = getTaskList();
	  if(fileList!=null){
		  for(File file : fileList){
			try{
				long lastMofifyTime = file.lastModified();
				long diffTime =  System.currentTimeMillis() - lastMofifyTime;
				if(diffTime > 10000){	//	获取10秒之前生成的文件。
					Map<String, String> map = getMessageType(file);
					String businessType = map.get("type").toString().trim().toUpperCase();
					String xmlStr = map.get("xml").toString();
					if("aa".equals(businessType)){
						doxxx1(xmlStr, bakDir, errDir);
					}else if("bb".equals(businessType)){
						doxxx2(xmlStr, bakDir, errDir);	
					}else if("cc".equals(businessType)){
						doxxx3(xmlStr, bakDir, errDir);
					}else{//非法报文
						moveFileToErrorPath(file, errDir);
					}
					moveFileToBackUpPath(file, bakDir);
				}
			}catch (Exception e) {
				logger.error("========="+file.getName()+":{}.",StackTraceUtil.getStackTrace(e));
				moveFileToErrorPath(file, errDir);//可以考虑不处理,让其一直等待,防止网络断开等原因写回执失败
			}
			
		  }
		}else{
			logger.info("=========fileList is null, cdl.countDown()==================cdl count:" +cdl.getCount());
		}
  }
  
/**
* <pre>
* 获取xml文件的message_type
* </pre>
* 
* @param xmlFile
*            xml文件
* @return 返回 message_type
* @throws FileNotFoundException 
* @throws DocumentException
*/
public  Map<String, String> getMessageType(File xmlFile) throws FileNotFoundException {
	Map<String, String> retMap = new HashMap<String, String>();
	SAXReader saxReader = new SAXReader();
	Document document = null;
	Element msgType =null;
	FileInputStream fis = null;
	String xmlStr ="";
	try {
		fis = new FileInputStream(xmlFile.getPath());
		//使用SAXReader的read(File file)方法时,如果xml文件异常会导致文件被服务器占用不能移动文件,建议不使用read(File file)方法而使用read(FileInputStream fis)等流的方式读取文件,异常时关闭流,这样就不会造成流未关闭,文件被锁的现象了
		document = saxReader.read(fis);
		Element root = document.getRootElement();
		msgType = root.element("head").element("businessType");
		xmlStr = document.asXML();
	} catch (DocumentException e) {
		logger.error("getMessage_type error{}.", StackTraceUtil.getStackTrace(e));
	}finally{
      try{
          if(null != fis){
              fis.close();
              Thread.sleep(10L);
          }
      }catch(Exception e){
      	logger.error("getMessage_type error{}.", StackTraceUtil.getStackTrace(e));
      }
  }
	retMap.put("type", msgType == null ? "" : msgType.getText());
	retMap.put("xml", xmlStr);
	return retMap;
}

/**
 * <pre>
 * 移动文件到错误备份目录,并重命名
 * </pre>
 * 
 * @param xmlFile
 *            需要移动的文件
 * @param errDir
 *            错误备份目录
 * @return 是否成功
 */
private boolean moveFileToErrorPath(File xmlFile, File errDir) {
	errDir.mkdir();// 创建目录

	// 重命名文件
	StringBuffer fName = new StringBuffer(
			DateUtils.getCurrDateStr(DateUtils.HMS));
	fName.append("_");
	fName.append(xmlFile.getName());

	boolean b = xmlFile.renameTo(new File(errDir, fName.toString()));
	if (!b) {
		// 避免文件重名的情况
		fName = new StringBuffer();
		// 用UUID 重命名
		fName.append(UUID.randomUUID().toString());
		fName.append("_");
		fName.append(xmlFile.getName());
		return xmlFile.renameTo(new File(errDir, fName.toString()));
	}

	return b;
}

/**
 * <pre>
 * 移动文件到备份目录,并重名
 * </pre>
 * 
 * @param xmlFile
 *            需要备份的文件
 * @param bakDir
 *            备份目录
 * @return 是否成功
 */
private boolean moveFileToBackUpPath(File xmlFile, File bakDir) {
	bakDir.mkdir();// 创建目录

	// 重命名文件
	StringBuffer fName = new StringBuffer(
			DateUtils.getCurrDateStr(DateUtils.HMS));
	fName.append("_");
	fName.append(xmlFile.getName());

	boolean b = xmlFile.renameTo(new File(bakDir, fName.toString()));
	if (!b) {
		// 避免文件重名的情况
		fName = new StringBuffer();
		// 用UUID 重命名
		fName.append(UUID.randomUUID().toString());
		fName.append("_");
		fName.append(xmlFile.getName());
		return xmlFile.renameTo(new File(bakDir, fName.toString()));
	}

	return b;
}

public File getBakDir() {
	return bakDir;
}

public void setBakDir(File bakDir) {
	this.bakDir = bakDir;
}

public File getErrDir() {
	return errDir;
}

public void setErrDir(File errDir) {
	this.errDir = errDir;
}

public CountDownLatch getCdl() {
	return cdl;
}

public void setCdl(CountDownLatch cdl) {
	this.cdl = cdl;
}


}


三:扩展思路

集成:同样是 包工头 +  工人 模式
(1)单个应用程序(单线程锁数据--任务调度方) + 多个应用程序(多线程执行任务--任务执行器)
(2)单个应用程序(多线程锁数据--任务调度方) + 多个应用程序(多线程执行任务--任务执行器)
(3) 这种锁数据的做法感觉有点弱智,性能也低,可考虑生产数据的时候给数据分配一个机器号(随机算法即可),然后通过一个配置表配置机器号节点,消费数据的应用通过竞争机器号消费或处理数据即可。

四:下一步准备写个轻量级的分布式并行计算框架用来提升大批量任务的处理速度。
分享到:
评论

相关推荐

    Quartz多线程示例.rar

    总之,"Quartz多线程示例"是一个很好的学习资源,它教你如何利用Quartz的多线程能力来并发执行任务,以及如何根据需求定制调度策略。通过这个示例,你可以深入理解Quartz的内部工作原理,提升你在Java应用中处理定时...

    java定时执行多任务和quartz定时执行多任务

    Java定时执行多任务是软件开发中的常见需求,用于在特定时间点或按固定频率执行某项操作,例如数据同步、日志清理等。Java提供了一些内置的定时工具,如`java.util.Timer`和`java.util.concurrent....

    完美解决多应用服务器负载均衡环境下spring quartz同一定时任务重复执行问题

    在多应用服务器负载均衡环境下,Spring Quartz定时任务的重复执行问题是一个常见的挑战。Spring Quartz是一个强大的、开源的作业调度框架,允许开发者定义和执行复杂的定时任务。然而,当多个服务器实例并行运行时,...

    C# Quartz.Net定时任务操作明细、完整过程

    **执行多个任务**:只需要为每个任务创建不同的`JobDetail`和`Trigger`,并确保它们的名称不重复。然后将它们添加到同一个调度器,调用`Start()`即可。注意,所有任务调度器创建完成后,只需要调用一次`Start()`。 ...

    Java 项目中使用单线程 实现 Quartz-2.2.1 触发优先级 源码下载

    该示例将执行以下操作: 用一个工作线程创建一个调度程序; 安排三个不同优先级的触发器,第一次同时触发,第二次以错开的时间间隔触发; 启动Quartz Scheduler; 等待30秒让 Job 有机会触发触发器; 关闭调度...

    定时任务quartz实现分组串行并行动态配置

    4. **并行执行**:默认情况下,Quartz允许多个Job并行执行,只要它们不在同一组或满足并行执行的条件。如果需要限制同一组内的并行执行,可以通过设置Scheduler的属性`org.quartz.threadPool.class`为`org.quartz....

    定时任务+缓存+极光消息推送+多线程处理+单表查询优化

    使用定时任务框架如Quartz或Spring Boot的定时任务功能,可以灵活设置任务执行周期,确保任务在指定时间准确无误地执行。 2. **缓存技术**:缓存是提高系统响应速度的关键。常见的缓存解决方案有Redis和Memcached,...

    Quartz定时任务框架

    Quartz的主要功能在于允许开发者安排任务在特定的时间点或按照预定的周期执行。这在很多场景下都非常有用,例如数据备份、日志清理、发送通知等。Quartz的核心组件包括Scheduler、Job、Trigger和Calendar。 1. ...

    spring + Quartz 定时任务

    在IT行业中,定时任务是许多系统不可或缺的一部分,用于执行周期性的任务,如数据备份、清理、统计等。Spring框架和Quartz库结合使用,能够构建出强大且灵活的定时任务解决方案。下面我们将深入探讨这两个技术及其...

    Spring 框架自带定时任务和Quartz定时任务

    使用这种方式,程序可以按照一定的频率执行,但这种方式是单线程的,所以不适合执行复杂的、高频率的定时任务。 接着是Spring框架自带的定时任务工具Spring Task。Spring Task是在Spring 3.0之后引入的,它使用起来...

    任务调度系统基于Quartz.net

    它支持多线程执行,可以同时处理多个作业,也可以根据需要调整线程池大小。此外,Quartz.NET还允许你暂停、恢复或重新调度作业,以适应业务需求的变化。 总之,Quartz.NET为.NET开发者提供了一个强大的工具,帮助...

    Quartz-2.2.1 任务调度框架在 Java 项目中的实例 Demo

    Quartz-2.2.1 任务调度框架在Java项目中的使用实例 Demo 在这个小Demo 中使用了Java 类的反射机制,通用的项目实例,高度抽象的实例。 在业务需求不是很复杂的情况下,完全可以减少因为使用Quartz任务调度框架的代码...

    quartz单机和WEB应用

    在Web应用程序中集成Quartz,可以实现后台任务的自动化处理,比如数据同步、报表生成、邮件发送等。这篇博客“quartz单机和WEB应用”可能是探讨如何在单机环境下配置和使用Quartz,以及如何在Web应用中集成Quartz来...

    spring2.0 job Quartz 和Time一样的定时任务

    3. 执行环境:`Timer`在单线程环境下表现良好,但在多线程环境中可能存在问题;而Spring和Quartz设计时考虑了多线程和并发,更适合企业级应用。 4. 故障恢复:Quartz具有故障恢复和集群支持,如果任务执行失败,它...

    Quartz任务调度快速入门

    Quartz是一个开源的作业调度框架,它允许开发人员在Java应用程序中定义和执行定时任务。Quartz的核心概念包括调度器(Scheduler)、任务(Job)和触发器(Trigger),以及JobDetail、Calendar和ThreadPool等关键组件。 1....

    Quartz定时任务详解

    * Quartz 中有一个调度线程 QuartzSchedulerThread,调度线程可以找到将要被触发的 trigger 和 job,然后在 ThreadPool 中获取一个线程来执行这个 job。 * JobStore 主要作用是存放 job 和 trigger 的信息。 快速...

    asp.net定时任务(定时器)

    ASP.NET定时任务是Web开发中常见的一种功能,用于在特定时间间隔执行某些操作,例如数据同步、清理缓存、发送通知等。在这个场景下,我们讨论的是如何在ASP.NET环境中设置一个定时器,使得页面加载或IIS启动后,该...

    Java任务调度框架Quartz1.8.6教程实例源代码

    Java任务调度框架Quartz是Java开发中用于自动化任务执行的重要工具,特别适用于企业级应用和服务。Quartz 1.8.6是该框架的一个稳定版本,提供了强大的定时任务管理和执行能力。下面我们将深入探讨Quartz的基本概念、...

    quartz任务调度

    20多个Word文档可能包含Quartz的详细教程、配置示例、Cron表达式的解析、多线程和并发控制、集群配置等内容。深入学习这些文档,你将能够熟练掌握Quartz的使用,为你的项目构建高效稳定的任务调度系统。 总的来说,...

    spring quartz 非配置动态定时

    在实现动态定时任务时,还需要考虑线程安全和任务管理的问题。例如,可能需要一个JobStore来存储Job和Trigger的信息,以便在应用重启后仍能恢复任务。Quartz提供了多种JobStore实现,如RAMJobStore(仅内存)、 ...

Global site tag (gtag.js) - Google Analytics