`

BlockQueue生产者消费者例子

    博客分类:
  • java
 
阅读更多

BlockingQueue详解


阻塞队列,顾名思义,首先它是一个队列,
    
常用的队列主要有以下两种:
        先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。
        从某种程度上来说这种队列也体现了一种公平性。
   后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

阻塞队列的核心就是生产者和消费着
        当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
        队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    
BlockingQueue的核心方法:
        放入数据:    
        offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
        offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
        put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

        获取数据:
          poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间, 取不到时返回null;
          poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回null。
          take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
          drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

 

列子:

 

                package test;  

  

                import java.util.Random;  

                import java.util.concurrent.BlockingQueue;  

                import java.util.concurrent.TimeUnit;  

                import java.util.concurrent.atomic.AtomicInteger;  

  

                public class Producer implements Runnable {  

  

                    private volatile boolean isRunning = true;  

                    private BlockingQueue queue;  

                    private static AtomicInteger count = new AtomicInteger();  

                    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;  

                      

                    public Producer(BlockingQueue queue){  

                        this.queue = queue;  

                    }  

                      

                    @Override  

                    public void run() {  

                        String data = null;  

                        Random r = new Random();  

                          

                        System.out.println("====启动生产者====");  

                        try{  

                            while(isRunning){  

                                System.out.println("====正在生产数据====");  

                                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));  

                                  

                                data = "data:" + count.incrementAndGet();  

                                if(!queue.offer(data, 2, TimeUnit.SECONDS)){  

                                    System.out.println("放入数据失败:"+data);  

                                    continue;  

                                }  

                                  

                                System.out.println("将数据:" + data + "放入队列...");  

                                  

                            }  

                        }catch(InterruptedException e){  

                            e.printStackTrace();  

                            Thread.currentThread().interrupt();  

                        }finally{  

                            System.out.println("====退出生产者线程====");  

                        }  

                    }  

                      

                    public void stop(){  

                        isRunning = false;  

                    }  

                      

                }  

  

                package test;  

  

                import java.util.Random;  

                import java.util.concurrent.BlockingQueue;  

                import java.util.concurrent.TimeUnit;  

  

                public class Consumer implements Runnable{  

                      

                    private BlockingQueue<String> queue;  

                    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;  

                      

                    public Consumer(BlockingQueue<String> queue){  

                        this.queue = queue;  

                    }  

  

                    @Override  

                    public void run() {  

                        System.out.println("====启动消费者线程!====");  

                        Random r = new Random();  

                        boolean isRunning = true;  

                          

                        try{  

                            while(isRunning){  

                                System.out.println("====从队列获取数据====");  

                                String data = queue.poll(2, TimeUnit.SECONDS);  

                                if(null != data){  

                                    System.out.println("拿到数据:"+data);  

                                    System.out.println("消费数据:"+data);  

                                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));  

                                }else{  

                                    isRunning = false;  

                                }  

                            }  

                        }catch(InterruptedException e){  

                            e.printStackTrace();  

                            Thread.currentThread().interrupt();  

                        }finally{  

                            System.out.println("====退出消费者线程!====");  

                        }  

                    }  

                      

                }  

  

                package test;  

  

                import java.util.concurrent.BlockingQueue;  

                import java.util.concurrent.ExecutorService;  

                import java.util.concurrent.Executors;  

                import java.util.concurrent.LinkedBlockingQueue;  

  

  

                public class Test {  

  

                    public static void main(String[] args) throws InterruptedException {  

                        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);  

                          

                        Producer producer1 = new Producer(queue);  

                        Producer producer2 = new Producer(queue);  

                        Producer producer3 = new Producer(queue);  

                        Consumer consumer = new Consumer(queue);  

                          

                        ExecutorService service = Executors.newCachedThreadPool();  

                        service.execute(producer1);  

                    service.execute(producer2);  

                    service.execute(producer3);  

                    service.execute(consumer);  

  

                    Thread.sleep(10 * 1000); //生产者执行10m  

                    producer1.stop();  

                    producer2.stop();  

                    producer3.stop();  

                      

                    Thread.sleep(2000);  

                      

                    //退出Executor  

                    service.shutdown();  

                    }  

                }  

分享到:
评论

相关推荐

    BlockQueue练习

    BlockQueue是Java并发编程中非常重要的一个接口,它位于`java.util.concurrent`包下,是线程安全的队列,特别适用于多生产者多消费者(multi-producer multi-consumer, MPMC)的场景。在本练习中,我们将通过`...

    【Java】Queue、BlockingQueue和队列实现生产者消费者模式

    源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...

    实战Concurrent-BlockQueue

    然而,由于没有容量限制,当生产者速度远超消费者时,可能会导致内存资源耗尽。 其次,`ArrayBlockingQueue`是一个有界的阻塞队列,内部使用数组实现。它通过公平或非公平锁来保证线程安全,提供了固定的容量,从而...

    java 中 阻塞队列BlockingQueue详解及实例

    通过这种方式,生产者和消费者线程可以通过阻塞队列安全地交互,无需显式地进行同步控制。 总结来说,Java中的阻塞队列BlockingQueue是并发编程中的一种高效、线程安全的数据结构,它提供了线程间的同步和通信机制...

    操作系统计算题总结实用.pdf

    父亲是生产者,儿子和女儿是消费者,各自消费不同类型的水果。这里设置了三个信号量S、So和Sa,分别表示盘子是否为空、是否有桔子和是否有苹果。P操作用于获取资源(如尝试放水果或取水果),V操作用于释放资源...

    大数据基础复习

    BlockQueue是Java并发编程中的重要工具,它是一种特殊的队列,当队列满时,生产者尝试添加元素会阻塞,直到队列有空位;当队列为空时,消费者尝试取出元素也会阻塞,直到队列中有元素可用。BlockQueue提供了一种线程...

    队列(数据结构--Java版)

    此外,对于高效内存管理和大数据处理,还可以了解基于块的队列(BlockQueue)如 `LinkedBlockingQueue` 和 `ArrayBlockingQueue`,它们提供了阻塞操作,适用于生产者-消费者模型。 总结来说,Java中的队列数据结构...

    可以阻塞读的循环队列

    void init_queue(BlockQueue* q, int size) { // 初始化队列、锁和条件变量 } void enqueue(BlockQueue* q, int item) { // 加锁、检查队列是否已满、插入元素、更新后端指针、通知等待的出队线程、解锁 } int ...

    RustBlockingQueue:线程安全队列,在空时阻止出队

    RustBlockingQueue 线程安全队列,在空时阻止出队 概念: RustBlockingQueue是使用线程安全的阻塞队列在线程之间进行通信的工具。 请注意,Rust消息传递工具执行的操作大致相同。 这很好地说明了如何构建线程之间...

    山东大学操作系统计算题总结.docx

    `blockqueue`是一个指向等待队列的指针,该队列中包含了所有因为等待信号量而被阻塞的进程。 #### P操作 P操作的主要作用是减少信号量的值,并检查该值是否小于零。如果小于零,则表示资源不足,当前进程需要等待;...

    迅雷2010Java笔试题哈尔滨站

    在这个例子中,需要编写SQL查询语句,根据生日升序排序,选取年龄最小的10个学生的姓名和生日。使用PreparedStatement预编译SQL语句,避免SQL注入,然后通过ResultSet获取结果并打印。 【编程题】涉及的知识点包括...

    源代码分析(一九).docx

    - `BlockTargetPair`和`BlockQueue`:`DatanodeDescriptor`的内部类,用于管理和追踪DataNode上Block的复制和Lease恢复操作。 6. **权限和安全**:HDFS支持基于用户和组的权限控制,类似于UNIX系统。每个`INode`都...

    Hadoop源代码分析(一九)

    `DatanodeDescriptor`包含`BlockTargetPair`和`BlockQueue`,用于跟踪复制、恢复Lease和已失效的Block。它有两个`BlockQueue`,一个用于记录正在复制的Block,另一个用于记录在Lease恢复过程中的Block。 `...

    javaforkjoin源码-gitbook-BAT-interview:本文综合自己在一线互联网工作感悟,经验。记录开源框架的源码解读,数据

    [BlockQueue] [ArrayBlockingQueue] [ConcurrentLinkedQueue] [PriorityBlockingQueue] [DelayQueue] 并发安全集合 [HashMap, ConcurrentHashMap源码] [ArrayList, LinkedList, CopyOnWriteArrayList源码]

    NetLib:C++ Linux 网络库

     │ ├── BlockQueue 阻塞队列│ │ └── Log 日志│ ├── Thread│ │ ├── FuncThreadPool 函数对象线程池│ │ ├── ThreadObject 线程对象│ │ └── ThreadPool 模板类线程池│ └──...

    中南大学操作系统实验报告.pdf

    static ArrayList&lt;PCB&gt; blockQueue = new ArrayList();//阻塞队列 static ArrayList&lt;PCB&gt; endQueue = new ArrayList();//结束队列 static ArrayList&lt;PCB&gt; externStore = new ArrayList();//外存队列 //... 本...

Global site tag (gtag.js) - Google Analytics