`
zhaomengsen
  • 浏览: 210233 次
  • 性别: Icon_minigender_1
  • 来自: 河北
社区版块
存档分类
最新评论

JVM 并发性: Java 和 Scala 并发性基础(1)

阅读更多
转载于:转载于:http://www.itxuexiwang.com/a/javajishu/jvm_jdk_yuanmafenxi/2016/0129/50.html?1454076380

处理器速度数十年来一直持续快速发展,并在世纪交替之际走到了终点。从那时起,处理器制造商更多地是通过增加核心来提高芯片性能,而不再通过增加时钟速率来提高芯片性能。多核系统现在成为了从手机到企业服务器等所有设备的标准,而这种趋势可能继续并有所加速。开发人员越来越需要在他们的应用程序代码中支持多个核心,这样才能满足性能需求。

在本系列文章中,您将了解一些针对 Java 和 Scala 语言的并发编程的新方法,包括 Java 如何将 Scala 和其他基于 JVM 的语言中已经探索出来的理念结合在一起。第一期文章将介绍一些背景,通过介绍 Java 7 和 Scala 的一些最新技术,帮助了解 JVM 上的并发编程的全景。您将了解如何使用 Java ExecutorService 和 ForkJoinPool 类来简化并发编程。还将了解一些将并发编程选项扩展到纯 Java 中的已有功能之外的基本 Scala 特性。在此过程中,您会看到不同的方法对并发编程性能有何影响。后续几期文章将会介绍 Java 8 中的并发性改进和一些扩展,包括用于执行可扩展的 Java 和 Scala 编程的 Akka 工具包。

Java 并发性支持
在 Java 平台诞生之初,并发性支持就是它的一个特性,线程和同步的实现为它提供了超越其他竞争语言的优势。Scala 基于 Java 并在 JVM 上运行,能够直接访问所有 Java 运行时(包括所有并发性支持)。所以在分析 Scala 特性之前,我首先会快速回顾一下 Java 语言已经提供的功能。

Java 线程基础
在 Java 编程过程中创建和使用线程非常容易。它们由 java.lang.Thread 类表示,线程要执行的代码为 java.lang.Runnable 实例的形式。如果需要的话,可以在应用程序中创建大量线程,您甚至可以创建数千个线程。在有多个核心时,JVM 使用它们来并发执行多个线程;超出核心数量的线程会共享这些核心。

Java 5:并发性的转折点
Java 从一开始就包含对线程和同步的支持。但在线程间共享数据的最初规范不够完善,这带来了 Java 5 的 Java 语言更新中的重大变化 (JSR-133)。Java Language Specification for Java 5 更正并规范化了 synchronized#p#分页标题#e# 和 volatile 操作。该规范还规定不变的对象如何使用多线程。(基本上讲,只要在执行构造函数时不允许引用 “转义”,不变的对象始终是线程安全的。)以前,线程间的交互通常需要使用阻塞的 synchronized 操作。这些更改支持使用 volatile 在线程间执行非阻塞协调。因此,在 Java 5 中添加了新的并发集合类来支持非阻塞操作 — 这与早期仅支持阻塞的线程安全方法相比是一项重大改进。

线程操作的协调难以让人理解。只要从程序的角度让所有内容保持一致,Java 编译器和 JVM 就不会对您代码中的操作重新排序,这使得问题变得更加复杂。例如:如果两个相加操作使用了不同的变量,编译器或 JVM 可以安装与指定的顺序相反的顺序执行这些操作,只要程序不在两个操作都完成之前使用两个变量的总数。这种重新排序操作的灵活性有助于提高 Java 性能,但一致性只被允许应用在单个线程中。硬件也有可能带来线程问题。现代系统使用了多种缓存内存级别,一般来讲,不是系统中的所有核心都能同样看到这些缓存。当某个核心修改内存中的一个值时,其他核心可能不会立即看到此更改。

由于这些问题,在一个线程使用另一个线程修改的数据时,您必须显式地控制线程交互方式。Java 使用了特殊的操作来提供这种控制,在不同线程看到的数据视图中建立顺序。基本操作是,线程使用 synchronized 关键字来访问一个对象。当某个线程在一个对象上保持同步时,该线程将会获得此对象所独有的一个锁的独占访问。如果另一个线程已持有该锁,等待获取该锁的线程必须等待,或者被阻塞,直到该锁被释放。当该线程在一个 synchronized 代码块内恢复执行时,Java 会保证该线程可以 “看到了” 以前持有同一个锁的其他线程写入的所有数据,但只是这些线程通过离开自己的 synchronized 锁来释放该锁之前写入的数据。这种保证既适用于编译器或 JVM 所执行的操作的重新排序,也适用于硬件内存缓存。一个 synchronized 块的内部是您代码中的一个稳定性孤岛,其中的线程可依次安全地执行、交互和共享信息。

在变量上对 volatile 关键字的使用,为线程间的安全交互提供了一种稍微较弱的形式。synchronized 关键字可确保在您获取该锁时可以看到其他线程的存储,而且在您之后,获取该锁的其他线程也会看到您的存储。volatile 关键字将这一保证分解为两个不同的部分。如果一个线程向volatile 变量写入数据,那么首先将会擦除它在这之前写入的数据。如果某个线程读取该变量,那么该线程不仅会看到写入该变量的值,还会看到写入的线程所写入的其他所有值。所以读取一个 #p#分页标题#e#volatile 变量会提供与输入 一个 synchronized 块相同的内存保证,而且写入一个volatile 变量会提供与离开 一个 synchronized 块相同的内存保证。但二者之间有很大的差别:volatile 变量的读取或写入绝不会受阻塞。

抽象 Java 并发性
同步很有用,而且许多多线程应用程序都是在 Java 中仅使用基本的 synchronized 块开发出来的。但协调线程可能很麻烦,尤其是在处理许多线程和许多块的时候。确保线程仅在安全的方式下交互并 避免潜在的死锁(两个或更多线程等待对方释放锁之后才能继续执行),这很困难。支持并发性而不直接处理线程和锁的抽象,这为开发人员提供了处理常见用例的更好方法。

java.util.concurrent 分层结构包含一些集合变形,它们支持并发访问、针对原子操作的包装器类,以及同步原语。这些类中的许多都是为支持非阻塞访问而设计的,这避免了死锁的问题,而且实现了更高效的线程。这些类使得定义和控制线程之间的交互变得更容易,但他们仍然面临着基本线程模型的一些复杂性。

java.util.concurrent 包中的一对抽象,支持采用一种更加分离的方法来处理并发性:Future<T> 接口、Executor 和ExecutorService 接口。这些相关的接口进而成为了对 Java 并发性支持的许多 Scala 和 Akka 扩展的基础,所以更详细地了解这些接口和它们的实现是值得的。

Future<T> 是一个 T 类型的值的持有者,但奇怪的是该值一般在创建 Future 之后才能使用。正确执行一个同步操作后,才会获得该值。收到Future 的线程可调用方法来:

查看该值是否可用
等待该值变为可用#p#分页标题#e#
在该值可用时获取它
如果不再需要该值,则取消该操作
Future 的具体实现结构支持处理异步操作的不同方式。

Executor 是一种围绕某个执行任务的东西的抽象。这个 “东西” 最终将是一个线程,但该接口隐藏了该线程处理执行的细节。Executor 本身的适用性有限,ExecutorService 子接口提供了管理终止的扩展方法,并为任务的结果生成了 Future。Executor 的所有标准实现还会实现ExecutorService,所以实际上,您可以忽略根接口。

线程是相对重量级的资源,而且与分配并丢弃它们相比,重用它们更有意义。ExecutorService 简化了线程间的工作共享,还支持自动重用线程,实现了更轻松的编程和更高的性能。ExecutorService 的 ThreadPoolExecutor 实现管理着一个执行任务的线程池。

应用 Java 并发性
并发性的实际应用常常涉及到需要与您的主要处理逻辑独立的外部交互的任务(与用户、存储或其他系统的交互)。这类应用很难浓缩为一个简单的示例,所以在演示并发性的时候,人们通常会使用简单的计算密集型任务,比如数学计算或排序。我将使用一个类似的示例。

任务是找到离一个未知的输入最近的已知单词,其中的最近 是按照Levenshtein 距离 来定义的:将输入转换为已知的单词所需的最少的字符增加、删除或更改次数。我使用的代码基于 Wikipedia 上的 Levenshtein 距离 文章中的一个示例,该示例计算了每个已知单词的 Levenshtein 距离,并返回最佳匹配值(或者如果多个已知的单词拥有相同的距离,那么返回结果是不确定的)。

清单 1 给出了计算 Levenshtein 距离的 Java 代码。该计算生成一个矩阵,将行和列与两个对比的文本的大小进行匹配,在每个维度上加 1。为了提高效率,此实现使用了一对大小与目标文本相同的数组来表示矩阵的连续行,将这些数组包装在每个循环中,因为我只需要上一行的值就可以计算下一行。#p#分页标题#e#

清单 1. Java 中的 Levenshtein 距离计算
/** 
* Calculate edit distance from targetText to known word. 

* @param word known word 
* @param v0 int array of length targetText.length() + 1 #p#分页标题#e#
* @param v1 int array of length targetText.length() + 1 
* @return distance 
*/
private int editDistance(String word, int[] v0, int[] v1) { 
#p#分页标题#e#  
    // initialize v0 (prior row of distances) as edit distance for empty 'word' 
    for (int i = 0; i < v0.length; i++) { 
        v0[i] = i; 
    } 
   #p#分页标题#e#
    // calculate updated v0 (current row distances) from the previous row v0 
    for (int i = 0; i < word.length(); i++) { 
  
        // first element of v1 = delete (i+1) chars from target to match empty 'word' 
#p#分页标题#e#        v1[0] = i + 1; 
  
        // use formula to fill in the rest of the row 
        for (int j = 0; j < targetText.length(); j++) { 
            int#p#分页标题#e# cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1; 
            v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost); 
        } 
  
        // swap v1 (current row) and v0 (previous row) for next iteration #p#分页标题#e#
        int[] hold = v0; 
        v0 = v1; 
        v1 = hold; 
    } 
  
    // return final value representing best edit distance  #p#分页标题#e#
    return v0[targetText.length()]; 
}
如果有大量已知词汇要与未知的输入进行比较,而且您在一个多核系统上运行,那么您可以使用并发性来加速处理:将已知单词的集合分解为多个块,将每个块作为一个独立任务来处理。通过更改每个块中的单词数量,您可以轻松地更改任务分解的粒度,从而了解它们对总体性能的影响。清单 2 给出了分块计算的 Java 代码,摘自 示例代码 中的 ThreadPoolDistance 类。清单 2 使用一个标准的 ExecutorService,将线程数量设置为可用的处理器数量。

清单 2. 在 Java 中通过多个线程来执行分块的距离计算
private final ExecutorService threadPool;  #p#分页标题#e#
private final String[] knownWords; 
private final int blockSize; 
  
public ThreadPoolDistance(String[] words, int block) { 
#p#分页标题#e#    threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 
    knownWords = words; 
    blockSize = block; 

  
public DistancePair bestMatch(String target) { 
  
#p#分页标题#e#    // build a list of tasks for matching to ranges of known words 
    List<DistanceTask> tasks = new ArrayList<DistanceTask>(); 
  
    int size = 0; 
    for (int base = 0#p#分页标题#e#; base < knownWords.length; base += size) { 
        size = Math.min(blockSize, knownWords.length - base); 
        tasks.add(new DistanceTask(target, base, size)); 
    } 
    DistancePair best; 
    try { 
#p#分页标题#e#  
        // pass the list of tasks to the executor, getting back list of futures 
        List<Future<DistancePair>> results = threadPool.invokeAll(tasks); 
  
        // find the best result, waiting for each future to complete 
        best = DistancePair.WORST_CASE; 
#p#分页标题#e#        for (Future<DistancePair> future: results) { 
            DistancePair result = future.get(); 
            best = DistancePair.best(best, result); 
        } 
  
    } catch (InterruptedException e) { 
#p#分页标题#e#        throw new RuntimeException(e); 
    } catch (ExecutionException e) { 
        throw new RuntimeException(e); 
    } 
    return best; 
#p#分页标题#e#} 
  
/**
* Shortest distance task implementation using Callable. 
*/
public class DistanceTask implements Callable<DistancePair> 
#p#分页标题#e#{ 
    private final String targetText; 
    private final int startOffset; 
    private final int compareCount; 
#p#分页标题#e#  
    public DistanceTask(String target, int offset, int count) { 
        targetText = target; 
        startOffset = offset; 
        compareCount = count; 
    } 
#p#分页标题#e#  
    private int editDistance(String word, int[] v0, int[] v1) { 
        ... 
    } 
  
    /* (non-Javadoc) #p#分页标题#e#
     * @see java.util.concurrent.Callable#call() 
     */
    @Override
    public DistancePair call() throws Exception { 
  
#p#分页标题#e#        // directly compare distances for comparison words in range 
        int[] v0 = new int[targetText.length() + 1]; 
        int[] v1 = new int[targetText.length() + 1]; 
#p#分页标题#e#        int bestIndex = -1; 
        int bestDistance = Integer.MAX_VALUE; 
        boolean single = false; 
        for (int i = 0; i < compareCount; i++) { 
#p#分页标题#e#
            int distance = editDistance(knownWords[i + startOffset], v0, v1); 
            if (bestDistance > distance) { 
                bestDistance = distance; 
                bestIndex = i + startOffset; 
                single = true; 
#p#分页标题#e#            } else if (bestDistance == distance) { 
                single = false; 
            } 
        } 
        return single ? new DistancePair(bestDistance, knownWords[bestIndex]) : 
#p#分页标题#e#                new DistancePair(bestDistance); 
    } 
}
清单 2 中的 bestMatch() 方法构造一个 DistanceTask 距离列表,然后将该列表传递给 ExecutorService。这种对 ExecutorService 的调用形式将会接受一个 Collection<? extends Callable<T>> 类型的参数,该参数表示要执行的任务。该调用返回一个 Future<T> 列表,用它来表示执行的结果。ExecutorService 使用在每个任务上调用 call() 方法所返回的值,异步填写这些结果。在本例中,T 类型为DistancePair— 一个表示距离和匹配的单词的简单的值对象,或者在没有找到惟一匹配值时近表示距离。

bestMatch() 方法中执行的原始线程依次等待每个 Future 完成,累积最佳的结果并在完成时返回它。通过多个线程来处理 DistanceTask 的执行,原始线程只需等待一小部分结果。剩余结果可与原始线程等待的结果并发地完成。

并发性性能
要充分利用系统上可用的处理器数量,必须为 ExecutorService 配置至少与处理器一样多的线程。您还必须将至少与处理器一样多的任务传递给#p#分页标题#e#ExecutorService 来执行。实际上,您或许希望拥有比处理器多得多的任务,以实现最佳的性能。这样,处理器就会繁忙地处理一个接一个的任务,近在最后才空闲下来。但是因为涉及到开销(在创建任务和 future 的过程中,在任务之间切换线程的过程中,以及最终返回任务的结果时),您必须保持任务足够大,以便开销是按比例减小的。

图 1 展示了我在使用 Oracle 的 Java 7 for 64-bit Linux® 的四核 AMD 系统上运行测试代码时测量的不同任务数量的性能。每个输入单词依次与 12,564 个已知单词相比较,每个任务在一定范围的已知单词中找到最佳的匹配值。全部 933 个拼写错误的输入单词会重复运行,每轮运行之间会暂停片刻供 JVM 处理,该图中使用了 10 轮运行后的最佳时间。从图 1 中可以看出,每秒的输入单词性能在合理的块大小范围内(基本来讲,从 256 到大于 1,024)看起来是合理的,只有在任务变得非常小或非常大时,性能才会极速下降。对于块大小 16,384,最后的值近创建了一个任务,所以显示了单线程性能。

图 1. ThreadPoolDistance 性能
perform1

Fork-Join
Java 7 引入了 ExecutorService 的另一种实现:ForkJoinPool 类。ForkJoinPool 是为高效处理可反复分解为子任务的任务而设计的,它使用 RecursiveAction 类(在任务未生成结果时)或 RecursiveTask<T> 类(在任务具有一个 T 类型的结果时)来处理任务。RecursiveTask<T> 提供了一种合并子任务结果的便捷方式,如清单 3 所示。

清单 3. RecursiveTask<DistancePair> 示例
#p#分页标题#e#private ForkJoinPool threadPool = new ForkJoinPool(); 
  
private final String[] knownWords; 
  
private final int blockSize; 
#p#分页标题#e#  
public ForkJoinDistance(String[] words, int block) { 
    knownWords = words; 
    blockSize = block; 

  
public DistancePair bestMatch(String target) {  #p#分页标题#e#
    return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords)); 

  
/**
* Shortest distance task implementation using RecursiveTask. 
#p#分页标题#e# */
public class DistanceTask extends RecursiveTask<DistancePair> 

    private final String compareText; 
    private final#p#分页标题#e# int startOffset; 
    private final int compareCount; 
    private final String[] matchWords; 
  
    public#p#分页标题#e# DistanceTask(String from, int offset, int count, String[] words) { 
        compareText = from; 
        startOffset = offset; 
        compareCount = count; 
        matchWords = words; 
    } 
#p#分页标题#e#  
    private int editDistance(int index, int[] v0, int[] v1) { 
        ... 
    } 
  
#p#分页标题#e#    /* (non-Javadoc)
     * @see java.util.concurrent.RecursiveTask#compute() 
     */
    @Override
    protected DistancePair compute() { 
        if#p#分页标题#e# (compareCount > blockSize) { 
  
            // split range in half and find best result from bests in each half of range 
            int half = compareCount / 2; 
            DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords); 
#p#分页标题#e#            t1.fork(); 
            DistanceTask t2 = new DistanceTask(compareText, startOffset + half, 
                compareCount - half, matchWords); 
            DistancePair p2 = t2.compute(); 
            return DistancePair.best(p2, t1.join()); 
        } 
#p#分页标题#e#  
        // directly compare distances for comparison words in range 
        int[] v0 = new int[compareText.length() + 1]; 
        int[] v1 = new int[compareText.length() + 1#p#分页标题#e#]; 
        int bestIndex = -1; 
        int bestDistance = Integer.MAX_VALUE; 
        boolean single = false; 
        for (int#p#分页标题#e# i = 0; i < compareCount; i++) { 
            int distance = editDistance(i + startOffset, v0, v1); 
            if (bestDistance > distance) { 
                bestDistance = distance; 
                bestIndex = i + startOffset; 
                single = #p#分页标题#e#true; 
            } else if (bestDistance == distance) { 
                single = false; 
            } 
        } 
        return#p#分页标题#e# single ? new DistancePair(bestDistance, knownWords[bestIndex]) : 
            new DistancePair(bestDistance); 
    } 
}
图 2 显示了清单 3 中的 ForkJoin 代码与 清单 2 中的 ThreadPool 代码的性能对比。ForkJoin 代码在所有块大小中稳定得多,仅在您只有单个块(意味着执行是单线程的)时性能会显著下降。标准的 ThreadPool 代码仅在块大小为 256 和 1,024 时会表现出更好的性能。

图 2. ThreadPoolDistance 与 ForkJoinDistance 的性能对比
perform2

这些结果表明,如果可调节应用程序中的任务大小来实现最佳的性能,那么使用标准 #p#分页标题#e#ThreadPool 比 ForkJoin 更好。但请注意,ThreadPool的 “最佳性能点” 取决于具体任务、可用处理器数量以及您系统的其他因素。一般而言,ForkJoin 以最小的调优需求带来了优秀的性能,所以最好尽可能地使用它。




Scala 并发性基础
Scala 通过许多方式扩展了 Java 编程语言和运行时,其中包括添加更多、更轻松的处理并发性的方式。对于初学者而言,Future<T> 的 Scala 版本比 Java 版本灵活得多。您可以直接从代码块中创建 future,可向 future 附加回调来处理这些 future 的完成。清单 4 显示了 Scala future 的一些使用示例。该代码首先定义了 futureInt() 方法,以便按需提供 Future<Int>,然后通过三种不同的方式来使用 future。

清单 4. Scala Future<T> 示例代码
import ExecutionContext.Implicits.global 
  
#p#分页标题#e#val lastInteger = new AtomicInteger 
def futureInt() = future { 
  Thread sleep 2000
  lastInteger incrementAndGet 

  
// use callbacks for completion of futures  #p#分页标题#e#
val a1 = futureInt 
val a2 = futureInt 
a1.onSuccess { 
    case i1 => { 
      a2.onSuccess { 
        case i2 => println("Sum of values is "#p#分页标题#e# + (i1 + i2)) 
      } 
    } 

Thread sleep 3000
  
// use for construct to extract values when futures complete 
#p#分页标题#e#val b1 = futureInt 
val b2 = futureInt 
for (i1 <- b1; i2 <- b2) yield println("Sum of values is " + (i1 + i2)) 
Thread sleep 3000
  
// wait directly for completion of futures 
#p#分页标题#e#val c1 = futureInt 
val c2 = futureInt 
println("Sum of values is " + (Await.result(c1, Duration.Inf) + 
  Await.result(c2, Duration.Inf)))
清单 4 中的第一个示例将回调闭包附加到一对 future 上,以便在两个 future 都完成时,将两个结果值的和打印到控制台上。回调是按照创建它们的顺序直接嵌套在 future 上,但是,即使更改顺序,它们也同样有效。如果在您附加回调时 future 已完成,该回调仍会运行,但无法保证它会立即运行。原始执行线程会在 Thread sleep 3000 行上暂停,以便在进入下一个示例之前完成 future。

第二个示例演示了使用 Scala for comprehension 从 future 中异步提取值,然后直接在表达式中使用它们。for comprehension 是一种 Scala 结构,可用于简洁地表达复杂的操作组合(map、filter、flatMap 和 foreach)。它一般与各种形式的集合结合使用,但 Scala future 实现了相同的单值方法来访问集合值。所以可以使用 future 作为一种特殊的集合,一种包含最多一个值(可能甚至在未来某个时刻之前之后才包含该值)的集合。在这种情况下,for 语句要求获取 future 的结果,并在表达式中使用这些结果值。在幕后,这种技术会生成与第一个示例完全相同的代码,但以线性代码的形式编写它会得到更容易理解的更简单的表达式。和第一个示例一样,原始执行线程会暂停,以便在进入下一个示例之前完成 future。#p#分页标题#e#

第三个示例使用阻塞等待来获取 future 的结果。这与 Java future 的工作原理相同,但在 Scala 中,一个获取最大等待时间参数的特殊Await.result() 方法调用会让阻塞等待变得更为明显。

清单 4 中的代码没有显式地将 future 传递给 ExecutorService 或等效的对象,所以如果没有使用过 Scala,那么您可能想知道 future 内部的代码是如何执行的。答案取决于 清单 4 中最上面一行:import ExecutionContext.Implicits.global。Scala API 常常为代码块中频繁重用的参数使用 implicit 值。future { } 结构要求 ExecutionContext 以隐式参数的形式提供。这个 ExecutionContext 是 JavaExecutorService 的一个 Scala 包装器,以相同方式用于使用一个或多个托管线程来执行任务。

除了 future 的这些基本操作之外,Scala 还提供了一种方式将任何集合转换为使用并行编程的集合。将集合转换为并行格式后,您在集合上执行的任何标准的 Scala 集合操作(比如 map、filter 或 fold)都会自动地尽可能并行完成。(本文稍后会在 清单 7 中提供一个相关示例,该示例使用 Scala 查找一个单词的最佳匹配值。)

错误处理
Java 和 Scala 中的 future 都必须解决错误处理的问题。在 Java 中,截至 Java 7,future 可抛出一个 ExecutionException 作为返回结果的替代方案。应用程序可针对具体的失败类型而定义自己的 ExecutionException 子类,或者可连锁异常来传递详细信息,但这限制了灵活性。

Scala future 提供了更灵活的错误处理。您可以通过两种方式完成 Scala future:成功时提供一个结果值(假设要求一个结果值),或者在失败时提供一个关联的 Throwable。您也可以采用多种方式处理 future 的完成。在 清单 4 中,onSuccess 方法用于附加回调来处理 future 的成功完成。您还可以使用 onComplete 来处理任何形式的完成(它将结果或 throwable 包装在一个 #p#分页标题#e#Try 中来适应两种情况),或者使用 onFailure 来专门处理错误结果。Scala future 的这种灵活性扩展到了您可以使用 future 执行的所有操作,所以您可以将错误处理直接集成到代码中。

这个 Scala Future<T> 还有一个紧密相关的 Promise<T> 类。future 是一个结果的持有者,该结果在某个时刻可能可用(或不可用 — 无法内在地确保一个 future 将完成)。future 完成后,结果是固定的,不会发生改变。promise 是这个相同契约的另一端:结果的一个一次性、可分配的持有者,具有结果值或 throwable 的形式。可从 promise 获取 future,在 promise 上设置了结果后,就可以在该 future 上设置此结果。

应用 Scala 并发性
现在您已熟悉一些基本的 Scala 并发性概念,是时候来了解一下解决 Levenshtein 距离问题的代码了。清单 5 显示了 Levenshtein 距离计算的一个比较符合语言习惯的 Scala 实现,该代码基本上与 清单 1 中的 Java 代码类似,但采用了函数风格。

清单 5. Scala 中的 Levenshtein 距离计算
val limit = targetText.length 
/** Calculate edit distance from targetText to known word.
#p#分页标题#e#  * 
  * @param word known word 
  * @param v0 int array of length targetText.length + 1 
  * @param v1 int array of length targetText.length + 1 
  * @return distance 
  */
#p#分页标题#e#def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = { 
  
  val length = word.length 
  
  @tailrec
  def distanceByRow(rnum: Int, r0: Array[Int], r1: Array[Int]): Int = { 
    if (rnum >= length) r0(limit)  #p#分页标题#e#
    else { 
  
      // first element of r1 = delete (i+1) chars from target to match empty 'word' 
      r1(0) = rnum + 1
  
#p#分页标题#e#      // use formula to fill in the rest of the row 
      for (j <- 0 until limit) { 
        val cost = if (word(rnum) == targetText(j)) 0 else 1
        r1(j + 1) = min(r1(j) + 1#p#分页标题#e#, r0(j + 1) + 1, r0(j) + cost); 
      } 
  
      // recurse with arrays swapped for next row 
      distanceByRow(rnum + 1, r1, r0) 
    } 
#p#分页标题#e#  } 
  
  // initialize v0 (prior row of distances) as edit distance for empty 'word' 
  for (i <- 0 to limit) v0(i) = i 
  
  // recursively process rows matching characters in word being compared to find best  #p#分页标题#e#
  distanceByRow(0, v0, v1) 
}
清单 5 中的代码对每个行值计算使用了尾部递归 distanceByRow() 方法。此方法首先检查计算了多少行,如果该数字与检查的单词中的字符数匹配,则返回结果距离。否则会计算新的行值,然后递归地调用自身来计算下一行(将两个行数组包装在该进程中,以便正确地传递新的最新的行值)。Scala 将尾部递归方法转换为与 Java while 循环等效的代码,所以保留了与 Java 代码的相似性。

但是,此代码与 Java 代码之间有一个重大区别。清单 5 中的 for comprehension 使用了闭包。闭包并不总是得到了当前 JVM 的高效处理(参阅Why is using for/foreach on a Range slow?,了解有关的详细信息),所以它们在该计算的最里层循环上增加了大量开销。如上所述,清单 5 中的代码的运行速度没有 Java 版本那么快。清单 6 重写了代码,将 for comprehension 替换为添加的尾部递归方法。这个版本要详细得多,但执行效率与 Java 版本相当。

清单 6. 为提升性能而重新构造的计算代码
#p#分页标题#e#val limit = targetText.length 
  
/** Calculate edit distance from targetText to known word.
  * 
  * @param word known word 
  * @param v0 int array of length targetText.length + 1 
  * @param v1 int array of length targetText.length + 1 #p#分页标题#e#
  * @return distance 
  */
def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = { 
  
  val length = word.length 
  
#p#分页标题#e#  @tailrec
  def distanceByRow(row: Int, r0: Array[Int], r1: Array[Int]): Int = { 
    if (row >= length) r0(limit) 
    else { 
  
      // first element of v1 = delete (i+1) chars from target to match empty 'word'  #p#分页标题#e#
      r1(0) = row + 1
  
      // use formula recursively to fill in the rest of the row 
      @tailrec
      def distanceByColumn(col: Int): Unit = { 
#p#分页标题#e#        if (col < limit) { 
          val cost = if (word(row) == targetText(col)) 0 else 1
          r1(col + 1) = min(r1(col) + 1, r0(col + 1) + 1, r0(col) + cost) 
#p#分页标题#e#          distanceByColumn(col + 1) 
        } 
      } 
      distanceByColumn(0) 
  
      // recurse with arrays swapped for next row 
#p#分页标题#e#      distanceByRow(row + 1, r1, r0) 
    } 
  } 
  
  // initialize v0 (prior row of distances) as edit distance for empty 'word' 
  @tailrec
#p#分页标题#e#  def initArray(index: Int): Unit = { 
    if (index <= limit) { 
      v0(index) = index 
      initArray(index + 1) 
    } 
  } 
  initArray(0#p#分页标题#e#) 
  
  // recursively process rows matching characters in word being compared to find best 
  distanceByRow(0, v0, v1) 
}
清单 7 给出的 Scala 代码执行了与 清单 2 中的 Java 代码相同的阻塞的距离计算。bestMatch() 方法找到由 Matcher 类实例处理的特定单词块中与目标文本最匹配的单词,使用尾部递归 best() 方法来扫描单词。*Distance 类创建多个 Matcher 实例,每个对应一个单词块,然后协调匹配结果的执行和组合。

清单 7. Scala 中使用多个线程的一次阻塞距离计算
#p#分页标题#e#
class Matcher(words: Array[String]) { 
  
  def bestMatch(targetText: String) = { 
  
    val limit = targetText.length 
    val v0 = new#p#分页标题#e# Array[Int](limit + 1) 
    val v1 = new Array[Int](limit + 1) 
  
    def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = { 
      ... 
    } 
#p#分页标题#e#  
    @tailrec
    /** Scan all known words in range to find best match.
      *   
      * @param index next word index 
      * @param bestDist minimum distance found so far 
#p#分页标题#e#      * @param bestMatch unique word at minimum distance, or None if not unique 
      * @return best match 
      */
    def best(index: Int, bestDist: Int, bestMatch: Option[String]): DistancePair = 
      if (index < words.length) { 
        val newDist = editDistance(words(index), v0, v1)  #p#分页标题#e#
        val next = index + 1
        if (newDist < bestDist) best(next, newDist, Some(words(index))) 
        else if (newDist == bestDist) best(next, bestDist, None) 
        else best(next, bestDist, bestMatch) 
#p#分页标题#e#      } else DistancePair(bestDist, bestMatch) 
  
    best(0, Int.MaxValue, None) 
  } 

  
class ParallelCollectionDistance(words: Array[String], size: Int) #p#分页标题#e#extends TimingTestBase { 
  
  val matchers = words.grouped(size).map(l => new Matcher(l)).toList 
  
  def shutdown = {} 
  
  def blockSize = size 
#p#分页标题#e#  
  /** Find best result across all matchers, using parallel collection. */
  def bestMatch(target: String) = { 
    matchers.par.map(m => m.bestMatch(target)). 
      foldLeft(DistancePair.worstMatch)((a, m) => DistancePair.best(a, m)) 
  } 

#p#分页标题#e#  
class DirectBlockingDistance(words: Array[String], size: Int) extends TimingTestBase { 
  
  val matchers = words.grouped(size).map(l => new Matcher(l)).toList 
  
  def shutdown = {} 
#p#分页标题#e#  
  def blockSize = size 
  
  /** Find best result across all matchers, using direct blocking waits. */
  def bestMatch(target: String) = { 
    import ExecutionContext.Implicits.global 
    val futures = matchers.map(m => future { m.bestMatch(target) })  #p#分页标题#e#
    futures.foldLeft(DistancePair.worstMatch)((a, v) => 
      DistancePair.best(a, Await.result(v, Duration.Inf))) 
  } 
}
清单 7 中的两个 *Distance 类显示了协调 Matcher 结果的执行和组合的不同方式。ParallelCollectionDistance 使用前面提到的 Scala 的并行集合 feature 来隐藏并行计算的细节,只需一个简单的 foldLeft 就可以组合结果。

DirectBlockingDistance 更加明确,它创建了一组 future,然后在该列表上为每个结果使用一个 foldLeft 和嵌套的阻塞等待。

性能再分析
清单 7 中的两个 *Distance 实现都是处理 Matcher 结果的合理方法。(它们不仅合理,而且非常高效。示例代码#p#分页标题#e# 包含我在试验中尝试的其他两种实现,但未包含在本文中。)在这种情况下,性能是一个主要问题,所以图 3 显示了这些实现相对于 Java ForkJoin 代码的性能。

图 3. ForkJoinDistance 与 Scala 替代方案的性能对比
perform3

图 3 显示,Java ForkJoin 代码的性能比每种 Scala 实现都更好,但 DirectBlockingDistance 在 1,024 的块大小下提供了更好的性能。两种 Scala 实现在大部分块大小下,都提供了比 清单 1 中的 ThreadPool 代码更好的性能。

这些性能结果仅是演示结果,不具权威性。如果您在自己的系统上运行计时测试,可能会看到不同的性能,尤其在使用不同数量的核心的时候。如果希望为距离任务获得最佳的性能,那么可以实现一些优化:可以按照长度对已知单词进行排序,首先与长度和输入相同的单词进行比较(因为编辑距离总是不低于与单词长度之差)。或者我可以在距离计算超出之前的最佳值时,提前退出计算。但作为一个相对简单的算法,此试验公平地展示了两种并发操作是如何提高性能的,以及不同的工作共享方法的影响。

在性能方面,清单 7 中的 Scale 控制代码与 清单 2 和 清单 3 中的 Java 代码的对比结果很有趣。Scala 代码短得多,而且(假设您熟悉 Scala!)比 Java 代码更清晰。Scala 和 Java 可很好的相互操作,您可以在本文的 完整示例代码 中看到:Scala 代码对 Scala 和 Java 代码都运行了计时测试,Java 代码进而直接处理 Scala 代码的各部分。得益于这种轻松的互操作性,您可以将 Scala 引入现有的 Java 代码库中,无需进行通盘修改。最初使用 Scala 为 Java 代码实现高水平控制常常很有用,这样您就可以充分利用 Scala 强大的表达特性,同时没有闭包或转换的任何重大性能影响。

清单 7 中的 ParallelCollectionDistance Scala 代码的简单性非常具有吸引力。使用此方法,您可以从代码中完全抽象出并发性,从而编写类似单线程应用程序的代码,同时仍然获得多个处理器的优势。幸运的是,对于喜欢此方法的简单性但又不愿意或无法执行 Scala 开发的人而言,Java 8 带来了一种执行直接的 Java 编程的类似特性。#p#分页标题#e#


结束语
现在您已经了解了 Java 和 Scala 并发性操作的基础知识,本系列下一篇文章将介绍 Java 8 如何改进对 Java 的并发性支持(以及从长远来讲,可能对 Scala 的并发性支持)。Java 8 的许多改动您看起来可能都很熟悉(Scala 并发性特性中使用的许多相同的概念都包含在 Java 8 中),所以您很快就能够在普通的 Java 代码中使用一些 Scala 技术。请阅读下一期文章,了解应该如何做。
分享到:
评论

相关推荐

    SCALA程序设计-JAVA虚拟机多核编程实战

    1. Scala语言特性:Scala是一种针对Java虚拟机(JVM)的编程语言,它将面向对象和函数式编程的特性无缝融合,旨在提供一种简洁、优雅且类型安全的方式来表达常见的编程模式。其目标是提高Java程序员的编程效率。 2....

    Scala程序设计:Java虚拟机多核编程实战1

    《Scala程序设计:Java虚拟机多核编程实战》是一本专为程序员和有经验的Java开发者设计的书籍,旨在引导读者掌握Scala语言,利用其功能强大的特性进行并发编程。Scala是一种融合了函数式和面向对象编程特点的静态...

    rock-the-jvm-scala-beginners:Scala基础

    1. **基础语法**:Scala的基础语法与Java有很多相似之处,比如类、对象和包的定义。但Scala也引入了一些新的概念,例如case类,它们简化了数据结构的创建,常用于模式匹配。另外,Scala的函数是一等公民,可以直接...

    面向Java开发人员的Scala指南

    Scala结合了面向对象和函数式编程的特点,提供了更高的代码抽象能力和性能优化的可能性,是很多企业级应用和大数据处理框架(如Apache Spark)的基础。 首先,我们需要理解Scala与Java之间的相似性和差异性。Scala...

    为Java虚拟机编译Scala(Michel Schinz)Compiling Scala for the Java Virtual Machine (Michel Schinz)

    - **关于并发的注释**:在涉及并发时,Scala编译器必须处理好与JVM并发模型的兼容性。 - **类型表示**:包括对各种类型(比如Java类类型、Scala类类型、数组类型、单例类型、复合类型、值类型、特殊类型)的表示和...

    王治:用Scala构建19楼社区

    - 兼容Java:Scala语言运行在Java虚拟机(JVM)上,能够无缝地调用Java库和框架,这对于已有Java背景的开发团队来说是个极大的优势。 - 表达力更强的语法:Scala提供了更多的语法糖和函数式编程特性,使得代码更加...

    IBM:面向Java开发人员的Scala指南-p178.7z

    由于Scala运行在Java虚拟机(JVM)上,因此可以直接调用Java库,而无需任何桥接代码。这使得Java开发者能够逐步迁移到Scala,而无需彻底抛弃现有的Java代码库。 在面向对象编程方面,Scala引入了特质(Traits),这...

    Scala 基础.zip

    6. ** Actors 和 Concurrency**:Scala的Actors模型提供了一种安全的并发处理方式,它通过消息传递来管理共享状态,降低了线程同步的复杂性。 7. **集合库**:Scala的集合库非常强大,提供了各种高效的集合实现,如...

    scala sdk scala-2.12.3

    1. **编译器**:Scala编译器将Scala源代码转换为Java字节码,使得程序可以在JVM上运行。2.12.3版本的编译器支持最新的语言特性,并对错误报告和编译速度进行了改进。 2. **标准库**:Scala的标准库提供了大量的类和...

    Jitawawa:使用JavaScala玩得开心

    在当今的编程世界中,Java和Scala是两种备受瞩目的编程语言。它们各自拥有独特的特性和优势,为开发者提供了丰富的工具来构建高性能、可扩展的应用程序。Jitawawa项目正是这样一个结合了Java和Scala乐趣的实践平台,...

    scala2.12.1Windows镜像包

    - **JVM兼容性**:Scala 2.12.x主要针对Java 8及更高版本进行优化,充分利用了JVM的新特性,如Lambda表达式和Stream API。 - **互操作性**:Scala可以直接调用Java库,反之亦然,这得益于良好的Java互操作性。 - ...

    无涯教程(LearnFk)-Scala教程完整离线版.pdf

    4. 兼容 Java:Scala 是为 Java 虚拟机(JVM)设计的,因此 Scala 代码可以无缝地与现有的 Java 类库和框架集成。Scala 可以调用 Java 代码,并且 Java 代码也可以调用 Scala 代码。这使得 Scala 在 Java 生态系统中...

    快学Scala & Scala完整版 & 深入理解JVM 合集(带目录)

    虽然这本书主要针对Java,但其许多原则和规范也适用于Scala,比如良好的编程习惯、代码可读性和可维护性等。 通过学习这四本书,开发者可以建立起对Scala语言的深入理解,掌握其在JVM上的运行机制,并遵循业界最佳...

    尚硅谷_韩顺平_Scala语言核心编程_PDF密码解除1

    - **IntelliJ IDEA**:是一款强大的Java和Scala IDE,通过安装Scala插件,可以提供丰富的代码提示、调试和重构功能。 - **Scala插件安装**:在IDEA中,可以通过内置的插件市场搜索并安装Scala插件,以支持Scala的...

    scala-2.13.8.zip

    例如,Akka是一个用Scala编写的库,提供了actor模型来实现并发和分布式计算,它是许多企业级应用的基础。 压缩包内的"scala-2.13.8"文件夹可能包含以下内容: 1. `bin`目录:包含了Scala命令行工具,如`scala`解释...

    图灵书籍(Scala程序设计(第2版).pdf+Scala程序设计-JAVA虚拟机多核编程实战.pdf)

    标题提及了两本书籍资源——"Scala程序设计(第2版).pdf"和"Scala程序设计-JAVA虚拟机多核编程实战.pdf",这表明主题聚焦在Scala编程语言以及其与Java虚拟机(JVM)多核编程的结合应用上。描述简单明了,确认了这两...

    scala编程入门教材

    11. **Scala与Java互操作**:由于Scala运行在JVM上,它可以无缝地与Java代码集成,调用Java库并被Java代码调用。 12. **Scaladoc**:Scala提供了Scaladoc工具,用于生成API文档,类似于Java的Javadoc。 通过阅读...

    scala-2.12.10.tgz

    6. Scala与Java互操作性:Scala可以在Java项目中无缝集成,因为它是JVM上的语言。可以调用Java库,反之亦然,这使得迁移现有Java项目到Scala变得容易。 7. 函数式编程:Scala支持高阶函数、闭包和不可变数据结构,...

Global site tag (gtag.js) - Google Analytics