在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡。通常来说,生产任务的速度要大于消费的速度。一个细节问题是,队列长度,以及如何匹配生产和消费的速度。
一个典型的生产者-消费者模型如下:
在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全。这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory。
对于一般的生产快于消费的情况。当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任务的方法上,待队列未满时继续提交任务,这样就没有浪费的空转时间了。阻塞这一点也很容易,BlockingQueue就是为此打造的,ArrayBlockingQueue和LinkedBlockingQueue在构造时都可以提供容量做限制,其中LinkedBlockingQueue是在实际操作队列时在每次拿到锁以后判断容量。
更进一步,当队列为空时,消费者拿不到任务,可以等一会儿再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,当有任务时便可以立即获得执行,建议调用take的带超时参数的重载方法,超时后线程退出。这样当生产者事实上已经停止生产时,不至于让消费者无限等待。
于是一个高效的支持阻塞的生产消费模型就实现了。
等一下,既然J.U.C已经帮我们实现了线程池,为什么还要采用这一套东西?直接用ExecutorService不是更方便?
我们来看一下ThreadPoolExecutor的基本结构:
可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已经帮我们实现好了,并且直接采用线程池的实现还有很多优势,例如线程数的动态调整等。
但问题在于,即便你在构造ThreadPoolExecutor时手动指定了一个BlockingQueue作为队列实现,事实上当队列满时,execute方法并不会阻塞,原因在于ThreadPoolExecutor调用的是BlockingQueue非阻塞的offer方法:
public void execute(Runnable command){ if(command ==null) thrownewNullPointerException(); if(poolSize >= corePoolSize ||!addIfUnderCorePoolSize(command)){ if(runState == RUNNING && workQueue.offer(command)){ if(runState != RUNNING || poolSize ==0) ensureQueuedTaskHandled(command); } elseif(!addIfUnderMaximumPoolSize(command)) reject(command);// is shutdown or saturated } }
这时候就需要做一些事情来达成一个结果:当生产者提交任务,而队列已满时,能够让生产者阻塞住,等待任务被消费。
关键在于,在并发环境下,队列满不能由生产者去判断,不能调用ThreadPoolExecutor.getQueue().size()来判断队列是否满。
线程池的实现中,当队列满时会调用构造时传入的RejectedExecutionHandler去拒绝任务的处理。默认的实现是AbortPolicy,直接抛出一个RejectedExecutionException。
几种拒绝策略在这里就不赘述了,这里和我们的需求比较接近的是CallerRunsPolicy,这种策略会在队列满时,让提交任务的线程去执行任务,相当于让生产者临时去干了消费者干的活儿,这样生产者虽然没有被阻塞,但提交任务也会被暂停。
public static class CallerRunsPolicy implements RejectedExecutionHandler{ /** * Creates a <tt>CallerRunsPolicy</tt>. */ public CallerRunsPolicy(){} /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r,ThreadPoolExecutor e){ if(!e.isShutdown()){ r.run(); } } }
但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。
参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:
new RejectedExecutionHandler(){ @Override public void rejectedExecution(Runnable r,ThreadPoolExecutor executor){ if(!executor.isShutdown()){ try{ executor.getQueue().put(r); }catch(InterruptedException e){ // should not be interrupted } } } };
这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。
相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。
转自:http://blog.hesey.net/2013/04/blocking-threadpool-executor.html
相关推荐
通过这种方式,我们创建了一个支持生产阻塞的Java线程池,当生产者试图向队列添加任务而队列已满时,生产者线程会被阻塞,直到队列中有空闲位置。这样的设计有助于平衡生产者和消费者的处理速度,避免资源浪费和可能...
例如,生产者-消费者模型中,阻塞队列能很好地协调生产者和消费者的速率。 线程池与队列结合使用时,可以实现高效的任务调度和资源管理。线程池中的线程从队列中取出任务执行,当队列为空时,线程可能会进入等待...
4. **PriorityBlockingQueue**:支持优先级的无界阻塞队列,按照自然顺序或自定义比较器进行排序。 使用`ThreadPoolExecutor` 创建线程池的基本代码如下: ```java int corePoolSize = 5; int maximumPoolSize = ...
在学习这些知识时,初学者可以通过创建简单的多线程程序来实践,比如实现一个生产者消费者模型,或者利用线程池处理并发请求。对于XML解析,可以尝试读取和解析配置文件,或者通过XML与Java对象之间的绑定进行数据...
线程池的设计思路与生产者-消费者模型类似,通过一个任务队列作为缓冲区,线程从队列中取出任务执行,完成后再等待新任务。这种设计可以有效地平衡任务提交速度和处理速度,避免资源浪费。 在《阿里巴巴 Java 开发...
- 线程池维护一定数量的线程,可接收和并发执行任意数量的任务,类似生产者/消费者模式。 - Java的`Executor`接口提供线程池实现,但这里先不介绍,而是自定义实现以加深理解。 3. **自定义线程池设计**: - `...
标题中的“支持多线程和泛型的阻塞队列”意味着我们讨论的是一个能够同时处理多个线程并能存储不同类型数据的队列实现。 ### 阻塞队列的基本概念 阻塞队列(Blocking Queue)是线程安全的数据结构,它结合了队列的...
- **newScheduledThreadPool**:创建一个支持定时及周期性的任务执行的线程池。 #### Executors的缺陷 虽然`Executors`提供了方便的线程池创建方式,但是它有一些明显的缺陷,例如使用`newCachedThreadPool`创建...
Java线程池会将提交的任务先置于工作队列中,在从工作队列中获取(SynchronousQueue直接由生产者提交给工作线程)。那么工作队列就有两种实现策略:无界队列和有界队列。 无界队列不存在饱和的问题,但是其问题是当...
阻塞队列的主要特点在于它支持两个额外的条件操作:当队列为空时,尝试从队列中取元素的操作会被阻塞,直到队列中出现新的元素;同样地,当队列已满时,尝试向队列中添加元素的操作也会被阻塞,直到队列中出现可用...
在Java中,我们可以使用`BlockingQueue`(阻塞队列)来实现生产者和消费者之间的同步。`BlockingQueue`是线程安全的,它内部已经实现了`wait()`和`notify()`的逻辑,可以避免数据错误问题。例如,`...
阻塞队列常用于生产者-消费者模式中,能够实现线程间的协作,保证资源的同步和一致性的操作。 并发队列是支持多线程并发访问的队列。在多线程环境中,多个线程可能会同时对同一个队列进行操作,如同时入队或出队,...
在IT领域,多线程是并发编程中的一个关键概念,特别是在Java等支持多线程的编程语言中。生产者与消费者模式是设计模式中的经典范例,它有效地展示了线程间的协作和同步。这个模式主要解决的问题是数据的生产和消费...
【MySQL线程池插件详解】 在MySQL数据库中,高并发请求可能会导致大量的线程创建与销毁,这在一定程度上会消耗系统资源并影响性能。为了解决这一问题,MySQL引入了线程池插件,它允许预先创建一组线程,并在处理...
1. `BlockingQueue`接口:这是生产者消费者模式的核心,它提供了一种线程安全的队列,支持阻塞的插入(put)和移除(take)操作。当队列满时,生产者线程尝试插入元素会被阻塞,直到有消费者消费;当队列空时,消费...
- **策略选择**:线程池对于超出容量的任务有不同的处理策略,包括但不限于拒绝策略(如抛出异常)、等待策略(如阻塞队列)以及队列策略(如固定大小队列)。 - **阻塞队列**:阻塞队列是一种特殊的队列,当队列...
阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...
在Java中,synchronized关键字和java.util.concurrent.locks包下的Lock接口(如ReentrantLock)提供了锁支持。synchronized提供互斥访问,确保同一时间只有一个线程执行特定代码块;而Lock接口则提供了更细粒度的...
`BlockingQueue`是一个线程安全的数据结构,它支持阻塞操作,即当队列满时,生产者会阻塞,直到有空间可供插入;当队列空时,消费者会阻塞,直到有数据可取。 首先,我们需要创建一个生产者类,它将生成数据并将其...
`BlockingQueue`是一个线程安全的数据结构,它支持阻塞插入(put)和删除(take)操作,当队列满时,生产者线程会阻塞直到队列有空位;当队列为空时,消费者线程会阻塞直到有元素可消费。这种机制使得生产者和消费者...