由于工作的需要,写了一份异步调用的小框架,分享出来。。。
启动类:
/** * 线程启动 * @author yfguopeng */ public class ThreadExecutorListener implements ServletContextListener{ private final static Log log = LogFactory.getLog(ThreadExecutorListener.class); @SuppressWarnings("unchecked") public void contextInitialized(ServletContextEvent sce) { ServletContext servletContext = sce.getServletContext(); WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext); List<ThreadConfigBean> worders = (List<ThreadConfigBean>) wac.getBean("workers"); log.info("=====================初始化线程池========================"); //创建线程组 SecurityManager s = System.getSecurityManager(); ThreadGroup father = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); ThreadGroup group = new ThreadGroup(father, "root-threadgroup"); for (ThreadConfigBean configBean : worders) { //设置排队队列大小 ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(configBean.getQueueCapacity()); //设置线程工厂 ThreadFactory threadFactory = new DecorateThreadFactory(new ThreadGroup(group,configBean.getBusinessId()),configBean.getBusinessId()); ThreadPoolExecutor worker = new ThreadPoolExecutor(configBean.getBusinessId(),configBean.getMin(), configBean.getMax(), configBean.getKeepAliveTime(), TimeUnit.SECONDS, taskQueue, threadFactory, configBean.getRejectHandler()); ThreadGroupUtil.addThreadWorker(configBean.getBusinessId(), worker); } log.info("=====================线程池初始化完毕========================"); log.info("=====================初始化监控线程========================"); ThreadGroupUtil.monitorThreadStart(group,2000l); log.info("=====================监控线程初始化完毕========================"); } public void contextDestroyed(ServletContextEvent sce) { } }
业务监控配置:
<bean id="xxxIndex" class="xxx.xxx.xxx.xxx.web.thread.ThreadConfigBean"> <property name="businessId" value="xxxIndex"></property><!-- 业务ID,唯一 --> <property name="max" value="40"></property><!-- 最好为请求线程的倍数 --> <property name="min" value="10"></property><!-- 最好为请求线程的倍数 --> <property name="queueCapacity" value="80"></property><!-- 最好为请求线程的倍数 --> <property name="keepAliveTime" value="600"></property><!-- 线程空闲保存时间 --> <property name="rejectHandler" ><!--任务拒绝处理策略 --> <bean class="com.jd.m.pay.web.thread.RejectedPolicyHandler" > <property name="bizName" value="pay-index"></property><!-- 业务ID,唯一 --> </bean> </property> </bean> <bean id="workers" class="java.util.ArrayList"> <constructor-arg > <list> <ref bean="xxxIndex"/> </list> </constructor-arg> </bean>
线程工厂:
/** * 线程工厂, 加入了线程名的业务描述 * * @User: guopeng * @Date: 2013-02-28 */ public class DecorateThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; public DecorateThreadFactory(final ThreadGroup group,final String bizName) { this.group = group; namePrefix = bizName + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
监控线程:
/** * 监控业务线程池运行情况 * @author yfguopeng * @Date 2013-02-28 */ public class MonitorThread implements Runnable { private final static Log log = LogFactory.getLog(MonitorThread.class); private final ThreadGroup group; public MonitorThread(ThreadGroup group) { this.group = group; } public void run() { Map<String, ThreadPoolExecutor>workers = ThreadGroupUtil.getThreadWorkers(); Iterator<String> iterator = workers.keySet().iterator(); log.info("total threadpools:[ "+workers.size()+" ],total threads:[ "+group.activeCount()+" ]"); while(iterator.hasNext()) { ThreadPoolExecutor worker = ThreadGroupUtil.getThreadWorker(iterator.next()); RejectedExecutionHandler handler = worker.getRejectedExecutionHandler(); String rejectedSize = ""; if (RejectedPolicyHandlerInteface.class.isAssignableFrom(handler.getClass())) { rejectedSize = " ],rejected threads:[ "+((RejectedPolicyHandlerInteface) handler).getRejectedSize(); } log.info("business name:[ "+worker.getBizName()+" ]" + ", core threads:[ "+worker.getCorePoolSize()+" ], max threads:[ "+worker.getMaximumPoolSize()+" ]" + ", queue capacitys:[ "+worker.getQueue().size()+" ], running threads:[ "+worker.getActiveCount()+"] " + ", reject threads:[ "+rejectedSize+" ], largest threads:[ "+worker.getLargestPoolSize()+" ], complete threads:[ "+worker.getCompletedTaskCount()+" ]"); } } }
线程拒绝处理器:
/** * 线程拒绝执行控制球 * @author yfguopeng * @Date 2013-02-28 */ public class RejectedPolicyHandler extends ThreadPoolExecutor.AbortPolicy implements RejectedPolicyHandlerInteface{ private final static Log log = LogFactory.getLog(RejectedPolicyHandler.class); private static AtomicLong totals = new AtomicLong(0l); private String bizName; public RejectedPolicyHandler(){} @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { String tip = "["+bizName+"] 线程忙,请求被拒绝.max: "+executor.getMaximumPoolSize()+", queue: "+executor.getQueue().size(); log.info(tip); //业务报警 TODO totals.addAndGet(1); super.rejectedExecution(r, executor); } public String getBizName() { return bizName; } public void setBizName(String bizName) { this.bizName = bizName; } public long getRejectedSize() { return totals.get(); } }
import java.util.concurrent.RejectedExecutionHandler; public interface RejectedPolicyHandlerInteface extends RejectedExecutionHandler{ public long getRejectedSize() ; }
线程配置bean:
@SuppressWarnings("serial") public class ThreadConfigBean implements Serializable{ /** * 业务ID */ private String businessId; /** * 任务队列最大线程数 * 默认:80 */ private Integer max = 160; /** * 任务队列最小线程数 * 默认:40 */ private Integer min = 80; /** * 等待队列请求数 * 默认:300 */ private Integer queueCapacity = 300; /** * 空闲线程存活时间 * 默认:3分钟 */ private Long keepAliveTime = 3 * 60l; /** * 线程拒绝策略 */ private RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy(); public ThreadConfigBean() { super(); } public Integer getMax() { return max; } public void setMax(Integer max) { this.max = max; } public Integer getMin() { return min; } public void setMin(Integer min) { this.min = min; } public Integer getQueueCapacity() { return queueCapacity; } public void setQueueCapacity(Integer queueCapacity) { this.queueCapacity = queueCapacity; } public Long getKeepAliveTime() { return keepAliveTime; } public void setKeepAliveTime(Long keepAliveTime) { this.keepAliveTime = keepAliveTime; } public RejectedExecutionHandler getRejectHandler() { return rejectHandler; } public void setRejectHandler(RejectedExecutionHandler rejectHandler) { this.rejectHandler = rejectHandler; } public String getBusinessId() { return businessId; } public void setBusinessId(String businessId) { this.businessId = businessId; } }
线程组:
/** * 各个业务获取响应线程池 * @author yfguopeng */ public class ThreadGroupUtil { private static Map<String, ThreadPoolExecutor> threadworkers; private static ScheduledExecutorService monitorThread;//监视线程 private static final long delay = 200l; private static long cycle_default = 5000l; static { threadworkers = new ConcurrentHashMap<String, ThreadPoolExecutor>(); monitorThread = Executors.newScheduledThreadPool(1); } public static void addThreadWorker(String bizName,ThreadPoolExecutor executor){ threadworkers.put(bizName, executor); } public static ThreadPoolExecutor getThreadWorker(String bizName) { return threadworkers.get(bizName); } public static Map<String, ThreadPoolExecutor> getThreadWorkers(){ return threadworkers; } public static ScheduledExecutorService getMonitorThread() { return monitorThread; } public static void setMonitorThread(ScheduledExecutorService monitorThread) { ThreadGroupUtil.monitorThread = monitorThread; } public static void monitorThreadClosed(){ if (monitorThread != null) if (!monitorThread.isTerminated()) monitorThread.shutdown(); } public static void monitorThreadStart(ThreadGroup group,Long cycle){ MonitorThread monitor = new MonitorThread(group); if (cycle > 0l) { try { cycle_default = cycle; } catch (Exception e) { } } monitorThread.scheduleWithFixedDelay(monitor, delay, cycle_default, TimeUnit.MILLISECONDS); } }
线程池实现类:
/** * 线程池 * @author yfguopeng * */ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { private String bizName; public ThreadPoolExecutor(String bizName,int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.bizName = bizName; } public String getBizName() { return bizName; } public void setBizName(String bizName) { this.bizName = bizName; } }
web.xml配置:
<listener> <listener-class>xxx.xx.xx.xxx.web.thread.ThreadExecutorListener</listener-class> </listener>
调用:
ThreadPoolExecutor exc = ThreadGroupUtil.getThreadWorker("xxxIndex"); String payOrgInfo = null; String cards = null; Future<String> xxxFuture = null; Future<String> yyyFuture = null; long start = System.currentTimeMillis(); xxxTask xxxTask = new xxxTask(//参数); yyyTask yyyTask = new yyyTask(//参数); System.out.println("开始......"); xxxFuture = exc.submit(xxxTask ); yyyFuture = exc.submit(yyyTask ); try { xxx= xxxFuture .get(); yyy= yyyFuture .get(); System.out.println(xxx); System.out.println(yyy); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("结束...... "+(end-start)); return "";
相关推荐
- `ThreadPoolTaskExecutor`:这个执行器实现了异步任务执行,允许我们配置线程池参数,如核心线程数、最大线程数、队列容量等,以及线程工厂,以自定义线程的行为。 三、同步与异步的区别 - `SyncTaskExecutor`是...
4. 监控和调优:通过Spring Boot Actuator等监控工具,实时查看线程池状态,根据实际运行情况调整配置。 总结,使用Spring MVC和Spring线程池处理HTTP并发请求,能够有效提升系统的并发处理能力。同时,通过数据...
Hippo4j 提供的异步线程池框架,进一步优化了这一过程,允许开发者动态调整线程池参数,如核心线程数、最大线程数、队列大小等,以适应不断变化的工作负载。 线程池动态变更能力是Hippo4j的一个重要特性。在运行时...
### 多线程编程线程池 ...无论是处理大量短时任务还是实现后台任务的调度和监控,线程池都是一个强大的工具。然而,在使用线程池时也需要考虑其局限性和潜在的风险,特别是当涉及到Web服务器环境或长时间运行的任务时。
SpringBoot实现定时任务和异步调用 SpringBoot框架提供了强大的功能来实现定时任务和异步调用,这在业务场景中非常常见。例如,在电商平台中,需要定时发送邮件、短信、统计监控数据等操作,而在订单流程中,需要...
`WSAEventSelect`是Windows Socket API(Winsock)中的一个关键函数,它允许应用程序通过事件对象(如内建的Windows事件或自定义事件)来监控套接字的状态变化。线程池则是一种优化资源管理和提高系统性能的技术,它...
这些API可以与自定义线程池结合,使得线程池中的工作线程能高效地处理这些异步I/O操作。 线程池的创建通常包括以下步骤: 1. **初始化线程池参数**:设置核心线程数、最大线程数、线程存活时间等参数。 2. **创建...
下面我们将深入探讨如何在Delphi7中创建线程以及何时和如何释放它们。 首先,我们要了解Delphi7中的线程基础。TThread类是VCL(Visual Component Library)框架提供的一种用于创建线程的抽象类。它继承自TComponent...
总结,C#线程池的高级应用涉及到线程的创建、调度、监控以及与异步I/O的结合使用。通过合理利用线程池,开发者可以构建高效、稳定的多线程应用程序。在实际开发中,应结合项目需求和系统资源,灵活运用线程池的各种...
实现多线程下载时,为了应对网络不稳定或设备中断等情况,需要支持断点续传。这需要保存每个线程下载的起始位置和已下载长度,当下载暂停或重启时,可以从上次的位置继续下载。 6. **文件分块与合并**: 每个线程...
- **线程池管理器**:负责创建、管理和调度线程,通常由开发者自定义实现。 - **任务队列**:存储待执行的任务,线程从队列中取出任务并执行。 2. **线程池的创建** 创建线程池的第一步是确定线程池的大小,即预...
1. 当一个任务被提交给线程池时,线程池首先检查当前运行的线程数是否小于`corePoolSize`。如果是,那么就会创建一个新的工作线程来执行任务。如果当前线程数已经达到`corePoolSize`,任务会被放入`workQueue`阻塞...
本项目是关于Android异步加载图像的一个毕业设计,涵盖了线程池和缓存机制的实现,目的是优化图片加载性能,避免UI阻塞,并节省网络资源。这里我们将详细讨论其中涉及的知识点。 1. **异步加载**: - 在Android中...
1. **System.Threading命名空间**:这是C#中处理线程的核心,提供了Thread、Mutex、Semaphore、Monitor等类,用于创建和管理线程,以及实现线程同步。 2. **ThreadPool类**:线程池是一种线程复用机制,它可以有效...
线程池会根据系统资源和当前负载动态调整线程数量,以实现最佳性能。 3. **线程池的优势** - **资源利用率**:线程池减少了频繁创建和销毁线程的成本,提高了系统资源的利用率。 - **调度优化**:线程池内的线程...
总之,多线程Web服务器在Java中实现的核心在于合理调度线程,高效处理并发请求,并确保线程安全。通过线程池、并发控制机制以及高效的I/O模型,我们可以构建出能应对高并发场景的Web服务器。在实际项目中,还应考虑...
4. **线程池**:创建一个线程池,当有网络事件发生时,从线程池中取出一个线程来处理,避免了频繁创建和销毁线程的开销。 5. **异步IO(Asynchronous IO,AIO)**:Windows中的WSAAsyncGetHostByAddr()和...
这个“Android应用源码 Gallery实现异步加载网络图片 并只加载当前停止页面图.zip”提供的就是一个这样的解决方案。下面将详细解释这个应用的核心技术点。 1. **异步加载**: - 在Android中,为了保证UI线程的流畅...
通常使用异步任务、线程池、或者现代编程语言中的异步/await机制来实现。 2. **异步加载框架**:在Android开发中,常见的异步图片加载库有Universal Image Loader、Picasso、Glide等。它们内部实现了图片的缓存策略...
7. **线程池管理**:为了防止大量并发请求导致的资源浪费,可以使用线程池来管理和调度下载任务。 8. **监听器回调**:在图片加载完成后,通过回调接口更新ListView中的ImageView,确保图片加载与UI更新同步。 9. ...