在executor里,你可以运行以下两种任务:
- 基于Runnable接口的任务:该任务实现run()方法,但该方法不具返回值
- 基于Callable接口:该任务实现了call()接口并返回一个对象作为结果。call()接口返回的对象类型由Callable接口里的泛型参数决定。executor会返回一个Future接口实现类的对象。
Callable接口
Callable接口具有以下特性:
- 它有一个类型参数来决定call()方法返回对象的类型
- 声明了call()方法。当任务被executor执行时该方法会被调用。它必须具有返回值
- call()方法能够抛出任何的checked异常。你可以通过实现自定义的executor并重载 afterExecute() 方法来处理这个异常
Future接口
当你发送一个Callable任务给executor时,executor会返回一个Future接口实现类的对象,该对象允许你控制任务的执行,任务的状态以及获取任务返回的结果。该接口的主要特性有:
- 可以调用cancel()方法来取消任务。这个方法接收一个boolean参数来指定你是否中断运行中的任务。
- 你可以检验一个任务是否已经被取消了(使用isCancelled()方法)或是否已经完成(使用isDone()方法)
- 你可以通过get()方法来获取任务的返回值。该方法有两种形式:1. 无参数形式,当任务结束时返回运行结果。如果任务没有结束,运行任务的线程被暂停直至任务结束。2. 有参数形式,传入的参数为timeout时间。如果timeout时间之内任务不能完成,则抛出TimeoutException异常。
示范例子 - 最佳单词匹配算法
该算法的目的是为了找出和传入的单词最接近的单词。在这个例子中,我们实现了两个方法:
- 第一个方法使用Levenshtein距离来找出所有最接近输入的字符串的单词
- 第二个方法使用Levenshtein距离来确认一个字符串是否存在于我们的单词集里(当然使用equals()方法更快,但是我们这个例子只是为了演示多线程)
我们将会实现一个串行和两个并行两个版本来验证并行能否帮助我们提高效率。以下代码分为串行和并行版本的公共类,以及它们各自特有的类。
公共类
我们将需要以下三个基础类:
- WordsLoader类:读取单词集到到内存中 (我们忽略该类的代码,它只是单纯的从文本文档中读取字符串)
- LevenshteinDistance类:计算两个字符串之间的Levenshtein距离
- BestMatchingData类:保存单词匹配结果。它保存了一组相似单词以及它们和输入字符串之间的距离
/** * 该类的calculate()方法接收两个字符串参数,计算并返回这两个字符串之间的Levenshtein距离 */ public class LevenshteinDistance { public static int calculate (String string1, String string2) { int[][] distances=new int[string1.length()+1][string2.length()+1]; for (int i=1; i<=string1.length();i++) { distances[i][0]=i; } for (int j=1; j<=string2.length(); j++) { distances[0][j]=j; } for(int i=1; i<=string1.length(); i++) { for (int j=1; j<=string2.length(); j++) { if (string1.charAt(i-1)==string2.charAt(j-1)) { distances[i][j]=distances[i-1][j-1]; } else { distances[i][j]=minimum(distances[i-1][j], distances[i][j-1],distances[i-1][j-1])+1; } } } return distances[string1.length()][string2.length()]; } private static int minimum(int i, int j, int k) { return Math.min(i,Math.min(j, k)); } }
public class BestMatchingData { private List<String> words; private int distance; public void setWords(List<String> words) { this.words = words; } public void setDistance(int distance) { this.distance = distance; } public List<String> getWords() { return words; } public int getDistance() { return distance; } }
串行版本
以下两个类实现串行版本:
- BestMatchingSerialCalculation类,计算得出和输入字符串最相似的一组单词。
- BestMatchingSerialMain,main()方法运行算法,记录算法运行时间以及打印出结果
public class BestMatchingSerialCalculation { public static BestMatchingData getBestMatchingWords(String word, List<String> dictionary) { List<String> results=new ArrayList<>(); int minDistance=Integer.MAX_VALUE; int distance; for (String str: dictionary) { distance=LevenshteinDistance.calculate(word,str); if (distance<minDistance) { results.clear(); minDistance=distance; results.add(str); } else if (distance==minDistance) { results.add(str); } } BestMatchingData result=new BestMatchingData(); result.setWords(results); result.setDistance(minDistance); return result; } } public class BestMatchingSerialMain { public static void main(String[] args) { Date startTime, endTime; List<String> dictionary=WordsLoader.load("data/UK Advanced" + "Cryptics Dictionary.txt"); System.out.println("Dictionary Size: "+dictionary.size()); startTime=new Date(); BestMatchingData result= BestMatchingSerialCalculation.getBestMatchingWords (args[0], dictionary); List<String> results=result.getWords(); endTime=new Date(); System.out.println("Word: "+args[0]); System.out.println("Minimum distance: " +result.getDistance()); System.out.println("List of best matching words: " +results.size()); results.forEach(System.out::println); System.out.println("Execution Time: "+(endTime.getTime()- startTime.getTime())); } }
第一个并行版本
这个版本是基于Callable接口和AbstractExecutorService接口定义的submit()方法,我们用以下三个类来实现:
- BestMatchingBasicTask类:该类实现Callable接口
- BestMatchingBasicConcurrentCalculation类:该类创建executor和所需的任务并发送给executor
- BestMatchingConcurrentMain:主文件,运行算法并打印结果
该版本中,我们把几个任务返回的Future对象保存在List中,然后待任务全部结束后我们一次从各个Future对象获取返回的数据并寻找最接近的单词。
/** * 此类负责处理单词集的一部分单词 * @param startIndex 任务所负责处理的单词集的开始位置(包含此位置的单词) * @param endIndex 任务所负责处理的单词集的结束位置(不包含次位置的单词) * @param dictionary 需要处理的单词集 * @param word 输入的单词样本 */ public class BestMatchingBasicTask implements Callable <BestMatchingData > { private int startIndex; private int endIndex; private List < String > dictionary; private String word; public BestMatchingBasicTask(int startIndex, int endIndex, List < String > dictionary, String word) { this.startIndex = startIndex; this.endIndex = endIndex; this.dictionary = dictionary; this.word = word; } @Override public BestMatchingData call() throws Exception { List<String> results=new ArrayList<>(); int minDistance=Integer.MAX_VALUE; int distance; for (int i=startIndex; i<endIndex; i++) { distance = LevenshteinDistance.calculate (word,dictionary.get(i)); if (distance < minDistance) { results.clear(); minDistance=distance; results.add(dictionary.get(i)); } else if (distance==minDistance) { results.add(dictionary.get(i)); } } BestMatchingData result=new BestMatchingData(); result.setWords(results); result.setDistance(minDistance); return result; } }
public class BestMatchingBasicConcurrentCalculation { public static BestMatchingData getBestMatchingWords( String word, List<String> dictionary) throws InterruptedException,ExecutionException { int numCores = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numCores); int size = dictionary.size(); int step = size / numCores; int startIndex, endIndex; List<Future<BestMatchingData>> results = new ArrayList<>(); for (int i = 0; i < numCores; i++) { startIndex = i * step; if (i == numCores - 1) { endIndex = dictionary.size(); } else { endIndex = (i + 1) * step; } BestMatchingBasicTask task = new BestMatchingBasicTask(startIndex, endIndex, dictionary, word); Future<BestMatchingData> future = executor.submit(task); results.add(future); } executor.shutdown(); List<String> words=new ArrayList<>(); int minDistance=Integer.MAX_VALUE; for (Future<BestMatchingData> future: results) { BestMatchingData data=future.get(); if (data.getDistance() < minDistance) { words.clear(); minDistance=data.getDistance(); words.addAll(data.getWords()); } else if (data.getDistance() == minDistance) { words.addAll(data.getWords()); } } BestMatchingData result=new BestMatchingData(); result.setDistance(minDistance); result.setWords(words); return result; } }
public class BestMatchingConcurrentMain { public static void main(String[] args) { Date startTime, endTime; List<String> dictionary=WordsLoader.load("data/UK Advanced" + "Cryptics Dictionary.txt"); System.out.println("Dictionary Size: "+dictionary.size()); startTime=new Date(); try { BestMatchingData result = BestMatchingBasicConcurrentCalculation.getBestMatchingWords (args[0], dictionary); List<String> results = result.getWords(); endTime = new Date(); System.out.println("Word: " + args[0]); System.out.println("Minimum distance: " + result.getDistance()); System.out.println("List of best matching words: " + results.size()); results.forEach(System.out::println); System.out.println("Execution Time: " + (endTime.getTime() - startTime.getTime())); } catch (Exception ex) { ex.printStackTrace(); } } }
第二个并行版本
在这个版本中,我们使用AbstractExecutorService (在ThreadPoolExecutorClass类中实现) 的invokeAll() 方法。第一个版本的并行算法中,我们只用了submit()方法,接收一个Callable对象并返回Future对象。invokeAll() 方法接收一组Callable对象作为参数,并返回一组Future对象。队列中的第一个callable对象对应返回队列中的第一个Future对象,以此类推。submit()和invokeAll()方法另一个重要差异是:submit()方法马上返回,而invokeAll()方法则在所有的Callable任务完成后返回。这意味着调用invokeAll()方法后,如果调用返回回来Future对象的isDone()方法,将返回true。
/** * 此类负责处理单词集的一部分单词 * @param startIndex 任务所负责处理的单词集的开始位置(包含此位置的单词) * @param endIndex 任务所负责处理的单词集的结束位置(不包含次位置的单词) * @param dictionary 需要处理的单词集 * @param word 输入的单词样本 */ public class BestMatchingBasicTask implements Callable <BestMatchingData > { private int startIndex; private int endIndex; private List < String > dictionary; private String word; public BestMatchingBasicTask(int startIndex, int endIndex, List < String > dictionary, String word) { this.startIndex = startIndex; this.endIndex = endIndex; this.dictionary = dictionary; this.word = word; } @Override public BestMatchingData call() throws Exception { List<String> results=new ArrayList<>(); int minDistance=Integer.MAX_VALUE; int distance; for (int i=startIndex; i<endIndex; i++) { distance = LevenshteinDistance.calculate (word,dictionary.get(i)); if (distance < minDistance) { results.clear(); minDistance=distance; results.add(dictionary.get(i)); } else if (distance==minDistance) { results.add(dictionary.get(i)); } } BestMatchingData result=new BestMatchingData(); result.setWords(results); result.setDistance(minDistance); return result; } } public class BestMatchingAdvancedConcurrentCalculation { public static BestMatchingData getBestMatchingWords( String word, List<String> dictionary) throws InterruptedException, ExecutionException { int numCores = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numCores); int size = dictionary.size(); int step = size / numCores; int startIndex, endIndex; List<BestMatchingBasicTask> tasks = new ArrayList<>(); for (int i = 0; i < numCores; i++) { startIndex = i * step; if (i == numCores - 1) { endIndex = dictionary.size(); } else { endIndex = (i + 1) * step; } BestMatchingBasicTask task = new BestMatchingBasicTask(startIndex, endIndex, dictionary, word); tasks.add(task); } // 这里我们使用invokeAll()方法传入一组任务而不是submit()方法传入单个任务 List<Future<BestMatchingData>> results = executor.invokeAll(tasks); executor.shutdown(); List<String> words = new ArrayList<>(); int minDistance = Integer.MAX_VALUE; for (Future<BestMatchingData> future : results) { BestMatchingData data = future.get(); if (data.getDistance() < minDistance) { words.clear(); minDistance = data.getDistance(); words.addAll(data.getWords()); } else if (data.getDistance()== minDistance) { words.addAll(data.getWords()); } } BestMatchingData result = new BestMatchingData(); result.setDistance(minDistance); result.setWords(words); return result; } } public class BestMatchingConcurrentAdvancedMain { public static void main(String[] args) { Date startTime, endTime; List<String> dictionary=WordsLoader.load("data/UK Advanced" + "Cryptics Dictionary.txt"); System.out.println("Dictionary Size: "+dictionary.size()); startTime=new Date(); try { BestMatchingData result = BestMatchingAdvancedConcurrentCalculation.getBestMatchingWords (args[0], dictionary); List<String> results = result.getWords(); endTime = new Date(); System.out.println("Word: " + args[0]); System.out.println("Minimum distance: " + result.getDistance()); System.out.println("List of best matching words: " + results.size()); results.forEach(System.out::println); System.out.println("Execution Time: " + (endTime.getTime() - startTime.getTime())); } catch (Exception ex) { ex.printStackTrace(); } } }
一些重要的方法
本章我们使用了来自AbstractExecutorService接口 (实现于ThreadPoolExecutor类)以及CompletionService接口 (实现于ExecutorCompletionService类) 的一些方法来管理 Callable 任务的返回值。同时它们还提供了以下有用的方法:
- invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):这个方法在所有任务执行完成后或者指定的timeout时间结束后返回一组和 Callable 任务相关联的 Future对象 (如果timeout时间到了但有些任务未被完成,那么这些任务将被取消)
- invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):这个任务返回在指定的timeout时间内第一个顺利完成的Callable任务。如果没有一个任务在timeout时间前完成,那么将抛出TimeoutException异常。
关于CompletionService接口,提供了如下重要方法:
- poll() 方法:这个方法返回并删除自从上一次调用poll()或take()方法之后的下一个完成的任务返回的Future对象。如果此时没有任务完成,那么则返回 null 值
- take() 方法:这个方法和 poll() 方法类似,但是如果没有任务完成,那么执行任务的线程将进入休眠知道有任务完成
相关推荐
Java使用Callable和Future创建线程操作示例主要介绍了Java使用Callable和Future创建线程操作,结合实例形式分析了java使用Callable接口和Future类创建线程的相关操作技巧与注意事项。 首先,Java 5开始,Java提供了...
在Java多线程编程中,`Callable`接口和`Future`接口是两个重要的组件,它们提供了比`Runnable`接口更强大的功能,尤其是在处理异步计算和结果获取时。本文将详细介绍这两个接口以及如何使用它们来创建自定义任务类以...
通过上述分析,我们可以看出,在Java中,使用`Callable`接口和`Future`或`FutureTask`可以轻松地实现异步执行任务并获取结果。这种方法非常适合那些需要等待长时间运行的任务完成并获取结果的场景。同时,通过使用...
- 高级特性:线程池,Callable和Future接口,Executor框架。 8. **异常处理**(第07章) - 异常分类:检查型异常和运行时异常。 - try-catch-finally:异常捕获和处理,finally块的作用。 - throws和throw...
第十四章Java线程深入探讨了Java平台如何支持并发编程。 1. **线程的创建与启动** - 创建线程有两种主要方式:继承`Thread`类或实现`Runnable`接口。继承`Thread`类直接创建新的线程类,而实现`Runnable`接口则...
你需要理解Java并发编程的基本概念,如synchronized关键字、Callable和Future接口,或者使用CompletableFuture进行异步操作。 6. **短信模板与签名**:第三方平台通常会提供预设的短信模板,包含动态变量,比如...
10. **并发工具类**:Java并发库提供了一些高级工具,如ExecutorService、Future、Callable、Semaphore等,它们使得并发编程更加安全和高效。 通过深入学习这些主题,你将能够编写出更健壮、高效和易于维护的Java...
4. **第四章:线程池** - 讨论了ThreadPoolExecutor的工作原理和配置,以及如何有效地使用Executor框架来管理线程生命周期,提高系统效率并减少资源消耗。 5. **第五章:原子变量** - 这一章可能深入讲解了Java的...
- **第4章:wait与sleep的区别** 比较`wait`和`sleep`方法的不同之处,以及如何正确唤醒等待中的线程。 - **第5章:Future的使用** 讲解如何创建`Future`对象,如何从`Future`获取结果,以及如何取消正在执行的...
- Future与Callable接口 - **并发实用案例** - 生产者消费者模式 - 并发集合与队列 #### 六、网络编程 - **网络基础** - TCP/IP协议栈简介 - Socket编程 - **客户端/服务器架构** - 客户端与服务器端编程技巧...
第十一章可能涉及到多线程编程,涵盖Thread类的使用,同步机制(synchronized关键字,wait()、notify()和notifyAll()方法),以及Callable和Future接口用于创建和管理异步任务。 9. **intro.pdf** - 序言 序言...
- Future接口代表异步计算的结果,可以检查计算是否完成,获取结果,或者取消计算。 - Callable接口类似于Runnable,但可以返回一个结果,FutureTask实现了Future和Runnable,可用于线程池。 通过学习和实践"Java...
* 异步计算的结果:包括Future接口和实现Future接口的FutureTask类,代表异步计算的结果。 4. Executor框架的使用 使用Executor框架可以分为三步: 1. 主线程首先要创建实现Runnable接口或者Callable接口的任务...
Future接口表示Callable任务的结果,可以检查任务是否完成、获取结果或取消任务。 七、并发工具类 Java并发包(java.util.concurrent)提供了丰富的工具类,如Semaphore(信号量)、CountDownLatch(计数器)、...
7. **Callable、Future和FutureTask** - `Callable`接口类似Runnable,但可以返回结果并抛出异常。 - `Future`接口代表异步计算的结果,提供了检查任务是否完成、获取结果、取消任务等方法。 - `FutureTask`是`...
《JAVA核心技术--高级特征(第八版)--第四部分》聚焦于Java编程语言的高级特性,这部分内容通常涉及到更复杂的编程概念和技术,是Java开发者进阶学习的重要篇章。在这一部分中,我们可以期待涵盖以下核心知识点: 1....
- 第四种:通过 `Map.values()` 获取值集合,遍历所有值。这种方法只适用于不需要键的情况。 2. **HashMap 的特性**: - 允许空键和空值,但键只能为空一次。 - 无序性,插入顺序可能改变。 - 键需重写 `...
《第10章 多线程》主要涵盖了关于并发、并行、进程和线程的基础概念,以及如何在Java中创建和管理线程。以下是详细的知识点解析: 1. **并发与并行**: - **并发**:指的是在一段时间内,多个事件看似同时发生,但...
在JDK中,java.util.concurrent.Future接口代表了这种异步计算的结果。以下是如何使用Future模式的步骤: 1. **创建Future**: 使用ExecutorService的submit()方法提交Callable任务,返回一个Future对象,表示任务的...