这几天所做的项目中涉及到了队列阻塞机制,通过研究整理如下。在这里和大家分享。
队列以一种先进先出的方式。如果你向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。
下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。
java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。
生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。
我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。
在这个程序中,我们使用队列数据结构作为一种同步机制。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class BlockingQueueTest
{
public static void main(String[] args)
{
Scanner in = new Scanner(System.in);
System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(queue, keyword)).start();
}
}
/**
* This task enumerates all files in a directory and its subdirectories.
*/
class FileEnumerationTask implements Runnable
{
/**
* Constructs a FileEnumerationTask.
* @param queue the blocking queue to which the enumerated files are added
* @param startingDirectory the directory in which to start the enumeration
*/
public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
{
this.queue = queue;
this.startingDirectory = startingDirectory;
}
public void run()
{
try
{
enumerate(startingDirectory);
queue.put(DUMMY);
}
catch (InterruptedException e)
{
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories
* @param directory the directory in which to start
*/
public void enumerate(File directory) throws InterruptedException
{
File[] files = directory.listFiles();
for (File file : files)
{
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
public static File DUMMY = new File("");
private BlockingQueue<File> queue;
private File startingDirectory;
}
/**
* This task searches files for a given keyword.
*/
class SearchTask implements Runnable
{
/**
* Constructs a SearchTask.
* @param queue the queue from which to take files
* @param keyword the keyword to look for
*/
public SearchTask(BlockingQueue<File> queue, String keyword)
{
this.queue = queue;
this.keyword = keyword;
}
public void run()
{
try
{
boolean done = false;
while (!done)
{
File file = queue.take();
if (file == FileEnumerationTask.DUMMY)
{
queue.put(file);
done = true;
}
else search(file);
}
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
* @param file the file to search
*/
public void search(File file) throws IOException
{
Scanner in = new Scanner(new FileInputStream(file));
int lineNumber = 0;
while (in.hasNextLine())
{
lineNumber++;
String line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
in.close();
}
private BlockingQueue<File> queue;
private String keyword;
}
分享到:
相关推荐
**Redis 分布式队列实现解析** Redis 是一个高性能、轻量级的键值存储系统,它以数据持久化和在网络中的高效传输而闻名。作为内存数据库,Redis 的设计目标是支持快速的数据读写和发布订阅功能,适用于缓存、计数、...
《ReentrantLock流程浅析——深入理解Java并发编程》 ReentrantLock,即可重入锁,是Java并发包(java.util.concurrent.locks)中的一个核心组件,它提供了比synchronized更灵活的锁机制。ReentrantLock实现了Lock...
这种状态通常被称为“无阻塞”,表明交换机的内部资源足以处理所有同时到达的帧,不存在处理队列的阻塞现象。 无阻塞特性对于高带宽需求的网络环境至关重要,因为它确保了数据的高效流动,避免了潜在的拥塞问题。在...
此外,消息队列还可以实现异步处理,提高系统的响应速度,例如将非实时需求的邮件或短信发送任务放入队列,避免阻塞主线程。 在众多的消息队列中间件中,为何选择Redis呢?Redis是一个内存数据库,具备高速读写能力...
"浅析Linux进程通信的几种方式及其比较" Linux 进程通信是操作系统中一个非常重要的概念。进程通信是指至少两个进程之间传送数据或者信号的一些技术和方法。进程是计算机系统分配资源的基本单位,每个进程都有自己...
* 首先定义了四个数据结构:BLOCK、DCT、SDT、COCT和CHCT,分别表示阻塞队列、设备控制表、系统设备表、控制器控制表和通道控制表。 * 然后,定义了初始化函数init(),用于初始化SDT和DCT系统原有4个设备:K、M、T和...
另一种是处理耗时任务,比如发送邮件、处理大数据等,这些操作可以异步执行,避免阻塞主请求处理流程。 Laravel 5中队列的基本配置流程大致如下: 1. 配置文件配置: Laravel 5将队列配置信息定义在`config/queue...
【浅析设备驱动程序通知应用程序的几种方法】 设备驱动程序是操作系统的核心组成部分,它们作为硬件与上层软件之间的桥梁,负责处理硬件相关的低级任务,如I/O操作、硬件中断处理、DMA(Direct Memory Access)传输...
工作队列(Work Queues)是最通用的Bottom Half形式,适用于长时间、可能会阻塞的任务,如I/O操作。工作队列可以在任何CPU上执行,甚至在中断上下文之外,这使得它们能够执行复杂的操作,如唤醒进程、调度任务等。 ...
为了简化`Handler`的使用,还可以使用`post(Runnable)`方法,它会在主线程的下一次事件循环时执行Runnable中的代码,这在需要在UI线程执行非阻塞任务时非常方便: ```java private Handler mHandler = new Handler...
在Android程序中,由于主线程(UI线程)负责绘制和更新用户界面,因此耗时操作如网络请求、大文件读写等不应在主线程执行,以免阻塞UI,导致应用无响应(ANR)。`Handler`提供了一种机制,使得后台线程可以安全地...
4. 使用异步处理模型,如消息队列,处理非实时性需求的任务。 对于需要与其他系统进行通讯的场景,可以根据需求选择合适的技术方案: 1. 如果需要跨平台、跨语言交互,可以选择WebService,它具有良好的互操作性。 ...
activeMQ作为一款流行的开源消息队列,它支持JMS(Java Message Service)标准,提供了高效、可靠的异步通信能力。在使用activeMQ时,客户端发送消息有两种主要方式:同步发送和异步发送。 1. **同步发送**: 同步...
软件通过封装通道管理、协议解析、信息分发、优先级控制、队列管理和信息安全等操作,为业务应用软件提供统一的输入输出接口和透明的传输服务,实现与其他应用软件的解耦合。 此外,软件采用了四层架构设计,每层...
尽管JavaScript是单线程的,但通过尽可能将操作放到系统内核执行,事件循环允许Node.js执行非阻塞I/O操作。 由于现代大多数内核都是多线程的,因此它们可以处理在后台执行的多个操作。 当其中一个操作完成时,内核会...
在Android系统中,主线程(也称为UI线程)负责处理用户交互和绘制界面,而其他工作通常在后台线程执行以避免阻塞UI。由于Android的安全机制,非UI线程不能直接修改UI元素,这就引入了Handler、Looper和Message三个...
当服务器接收到请求时,并不会直接处理,而是先关闭连接,然后在事件队列中排队。随后,事件循环机制会监听这些事件,一旦某个事件发生,就会触发对应的回调函数。这种模型可以让服务器持续接收新的请求,并尽快地...
GCD的队列(Dispatch Queue)可以用来控制任务的执行顺序,同时它的同步和异步函数可以用来防止数据竞争。 总的来说,理解并熟练掌握iOS应用中的线程间通信和线程安全对于开发高性能、稳定的应用至关重要。开发者...
- **可运行(Runnable)**:`start()`方法已被调用,线程可能正在CPU的调度队列中等待执行。 - **运行(Running)**:线程正在执行`run()`方法。 - **阻塞(Blocked)**:线程因为某种原因(如等待锁、等待I/O等...