背景:
A系统向activemq发送消息,B系统以监听的方式从activemq接收消息,因为这些消息都是转换文件,是CPU消耗型的服务,而服务器都是多CPU,为了充分利用CPU资源,B系统以多线程方式处理消息,这里用到了线程池,假设线程池最大线程数量是8(和CPU数量相等),但是在接收消息的时候,发现B系统把所有的消息都接收下来,放在了线程池的队列中,这样就产生问题了,如果B系统down掉的话,所有的消息都会丢失。
其实我想要的效果是,如果线程池中工作队列里的任务数量大于一定值的时候,B系统的消息接收监听器就阻塞,不要再从activemq中接收消息,通过查看线程池ThreadPoolExecutor源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
发现工作队列BlockingQueue在插入任务的时候,执行的是workQueue.offer(),该函数并不会阻塞插入任务的操作,其实应该用workQueue.put()函数,这个函数会产生等待,直到工作队列BlockingQueue中的任务数量下降。
解决方案:
继承ArrayBlockingQueue,重写offer函数,代码如下
package cn.sh.ideal.pool;
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockingQueueReplaceOffer<E> extends ArrayBlockingQueue<E> {
public ArrayBlockingQueueReplaceOffer(int capacity) {
super(capacity);
}
@Override
public boolean offer(E e){
try {
super.put(e);
return true;
} catch (InterruptedException e1) {
e1.printStackTrace();
return false;
}
}
}
在初始化线程池的时候,使用ArrayBlockingQueueReplaceOffer,这样就达到了阻塞的目的
new ThreadPoolExecutor(8, 8,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueueReplaceOffer <Runnable>(8))
已经有人把阻塞式的处理请求提交到了 Java Bug 数据库(Bug Id 6648211,“ThreadPoolExecutor 特性需求
”)
分享到:
相关推荐
Java线程池工作队列饱和策略是Java并发编程中的一种重要机制,用于处理线程池中工作队列的饱和问题。在本文中,我们将详细介绍Java线程池工作队列饱和策略的概念、原理和实现。 线程池(Thread Pool)是并行执行...
- `handler`: 饱和策略,当线程池无法接受新任务时的处理方式,包括AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy等。 例如,创建一个固定大小的线程池可以使用`Executors....
- **RejectedExecutionHandler**:饱和策略,处理线程池饱和时的新任务。 3. **线程池的执行策略** - **AbortPolicy**:默认策略,抛出`RejectedExecutionException`异常。 - **CallerRunsPolicy**:调用者线程...
- **饱和策略**:当所有线程都在工作且任务队列也已满时,线程池会根据配置的饱和策略来处理无法接收的任务。 - **线程销毁**:当任务减少时,超过核心线程数的空闲线程会在一定时间内被销毁,而核心线程则会一直...
4. 如果线程池已满并且阻塞队列也满,就会执行饱和策略(handler,也称为拒绝策略),这可以是抛出异常、忽略任务、调用系统默认处理或自定义处理方式。 Java提供了多种类型的线程池供选择,包括`...
- handler:饱和策略处理器,当线程池无法处理新提交的任务时,按照既定的策略处理,常见的有AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy。 线程池的执行流程涉及execute()方法,当提交...
8. `handler`是线程池的饱和策略,当线程池和队列都满时,用来处理被拒绝的任务。 `FutureTask`是一个可以取消的异步计算任务,它实现了`RunnableFuture`接口,提供了获取计算结果和取消任务的功能。它通常与线程池...
4. **饱和策略(Rejected Execution Handler)**:当线程池和队列都满载时,新提交的任务处理策略,常见的策略有抛弃任务、抛弃最旧任务、抛出异常等。 配置线程池时,我们需要关注以下几点: - **线程池大小**:...
- **线程饱和策略**:当任务队列满时,线程池可以采取拒绝策略,如丢弃任务、阻塞提交任务的线程或创建新线程。 3. **C++实现线程池的关键技术**: - **线程库**:C++11及更高版本提供了标准线程库 `<thread>`,...
- 使用适当的拒绝策略,如AbortPolicy、CallerRunsPolicy、DiscardPolicy或DiscardOldestPolicy,处理线程池饱和情况。 6. **注意事项**: - 确保线程安全:在多线程环境下,共享数据的访问需要同步控制,避免...
线程池可以根据需求调整大小,处理任务积压,甚至设置饱和策略,如拒绝新任务或丢弃旧任务。 - **锁机制(Locks)**:除了synchronized,框架提供了更灵活的锁,如`ReentrantLock`,支持公平/非公平获取、可中断...
2. 应用软件处理任务和线程/进程关系的方式,包括每任务1进程、每任务1线程、单线程、多任务共享线程池以及一些更复杂的变种方案。 具体到常用的经典策略,有以下几种: 1. 每个客户端使用一个线程或进程,并使用...
通常,我们会通过Executors工厂类来创建ExecutorService实例,这样可以方便地定制线程池的行为,如饱和策略、预创建线程的数量、线程存活时间等。 除了线程池,Java.util.concurrent还包含了一系列的并发工具类,如...
4. 当添加任务大于 maximumPoolSize 时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是 AbortPolicy(直接丢弃) 阻塞队列 ThreadPoolExecutor 中使用的阻塞队列有三种: * ...
通过线程池,可以避免频繁创建和销毁线程带来的开销,同时可以配置线程池的拒绝策略,应对任务提交过快导致的资源饱和问题。 五、Java并发工具 Java提供了一系列并发工具类,如`Semaphore`(信号量)、`...
饱和策略 AbortPolicy DiscardPolicy DiscardOldestPolicy CallerRunsPolicy 线程工厂 在调用构造函数后再定制ThreadPoolExecutor 扩展 ThreadPoolExecutor afterExecute(Runnable r,...
- **`BlockingQueue`**:提供了一种阻塞式的队列操作,常用于线程池的工作队列。 - **`CopyOnWriteArrayList`**:线程安全的动态数组实现,适合读多写少的场景。 #### 六、案例分析 - **生产者消费者模式**:利用`...
如果队列也满了,可以选择拒绝新任务,或者使用饱和策略处理。 ### Tomcat参数调整 Tomcat作为常用的Servlet容器,其参数调整涉及性能优化。例如,调整最大连接数、最大线程数等参数,以适应应用程序的需求。 ###...
ROSA通过推迟处理对已饱和共享资源请求的方式来减少线程阻塞。在这种架构下,如果Web容器在接收到来自Web组件的请求时,发现某些共享资源已经被其他线程饱和占用,则ROSA体系结构会延迟这些请求的处理,等待资源可用...