`

java 并发(2) 生产者和消费者问题的几种实现

    博客分类:
  • Java
 
阅读更多

有几种实现方法,一种是通过临界缓冲区的wait和notify来协调多个线程的并发,一种可以借用jdk 1.5+自带的BlockingQueue来实现,还有一种可以通过jdk1.5+的信号量机制来控制并发。

 

jdk1.5- 采用Object的wait 和notify方法来实现:

package com.xx.concurrent.commonUse;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

import com.fangming.pub.StringUtils;

class Productor extends Thread {

	Queue<String> buffer;

	int quality;

	public Productor(Queue<String> buffer, int quality) {
		this.buffer = buffer;
		this.quality = quality;
	}

	@Override
	public void run() {
		try {
			product();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		ProductorAndCustomer.latch.countDown();
	}

	private void product() throws InterruptedException {
		synchronized (buffer) {
			while (quality > 0) {
				if (buffer.size() == ProductorAndCustomer.BUFFERSIZE) {
					buffer.wait();
				} else {
					String str = StringUtils.getRandomString(10);
					buffer.offer(str);
					quality--;
					System.out.println("####producer product " + str);
					buffer.notify();
				}
			}
		}
	}
}

class Customer extends Thread {

	Queue<String> buffer;
	int quality;

	public Customer(Queue<String> buffer, int quality) {
		this.buffer = buffer;
		this.quality = quality;
	}

	@Override
	public void run() {
		try {
			cusume();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		ProductorAndCustomer.latch.countDown();
	}

	private void cusume() throws InterruptedException {
		synchronized (buffer) {
			while (quality > 0) {
				if (buffer.size() == 0) {
					buffer.wait();
				} else {
					String str = buffer.poll();
					System.out.println("$$$$customer cocume " + str);
					quality--;
					buffer.notify();
				}
			}
		}
	}
}

public class ProductorAndCustomer {

	static final int BUFFERSIZE = 10;

	static CountDownLatch latch = new CountDownLatch(15);

	public static void main(String[] args) throws InterruptedException {

		long startTime = System.nanoTime();
		Queue<String> buffer = new LinkedList<String>();
		for (int i = 0; i < 10; i++) {
			Thread t1 = new Productor(buffer, 100);
			t1.start();
		}

		for (int i = 0; i < 5; i++) {
			Thread t2 = new Customer(buffer, 200);
			t2.start();
		}
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime);

	}

}

 

jdk1.5+ 采用BlockingQueue来实现

package com.xx.concurrent.commonUse;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;


public class MutiProductorAndCustomer {
	
	ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<String>(5);
	
	static CountDownLatch latch = new CountDownLatch(15); 
	
	class Productor extends Thread {
		
		int quality;
		
		Productor(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					product();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++;
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void product() throws InterruptedException{
			
			String str = StringUtils.getRandomString(10);
			buffer.put(str);
			System.out.println(this.getName() + " product " + str);
			
		}
	}
	
	class Customer extends Thread {
		
		int quality;
		
		Customer(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					consume();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++; 
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void consume() throws InterruptedException{
			
			String str = buffer.take();
			System.out.println(this.getName() + " cusume " + str);
			
		}
	}
	
	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		MutiProductorAndCustomer demo = new MutiProductorAndCustomer();
		for(int i =0 ; i< 10; i++){
			Thread t1 = demo.new Productor(100);
			t1.start();
		}
		
		for(int i =0 ; i< 5; i++){
			Thread t2 = demo.new Customer(200);
			t2.start();
		}
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime);
	}

}

 字符串随机生成器StringUtils

package com.xx.pub;

import java.util.Random;

public class StringUtils {

	public static String getRandomString(int length) { //length表示生成字符串的长度
	    String base = "abcdefghijklmnopqrstuvwxyz0123456789";   
	    Random random = new Random();   
	    StringBuffer sb = new StringBuffer();   
	    for (int i = 0; i < length; i++) {   
	        int number = random.nextInt(base.length());   
	        sb.append(base.charAt(number));   
	    }   
	    return sb.toString();   
	 } 
	
}

jdk 1.5+ 采用信号量实现 

使用了3个信号量,mutex用来控制对临界缓冲区的访问,slots标识空闲的缓冲区,items标识已装入的缓冲区。如果使用ConcurrentLinkedQueue做缓冲区的话,互斥信号量mutex可以不用。

 

package com.xx.concurrent.commonUse;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;


public class ProductorAndCustomerWithSemaphore {
	//此处使用的是非线程安全LinkedList
	Queue<String> buffer = new LinkedList<String>();
        //使用ConcurrentLinkedQueue的话,互斥信号量mutex可以不用,可以提高一定的性能
	//Queue<String> buffer = new ConcurrentLinkedQueue<String>();

	static int BUFFERSIZE = 10;
	
	//线程数量
	static CountDownLatch latch = new CountDownLatch(15); 
	
	static Semaphore mutex = new Semaphore(1);
	static Semaphore slots = new Semaphore(BUFFERSIZE);
	static Semaphore items = new Semaphore(0);
	
	class Productor extends Thread {
		
		int quality;
		
		Productor(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					slots.acquire();
					mutex.acquire();
					product();
					mutex.release();
					items.release();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++;
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void product() throws InterruptedException{
			String str = StringUtils.getRandomString(10);
			buffer.offer(str);
			System.out.println(this.getName() + " product " + str);
			
		}
	}
	
	class Customer extends Thread {
		
		int quality;
		
		Customer(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					items.acquire();
					mutex.acquire();
					consume();
					mutex.release();
					slots.release();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++; 
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void consume() throws InterruptedException{
			
			String str = buffer.poll();
			System.out.println(this.getName() + " cusume " + str);
			
		}
	}
	
	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		ProductorAndCustomerWithSemaphore demo = new ProductorAndCustomerWithSemaphore();
		for(int i =0 ; i< 10; i++){
			Thread t1 = demo.new Productor(100);
			t1.start();
		}
		
		for(int i =0 ; i< 5; i++){
			Thread t2 = demo.new Customer(200);
			t2.start();
		}
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime);
	}
}
 

 

性能比较:

jdk1.5-  wait() & notify()
111658991ns

jdk1.5+  ArrayBlockingQueue
98588747ns 

jdk1.5+  Semaphore 使用线程不安全的buffer  linkedList
123800982ns 

jdk1.5+  Semaphore 使用线程不安全的buffer  ConcurrentLinkedQueue
110885827ns

 

可以看出使用BlockQueue来实现生产者和消费者问题,性能最好。

 

分享到:
评论

相关推荐

    java生产者消费者demo

    在Java中,有几种常见的解决生产者消费者问题的方法: 1. **阻塞队列(BlockingQueue)**:Java并发包`java.util.concurrent`中的`BlockingQueue`是一个理想的选择。生产者可以使用`put()`方法将产品放入队列,而...

    Java 生产者消费者模式

    总之,生产者消费者模式和中介者设计模式的结合是解决并发问题的一种有效方式,它可以帮助我们构建更加灵活、可维护的系统。通过阅读你提供的`consumption`代码,我们可以深入理解这些概念在实际项目中的应用。

    java版本的生产者与消费者

    在Java编程领域,"生产者与消费者"模式是一种常见的多线程问题解决方案,它涉及到并发编程和线程协作。在这个模式中,生产者负责创建资源,而消费者则负责消费这些资源。这种模式常用于实现缓存、队列和其他数据结构...

    java并发编程2

    - **生产者-消费者模式** 使用队列作为缓冲区,一个线程生产数据,另一个线程消费数据。 - **读写锁模式** 通过分离读取和写入权限,允许多个读取线程同时进行,但写入时互斥。 - **双检锁/双重校验锁(DCL)模式...

    java多线程实现生产者和消费者

    为了解决生产者和消费者之间的同步和通信问题,Java提供了几种不同的实现方式,包括synchronized关键字、Condition接口、Lock接口以及信号量(Semaphore)和阻塞队列(BlockingQueue)。 ### 1. synchronized关键字...

    java并发编程实战源码,java并发编程实战pdf,Java

    10. **并发模式**:书中可能还会介绍生产者消费者模式、读写锁模式、双端队列模式等经典的并发设计模式,帮助开发者解决实际问题。 通过学习《Java并发编程实战》的源码,你可以更直观地了解这些概念如何在实际代码...

    多进程处理生产者消费者问题

    理解并掌握生产者消费者问题及其解决方案对于进行多进程编程至关重要,因为这类问题广泛存在于并发系统设计中,比如数据库系统、网络服务等。通过学习和实践,你可以更好地设计和实现高效、稳定的并发程序。

    生产者消费者Java—synchronized 机制

    在Java编程中,生产者消费者模型是一种典型的多线程问题,用于解决资源的共享和并发控制。这个模型中,生产者负责生成数据,而消费者负责消耗这些数据。在这个项目中,开发者利用`synchronized`关键字来实现线程同步...

    自己用Java写的简单生产者与消费者模型

    在Java编程中,生产者-消费者模型是一种经典的多线程问题解决方案,用于处理并发操作中的数据共享和资源管理。这个模型通常由四个主要组件构成:生产者、消费者、存储(或缓冲区)以及可能的市场规则。根据提供的...

    java线程安全以及生产者消费者demo

    而生产者消费者模型是一种设计模式,用于解决资源分配和消耗的问题,它通过共享缓冲区来协调生产者和消费者线程之间的通信。 线程安全主要涉及到以下几个方面: 1. **同步机制**:Java提供了多种同步机制,如`...

    java生产者消费者问题

    8. **其他同步工具**:除了`BlockingQueue`,Java并发库还提供了其他工具,如`Semaphore`信号量,可以用于控制同时访问特定资源的线程数量,这对于解决生产者消费者问题也是一种有效手段。 9. **异常处理**:在实际...

    Java线程间的通信----生产者消费者模型

    生产者消费者模型是一种经典的线程同步问题,它模拟了实际生活中的生产过程和消费过程,使得生产者线程可以将数据生产出来,而消费者线程则负责消耗这些数据,两者之间通过共享数据结构进行协同工作。 生产者消费者...

    生产者和消费者模式多线程

    在Java中,我们可以使用`java.util.concurrent`包中的工具类来实现生产者和消费者模式。主要涉及到的类有`BlockingQueue`(阻塞队列)和`Thread`(线程)。阻塞队列是一种特殊的队列,当队列为空时,尝试获取元素的...

    java中线程的生产者与消费者问题

    3. `java.util.concurrent` 包中的工具类:这个包提供了许多高级的并发工具,如 `BlockingQueue`,它是线程安全的队列,可以方便地解决生产者与消费者问题。生产者可以使用 `put()` 方法添加元素,而消费者使用 `...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    在实际的Java项目中,尤其是大型分布式系统中,阻塞队列是实现生产者消费者模式的关键组件,它可以有效地处理高并发场景,提高系统的可扩展性和响应速度。因此,掌握这些知识对于提升Java开发者的项目实战能力至关...

    生产者与消费者问题.docx

    在本篇文章中,我们将重点探讨使用Java语言实现生产者与消费者问题的几种方法。 #### 二、生产者与消费者问题描述 在一个典型的生产者与消费者模型中,存在两个角色:生产者和消费者。生产者负责生成数据并将这些...

    多线程模拟实现生产者/消费者模型

    生产者/消费者模型是多线程编程中的一个经典设计模式,它有效地利用了资源,避免了数据竞争和阻塞问题。这个模型的核心思想是将生产者和消费者分隔开,使得生产者可以专注于创建产品,而消费者则专注于消耗这些产品...

    实战Java高并发程序设计(高清版)

    3. **并发设计模式**:书里可能讨论了如何使用各种并发设计模式来解决并发问题,比如生产者消费者模型、双检锁(DCL)、读写锁(ReentrantReadWriteLock)等。 4. **并发工具类**:Java并发包(java.util....

    android 生产者消费者模式

    在Android开发中,生产者-消费者模式是一种常见的多线程设计模式,用于处理并发问题,尤其是在数据处理和异步操作中。这个模式的核心思想是通过一个共享的数据缓冲区,使得生产者线程可以生成数据并放入缓冲区,而...

Global site tag (gtag.js) - Google Analytics