`
assertmyself
  • 浏览: 29570 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

生产者消费者模式,基于阻塞队列

阅读更多
基于阻塞队列可以分容易实现生产者消费者模式

基本思路
生产者:负责生产对象,并放入阻塞队列
消费者:while true线程,阻塞的从阻塞队列中获取对象 并处理。


应用场景
服务器段分发器的处理、消息队列实现等等

核心组件
核心组件为JDK提供的阻塞队列,LinkedBlockingQueue


下面一个简单的例子
生产者
package com.gbcom.java.blockqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 生产者线程。
 * 
 * @author SYZ
 * @date 2016-12-8 下午02:07:34
 * @version 1.0.0
 * @see com.gbcom.java.blockqueue.Producer
 */
public class Producer implements Runnable {

	private volatile boolean isRunning = true;
	private BlockingQueue queue;
	private static AtomicInteger count = new AtomicInteger();
	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

	public Producer(BlockingQueue queue) {
		this.queue = queue;
	}

	public void run() {
		String data = null;
		Random r = new Random();

		System.out.println("启动生产者线程!");
		try {
			while (isRunning) {
				System.out.println("正在生产数据...");
				Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

				data = "data:" + count.incrementAndGet();
				System.out.println("将数据:" + data + "放入队列...");
				if (queue.size() >= 5) {
					System.out
							.println("/***************** clear**********************/");
					queue.clear();
				}
				queue.put(data);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			System.out.println("退出生产者线程!");
		}
	}

	public void stop() {
		isRunning = false;
	}

}





消费者
package com.gbcom.java.blockqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消费线程
 * 
 * @author SYZ
 * @date 2016-12-8 下午02:07:24
 * @version 1.0.0
 * @see com.gbcom.java.blockqueue.Consumer
 */
public class Consumer implements Runnable {
	private BlockingQueue<String> queue;
	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
	private static AtomicInteger count = new AtomicInteger();

	public Consumer(BlockingQueue<String> queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("启动消费者线程!");
		Random r = new Random();
		boolean isRunning = true;
		try {
			while (isRunning) {
				System.out.println("正从队列获取数据...");
				String data = queue.take();
				if (null != data) {
					System.out.println("拿到数据:" + data + "  : queue size = "
							+ queue.size());
					System.out.println(Thread.currentThread().getName()
							+ " - 正在消费数据:" + data + "::::consumer times="
							+ count.incrementAndGet());
					Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
				} else {
					isRunning = false;
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			System.out.println("退出消费者线程!");
		}
	}

}



客户端
package com.gbcom.java.blockqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 生产者消费者客户端
 * 
 * @author syz
 * @date 2014-7-2
 * @version v1.0.0
 * @see com.gbcom.java.blockqueue.BlockingQueueClient
 */
public class BlockingQueueClient {

	public static void main(String[] args) throws InterruptedException {
		// 声明一个容量为10的缓存队列
		BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

		Producer producer1 = new Producer(queue);
		Producer producer2 = new Producer(queue);
		Producer producer3 = new Producer(queue);
		Consumer consumer = new Consumer(queue);
		// Consumer consumer2 = new Consumer(queue);
		// Consumer consumer3 = new Consumer(queue);

		// 借助Executors
		ThreadPoolExecutor service = (ThreadPoolExecutor) Executors
				.newCachedThreadPool();
		// 启动线程
		service.execute(producer1);
		service.execute(producer2);
		service.execute(producer3);
		// service.execute(consumer2);
		// service.execute(consumer3);
		service.execute(consumer);

		// 执行10s
		Thread.sleep(10 * 1000);

		System.out.println("active count = " + service.getActiveCount());
		// producer1.stop();
		// producer2.stop();
		// producer3.stop();
		Thread.sleep(2000);
		// 退出Executor
		service.shutdown();
	}
}
分享到:
评论

相关推荐

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    生产者 消费者 模式 c++

    生产者消费者模式是一种多线程或并发编程中的经典设计模式,它主要用于解决系统资源的高效利用和同步问题。在C++中实现生产者消费者模式,我们可以利用C++11及更高版本提供的线程库()、互斥量()、条件变量()等...

    生产者和消费者模式多线程

    阻塞队列在生产者和消费者模式中起着关键的作用。Java中的`BlockingQueue`接口提供了几个实现类,如`ArrayBlockingQueue`、`LinkedBlockingQueue`和`PriorityBlockingQueue`等,它们都实现了线程安全的队列操作。...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    在"阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip"这个压缩包中,很可能是详细介绍了如何使用Java的阻塞队列来构建生产者消费者模式,可能包括以下知识点: 1. **阻塞队列接口**:首先,会介绍`...

    生产消费者队列(c#),用于线程的队列自动同步

    在多线程编程中,生产者消费者模型是一种常见的设计模式,用于解决线程间的通信和同步问题。在C#中,我们可以利用各种机制实现这样的队列。本篇将详细讲解如何在C#中构建一个生产消费者队列,以及它如何帮助优化线程...

    多进程同步-生产者消费者模式-C实现

    生产者消费者模式基于操作系统提供的信号量(Semaphore)或管程(Monitor)等机制,以解决进程间的通信和同步问题。在这个模式中,生产者进程负责生成数据并放入缓冲区,而消费者进程则负责从缓冲区取出数据进行处理...

    架构设计 -- 生产者/消费者模式

    【生产者/消费者模式】是一种常见的并发编程和系统设计模式,它主要解决的是在多线程环境下,如何协调生产者和消费者之间的数据处理问题。在软件开发中,生产者通常是生成数据的一方,而消费者则是处理这些数据的...

    C++实现生产与消费者模型

    当队列为空或满时,生产者和消费者会被阻塞,直到条件满足(如队列有空位或有新数据)。 4. **示例代码(参考CPP_PC.cpp)** 代码中,`std::mutex`用于保护队列,`std::condition_variable`用于线程间的通信。生产...

    生产者消费者模式在java中的应用

    1. `BlockingQueue`接口:这是生产者消费者模式的核心,它提供了一种线程安全的队列,支持阻塞的插入(put)和移除(take)操作。当队列满时,生产者线程尝试插入元素会被阻塞,直到有消费者消费;当队列空时,消费...

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    综上所述,"消息分发框架(基于JAVA阻塞队列实现、生产者消费者模型)"是一个关键的并发处理组件,通过Java提供的并发工具和设计模式,实现了高效、稳定的消息处理。在实际应用中,需要根据业务需求进行适当的性能...

    界面话模拟生产者消费者模式java

    - **阻塞队列(BlockingQueue)**:Java中的`BlockingQueue`接口用于实现生产者消费者模式的关键组件。它提供了一种线程安全的数据交换方式,当队列满时,生产者会被阻塞,直到消费者取走数据;反之,当队列空时,...

    android 生产者消费者模式

    在Android开发中,生产者-消费者模式是一种常见的多线程设计模式,用于处理并发问题,尤其是在数据处理和异步操作中。这个模式的核心思想是通过一个共享的数据缓冲区,使得生产者线程可以生成数据并放入缓冲区,而...

    redis 老版本生产者消费者模式

    在早期版本中,Redis 提供了一种基于队列的数据结构来实现生产者消费者模式,这是一种多线程或分布式系统中常见的设计模式,用于解耦数据生成(生产者)与数据处理(消费者)的过程。 生产者消费者模式在 Redis 中...

    java 多线程 生产者消费者模式

    `BlockingQueue`是一个线程安全的数据结构,它提供了一种在生产者和消费者之间同步的方法,当队列满时,生产者会被阻塞,无法再添加元素;当队列空时,消费者会被阻塞,无法取出元素,这样就确保了数据的一致性和...

    单一生产者消费者队列,支持阻塞和不阻塞,支持固定大小和扩容

    在给定的“单一生产者消费者队列”实现中,它提供了阻塞和非阻塞两种模式,以及固定大小和动态扩容的能力,这使得它在实际应用中具有较高的灵活性。 首先,我们来深入理解队列这一数据结构。队列是一种先进先出...

    Java多线程 生产者-消费者模式

    在这个模式中,生产者负责生成数据并放入共享的数据结构(如队列),而消费者则从这个数据结构中取出数据并进行处理。这个模式有效地实现了线程间的协作,避免了资源竞争和死锁。 在Java中,我们可以使用`java.util...

    基于生产者消费者完整测试程序.rar

    这包括合理设置队列大小,避免阻塞和饥饿现象,以及有效地调度生产者和消费者的执行。 5. **测试与调试**:测试是验证程序正确性的关键步骤。LabVIEW提供了丰富的测试和分析工具,如模拟数据生成、错误处理和性能...

    Qt C++11 生产者消费者模式类

    生产者消费者模式是一种多线程同步的经典设计模式,它在多线程编程中扮演着重要的角色,用于协调生产数据和消费数据的两个并发过程。在本项目“Qt C++11 生产者消费者模式类”中,我们看到利用Qt框架和C++11的新特性...

    多线程生产者消费者模式

    生产者消费者模式是一种经典的多线程设计模式,它用于协调两个或多个线程之间的数据交换,确保数据的正确生产和消费。 生产者消费者模式的核心思想是通过共享缓冲区来实现线程间的通信。生产者线程负责生成数据并放...

Global site tag (gtag.js) - Google Analytics