`

生产者消费者模型的演变

 
阅读更多

想复习一下生产者和消费者通过Java代码如何实现,网上搜集了一个,《Thinking in Java》上面有两个,实现各有侧重。与大家分享,也当自己学习。

 

介绍:

生产者、消费者简单说这个模型核心角色有3个,即生产者、消费者、产品(关键区)。

生产者和消费者对产品(关键区)的操作时要互斥,保证并发时的正确性。

 

 

代码实现:

网络上最常见也是最简单的实现,直接对关键区加锁。生产者、消费者公共使用主线程MultiThread的container。通过对container进行synchronized控制,使用wait,notify来控制进程间的交替。

这种实现方式实现需要注意的有以下几点:

1.对关键区进行synchronized控制。

2.只有线程获取了关键区的访问权,才可以通过关键区对象调用notify,notifyAll之类的方法。否则就会抛出IllegalMonitorStateException的异常。

3.wait,notify为Object的方法,在该模式下都是关键区对象,如container来调用相关方法,而非其他线程。

 

消费者代码

package ProductAndConsume; 
import java.util.List; 

public class Consume implements Runnable{ 
	private List container = null; 
	private int count; 
	public Consume(List lst){ 
		this.container = lst; 
	} 
	public void run() { 

		while(true){ 
			synchronized (container) { 
				if(container.size()== 0){ 
					try { 
						container.wait();//容器为空,放弃锁,等待生产 
					} catch (InterruptedException e) { 
						e.printStackTrace(); 
					} 
				} 
				try { 
					Thread.sleep(100); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
				container.remove(0); 
				container.notify(); 
				System.out.println("我吃了"+(++count)+"个"); 
			} 
		} 

	} 

} 

 生产者:

package ProductAndConsume; 
import java.util.List; 

public class Product implements Runnable { 
	private List container = null; 
	private int count; 
	public Product(List lst) { 
		this.container = lst; 
	} 

	public void run() { 
		while (true) { 
			synchronized (container) { 
				if (container.size() > MultiThread.MAX) { 
					//如果容器超过了最大值,就不要在生产了,等待消费 
					try { 
						container.wait(); 
					} catch (InterruptedException e) { 
						e.printStackTrace(); 
					} 
				} 
				try { 
					Thread.sleep(100); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
				container.add(new Object()); 
				container.notify(); 
				System.out.println("我生产了"+(++count)+"个"); 
			} 
		} 

	} 

} 

 主线程,包括公共资源。

package ProductAndConsume; 
import java.util.ArrayList; 
import java.util.List; 

public class MultiThread { 
	private List container = new ArrayList(); 
	public final static int MAX = 5; 
	public static void main(String args[]){ 

		MultiThread m = new MultiThread(); 

		new Thread(new Consume(m.getContainer())).start();
		new Thread(new Consume(m.getContainer())).start();
		new Thread(new Product(m.getContainer())).start(); 
		new Thread(new Consume(m.getContainer())).start(); 
		new Thread(new Product(m.getContainer())).start(); 
	} 
	public List getContainer() { 
		return container; 
	} 

	public void setContainer(List container) { 
		this.container = container; 
	}
}

  

Thinking in Java上面关于消费者、生产者的实现与上面方式基本相同。唯一不同的是,线程直接对自身加锁,而非关键区。因为线程中保留了对关键区的一个引用。

这种设计方式很严谨,同时考虑了信号量丢失的问题。最后的打印结果也很值得分析。

Out of food, closing
Order up! 
Chef interrupted
WaitPerson interrupted 

当食物数量满了之后,关闭线程池被关闭,interrupt所有线程。此时线程在调用sleep或wait方法,就会抛出InterruptedException异常,从而打印出上面的结果。

实现代码如下:

package TIJ4PAC;
/**
 * 生产者,消费者完整的例子
 */
//: concurrency/Restaurant.java
// The producer-consumer approach to task cooperation.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Meal {
  private final int orderNum;
  public Meal(int orderNum) { this.orderNum = orderNum; }
  public String toString() { return "Meal " + orderNum; }
}

class WaitPerson implements Runnable {
  private Restaurant restaurant;
  public WaitPerson(Restaurant r) { restaurant = r; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          while(restaurant.meal == null)
            wait(); // ... for the chef to produce a meal,防止其他服务员突然闯入,夺走订单
        }
        System.out.println("Waitperson got " + restaurant.meal);
        synchronized(restaurant.chef) {
          restaurant.meal = null;
          restaurant.chef.notifyAll(); // Ready for another
        }
      }
    } catch(InterruptedException e) {
      System.out.println("WaitPerson interrupted");
    }
  }
}

class Chef implements Runnable {
  private Restaurant restaurant;
  private int count = 0;
  public Chef(Restaurant r) { restaurant = r; }
  public void run() {
    try {
      while(!Thread.interrupted()) {
        synchronized(this) {
          while(restaurant.meal != null)
            wait(); // ... for the meal to be taken,防止其他初始突然闯入,夺走机会
        }
        if(++count == 10) {
          System.out.println("Out of food, closing");
          restaurant.exec.shutdownNow();
        }
        System.out.println("Order up! ");
        synchronized(restaurant.waitPerson) {
          restaurant.meal = new Meal(count);
          restaurant.waitPerson.notifyAll();
        }
        TimeUnit.MILLISECONDS.sleep(100);
      }
    } catch(InterruptedException e) {
      System.out.println("Chef interrupted");
    }
  }
}

public class Restaurant {
  Meal meal;
  ExecutorService exec = Executors.newCachedThreadPool();
  WaitPerson waitPerson = new WaitPerson(this);
  Chef chef = new Chef(this);
  public Restaurant() {
    exec.execute(chef);
    exec.execute(waitPerson);
  }
  public static void main(String[] args) {
    new Restaurant();
  }
} /* Output:
Order up! 
Waitperson got Meal 1
Order up! 
Waitperson got Meal 2
Order up! 
Waitperson got Meal 3
Order up! 
Waitperson got Meal 4
Order up! 
Waitperson got Meal 5
Order up! 
Waitperson got Meal 6
Order up! 
Waitperson got Meal 7
Order up! 
Waitperson got Meal 8
Order up! 
Waitperson got Meal 9
Out of food, closing
Order up! 
Chef interrupted
WaitPerson interrupted
*///:~

 

最后一种实现,采用了BlockingQueue的方式,采用这种方式的好处就是,生产者、消费者无需再对产品(关键区)进行加锁控制了。BlockingQueue的put和take方法都会自动对关键区进行互斥的,无需编码者手动控制。代码结构简单许多。

下面这个例子挺有意思,通过线程模拟制作吐司面包,然后抹黄油,最后加果酱,吃吐司的过程。老外就是幽默呀~

代码如下:

package BlockingQueue;

//: concurrency/ToastOMatic.java
// A toaster that uses queues.
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Toast {
	public enum Status { DRY, BUTTERED, JAMMED }
	private Status status = Status.DRY;
	private final int id;
	public Toast(int idn) { id = idn; }
	public void butter() { status = Status.BUTTERED; }	//抹黄油
	public void jam() { status = Status.JAMMED; }	//涂果酱
	public Status getStatus() { return status; }
	public int getId() { return id; }
	public String toString() {
		return "Toast " + id + ": " + status;
	}
}

class ToastQueue extends LinkedBlockingQueue<Toast> {}

class Toaster implements Runnable {	//制作土司的线程
	private ToastQueue toastQueue;
	private int count = 0;
	private Random rand = new Random(47);
	public Toaster(ToastQueue tq) { toastQueue = tq; }
	public void run() {
		try {
			while(!Thread.interrupted()) {
				TimeUnit.MILLISECONDS.sleep(
						100 + rand.nextInt(500));
				// Make toast
				Toast t = new Toast(count++);
				System.out.println(t);
				// Insert into queue
				toastQueue.put(t);
			}
		} catch(InterruptedException e) {
			System.out.println("Toaster interrupted");
		}
		System.out.println("Toaster off");
	}
}

// Apply butter to toast:
class Butterer implements Runnable {	//抹黄油的人
	private ToastQueue dryQueue, butteredQueue;
	public Butterer(ToastQueue dry, ToastQueue buttered) {
		dryQueue = dry;
		butteredQueue = buttered;
	}
	public void run() {
		try {
			while(!Thread.interrupted()) {
				// Blocks until next piece of toast is available:
				Toast t = dryQueue.take();
				t.butter();
				System.out.println(t);
				butteredQueue.put(t);
			}
		} catch(InterruptedException e) {
			System.out.println("Butterer interrupted");
		}
		System.out.println("Butterer off");
	}
}

// Apply jam to buttered toast:
class Jammer implements Runnable {	//擦果酱的人
	private ToastQueue butteredQueue, finishedQueue;
	public Jammer(ToastQueue buttered, ToastQueue finished) {
		butteredQueue = buttered;
		finishedQueue = finished;
	}
	public void run() {
		try {
			while(!Thread.interrupted()) {
				// Blocks until next piece of toast is available:
				Toast t = butteredQueue.take();
				t.jam();
				System.out.println(t);
				finishedQueue.put(t);
			}
		} catch(InterruptedException e) {
			System.out.println("Jammer interrupted");
		}
		System.out.println("Jammer off");
	}
}

// Consume the toast:
class Eater implements Runnable {	//吃吐司的人
	private ToastQueue finishedQueue;
	private int counter = 0;
	public Eater(ToastQueue finished) {
		finishedQueue = finished;
	}
	public void run() {
		try {
			while(!Thread.interrupted()) {
				// Blocks until next piece of toast is available:
				Toast t = finishedQueue.take();
				// Verify that the toast is coming in order,
				// and that all pieces are getting jammed:
				if(t.getId() != counter++ ||
						t.getStatus() != Toast.Status.JAMMED) {
					System.out.println(">>>> Error: " + t);
					System.exit(1);
				} else
					System.out.println("Chomp! " + t);
			}
		} catch(InterruptedException e) {
			System.out.println("Eater interrupted");
		}
		System.out.println("Eater off");
	}
}

public class ToastOMatic {
	public static void main(String[] args) throws Exception {
		ToastQueue dryQueue = new ToastQueue(),
		butteredQueue = new ToastQueue(),
		finishedQueue = new ToastQueue();	//生成3个队列。分别是干吐司,抹了黄油的吐司,抹了果酱的吐司(完成的吐司)
		
		ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new Toaster(dryQueue));	//制作
		exec.execute(new Butterer(dryQueue, butteredQueue));	//抹黄油
		exec.execute(new Jammer(butteredQueue, finishedQueue));		//抹果酱
		exec.execute(new Eater(finishedQueue));		//吃
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
	}
} /* (Execute to see output) *///:~
//使用BolckingQueue 简化明显,在使用显式的wait和notifyAll方法时存在的类和类直接的耦合被消除了,每一个类都和它的BlockinQueue通讯

 

就与大家分享到这里,代码如附件,如有错误欢迎指正。

分享到:
评论

相关推荐

    Dixit-Stiglitz模型与克鲁格曼模型浅析[借鉴].pdf

    在该模型中,生产者由于规模经济效应倾向于减少产品种类以降低成本,而消费者则倾向于多样化消费。这种矛盾在市场竞争下得以解决,形成了垄断竞争的市场结构,即每个厂商生产略微差异化的商品,以满足消费者对多样性...

    动态 CGE Model,动态cge模型

    5. **优化行为**:模型中的经济主体(如消费者、生产者)都会追求最大化效用或利润。 6. **递归动态**:模型将时间序列纳入,每个时期的决策取决于上一时期的状态,并影响下一时期的结果。 7. **Stata编程**:在...

    动态CGE Model_stataCGE_动态CGE_动态cge模型_CGE模型_CGE

    2. **需求函数**:表示消费者对各种商品和服务的需求,通常基于希克斯-马歇尔效用理论或瓦尔拉斯需求法则。 3. **价格决定**:根据供求关系,确定商品和服务的市场价格,确保各市场的均衡。 4. **劳动力市场**:考虑...

    消费者需求分析PPT课件.pptx

    暗箱理论是消费者行为研究的重要模型,关注消费者购买过程中的5W1H(What、Who、When、Where、How、Why)。通过对这些因素的深入理解,企业能更准确地预测和引导消费者行为。此外,影响消费者购买行为的因素还包括...

    微分方程模型.ppt

    2. **经济增长模型**:这类模型可能涉及到国民生产总值、消费、投资等因素的动态关系,通过微分方程可以预测经济的发展趋势和调控政策的效果。 3. **战争策略模型**:如正规战与游击战的模拟,可能需要考虑军队规模...

    经济结构的完全信息静态博弈模型

    本模型将三次产业与投资消费结构联系起来,构建了产业结构完全信息静态博弈模型和支出结构完全信息静态博弈模型。这两个模型共同构成了一个完整的经济结构静态博弈模型系统。 #### 模型中的关键参数与假设 在模型...

    基于CORBA的C_S分布式计算模型.pdf

    在该模型中,系统被分为客户端、服务器端和数据库三个层次,形成一种生产者和消费者模型。在这种模型中,客户端作为消费者,向服务器请求服务或数据。服务器端作为生产者,提供服务或数据。ORB(Object Request ...

    行业数据-20年中国消费者对文胸有无钢圈偏好.rar

    标题中的“行业数据-20年中国消费者对文胸有无钢圈偏好.rar”指的是一个关于中国消费者在2020年对于文胸是否含有...同时,这也提醒我们在设计和销售产品时,必须密切关注消费者的需求变化,以适应不断演变的市场环境。

    数学建模-差分方程模型.ppt

    在这种模型中,供需函数是分析的关键,消费者的购买意愿和生产者的供给决策是构成模型的两个基本要素。通过构建差分方程,我们可以分析市场在不同条件下的动态行为,比如,价格和数量如何达到均衡状态,以及政府的...

    PCR三大营销理论演变与案例分析PPT教案.pptx

    这些理论的演变反映了营销实践的发展,从单一的产品推销转变为以消费者为中心,再到建立长期合作关系的过程。在实际应用中,企业通常会结合这些理论,根据市场环境、目标客户和自身资源灵活调整营销策略。理解这些...

    基于时序运行模拟的新能源配置储能替代火电规划模型.pdf

    未来,随着新能源和储能技术的进一步发展与应用,该模型和相应的分析方法有望成为制定能源战略的重要工具,为实现能源生产和消费的绿色转型提供更加坚实的技术支撑和决策支持。通过科学规划和技术创新,我们有理由...

    毕业设计MATLAB_使用两种不同的方法创建和模拟 Ramsey-Cass-Koopmans 经济模型.zip

    3. **动态优化**:模型试图找到一条经济增长路径,使得消费者的跨时期效用最大化。 4. **稳态**:当人口增长率、储蓄率和生产力参数等达到一定平衡时,经济会达到稳态,此时资本存量、产出和消费稳定不变。 在...

    电子商务与物流配送的模型.pptx

    物流在电子商务中扮演着至关重要的角色,它是连接商家与消费者的关键纽带,确保商品能够从生产者顺利到达消费者手中。 物流的演变历程: 1. 早期的“以物易物”时代,交易中没有货币参与,物流即商品的实际转移。 2...

    颠覆性创新四阶段扩散过程模型——基于液晶电视机与山寨手机案例.docx

    随着技术的快速进步和消费者需求的多元化,颠覆性创新已成为推动行业变革和重塑市场格局的重要力量。《颠覆性创新四阶段扩散过程模型——基于液晶电视机与山寨手机案例》一文,深入分析了颠覆性创新如何在不同市场...

    社经市场经济学习教案.pptx

    这个市场是所有市场中最基础的,因为生产者市场需求、中间商市场需求和政府需求都是由消费者需求派生出来的。消费者市场的需求和购买行为直接影响企业的营销策略。 组织市场则是由各种组织作为购买单位,目的是为了...

    物流第三章 运输及运输优化模型.pptx

    它是连接生产者与消费者之间的桥梁,影响着整个物流系统效率与成本的关键环节。在物流领域,运输不仅仅是货物移动的简单过程,更是一个复杂的系统工程,它涉及到货物的载运、输送及相关操作的整合,如集货、分配、...

    联合国世界计量经济联接模型系统.docx

    通过这个模型,政策制定者可以模拟不同的经济政策情景,比如增加外商直接投资、调整利率、改变汇率、调整农产品收购价格、改变社会消费和税收政策。例如,增加外商直接投资可以促进GDP和固定资产投资的增长,提高...

    基于动力学模型的住房投资分析

    通过深入研究这些模型,我们可以更好地理解和预测房地产市场的演变,为政策制定者提供关于如何平衡投资、消费和经济增长的策略建议。这种研究方法对于预防市场过度波动,维护金融稳定,以及实现可持续的城市发展具有...

    区域经济学第七讲区域空间相互作用模型.ppt

    在假设消费者均匀分布、市场需求无限且不变、生产成本和运费在各地均等、产品运费率一致且由消费者承担的条件下,企业可以通过比较相邻生产基地的生产和运输成本来找到合理的边界点Z,从而最小化社会总劳动耗费。...

Global site tag (gtag.js) - Google Analytics