`

阻塞队列

    博客分类:
  • java
阅读更多

    队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将 导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修 改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线 程集赶上来。
    下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。 java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、 ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是 ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了, 等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。
    生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。
    我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在 工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到 这个虚拟对象时,就将其放回并终止。
    注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。
    FileEnumerationTask线程不断的向队列中加入文件。

import java.io.File;
import java.util.concurrent.BlockingQueue;

public class FileEnumerationTask implements Runnable {
	private BlockingQueue<File> queue;
	private File startingDirectory;
	public static File DUMMY = new File("");
	
	public FileEnumerationTask(BlockingQueue<File> queue,File startingDirectory){
		this.queue=queue;
		this.startingDirectory=startingDirectory;
	}
	
	public void run() {
		try{
			enumerate(startingDirectory);
			queue.put(DUMMY);
		}catch(InterruptedException e){
		}
	}

    /*
     * 将目录下面的文件(非目录)加到阻塞队列中。这个函数是一个递归函数
     * @directory起始目录
     */
	private void enumerate(File directory) throws InterruptedException {
		File[] files=directory.listFiles();
		for(File f:files)
			if(f.isDirectory())
				enumerate(f);
			else
				queue.put(f);
	}
}
 
SearchTask.java线程从队列中取文件,然后进行判断是否含有关键字。

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;

public class SearchTask implements Runnable{
	private BlockingQueue<File> queue;
	private String keyword;
	
	public SearchTask(BlockingQueue<File> queue,String keyword){
		this.queue=queue;
		this.keyword=keyword;
	}
	
	public void run() {
		  try
	      {
	         boolean done = false;
	         while (!done)
	         {
	            File file = queue.take();
	            if (file == FileEnumerationTask.DUMMY) { queue.put(file); done = true; }
	            else search(file);
	         }
	      }
	      catch (IOException e) { e.printStackTrace(); }
	      catch (InterruptedException e) {}
	}
	
	/*
	 * 判断文件file中是否有有关健字keyword。
	 * @param file 搜索的文件
	 * 如果文件为目录,抛出FileNotFIndException
	 */
	public void search(File file) throws IOException
	   {    
	      Scanner in = new Scanner(new FileInputStream(file));
	      int lineNumber = 0;
	      while (in.hasNextLine())
	      {
	         lineNumber++;        
	         String line = in.nextLine();
	         if (line.contains(keyword))
	            System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line);
	      }
	      in.close();
	   }
}
 
      BlockingQueueTest.java是主程序,它创建了一个向队列中不断加入文件的线程和大量的从队列中取文件的线程。

import java.io.File;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest
{
   public static void main(String[] args)
   {
      Scanner in = new Scanner(System.in);
      System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");
      String directory = in.nextLine();
      System.out.print("Enter keyword (e.g. volatile): ");
      String keyword = in.nextLine();    

      final int FILE_QUEUE_SIZE = 10;
      final int SEARCH_THREADS = 100;
     
      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
      new Thread(enumerator).start();
      for (int i = 1; i <= SEARCH_THREADS; i++)
         new Thread(new SearchTask(queue, keyword)).start();
   }
}
     
    主线程创建了一个ArrayBlockingQueue,大小为10,启动FileEnumerationTask线程后,不断有文件加入到queue中,如果queue满了,这个线程就会被阻塞,直到没有满,又像其中添加文件。
主程序中又启动了100个SearchTask线程,这些线程向queue中取文件,如果queue中文件都被取了,queue的size为0,其它没有取到文件的线程就会阻塞。而FileEnumerationTask线程会继续向queue中加文件。
如此直到加入了DUMMY。
分享到:
评论

相关推荐

    支持多线程和泛型的阻塞队列

    阻塞队列是一种在多线程编程中广泛使用的并发数据结构,它在计算机科学和编程领域,特别是Java和C++等面向对象语言中扮演着重要角色。标题中的“支持多线程和泛型的阻塞队列”意味着我们讨论的是一个能够同时处理多...

    java模拟阻塞队列

    Java中的阻塞队列是一种基于同步原语的高级数据结构,它在多线程编程中扮演着重要角色,尤其在并发处理和优化系统资源利用率方面。阻塞队列结合了队列的数据结构与线程同步机制,使得生产者可以在队列满时被阻塞,而...

    阻塞队列阻塞队列阻塞队列

    在Java编程语言中,阻塞队列是一种线程安全的数据结构,它在多线程并发控制中发挥着重要作用。阻塞队列的核心特性是当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;同样,当队列满时,试图插入...

    并发-线程池和阻塞队列.pdf

    线程池和阻塞队列是实现并发的关键组件之一。本文将详细介绍线程池原理、使用场景及注意事项,以及阻塞队列的相关知识。 首先,线程池是一种基于池化思想管理线程的技术,它可以重用一组线程执行多个任务。线程池的...

    c++11 实现的阻塞队列

    C++11 实现的阻塞队列 C++11 中的阻塞队列是指在多线程环境下,实现生产者消费者模式的队列。阻塞队列的实现需要解决两个问题:线程安全和阻塞机制。在 C++11 中,我们可以使用 std::mutex、std::condition_...

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    BlockingQueue(阻塞队列)详解

    ### BlockingQueue(阻塞队列)详解 #### 一、前言 随着现代软件系统对并发性能需求的不断提高,多线程编程技术逐渐成为开发人员不可或缺的技能之一。在Java平台中,`java.util.concurrent`包提供了丰富的工具来...

    Java实现简单的阻塞队列2种方式

    在Java编程中,阻塞队列是一种特殊类型的并发数据结构,它在多线程环境中的应用广泛,主要用于线程间的协作通信。阻塞队列在队列满时会阻止生产者线程添加元素,在队列空时会阻止消费者线程取出元素,直到条件满足...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    并发-线程池和阻塞队列

    在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    ### 10、阻塞队列BlockingQueue 实战及其原理分析 #### 一、阻塞队列概述 阻塞队列(BlockingQueue)是Java语言中`java.util.concurrent`包下提供的一种重要的线程安全队列。它继承自`Queue`接口,并在此基础上...

    阻塞队列(Blocking Queue)是一个支持两个附加操作的队列.txt

    阻塞队列是Java中并发编程的一个重要组件,它属于Java.util.concurrent包中的一部分。阻塞队列的主要特点在于它支持两个额外的条件操作:当队列为空时,尝试从队列中取元素的操作会被阻塞,直到队列中出现新的元素;...

    java线程聊天室(阻塞队列实现)

    【Java线程聊天室(阻塞队列实现)】 在Java编程中,多线程是构建并发应用程序的关键技术。在创建一个线程聊天室时,我们通常会涉及到多个线程之间的交互,例如用户发送消息、接收消息以及处理网络通信等。而阻塞...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    高性能阻塞队列的数据结构创新.pptx

    ### 高性能阻塞队列的数据结构创新 #### 一、阻塞队列的概念与应用 **阻塞队列**是一种特殊的队列,它具备线程安全的特点,并且当队列为空时,从队列中获取元素的操作会被阻塞;同样地,当队列满时,向队列中添加...

    Java并发编程(21)并发新特性-阻塞队列和阻塞栈(含代

    在Java并发编程中,阻塞队列和阻塞栈是两个重要的并发数据结构,它们在多线程环境下的高效通信和资源管理中扮演着至关重要的角色。这些数据结构源自Java的并发包`java.util.concurrent`,是实现并发设计模式如生产者...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    在这个场景中,阻塞队列(BlockingQueue)是实现这种模式的理想工具,因为它提供了线程安全的数据同步机制。 阻塞队列在Java中的实现主要由`java.util.concurrent`包下的几个类提供,如`ArrayBlockingQueue`、`...

    java阻塞队列实现原理及实例解析.docx

    Java 阻塞队列(Blocking Queue)是一种特殊类型的并发数据结构,它在多线程编程中扮演着重要的角色。阻塞队列的核心特性在于其在队列为空或满时能够自动阻塞线程,从而实现线程间的同步和通信。这种机制使得生产者...

    剖析Java中阻塞队列的实现原理及应用场景

    Java中的阻塞队列是一种特殊的线程安全的数据结构,它在多线程环境下用于高效地处理生产者-消费者问题。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到队列中有元素可用;同样,当队列满时...

    java学习(基于Java阻塞队列的搜索实例).pdf

    在这篇文档中,我们将详细探讨Java中基于阻塞队列的搜索实例。Java的并发API为我们提供了强大的线程间通信工具,而阻塞队列是这些工具中的核心组件之一。阻塞队列是一个支持两个附加操作的队列:在队列为空时,获取...

Global site tag (gtag.js) - Google Analytics