论坛首页 Java企业应用论坛

java多线程 消费者-生产者

浏览 12036 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (13) :: 隐藏帖 (0)
作者 正文
   发表时间:2010-03-12  

java多线程一般都会讲消费者-生产者模型

生产者与消费者模型中,要保证以下几点:
1 同一时间内只能有一个生产者生产
2 同一时间内只能有一个消费者消费
3 生产者生产的同时消费者不能消费
4 消息队列满时生产者不能继续生产
5 消息队列空时消费者不能继续消费

参考了下网上一个代码实例http://www.talented.com.cn/archives/2007/5/16_141.html  发现作者写得有问题 修改了一些代码  现在ok了

----------------------------------------------------Message类

package com.example.test;

public class Message {
 
 public static int id;
 public String content;

 public String getContent() {
  return content;
 }

 public void setContent(String content) {
  this.content = content;
 }

 public int getId() {
  return id;
 }

 public void setId(int id) {
  Message.id = id;
 }
}

----------------------------------------------------Queue类

package com.example.test;

import java.util.ArrayList;
import java.util.List;

public class Queue {
 List<Message> queue = new ArrayList<Message>();

 /** 队列中message对象的最大值,默认为5 */
 int maxMessageNum = 5;

 public synchronized void produce(Message message) {

  this.notifyAll();
  while (queue.size() == maxMessageNum) {
   System.out.println(Thread.currentThread().getName()
     + "  队列满!等待中。。。");
   try {
    this.wait();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
  queue.add(message);
  System.out.println(Thread.currentThread().getName() + "正在生产"
    + message.getContent() + "。。。  ,当前个数:" + getCount());

 }

 public synchronized void consume() {

  this.notifyAll();
  while (queue.size() == 0) {
   System.out.println(Thread.currentThread().getName()
     + "  队列空!等待中。。。");
   try {
    System.out.println("begin!");
    wait();
    System.out.println("end!");
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }

  Message message = queue.get(0);
  queue.remove(0);
  System.out.println(Thread.currentThread().getName() + "正在消费"
    + message.getContent() + "。。。 ,当前个数: " + getCount());

 }

 public synchronized int getCount() {
  return queue.size();
 }
}


----------------------------------------------------Test类

package com.example.test;

public class Test {

 public static void main(String[] args) {

  Queue Q = new Queue();

  Producer wQ1 = new Producer(Q);
  Producer wQ2 = new Producer(Q);
 
  Consumer rQ1 = new Consumer(Q);
  Consumer rQ2 = new Consumer(Q);
  Consumer rQ3 = new Consumer(Q);
 
  Thread threadWQ1 = new Thread(wQ1, "thread-wQ1");
  Thread threadWQ2 = new Thread(wQ2, "thread-wQ2");

  Thread threadRQ1 = new Thread(rQ1, "thread-rQ1");
  Thread threadRQ2 = new Thread(rQ2, "thread-rQ2");
  Thread threadRQ3 = new Thread(rQ3, "thread-rQ3");
 
  threadWQ1.start();
  threadWQ2.start();
 
  threadRQ1.start();
  threadRQ2.start();
  threadRQ3.start();
 }
}

class Producer extends Thread {

 private Queue queue;

 Producer(Queue queue) {
  this.queue = queue;
 }

 public void run() {
 
  while (true) {
   Message message = new Message();
   message.setId(++Message.id);
   message.setContent("food"+Message.id);
   queue.produce(message);
   try {
    sleep(100);
   } catch (Exception e) {
   }
  }

 }
}

class Consumer extends Thread {
 private Queue queue;

 Consumer(Queue queue) {
  this.queue = queue;
 }

 public void run() {
  while (true) {
   queue.consume();
   try {
    sleep(100);
   } catch (Exception e) {
   }

  }
 }
}

   发表时间:2010-03-13  
做开发一年了 没写过一个线程类
0 请登录后投票
   发表时间:2010-03-13  
不错,不过synchronized 关键字范围过大
0 请登录后投票
   发表时间:2010-03-13  
为什么在consume方法和produce方法开始的时候要调用 this.notifyAll(); 这个应该是生产者在生产完产品后调用通知其他线程,同样消费者在消费完产品后也要调用 this.notifyAll();方法来通知其他线程,为什么一上来调用它呢?
0 请登录后投票
   发表时间:2010-03-13  
p_x1984 写道

java多线程一般都会讲消费者-生产者模型

生产者与消费者模型中,要保证以下几点:
1 同一时间内只能有一个生产者生产
2 同一时间内只能有一个消费者消费
3 生产者生产的同时消费者不能消费
4 消息队列满时生产者不能继续生产
5 消息队列空时消费者不能继续消费

 

---------------------------------------------

除了4和5,其他都不是必要条件.实际上,更多的生产者消费者应用都不遵循1,2,3.

 

0 请登录后投票
   发表时间:2010-03-14  
asialee 写道
为什么在consume方法和produce方法开始的时候要调用 this.notifyAll(); 这个应该是生产者在生产完产品后调用通知其他线程,同样消费者在消费完产品后也要调用 this.notifyAll();方法来通知其他线程,为什么一上来调用它呢?


楼上有道理,我认为在consume和produce的结尾分别获得producer和consumer的锁,然后notify它们。在方法一开始唤醒不知道有什么意义。
0 请登录后投票
   发表时间:2010-03-15  
看看jdk的BlockQueue
两年前我也自己实现生产消费
后来发现有现成的东西
0 请登录后投票
   发表时间:2010-03-15  
谢谢各位的提醒!
0 请登录后投票
   发表时间:2010-03-15  
问下各位,“  public synchronized void produce(Message message) {”
该方法锁定的资源是“message”还是“queue”?
0 请登录后投票
   发表时间:2010-03-15  
下面的是我写的:
import java.util.ArrayList;
import java.util.List;


public class ProductQueue {
private List<Product> products;

private int maxSize;

public ProductQueue(int maxSize){
this.maxSize = maxSize;
products = new ArrayList<Product>(maxSize);
}

public synchronized void addProdcut(Product product){
while(isFull()){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(product);
System.out.println("Produce:" + product.getId() + " " + product.getMadeDate());
notifyAll();
}

public synchronized void removeProduct(){
while(isEmpty()){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = products.get(products.size() - 1);
products.remove(product);
System.out.println("Consume:" + product.getId() + " " + product.getMadeDate());
notifyAll();
}



public synchronized boolean isFull(){
boolean isFull =  products.size() == maxSize;
if(isFull){
System.out.println("The queue is full.");
}
return isFull;
}

public synchronized boolean isEmpty(){
boolean isEmpty = products.size() <= 0;
if(isEmpty){
System.out.println("The queue is empty.");
}
return isEmpty;
}
}



public class Consumer implements Runnable{
private ProductQueue queue;

public Consumer(ProductQueue queue){
this.queue = queue;
}

@Override
public void run() {
while(true){
queue.removeProduct();
try {
Thread.sleep(100l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

}


import java.util.Date;


public class Producer implements Runnable{
private static int id;
private ProductQueue queue;

public Producer(ProductQueue queue){
this.queue = queue;
}

@Override
public void run() {
while(true){
Product product = new Product(id++,new Date());
queue.addProdcut(product);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

}


import java.util.Date;


public class Product {
private int id;
private Date madeDate;

public Product(int id, Date madeDate){
this.id = id;
this.madeDate = madeDate;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public Date getMadeDate() {
return madeDate;
}

public void setMadeDate(Date madeDate) {
this.madeDate = madeDate;
}

}



public class ThreadMain {
public static void main(String[] args) {
ProductQueue queue = new ProductQueue(50);

for(int i = 0; i < 10; i++){
Producer producer = new Producer(queue);
new Thread(producer).start();
}

for(int i = 0; i < 10; i++){
Consumer consumer = new Consumer(queue);
new Thread(consumer).start();
}
}
}
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics