`
毕竟红尘
  • 浏览: 12772 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

基于BlockingQueue的生产者消费者模式实现(即用型)

阅读更多

生产者消费者模式无疑是个重要的架构实现模式...

代码使用例子:


new DemoPC().startup();

//============================
import java.util.Arrays;
import java.util.List;

public class DemoPC extends ProducerConsumerSupport {

	static class ProductWrapper implements Product {
		String pid;

		public ProductWrapper(String pid) {
			this.pid = pid;
		}

	}

	@Override
	public void consume(Product product) {

		System.out.println("正在消费产品【" + product + "】!");

	}

	@Override
	public void handleConsumerException(Exception e, Product product) {
		System.err.println("消费产品【" + product + "】失败!原因:" + e.getMessage());

	}

	@Override
	public void handleProducerException(Exception e) {
		System.err.println("生产失败...");

	}

	@Override
	public List<Product> produce() {

		return Arrays.asList(new Product[] { new ProductWrapper("p1"),
				new ProductWrapper("p2"), new ProductWrapper("pn") });
	}

}











/**
 * 对生产者消费者模型的抽象
 * 
 * @author gzit-team
 * 
 */

public abstract class ProducerConsumerSupport {

	private int sleepIfEmptyProduct = 5 * 1000; // 没有取得产品时,休眠的时间
	private int sleepIfExceptionRaise = 60 * 1000;// 如果出现异常工作线程做适当的休眠
	private boolean sleepWhenExceptionRaise; // 出现异常时是否进行适当的休眠
	private int producerSize = 1;// 生产者个数
	private int consumerSize = 1;// 消费者个数
	private int queenSize = 10; // 等待队列的大小

	// 系统运行的几种状态
	public final int STATE_INIT = 0;
	public final int STATE_PAUSED = 1;
	public final int STATE_RUNNING = 2;
	public final int STATE_SHUTDOWN = 3;

	// 系统的状态
	private int state = STATE_INIT;

	// 空的对象列表,做占位用
	public static List<Product> EMPTY_PRODUCT_LIST = new ArrayList<Product>();

	// 产品队列
	private BlockingQueue<Product> produceQueen;

	// 生产者,消费者队列
	private List<Producer> produerList;
	private List<Consumer> consumerList;

	// 生产者
	class Producer extends PeriodicalTask {

		public Producer() {
			this(0);
		}

		public Producer(int period) {
			super(period);

		}

		// 可以在这里处理异常
		public void handleException(Exception e) {
			handleProducerException(e);
			if (sleepWhenExceptionRaise) {
				try {
					Thread.sleep(sleepIfExceptionRaise);
				} catch (InterruptedException e1) {
					// ignore
				}
			}
		}

		@Override
		protected void doWork() throws Exception {

			List<Product> products = produce();
			for (Product p : products) {

				// 有可能发生阻塞
				produceQueen.put(p);
			}

			// 如果生产者取得的数据为空,则适当进行休眠,以免出现狂操作
			if (products.isEmpty()) {
				Thread.sleep(sleepIfEmptyProduct);

			}

		}

	}

	// 消费者
	class Consumer extends PeriodicalTask {

		private Product curProduct;

		public Consumer() {
			this(0);
		}

		// 可以在这里处理异常
		public void handleException(Exception e) {
			handleConsumerException(e, this.curProduct);
			if (sleepWhenExceptionRaise) {
				try {
					Thread.sleep(sleepIfExceptionRaise);
				} catch (InterruptedException e1) {
					// ignore
				}
			}
		}

		public Consumer(int period) {
			super(period);

		}

		@Override
		protected void doWork() throws Exception {

			// 有可能发生阻塞
			Product product = produceQueen.take();
			curProduct = product;
			consume(product);

		}

	}

	// 由子类来实现生成和消费的具体逻辑
	public abstract List<Product> produce();

	public abstract void consume(Product product);

	public abstract void handleProducerException(Exception e);

	public abstract void handleConsumerException(Exception e, Product product);

	// 启动之前做的事情
	public void preStartup() {

	}

	public void startup() {

		if (state == STATE_RUNNING) {
			throw new IllegalStateException("系统已经处于运行状态,不能够再启动!state="
					+ this.state);
		}

		this.preStartup();

		// 创建相关的队列和线程
		produceQueen = new LinkedBlockingQueue<Product>(this.queenSize);
		produerList = new LinkedList<Producer>();
		consumerList = new LinkedList<Consumer>();
		for (int i = 0; i < this.producerSize; i++) {
			Producer producer = new Producer();
			produerList.add(producer);
			producer.start();
		}

		for (int i = 0; i < this.consumerSize; i++) {
			Consumer consumer = new Consumer();
			consumerList.add(consumer);
			consumer.start();
		}

		state = STATE_RUNNING;

	}

	// /////////////////////////几个管理接口/////////////////////
	public void pause() {

		if (state != STATE_RUNNING) {
			throw new IllegalStateException("系统不处于运行状态,不能暂停!state="
					+ this.state);

		}

		for (Producer p : produerList) {
			p.pause();
		}

		for (Consumer c : consumerList) {
			c.pause();
		}

		state = STATE_PAUSED;

	}

	public void resume() {

		if (state != STATE_PAUSED) {
			throw new IllegalStateException("系统不处于暂停状态,不能恢复!state="
					+ this.state);
		}

		for (Producer p : produerList) {
			p.resume();
		}

		for (Consumer c : consumerList) {
			c.resume();
		}

		state = STATE_RUNNING;

	}

	public void shutdown() {

		if (state != STATE_RUNNING) {
			throw new IllegalStateException("系统不处于运行状态,不能关闭!state="
					+ this.state);

		}

		for (Producer p : produerList) {
			p.shutdown();
		}

		for (Consumer c : consumerList) {
			c.shutdown();
		}

		state = STATE_SHUTDOWN;

	}

	// //////////////////////////////////////

	public int getProducerSize() {
		return producerSize;
	}

	public void setProducerSize(int producerSize) {
		this.producerSize = producerSize;
	}

	public int getConsumerSize() {
		return consumerSize;
	}

	public void setConsumerSize(int consumerSize) {
		this.consumerSize = consumerSize;
	}

	public int getQueenSize() {
		return queenSize;
	}

	public void setQueenSize(int queenSize) {
		this.queenSize = queenSize;
	}

	public BlockingQueue<Product> getProduceQueen() {
		return produceQueen;
	}

	public void setProduceQueen(BlockingQueue<Product> produceQueen) {
		this.produceQueen = produceQueen;
	}

	public List<Producer> getProduerList() {
		return produerList;
	}

	public void setProduerList(List<Producer> produerList) {
		this.produerList = produerList;
	}

	public List<Consumer> getConsumerList() {
		return consumerList;
	}

	public void setConsumerList(List<Consumer> consumerList) {
		this.consumerList = consumerList;
	}

	public int getSleepIfEmptyProduct() {
		return sleepIfEmptyProduct;
	}

	public void setSleepIfEmptyProduct(int sleepIfEmptyProduct) {
		this.sleepIfEmptyProduct = sleepIfEmptyProduct;
	}

	public int getSleepIfExceptionRaise() {
		return sleepIfExceptionRaise;
	}

	public void setSleepIfExceptionRaise(int sleepIfExceptionRaise) {
		this.sleepIfExceptionRaise = sleepIfExceptionRaise;
	}

	public boolean isSleepWhenExceptionRaise() {
		return sleepWhenExceptionRaise;
	}

	public void setSleepWhenExceptionRaise(boolean sleepWhenExceptionRaise) {
		this.sleepWhenExceptionRaise = sleepWhenExceptionRaise;
	}

}






/**
 * 定時处理任务
 * 
 * @author gzit-team
 * 
 */
public abstract class PeriodicalTask implements Runnable {
	protected int period;
	private AtomicBoolean running = new AtomicBoolean(false);
	private AtomicBoolean paused = new AtomicBoolean(false);

	// 定時寫統計結果的線程
	private Thread taskThread;
	private boolean daemon;

	public PeriodicalTask(int period) {
		this.period = period;
	}

	public void run() {

		
		while (running.get() && !paused.get()) {
			try {
				Thread.sleep(this.period);
				this.doWork();
			} catch (Exception e) {
				// 处理异常
				try {
					this.handleException(e);
				} catch (Exception e1) {
					// 异常处理再失败,则忽略掉
					// 线程不能死...

				}
			}
		}
	}

	public void shutdown() {
		running.set(false);
		try {
			this.shutdownHook();
			this.taskThread.interrupt();
		} catch (Exception e) {
			// ignore
			// 必须要关的掉

		}
	}

	protected void shutdownHook() {

	}

	public void pause() {
		this.paused.set(true);
	}

	public void resume() {
		this.paused.set(false);
	}

	public boolean isPaused() {
		return paused.get();
	}

	public void start() {
		this.running.set(true);
		this.taskThread = new Thread(this);
		this.taskThread.setDaemon(this.daemon);
		this.taskThread.setName(this.getClass().getSimpleName());
		this.taskThread.start();
	}

	// 可以在这里处理异常
	public void handleException(Exception e) {

	}

	// 子类进行的具体工作
	protected abstract void doWork() throws Exception;

	public boolean isDaemon() {
		return daemon;
	}

	public void setDaemon(boolean daemon) {
		this.daemon = daemon;
	}
}





分享到:
评论

相关推荐

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    java多线程实现生产者和消费者

    通过理解和掌握这些知识点,开发者能够有效地实现生产者-消费者模式,解决并发编程中的数据共享和协作问题。在实际项目中,这个模式常用于优化系统性能,尤其是在I/O密集型或计算密集型的应用中。

    Java多线程 BlockingQueue实现生产者消费者模型详解

    在Java中,我们可以使用BlockingQueue来实现生产者消费者模型,BlockingQueue是Queue的子类,它提供了一个线程安全的队列,可以用于生产者和消费者之间的数据传输。 BlockingQueue的实现类有多种,常见的有...

    java生产者消费者模式

    在Java中,使用wait() / notify()方法实现生产者消费者模式的一般步骤: 1. 定义共享资源,通常是一个容器,如上述代码中的LinkedList。 2. 使用synchronized关键字创建同步代码块,确保同一时间只有一个线程能访问...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    在"阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip"这个压缩包中,很可能是详细介绍了如何使用Java的阻塞队列来构建生产者消费者模式,可能包括以下知识点: 1. **阻塞队列接口**:首先,会介绍`...

    java生产者消费者demo

    在这个"java生产者消费者demo"中,开发者使用了适配器模式来构建解决方案。适配器模式是一种设计模式,它允许不同接口的类协同工作,即使它们原本无法直接交互。在生产者消费者问题中,适配器模式可能被用来协调生产...

    java 编写的生产者与消费者问题

    这个库提供了多种工具类,如Semaphore(信号量)、BlockingQueue(阻塞队列)和Condition(条件变量),这些都可以用来实现生产者-消费者模型。 1. **BlockingQueue**: 阻塞队列是一种特殊的队列,它具有线程安全的...

    Java 生产消费者模式

    在Java中,我们可以使用`java.util.concurrent`包中的工具类来实现生产消费者模式。主要涉及以下关键组件: 1. **阻塞队列(BlockingQueue)**:作为生产者和消费者的共享缓冲区,如`ArrayBlockingQueue`、`...

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    基于Java的实现通常会利用阻塞队列(BlockingQueue)和生产者消费者模型来确保线程安全和高效率。在这个框架中,生产者负责生成任务或消息,而消费者则负责处理这些任务或消息。 ### Java 阻塞队列 Java阻塞队列是...

    生产者消费者 java

    3. **Java并发工具类**:了解`java.util.concurrent`包下的高级并发工具类(如`BlockingQueue`)可以帮助更高效地实现生产者消费者模型。 4. **异常处理**:正确处理`InterruptedException`等异常对于保持程序的稳定...

    精选_毕业设计_基于JAVA的生产者消费者问题_完整源码

    在这个“精选_毕业设计_基于JAVA的生产者消费者问题_完整源码”项目中,开发者通过Java语言实现了一个解决方案,旨在教授如何在实际应用中处理这种问题。以下是对该项目的详细解析: 生产者消费者问题是多线程编程...

    java生产者消费者

    这个名为"生产者和消费者"的压缩包文件可能包含了实现上述逻辑的Java代码示例,对于初学者来说,通过阅读和理解这些代码,可以深入学习Java多线程编程和并发控制,进一步掌握生产者消费者模式的实现。

    操作系统课程设计多线程 生产者消费者问题

    它内置了生产者-消费者模式的实现,当队列满时,生产者会自动阻塞;当队列空时,消费者也会被阻塞。这可能是解决此问题的一个有效方法,比如使用`ArrayBlockingQueue`。 3. **等待/通知机制**:在Java中,`Object`...

    生产者与消费者实例代码,线程学习必备

    `ProducerAndConsumer.zip`中的代码可能包含一个简单的Java实现,生产者类(Producer)和消费者类(Consumer)分别继承自Thread类,它们共享一个`BlockingQueue`实例。生产者类的run方法不断生成数据并调用`queue....

    线程间资源互斥与通信(生产者消费者)

    6. **Java并发库(java.util.concurrent)**:Java提供了强大的并发库,包括`BlockingQueue`接口和其实现如`ArrayBlockingQueue`,它们内置了线程安全的队列操作,使得生产者消费者模式的实现更为简洁。`...

    生产者消费者(java)

    在Java中,我们可以使用`java.util.concurrent`包中的`BlockingQueue`接口及其实现类来构建生产者消费者模型。`BlockingQueue`是一种线程安全的数据结构,它提供了一种在生产者和消费者之间同步的方式。当队列为空时...

    消费者生产者程序 1到100间 按顺序 进,乱序有规律出

    在Java中,实现这种消费者生产者模型可以使用`java.util.concurrent`包下的工具,比如`BlockingQueue`接口及其实现,如`ArrayBlockingQueue`。生产者会调用`put`方法添加元素,而消费者使用`take`方法获取元素。为了...

    BlockingQueue的使用

    在多线程环境下,BlockingQueue能够有效地实现生产者-消费者模式,提高了程序的并发性能和效率。本文将深入探讨BlockingQueue的使用、特性以及常见操作。 首先, BlockingQueue接口位于`java.util.concurrent`包下...

    简单实现BlockingQueue,BlockingQueue源码详解

    BlockingQueue是Java并发编程中非常重要的一个接口,它在`java.util.concurrent`包下,是线程安全的队列,主要用于解决生产者-消费者问题。BlockingQueue的主要特点是当队列满时,生产者线程会被阻塞,直到队列有...

Global site tag (gtag.js) - Google Analytics