package test;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者线程
*/
public class Producer implements Runnable {
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("启动生产者线程!");
try {
while (isRunning) {
System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("========放入数据失败:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
}
public void stop() {
isRunning = false;
}
private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
==================================================================================
package test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
/**
* 消费者线程
*/
public class Consumer implements Runnable {
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("启动消费者线程!");
//Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正从队列获取数据...");
//String data = queue.poll(2, TimeUnit.SECONDS);
//if (null != data) {
//System.out.println("拿到数据:" + data);
//System.out.println("正在消费数据:" + data);
//Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
//} else {
//// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
//isRunning = false;
//}
Collection<String> c = new ArrayList<String>();
queue.drainTo(c);
if(c.isEmpty()){
isRunning = false;
}else{
for(String s : c){
System.out.println("拿到数据:" + s);
System.out.println("正在消费数据:" + s);
}
}
Thread.sleep(DEFAULT_RANGE_FOR_SLEEP*3);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
========================================================
package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 声明一个容量为10的缓存队列
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(50);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Producer producer4 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(producer4);
Thread.sleep(2 * 1000);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
// 执行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
producer4.stop();
Thread.sleep(5000);
// 退出Executor
service.shutdown();
}
}
相关推荐
BlockingQueue是Java并发编程中非常重要的一个数据结构,它是一个具有阻塞特性的队列,主要用于线程间的协作。在多线程环境下,BlockingQueue能够有效地实现生产者-消费者模式,提高了程序的并发性能和效率。本文将...
在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...
"java并发学习之BlockingQueue实现生产者消费者详解" BlockingQueue是Java util.concurrent包下重要的数据结构,提供了线程安全的队列访问方式。在多线程应用中,常用于生产-消费场景。BlockingQueue有多种实现,...
在Java编程中,队列是一种线性数据结构,它遵循先进先出(FIFO)的原则。...通过阅读《Java队列》这篇博文,读者可以学习到如何在Java中正确地使用和实现队列,以及如何利用队列解决实际编程问题。
通过分析和理解这个"java队列源码",开发者可以学习如何在高并发环境中构建高效且线程安全的队列,这对于优化并发性能、减少线程竞争和提高系统吞吐量至关重要。在实际项目中,这种技术可以广泛应用于各种并发场景,...
标题 "2011.08.30(2)——— java BlockingQueue ExecutorService" 涉及到Java并发编程中的两个核心组件:BlockingQueue(阻塞队列)和ExecutorService。这篇博客可能深入探讨了如何使用这两个工具来优化多线程环境...
【描述】:此文档是关于Java并发编程的学习资料,以漫画形式讲解,聚焦于Java并发编程中的核心概念——BlockingQueue接口及其具体实现ArrayBlockingQueue。 【标签】:“java”、“并发”、“编程”、“宝典” ...
2. **工作队列(BlockingQueue)** - 一个阻塞队列,用于存储待处理的任务。当队列满时,生产者会阻塞直到队列有空位;当队列空时,消费者会阻塞直到有新任务可用。 3. **拒绝策略** - 当线程池和工作队列都满载时,...
在提供的"blockingQuence"文件中,可能包含关于阻塞队列(BlockingQueue)的内容。阻塞队列是Java并发包`java.util.concurrent`中的一个数据结构,它结合了队列和锁的概念,常用于构建高效的消息传递机制。在生产者-...
通过这些代码,我们可以学习如何创建线程,如何定义队列操作,以及如何确保线程安全。 总结来说,多线程队列利用是一种高效的并发编程模式,通过队列作为中间媒介,可以实现线程间的协作和任务调度。深入理解和实践...
数据结构是计算机科学中的核心概念,它涉及到如何高效地组织和操作数据。...通过阅读和分析这些代码,我们可以更深入地理解队列的内部工作机制,学习如何在实际编程中有效地使用和实现队列数据结构。
`BlockingQueue`是线程安全的队列,它提供了在生产者和消费者之间同步数据的方法,使得一个线程可以等待另一个线程向队列中添加或移除元素。`@PostConstruct`注解则是Spring框架中用于标记初始化方法的,该方法会在...
在队列化执行线程和代码块的场景中,BlockingQueue常作为线程池的任务队列,用来暂存待执行的任务。 4. **事件驱动编程**: 这是一种编程模型,其中程序响应来自外部源的事件(如用户输入、网络消息等)。在这种模式...
3. **阻塞队列(BlockingQueue)**:`java.util.concurrent`包下的`BlockingQueue`接口扩展了Queue接口,并添加了线程安全的阻塞操作,如`put(E e)`、`take()`等。这些方法会在队列满时阻塞生产者,空时阻塞消费者,...
这个项目可能采用了`BlockingQueue`接口,它是Java并发包中的一种高效队列实现,提供线程安全的插入和取出操作,且具有阻塞特性:当队列为空时,取操作会阻塞,直到有新的元素加入;当队列满时,插入操作也会阻塞,...
在Java编程中,生产者消费者问题是多线程同步的一个经典示例,用于演示如何有效地管理共享资源。...这个Java实例和教程是学习并发编程的好资源,不仅可以加深对`BlockingQueue`的理解,还能提升处理多线程问题的能力。
通过这个示例,我们不仅了解了生产者消费者模式,还学习了Java并发编程中`BlockingQueue`接口的使用,以及如何创建和管理线程。这是一个基础但非常实用的例子,有助于理解多线程环境中的同步和协作。在实际应用中,...
`BlockingQueue`有三个核心方法:`put()`(用于插入元素,当队列满时会阻塞)、`take()`(用于获取并移除元素,当队列空时会阻塞)以及`offer()`(非阻塞插入,如果队列满则返回false)。 在这个操作系统实验中,...
操作系统中的“生产者-消费者问题”是一个经典的多线程同步问题,主要涉及...学习和理解这个问题对于进行高并发系统设计和优化是非常重要的,因为它是并发编程中常见的模式,广泛应用于消息队列、数据库连接池等场景。
同时,`BlockingQueue`接口(如`LinkedBlockingQueue`、`ArrayBlockingQueue`等实现)提供了一种线程安全的队列实现,它内置了阻塞操作,使得当队列为空时,尝试获取元素的线程会被阻塞,直到有元素可用。...