`
juforg
  • 浏览: 45643 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

java jdk 自带多线程 线程池 框架 ThreadPoolExecutor

 
阅读更多

1.java.util.concurrent & ThreadPoolExecutor 相关概念 及参考

  http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html

  http://heipark.iteye.com/blog/1156011

  http://dongxuan.iteye.com/blog/901689

  http://blog.csdn.net/wangwenhui11/article/details/6760474

2.

public final class ThreadExecutorFactory {
	
	static TraceLogger logger = new TraceLogger(ThreadExecutorFactory.class);
	//参看user-config.xml
	private static int corePoolSize ;
	private static int maximumPoolSize ;
	private static long keepAliveTime ;
	
	private static TimeUnit unit = TimeUnit.SECONDS; //线程池维护线程所允许的空闲时间的单位

	private static BlockingQueue<Runnable> blockingQueue = null;
	static{
		init();
	}
	private static void init() {
		//读取配置,也可以写死
		corePoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "core_pool_size"));
		maximumPoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "maximum_pool_size"));
		keepAliveTime = Long.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "keep_alive_time"));
		String queueSize = ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "queueSize");
		
		if (!StringUtil.isBlank(queueSize)) {
			blockingQueue = new ArrayBlockingQueue<Runnable>(Integer.valueOf(queueSize));
		}
		if (FactoryHolder.threadPool == null) {
			FactoryHolder.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime ,unit ,blockingQueue ,new ThreadPoolExecutor.AbortPolicy());
		}
	}
	
	
	private static class FactoryHolder{
		static ThreadPoolExecutor threadPool = null;
	}
	
	public static void execute(Runnable runnable){
		FactoryHolder.threadPool.execute(runnable);
		logger.info(runnable + " 加入线程池!");
		System.out.println(runnable + " 加入线程池!");
	}
}

 简单监听器的运用,首先产生线程,将要监听的对象放入map中, 循环Map,当map中的元素都移出后,结束线程, 再次将要监听的对象放入map时,又会产生新的线程监听,如果在此线程没结束时 加入一个新的监听对象到map 不会启动新线程。

public class BillTaskStatusListener implements Runnable ,Serializable{
	private static final long serialVersionUID = -6416044623618376341L;
	static TraceLogger logger = new TraceLogger(BillTaskStatusListener.class);
	
	private static final String RUN_STATUS_SUCCESS = "Y";	//成功
	private static final String RUN_STATUS_FAILED = "N";	//失败
	private static final String RUN_STATUS_RUNNING = "R"; //执行中
	//线程安全集合,用于存储待查询结果的任务实例Id

	private static Map<String,DataObject> billTaskMaps = new ConcurrentHashMap<String, DataObject>();
		
	/**
	 * 同步添加到监听循环
	 * @param taskInstId
	 */
	private synchronized static void addToListener(Long taskInstId,DataObject paramObj){
		if (billTaskMaps.isEmpty()) {
			billTaskMaps.put(String.valueOf(taskInstId), paramObj);
			ThreadExecutorFactory.execute(new BillTaskStatusListener());
		}else {
			billTaskMaps.put(String.valueOf(taskInstId), paramObj);
		}
		logger.info("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>");
		System.out.println("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>");
	}
	
	/**
	 * 
	 */
	public void run() {
		
		while (!billTaskMaps.isEmpty()) {
			
			try {
				//获取所有监听的任务实例对象
				DataObject[] taskInstancesArray = taskInstanceService.getTaskExcuteResultByTaskInstanceId(getInstanceIds(billTaskMaps));
				
				if (null != taskInstancesArray && taskInstancesArray.length>0) {
					
					for (DataObject object : taskInstancesArray) {
						DataObject paramObject = billTaskMaps.get(object.getString("taskInstanceId"));
						
						//查询对应作业实例
						DataObject tmpTaskDO = queryBillItemByItemId(paramObject.getString("billItemId"));
						if (null ==tmpTaskDO) {
							continue;
						}
						//任务正常完成
						if (6 == object.getInt("exeState")) {
							//将任务结果插入作业表
							updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO , RUN_STATUS_SUCCESS);
							
							//移除当前监听的任务实例
							billTaskMaps.remove(object.getString("taskInstanceId"));
							
							
							logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
							System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
							//任务未正常结束 重启
						}else if (4 <= object.getInt("exeState") ) {
							
							
								
								//任务终止
						}else if ("1".equals(paramObject.get("exceptionProcessMode"))) {
								//将任务结果插入作业表
								updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO,RUN_STATUS_FAILED);
								billTaskMaps.remove(object.getString("taskInstanceId"));
								logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
								System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>");
							
						}
					}
					
				}
				Thread.sleep(5000);
				//System.out.println("监听作业-任务 sleep 5s!");
			} catch (Exception e) {
				logger.info("监听作业-任务 执行结果出错!");
				billTaskMaps.clear();
				manager.rollback();
				e.printStackTrace();
			}
		}

	}
	
	
}

 

 

在其他地方只需调用BillTaskStatusListener的静态方法addToListener(Long taskInstId,DataObject paramObj),就可以了

注意:改类不能够拿过去直接用,需根据自己的时间情况修改,像读配置文件,读监听状态等等。。。

分享到:
评论

相关推荐

    jdk自带线程池实例详解

    jdk自带的线程池是Java开发中一个非常重要的概念,特别是在多线程编程中。线程池是线程的容器,每次只执行额定数量的线程,线程池就是用来管理这些额定数量的线程。下面我们来详细了解jdk自带的线程池。 一、jdk...

    java自带并发框架

    Java并发框架是Java JDK中内置的一系列用于处理多线程并行执行的工具和类库,自JDK 5.0引入以来,极大地简化了并发编程的复杂性。这个框架由Doug Lea设计,并在JSR-166任务中提出,最终在Tiger(JDK 5)版本中被引入...

    java 监控线程

    在Java中,多线程编程允许程序同时执行多个任务,提高程序的执行效率。然而,多线程也带来了同步问题,如竞态条件、死锁等,因此,对线程的监控显得至关重要。 二、JVM内置的线程监控 1. **JConsole**: JConsole是...

    Java 高并发六:JDK并发包2详解

    - `newSingleThreadExecutor()`: 创建单线程线程池,所有任务都在一个线程中按顺序执行,保证了任务执行的顺序性。 - `newCachedThreadPool()`: 创建缓存线程池,线程数量可变,任务多时会创建新线程,空闲60秒后...

    《Java并发编程实战》的高清完整PDF版

    Java平台提供了强大的并发工具和框架,使得开发者能够有效地利用多线程和分布式计算,提高系统的性能和响应速度。 本书主要涵盖以下几个关键知识点: 1. **线程基础**:首先,你需要了解线程的基本概念,包括如何...

    java并发编程实践中文版

    JDK自带的JConsole、VisualVM等工具可以帮助我们分析线程状态、内存使用情况以及CPU负载等,以便找出性能瓶颈并进行优化。 通过阅读《Java并发编程实践》,开发者不仅可以深入理解Java并发编程的原理,还能掌握在...

    Java并发编程实践.zip

    8. **并发性能分析**:使用JDK自带的`jconsole`、`jvisualvm`等工具,或者第三方工具如`VisualVM`,可以对并发程序进行监控和分析,找出性能瓶颈和线程安全问题。 9. **并发编程模型**:包括生产者消费者模型、读者...

    《Java程序设计(人邮,朱喜福)》电子讲稿html

    - 性能调优:通过JDK自带工具进行性能监控和调整。 这本教材覆盖了Java编程的基础到高级内容,通过实例和习题帮助读者巩固理论知识并提升实践能力,对于想要深入学习Java的开发者来说是一份宝贵的资源。通过阅读和...

    [中文]Java并发编程的艺术

    10. **性能调优与监控**:最后,书中可能会涵盖如何分析和优化并发程序的性能,以及如何使用JDK自带的监控工具(如`jconsole`、`VisualVM`等)进行线程监控和诊断。 通过阅读《Java并发编程的艺术》,开发者不仅能...

    Java 208道面试.docx

    面试时,面试官通常会从多个方面考察候选人的Java技能,包括基础、容器、多线程、反射、对象拷贝、Java Web、异常处理、网络编程、设计模式以及流行的框架如Spring、Spring Boot和Spring Cloud等。以下是对这些模块...

    java-JVM-面试题从基础到高级详解-HM

    2. **JDK自带工具**:如jconsole、jvisualvm用于监控JVM状态,jmap用于内存映射,jstack用于线程堆栈快照,jhat用于堆转储分析。 五、JVM与其他技术的交互 1. **JNI(Java Native Interface)**:允许Java代码调用...

    cap-javatuning:Java优化

    8. **并发与多线程**:合理使用并发工具类(如ConcurrentHashMap、CountDownLatch、CyclicBarrier)和线程池(ThreadPoolExecutor),遵循线程安全编程原则,能有效提升程序并行处理能力。 9. **数据库优化**:与...

Global site tag (gtag.js) - Google Analytics