`

Java并发编程之线程池任务监控

    博客分类:
  • Java
阅读更多

Java并发编程之线程池任务监控

 

当我们提交runnable或者callable<?>到ThreadPoolExecutor时,我们是无法知道这些任务是在什么时候才真正的执行的,为了实现这个需求,我们需要扩展ThreadPoolExecutor,重写beforeExecute和afterExecute,在这两个方法里分别做一些任务执行前和任务执行后的相关监控逻辑,还有个terminated方法,是在线程池关闭后回调,,另外,我们可以通过getLargestPoolSize()和getCompletedTaskCount()来分别获取线程池数的峰值和线程池已完成的任务数。

 

下面就一个完整的例子来说明如何进行:

自定义MonitorHandler接口,把before和after抽象出来:

 

package cc.lixiaohui.demo.concurrent;


/**
 * 监控处理器, 目的是把before和after抽象出来, 以便在{@link MonitorableThreadPoolExecutor}中形成一条监控处理器链
 * 
 * @author lixiaohui
 * @date 2016年10月11日 下午7:18:38
 * 
 */
public interface MonitorHandler {
	
	/**
	 * 改监控任务是否可用
	 * 
	 * @return
	 */
	boolean usable(); 
	
	/**
	 * 任务执行前回调
	 * 
	 * @param thread 即将执行该任务的线程
	 * @param runnable 即将执行的任务
	 */
	void before(Thread thread, Runnable runnable);  
	
	/**
	 * <pre>
	 * 任务执行后回调
	 * 注意:
	 *     1.当你往线程池提交的是{@link Runnable} 对象时, 参数runnable就是一个{@link Runnable}对象
	 *     2.当你往线程池提交的是{@link java.util.concurrent.Callable<?>} 对象时, 参数runnable实际上就是一个{@link java.util.concurrent.FutureTask<?>}对象
	 *       这时你可以通过把参数runnable downcast为FutureTask<?>或者Future来获取任务执行结果
	 *       
	 * @param runnable 执行完后的任务
	 * @param throwable 异常信息
	 */
	void after(Runnable runnable, Throwable throwable);
	
	/**
	 * 线程池关闭后回调
	 * 
	 * @param largestPoolSize
	 * @param completedTaskCount
	 */
	void terminated(int largestPoolSize, long completedTaskCount);
}

 

 

扩展ThreadPoolExecutor,增加监控的逻辑,如果监控比较耗时的话,为了不影响业务线程池的执行效率,我们应该将before,after和terminated方法的调用封装为统一的Runnable交给非业务线程池内的Thread来跑(新建个Thread或者线程池):

 

package cc.lixiaohui.demo.concurrent;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 可监控的线程池, 可有多个监控处理器,如果监控的逻辑是比较耗时的话, 最好另起个线程或者线程池专门用来跑MonitorHandler的方法.
 * 
 * @author lixiaohui
 * @date 2016年10月11日 下午7:15:16
 * 
 */
public class MonitorableThreadPoolExecutor extends ThreadPoolExecutor {
	
	/**
	 * 可有多个监控处理器
	 */
	private Map<String, MonitorHandler> handlerMap = new HashMap<String, MonitorHandler>();
	
	private final Object lock = new Object();
	
	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
	}

	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
	}

	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
	}

	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}
	
	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		// 依次调用处理器
		for (MonitorHandler handler : handlerMap.values()) {
			if (handler.usable()) {
				handler.before(t, r);
			}
		}
	}
	
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		// 依次调用处理器
		for (MonitorHandler handler : handlerMap.values()) {
			if (handler.usable()) {
				handler.after(r, t);
			}
		}
	}
	
	/* 
	 * @see java.util.concurrent.ThreadPoolExecutor#terminated()
	 */
	@Override
	protected void terminated() {
		super.terminated();
		for (MonitorHandler handler : handlerMap.values()) {
			if (handler.usable()) {
				handler.terminated(getLargestPoolSize(), getCompletedTaskCount());
			}
		}
		
	}
	
	public MonitorHandler addMonitorTask(String key, MonitorHandler task, boolean overrideIfExist) {
		if (overrideIfExist) {
			synchronized (lock) {
				return handlerMap.put(key, task);
			}
		} else {
			synchronized (lock) {
				return handlerMap.putIfAbsent(key, task);
			}
		}
	}
	
	public MonitorHandler addMonitorTask(String key, MonitorHandler task) {
		return addMonitorTask(key, task, true);
	}
	
	public MonitorHandler removeMonitorTask(String key) {
		synchronized (lock) {
			return handlerMap.remove(key);
		}
	}
	
}

 

 

 测试程序:

 

package cc.lixiaohui.demo.concurrent;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import cc.lixiaohui.util.RandomUtils;

/**
 * @author lixiaohui
 * @date 2016年10月11日 下午8:11:39
 * 
 */
public class Tester {
	
	static volatile boolean stop = false;

	public static void main(String[] args) throws InterruptedException, IOException {
		// fixed size 5
		final MonitorableThreadPoolExecutor pool = new MonitorableThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

		pool.addMonitorTask("TimeMonitorTask", newTimeMonitorHandler());
		// 起一个线程不断地往线程池丢任务
		Thread t = new Thread(new Runnable() {
			public void run() {
				startAddTask(pool);
			}
		});
		t.start();
		
		// 丢任务丢20 ms
		Thread.sleep(50);
		stop = true;
		t.join();
		pool.shutdown();
		// 等线程池任务跑完
		pool.awaitTermination(100, TimeUnit.SECONDS);
	}

	private static MonitorHandler newTimeMonitorHandler() {

		return new MonitorHandler() {
			// 任务开始时间记录map, 多线程增删, 需用ConcurrentHashMap
			Map<Runnable, Long> timeRecords = new ConcurrentHashMap<Runnable, Long>();

			public boolean usable() {
				return true;
			}
			
			public void terminated(int largestPoolSize, long completedTaskCount) {
				System.out.println(String.format("%s:largestPoolSize=%d, completedTaskCount=%s", time(), largestPoolSize, completedTaskCount));
			}

			public void before(Thread thread, Runnable runnable) {
				System.out.println(String.format("%s: before[%s -> %s]", time(), thread, runnable));
				timeRecords.put(runnable, System.currentTimeMillis());
			}

			public void after(Runnable runnable, Throwable throwable) {
				long end = System.currentTimeMillis();
				Long start = timeRecords.remove(runnable);
				
				Object result = null;
				if (throwable == null && runnable instanceof FutureTask<?>) { // 有返回值的异步任务,不一定是Callable<?>,也有可能是Runnable
					try {
						result = ((Future<?>) runnable).get();
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt(); // reset
					} catch (ExecutionException e) {
						throwable = e;
					} catch (CancellationException e) {
						throwable = e;
					}
				}

				if (throwable == null) { // 任务正常结束
					if (result != null) { // 有返回值的异步任务
						System.out.println(String.format("%s: after[%s -> %s], costs %d millisecond, result: %s", time(), Thread.currentThread(), runnable, end - start, result));
					} else {
						System.out.println(String.format("%s: after[%s -> %s], costs %d millisecond", time(), Thread.currentThread(), runnable, end - start));
					}
				} else {
					System.err.println(String.format("%s: after[%s -> %s], costs %d millisecond, exception: %s", time(), Thread.currentThread(), runnable, end - start, throwable));
				}
			}

		};
	}

	// 随机runnable或者callable<?>, 任务随机抛异常
	private static void startAddTask(MonitorableThreadPoolExecutor pool) {
		int count = 0;
		while (!stop) {
			if (RandomUtils.randomBoolean()) {// 丢Callable<?>任务
				pool.submit(new Callable<Boolean>() {

					public Boolean call() throws Exception {
						// 随机抛异常
						boolean bool = RandomUtils.randomBoolean();
						// 随机耗时 0~100 ms
						Thread.sleep(RandomUtils.randomInt(100));
						if (bool) {
							throw new RuntimeException("thrown randomly");
						}
						return bool;
					}

				});
			} else { // 丢Runnable
				pool.submit(new Runnable() {

					public void run() {
						// 随机耗时 0~100 ms
						try {
							Thread.sleep(RandomUtils.randomInt(100));
						} catch (InterruptedException e) {}
						// 随机抛异常
						if (RandomUtils.randomBoolean()) {
							throw new RuntimeException("thrown randomly");
						}
					};

				});
			}
			System.out.println(String.format("%s:submitted %d task", time(), ++count));
		}
	}

	private static String time() {
		return String.valueOf(System.currentTimeMillis());
	}
}

 

 

一个较短的结果:

 

1476253228222: before[Thread[pool-1-thread-1,5,main] -> java.util.concurrent.FutureTask@548bb979]
1476253228222:Thread[Thread-0,5,main], submitted 1 task
1476253228253:Thread[Thread-0,5,main], submitted 2 task
1476253228264: before[Thread[pool-1-thread-2,5,main] -> java.util.concurrent.FutureTask@97e041d]
1476253228264:Thread[Thread-0,5,main], submitted 3 task
1476253228265: before[Thread[pool-1-thread-3,5,main] -> java.util.concurrent.FutureTask@7d6d5cc]
1476253228271: after[Thread[pool-1-thread-2,5,main] -> java.util.concurrent.FutureTask@97e041d], costs 7 millisecond, exception: java.util.concurrent.ExecutionException: java.lang.RuntimeException: thrown randomly
1476253228295: after[Thread[pool-1-thread-1,5,main] -> java.util.concurrent.FutureTask@548bb979], costs 42 millisecond
1476253228347: after[Thread[pool-1-thread-3,5,main] -> java.util.concurrent.FutureTask@7d6d5cc], costs 82 millisecond, exception: java.util.concurrent.ExecutionException: java.lang.RuntimeException: thrown randomly
1476253228347:Thread[pool-1-thread-3,5,main], largestPoolSize=3, completedTaskCount=3

 

 

 

0
1
分享到:
评论
3 楼 sll1097892736 2017-03-21  
import cc.lixiaohui.util.RandomUtils 这个类哪里有?
我邮箱:1097892736.qq.com
2 楼 莫名的拉风 2017-01-14  
漂泊一剑客 写道
public MonitorHandler removeMonitorTask(MonitorHandler task)

博主,发现一个问题,这个方法的参数,是有问题吧,应该是String的key


感谢提出,已修正
1 楼 漂泊一剑客 2017-01-14  
public MonitorHandler removeMonitorTask(MonitorHandler task)

博主,发现一个问题,这个方法的参数,是有问题吧,应该是String的key

相关推荐

    Java并发编程:线程池的使用 - 平凡希 - 博客园1

    Java并发编程中的线程池是提高系统效率的关键工具,它解决了频繁创建和销毁线程的问题。线程池通过复用已存在的线程来处理任务,从而避免了每次任务执行完毕后销毁线程的开销。在Java中,线程池的核心实现是`java....

    Java 并发编程实战.pdf

    《Java并发编程实战》这本书是关于Java语言中并发编程技术的经典著作。它详细介绍了如何在Java环境中有效地实现多线程程序和并发控制机制。在Java平台上,由于其本身提供了强大的并发编程支持,因此,掌握并发编程...

    java并发编程:juc线程池

    Java并发编程中的JUC线程池是Java程序员必须掌握的关键技术之一,它允许开发者高效地管理并发执行的任务,充分利用多核处理器的性能。线程池的出现解决了在并发环境中线程创建、销毁带来的开销,提高了系统资源的...

    并发容器和线程池,java并发编程3

    `Future`接口提供了获取异步计算结果的能力,它是Java并发编程中重要的API之一。 ##### 3.1 FutureTask `FutureTask`是`Future`接口的一个具体实现,它可以包装一个`Runnable`或`Callable`对象,并允许用户获取...

    java并发编程从入门到精通

    《Java并发编程从入门到精通》内容包括并发编程概念,线程,线程安全,线程集合类,线程阀,线程池,Fork/Join,线程、线程池在互联网项目开发的应用,线程监控及线程分析,Android中线程应用。 本书适合Java开发...

    java并发编程实战高清版pdf

    《Java并发编程实战》是Java开发者深入理解和掌握并发编程的一本经典著作。这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者在多核时代编写出高效、可伸缩且线程安全的代码。 并发编程是现代...

    JAVA并发编程实践

    《JAVA并发编程实践》是Java开发者深入理解和应用并发编程的重要参考书籍,由Doug Lea等专家撰写,书中全面探讨了Java平台上的并发编程技术。在Java世界中,并发编程是提高程序性能和效率的关键手段,尤其在多核...

    JAVA并发编程实践-线程池-学习笔记

    Java并发编程实践中的线程池是一个关键的概念,它在多线程编程中扮演着至关重要的角色,有效地管理和调度线程资源,以提高系统的性能和效率。线程池通过复用已存在的线程来减少线程的创建和销毁开销,避免了频繁的上...

    Java并发编程:设计原则与模式(第二版)-3

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java多线程编程技术的权威著作。这本书详细阐述了在Java平台中进行高效并发处理的关键概念、设计原则和实用模式。以下是对该书内容的一些核心知识点的概述...

    Java并发编程的艺术

    第四章聚焦于多线程技术,这是Java并发编程的核心之一。本章首先介绍了多线程带来的优势,比如提高应用程序的响应速度和效率。接着,作者们详细讲解了如何创建和管理线程,包括线程的生命周期、状态转换等内容。此外...

    Java 并发编程实战(高清带目录).zip

    这本书高清且带有详细的目录,方便读者快速定位到所需的主题,是Java并发编程领域的重要参考资料。 并发编程是现代软件开发中的核心技能,尤其是在Java平台上,因为Java以其强大的并发支持而著称。本书涵盖了以下几...

    java并发编程实战.zip

    《Java并发编程实战》这本书是Java开发者深入理解并发编程的重要参考资料。它涵盖了Java并发的核心概念、工具和最佳实践,旨在帮助读者在多线程环境中构建高效、可靠的系统。以下是本书涉及的一些关键知识点: 1. *...

    Java并发编程实践(Java Concurrency in Practice) (中英版)

    《Java并发编程实践》是Java开发者深入理解和应用并发编程的权威指南,这本书全面覆盖了Java并发编程的各种核心概念和技术,旨在帮助程序员编写出高效、安全的并发代码。书中的内容既包括理论知识,也包含丰富的实战...

    《Java并发编程实战》的高清完整PDF版

    《Java并发编程实战》这本书是Java开发者深入理解并发编程的重要参考资料。并发编程是现代软件开发中的核心技能之一,尤其是在多核处理器和分布式系统环境中。Java平台提供了强大的并发工具和框架,使得开发者能够...

    ( java并发编程.zip )文字版 电子书 高清版

    《Java并发编程》是一本深度探讨Java平台上的并发与多线程编程的权威书籍,适合对并发编程有深入了解需求的开发者阅读,特别是对于那些志在加入BAT(百度、阿里巴巴、腾讯)等顶级互联网企业的程序员来说,这本书是...

    一本经典的多线程书籍 Java并发编程 设计原则与模式 第二版 (英文原版)

    《Java并发编程 设计原则与模式 第二版》是一本深受程序员喜爱的经典书籍,由Addison Wesley出版。这本书深入探讨了Java平台上的多线程编程技术,为开发者提供了丰富的设计原则和模式,帮助他们理解和解决并发环境中...

    java并发编程实践中文版和英文版

    《Java并发编程实践》是一本深入探讨Java平台上的并发编程的经典著作。这本书旨在帮助开发者理解和掌握如何有效地编写多线程程序,以充分利用现代处理器的多核能力,提高应用程序的性能和响应性。以下是对该书内容的...

    [Java并发编程实践].(Java.Concurrency.in.Practice).Brian.Goetz.文字版(1)

    6. **线程池**:Executor框架是Java并发编程的重要组成部分,它通过线程池管理线程,提高了系统资源的利用率,减少了线程创建和销毁的开销。 7. **并发异常处理**:书中强调了在并发环境中正确处理异常的重要性,...

Global site tag (gtag.js) - Google Analytics