BlockingQueue详解
阻塞队列,顾名思义,首先它是一个队列,
常用的队列主要有以下两种:
先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。
从某种程度上来说这种队列也体现了一种公平性。
后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。
阻塞队列的核心就是生产者和消费着
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
BlockingQueue的核心方法:
放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间, 取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回null。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
列子:
package test;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Producer(BlockingQueue queue){
this.queue = queue;
}
@Override
public void run() {
String data = null;
Random r = new Random();
System.out.println("====启动生产者====");
try{
while(isRunning){
System.out.println("====正在生产数据====");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
if(!queue.offer(data, 2, TimeUnit.SECONDS)){
System.out.println("放入数据失败:"+data);
continue;
}
System.out.println("将数据:" + data + "放入队列...");
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}finally{
System.out.println("====退出生产者线程====");
}
}
public void stop(){
isRunning = false;
}
}
package test;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable{
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Consumer(BlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
System.out.println("====启动消费者线程!====");
Random r = new Random();
boolean isRunning = true;
try{
while(isRunning){
System.out.println("====从队列获取数据====");
String data = queue.poll(2, TimeUnit.SECONDS);
if(null != data){
System.out.println("拿到数据:"+data);
System.out.println("消费数据:"+data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}else{
isRunning = false;
}
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}finally{
System.out.println("====退出消费者线程!====");
}
}
}
package test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class Test {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
Thread.sleep(10 * 1000); //生产者执行10m
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
//退出Executor
service.shutdown();
}
}
相关推荐
BlockQueue是Java并发编程中非常重要的一个接口,它位于`java.util.concurrent`包下,是线程安全的队列,特别适用于多生产者多消费者(multi-producer multi-consumer, MPMC)的场景。在本练习中,我们将通过`...
源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...
然而,由于没有容量限制,当生产者速度远超消费者时,可能会导致内存资源耗尽。 其次,`ArrayBlockingQueue`是一个有界的阻塞队列,内部使用数组实现。它通过公平或非公平锁来保证线程安全,提供了固定的容量,从而...
通过这种方式,生产者和消费者线程可以通过阻塞队列安全地交互,无需显式地进行同步控制。 总结来说,Java中的阻塞队列BlockingQueue是并发编程中的一种高效、线程安全的数据结构,它提供了线程间的同步和通信机制...
父亲是生产者,儿子和女儿是消费者,各自消费不同类型的水果。这里设置了三个信号量S、So和Sa,分别表示盘子是否为空、是否有桔子和是否有苹果。P操作用于获取资源(如尝试放水果或取水果),V操作用于释放资源...
BlockQueue是Java并发编程中的重要工具,它是一种特殊的队列,当队列满时,生产者尝试添加元素会阻塞,直到队列有空位;当队列为空时,消费者尝试取出元素也会阻塞,直到队列中有元素可用。BlockQueue提供了一种线程...
此外,对于高效内存管理和大数据处理,还可以了解基于块的队列(BlockQueue)如 `LinkedBlockingQueue` 和 `ArrayBlockingQueue`,它们提供了阻塞操作,适用于生产者-消费者模型。 总结来说,Java中的队列数据结构...
例子 var queue = require ( 'block-queue' ) ; var q = queue ( 1 , function ( task , done ) { // working on task.. done ( ) ; } ) ; q . push ( 'task1' ) ; q . push ( 'task2' ) ; q . push ( 'task3' ) ...
void init_queue(BlockQueue* q, int size) { // 初始化队列、锁和条件变量 } void enqueue(BlockQueue* q, int item) { // 加锁、检查队列是否已满、插入元素、更新后端指针、通知等待的出队线程、解锁 } int ...
RustBlockingQueue 线程安全队列,在空时阻止出队 概念: RustBlockingQueue是使用线程安全的阻塞队列在线程之间进行通信的工具。 请注意,Rust消息传递工具执行的操作大致相同。 这很好地说明了如何构建线程之间...
`blockqueue`是一个指向等待队列的指针,该队列中包含了所有因为等待信号量而被阻塞的进程。 #### P操作 P操作的主要作用是减少信号量的值,并检查该值是否小于零。如果小于零,则表示资源不足,当前进程需要等待;...
在这个例子中,需要编写SQL查询语句,根据生日升序排序,选取年龄最小的10个学生的姓名和生日。使用PreparedStatement预编译SQL语句,避免SQL注入,然后通过ResultSet获取结果并打印。 【编程题】涉及的知识点包括...
- `BlockTargetPair`和`BlockQueue`:`DatanodeDescriptor`的内部类,用于管理和追踪DataNode上Block的复制和Lease恢复操作。 6. **权限和安全**:HDFS支持基于用户和组的权限控制,类似于UNIX系统。每个`INode`都...
`DatanodeDescriptor`包含`BlockTargetPair`和`BlockQueue`,用于跟踪复制、恢复Lease和已失效的Block。它有两个`BlockQueue`,一个用于记录正在复制的Block,另一个用于记录在Lease恢复过程中的Block。 `...
[BlockQueue] [ArrayBlockingQueue] [ConcurrentLinkedQueue] [PriorityBlockingQueue] [DelayQueue] 并发安全集合 [HashMap, ConcurrentHashMap源码] [ArrayList, LinkedList, CopyOnWriteArrayList源码]
│ ├── BlockQueue 阻塞队列│ │ └── Log 日志│ ├── Thread│ │ ├── FuncThreadPool 函数对象线程池│ │ ├── ThreadObject 线程对象│ │ └── ThreadPool 模板类线程池│ └──...
static ArrayList<PCB> blockQueue = new ArrayList();//阻塞队列 static ArrayList<PCB> endQueue = new ArrayList();//结束队列 static ArrayList<PCB> externStore = new ArrayList();//外存队列 //... 本...