`

Producer Consumer using BlockingQueue

 
阅读更多

Usage example, based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.

 class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new ArrayBlockingQueue();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from theBlockingQueue in another thread.

 

BlockingQueue是Java Collection框架的一个接口,下面有很多实现类,包括:

ArrayBlockingQueue, (这个要自己会实现)

DelayQueue,  

LinkedBlockingQueue

PriorityBlockingQueue

 

不用BlockingQueue实现的话,可以这么做:

public class Drop {
    // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty = true;

    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }
}


public class Producer implements Runnable {
    private Drop drop;

    public Producer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        String importantInfo[] = {
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "A kid will eat ivy too"
        };
        Random random = new Random();

        for (int i = 0;
             i < importantInfo.length;
             i++) {
            drop.put(importantInfo[i]);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {}
        }
        drop.put("DONE");
    }
}

public class Consumer implements Runnable {  
    private Drop drop;  
  
    public Consumer(Drop drop) {  
        this.drop = drop;  
    }  
  
    public void run() {  
        Random random = new Random();  
        for (String message = drop.take();  
             ! message.equals("DONE");  
             message = drop.take()) {  
            System.out.format("MESSAGE RECEIVED: %s%n", message);  
            try {  
                Thread.sleep(random.nextInt(5000));  
            } catch (InterruptedException e) {}  
        }  
    }  
}

public class ProducerConsumerExample {  
    public static void main(String[] args) {  
        Drop drop = new Drop();  
        (new Thread(new Producer(drop))).start();  
        (new Thread(new Consumer(drop))).start();  
    }  
}

以上代码来自:

http://blog.csdn.net/shymi1991/article/details/39267821

分享到:
评论

相关推荐

    Producer/Consumer 多线程处理文件

    在这个"Producer/Consumer 多线程处理文件"的主题中,我们将深入探讨Java中的生产者消费者模型,以及如何利用这一模式来高效地处理大量数据,如一秒钟处理一千多条记录。 生产者消费者模型源于操作系统理论,是解决...

    ProducerConsumer(生产者消费者问题的单线程模拟)

    std::thread consumerThread(consumer, std::ref(sharedQueue), std::ref(mutex), std::ref(cv)); producerThread.join(); consumerThread.join(); return 0; } ``` 在这个例子中,生产者和消费者通过互斥量和...

    ProducerConsumer-CSharp:C# 制作的 ProducerConsumer Demo

    ProducerConsumer 模型是计算机科学中的一个经典设计模式,它主要解决了多线程环境下的资源同步和通信问题。在 C# 中,我们可以利用 .NET Framework 提供的并发和线程管理工具来实现这个模型。本项目 "Producer...

    ProducerConsumer.java

    The use of producer-consumer questions helps to better understand threads.

    57119101_王晨阳_ProducerConsumer1

    【Producer-Consumer问题】是经典的多线程同步问题,它涉及到如何在多个生产者和消费者之间共享有限资源的问题。在这个实验中,57119101_王晨阳_ProducerConsumer1通过Win32 API和Pthreads两种方式实现了这一问题的...

    consumer_producer_operatingsystems_producerconsumer_

    1、n个缓冲区的缓冲池作为一个临界资源:当生产者任务从数据源—文件中读取数据后将会申请一个缓冲区,并将此数据放缓冲区中。消费者任务从一个缓冲区中取走数据,并将其中的内容打印输出。当一个生产者任务正在访问...

    ProducerConsumer.rar

    生产者消费者代码

    Flex基于Producer和Consumer方式的简易消息推送机制

    Flex中的消息推送机制是基于Producer和Consumer模型实现的,这种机制允许应用程序实时地发送和接收消息,常用于构建聊天室、通知系统等实时交互的应用。下面将详细解释Flex消息推送的相关知识点。 1. **Producer与...

    MultiThreadExercise:Java MultiThread练习代码,包括ReaderWriter,BlockingQueue,ProducerConsumer等的实现

    本项目"MultiThreadExercise"提供了丰富的Java多线程练习,涵盖了ReaderWriter、BlockingQueue和ProducerConsumer等经典并发模式的实现。下面我们将深入探讨这些知识点。 首先,**ReaderWriter**模式是一种用于解决...

    producer-consumer问题的分析和讨论

    3. `Producer.java`: 这个文件可能包含了生产者线程的实现。生产者线程会不断地生成数据(在这种情况下可能是事件对象),然后调用`EventStorage`的`put()`方法将数据放入队列。为了防止无限制地生成数据导致队列...

    producer_consumer_v2.c

    别人的太贵了,自己写一份分享!c语言实现生产者消费者模型,支持设置生产者消费者线程数量,支持设置消息数量。消息的组织形式为链表。生产者生产消息和消费者处理消息分别需要对应的条件变量。...

    Kafka客户端producer/consumer样例

    Kafka客户端producer/consumer样例

    ProducerConsumer:使用java实现经典的并发或线程模式

    2. **`Producer`和`Consumer`接口**:虽然Java标准库中没有现成的`Producer`和`Consumer`接口,但在设计模式中,我们通常定义这两个接口来表示生成数据和消耗数据的行为。实现这些接口的类将分别代表生产者和消费者...

    consumer-producer.rar_producer_consumer.c_生产者消费者问题

    在这个问题中,有两组角色:生产者(Producer)和消费者(Consumer)。生产者负责创建或生成资源,而消费者则负责消费这些资源。在多线程环境下,为了确保数据的一致性和避免资源竞争,需要采取适当的同步机制。本...

    线程----BlockingQueue

    ### 线程与BlockingQueue知识点详解 #### 1. BlockingQueue简介 `BlockingQueue`是一种特殊类型的队列,主要用于多线程环境中的任务管理。它具有以下特性:当队列为空时,从队列中获取元素的操作会被阻塞;同样地,...

    Python-kafka集群搭建PythonAPI调用Producer和Consumer

    在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的Kafka集群,并使用Python API来创建Producer和Consumer。 ### 1. 安装Kafka 首先,我们需要在服务器上安装...

    eureka-producer-consumer.zip

    本文将深入探讨如何使用Eureka构建Producer(服务提供者)和Consumer(服务消费者),并利用Feign和Ribbon进行远程调用。 1. **Eureka Server** Eureka Server作为服务注册中心,负责管理所有微服务实例的信息。每...

    Springboot集成Kafka实现producer和consumer的示例代码

    1. **生产者(Producer)**: 生产者是向 Kafka 集群发布消息的应用程序。在 Spring Boot 中,可以通过 `KafkaTemplate` 或自定义配置来发送消息。 2. **消费者(Consumer)**: 消费者从 Kafka 集群中读取消息并处理...

    ProducerAndConsumer:生产者和消费者模型java实现

    public Producer(BlockingQueue&lt;Integer&gt; queue) { this.queue = queue; } @Override public void run() { while (true) { try { // 生产数据 int data = produceData(); // 尝试将数据放入队列 queue....

    producer_consumer_using_multithreading_in_java:用Java实现的经典生产者消费者问题的多线程解决方案

    6. ** 示例代码 **:在`producer_consumer_using_multithreading_in_java-master`这个项目中,应该包含了一个完整的示例,演示了如何用Java实现生产者消费者模型。源代码可能包括一个`Producer`类和一个`Consumer`类...

Global site tag (gtag.js) - Google Analytics