`
阅读更多

阻塞队列(BlockingQueue)
对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。
例如:生产者线程向队列插入元素,消费者线程则取出它们。
在协调多个线程之间的合作时,阻塞队列是一个有用的工具。工作者线程可以周期性将中间结果存储在阻塞队列中。其他工作者的线程移出中间结果并进一步加以修改。队列会自动平衡负载。

阻塞队列(java.util.concurrent.BlockingQueue)的方法
方法            正常动作                        特殊情况下的动作
add            添加一个元素                如果队列满,则抛出IllegalStateException异常
element        返回队列的头元素            如果队列空,则抛出NoSuchElementException异常
remove        移出并返回头元素            如果队列空,则抛出NoSuchElementException异常

offer        添加一个元素并返回true        如果队列满,返回false
peek        返回队列的头元素            如果队列空,则返回null
poll        移出并返回队列的头元素        如果队列空,则返回null

put            添加一个元素                如果队列满,则阻塞
take        移出并返回头元素            如果队列空,则阻塞

阻塞队列方法分为以下3类,取决于当队列满或空时它们的响应方式
1.如果将阻塞队列当做线程管理工具来使用,将要用到put和take方法。如果队列满用put方法阻塞,如果队列空用take方法阻塞。
2.当试图想满的阻塞队列中添加或从空的阻塞队列中返回或移出元素时,add、element、remove操作将抛出异常
3.在一个多线程程序中,阻塞队列会在任何时刻空或满,因此一定要用offer、peek、poll方法作为替代。这些方法如果不能完成任务,只是给一个错误提示而不会抛出异常。
注意:peek和poll方法返回空来指示失败。因此,向这些队列中插入null值是非法的。

带有超时的offer方法和poll方法(上述方法中只有offer和poll方法存在超时参数):
offer(E e, long timeout, TimeUnit unit) : 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
poll(long timeout, TimeUnit unit) : 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
e.g.boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS)
    Object head = q.poll(100, TimeUnit.MILLISECONDS)

java.util.concurrent包提供的阻塞队列实现类:
(1)LinkedBlockingQueue : 默认情况下,LinkedBlockingQueue的容量没有上边界,但是也可以选择指定大容量。
(2)LinkedBlockingDeque : 是一个双端的版本。
(3)ArrayBlockingQueue : 在构件时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则那么等待了最长时间的的线程会优先得到处理。通常公平性会降低性能,只有在确实非常需要时才使用它。
(4)PriorityBlockingQueue : 是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容量上限的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError);如果队列是空的,取元素的操作会阻塞。
(5)DelayQueue : 包含实现Delayed接口的对象:getDelay方法返回对象的残留延迟,负值表示延迟已经结束。元素只有在延迟用完的情况下才能从DelayQueue移除。还必须实现compareTo方法。DelayQueue使用该方法对元素进行排序。
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

 

 

DEMO:FileEnumerationTask的中File DUMMY = new File("")作用是作为最后一个元素take放入进去,当用于读取的线程读取到DUMMY后,表示全部任务完成,终止读取,防止最后一次put一直阻塞线程,导致线程无法终止。

 

import java.io.File;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
	private final static int FILE_QUEUE_SIZE = 10;
	private final static int SEARCH_THREADS = 1;
	
	public static void main(String[] args) {
		Scanner in = new Scanner(System.in);
		System.out.println("Enter base directory (e.g. D:\\EDPWorkspace):");
		String directory = in.nextLine();
		System.out.println("Enter keyword (e.g. python):");
		String keyword = in.nextLine();
		System.out.println(directory + "," + keyword);
		
		BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
		
		FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
		new Thread(enumerator).start();
		new Thread(new SearchTask(queue, keyword)).start();
		
	}
}

 

import java.io.File;
import java.util.concurrent.BlockingQueue;

public class FileEnumerationTask implements Runnable{
	
	private BlockingQueue<File> queue;
	private File startDirectory;
	public static File DUMMY = new File("");
	
	public FileEnumerationTask(BlockingQueue<File> queue, File startDirectory){
		this.queue = queue;
		this.startDirectory = startDirectory;
	}
	
	@Override
	public void run(){
		try{
			enumerate(startDirectory);
			queue.put(DUMMY);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	public void enumerate(File directory) throws InterruptedException{
		File[] files = directory.listFiles();
		for(File file:files){
			if(file.isDirectory()){
				enumerate(file);
			}else{
				queue.put(file);
			}
		}
	}
	
	
}

 

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;

public class SearchTask implements Runnable {
	
	private BlockingQueue<File> queue;
	private String keyword;
	
	public SearchTask(BlockingQueue<File> queue, String keyword){
		this.queue = queue;
		this.keyword = keyword;
	}
	
	@Override
	public void run() {
		try{
			boolean done = false;
			while(!done){
				File file = queue.take();
				if(file == FileEnumerationTask.DUMMY){
					queue.put(file);
					done = true;
				}else{
					search(file);
				}
			}
		}catch(IOException e){
			e.printStackTrace();
		}catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	
	public void search(File file) throws IOException{
		Scanner in = new Scanner(new FileInputStream(file));
		int lineNumber = 0;
		while(in.hasNextLine()){
			lineNumber++;
			String line = in.nextLine();
			if(line.contains(keyword)){
				System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
			}
		}
		in.close();
	}

}
分享到:
评论

相关推荐

    支持多线程和泛型的阻塞队列

    阻塞队列是一种在多线程编程中广泛使用的并发数据结构,它在计算机科学和编程领域,特别是Java和C++等面向对象语言中扮演着重要角色。标题中的“支持多线程和泛型的阻塞队列”意味着我们讨论的是一个能够同时处理多...

    并发-线程池和阻塞队列.pdf

    阻塞队列的一个重要特点是线程在队列满时加入元素会阻塞,在队列空时取出元素也会阻塞,直到有空间或元素可用。Java中的ArrayBlockingQueue和LinkedBlockingQueue都是典型的阻塞队列实现。 阻塞队列为线程间通信...

    阻塞队列阻塞队列阻塞队列

    在Java编程语言中,阻塞队列是一种线程安全的数据结构,它在多线程并发控制中发挥着重要作用。阻塞队列的核心特性是当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;同样,当队列满时,试图插入...

    java模拟阻塞队列

    阻塞队列结合了队列的数据结构与线程同步机制,使得生产者可以在队列满时被阻塞,而消费者则在队列空时被阻塞,这样可以避免无效的循环检查,提高程序的运行效率。 首先,我们需要了解什么是生产者-消费者模型。在...

    并发-线程池和阻塞队列

    在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...

    c++11 实现的阻塞队列

    C++11 实现的阻塞队列 C++11 中的阻塞队列是指在多线程环境下,实现生产者消费者模式的队列。阻塞队列的实现需要解决两个问题:线程安全和阻塞机制。在 C++11 中,我们可以使用 std::mutex、std::condition_...

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    BlockingQueue(阻塞队列)详解

    - **定义**:阻塞队列是一种特殊的队列,除了具有队列的基本特性外,还提供了额外的阻塞行为,即当队列空时,从队列中获取元素的操作将会阻塞,等待队列变得非空;当队列满时,向队列插入元素的操作也会阻塞,等待...

    Java实现简单的阻塞队列2种方式

    在Java编程中,阻塞队列是一种特殊类型的并发数据结构,它在多线程环境中的应用广泛,主要用于线程间的协作通信。阻塞队列在队列满时会阻止生产者线程添加元素,在队列空时会阻止消费者线程取出元素,直到条件满足...

    PI解决队列堵塞问题

    3. **查看堵塞队列**: - 进入SMQ2界面后,点击“Execute”按钮。 - 在显示的结果列表中,注意观察是否有状态为“SYSFAIL”的队列。这样的队列通常表示存在处理问题。 4. **分析堵塞原因**: - 双击状态为...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    同时,作为阻塞队列,当生产者尝试向满队列添加元素时,或者消费者尝试从空队列中获取元素时,线程会被阻塞,直到队列有可用空间或数据,这大大简化了多线程同步的问题。 在生产者/消费者模式中,生产者通常通过`...

    java线程聊天室(阻塞队列实现)

    【Java线程聊天室(阻塞队列实现)】 在Java编程中,多线程是构建并发应用程序的关键技术。在创建一个线程聊天室时,我们通常会涉及到多个线程之间的交互,例如用户发送消息、接收消息以及处理网络通信等。而阻塞...

    阻塞队列(Blocking Queue)是一个支持两个附加操作的队列.txt

    阻塞队列的主要特点在于它支持两个额外的条件操作:当队列为空时,尝试从队列中取元素的操作会被阻塞,直到队列中出现新的元素;同样地,当队列已满时,尝试向队列中添加元素的操作也会被阻塞,直到队列中出现可用...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    ### 10、阻塞队列BlockingQueue 实战及其原理分析 #### 一、阻塞队列概述 阻塞队列(BlockingQueue)是Java语言中`java.util.concurrent`包下提供的一种重要的线程安全队列。它继承自`Queue`接口,并在此基础上...

    剖析Java中阻塞队列的实现原理及应用场景

    Java中的阻塞队列是一种特殊的线程安全的数据结构,它在多线程环境下用于高效地处理生产者-消费者问题。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到队列中有元素可用;同样,当队列满时...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    java阻塞队列实现原理及实例解析

    阻塞队列与普通队列的不同在于,当队列是空的时候,从队列中获取元素的操作将会被阻塞,或者当队列满时,往队列里面添加元素将会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列...

    高性能阻塞队列的数据结构创新.pptx

    ### 高性能阻塞队列的数据结构创新 #### 一、阻塞队列的概念与应用 **阻塞队列**是一种特殊的队列,它具备线程安全的特点,并且当队列为空时,从队列中获取元素的操作会被阻塞;同样地,当队列满时,向队列中添加...

    Java并发编程:阻塞队列

     在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。  使用非阻塞队列的时候有一个很大问题是:它不会对当前线程产生阻塞,那么在面对类似...

    快速非阻塞并发队列算法

    ### 快速非阻塞并发队列算法 #### 摘要与背景 本文提出了一种新的非阻塞并发队列算法以及一种两锁队列算法,这两种算法都允许一个入队操作和一个出队操作同时进行。这些算法简单、快速且实用,在先前的研究文献中...

Global site tag (gtag.js) - Google Analytics