`

自定义异步线程池,实现异步请求以及监控当前线程运行情况

阅读更多

由于工作的需要,写了一份异步调用的小框架,分享出来。。。

 

启动类:

 

/**
 * 线程启动
 * @author yfguopeng
 */
public class ThreadExecutorListener  implements ServletContextListener{
	private final static Log log = LogFactory.getLog(ThreadExecutorListener.class);

	@SuppressWarnings("unchecked")
	public void contextInitialized(ServletContextEvent sce) {
		ServletContext servletContext = sce.getServletContext();
		WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext);
		List<ThreadConfigBean> worders = (List<ThreadConfigBean>) wac.getBean("workers"); 
		log.info("=====================初始化线程池========================");
		//创建线程组
		SecurityManager s = System.getSecurityManager();
		ThreadGroup  father = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
		ThreadGroup group = new ThreadGroup(father, "root-threadgroup");
		
		for (ThreadConfigBean configBean : worders) {
			//设置排队队列大小
			ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(configBean.getQueueCapacity());
			//设置线程工厂
			ThreadFactory threadFactory = new DecorateThreadFactory(new ThreadGroup(group,configBean.getBusinessId()),configBean.getBusinessId());
			
			ThreadPoolExecutor worker = new ThreadPoolExecutor(configBean.getBusinessId(),configBean.getMin(), configBean.getMax(), configBean.getKeepAliveTime(),
                    TimeUnit.SECONDS, taskQueue, threadFactory, configBean.getRejectHandler());
			
			ThreadGroupUtil.addThreadWorker(configBean.getBusinessId(), worker);
		}
		
		log.info("=====================线程池初始化完毕========================");
		log.info("=====================初始化监控线程========================");
		ThreadGroupUtil.monitorThreadStart(group,2000l);
		log.info("=====================监控线程初始化完毕========================");
	}

	public void contextDestroyed(ServletContextEvent sce) {
	}

	
}

 

 

业务监控配置:

 

<bean id="xxxIndex" class="xxx.xxx.xxx.xxx.web.thread.ThreadConfigBean">
    	<property name="businessId" value="xxxIndex"></property><!-- 业务ID,唯一 -->
    	<property name="max" value="40"></property><!-- 最好为请求线程的倍数 -->
    	<property name="min" value="10"></property><!-- 最好为请求线程的倍数 -->
    	<property name="queueCapacity" value="80"></property><!-- 最好为请求线程的倍数 -->
    	<property name="keepAliveTime" value="600"></property><!-- 线程空闲保存时间 -->
    	<property name="rejectHandler" ><!--任务拒绝处理策略 -->
	    	<bean class="com.jd.m.pay.web.thread.RejectedPolicyHandler" >
	    		<property name="bizName" value="pay-index"></property><!-- 业务ID,唯一 -->
	    	</bean>
    	</property>
    </bean>

	<bean id="workers" class="java.util.ArrayList">
		<constructor-arg >
			<list> 
				<ref bean="xxxIndex"/>
			</list>
		</constructor-arg>
	</bean>

 线程工厂:

 

 

/**
 *  线程工厂, 加入了线程名的业务描述
 * 
 * @User: guopeng
 * @Date: 2013-02-28
 */
public class DecorateThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;

    public DecorateThreadFactory(final ThreadGroup group,final String bizName) {
        this.group = group;
        namePrefix = bizName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

 监控线程:

/**
 * 监控业务线程池运行情况
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class MonitorThread implements Runnable {
	private final static Log log = LogFactory.getLog(MonitorThread.class);
	private final ThreadGroup group;
	
	public MonitorThread(ThreadGroup group) {
		this.group = group;
	}
	
	public void run() {
		Map<String, ThreadPoolExecutor>workers =  ThreadGroupUtil.getThreadWorkers();
		Iterator<String> iterator = workers.keySet().iterator();
		
		log.info("total threadpools:[ "+workers.size()+" ],total threads:[ "+group.activeCount()+" ]");
		while(iterator.hasNext()) {
			ThreadPoolExecutor worker = ThreadGroupUtil.getThreadWorker(iterator.next());
			
			RejectedExecutionHandler handler = worker.getRejectedExecutionHandler();
			String rejectedSize = "";
			if (RejectedPolicyHandlerInteface.class.isAssignableFrom(handler.getClass())) {
				rejectedSize = " ],rejected threads:[ "+((RejectedPolicyHandlerInteface) handler).getRejectedSize();
			}
			
			log.info("business name:[ "+worker.getBizName()+" ]" +
					", core threads:[ "+worker.getCorePoolSize()+" ], max threads:[ "+worker.getMaximumPoolSize()+" ]" +
							", queue capacitys:[ "+worker.getQueue().size()+" ], running threads:[ "+worker.getActiveCount()+"] " +
									", reject threads:[ "+rejectedSize+" ],  largest threads:[ "+worker.getLargestPoolSize()+" ], complete threads:[ "+worker.getCompletedTaskCount()+" ]");
		}
	}

}

 线程拒绝处理器:

/**
 * 线程拒绝执行控制球
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class RejectedPolicyHandler extends ThreadPoolExecutor.AbortPolicy implements RejectedPolicyHandlerInteface{
	private final static Log log = LogFactory.getLog(RejectedPolicyHandler.class);
	private static AtomicLong totals = new AtomicLong(0l);
	
	private String bizName;
	
	public RejectedPolicyHandler(){}
	
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		String tip = "["+bizName+"] 线程忙,请求被拒绝.max: "+executor.getMaximumPoolSize()+", queue: "+executor.getQueue().size();
		log.info(tip);
		//业务报警 TODO
		totals.addAndGet(1);
		super.rejectedExecution(r, executor);
	}

	public String getBizName() {
		return bizName;
	}

	public void setBizName(String bizName) {
		this.bizName = bizName;
	}
	
	public long getRejectedSize() {
		return totals.get();
	}
}

 

import java.util.concurrent.RejectedExecutionHandler;

public interface RejectedPolicyHandlerInteface extends RejectedExecutionHandler{
	public long getRejectedSize() ;
}

 线程配置bean:

@SuppressWarnings("serial")
public class ThreadConfigBean implements Serializable{
	/**
	 * 业务ID
	 */
	private String businessId;
	/**
	 * 任务队列最大线程数
	 * 默认:80
	 */
	private Integer max = 160;
	/**
	 * 任务队列最小线程数
	 * 默认:40
	 */
	private Integer min = 80;
	/**
	 * 等待队列请求数
	 * 默认:300
	 */
	private Integer queueCapacity = 300;
	/**
	 * 空闲线程存活时间
	 * 默认:3分钟
	 */
	private Long keepAliveTime = 3 * 60l;
	/**
	 * 线程拒绝策略
	 */
	private RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
	
	public ThreadConfigBean() {
		super();
	}
	
	public Integer getMax() {
		return max;
	}
	public void setMax(Integer max) {
		this.max = max;
	}
	public Integer getMin() {
		return min;
	}
	public void setMin(Integer min) {
		this.min = min;
	}
	public Integer getQueueCapacity() {
		return queueCapacity;
	}
	public void setQueueCapacity(Integer queueCapacity) {
		this.queueCapacity = queueCapacity;
	}
	public Long getKeepAliveTime() {
		return keepAliveTime;
	}
	public void setKeepAliveTime(Long keepAliveTime) {
		this.keepAliveTime = keepAliveTime;
	}
	public RejectedExecutionHandler getRejectHandler() {
		return rejectHandler;
	}
	public void setRejectHandler(RejectedExecutionHandler rejectHandler) {
		this.rejectHandler = rejectHandler;
	}
	public String getBusinessId() {
		return businessId;
	}
	public void setBusinessId(String businessId) {
		this.businessId = businessId;
	}
}

 线程组:

/**
 * 各个业务获取响应线程池
 * @author yfguopeng
 */
public class ThreadGroupUtil {
	private static Map<String, ThreadPoolExecutor> threadworkers;
	private static ScheduledExecutorService monitorThread;//监视线程
	private static final long delay = 200l;
	private static long cycle_default = 5000l;
	
	static {
		threadworkers = new ConcurrentHashMap<String, ThreadPoolExecutor>();
		monitorThread = Executors.newScheduledThreadPool(1);
	}
	
	public static void addThreadWorker(String bizName,ThreadPoolExecutor executor){
		threadworkers.put(bizName, executor);
	}
	
	public static ThreadPoolExecutor getThreadWorker(String bizName) {
		return threadworkers.get(bizName);
	}
	
	public static Map<String, ThreadPoolExecutor> getThreadWorkers(){
		return threadworkers;
	}

	public static ScheduledExecutorService getMonitorThread() {
		return monitorThread;
	}

	public static void setMonitorThread(ScheduledExecutorService monitorThread) {
		ThreadGroupUtil.monitorThread = monitorThread;
	}
	
	public static void monitorThreadClosed(){
		if (monitorThread != null)
			if (!monitorThread.isTerminated()) 
				monitorThread.shutdown();
	}
	
	public static void monitorThreadStart(ThreadGroup group,Long cycle){
		MonitorThread monitor = new MonitorThread(group);
		if (cycle > 0l) {
			try {
				cycle_default = cycle;
			} catch (Exception e) {
			}
		}
		monitorThread.scheduleWithFixedDelay(monitor, delay, cycle_default, TimeUnit.MILLISECONDS);
	}
}

 线程池实现类:

/**
 * 线程池
 * @author yfguopeng
 *
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

	private String bizName;
	
	public ThreadPoolExecutor(String bizName,int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
				threadFactory, handler);
		this.bizName = bizName;
	}

	public String getBizName() {
		return bizName;
	}

	public void setBizName(String bizName) {
		this.bizName = bizName;
	}
	
}

 

web.xml配置:

  <listener>
		<listener-class>xxx.xx.xx.xxx.web.thread.ThreadExecutorListener</listener-class>
	</listener> 

调用:

		ThreadPoolExecutor exc = ThreadGroupUtil.getThreadWorker("xxxIndex");
		
		String payOrgInfo = null;
		String cards = null;
		Future<String> xxxFuture = null;
		Future<String> yyyFuture = null;
		long start = System.currentTimeMillis();
		xxxTask xxxTask = new xxxTask(//参数);
		yyyTask yyyTask = new yyyTask(//参数);
		System.out.println("开始......");
		xxxFuture = exc.submit(xxxTask );
		yyyFuture = exc.submit(yyyTask );
		
		try {
			xxx= xxxFuture .get();
			yyy= yyyFuture .get();
			System.out.println(xxx);
			System.out.println(yyy);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		System.out.println("结束......   "+(end-start));
		
		return "";
	

 

分享到:
评论

相关推荐

    spring线程池(同步、异步).docx

    - `ThreadPoolTaskExecutor`:这个执行器实现了异步任务执行,允许我们配置线程池参数,如核心线程数、最大线程数、队列容量等,以及线程工厂,以自定义线程的行为。 三、同步与异步的区别 - `SyncTaskExecutor`是...

    springmvc+spring线程池处理http并发请求数据同步控制问题

    4. 监控和调优:通过Spring Boot Actuator等监控工具,实时查看线程池状态,根据实际运行情况调整配置。 总结,使用Spring MVC和Spring线程池处理HTTP并发请求,能够有效提升系统的并发处理能力。同时,通过数据...

    异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入

    Hippo4j 提供的异步线程池框架,进一步优化了这一过程,允许开发者动态调整线程池参数,如核心线程数、最大线程数、队列大小等,以适应不断变化的工作负载。 线程池动态变更能力是Hippo4j的一个重要特性。在运行时...

    多线程编程线程池

    ### 多线程编程线程池 ...无论是处理大量短时任务还是实现后台任务的调度和监控,线程池都是一个强大的工具。然而,在使用线程池时也需要考虑其局限性和潜在的风险,特别是当涉及到Web服务器环境或长时间运行的任务时。

    SpringBoot实现定时任务和异步调用

    SpringBoot实现定时任务和异步调用 SpringBoot框架提供了强大的功能来实现定时任务和异步调用,这在业务场景中非常常见。例如,在电商平台中,需要定时发送邮件、短信、统计监控数据等操作,而在订单流程中,需要...

    WSAEventSelect 线程池 实现服务器示例

    `WSAEventSelect`是Windows Socket API(Winsock)中的一个关键函数,它允许应用程序通过事件对象(如内建的Windows事件或自定义事件)来监控套接字的状态变化。线程池则是一种优化资源管理和提高系统性能的技术,它...

    使用IO完成端口机制实现的定制线程池的代码

    这些API可以与自定义线程池结合,使得线程池中的工作线程能高效地处理这些异步I/O操作。 线程池的创建通常包括以下步骤: 1. **初始化线程池参数**:设置核心线程数、最大线程数、线程存活时间等参数。 2. **创建...

    Delphi7创建及释放线程实例

    下面我们将深入探讨如何在Delphi7中创建线程以及何时和如何释放它们。 首先,我们要了解Delphi7中的线程基础。TThread类是VCL(Visual Component Library)框架提供的一种用于创建线程的抽象类。它继承自TComponent...

    C# 线程池的使用 高级应用

    总结,C#线程池的高级应用涉及到线程的创建、调度、监控以及与异步I/O的结合使用。通过合理利用线程池,开发者可以构建高效、稳定的多线程应用程序。在实际开发中,应结合项目需求和系统资源,灵活运用线程池的各种...

    Android实现多线程下载

    实现多线程下载时,为了应对网络不稳定或设备中断等情况,需要支持断点续传。这需要保存每个线程下载的起始位置和已下载长度,当下载暂停或重启时,可以从上次的位置继续下载。 6. **文件分块与合并**: 每个线程...

    delphi线程池mcpage实例

    - **线程池管理器**:负责创建、管理和调度线程,通常由开发者自定义实现。 - **任务队列**:存储待执行的任务,线程从队列中取出任务并执行。 2. **线程池的创建** 创建线程池的第一步是确定线程池的大小,即预...

    java并发编程:juc线程池

    1. 当一个任务被提交给线程池时,线程池首先检查当前运行的线程数是否小于`corePoolSize`。如果是,那么就会创建一个新的工作线程来执行任务。如果当前线程数已经达到`corePoolSize`,任务会被放入`workQueue`阻塞...

    Android异步加载图像小结 (含线程池,缓存方法)毕业设计—(包含完整源码可运行).zip

    本项目是关于Android异步加载图像的一个毕业设计,涵盖了线程池和缓存机制的实现,目的是优化图片加载性能,避免UI阻塞,并节省网络资源。这里我们将详细讨论其中涉及的知识点。 1. **异步加载**: - 在Android中...

    多线程采集,可以自定义URL链接,适用于各种爬虫获取数据。.zip

    1. **System.Threading命名空间**:这是C#中处理线程的核心,提供了Thread、Mutex、Semaphore、Monitor等类,用于创建和管理线程,以及实现线程同步。 2. **ThreadPool类**:线程池是一种线程复用机制,它可以有效...

    Smart Thread Pool

    线程池会根据系统资源和当前负载动态调整线程数量,以实现最佳性能。 3. **线程池的优势** - **资源利用率**:线程池减少了频繁创建和销毁线程的成本,提高了系统资源的利用率。 - **调度优化**:线程池内的线程...

    多线程Web服务器 处理多个响应 java

    总之,多线程Web服务器在Java中实现的核心在于合理调度线程,高效处理并发请求,并确保线程安全。通过线程池、并发控制机制以及高效的I/O模型,我们可以构建出能应对高并发场景的Web服务器。在实际项目中,还应考虑...

    C++编写的WINSOCKET异步程序

    4. **线程池**:创建一个线程池,当有网络事件发生时,从线程池中取出一个线程来处理,避免了频繁创建和销毁线程的开销。 5. **异步IO(Asynchronous IO,AIO)**:Windows中的WSAAsyncGetHostByAddr()和...

    Android应用源码 Gallery实现异步加载网络图片 并只加载当前停止页面图.zip

    这个“Android应用源码 Gallery实现异步加载网络图片 并只加载当前停止页面图.zip”提供的就是一个这样的解决方案。下面将详细解释这个应用的核心技术点。 1. **异步加载**: - 在Android中,为了保证UI线程的流畅...

    异步加载网路图片源码

    通常使用异步任务、线程池、或者现代编程语言中的异步/await机制来实现。 2. **异步加载框架**:在Android开发中,常见的异步图片加载库有Universal Image Loader、Picasso、Glide等。它们内部实现了图片的缓存策略...

    安卓Android源码——listview实现图片的异步加载.zip

    7. **线程池管理**:为了防止大量并发请求导致的资源浪费,可以使用线程池来管理和调度下载任务。 8. **监听器回调**:在图片加载完成后,通过回调接口更新ListView中的ImageView,确保图片加载与UI更新同步。 9. ...

Global site tag (gtag.js) - Google Analytics