`
ftj20003
  • 浏览: 132363 次
  • 性别: Icon_minigender_1
  • 来自: ...
社区版块
存档分类
最新评论

LinkedBlockingQueue应用--生产消费模型简单实现

    博客分类:
  • Java
阅读更多
    之前介绍时LinkedBlockingQueue提到了Queue主要应用于“生产消费模型”的实现。在尝试分析ConcurrentLinkedQueue之前,写了个简陋的“生产消费模型”的实现。分享的同时加深对LinkedBlockingQueue的印象,顺便再说说其特性:
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author: yanxuxin
 * @date: 2010-1-25
 */
public class ModelSample {
	/** 线程池提交的任务数*/
	private final int taskNum = Runtime.getRuntime().availableProcessors() + 1;
	/** 用于多线程间存取产品的队列*/
	private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(16);
	/** 记录产量*/
	private final AtomicLong output = new AtomicLong(0);
	/** 记录销量*/
	private final AtomicLong sales = new AtomicLong(0);
	/** 简单的线程起步开关*/
	private final CountDownLatch latch = new CountDownLatch(1);
	/** 停产后是否售完队列内的产品的选项*/
	private final boolean clear;
	/** 用于提交任务的线程池*/
	private final ExecutorService pool;
	/** 简陋的命令发送器*/
	private Scanner scanner;

	public ModelSample(boolean clear) {
		this.pool = Executors.newCachedThreadPool();
		this.clear = clear;
	}

	/**
	 * 提交生产和消费任务给线程池,并在准备完毕后等待终止命令
	 */
	public void service() {
		doService();
		waitCommand();
	}
	
	/**
	 * 提交生产和消费任务给线程池,并在准备完毕后同时执行
	 */
	private void doService() {
		for (int i = 0; i < taskNum; i++) {
			if (i == 0) {
				pool.submit(new Worker(queue, output, latch));
			}
			else {
				pool.submit(new Seller(queue, sales, latch, clear));
			}
		}
		latch.countDown();//开闸放狗,线程池内的线程正式开始工作
	}

	/**
	 * 接收来自终端输入的终止命令
	 */
	private void waitCommand() {
		scanner = new Scanner(System.in);
		while (!scanner.nextLine().equals("q")) {
			try {
				Thread.sleep(500);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		scanner.close();
		destory();
	}

	/**
	 * 停止一切生产和销售的线程
	 */
	private void destory() {
		pool.shutdownNow(); //不再接受新任务,同时试图中断池内正在执行的任务
		while (clear && queue.size() > 0) {
			try {
				Thread.sleep(500);
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("Products:" + output.get() + "; Sales:" + sales.get());
	}

	public static void main(String[] args) {
		ModelSample model = new ModelSample(false);
		model.service();
	}
}

/**
 * 生产者
 * @author: yanxuxin
 * @date: 2010-1-25
 */
class Worker implements Runnable {

	/** 假想的产品*/
	private static final String PRODUCT = "Thinkpad";
	private final LinkedBlockingQueue<String> queue;
	private final CountDownLatch latch;
	private final AtomicLong output;

	public Worker(LinkedBlockingQueue<String> queue, AtomicLong output,CountDownLatch latch) {
		this.output = output;
		this.queue = queue;
		this.latch = latch;
	}

	public void run() {
		try {
			latch.await(); // 放闸之前老实的等待着
			for (;;) {
				doWork();
				Thread.sleep(100);
			}
		}
		catch (InterruptedException e) {
			System.out.println("Worker thread will be interrupted...");
		}
	}

	private void doWork() throws InterruptedException {
		boolean success = queue.offer(PRODUCT, 100, TimeUnit.MILLISECONDS);
		if (success) {
			output.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数
			// 可以在此处生成记录日志
		}
	}
}

/**
 * 消费者
 * @author: yanxuxin
 * @date: 2010-1-25
 */
class Seller implements Runnable {

	private final LinkedBlockingQueue<String> queue;
	private final AtomicLong sales;
	private final CountDownLatch latch;
	private final boolean clear;

	public Seller(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear) {
		this.queue = queue;
		this.sales = sales;
		this.latch = latch;
		this.clear = clear;
	}

	public void run() {
		try {
			latch.await(); // 放闸之前老实的等待着
			for (;;) {
				sale();
				Thread.sleep(500);
			}
		}
		catch (InterruptedException e) {
			if(clear) { // 响应中断请求后,如果有要求则销售完队列的产品后再终止线程
				cleanWarehouse();
			}
			else {
				System.out.println("Seller Thread will be interrupted...");
			}
		}
	}

	private void sale() throws InterruptedException {
		String item = queue.poll(50, TimeUnit.MILLISECONDS);
		if (item != null) {
			sales.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数
			// 可以在此处生成记录日志
		}
	}

	/**
	 * 销售完队列剩余的产品
	 */
	private void cleanWarehouse() {
		try {
			while (queue.size() > 0) {
				sale();
			}
		}
		catch (InterruptedException ex) {
			System.out.println("Seller Thread will be interrupted...");
		}
	}
}

    ModelSample是主控,负责生产消费的调度,Worker和Seller分别作为生产者和消费者,并均实现Runnable接口。ModelSample构造器内的参数clear主要是用于:当主控调用线程池的shutdownNow()方法时,会给池内的所有线程发送中断信号,使得线程的中断标志置位。这时候对应的Runnable的run()方法使用响应中断的LinkedBlockingQueue的方法(入队,出队)时就会抛出InterruptedException异常,生产者线程对这个异常的处理是记录信息后终止任务。而消费者线程是记录信息后终止任务,还是消费完队列内的产品再终止任务,则取决于这个选项值。

    多线程的一个难点在于适当得销毁线程,这里得益于LinkedBlockingQueue的入队和出队的操作均提供响应中断的API,使得控制起来相对的简单一点。在Worker和Seller中共享LinkedBlockingQueue的实例queue时,我没有使用put或者take在queue满和空状态时无限制的阻塞线程,而是使用offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)在指定的timeout时间内满足条件时阻塞线程。主要因为在于:先中断生产线程的情况下,如果所有的消费线程之前均被扔到等待集,那么无法它们将被唤醒。而后两者在超时后将自行恢复可运行状态。

    再者看看queue的size()方法,这也是选择LinkedBlockingQueue而不选ArrayBlockingQueue作为阻塞队列的原因。因为前者使用的AtomicInteger的count.get()返回最新值,完全无锁;而后者则需要获取唯一的锁,在此期间无法进行任何出队,入队操作。而这个例子中clear==true时,主线程和所有的消费线程均需要使用size()方法检查queue的元素个数。这类的非业务操作本就不该影响别的操作,所以这里LinkedBlockingQueue使用AtomicInteger计数无疑是个优秀的设计。

    另外编写这个例子时有点玩票的用了CountDownLatch,它的作用很简单。countDown()方法内部计数不为0时,执行了其await()方法的线程将会阻塞等待;一旦计数为0,这些线程将恢复可运行状态继续执行。这里用它就像一个发令枪,线程池submit任务的新线程在run内被阻塞,主线程一声令下countDown!这些生产消费线程均恢复执行状态。最后就是命令的实现过于简陋了,如果要响应其他的命令的话可以改造成响应事件处理的观察者模式,不过它不是演示的重点就从简了。后面就是试着写写ConcurrentLinkedQueue的分析了。
4
0
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Java线程间的通信----生产者消费者模型

    在Java编程中,线程间的通信是多线程编程中的一个重要概念,特别是在处理并发和协作任务时。生产者消费者模型是一种经典的...在实际项目中,生产者消费者模型被广泛应用于缓存管理、数据库批量操作、消息队列等场景。

    线程同步--生产者消费者问题

    其中,`BlockingQueue`是一个非常合适的数据结构,它可以自然地实现生产者消费者模型。`BlockingQueue`提供了线程安全的插入(put)和移除(take)操作,当队列为空时,`take()`操作会阻塞消费者线程,直到有新的...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    linkedblockingqueue

    1. 生产者-消费者模型:`LinkedBlockingQueue`是构建生产者-消费者模型的理想选择,生产者将任务放入队列,消费者从队列中取出并执行。 2. 并行任务调度:例如在多线程服务中,`LinkedBlockingQueue`可以用来存储待...

    java生产者消费者模型

    Java中实现生产者消费者模型通常会用到以下关键概念和技术: 1. **阻塞队列(BlockingQueue)**:这是生产者消费者模型的核心组件,用于存储待消费的数据。Java的`java.util.concurrent`包提供了多种阻塞队列实现,...

    多线程模拟实现生产者/消费者模型

    生产者/消费者模型的应用场景广泛,如在网络服务器中处理请求、数据库连接池管理、批量数据处理等。理解并能熟练运用这个模型对于提升系统性能和设计高效的多线程程序至关重要。在实际开发中,我们还需要考虑异常...

    生产者消费者模型的演变

    在Java中,`java.util.concurrent`包提供了多种实现生产者消费者模型的工具,如`BlockingQueue`。 `BlockingQueue`是Java并发编程的一个核心接口,它实现了队列的数据结构,并且具备线程安全的特性。生产者可以将...

    【IT十八掌徐培成】Java基础第08天-04.多线程-生产者-消费者.zip

    4. ** 示例代码**:一个简单的生产者-消费者模型可以通过以下代码示例来理解: ```java import java.util.concurrent.ArrayBlockingQueue; public class ProducerConsumerExample { public static void main...

    生产者消费者模型

    例如,`ArrayBlockingQueue`、`LinkedBlockingQueue`等,它们可以自然地实现生产者消费者模型,无需手动调用`wait()`和`notify()`。 在提供的`P_C.java`文件中,我们可以预期看到一个简单的生产者消费者模型的实现...

    Test5ProducerAndCustomer_生产者消费者模型思维导图_

    生产者消费者模型是多线程编程中的一个经典设计模式,它在Java中有着广泛的应用。这个模型主要用于解决并发处理过程中的资源同步问题,确保生产者(数据生成者)和消费者(数据使用者)之间能有效协作,避免因数据...

    操作系统-消费者-生产者安卓实现

    为了协调主线程和后台线程之间的交互,避免阻塞UI,我们需要用到消费者-生产者模型。 3. **解决方法:线程同步与信号量**: 在安卓中,我们通常使用`java.util.concurrent`包中的工具类来实现线程同步,如`...

    【IT十八掌徐培成】Java基础第08天-05.多线程-生产者-消费者2.zip

    在Java中实现生产者-消费者模型,主要依赖于线程同步机制,包括`wait()`, `notify()` 和 `notifyAll()` 方法,它们都是在 `Object` 类中定义的。这些方法配合 `synchronized` 关键字使用,可以确保线程安全地访问...

    java 多线程生产者消费者模型demo

    Java多线程生产者消费者模型是一种典型的线程协作模式,用于解决并发编程中资源的高效利用和同步问题。在这个模型中,"生产者"线程负责生成数据,而"消费者"线程则负责处理这些数据。为了实现这种模式,Java提供了...

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    基于Java的实现通常会利用阻塞队列(BlockingQueue)和生产者消费者模型来确保线程安全和高效率。在这个框架中,生产者负责生成任务或消息,而消费者则负责处理这些任务或消息。 ### Java 阻塞队列 Java阻塞队列是...

    Java生产者消费者模型.rar

    Java生产者消费者模型是多线程编程中的一个经典设计模式,它主要用来解决资源的共享问题,通过协调生产者和消费者之间的工作流程,避免资源的过度消耗或浪费。在这个模型中,生产者负责生成数据,而消费者则负责处理...

    java实现生产者消费者

    在Java中,`java.util.concurrent`包下的`BlockingQueue`接口提供了一种线程安全的数据结构,非常适合用于实现生产者消费者模式。生产者将产品放入队列,消费者从队列中取出产品。`BlockingQueue`提供了`put()`和`...

    生产者和消费者模式多线程

    创建一个简单的生产者消费者模型,可以使用以下伪代码: ```java class Producer implements Runnable { private final BlockingQueue&lt;String&gt; queue; public Producer(BlockingQueue&lt;String&gt; queue) { this....

    java 编写的生产者与消费者问题

    这个库提供了多种工具类,如Semaphore(信号量)、BlockingQueue(阻塞队列)和Condition(条件变量),这些都可以用来实现生产者-消费者模型。 1. **BlockingQueue**: 阻塞队列是一种特殊的队列,它具有线程安全的...

    无边界-生成者消费者源码

    在Java编程中,"生成者消费者"模式...它利用阻塞队列作为共享资源,通过线程同步和通信实现生产者和消费者之间的协调,是解决并发问题的一种高效策略。通过学习和实践这个模式,开发者能够提高程序的并发性能和稳定性。

    生产者消费者

    在Java中,`BlockingQueue`接口是实现生产者消费者模式的理想选择。`BlockingQueue`提供了线程安全的队列操作,并且内置了阻塞机制。当队列满时,`put()`方法会使得生产者线程自动阻塞,直至队列有空位;当队列空时...

Global site tag (gtag.js) - Google Analytics