`
yufeng0471
  • 浏览: 100982 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

线程池饱和策略之阻塞式处理

    博客分类:
  • java
 
阅读更多

背景:

 

A系统向activemq发送消息,B系统以监听的方式从activemq接收消息,因为这些消息都是转换文件,是CPU消耗型的服务,而服务器都是多CPU,为了充分利用CPU资源,B系统以多线程方式处理消息,这里用到了线程池,假设线程池最大线程数量是8(和CPU数量相等),但是在接收消息的时候,发现B系统把所有的消息都接收下来,放在了线程池的队列中,这样就产生问题了,如果B系统down掉的话,所有的消息都会丢失。

 

其实我想要的效果是,如果线程池中工作队列里的任务数量大于一定值的时候,B系统的消息接收监听器就阻塞,不要再从activemq中接收消息,通过查看线程池ThreadPoolExecutor源码

 

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

 发现工作队列BlockingQueue在插入任务的时候,执行的是workQueue.offer(),该函数并不会阻塞插入任务的操作,其实应该用workQueue.put()函数,这个函数会产生等待,直到工作队列BlockingQueue中的任务数量下降。

 

解决方案:

 

继承ArrayBlockingQueue,重写offer函数,代码如下

package cn.sh.ideal.pool;

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueReplaceOffer<E>  extends ArrayBlockingQueue<E> {

    public ArrayBlockingQueueReplaceOffer(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E e){
        try {
            super.put(e);
            return true;
        } catch (InterruptedException e1) {
            e1.printStackTrace();
            return false;
        }
    }
}
 

在初始化线程池的时候,使用ArrayBlockingQueueReplaceOffer,这样就达到了阻塞的目的

new ThreadPoolExecutor(8, 8,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueueReplaceOffer <Runnable>(8))

 

已经有人把阻塞式的处理请求提交到了 Java Bug 数据库(Bug Id 6648211,“ThreadPoolExecutor 特性需求”)

 

分享到:
评论

相关推荐

    java线程池工作队列饱和策略代码示例

    Java线程池工作队列饱和策略是Java并发编程中的一种重要机制,用于处理线程池中工作队列的饱和问题。在本文中,我们将详细介绍Java线程池工作队列饱和策略的概念、原理和实现。 线程池(Thread Pool)是并行执行...

    线程池之ThreadPoolExecutor.docx

    - `handler`: 饱和策略,当线程池无法接受新任务时的处理方式,包括AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy等。 例如,创建一个固定大小的线程池可以使用`Executors....

    JAVA线程池的分析和使用

    - **RejectedExecutionHandler**:饱和策略,处理线程池饱和时的新任务。 3. **线程池的执行策略** - **AbortPolicy**:默认策略,抛出`RejectedExecutionException`异常。 - **CallerRunsPolicy**:调用者线程...

    java线程池详解文档

    - **饱和策略**:当所有线程都在工作且任务队列也已满时,线程池会根据配置的饱和策略来处理无法接收的任务。 - **线程销毁**:当任务减少时,超过核心线程数的空闲线程会在一定时间内被销毁,而核心线程则会一直...

    18线程池1

    4. 如果线程池已满并且阻塞队列也满,就会执行饱和策略(handler,也称为拒绝策略),这可以是抛出异常、忽略任务、调用系统默认处理或自定义处理方式。 Java提供了多种类型的线程池供选择,包括`...

    面试必备:Java线程池解析.pdf

    - handler:饱和策略处理器,当线程池无法处理新提交的任务时,按照既定的策略处理,常见的有AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy。 线程池的执行流程涉及execute()方法,当提交...

    java并发编程:juc线程池

    8. `handler`是线程池的饱和策略,当线程池和队列都满时,用来处理被拒绝的任务。 `FutureTask`是一个可以取消的异步计算任务,它实现了`RunnableFuture`接口,提供了获取计算结果和取消任务的功能。它通常与线程池...

    JAVA并发编程实践-线程池-学习笔记

    4. **饱和策略(Rejected Execution Handler)**:当线程池和队列都满载时,新提交的任务处理策略,常见的策略有抛弃任务、抛弃最旧任务、抛出异常等。 配置线程池时,我们需要关注以下几点: - **线程池大小**:...

    C++ 实现线程池ThreadPool

    - **线程饱和策略**:当任务队列满时,线程池可以采取拒绝策略,如丢弃任务、阻塞提交任务的线程或创建新线程。 3. **C++实现线程池的关键技术**: - **线程库**:C++11及更高版本提供了标准线程库 `&lt;thread&gt;`,...

    PollDemo.rar

    - 使用适当的拒绝策略,如AbortPolicy、CallerRunsPolicy、DiscardPolicy或DiscardOldestPolicy,处理线程池饱和情况。 6. **注意事项**: - 确保线程安全:在多线程环境下,共享数据的访问需要同步控制,避免...

    java自带并发框架

    线程池可以根据需求调整大小,处理任务积压,甚至设置饱和策略,如拒绝新任务或丢弃旧任务。 - **锁机制(Locks)**:除了synchronized,框架提供了更灵活的锁,如`ReentrantLock`,支持公平/非公平获取、可中断...

    搜狗实验室技术交流文档C10K问题

    2. 应用软件处理任务和线程/进程关系的方式,包括每任务1进程、每任务1线程、单线程、多任务共享线程池以及一些更复杂的变种方案。 具体到常用的经典策略,有以下几种: 1. 每个客户端使用一个线程或进程,并使用...

    java.util.concurrent 学习ppt

    通常,我们会通过Executors工厂类来创建ExecutorService实例,这样可以方便地定制线程池的行为,如饱和策略、预创建线程的数量、线程存活时间等。 除了线程池,Java.util.concurrent还包含了一系列的并发工具类,如...

    java中ThreadPoolExecutor常识汇总

    4. 当添加任务大于 maximumPoolSize 时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是 AbortPolicy(直接丢弃) 阻塞队列 ThreadPoolExecutor 中使用的阻塞队列有三种: * ...

    Java调度原理及使用

    通过线程池,可以避免频繁创建和销毁线程带来的开销,同时可以配置线程池的拒绝策略,应对任务提交过快导致的资源饱和问题。 五、Java并发工具 Java提供了一系列并发工具类,如`Semaphore`(信号量)、`...

    Java并发编程(学习笔记).xmind

    饱和策略 AbortPolicy DiscardPolicy DiscardOldestPolicy CallerRunsPolicy 线程工厂 在调用构造函数后再定制ThreadPoolExecutor 扩展 ThreadPoolExecutor afterExecute(Runnable r,...

    java 并发视频教程

    - **`BlockingQueue`**:提供了一种阻塞式的队列操作,常用于线程池的工作队列。 - **`CopyOnWriteArrayList`**:线程安全的动态数组实现,适合读多写少的场景。 #### 六、案例分析 - **生产者消费者模式**:利用`...

    Java面试题整理

    如果队列也满了,可以选择拒绝新任务,或者使用饱和策略处理。 ### Tomcat参数调整 Tomcat作为常用的Servlet容器,其参数调整涉及性能优化。例如,调整最大连接数、最大线程数等参数,以适应应用程序的需求。 ###...

    ROSA:资源敏感的高性能Web容器体系结构 ROSA

    ROSA通过推迟处理对已饱和共享资源请求的方式来减少线程阻塞。在这种架构下,如果Web容器在接收到来自Web组件的请求时,发现某些共享资源已经被其他线程饱和占用,则ROSA体系结构会延迟这些请求的处理,等待资源可用...

Global site tag (gtag.js) - Google Analytics