`

生产者消费者

 
阅读更多
1.
class Meal {
private final int orderNum;

public Meal(int orderNum) {
this.orderNum = orderNum;
}

public String toString() {
return "Meal " + orderNum;
}
}

class WaitPerson implements Runnable {
private Restaurant restaurant;

public WaitPerson(Restaurant r) {
restaurant = r;
}

public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null)
wait(); // ... for the chef to produce a meal
}
System.out.println("Waitperson got " + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch (InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}

class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;

public Chef(Restaurant r) {
restaurant = r;
}

public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null)
wait(); // ... for the meal to be taken
}
if (++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.println("Order up! ");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}

public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);

public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}

public static void main(String[] args) {
new Restaurant();
}
}

注意点:
(1).注意同步this是同步哪个对象
(2).exec.shutdownNow()当调用这个方法的时候,将会向所有由它启动的任务发生interrupt,当线程接受到中断时,并不会立即中断,而是遇到可中断方法时才中断,比如sleep。


2.java.util.concurrent.locks.Condition类的用法:
没有Condition的时,如下:

import java.util.concurrent.*;

class Car {
private boolean waxOn = false;

public synchronized void waxed() {
waxOn = true; // Ready to buff
notifyAll();
}

public synchronized void buffed() {
waxOn = false; // Ready for another coat of wax
notifyAll();
}

public synchronized void waitForWaxing() throws InterruptedException {
while (waxOn == false)
wait();
}

public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn == true)
wait();
}
}

class WaxOn implements Runnable {
private Car car;

public WaxOn(Car c) {
car = c;
}

public void run() {
try {
// while (!Thread.interrupted()) {
for(int i=0;i<2;i++){
System.out.println("in on"+i);
System.out.println("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
System.out.println("out on"+i);
}

// }
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}

class WaxOff implements Runnable {
private Car car;

public WaxOff(Car c) {
car = c;
}

public void run() {
try {
// while (!Thread.interrupted()) {
for(int i=0;i<2;i++){

System.out.println("in off"+i);
TimeUnit.MILLISECONDS.sleep(1000);
car.waitForWaxing();
System.out.println("Wax Off! ");
car.buffed();
System.out.println("out off"+i);
}
// }
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}

public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Run for a while...
exec.shutdownNow(); // Interrupt all tasks
}
}

注意:当调用notifyAll时,如果没有没有任何线程挂起,将会忽略

使用condition之后:
class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;

public void waxed() {
lock.lock();
try {
waxOn = true; // Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}

public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}

public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while (waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}

public void waitForBuffing() throws InterruptedException {
lock.lock();
try {
while (waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}

class WaxOn implements Runnable {
private Car car;

public WaxOn(Car c) {
car = c;
}

public void run() {
try {
while (!Thread.interrupted()) {
System.out.println("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}

class WaxOff implements Runnable {
private Car car;

public WaxOff(Car c) {
car = c;
}

public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(1000);
car.waitForWaxing();
System.out.println("Wax Off! ");
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}

public class WaxOMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}

3.生产者消费者队列
wait和notifyAll方法以一种非常低效的方式解决了任务互操作问题,即每次交互都需要握手。我们可以使用同步队列来解决任务协作问题,同步队列在任何时候都只允许一个任务插入或异常元素。如果消费者试图从队列中获取对象,而改队列此时为空,那么队列还可以挂起消费者任务,当有内容时再恢复消费者任务。例如:

class LiftOffRunner implements Runnable {
BlockingQueue<LiftOff> rockets;

public LiftOffRunner(BlockingQueue<LiftOff> queue) {
rockets = queue;
}

public void add(LiftOff lo) {
try {
System.out.println("前:"+rockets.size());
rockets.put(lo);
System.out.println("后:"+rockets.size());
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}

public void run() {
try {
while (!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run(); // Use this thread
}
} catch (InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.println("Exiting LiftOffRunner");
}
}

public class TestBlockingQueues {
static void getkey() {
try {
// Compensate for Windows/Linux difference in the
// length of the result produced by the Enter key:
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (java.io.IOException e) {
throw new RuntimeException(e);
}
}

static void getkey(String message) {
System.out.println(message);
getkey();
}

static void test(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for (int i = 0; i < 5; i++){
System.out.println(":"+runner.rockets.size());
runner.add(new LiftOff(5));

}
getkey("Press 'Enter' (" + msg + ")");
t.interrupt();
System.out.println("Finished " + msg + " test");
}

public static void main(String[] args) {
                //大小无限制,可以随便往里面添加元素
test("LinkedBlockingQueue", // Unlimited size
new LinkedBlockingQueue<LiftOff>());
                //大小限制固定值,当添加元素时,如果队列已经满,那将会挂起,等有元素取走时,再添加
test("ArrayBlockingQueue", // Fixed size
new ArrayBlockingQueue<LiftOff>(3));
                //大小限制固定值为1,当添加1个元素,而没有取走时,将会挂起,等取走后,再恢复
test("SynchronousQueue", // Size of 1
new SynchronousQueue<LiftOff>());
}
}


更好的一个例子:先生产土司、然后涂黄油、最后涂果酱

class Toast {
  public enum Status { DRY, BUTTERED, JAMMED }
  private Status status = Status.DRY;
  private final int id;
  public Toast(int idn) { id = idn; }
  public void butter() { status = Status.BUTTERED; }
  public void jam() { status = Status.JAMMED; }
  public Status getStatus() { return status; }
  public int getId() { return id; }
  public String toString() {
    return "Toast " + id + ": " + status;
  }
}

class ToastQueue extends LinkedBlockingQueue<Toast> {}

class Toaster implements Runnable {
  private ToastQueue toastQueue;
  private int count = 0;
  private Random rand = new Random(47);
  public Toaster(ToastQueue tq) { toastQueue = tq; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        TimeUnit.MILLISECONDS.sleep(
          100 + rand.nextInt(500));
        // Make toast
        Toast t = new Toast(count++);
        System.out.println(t);
        // Insert into queue
        toastQueue.put(t);
      }
    } catch(InterruptedException e) {
      System.out.println("Toaster interrupted");
    }
    System.out.println("Toaster off");
  }
}

// Apply butter to toast:
class Butterer implements Runnable {
  private ToastQueue dryQueue, butteredQueue;
  public Butterer(ToastQueue dry, ToastQueue buttered) {
    dryQueue = dry;
    butteredQueue = buttered;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = dryQueue.take();
        t.butter();
        System.out.println(t);
        butteredQueue.put(t);
      }
    } catch(InterruptedException e) {
      System.out.println("Butterer interrupted");
    }
    System.out.println("Butterer off");
  }
}

// Apply jam to buttered toast:
class Jammer implements Runnable {
  private ToastQueue butteredQueue, finishedQueue;
  public Jammer(ToastQueue buttered, ToastQueue finished) {
    butteredQueue = buttered;
    finishedQueue = finished;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = butteredQueue.take();
        t.jam();
        System.out.println(t);
        finishedQueue.put(t);
      }
    } catch(InterruptedException e) {
      System.out.println("Jammer interrupted");
    }
    System.out.println("Jammer off");
  }
}

// Consume the toast:
class Eater implements Runnable {
  private ToastQueue finishedQueue;
  private int counter = 0;
  public Eater(ToastQueue finished) {
    finishedQueue = finished;
  }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        // Blocks until next piece of toast is available:
        Toast t = finishedQueue.take();
        // Verify that the toast is coming in order,
        // and that all pieces are getting jammed:
        if(t.getId() != counter++ ||
           t.getStatus() != Toast.Status.JAMMED) {
          System.out.println(">>>> Error: " + t);
          System.exit(1);
        } else
          System.out.println("Chomp! " + t);
      }
    } catch(InterruptedException e) {
      System.out.println("Eater interrupted");
    }
System.out.println("Eater off");
  }
}

public class ToastOMatic {
  public static void main(String[] args) throws Exception {
    ToastQueue dryQueue = new ToastQueue(),
               butteredQueue = new ToastQueue(),
               finishedQueue = new ToastQueue();
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Toaster(dryQueue));
    exec.execute(new Butterer(dryQueue, butteredQueue));
    exec.execute(new Jammer(butteredQueue, finishedQueue));
    exec.execute(new Eater(finishedQueue));
    TimeUnit.SECONDS.sleep(5);
    exec.shutdownNow();
  }
}

4.在输入输出问题上,也可以使用类似的可自动挂起和恢复的类pipedWriter和pipedReader类,例如:

class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();

public PipedWriter getPipedWriter() {
return out;
}

public void run() {
try {
while (true)
for (char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
} catch (IOException e) {
System.out.println(e + " Sender write exception");
} catch (InterruptedException e) {
System.out.println(e + " Sender sleep interrupted");
}
}
}

class Receiver implements Runnable {
private PipedReader in;

public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}

public void run() {
try {
while (true) {
// Blocks until characters are there:
System.out.println("Read: " + (char) in.read() + ", ");
}
} catch (IOException e) {
System.out.println(e + " Receiver read exception");
}
}
}

public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
}

但是和同步队列比起来,还是同步队列更加健壮,而且容易使用。
分享到:
评论

相关推荐

    生产者消费者问题

    生产者消费者问题解决方案 生产者消费者问题是计算机科学中的一种经典问题,描述的是在多线程环境中,多个生产者线程和消费者线程之间的协作问题。生产者线程负责生产数据,并将其存储在缓冲区中,而消费者线程则从...

    生产者 消费者 模式 c++

    生产者消费者模式是一种多线程或并发编程中的经典设计模式,它主要用于解决系统资源的高效利用和同步问题。在C++中实现生产者消费者模式,我们可以利用C++11及更高版本提供的线程库()、互斥量()、条件变量()等...

    labview 生产者消费者例子

    7. **例程分析**:在提供的"生产者消费者"例程中,可能包含了创建生产者和消费者线程、初始化队列、添加数据到队列、从队列中取出数据、以及使用同步机制保证正确性的代码片段。通过对这些例程的分析和运行,可以...

    生产者消费者的c++代码实现

    "生产者消费者问题C++代码实现" 生产者消费者问题是一个经典的进程同步问题,该问题最早由 Dijkstra 提出,用以演示他提出的信号量机制。在同一个进程地址空间内执行的两个线程。生产者线程生产物品,然后将物品...

    生产者消费者问题c++实现

    生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过线程间的协作来解决资源的同步和异步操作。在C++中,我们可以利用标准库中的互斥量(mutex)、条件变量(condition_variable)等工具来实现这个问题...

    生产者消费者架构的串口高速数据采集.rar_greatervgw_labview_串口_串口 消费者_生产者消费者

    在IT领域,生产者消费者架构是一种常见的多线程或并发编程模型,用于高效地处理数据流。这个模型基于消息队列的概念,适用于各种环境,包括LabVIEW(Laboratory Virtual Instrument Engineering Workbench)这样的...

    生产者消费者问题,MFC实现

    生产者消费者问题是多线程编程中的经典模型,用于展示如何高效地在多个线程之间共享资源。MFC(Microsoft Foundation Classes)是微软提供的一套面向对象的C++库,用于构建Windows应用程序。在这个问题中,我们将...

    多线程简易实现生产者消费者模式

    生产者消费者模式是一种经典的多线程同步问题解决方案,它源于现实世界中的生产流水线,用于描述生产者(Producer)和消费者(Consumer)之间的协作关系。在这个模式中,生产者负责生成产品并放入仓库,而消费者则从...

    生产者消费者问题C#

    生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过线程间的协作来解决资源的并发访问问题。在C#中,我们可以利用System.Threading命名空间提供的工具来实现这一模型。下面将详细阐述这个问题的背景、...

    利用数组解决生产者消费者问题

    生产者消费者问题是多线程编程中的经典模型,用于模拟两个或多个并发执行的实体(生产者和消费者)共享有限资源的情况。在这个问题中,生产者负责生成数据并放入缓冲区,而消费者则从缓冲区取出数据进行处理。当缓冲...

    python实现生产者消费者并发模型

    多线程实现生产者消费者模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产者消费者模型:信号量(Semaphore)、条件(Condition)、...

    生产者消费者问题 MFC

    生产者消费者问题是多线程编程中的一个经典案例,它展示了如何通过共享资源在并发环境中实现线程间的协调。在这个问题中,"生产者"线程负责生成数据,而"消费者"线程则负责消费这些数据。MFC(Microsoft Foundation ...

    Qt入门练习项目——生产者消费者模型.zip

    **Qt入门练习项目——生产者消费者模型** 在编程领域,生产者消费者模型是一种常见的多线程同步问题的解决方案。这个模型通常用于处理数据流的异步处理,其中一个或多个线程(生产者)生成数据,而其他线程(消费者...

    操作系统实验:生产者消费者

    操作系统中的“生产者消费者”问题是一个经典的多线程同步问题,源自计算机科学的并发控制理论。这个模型描述了两个或多个线程之间的交互,其中一部分线程(生产者)负责生成数据,另一部分线程(消费者)则负责处理...

    操作系统课程设计——生产者消费者问题Java图形界面动态演示

    设计目的:通过研究Linux 的进程机制和信号量实现生产者消费者问题的并发控制。说明:有界缓冲区内设有20 个存储单元,放入/取出的数据项设定为1‐20 这20 个整型数。设计要求:1)每个生产者和消费者对有界缓冲区...

    C语言实现生产者消费者问题

    C语言实现生产者消费者问题,分配具有n个缓冲区的缓冲池,作为共享资源。 定义两个资源型信号量empty 和full,empty信号量表示当前空的缓冲区数量,full表示当前满的缓冲区数量。 定义互斥信号量mutex,当某个进程...

Global site tag (gtag.js) - Google Analytics