`
coolxing
  • 浏览: 874841 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
9a45b66b-c585-3a35-8680-2e466b75e3f8
Java Concurre...
浏览量:97541
社区版块
存档分类
最新评论

使用BlockingQueue构建生产者消费者模式--JCIP5.3读书笔记

阅读更多

[本文是我对Java Concurrency In Practice 5.3的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ] 

生产者消费者模式

以缓冲区作为生产者和消费者之间沟通的桥梁: 生产者只负责生产, 将生产出来的数据存入缓冲区. 而消费者只负责消费, 不断的从缓冲区中取出数据进行处理.

生产者消费者模式是非常常用的, 因为应用该模式有效的解耦了生产者和消费者. 生产者不需要知道有没有其他生产者在生产, 也不需要知道有多少个消费者在消费, 而消费者不需要知道数据来自哪个生产者. 另外该模式支持并发操作, 如果生产者直接调用消费者的方法, 生产者就必须等到消费者处理完毕才能返回, 万一消费者处理的速度很慢, 就会白白浪费生产者的时间. 而使用模式的话, 生产者只需要将数据存入缓冲区就可以了.

缓冲区是生产者消费者模式的核心. 生产者将数据存入缓冲区的一端, 消费者则负责从缓冲区的另一端取出数据进行处理, 队列非常适用这样的场景. 由于生产者消费者大多处于不同的线程, 队列就必须是线程安全的--java的BlockingQueue可以满足要求.

 

BlockingQueue

BlockingQueue的put方法用于将数据放入队列, 如果队列已满, put方法所在的线程将阻塞, 直到队列不满. take方法用于从队列中取出数据, 如果队列为空, take方法所在的线程将阻塞, 直到队列不为空.

public void put(E e) throws InterruptedException {
	if (e == null)
		throw new NullPointerException();
	final E[] items = this.items;
	final ReentrantLock lock = this.lock;
	// 锁定
	lock.lockInterruptibly();
	try {
		try {
			// 如果队列已满, 就阻塞线程
			while (count == items.length)
				notFull.await();
		} catch (InterruptedException ie) {
			notFull.signal(); // propagate to non-interrupted thread
			throw ie;
		}
		insert(e);
	} finally {
		lock.unlock();
	}
}

private void insert(E x) {
	items[putIndex] = x;
	putIndex = inc(putIndex);
	++count;
	// 插入数据后唤醒在非空条件上阻塞的线程
	notEmpty.signal();
}

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	// 锁定
	lock.lockInterruptibly();
	try {
		try {
			// 如果队列为空, 就阻塞线程
			while (count == 0)
				notEmpty.await();
		} catch (InterruptedException ie) {
			notEmpty.signal(); // propagate to non-interrupted thread
			throw ie;
		}
		E x = extract();
		return x;
	} finally {
		lock.unlock();
	}
}

private E extract() {
	final E[] items = this.items;
	E x = items[takeIndex];
	items[takeIndex] = null;
	takeIndex = inc(takeIndex);
	--count;
	// 取出数据后唤醒在notFull条件上阻塞的线程
	notFull.signal();
	return x;
}

offer(E e, long timeout, TimeUnit unit)用于将数据放入队列, 如果队列已满, 将最多等待指定的时间, offer返回true时说明数据成功入队, 否则说明没有成功. poll(long timeout, TimeUnit unit)是与offer配对的方法, 用于从队列中取出数据, 如果队列为空, 最多等待指定的时间, poll返回值为null时说明没有取到数据.

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	if (e == null)
		throw new NullPointerException();
	// 获得阻塞的最大时间
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			// 如果队列没有满, 则插入数据并返回true
			if (count != items.length) {
				insert(e);
				return true;
			}
			// 如果剩余的等待时间小于等于0说明等待时间已超过最大值, 此时返回false, 表明插入没有成功
			if (nanos <= 0)
				return false;
			try {
				// awaitNanos方法用于阻塞队列, 并返回剩余的时间值
				nanos = notFull.awaitNanos(nanos);
			} catch (InterruptedException ie) {
				notFull.signal(); // propagate to non-interrupted thread
				throw ie;
			}
		}
	} finally {
		lock.unlock();
	}
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		for (;;) {
			// 如果队列不为空, 就取出数据然后返回
			if (count != 0) {
				E x = extract();
				return x;
			}
			// 如果阻塞时间已过最大时间, 就返回null, 说明没有取到数据
			if (nanos <= 0)
				return null;
			try {
				// awaitNanos方法用于阻塞队列, 并返回剩余的时间值
				nanos = notEmpty.awaitNanos(nanos);
			} catch (InterruptedException ie) {
				notEmpty.signal(); // propagate to non-interrupted thread
				throw ie;
			}
		}
	} finally {
		lock.unlock();
	}
}

BlockingQueue的容量可以是无限的, 也可以是有限的. 无限容量的BlockingQueue永远也不会发生队列已满的事件.

BlockingQueue的常见实现类有ArrayBlockingQueue, LinkedBlockingQueue, 以及PriorityBlockingQueue等. ArrayBlockingQueue底层使用循环数组实现, LinkedBlockingQueue底层使用链表实现. PriorityBlockingQueue则是一个可排序的阻塞队列, 可以按照元素的自然顺序(元素需要实现Comparable接口)或者指定的Comparator排序.

 

生产者消费者模式的例子

该例子用于模拟对文件进行索引, 生产者FileCrawler类将待索引的文件放入队列, 消费者Indexer则负责从队列中取出文件进行索引标记. 

/**
 * 生产者, 生产待索引的文件
 */
public class FileCrawler implements Runnable {
	private final BlockingQueue<File> fileQueue;
	private final FileFilter fileFilter;
	private final File root;

	public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) {
		this.fileQueue = fileQueue;
		this.fileFilter = fileFilter;
		this.root = root;
	}

	public void run() {
		try {
			crawl(root);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	private void crawl(File root) throws InterruptedException {
		File[] entries = root.listFiles(fileFilter);
		if (entries != null) {
			for (File entry : entries)
				if (entry.isDirectory()) {
					// 递归调用
					crawl(entry);
				} else if (!alreadyIndexed(entry)) {
					// 向队列中添加文件, 如果队列是BOUND的, 且队列已满, 则put方法将阻塞, 直到队列不满
					System.out.println(entry + ": 等待进行索引 by " + Thread.currentThread().getName());
					fileQueue.put(entry);
				}
		}
	}

	private boolean alreadyIndexed(File entry) {
		return false;
	}
}

/**
 * 消费者, 从队列中取出文件进行索引标记
 */
public class Indexer implements Runnable { 
    private final BlockingQueue<File> queue; 

    public Indexer(BlockingQueue<File> queue) { 
        this.queue = queue; 
    } 

    public void run() { 
        try { 
            while (true) {
            	// 从队列中取出file标记索引. 如果队列为空, take方法将阻塞, 直到队列重新不为空.
            	File file = queue.take();
                indexFile(file); 
                System.out.println(file + ": 已进行索引 by " + Thread.currentThread().getName());
            }
        } catch (InterruptedException e) { 
            Thread.currentThread().interrupt(); 
        } 
    }

	private void indexFile(File file) {
	}
}

/**
 * 测试生产者消费者模式
 */
public class FileIndexer {
	public static void startIndexing(File[] roots) {
		// 创建一个BOUNDED队列, 队列中最大的元素为10个
		BlockingQueue<File> queue = new LinkedBlockingQueue<File>(10); 
	    FileFilter filter = new FileFilter() { 
	        public boolean accept(File file) { return true; } 
	    }; 
	 
	    for (int i = 0; i < roots.length; i++) {
	    	File root = roots[i];
	    	// 启动生产者线程
	        new Thread(new FileCrawler(queue, filter, root), "producer " + i).start(); 
	    }
	 
	    // 启动3个消费者线程
	    for (int i = 0; i < 3; i++) {
	        new Thread(new Indexer(queue), "consumer " + i).start();
	    }
	}
	
	public static void main(String[] args) {
		File dir = new File("E:\\TDDOWNLOAD\\mina doc");
		startIndexing(dir.listFiles(new FileFilter() {
			@Override
			public boolean accept(File pathname) {
				if (pathname.isDirectory()) {
					return true;
				}
				return false;
			}
		}));
	}
}

 

1
0
分享到:
评论
1 楼 yuanliangding 2016-08-02  
awaitNanos。
学到了一个技巧。
可以在方法中操作直到有返回值了再返回。
类似异步方法调用写得像是同步的方法一样。

^_^

相关推荐

    生产者和消费者模式多线程

    生产者和消费者模式是多线程编程中一个经典的设计模式,它主要解决的是在多线程环境下资源的有效利用和同步问题。在这个模式中,生产者负责生成数据,而消费者负责消费这些数据。为了保证生产与消费的平衡以及避免...

    Java 生产者消费者模式

    总之,生产者消费者模式和中介者设计模式的结合是解决并发问题的一种有效方式,它可以帮助我们构建更加灵活、可维护的系统。通过阅读你提供的`consumption`代码,我们可以深入理解这些概念在实际项目中的应用。

    架构设计 -- 生产者/消费者模式

    在Java中,可以使用`java.util.concurrent`包中的`BlockingQueue`来实现这个模式,它提供了线程安全的队列操作,可以很好地控制生产者和消费者的并发行为。 在寄信的例子中,生产者对应写信的人,他负责生成信件...

    生产者消费者问题---源码.rar

    `BlockingQueue`的使用简化了解决生产者消费者问题的过程,避免了手动管理锁和条件变量。此外,`BlockingQueue`还提供了其他方法,如`offer()`(非阻塞添加)和`poll()`(非阻塞获取),可以根据具体需求选择合适的...

    Java多线程 BlockingQueue实现生产者消费者模型详解

    在Java中,我们可以使用BlockingQueue来实现生产者消费者模型,BlockingQueue是Queue的子类,它提供了一个线程安全的队列,可以用于生产者和消费者之间的数据传输。 BlockingQueue的实现类有多种,常见的有...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    java并发学习之BlockingQueue实现生产者消费者详解

    在生产者消费者模式中,BlockingQueue可以作为共享的队列,生产者不断地将元素放入队列中,而消费者则不断地从队列中取出元素。如果队列已满,生产者将被阻塞直到队列非满。如果队列为空,消费者将被阻塞直到队列...

    Java多线程 生产者-消费者模式

    总之,Java中的生产者-消费者模式是多线程编程中解决数据共享和同步问题的有效手段,通过合理利用`BlockingQueue`等并发工具类,我们可以构建高效、稳定的多线程应用。在开发过程中,了解和掌握这种模式有助于提高...

    界面话模拟生产者消费者模式java

    在"界面话模拟生产者消费者模式java"的项目中,开发者创建了一个用户界面,允许用户自定义生产者生产数据的数量、消费者消费数据的数量,以及仓库(缓冲区)的最大容量、最小剩余量,还有生产者每次生产的数据量和...

    android 生产者消费者模式

    在实际使用中,生产者-消费者模式可以通过`BlockingQueue`来简化实现。`BlockingQueue`是一个线程安全的数据结构,它提供了在满时阻塞生产者、空时阻塞消费者的能力,这样可以避免无谓的资源浪费,同时简化同步代码...

    java多线程实现生产者和消费者

    在并发编程中,"生产者-消费者"模式是一种经典的解决问题的范式,用于协调两个或更多线程间的协作,其中一部分线程(生产者)生成数据,另一部分线程(消费者)消费这些数据。 生产者-消费者模型的核心在于共享资源...

    生产者消费者模式在java中的应用

    4. `mine`目录下的源代码可能包含了作者自定义的生产者和消费者类,展示了如何利用`BlockingQueue`实现具体业务场景下的生产者消费者模式。 5. `create`目录下的源代码可能提供了另一种实现方式,可能包括不同的...

    多线程_生产者与消费者模式示例

    在使用生产者消费者模式时,需要注意以下几点: - **线程安全**:确保所有的操作都是线程安全的,避免数据竞争和死锁。 - **公平性**:考虑是否需要保证生产者和消费者公平地访问队列,避免某个角色长时间等待。 - ...

    java 多线程 生产者消费者模式

    生产者消费者模式还有其他变体,比如使用信号量(Semaphore)或条件变量(Condition)来控制生产者和消费者的数量或行为。例如,你可以限制同时运行的消费者数量,或者在特定条件下唤醒某个线程。 此外,`java.util...

    生产者-消费者.zip

    生产者-消费者模式的核心思想是共享资源(通常是一个缓冲区)的分离,生产者负责生成数据并放入缓冲区,而消费者则从缓冲区取出数据进行消费。这种模式利用了线程间的协作,实现了数据的生产和消费的解耦,并避免了...

    java实现生产者消费者

    在Java编程中,"生产者消费者"模式是一种典型的多线程问题解决模型,它通过共享资源来协调生产者和消费者之间的操作。这个模式的核心在于如何有效地管理资源的生产和消费,以避免生产过快导致资源浪费,或者消费者...

    生产者-消费者多线程处理

    在Java中,可以使用`BlockingQueue`接口来实现生产者-消费者模式,它已经内置了线程安全的队列操作。生产者可以使用`offer()`方法添加元素,消费者则用`take()`方法取出元素,这两个方法会自动处理等待和唤醒操作。 ...

    生产者消费者演示程序

    生产者消费者问题是一个经典的多线程同步问题,来源于操作系统理论,用于模拟两个或多个相互依赖的进程或线程之间的协作。在这个场景下,“生产者”是生成数据的实体,而“消费者”则负责处理这些数据。这个问题的...

Global site tag (gtag.js) - Google Analytics