`

队列阻塞浅析

    博客分类:
  • java
阅读更多

    这几天所做的项目中涉及到了队列阻塞机制,通过研究整理如下。在这里和大家分享。

       队列以一种先进先出的方式。如果你向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

  下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。
 
  java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。
 
  生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。
 
  我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。
 
 在这个程序中,我们使用队列数据结构作为一种同步机制。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest
{
   public static void main(String[] args)
   {
      Scanner in = new Scanner(System.in);
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
      String directory = in.nextLine();
      System.out.print("Enter keyword (e.g. volatile): ");
      String keyword = in.nextLine();

      final int FILE_QUEUE_SIZE = 10;
      final int SEARCH_THREADS = 100;

      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
      new Thread(enumerator).start();
      for (int i = 1; i <= SEARCH_THREADS; i++)
         new Thread(new SearchTask(queue, keyword)).start();
   }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable
{
   /**
    * Constructs a FileEnumerationTask.
    * @param queue the blocking queue to which the enumerated files are added
    * @param startingDirectory the directory in which to start the enumeration
    */
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
   {
      this.queue = queue;
      this.startingDirectory = startingDirectory;
   }

   public void run()
   {
      try
      {
         enumerate(startingDirectory);
         queue.put(DUMMY);
      }
      catch (InterruptedException e)
      {
      }
   }

   /**
    * Recursively enumerates all files in a given directory and its subdirectories
    * @param directory the directory in which to start
    */
   public void enumerate(File directory) throws InterruptedException
   {
      File[] files = directory.listFiles();
      for (File file : files)
      {
         if (file.isDirectory()) enumerate(file);
         else queue.put(file);
      }
   }

   public static File DUMMY = new File("");

   private BlockingQueue<File> queue;
   private File startingDirectory;
}

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable
{
   /**
    * Constructs a SearchTask.
    * @param queue the queue from which to take files
    * @param keyword the keyword to look for
    */
   public SearchTask(BlockingQueue<File> queue, String keyword)
   {
      this.queue = queue;
      this.keyword = keyword;
   }

   public void run()
   {
      try
      {
         boolean done = false;
         while (!done)
         {
            File file = queue.take();
            if (file == FileEnumerationTask.DUMMY)
            {
               queue.put(file);
               done = true;
            }
            else search(file);            
         }
      }
      catch (IOException e)
      {
         e.printStackTrace();
      }
      catch (InterruptedException e)
      {
      }      
   }

   /**
    * Searches a file for a given keyword and prints all matching lines.
    * @param file the file to search
    */
   public void search(File file) throws IOException
   {
      Scanner in = new Scanner(new FileInputStream(file));
      int lineNumber = 0;
      while (in.hasNextLine())
      {
         lineNumber++;
         String line = in.nextLine().trim();
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);
      }
      in.close();
   }

   private BlockingQueue<File> queue;
   private String keyword;
}
 
31
11
分享到:
评论
14 楼 yutaozxy 2012-06-17  
while (!filesQueue.getEndSignal())  { 
       File file = queue.take(); 
       if (file == FileEnumerationTask.DUMMY) 
       { 
           filesQueue.setEndSignal(true); // queue.put(file);
       } 
       else search(file);             
}

这么写会有问题吧,如果有2个线程一个刚好从queue.take()最后一个元素而另一个线程刚好停到queue.take()那么第2个线程永远不会结束!
13 楼 w445097062 2012-06-17  
不错,可惜最近的项目接触不到这些
12 楼 railway 2012-06-16  
rogantian 写道
darrendu 写道
不错,尤其你的英文注释写的地道,searchTask中,File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 

         为什么又放入队列呢?


确保其它搜索线程也能拿到虚拟包,从而结束自己!


因为Queue是阻塞式,较难判断到底遍历完,DUMMY是一种巧妙的方式,用来标示搜索完毕,为了能够让其他线程也能最终结束,所以有了搜到DUMMY的线程在结束自己的时候同时queue.put(file),让别的线程也结束:
if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            }
不过我觉得还可以有种方式,就是加入一个信号量,这需要设计一个新类,记为FilesQueue,FilesQueue中包含ArrayBlockingQueue和信号量endSignal,
public class FilesQueue {
   private ArrayBlockingQueue queue;
   private volatile boolean endSignal = false;

   //get,set
   //...
}

故SearchTask#run方法中大概是这样的:
....
while (!filesQueue.getEndSignal())  { 
       File file = queue.take(); 
       if (file == FileEnumerationTask.DUMMY) 
       { 
           filesQueue.setEndSignal(true); // queue.put(file);
       } 
       else search(file);             
}
....
11 楼 来这里学java 2012-06-15  
无法输入中文路径,试了多种编码都不行,求赐教。。
10 楼 rogantian 2012-06-15  
darrendu 写道
不错,尤其你的英文注释写的地道,searchTask中,File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 

         为什么又放入队列呢?


确保其它搜索线程也能拿到虚拟包,从而结束自己!
9 楼 darrendu 2012-06-15  
不错,尤其你的英文注释写的地道,searchTask中,File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 

         为什么又放入队列呢?

8 楼 pktangshao 2012-06-15  
总有一些心理变态的程序员 无论人家的文章写的多好.多认真.就是踩你没个说
7 楼 mn_1127 2012-06-15  
最近我也在学习多线程方面的知识! 写的不错……  
6 楼 季铵盐 2012-06-14  
虽然没有看,像这样的文章很不错 ;就应该有这样的专研精神!赞一个
5 楼 javawebsoa 2012-06-14  
谢谢,学习了
4 楼 aijuans 2012-06-14  
3 楼 hae 2012-06-14  
最近我也在看多线程,可是不是很清楚用途,
2 楼 那家渔村 2012-06-14  
呵呵,真巧。昨晚我也正看《Core Java》的多线程那章。阻塞队列像线程之间通信的管道。
1 楼 dingbuoyi 2012-06-14  
写的不错 学习一下

相关推荐

    Redis实现分布式队列浅析

    **Redis 分布式队列实现解析** Redis 是一个高性能、轻量级的键值存储系统,它以数据持久化和在网络中的高效传输而闻名。作为内存数据库,Redis 的设计目标是支持快速的数据读写和发布订阅功能,适用于缓存、计数、...

    ReentrantLock流程浅析

    《ReentrantLock流程浅析——深入理解Java并发编程》 ReentrantLock,即可重入锁,是Java并发包(java.util.concurrent.locks)中的一个核心组件,它提供了比synchronized更灵活的锁机制。ReentrantLock实现了Lock...

    浅析网络交换机的线速WireSpeed

    这种状态通常被称为“无阻塞”,表明交换机的内部资源足以处理所有同时到达的帧,不存在处理队列的阻塞现象。 无阻塞特性对于高带宽需求的网络环境至关重要,因为它确保了数据的高效流动,避免了潜在的拥塞问题。在...

    浅析JavaWeb项目架构之Redis分布式日志队列

    此外,消息队列还可以实现异步处理,提高系统的响应速度,例如将非实时需求的邮件或短信发送任务放入队列,避免阻塞主线程。 在众多的消息队列中间件中,为何选择Redis呢?Redis是一个内存数据库,具备高速读写能力...

    浅析Linux进程通信的几种方式及其比较.pdf

    "浅析Linux进程通信的几种方式及其比较" Linux 进程通信是操作系统中一个非常重要的概念。进程通信是指至少两个进程之间传送数据或者信号的一些技术和方法。进程是计算机系统分配资源的基本单位,每个进程都有自己...

    实验七设备管理浅析.pdf

    * 首先定义了四个数据结构:BLOCK、DCT、SDT、COCT和CHCT,分别表示阻塞队列、设备控制表、系统设备表、控制器控制表和通道控制表。 * 然后,定义了初始化函数init(),用于初始化SDT和DCT系统原有4个设备:K、M、T和...

    浅析Laravel5中队列的配置及使用

    另一种是处理耗时任务,比如发送邮件、处理大数据等,这些操作可以异步执行,避免阻塞主请求处理流程。 Laravel 5中队列的基本配置流程大致如下: 1. 配置文件配置: Laravel 5将队列配置信息定义在`config/queue...

    浅析设备驱动程序通知应用程序的几种方法

    【浅析设备驱动程序通知应用程序的几种方法】 设备驱动程序是操作系统的核心组成部分,它们作为硬件与上层软件之间的桥梁,负责处理硬件相关的低级任务,如I/O操作、硬件中断处理、DMA(Direct Memory Access)传输...

    浅析Linux内核中的Bottom Half机制_linux内核_

    工作队列(Work Queues)是最通用的Bottom Half形式,适用于长时间、可能会阻塞的任务,如I/O操作。工作队列可以在任何CPU上执行,甚至在中断上下文之外,这使得它们能够执行复杂的操作,如唤醒进程、调度任务等。 ...

    深入浅析Android消息机制

    为了简化`Handler`的使用,还可以使用`post(Runnable)`方法,它会在主线程的下一次事件循环时执行Runnable中的代码,这在需要在UI线程执行非阻塞任务时非常方便: ```java private Handler mHandler = new Handler...

    浅析Android Handler的使用误区与避免.pdf

    在Android程序中,由于主线程(UI线程)负责绘制和更新用户界面,因此耗时操作如网络请求、大文件读写等不应在主线程执行,以免阻塞UI,导致应用无响应(ANR)。`Handler`提供了一种机制,使得后台线程可以安全地...

    浅析异步调用的ppt

    4. 使用异步处理模型,如消息队列,处理非实时性需求的任务。 对于需要与其他系统进行通讯的场景,可以根据需求选择合适的技术方案: 1. 如果需要跨平台、跨语言交互,可以选择WebService,它具有良好的互操作性。 ...

    浅析计算机电子信息系统中信息传输控制技术 (2).pdf

    软件通过封装通道管理、协议解析、信息分发、优先级控制、队列管理和信息安全等操作,为业务应用软件提供统一的输入输出接口和透明的传输服务,实现与其他应用软件的解耦合。 此外,软件采用了四层架构设计,每层...

    深入浅析Node.js 事件循环、定时器和process.nextTick()

    尽管JavaScript是单线程的,但通过尽可能将操作放到系统内核执行,事件循环允许Node.js执行非阻塞I/O操作。 由于现代大多数内核都是多线程的,因此它们可以处理在后台执行的多个操作。 当其中一个操作完成时,内核会...

    activeMQ消息发送过程与原理浅析

    activeMQ作为一款流行的开源消息队列,它支持JMS(Java Message Service)标准,提供了高效、可靠的异步通信能力。在使用activeMQ时,客户端发送消息有两种主要方式:同步发送和异步发送。 1. **同步发送**: 同步...

    Android中handler使用浅析

    在Android系统中,主线程(也称为UI线程)负责处理用户交互和绘制界面,而其他工作通常在后台线程执行以避免阻塞UI。由于Android的安全机制,非UI线程不能直接修改UI元素,这就引入了Handler、Looper和Message三个...

    深入浅析Node.js 事件循环

    当服务器接收到请求时,并不会直接处理,而是先关闭连接,然后在事件队列中排队。随后,事件循环机制会监听这些事件,一旦某个事件发生,就会触发对应的回调函数。这种模型可以让服务器持续接收新的请求,并尽快地...

    浅析iOS应用开发中线程间的通信与线程安全问题

    GCD的队列(Dispatch Queue)可以用来控制任务的执行顺序,同时它的同步和异步函数可以用来防止数据竞争。 总的来说,理解并熟练掌握iOS应用中的线程间通信和线程安全对于开发高性能、稳定的应用至关重要。开发者...

    浅析Java中线程的创建和启动

    - **可运行(Runnable)**:`start()`方法已被调用,线程可能正在CPU的调度队列中等待执行。 - **运行(Running)**:线程正在执行`run()`方法。 - **阻塞(Blocked)**:线程因为某种原因(如等待锁、等待I/O等...

Global site tag (gtag.js) - Google Analytics