`
mengqingyu
  • 浏览: 333772 次
  • 性别: Icon_minigender_1
  • 来自: 天津
社区版块
存档分类
最新评论

Java并发模式

阅读更多
在公司做培训时用到,顺便在这里做个总结。
1.生产者消费者模式
        某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。
/**
 * 
 * 类功能描述:数据对象
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2 上午11:01:53
 */
public class MyData {
	
	private final long intData;
	
	public MyData(long d){
		intData = d;
	}
	
	@Override
	public String toString(){
		return " MyData:"+intData;
	}
}

import java.util.concurrent.BlockingQueue;

public abstract class AbstractPC implements Runnable {
	
	protected BlockingQueue<MyData> queue;
	
	protected volatile boolean isRunning = true;
	
	public void stop() {
		isRunning = false;
	}
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 
 * 类功能描述:生产者
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2 上午11:02:13
 */
public class Producer extends AbstractPC{
	
	private static AtomicInteger count = new AtomicInteger(0);
	
	private static final int SLEEP_TIME = 3000;

	public Producer(BlockingQueue<MyData> queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("Producer:"+Thread.currentThread().getName()+" start");
		try {
			while (isRunning) {
				Thread.sleep(SLEEP_TIME);
				MyData data = new MyData(count.incrementAndGet());
				queue.put(data);
				System.out.println(Thread.currentThread().getName()+" produce:" + data);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

import java.util.concurrent.BlockingQueue;

/**
 * 
 * 类功能描述:消费者
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2 上午11:02:22
 */
public class Consumer extends AbstractPC{
	
	private static final int SLEEP_TIME = 3000;
	
	public Consumer(BlockingQueue<MyData> queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("Consumer:"+Thread.currentThread().getName()+" start");
		try {
			while(isRunning){
				Thread.sleep(SLEEP_TIME);
				MyData data = queue.take();
				if (null != data) {
					System.out.println(Thread.currentThread().getName()+" consume:"+data);
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 
 * 类功能描述:完全利用并发包的特性,最优的实现方式。
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-3-28 下午01:10:51
 */
public class Main {
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<MyData> queue = new LinkedBlockingQueue<MyData>(10);
		Producer producer = new Producer(queue);
		Consumer consumer = new Consumer(queue);
		ExecutorService service = Executors.newCachedThreadPool();
		service.execute(producer);
		service.execute(producer);
		service.execute(producer);
		service.execute(consumer);
		Thread.sleep(10000);
		producer.stop();
		System.out.println("producer stop!");
		Thread.sleep(10000);
		System.out.println("consumer stop!");
		consumer.stop();
		service.shutdown();
	}
}

//下面是链式阻塞队列的内部简单实现
public interface BlockingQueue<T> {
	
	/**
	 * 
	 * @function:插入元素
	 * @param e
	 * @throws InterruptedException
	 * @author: mengqingyu    2013-4-2 上午10:39:19
	 */
	void put(T e) throws InterruptedException;
	
	/**
	 * 
	 * @function:取出元素
	 * @return
	 * @throws InterruptedException
	 * @author: mengqingyu    2013-4-2 上午10:39:16
	 */
	T take() throws InterruptedException;
}

/**
 * 
 * 类功能描述:阻塞队列的内部实现
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp @param <T> $
 * Create:  2013-4-2 上午11:04:29
 */
public class LinkedBlockingQueue<T> implements BlockingQueue<T>{

	//链表容量
    private final int capacity;

    //计数器
    private final AtomicInteger count = new AtomicInteger(0);

    //队列头引用
    private transient Node<T> head;

    //队列尾引用
    private transient Node<T> last;
    
    //锁
	private Lock lock = new ReentrantLock();  
	
	//生产者条件锁
	private Condition condition_producer = lock.newCondition();  
	
	//消费者条件锁
    private Condition condition_consumer = lock.newCondition(); 
	
    //链表内部类
    static class Node<T> {
        volatile T item; //数据区
        Node<T> next;	//后继结点
        Node(T x) { item = x; }
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<T>(null);
    }
    
	@Override
	public void put(T t) throws InterruptedException {
		if (t == null) throw new NullPointerException();
        lock.lock();
        try { 
			while (count.get() == capacity)	//当容量满时候等待,用循环的目的是每次被唤醒都需要重新检查是否满足条件。
				condition_producer.await();  
			insert(t);//插入元素
        	System.out.println(Thread.currentThread().getName()+" push:"+(capacity-1)+t);
            count.getAndIncrement();//计数器加1
            condition_consumer.signalAll();
		} catch (InterruptedException e) {
			condition_consumer.signalAll(); 
			e.printStackTrace();
		}
		finally {
			lock.unlock();
		}
	}

	/**
	 * 
	 * @function:插入元素
	 * @param x
	 * @author: mengqingyu    2013-4-2 上午10:36:52
	 */
    private void insert(T x) {
        last = last.next = new Node<T>(x);
    }
    
	@Override
	public T take() throws InterruptedException {
		T x = null;
		lock.lock();
		try{
			while (count.get() == 0)
				condition_consumer.await();
			x = extract();
			System.out.println(Thread.currentThread().getName()+" pop:"+capacity+x);
			count.getAndDecrement();
			condition_producer.signalAll(); 
		} catch (InterruptedException e) {
			condition_producer.signalAll(); 
			e.printStackTrace();
		}
		finally {
			lock.unlock();
		}
		return x;
	}
	
	/**
	 * 
	 * @function:取出元素
	 * @return
	 * @author: mengqingyu    2013-4-2 上午10:36:31
	 */
    private T extract() {
        Node<T> first = head.next;
        head = first;
        T x = first.item;
        first.item = null;
        return x;
    }
}

2.future模式
        从java 5开始,Java提供了Callable接口,该接口是Runnable接口的增强版,Callable接口提供了一个call()方法,可以作为线程执行体,但call()方法比run()方法的功能更强大。
call()方法可以有返回值,call()方法可以声明抛出异常。

import java.util.concurrent.Callable;

/**
 * 
 * 类功能描述:实现Callable接口重写call方法,支持多线程下抛出异常,主线程阻塞等待子线程的返回值。
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-4-2 上午11:06:47
 */
public class DataService implements Callable<String> {
	
    private String param;
    
    private static final int SLEEP_TIME = 1000;
    
    public DataService(String param){
    	this.param = param;
    }
    
	@Override
	public String call() throws InterruptedException{
    	StringBuffer sb=new StringBuffer();
        for (int i = 0; i < 5; i++) {
        	sb.append(param);
        	System.out.println("Chlid Thread wait 1 second...");
			Thread.sleep(SLEEP_TIME);
        }
        return sb.toString();
	}
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 
 * 类功能描述:jdk实现,可以直接使用
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp  $
 * Create:  2013-3-28 下午01:11:43
 */
public class Main {
    public static void main(String[] args) {
        FutureTask<String> future = new FutureTask<String>(new DataService("Hello "));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.execute(future);
        	System.out.println("Main Thread wait 2 second...");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        	e.printStackTrace();
        }
        System.out.println("future.get() before");
        try {
			System.out.println("result:" + future.get());//线程阻塞,直到取到返回值
		} catch (Exception e) {
			e.printStackTrace();
		} 
		executor.shutdown();
    }
}

//下面是future的内部实现
public interface Callable<T> {
	T call() throws Exception;;
}

public interface Future<T> {
    T get()throws Exception;
}

/**
 * 
 * 类功能描述:future内部实现,包含获取结果集和异常信息的方法
 *
 * @author <a href="mailto:qingyu.meng21@gmail.com">mengqingyu </a>
 * @version $Id: codetemplates.xml,v 1.1 2009/03/06 01:13:01 mengqingyu Exp @param <T> $
 * Create:  2013-4-2 上午11:08:52
 */
public class FutureTask<T> implements Future<T>,Runnable {
	
	private T data;
    
    private Exception exception;
    
    private Callable<T> callable;
    
    private boolean isReady = false;
    
    public FutureTask(Callable<T> callable) {
    	this.callable =  callable;
	}
    
    public synchronized T get() throws Exception {
        while (!isReady) {
            try {
                wait();
            } catch (InterruptedException e) {
            	e.printStackTrace();
            }
        }
        if (exception != null)
        	 throw exception;
        return data;
    }

	@Override
	public synchronized void run() {
        if (isReady) {                        
            return;     
        }
		try {
			data = callable.call();
		} catch (Exception e) {
			exception = e;
		}
		isReady = true;
		notifyAll();
	}
}

分享到:
评论

相关推荐

    java并发编程实战源码,java并发编程实战pdf,Java

    10. **并发模式**:书中可能还会介绍生产者消费者模式、读写锁模式、双端队列模式等经典的并发设计模式,帮助开发者解决实际问题。 通过学习《Java并发编程实战》的源码,你可以更直观地了解这些概念如何在实际代码...

    Java并发编程设计原则和模式

    本资料“Java并发编程设计原则和模式”深入探讨了如何在Java环境中有效地进行并发处理,以充分利用系统资源并避免潜在的并发问题。 一、并发编程基础 并发是指两个或多个操作在同一时间段内执行,但并不意味着这些...

    Java 并发编程实战.pdf

    《Java并发编程实战》这本书是关于Java语言中并发编程技术的经典著作。它详细介绍了如何在Java环境中有效地实现多线程程序和并发控制机制。在Java平台上,由于其本身提供了强大的并发编程支持,因此,掌握并发编程...

    java并发编程2

    以上知识点覆盖了Java并发编程的主要方面,包括线程管理、同步机制、并发工具、设计模式、并发集合以及并发编程的最佳实践等,是理解和掌握Java并发编程的关键。在实际开发中,理解和熟练运用这些知识可以编写出高效...

    通过多线程编程在 Java 中发现并发模式和特性 线程、锁、原子等等 .zip

    通过多线程编程在 Java 中发现并发模式和特性。线程、锁、原子等等。Java 并发模式和特性通过多线程编程在 Java 中发现的并发模式和特性。特征线程和可运行对象锁固有的显式可重入读写同步器闩锁信号量障碍同步集合...

    java 并发编程设计原则与模式第二版

    java 并发变成设计原则与模式第二版 PDF版本,下载即看

    Java并发编程设计原则与模式.pdf

    《Java并发编程设计原则与模式》是Java并发编程领域的一部经典著作,作者Doug Lea在并发编程领域有着深厚的造诣。这本书详细介绍了如何在Java环境中有效地设计和实现并发程序,涵盖了众多关键概念、设计原则和实用...

    Java并发编程:设计原则与模式(第二版)-3

    《Java并发编程:设计原则与模式(第二版)》是一本深入探讨Java多线程编程技术的权威著作。这本书详细阐述了在Java平台中进行高效并发处理的关键概念、设计原则和实用模式。以下是对该书内容的一些核心知识点的概述...

    (PDF带目录)《Java 并发编程实战》,java并发实战,并发

    《Java 并发编程实战》是一本专注于Java并发编程的权威指南,对于任何希望深入了解Java多线程和并发控制机制的开发者来说,都是不可或缺的参考资料。这本书深入浅出地介绍了如何在Java环境中有效地管理和控制并发...

    Java并发编程实践高清pdf及源码

    《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...

    java并发编程实战中文加英文版加源码

    在《JAVA并发编程实践》中,这 些便利工具的创造者不仅解释了它们究竟如何工作、如何使用,还阐释了创造它们的原因,及其背后的设计模式。JAVA并发编程实践英文版很不错,运用了n多知识。 本书作者都是Java ...

    java并发编程实战(英文版)

    对于那些需要解决复杂并发问题的高级开发者,《Java并发编程实战》提供了许多实用的技术和设计模式,例如: - **不可变对象**:介绍如何设计不可变对象以提高代码的安全性和效率。 - **锁分离**:讲解锁分离技术,...

    java并发编程书籍

    Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...

    JAVA并发编程艺术pdf版

    《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...

    java并发编程实践高清中文版+源码

    《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序...在《JAVA并发编程实践》中,这些便利工具的创造者不仅解释了它们究竟如何工作、如何使用,还阐释了创造它们的原因,及其背后的设计模式。

    java 并发编程的艺术pdf清晰完整版 源码

    《Java并发编程的艺术》这本书是Java开发者深入理解并发编程的重要参考书籍。这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者解决在实际工作中遇到的并发问题,提高程序的性能和可伸缩性。 ...

    java并发编程实践pdf笔记

    Java并发编程实践是Java开发中不可或缺的一个领域,它涉及到如何高效、正确地处理多线程环境中的任务。这本书的读书笔记涵盖了多个关键知识点,旨在帮助读者深入理解Java并发编程的核心概念。 1. **线程和进程的...

    Java并发编程的设计原则与模式

    本文将深入探讨Java并发编程的设计原则与模式,旨在帮助开发者理解并有效地应用这些原则和模式。 一、并发编程的基础概念 并发是指两个或多个操作在同一时间间隔内执行,而不是严格意义上的同一时刻。在Java中,...

Global site tag (gtag.js) - Google Analytics