`
san_yun
  • 浏览: 2666333 次
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

一个更加健壮的ThreadPool

 
阅读更多

从dubbo中找到的:

/**
 * 此线程池可伸缩,线程空闲一分钟后回收,新请求重新创建线程,来源于:<code>Executors.newCachedThreadPool()</code>
 * 
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 * @author william.liangf
 */
public class CachedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 
        		queues == 0 ? new SynchronousQueue<Runnable>() : 
        			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
        					: new LinkedBlockingQueue<Runnable>(queues)),
        		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

 

/**
 * 此线程池启动时即创建固定大小的线程数,不做任何伸缩,来源于:<code>Executors.newFixedThreadPool()</code>
 * 
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 * @author william.liangf
 */
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
        		queues == 0 ? new SynchronousQueue<Runnable>() : 
        			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
        					: new LinkedBlockingQueue<Runnable>(queues)),
        		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

 

 

/**
 * 此线程池一直增长,直到上限,增长后不收缩。
 * 
 * @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
 */
public class LimitedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
        		queues == 0 ? new SynchronousQueue<Runnable>() : 
        			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
        					: new LinkedBlockingQueue<Runnable>(queues)),
        		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

 

 

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    
    private final String threadName;
    
    private final URL url;
    
    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        throw new RejectedExecutionException(msg);
    }

}

 

分享到:
评论

相关推荐

    c# 线程池的管理 通过ThreadPool

    线程池的工作原理是预先创建一组线程,当需要执行任务时,它会从池中分配一个空闲线程,而不是每次都创建新的线程。这样可以减少线程创建和销毁的开销,因为创建和销毁线程是相对昂贵的操作。线程池还可以根据需要...

    cpp-ThreadPoolLibrary使用现代C实现仅头文件的线程池库快速并且易于使用

    用户可以通过实例化ThreadPool对象来创建一个线程池,并通过提交任务(通常是一个函数对象或lambda表达式)到线程池来执行任务。 2. **任务提交机制**:cpp-ThreadPoolLibrary提供了一个简单的API,允许用户异步...

    boost threadpool(修复内存泄露后的版本)

    Boost.Threadpool是一个功能强大的线程池库,由Boost库家族提供,用于高效地管理和调度并发任务。在实际应用中,内存泄漏问题可能会严重影响程序的性能和稳定性,因此,对于修复内存泄漏后的Boost.Threadpool版本,...

    C_epoll_server-threadpool-linkpooll-main.zip

    在IT领域,尤其是在服务器开发中,`epoll`是一个关键的概念,它是一种高效地处理大量并发连接的I/O多路复用技术。本项目“C_epoll_server-...通过对这些核心组件的优化,可以构建出一个健壮、高效的服务器系统。

    另外一个java多线程下载程序源代码

    在这个“另外一个java多线程下载程序源代码”中,我们可以深入学习如何利用Java的多线程特性来创建一个高效的文件下载应用。 首先,我们了解Java中创建线程的两种主要方式: 1. 继承`Thread`类:自定义一个新的类...

    资源池等待所有线程执行完毕的方法

    `ThreadPool.RegisterWaitForSingleObject`方法允许注册一个等待句柄,该句柄会在指定的对象状态变为信号状态时被触发。下面是如何使用此方法来等待所有线程执行完毕的一个示例: ```csharp using System.Threading...

    java线程池_2.pdf

    `ThreadPool` 包含一个布尔变量 `isClosed` 用于标识线程池是否已关闭,以及一个 `LinkedList` 类型的 `workQueue` 作为任务队列。在构造函数中,根据传入的 `poolSize` 参数创建相应数量的工作线程(`WorkThread`)...

    Qt thread pool 线程池样例,信号槽,跨线程 lambda

    在本文中,我们将深入探讨Qt库中的线程池(Thread Pool)机制,以及如何结合使用信号槽(Signals and Slots)和跨线程的lambda表达式。...通过理解并熟练运用这些技术,我们可以编写出更加健壮、响应迅速的应用程序。

    MyAsyncThread.rar

    在C#编程中,理解和掌握同步与异步的概念至关重要,特别是在构建高性能、高并发的应用时。本教程将深入探讨这些主题,以及相关的多线程技术,包括...通过深入学习和实践,开发者能够编写出更加健壮和高效的多线程应用。

    多线程实验_1

    通过定义委托,我们可以创建一个方法的引用,并将其作为参数传递给另一个方法(如`ThreadPool.QueueUserWorkItem()`或`Task.Run()`)。这样,目标方法会在新的线程上执行传入的委托方法,实现了异步执行。回调函数...

    Delegate方式的异步线程程学习Dome

    BeginInvoke方法启动一个异步操作,它不会等待操作完成就立即返回,而是立即返回一个IAsyncResult对象,可以用于后续跟踪操作状态。EndInvoke方法则用于在操作完成后获取结果或异常。例如: ```csharp public ...

    一个基于C#实现的多线程Multi-Threading Deadlock Tracer Utility处理类库源码

    通过学习和理解这个类库,开发者不仅可以深入了解多线程编程,还能掌握死锁的检测与预防技术,从而编写出更加健壮和高效的多线程应用程序。这个源码资源对于C#开发者来说是一份宝贵的参考资料,有助于提升他们在多...

    流式套接字传输程序

    流式套接字基于TCP协议,提供了一个可靠、面向连接的数据传输服务,确保数据按照发送顺序到达,并且在传输过程中不会丢失或重复。 首先,让我们从服务器端说起。`server.exe`通常负责监听特定的端口,等待客户端的...

    MYQQ聊天小程序源代码

    【MYQQ聊天小程序源代码】是一个基于Windows Forms(Winform)开发的应用,旨在模拟QQ聊天的基本功能。这个项目为开发者提供了一个学习和实践的机会,了解如何构建一个简单的即时通讯应用。Winform是.NET Framework...

    C# winform动态创建和关闭多线程源码 可运行

    在C#编程中,Windows Forms(Winform)应用程序经常需要处理多线程,以实现并发操作,提高程序的响应性和效率。...通过学习和使用这些技术,开发者可以构建更加健壮、高效的多线程Winform应用程序。

    C#.NET多线程实例6个(包括多线程基本使用,多线程互斥等全部多线程使用实例)(201903)

    通过实例化`Thread`类,并提供一个执行的方法(通常为`ThreadStart`委托),可以启动新线程。此外,`ThreadPool`类提供了一种池化线程的方法,可以更高效地利用系统资源。 2. **线程同步与互斥**:在多线程环境中,...

    一个用完成端口编写的聊天程序,包括服务器和客户端-VC.rar

    在这个“一个用完成端口编写的聊天程序,包括服务器和客户端-VC.rar”压缩包中,包含了使用C++编程语言在Visual C++环境下开发的一个基于完成端口的聊天程序实例,这对于学习和理解IOCP机制及其在实际应用中的工作...

    关于多线程的C#小程序

    2. `ThreadPool`:这是一个线程池,可以更高效地管理线程。通过`ThreadPool.QueueUserWorkItem()` 方法可以将任务放入队列,由线程池中的空闲线程处理。 ```csharp ThreadPool.QueueUserWorkItem(new WaitCallback...

    c#线程池应用

    可以通过`ThreadPool.RegisterWaitForSingleObject`方法注册一个等待句柄,当指定的等待条件满足时,线程池将执行一个回调函数。 ```csharp public static void Main() { ManualResetEvent resetEvent = new ...

    C#多线程教程,经典清析教程

    在编程领域,多线程是实现并发执行任务的关键技术,尤其在C#...理解并掌握上述知识点,能够帮助开发者编写出更加健壮和高效的多线程程序。在实践中,应根据具体需求灵活运用这些概念和技术,以达到最佳性能和资源利用。

Global site tag (gtag.js) - Google Analytics