`

简单的java生产者消费者代码示例

 
阅读更多

一、背景

在并发编程中生产者-消费者模式是一个典型的问题。是数据共享简单而有效的手段之一。下面是这个模式的一个简单示例

 

二、代码简介

多个数据生产者将数据存入缓冲区,一个或者多个数据消费者将数据从缓冲区取走

package com.two;

import java.util.Date;
import java.util.LinkedList;
import java.util.List;

public class EventStorage {
    
    public int maxSize;
    private List<Date> storage;
    
    public EventStorage(){
        maxSize=10;
        storage=new LinkedList<Date>();
    }

    public synchronized void set(){
       while(storage.size()>=10){
           
           try{
               wait();
           }catch(InterruptedException ex){
               ex.printStackTrace();
           }
       } 
       storage.add(new Date());
       System.out.println("set storage size :"+storage.size());
       notifyAll();
    }
    
    
    public synchronized void get(){
        
        while(storage.size()<=0){
            try{
                wait();
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
        System.out.println("get "+((LinkedList<Date>)storage).poll());
        notifyAll();
    }
}

 

    

package com.two;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable{
    
    private EventStorage storage;
    
    public Consumer(EventStorage storage){
        this.storage=storage;
    }

    @Override
    public void run() {
        while(true){
            try {
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(" storage.get.. ");
            storage.get();
        }
    }

}

 

   

package com.two;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Producer implements Runnable {
    
    private EventStorage storage;
    
    public Producer(EventStorage storage){
       this.storage=storage; 
    }

    @Override
    public void run() {
        
        while(true){
            
            try {
                TimeUnit.SECONDS.sleep(new Random().nextInt(5));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(" storage.set ");
            storage.set();
        }
        
    }

}

   

package com.two;

/**
 * 生产者消费者主程序
 * 
 * @author yebing.li
 * @version $Id: Main.java, v 0.1 2014年7月25日 上午11:04:42 yebing.li Exp $
 */
public class Main {

    public static void main(String[] args) {
        
        EventStorage storage=new EventStorage();
        
        Producer producer=new Producer(storage);
        Thread thread1=new Thread(producer);
        
        Consumer consumer=new Consumer(storage);
        Thread thread2=new Thread(consumer);

        thread1.start();
        thread2.start();
    }

}

 

 

三、另一种实现方式java.util.concurrent.locks.Condition

 

package com.two;

import java.util.Random;

public class FileMock {
    
    private String[] content;
    private int index;
    
    public FileMock(int size,int length){
        content=new String[size];
        for(int i=0;i<size;i++){
            StringBuilder buffer=new StringBuilder();
            for(int j=0;j<length;j++){
                int indice=new Random().nextInt(1000);
                buffer.append(indice);
            }
            content[i]=buffer.toString();
        }
        index=0;
    }

    public boolean hasMoreLines(){
        return index<content.length;
    }
    
    public String getLine(){
        if(hasMoreLines()){
            return content[index++];  
        }
        return null;
    }
}

 

   

package com.two;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Buffer {
    
    private LinkedList<String> buffer;
    private int maxSize;
    private ReentrantLock lock;
    private Condition lines;
    private Condition space;
    private boolean peedingLines;
    
    public Buffer(int maxSize){
        this.maxSize=maxSize;
        buffer=new LinkedList<String>();
        lock=new ReentrantLock();
        lines=lock.newCondition();
        space=lock.newCondition();
        peedingLines=true;
    }

    public void insert(String line){
        lock.lock();
        try{
            while(buffer.size()>=maxSize){
                space.await();
            }
            buffer.offer(line);
            lines.signalAll();
            
        }catch(InterruptedException e){
           e.printStackTrace(); 
        }finally{
            lock.unlock();
        }
    }
    
    public String get(){
        String line=null;
        lock.lock();
        try{
            while(buffer.size()==0&&hasPeedingLines()){
                lines.await();
            }
            if(hasPeedingLines()){
                line=buffer.poll();
                space.signalAll();
            }
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
        return line;
    }
    
    
    public void setPeedingLines(boolean peedingLines){
        this.peedingLines=peedingLines;
    }
    
    public boolean hasPeedingLines(){
        return peedingLines||buffer.size()>0;
    }
}

 

   

package com.two;

public class Producerfile implements Runnable{

    private FileMock mock;
    private Buffer buffer;
    
    public Producerfile(FileMock mock,Buffer buffer){
        this.mock=mock;
        this.buffer=buffer;
    }
    
    @Override
    public void run() {
        buffer.setPeedingLines(true);
        while(mock.hasMoreLines()){
           String line=mock.getLine();
           System.out.println("Producerfile,content:"+line);
           buffer.insert(line);
        }
        buffer.setPeedingLines(false);
    }

}

 

    

package com.two;

public class ConsumerFile implements Runnable{

    private Buffer buffer;
    
    public ConsumerFile(Buffer buffer){
       this.buffer=buffer; 
    }
    
    @Override
    public void run() {
         
        while(buffer.hasPeedingLines()){
           String line=buffer.get();
           System.out.println(String.format("ConsumerFile content:%s", line));
        }
        
    }

}

    

package com.two;

public class FileMockMain {

    public static void main(String[] args) {
        
        FileMock mock=new FileMock(100,10);
        Buffer buffer=new Buffer(20);
        
        Producerfile producer=new Producerfile(mock,buffer);
        Thread threadProducer=new Thread(producer,"Producer");
        
        ConsumerFile consumers[]=new ConsumerFile[3];
        Thread threadConsumer[]=new Thread[3];
        for(int i=0;i<3;i++){
            consumers[i]=new ConsumerFile(buffer);
            threadConsumer[i]=new Thread(consumers[i],"consumer"+i);
        }

        threadProducer.start();
        for(int i=0;i<3;i++){
            threadConsumer[i].start();  
        }
    }

}

 

 

   代码来源于《java7并发编程手册》

分享到:
评论

相关推荐

    java生产者消费者

    Java生产者消费者模式是一种多线程设计模式,它在并发编程中被广泛使用,用于解决资源的共享问题。在这个模式中,"生产者"负责创建数据,而"消费者"则负责处理这些数据。它们通过一个共享的数据缓冲区进行通信,避免...

    java生产者消费者问题

    Java生产者消费者问题是多线程编程中的一个经典问题,它主要涉及到线程间的协作与通信。在并发编程中,生产者负责生成数据,而消费者则负责处理这些数据。为了解决生产者和消费者之间的问题,Java提供了一系列的同步...

    java实现生产者消费者

    在Java编程中,"生产者消费者"模式是一种典型的多线程问题解决模型,它通过共享资源来协调生产者和消费者之间的操作。这个模式的核心在于如何有效地管理资源的生产和消费,以避免生产过快导致资源浪费,或者消费者...

    java生产者消费者模型

    Java生产者消费者模型是多线程编程中一种经典的并发控制模型,它源于操作系统中的哲学思想,用于解决资源的共享和异步处理问题。在该模型中,"生产者"线程负责生成数据,而"消费者"线程则负责消费这些数据。两者之间...

    producer-java.rar_java 生产者 消费者_java 生产者消费者_producer.java_生产者 消费者

    在提供的压缩包文件中,`producer.java`可能是实现生产者消费者问题的Java代码示例。通常,这样的代码会包含两个主要类:一个代表生产者,另一个代表消费者。生产者类会有一个或多个方法用于生成数据并将其放入共享...

    JAVA实现线程间同步与互斥生产者消费者问题

    在`JThreadSynch`这个压缩包文件中,我们可以预见到包含的Java源代码将展示以上的一种或多种机制,通过具体的示例代码来解释和实现生产者消费者问题的线程同步和互斥。通过学习和理解这个示例,开发者可以更好地掌握...

    Java生产者消费者问题

    在Java编程领域,生产者消费者问题是多线程同步的一个经典示例,它源自并发编程中的一个常见场景。这个问题描述了两个角色:生产者和消费者,它们共享一个有限大小的缓冲区。生产者负责生成数据并将数据放入缓冲区,...

    操作系统生产者与消费者java源代码

    在 main 方法中,我们创建了一个生产者消费者对象,并将其添加到窗口中。然后,我们使用一个循环来创建多个.JTextField 组件,并将其添加到面板上。这些 JTextField 组件将用来显示缓冲池中的数据。 三、缓冲池的...

    kafka-java-demo 基于java的kafka生产消费者示例

    在"Kafka-java-demo"中,你将看到如何使用这些接口来实现一个简单的生产者和消费者示例。 【Kafka Producer】 Kafka生产者是负责将数据发布到Kafka主题的组件。在Java中,我们可以创建一个Producer实例,配置相关...

    生产者与消费者java实现源代码

    以下是对生产者-消费者问题及其Java实现的详细解析: 1. **生产者-消费者模型**: - **生产者**:负责生产数据并将其放入一个共享的数据缓冲区。 - **消费者**:从共享缓冲区中取出数据并进行处理。 - **缓冲区*...

    java生产者与消费者问题

    在解压缩后的"proandcom"文件中,可能包含了进一步的代码示例或详细的解释,你可以查看这些文件以加深对生产者消费者问题的理解。同时,为了优化性能和避免死锁,编写多线程程序时,需要遵循良好的设计原则,例如...

    JAVA死锁and生产者消费者问题

    在Java编程中,死锁和生产者消费者问题是多线程编程中的两个重要概念,它们涉及到并发执行和资源管理。理解并正确处理这些问题对于构建高效、可靠的多线程应用至关重要。 死锁是指两个或多个线程在执行过程中,因...

    生产者消费者问题java的java编写的

    在提供的代码示例中,`Storage`类扮演了缓冲区的角色,其中的`push()`方法用于添加产品(生产者的行为)而`get()`方法用于取出产品(消费者的行为)。这两个方法都被声明为`synchronized`,这意味着同一时间只有一个...

    RocketMQ生产者和消费者Java代码示例.zip

    这个压缩包文件中可能包含的就是以上所述的Java代码示例,分别展示了如何创建和使用RocketMQ的生产者和消费者。通过学习和实践这些示例,开发者可以更好地理解和应用RocketMQ在实际项目中的功能。

    java 多线程 生产者消费者模式

    以下是一段简单的生产者消费者模式的代码示例: ```java import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumerExample { public static...

    Java实现简易生产者消费者模型过程解析

    在本文中,我们将通过一个简单的示例代码介绍Java实现简易生产者消费者模型的过程。 二、Java实现生产者消费者模型 在Java中,生产者消费者模型可以通过使用同步机制和wait/notify机制来实现。在下面的示例代码中...

    Java多线程 生产者-消费者模式

    Java多线程中的生产者-消费者模式是一种典型的同步与协作模型,它用于解决在多线程环境下资源的有效利用和协调问题。在这个模式中,通常有两个角色:生产者和消费者。生产者负责创建产品,而消费者则负责消费这些...

    生产者与消费者源代码

    以下是一个简单的Java生产者消费者模型的代码示例: ```java import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumerExample { public ...

Global site tag (gtag.js) - Google Analytics