阻塞队列和生产者-消费模式
示例:桌面搜索
结合
程序清单5-8
程序清单5-9
程序清单5-9
程序清单7-11
程序清单7-18
程序清单7-19
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
public class IndexingService {
private static final File POISON=new File("");
private final IndexThread consumer=new IndexThread();
private final CrawlerThread producer=new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
private final Map<File,String> alreadyIndexFileMap=new ConcurrentHashMap<File,String>();
public IndexingService(BlockingQueue<File> queue,FileFilter fileFilter,File root){
this.queue=queue;
this.fileFilter=fileFilter;
this.root=root;
}
class CrawlerThread extends Thread {
public void run(){
try {
crawl(root);
} catch (InterruptedException e){
e.printStackTrace();
} finally {
while (true){
try {
queue.put(POISON);
break;
} catch (InterruptedException ie){
}
}
}
}
private void crawl(File root) throws InterruptedException{
System.out.println("crawl File:"+root.getAbsolutePath());
File[] entries=root.listFiles();
if (entries!=null){
for (File entry:entries){
System.out.println("file:"+entry.getName());
if (entry.isDirectory()){
crawl(entry);
} else if (fileFilter.accept(entry)){
if (!alreadyIndexed(entry)){
queue.put(entry);
}
}
}
}
}
private boolean alreadyIndexed(File entry){
return alreadyIndexFileMap.containsKey(entry);
}
}
class IndexThread extends Thread {
public void run(){
try {
while (true){
File file=queue.take();
if (file==POISON){
break;
} else {
indexFile(file);
}
}
} catch (InterruptedException counsumed) {
}
}
private void indexFile(File file){
System.out.println("indexFile:"+file.getPath());
}
}
public void start(){
producer.start();
consumer.start();
}
public void stop(){
System.out.println("stop*******************************");
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<File> queue=new LinkedBlockingQueue<File>(10);
FileFilter fileFilter=new FileFilter(){
public boolean accept(File file){
if (file.getName().endsWith(".doc") || file.getName().endsWith(".docx")){
return true;
}
return false;
}
};
File root=Paths.get("D:\\doc\\微云网盘").toFile();
IndexingService is=new IndexingService(queue,fileFilter,root);
is.start();
//2s
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
is.stop();
is.awaitTermination();
}
}
改成两个生产者一个消费者
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class IndexingServiceTwoProducerOneConsumer {
private static final File POISON1=new File("");
private static final File POISON2=new File("");
private final IndexThread consumer=new IndexThread();
private final CrawlerThread producerOne;
private final CrawlerThread producerTwo;
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root1;
private final File root2;
private final Map<File,String> alreadyIndexFileMap=new ConcurrentHashMap<File,String>();
IndexingServiceTwoProducerOneConsumer(BlockingQueue<File> queue,FileFilter fileFilter,File root1,File root2){
this.queue=queue;
this.fileFilter=fileFilter;
this.root1=root1;
this.root2=root2;
producerOne=new CrawlerThread(this.root1,POISON1);
producerTwo=new CrawlerThread(this.root2,POISON2);
}
public void start(){
producerOne.start();
producerTwo.start();
consumer.start();
}
public void stop(){
System.out.println("stop");
producerOne.interrupt();
producerTwo.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<File> queue=new LinkedBlockingQueue<File>(10);
FileFilter fileFilter=new FileFilter(){
public boolean accept(File file){
if (file.getName().endsWith(".doc") || file.getName().endsWith(".docx")){
return true;
}
return false;
}
};
File root1=Paths.get("D:\\doc\\dir1").toFile();
File root2=Paths.get("D:\\doc\\dir2").toFile();
IndexingServiceTwoProducerOneConsumer is=new IndexingServiceTwoProducerOneConsumer(queue,fileFilter,root1,root2);
is.start();
//2s
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
is.stop();
is.awaitTermination();
}
class CrawlerThread extends Thread {
private File root;
private File poison;
CrawlerThread(File root,File poison){
this.root=root;
this.poison=poison;
}
public void run(){
try {
crawl(root);
} catch (InterruptedException e){
} finally {
while (true){
try {
queue.put(poison);
break;
} catch (InterruptedException ie){
}
}
}
}
private void crawl(File root) throws InterruptedException{
System.out.println("crawl File:"+root.getAbsolutePath());
File[] entries=root.listFiles();
if (entries!=null){
for (File entry:entries){
System.out.println("file:"+entry.getName());
if (entry.isDirectory()){
crawl(entry);
} else if (fileFilter.accept(entry)){
if (!alreadyIndexed(entry)){
queue.put(entry);
}
}
}
}
}
private boolean alreadyIndexed(File entry){
return alreadyIndexFileMap.containsKey(entry);
}
}
class IndexThread extends Thread {
final AtomicBoolean flag1=new AtomicBoolean(false);
final AtomicBoolean flag2=new AtomicBoolean(false);
public void run(){
try {
while (true){
File file=queue.take();
if (file==POISON1){
flag1.set(true);
} else if (file==POISON2){
flag2.set(true);
} else {
indexFile(file);
}
if (flag1.get() && flag2.get()){
break;
}
}
} catch (InterruptedException counsumed) {
}
}
private void indexFile(File file){
System.out.println("indexFile:"+file.getPath());
}
}
}
改成一个生产者三个消费者
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
public class IndexingServiceOneProducerThreeConsumer {
private static final File POISON=new File("");
private final IndexThread consumer1=new IndexThread();
private final IndexThread consumer2=new IndexThread();
private final IndexThread consumer3=new IndexThread();
private final CrawlerThread producer=new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
private final Map<File,String> alreadyIndexFileMap=new ConcurrentHashMap<File,String>();
public IndexingServiceOneProducerThreeConsumer(BlockingQueue<File> queue,FileFilter fileFilter,File root){
this.queue=queue;
this.fileFilter=fileFilter;
this.root=root;
}
class CrawlerThread extends Thread {
public void run(){
try {
crawl(root);
} catch (InterruptedException e){
e.printStackTrace();
} finally {
while (true){
try {
queue.put(POISON);
System.out.println("put poison ok");
break;
} catch (InterruptedException ie){
}
}
}
}
private void crawl(File root) throws InterruptedException{
System.out.println("crawl File:"+root.getAbsolutePath());
File[] entries=root.listFiles();
if (entries!=null){
for (File entry:entries){
System.out.println("file:"+entry.getName());
if (entry.isDirectory()){
crawl(entry);
} else if (fileFilter.accept(entry)){
if (!alreadyIndexed(entry)){
queue.put(entry);
}
}
}
}
}
private boolean alreadyIndexed(File entry){
return alreadyIndexFileMap.containsKey(entry);
}
}
class IndexThread extends Thread {
public void run(){
try {
while (true){
File file=queue.take();
if (file==POISON){
System.out.println("thread Name:"+Thread.currentThread().getName());
System.out.println("get poison ok");
//有n个消费者就丢n-1个
queue.put(POISON);
queue.put(POISON);
break;
} else {
indexFile(file);
}
}
} catch (InterruptedException counsumed) {
}
}
private void indexFile(File file){
System.out.println("indexFile:"+file.getPath());
}
}
public void start(){
producer.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
public void stop(){
System.out.println("stop*******************************");
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer1.join();
consumer2.join();
consumer3.join();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<File> queue=new LinkedBlockingQueue<File>(10);
FileFilter fileFilter=new FileFilter(){
public boolean accept(File file){
if (file.getName().endsWith(".doc") || file.getName().endsWith(".docx")){
return true;
}
return false;
}
};
File root=Paths.get("D:\\doc\\dir").toFile();
IndexingServiceOneProducerThreeConsumer is=new IndexingServiceOneProducerThreeConsumer(queue,fileFilter,root);
is.start();
//2s
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
is.stop();
is.awaitTermination();
}
}
分享到:
相关推荐
Using the concurrency building blocks in java.util.concurrent Performance optimization dos and don'ts Testing concurrent programs Advanced topics such as atomic variables, nonblocking algorithms, ...
Java Concurrency in Practice JAVA并发编程实践中文版(全)第二部分
《Java并发编程实践》是一本由Brian Goetz等人编写的关于Java并发编程的经典著作。本书深入浅出地介绍了Java 5.0及之后版本中新增加的并发特性,并对并发编程进行了全面而详尽的讲解。自发布以来,这本书因其内容的...
《Java并发编程实践》是Java开发者深入理解和应用并发编程的权威指南,这本书全面覆盖了Java并发编程的各种核心概念和技术,旨在帮助程序员编写出高效、安全的并发代码。书中的内容既包括理论知识,也包含丰富的实战...
《JAVA并发编程实践》随着多核处理器的普及,使用并发成为构建高性能应用程序的关键。Java 5以及6在开发并发程序中取得了显著的进步,提高了Java虚拟机的性能以及并发类的可伸缩性,并加入了丰富的新并发构建块。在...
《Java并发编程实践》是Java开发者必读的经典之作,由Brian Goetz等多位专家共同撰写。这本书深入浅出地探讨了Java平台上的并发问题,帮助读者理解和掌握如何编写高效、可靠且可维护的多线程应用程序。以下是该书...
《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...
《JAVA并发编程实践》既能够成为读者的理论支持,又可以作为构建可靠的、可伸缩的、可维护的并发程序的技术支持。《JAVA并发编程实践》并不仅仅提供并发API的清单及其机制,还提供了设计原则、模式和思想模型,使...
《Java Concurrency in Practice》是Java并发编程领域的一本权威著作,由Brian Goetz、Tim Peierls、Joshua Bloch、David Holmes和Doug Lea等多位Java并发领域的专家共同编写。这本书深入探讨了Java平台上的多线程和...
- **书名**:《Java并发实践》(Java Concurrency in Practice) - **作者**:Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, Doug Lea - **出版社**:Addison Wesley Professional - **...
博文链接:https://projector.iteye.com/blog/209730
《JAVA并发编程实践》适合于具有一定Java编程经验的程序员、希望了解Java SE 5以及6在线程技术上的改进和新特性的程序员,以及Java和并发编程的爱好者。 作者简介 作者:(美)戈茨 等 本书作者系lava标准化组织...
《C++并发编程实践》这本书由Anthony Williams编写,是一本深入讲解C++多线程编程技术的专业书籍。本书旨在帮助读者掌握C++中的并发编程技巧,并通过大量的示例代码来加深理解。 **并发编程**是指在计算机程序中...
Java Concurrency in Practice 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者...
根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...
### Java并发编程实践知识点详解 #### 一、Java并发编程基础 ##### 1.1 并发与并行概念区分 在Java并发编程实践中,首先需要理解“并发”与“并行”的区别。“并发”指的是多个任务同时进行,但实际上可能是在多...
《Java Concurrency in Practice》是Java并发编程领域的一本经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和Doug Lea等专家共同编写。这本书深入探讨了Java平台上的多线程和并发编程,旨在...