生产者消费者模式无疑是个重要的架构实现模式...
代码使用例子:
new DemoPC().startup();
//============================
import java.util.Arrays;
import java.util.List;
public class DemoPC extends ProducerConsumerSupport {
static class ProductWrapper implements Product {
String pid;
public ProductWrapper(String pid) {
this.pid = pid;
}
}
@Override
public void consume(Product product) {
System.out.println("正在消费产品【" + product + "】!");
}
@Override
public void handleConsumerException(Exception e, Product product) {
System.err.println("消费产品【" + product + "】失败!原因:" + e.getMessage());
}
@Override
public void handleProducerException(Exception e) {
System.err.println("生产失败...");
}
@Override
public List<Product> produce() {
return Arrays.asList(new Product[] { new ProductWrapper("p1"),
new ProductWrapper("p2"), new ProductWrapper("pn") });
}
}
/**
* 对生产者消费者模型的抽象
*
* @author gzit-team
*
*/
public abstract class ProducerConsumerSupport {
private int sleepIfEmptyProduct = 5 * 1000; // 没有取得产品时,休眠的时间
private int sleepIfExceptionRaise = 60 * 1000;// 如果出现异常工作线程做适当的休眠
private boolean sleepWhenExceptionRaise; // 出现异常时是否进行适当的休眠
private int producerSize = 1;// 生产者个数
private int consumerSize = 1;// 消费者个数
private int queenSize = 10; // 等待队列的大小
// 系统运行的几种状态
public final int STATE_INIT = 0;
public final int STATE_PAUSED = 1;
public final int STATE_RUNNING = 2;
public final int STATE_SHUTDOWN = 3;
// 系统的状态
private int state = STATE_INIT;
// 空的对象列表,做占位用
public static List<Product> EMPTY_PRODUCT_LIST = new ArrayList<Product>();
// 产品队列
private BlockingQueue<Product> produceQueen;
// 生产者,消费者队列
private List<Producer> produerList;
private List<Consumer> consumerList;
// 生产者
class Producer extends PeriodicalTask {
public Producer() {
this(0);
}
public Producer(int period) {
super(period);
}
// 可以在这里处理异常
public void handleException(Exception e) {
handleProducerException(e);
if (sleepWhenExceptionRaise) {
try {
Thread.sleep(sleepIfExceptionRaise);
} catch (InterruptedException e1) {
// ignore
}
}
}
@Override
protected void doWork() throws Exception {
List<Product> products = produce();
for (Product p : products) {
// 有可能发生阻塞
produceQueen.put(p);
}
// 如果生产者取得的数据为空,则适当进行休眠,以免出现狂操作
if (products.isEmpty()) {
Thread.sleep(sleepIfEmptyProduct);
}
}
}
// 消费者
class Consumer extends PeriodicalTask {
private Product curProduct;
public Consumer() {
this(0);
}
// 可以在这里处理异常
public void handleException(Exception e) {
handleConsumerException(e, this.curProduct);
if (sleepWhenExceptionRaise) {
try {
Thread.sleep(sleepIfExceptionRaise);
} catch (InterruptedException e1) {
// ignore
}
}
}
public Consumer(int period) {
super(period);
}
@Override
protected void doWork() throws Exception {
// 有可能发生阻塞
Product product = produceQueen.take();
curProduct = product;
consume(product);
}
}
// 由子类来实现生成和消费的具体逻辑
public abstract List<Product> produce();
public abstract void consume(Product product);
public abstract void handleProducerException(Exception e);
public abstract void handleConsumerException(Exception e, Product product);
// 启动之前做的事情
public void preStartup() {
}
public void startup() {
if (state == STATE_RUNNING) {
throw new IllegalStateException("系统已经处于运行状态,不能够再启动!state="
+ this.state);
}
this.preStartup();
// 创建相关的队列和线程
produceQueen = new LinkedBlockingQueue<Product>(this.queenSize);
produerList = new LinkedList<Producer>();
consumerList = new LinkedList<Consumer>();
for (int i = 0; i < this.producerSize; i++) {
Producer producer = new Producer();
produerList.add(producer);
producer.start();
}
for (int i = 0; i < this.consumerSize; i++) {
Consumer consumer = new Consumer();
consumerList.add(consumer);
consumer.start();
}
state = STATE_RUNNING;
}
// /////////////////////////几个管理接口/////////////////////
public void pause() {
if (state != STATE_RUNNING) {
throw new IllegalStateException("系统不处于运行状态,不能暂停!state="
+ this.state);
}
for (Producer p : produerList) {
p.pause();
}
for (Consumer c : consumerList) {
c.pause();
}
state = STATE_PAUSED;
}
public void resume() {
if (state != STATE_PAUSED) {
throw new IllegalStateException("系统不处于暂停状态,不能恢复!state="
+ this.state);
}
for (Producer p : produerList) {
p.resume();
}
for (Consumer c : consumerList) {
c.resume();
}
state = STATE_RUNNING;
}
public void shutdown() {
if (state != STATE_RUNNING) {
throw new IllegalStateException("系统不处于运行状态,不能关闭!state="
+ this.state);
}
for (Producer p : produerList) {
p.shutdown();
}
for (Consumer c : consumerList) {
c.shutdown();
}
state = STATE_SHUTDOWN;
}
// //////////////////////////////////////
public int getProducerSize() {
return producerSize;
}
public void setProducerSize(int producerSize) {
this.producerSize = producerSize;
}
public int getConsumerSize() {
return consumerSize;
}
public void setConsumerSize(int consumerSize) {
this.consumerSize = consumerSize;
}
public int getQueenSize() {
return queenSize;
}
public void setQueenSize(int queenSize) {
this.queenSize = queenSize;
}
public BlockingQueue<Product> getProduceQueen() {
return produceQueen;
}
public void setProduceQueen(BlockingQueue<Product> produceQueen) {
this.produceQueen = produceQueen;
}
public List<Producer> getProduerList() {
return produerList;
}
public void setProduerList(List<Producer> produerList) {
this.produerList = produerList;
}
public List<Consumer> getConsumerList() {
return consumerList;
}
public void setConsumerList(List<Consumer> consumerList) {
this.consumerList = consumerList;
}
public int getSleepIfEmptyProduct() {
return sleepIfEmptyProduct;
}
public void setSleepIfEmptyProduct(int sleepIfEmptyProduct) {
this.sleepIfEmptyProduct = sleepIfEmptyProduct;
}
public int getSleepIfExceptionRaise() {
return sleepIfExceptionRaise;
}
public void setSleepIfExceptionRaise(int sleepIfExceptionRaise) {
this.sleepIfExceptionRaise = sleepIfExceptionRaise;
}
public boolean isSleepWhenExceptionRaise() {
return sleepWhenExceptionRaise;
}
public void setSleepWhenExceptionRaise(boolean sleepWhenExceptionRaise) {
this.sleepWhenExceptionRaise = sleepWhenExceptionRaise;
}
}
/**
* 定時处理任务
*
* @author gzit-team
*
*/
public abstract class PeriodicalTask implements Runnable {
protected int period;
private AtomicBoolean running = new AtomicBoolean(false);
private AtomicBoolean paused = new AtomicBoolean(false);
// 定時寫統計結果的線程
private Thread taskThread;
private boolean daemon;
public PeriodicalTask(int period) {
this.period = period;
}
public void run() {
while (running.get() && !paused.get()) {
try {
Thread.sleep(this.period);
this.doWork();
} catch (Exception e) {
// 处理异常
try {
this.handleException(e);
} catch (Exception e1) {
// 异常处理再失败,则忽略掉
// 线程不能死...
}
}
}
}
public void shutdown() {
running.set(false);
try {
this.shutdownHook();
this.taskThread.interrupt();
} catch (Exception e) {
// ignore
// 必须要关的掉
}
}
protected void shutdownHook() {
}
public void pause() {
this.paused.set(true);
}
public void resume() {
this.paused.set(false);
}
public boolean isPaused() {
return paused.get();
}
public void start() {
this.running.set(true);
this.taskThread = new Thread(this);
this.taskThread.setDaemon(this.daemon);
this.taskThread.setName(this.getClass().getSimpleName());
this.taskThread.start();
}
// 可以在这里处理异常
public void handleException(Exception e) {
}
// 子类进行的具体工作
protected abstract void doWork() throws Exception;
public boolean isDaemon() {
return daemon;
}
public void setDaemon(boolean daemon) {
this.daemon = daemon;
}
}
分享到:
相关推荐
在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...
通过理解和掌握这些知识点,开发者能够有效地实现生产者-消费者模式,解决并发编程中的数据共享和协作问题。在实际项目中,这个模式常用于优化系统性能,尤其是在I/O密集型或计算密集型的应用中。
在Java中,我们可以使用BlockingQueue来实现生产者消费者模型,BlockingQueue是Queue的子类,它提供了一个线程安全的队列,可以用于生产者和消费者之间的数据传输。 BlockingQueue的实现类有多种,常见的有...
在Java中,使用wait() / notify()方法实现生产者消费者模式的一般步骤: 1. 定义共享资源,通常是一个容器,如上述代码中的LinkedList。 2. 使用synchronized关键字创建同步代码块,确保同一时间只有一个线程能访问...
在"阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip"这个压缩包中,很可能是详细介绍了如何使用Java的阻塞队列来构建生产者消费者模式,可能包括以下知识点: 1. **阻塞队列接口**:首先,会介绍`...
在这个"java生产者消费者demo"中,开发者使用了适配器模式来构建解决方案。适配器模式是一种设计模式,它允许不同接口的类协同工作,即使它们原本无法直接交互。在生产者消费者问题中,适配器模式可能被用来协调生产...
这个库提供了多种工具类,如Semaphore(信号量)、BlockingQueue(阻塞队列)和Condition(条件变量),这些都可以用来实现生产者-消费者模型。 1. **BlockingQueue**: 阻塞队列是一种特殊的队列,它具有线程安全的...
在Java中,我们可以使用`java.util.concurrent`包中的工具类来实现生产消费者模式。主要涉及以下关键组件: 1. **阻塞队列(BlockingQueue)**:作为生产者和消费者的共享缓冲区,如`ArrayBlockingQueue`、`...
基于Java的实现通常会利用阻塞队列(BlockingQueue)和生产者消费者模型来确保线程安全和高效率。在这个框架中,生产者负责生成任务或消息,而消费者则负责处理这些任务或消息。 ### Java 阻塞队列 Java阻塞队列是...
3. **Java并发工具类**:了解`java.util.concurrent`包下的高级并发工具类(如`BlockingQueue`)可以帮助更高效地实现生产者消费者模型。 4. **异常处理**:正确处理`InterruptedException`等异常对于保持程序的稳定...
在这个“精选_毕业设计_基于JAVA的生产者消费者问题_完整源码”项目中,开发者通过Java语言实现了一个解决方案,旨在教授如何在实际应用中处理这种问题。以下是对该项目的详细解析: 生产者消费者问题是多线程编程...
这个名为"生产者和消费者"的压缩包文件可能包含了实现上述逻辑的Java代码示例,对于初学者来说,通过阅读和理解这些代码,可以深入学习Java多线程编程和并发控制,进一步掌握生产者消费者模式的实现。
它内置了生产者-消费者模式的实现,当队列满时,生产者会自动阻塞;当队列空时,消费者也会被阻塞。这可能是解决此问题的一个有效方法,比如使用`ArrayBlockingQueue`。 3. **等待/通知机制**:在Java中,`Object`...
`ProducerAndConsumer.zip`中的代码可能包含一个简单的Java实现,生产者类(Producer)和消费者类(Consumer)分别继承自Thread类,它们共享一个`BlockingQueue`实例。生产者类的run方法不断生成数据并调用`queue....
6. **Java并发库(java.util.concurrent)**:Java提供了强大的并发库,包括`BlockingQueue`接口和其实现如`ArrayBlockingQueue`,它们内置了线程安全的队列操作,使得生产者消费者模式的实现更为简洁。`...
在Java中,我们可以使用`java.util.concurrent`包中的`BlockingQueue`接口及其实现类来构建生产者消费者模型。`BlockingQueue`是一种线程安全的数据结构,它提供了一种在生产者和消费者之间同步的方式。当队列为空时...
在Java中,实现这种消费者生产者模型可以使用`java.util.concurrent`包下的工具,比如`BlockingQueue`接口及其实现,如`ArrayBlockingQueue`。生产者会调用`put`方法添加元素,而消费者使用`take`方法获取元素。为了...
在多线程环境下,BlockingQueue能够有效地实现生产者-消费者模式,提高了程序的并发性能和效率。本文将深入探讨BlockingQueue的使用、特性以及常见操作。 首先, BlockingQueue接口位于`java.util.concurrent`包下...
BlockingQueue是Java并发编程中非常重要的一个接口,它在`java.util.concurrent`包下,是线程安全的队列,主要用于解决生产者-消费者问题。BlockingQueue的主要特点是当队列满时,生产者线程会被阻塞,直到队列有...