1.java.util.concurrent & ThreadPoolExecutor 相关概念 及参考
http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html
http://heipark.iteye.com/blog/1156011
http://dongxuan.iteye.com/blog/901689
http://blog.csdn.net/wangwenhui11/article/details/6760474
2.
public final class ThreadExecutorFactory { static TraceLogger logger = new TraceLogger(ThreadExecutorFactory.class); //参看user-config.xml private static int corePoolSize ; private static int maximumPoolSize ; private static long keepAliveTime ; private static TimeUnit unit = TimeUnit.SECONDS; //线程池维护线程所允许的空闲时间的单位 private static BlockingQueue<Runnable> blockingQueue = null; static{ init(); } private static void init() { //读取配置,也可以写死 corePoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "core_pool_size")); maximumPoolSize = Integer.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "maximum_pool_size")); keepAliveTime = Long.valueOf(ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "keep_alive_time")); String queueSize = ConfigurationUtil.getUserConfigSingleValue("thread_pool", "pool_config", "queueSize"); if (!StringUtil.isBlank(queueSize)) { blockingQueue = new ArrayBlockingQueue<Runnable>(Integer.valueOf(queueSize)); } if (FactoryHolder.threadPool == null) { FactoryHolder.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime ,unit ,blockingQueue ,new ThreadPoolExecutor.AbortPolicy()); } } private static class FactoryHolder{ static ThreadPoolExecutor threadPool = null; } public static void execute(Runnable runnable){ FactoryHolder.threadPool.execute(runnable); logger.info(runnable + " 加入线程池!"); System.out.println(runnable + " 加入线程池!"); } }
简单监听器的运用,首先产生线程,将要监听的对象放入map中, 循环Map,当map中的元素都移出后,结束线程, 再次将要监听的对象放入map时,又会产生新的线程监听,如果在此线程没结束时 加入一个新的监听对象到map 不会启动新线程。
public class BillTaskStatusListener implements Runnable ,Serializable{ private static final long serialVersionUID = -6416044623618376341L; static TraceLogger logger = new TraceLogger(BillTaskStatusListener.class); private static final String RUN_STATUS_SUCCESS = "Y"; //成功 private static final String RUN_STATUS_FAILED = "N"; //失败 private static final String RUN_STATUS_RUNNING = "R"; //执行中 //线程安全集合,用于存储待查询结果的任务实例Id private static Map<String,DataObject> billTaskMaps = new ConcurrentHashMap<String, DataObject>(); /** * 同步添加到监听循环 * @param taskInstId */ private synchronized static void addToListener(Long taskInstId,DataObject paramObj){ if (billTaskMaps.isEmpty()) { billTaskMaps.put(String.valueOf(taskInstId), paramObj); ThreadExecutorFactory.execute(new BillTaskStatusListener()); }else { billTaskMaps.put(String.valueOf(taskInstId), paramObj); } logger.info("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>"); System.out.println("任务实例:" + paramObj.getString("taskInstId") + " 加入<作业-任务结果监听>"); } /** * */ public void run() { while (!billTaskMaps.isEmpty()) { try { //获取所有监听的任务实例对象 DataObject[] taskInstancesArray = taskInstanceService.getTaskExcuteResultByTaskInstanceId(getInstanceIds(billTaskMaps)); if (null != taskInstancesArray && taskInstancesArray.length>0) { for (DataObject object : taskInstancesArray) { DataObject paramObject = billTaskMaps.get(object.getString("taskInstanceId")); //查询对应作业实例 DataObject tmpTaskDO = queryBillItemByItemId(paramObject.getString("billItemId")); if (null ==tmpTaskDO) { continue; } //任务正常完成 if (6 == object.getInt("exeState")) { //将任务结果插入作业表 updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO , RUN_STATUS_SUCCESS); //移除当前监听的任务实例 billTaskMaps.remove(object.getString("taskInstanceId")); logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); //任务未正常结束 重启 }else if (4 <= object.getInt("exeState") ) { //任务终止 }else if ("1".equals(paramObject.get("exceptionProcessMode"))) { //将任务结果插入作业表 updateFillinContent(object , billTaskMaps.get(object.getString("taskInstanceId")),tmpTaskDO,RUN_STATUS_FAILED); billTaskMaps.remove(object.getString("taskInstanceId")); logger.info("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); System.out.println("作业-任务:" + object.getLong("taskInstanceId") + " 移出<作业-任务结果监听循环>"); } } } Thread.sleep(5000); //System.out.println("监听作业-任务 sleep 5s!"); } catch (Exception e) { logger.info("监听作业-任务 执行结果出错!"); billTaskMaps.clear(); manager.rollback(); e.printStackTrace(); } } } }
在其他地方只需调用BillTaskStatusListener的静态方法addToListener(Long taskInstId,DataObject paramObj),就可以了
注意:改类不能够拿过去直接用,需根据自己的时间情况修改,像读配置文件,读监听状态等等。。。
相关推荐
jdk自带的线程池是Java开发中一个非常重要的概念,特别是在多线程编程中。线程池是线程的容器,每次只执行额定数量的线程,线程池就是用来管理这些额定数量的线程。下面我们来详细了解jdk自带的线程池。 一、jdk...
Java并发框架是Java JDK中内置的一系列用于处理多线程并行执行的工具和类库,自JDK 5.0引入以来,极大地简化了并发编程的复杂性。这个框架由Doug Lea设计,并在JSR-166任务中提出,最终在Tiger(JDK 5)版本中被引入...
在Java中,多线程编程允许程序同时执行多个任务,提高程序的执行效率。然而,多线程也带来了同步问题,如竞态条件、死锁等,因此,对线程的监控显得至关重要。 二、JVM内置的线程监控 1. **JConsole**: JConsole是...
- `newSingleThreadExecutor()`: 创建单线程线程池,所有任务都在一个线程中按顺序执行,保证了任务执行的顺序性。 - `newCachedThreadPool()`: 创建缓存线程池,线程数量可变,任务多时会创建新线程,空闲60秒后...
Java平台提供了强大的并发工具和框架,使得开发者能够有效地利用多线程和分布式计算,提高系统的性能和响应速度。 本书主要涵盖以下几个关键知识点: 1. **线程基础**:首先,你需要了解线程的基本概念,包括如何...
JDK自带的JConsole、VisualVM等工具可以帮助我们分析线程状态、内存使用情况以及CPU负载等,以便找出性能瓶颈并进行优化。 通过阅读《Java并发编程实践》,开发者不仅可以深入理解Java并发编程的原理,还能掌握在...
8. **并发性能分析**:使用JDK自带的`jconsole`、`jvisualvm`等工具,或者第三方工具如`VisualVM`,可以对并发程序进行监控和分析,找出性能瓶颈和线程安全问题。 9. **并发编程模型**:包括生产者消费者模型、读者...
- 性能调优:通过JDK自带工具进行性能监控和调整。 这本教材覆盖了Java编程的基础到高级内容,通过实例和习题帮助读者巩固理论知识并提升实践能力,对于想要深入学习Java的开发者来说是一份宝贵的资源。通过阅读和...
10. **性能调优与监控**:最后,书中可能会涵盖如何分析和优化并发程序的性能,以及如何使用JDK自带的监控工具(如`jconsole`、`VisualVM`等)进行线程监控和诊断。 通过阅读《Java并发编程的艺术》,开发者不仅能...
面试时,面试官通常会从多个方面考察候选人的Java技能,包括基础、容器、多线程、反射、对象拷贝、Java Web、异常处理、网络编程、设计模式以及流行的框架如Spring、Spring Boot和Spring Cloud等。以下是对这些模块...
2. **JDK自带工具**:如jconsole、jvisualvm用于监控JVM状态,jmap用于内存映射,jstack用于线程堆栈快照,jhat用于堆转储分析。 五、JVM与其他技术的交互 1. **JNI(Java Native Interface)**:允许Java代码调用...
8. **并发与多线程**:合理使用并发工具类(如ConcurrentHashMap、CountDownLatch、CyclicBarrier)和线程池(ThreadPoolExecutor),遵循线程安全编程原则,能有效提升程序并行处理能力。 9. **数据库优化**:与...