问题描述
在学习多线程编程的时候,我们经常会碰到的一个问题就是producer consumer问题。这个概念在学习操作系统的时候也会常碰到。本身这个问题的描述比较简单。假设我们有两个进程或者线程。他们俩就好比是工厂里一条流水线上的两个车间。其中线程A生产的部件要交给下一个线程B来做进一步的处理。这个时候,线程A就相当于一个生产者,而线程B就相当于一个消费者。在生产者生产了一个部件之后,它们会通过一个传送带传递这个部件。也就是说,线程A负责往传送带上面放部件,而线程B负责从传送带上面取部件。这样,两个线程在各自运行的时候需要访问同样的一个资源。那么,我们该怎么来有效的解决这个问题呢?
解决方法讨论
根据前面的假定条件,我们可以将两个线程之间要共同访问的部分定义为一个数组。那么,对于这两个线程来说,他们在一定程度上是独立的。一个线程可以往共享资源队列里放东西,另外一个可以从里面取东西。对于producer线程来说,它需要考虑的就是如果我往资源队列里放东西时,队列满了,那么我必须要等待consumer线程取走一部分元素,这样才能继续进行。另外,和其他线程访问同一个资源队列的时候,还要保证访问是线程安全的。同样,对于consumer线程来说,如果资源队列是空的,同样,该线程也必须要等待producer线程将产生的元素放入队列。
根据这部分的讨论,我们问题的核心就在于怎么样使得一个线程和另外一个线程互斥的访问同一个资源呢?另外,如果一个线程发现目前资源情况不适合自己运行,又该怎么去停止自己而让其他的线程来运行呢? 这里,我们实际上有几种办法。
wait, notify
在java里面,如果我们希望一个线程在已经占有资源的锁的情况下先中止自己的执行并释放锁,那么就需要调用wait方法。这样,我们前面的第二个问题就得到了解答。如果我们在已经完成了自己的那部分操作,需要释放锁并要其他的线程来继续使用时呢?我们可以通过调用notify和notifyAll方法来通知其他因为资源被加锁之后处于阻塞状态的线程。
通过这一部分的讨论,我们可以得出一个线程producer的大致流程如下:
1. 如果资源队列满,则调用wait方法释放所,一直等待到资源队列有空缺。
2. 在资源队列加入新的元素。
3. 调用notify/notifyAll方法来唤醒其他等待的线程。
这是Producer部分的代码:
public class Producer implements Runnable { /** * Store to work with */ private EventStorage storage; /** * Constructor of the class. Initialize the storage. * @param storage The store to work with */ public Producer(EventStorage storage){ this.storage=storage; } /** * Core method of the producer. Generates 100 events. */ @Override public void run() { for (int i=0; i<100; i++){ storage.set(); } } }
调用wait和notify部分的代码在storage.set的方法定义中:
public synchronized void set() { while(storage.size() == maxSize) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } storage.offer(new Date()); System.out.printf("Set: %d", storage.size()); notifyAll(); }
这里,我们用一个循环来一直判断资源队列的状况。
对于Consumer来说,我们也可以有一个类似的流程,只是它被阻塞的条件是资源队列为空,需要等待Producer产生新的元素。Consumer的代码如下:
public class Consumer implements Runnable { /** * Store to work with */ private EventStorage storage; /** * Constructor of the class. Initialize the storage * @param storage The store to work with */ public Consumer(EventStorage storage){ this.storage=storage; } /** * Core method for the consumer. Consume 100 events */ @Override public void run() { for (int i=0; i<100; i++){ storage.get(); } } }
storage.get方法里定义了具体怎么使用wait, notify的过程:
public synchronized void get() { while(storage.size() == 0) { try { wait(); } catch(InterruptedException e) { e.printStackTrace(); } } System.out.printf("Get: %d: %s", storage.size(), storage.poll()); notifyAll(); }
这里还有一个需要注意的地方就是,我们的get, set方法都使用了synchronized的修饰,表示它是线程互斥的。从我们对synchronized的理解就知道,对于线程访问,它每次只有一个线程能够获得这个锁来进入这个方法。而我们使用wait方法的时候,却能够在获得这个锁的情况下,将锁释放了。就是这个wait方法,挺神奇的,在这里能够实现我们一个很关键的要求。不然当我们拿到锁又不满足执行条件想要释放的时候就很麻烦。后面附件里有示例代码的完整实现。
BlockingQueue
前面我们是用的wait, notify的方式来实现的释放锁和通知线程机制。实际上在java里也有一些数据结构已经帮我们封装好了互斥的机制。一个典型的就是BlockingQueue。BlockingQueue是一个定义的接口,具体的实现里有ArrayBlockingQueue, LinkedBlockingQueue。参照前面的问题,我们也可以写一个简单的使用BlockingQueue的演示程序。
Producer:
import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue<Message> queue; public Producer(BlockingQueue<Message> q) { this.queue = q; } @Override public void run() { for(int i = 0; i < 100; i++) { Message msg = new Message("" + i); try { Thread.sleep(i); queue.put(msg); System.out.println("Produced " + msg.getMsg()); } catch(InterruptedException e) { e.printStackTrace(); } } Message msg = new Message("exit"); try { queue.put(msg); } catch(InterruptedException e) { e.printStackTrace(); } } }
Consumer:
import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private BlockingQueue<Message> queue; public Consumer(BlockingQueue<Message> q) { this.queue = q; } @Override public void run() { try { Message msg; while((msg = queue.take()).getMsg() != "exit") { Thread.sleep(10); System.out.println("Consumed " + msg.getMsg()); } } catch(InterruptedException e) { e.printStackTrace(); } } }
ProducerConsumerService
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerService { public static void main(String[] args) { BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); System.out.println("Producer and consumer has been started"); } }
这个程序里我们通过定义一个BlockingQueue,每次Producer或Consumer都从其中来存取元素,利用它本身的机制保证对数据的访问是互斥的。
Condition
除了前面提到的两种办法,还有一种似乎更加细粒度控制的机制来实现同样的效果。那就是通过condition的await, signal机制。
和前面的方法类似,这里不再列出producer, consumer的细节,只是把对被操作数据的封装样式给列了出来:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundedBuffer { private final String[] buffer; private final int capacity; private int front; private int rear; private int count; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public BoundedBuffer(int capacity) { super(); this.capacity = capacity; buffer = new String[capacity]; } public void deposit(String data) throws InterruptedException { lock.lock(); try { while (count == capacity) { notFull.await(); } buffer[rear] = data; rear = (rear + 1) % capacity; count++; notEmpty.signal(); } finally { lock.unlock(); } } public String fetch() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } String result = buffer[front]; front = (front + 1) % capacity; count--; notFull.signal(); return result; } finally { lock.unlock(); } } }
这部分代码比较有意思的地方是它需要定义一个Lock,通过这个Lock对象再来创建condition对象。这些对象针对正好问题里缓冲区域数据是满还是空这两个条件。如果进入一个互斥的比如说缓冲区满或者空的时候,另外一个线程则要释放当前的锁并进行等待。
condition和前面提到的thread所带方法看起来很相似。你看,thread里面用的wait, notify方法一个是让当前线程进入等待状态,一个是唤醒其他所有等待的线程。而condition里的await, signal方法恰恰看起来有类似的功能。而且,他们的结构也很类似,前者是使用一个synchronized的块,而后者是通过一个lock来保证线程访问互斥。他们的主要细节差别体现在如下。当我们使用wait()/notify()方法的时候,所有等待状态的线程或对象都会被通知到,所以我们不知道具体是哪个对象/线程最终被唤醒了。而await()/signal()方法我们可以区分哪个对象或者线程得到特定的signal。这样,我们就有一个好处,在前面的wait()/notify()过程中,我们是将所有休眠的线程一股脑儿都唤醒了,很可能使得这个时候不符合运行条件的线程抢占了锁,然后又发现不行又要去释放锁,这样造成了效率的低下。而采用这个await()/signal()的方式更加精确。像在本例中,只有等待条件符合的那些线程会被唤醒,其他的就没有任何影响。
总结
Producer Consumer问题其实不是一个很新鲜的概念。它的本质是要保证多个线程对数据的同步和互斥访问。对于多个访问的线程来说无非就是这么几个步骤,访问资源的时候先获取锁,发现自己可以执行就执行,不行就把锁给放了,通知其他线程再来获取。
参考资料
http://www.journaldev.com/1034/java-blockingqueue-example-implementing-producer-consumer-problem
http://www.baptiste-wicht.com/2010/09/java-concurrency-part-5-monitors-locks-and-conditions/
http://stackoverflow.com/questions/10395571/condition-vs-wait-notify-mechanism
相关推荐
在分析给定文件之前,首先需要明确文档讨论的是一个用于测控系统的Ethernet实时协议,其仿效了现场总线WorldFIP,并在设备级别进行了应用。文档还提到了“生产-消费”(Producer-Consumer)协议,并且对比了该协议与...
在这里,我们将讨论使用记录型信号量解决生产者-消费者问题的方法。 生产者-消费者问题 生产者-消费者问题是一个经典的同步问题,描述了两个或多个进程之间的协作和资源竞争。生产者进程负责生产数据,并将其存储...
综合上述讨论,Web攻击日志分析的挑战在于大规模数据的实时处理和高效分析。这需要构建一个包括数据收集、实时计算、存储和调度在内的分析平台。同时,攻击数据分析模型的建立应当紧密结合业务背景,才能确保分析...
这里我们关注的是“PC.rar”文件,它涉及到一个“PC问题”,这通常指的是生产者-消费者(Producer-Consumer)问题,这是一个经典的多线程同步问题。在这个问题中,"visual c"暗示我们将会讨论如何使用Microsoft的C++...
它以其高性能、高可扩展性和可靠性,广泛应用于实时数据流处理和大数据分析。本文将详细介绍如何在Linux环境下安装和使用最新版的Kafka 2.5.0,以及与Scala 2.13的兼容性。 首先,让我们了解一下Kafka的基本概念。...
- **发布/订阅模式**:Kafka 的核心功能之一,它支持发布者(Producer)和订阅者(Consumer)之间的消息传递机制。发布者并不直接与订阅者交互,而是通过中间件(Broker)来进行消息的传递。 - **Kafka集群架构**: ...
在`Producer_Consumer`文件中,可能包含实现这个例子的源代码,包括生产者和消费者线程的定义、互斥量的创建和使用,以及可能的MFC界面元素,如按钮和进度条,用于可视化显示生产与消费的过程。通过分析和学习这个...
- **监控与调优**:利用RocketMQ自带的监控工具和第三方工具进行性能监控和问题排查。 6. ** RocketMQ扩展** - **RocketMQ社区**:积极参与社区讨论,获取最新资讯,解决问题。 - **RocketMQ与Spring Boot整合**...
- `report.txt` 文件应该是实验的总结和分析,包括实验目的、实现方法、遇到的问题、解决方案以及实验结果的讨论。 在进行这个实验时,学生需要理解和应用操作系统的基本原理,如进程调度、内存管理、并发控制以及...
第十一章和第十二章则分别对Kafka中Producer和Consumer组件进行了深入剖析,包括连接池管理、sync和async机制、SimpleconsumerAPI以及High-level consumerAPI的详细分析。 第十三章对Kafka中Clients的剖析,深入...
Kafka-producer是发送消息的应用,而Kafka-consumer则负责接收和处理这些消息。Kafka-client库提供了与Kafka集群交互的API,使得开发者可以轻松地在Spring Boot等Java应用程序中集成Kafka。 Spring Boot是一个流行...
包括生产者消费者模式(Producer-Consumer)、读写锁模式(Readers-Writers Lock)、屏障模式(Barrier)和未来模式(Future)。 7. **设计模式的应用**:设计模式并非适用于所有场景,其选择应基于具体问题和需求...
此外,Kafka提供了丰富的命令行工具,如`kafka-topics.sh`用于管理主题,`kafka-console-producer.sh`和`kafka-console-consumer.sh`用于交互式生产消费消息,使得操作和调试变得简单。 5. **实际应用场景** - **...
该问题描述了两个角色:生产者(Producer)和消费者(Consumer),他们共同操作一个有限容量的缓冲区。生产者生成数据并放入缓冲区,而消费者从缓冲区取出数据进行消费。为了确保系统的正确运行,需要防止生产者过度...
4. **线程通信**:研究BlockingQueue接口及其实现,如ArrayBlockingQueue和LinkedBlockingQueue,以及Producer-Consumer模型。 实验结果与分析会包含线程运行的示例,分析并发执行的效果,以及在不同同步策略下的...
2. `client`:包含Producer和Consumer的实现,以及客户端API。 3. `remoting`:远程调用模块,实现了RocketMQ内部组件之间的网络通信。 4. `store`:存储引擎,负责消息的持久化和快速检索。 5. `tools`:工具集,...
4.1-4.11 详细解释了NameServer、Broker、Producer、Consumer、Topic、Queue、Producer Group、Consumer Group、Message、Tag和Offset的角色和功能。 **5. 集群搭建** 5.1-5.2 阐述了集群的搭建方式,特别是双主双...
- **生产者-消费者模式(Producer-Consumer Pattern)**:通过队列来解耦生产者和消费者之间的数据交换过程。 - **工作池模式(Worker Pool Pattern)**:预先创建一组线程来执行任务,从而减少线程创建和销毁的成本...
### Kafka & Mafka 技术分享及讨论 #### Kafka 设计关键点 **背景与动机:** Kafka 的设计初衷是为了应对互联网应用中产生的大量日志数据(如访问日志、用户行为数据如投票和评分等)的高效处理需求。在传统消息...