`
wbj0110
  • 浏览: 1603103 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Scala并发

阅读更多

 

Runnable/Callable

Runnable只有一个没有返回值的方法

1
2
3
trait Runnable {
  def run(): Unit
}

Callable的方法和run类似,只不过它有一个返回值

1
2
3
trait Callable[V] {
  def call(): V
}

 

线程

Scala的并发是建立在Java的并发模型上的。

在Sun的JVM上,对于一个IO密集型的任务,我们可以在单机上运行成千上万的线程。

Thread是通过Runnable构造的。要运行一个Runnable的run方法,你需要调用对应线程的start方法。

1
2
3
4
5
6
7
8
9
scala> val hello = new Thread(new Runnable {
  def run() {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]
 
scala> hello.start
hello world

当你看见一个实现Runnable的类,你应该明白它会被放到一个线程里去执行的。

 

一段单线程的代码

下面是一段代码片段,它可以运行,但是会有问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
 
class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
 
  def run() {
    while (true) {
      // 这里会阻塞直到有连接进来
      val socket = serverSocket.accept()
      (new Handler(socket)).run()
    }
  }
}
 
class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes
 
  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}
 
(new NetworkService(2020, 2)).run

每个请求都会把当前线程的名称main作为响应。

这段代码最大的问题在于一次只能够响应一个请求!

你可以对每个请求都单独用一个线程来响应。只需要把

1
(new Handler(socket)).run()

改成

1
(new Thread(new Handler(socket))).start()

但是如果你想要复用线程或者对于线程的行为要做一些其他的控制呢?

 

Executors

随着Java 5的发布,对于线程的管理需要一个更加抽象的接口。

你可以通过Executors对象的静态方法来取得一个ExecutorService对象。这些方法可以让你使用各种不同的策略来配置一个ExecutorService,例如线程池。

下面是我们之前的阻塞式网络服务器,现在改写成可以支持并发请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
 
class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
 
  def run() {
    try {
      while (true) {
        // This will block until a connection comes in.
        val socket = serverSocket.accept()
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}
 
class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes
 
  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}
 
(new NetworkService(2020, 2)).run

从下面的示例中,我们可以大致了解内部的线程是怎么进行复用的。

1
2
3
4
5
6
7
8
9
10
11
$ nc localhost 2020
pool-1-thread-1
 
$ nc localhost 2020
pool-1-thread-2
 
$ nc localhost 2020
pool-1-thread-1
 
$ nc localhost 2020
pool-1-thread-2

 

Futures

一个Future代表一次异步计算的操作。你可以把你的操作包装在一个Future里,当你需要结果的时候,你只需要简单调用一个阻塞的get()方法就好了。一个Executor返回一个Future。如果你使用Finagle RPC的话,你可以使用Future的实例来保存还没有到达的结果。

FutureTask是一个可运行的任务,并且被设计成由Executor进行运行。

1
2
3
4
5
val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target);
}})
executor.execute(future)

现在我需要结果,那就只能阻塞到直到结果返回。

1
val blockingResult = future.get()

参考 Scala School中关于Finagle的章节有大量使用Future的示例,也有一些组合使用的例子。Effective Scala中也有关于Futures的内容。

 

线程安全问题

1
2
3
4
5
class Person(var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

这个程序在多线程的环境下是不安全的。如果两个线程都有同一个Person示例的引用,并且都调用set方法,你没法预料在两个调用都结束的时候name会是什么。

在Java的内存模型里,每个处理器都允许在它的L1或者L2 cache里缓存变量,所以两个在不同处理器上运行的线程对于相同的数据有种不同的视图。

下面我们来讨论一下可以强制线程的数据视图保持一致的工具。

 

三个工具

同步

互斥量(Mutex)提供了锁定资源的语法。当你进入一个互斥量的时候,你会获得它。在JVM里使用互斥量最常用的方式就是在一个对象上进行同步访问。在这里,我们会在Person上进行同步访问。

在JVM里,你可以对任何非null的对象进行同步访问。

1
2
3
4
5
6
7
class Person(var name: String) {
  def set(changedName: String) {
    this.synchronized {
      name = changedName
    }
  }
}

volatile

随着Java 5对于内存模型的改变,volatile和synchronized的作用基本相同,除了一点,volatile也可以用在null上。

synchronized提供了更加细粒度的加锁控制。而volatile直接是对每次访问进行控制。

1
2
3
4
5
class Person(@volatile var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

AtomaticReference

同样的,在Java 5中新增了一系列底层的并发原语。AtomicReference类就是其中一个。

1
2
3
4
5
6
7
import java.util.concurrent.atomic.AtomicReference
 
class Person(val name: AtomicReference[String]) {
  def set(changedName: String) {
    name.set(changedName)
  }
}

 

它们都有额外的消耗吗?

AutomicReference是这两种方式中最耗性能的,因为如果你要取得对应的值,则需要经过方法分派(method dispatch)的过程。

volatilesynchronized都是通过Java内置的monitor来实现的。在没有竞争的情况下,monitor对性能的影响非常小。由于synchronized允许你对代码进行更加细粒度的加锁控制,这样就可以减小加锁区,进而减小竞争,因此synchronized应该是最佳的选择。

当你进入同步块,访问volatile引用,或者引用AtomicReference,Java会强制要求处理器刷新它们的缓存流水线,从而保证数据的一致性。

如果我这里说错了,请指正出来。这是一个很复杂的主题,对于这个主题肯定需要花费大量的时间来进行讨论。

 

其他来自Java 5的优秀工具

之前提到了AtomicReference,除了它之外,Java 5还提供了很多其他有用的工具。

CountDownLatch

CountDownLatch是供多个进程进行通信的一个简单机制。

1
2
3
4
5
6
val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)
 
doneSignal.await()
println("both workers finished!")

除此之外,它对于单元测试也是很有用的。假设你在做一些异步的工作,并且你想要保证所有的功能都完成了。你只需要让你的函数都对latch进行countDown操作,然后在你的测试代码里进行await

AtomicInteger/Long

由于对于Int和Long的自增操作比较常见,所以就增加了AtomicIntegerAtomicLong

AtomicBoolean

我想我没有必要来解释这个的作用了。

读写锁(ReadWriteLock)

ReadWriteLock可以实现读写锁,读操作只会在写者加锁的时候进行阻塞。

 

我们来构建一个非线程安全的搜索引擎

这是一个简单的非线程安全的倒排索引。我们这个反向排索引把名字的一部分映射到指定的用户。

下面是原生的假设只有单线程访问的写法。

注意这里的使用mutable.HashMap的另一个构造函数this()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import scala.collection.mutable
 
case class User(name: String, id: Int)
 
class InvertedIndex(val userMap: mutable.Map[String, User]) {
 
  def this() = this(new mutable.HashMap[String, User])
 
  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }
 
  def add(term: String, user: User) {
    userMap += term -> user
  }
 
  def add(user: User) {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

我把具体怎么根据索引获取用户的方法暂时省略掉了,我们后面会来进行补充。

 

我们来让它变得安全

在上面的倒排索引的示例里,userMap是没法保证线程安全的。多个客户端可以同时尝试去添加元素,这样会产生和之前Person示例里相似的问题。

因为userMap本身不是线程安全的,那么我们怎么能够保证每次只有一个线程对它进行修改呢?

你需要在添加元素的时候给userMap加锁。

1
2
3
4
5
6
7
def add(user: User) {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

不幸的是,上面的做法有点太粗糙了。能在互斥量(mutex)外面做的工作尽量都放在外面做。记住我之前说过,如果没有竞争的话,加锁的代价是非常小的。如果你在临界区尽量少做操作,那么竞争就会非常少。

1
2
3
4
5
6
7
8
9
10
11
def add(user: User) {
  // tokenizeName was measured to be the most expensive operation.
  // tokenizeName 这个操作是最耗时的。
  val tokens = tokenizeName(user.name)
 
  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

 

SynchronizedMap

我们可以通过使用SynchronizedMap trait来使得一个可变的(mutable)HashMap具有同步机制。

我们可以扩展之前的InvertedIndex,给用户提供一种构建同步索引的简单方法。

1
2
3
4
5
import scala.collection.mutable.SynchronizedMap
 
class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

如果你去看具体的实现的话,你会发现SynchronizedMap只是在每个方法上都加上了同步访问,因此它的安全是以牺牲性能为代价的。

 

Java ConcurrentHashMap

Java里有一个很不错的线程安全的ConcurrentHashMap。幸运的是,JavaConverter可以使得我们通过Scala的语法来使用它。

实际上,我们可以无缝地把我们新的,线程安全的InvertedIndex作为老的非线程安全的一个扩展。

1
2
3
4
5
6
7
8
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
 
class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
    extends InvertedIndex(userMap) {
 
  def this() = this(new ConcurrentHashMap[String, User] asScala)
}

 

现在来加载我们的InvertedIndex

最原始的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}
 
class FileRecordProducer(path: String) extends UserMaker {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      index.add(makeUser(line))
    }
  }
}

对于文件里的每一行字符串,我们通过调用makeUser来生成一个User,然后通过add添加到InvertedIndex里。如果我们并发访问一个InvertedIndex,我们可以并行调用add方法,因为makeUser方法没有副作用,它本身就是线程安全的。

我们不能并行读取一个文件,但是我们可以并行构造User,并且并行将它添加到索引里。

 

解决方案:生产者/消费者

实现非同步计算的,通常采用的方法就是将生产者同消费者分开,并让它们通过队列(queue)来进行通信。让我们用下面的例子来说明我们是怎么实现搜索引擎的索引的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 
// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      queue.put(line)
    }
  }
}
 
// 抽象的消费者
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
  def run() {
    while (true) {
      val item = queue.take()
      consume(item)
    }
  }
 
  def consume(x: T)
}
 
val queue = new LinkedBlockingQueue[String]()
 
//一个生产者线程
 
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()
 
trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}
 
class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String) = index.add(makeUser(t))
}
 
// 假设我们的机器有8个核
 
val cores = 8
val pool = Executors.newFixedThreadPool(cores)
 
// 每个核设置一个消费者
 
for (i <- i to cores) {
  pool.submit(new IndexerConsumer[String](index, q))
}

http://www.importnew.com/4750.html

分享到:
评论

相关推荐

    Scala并发编程程.rar

    Scala并发编程是高级函数式编程语言Scala中的一个重要领域,它涉及到如何在多核心处理器系统上高效、安全地执行任务。Scala提供了丰富的并发工具和模型,使得开发者能够在编写高性能、可扩展的应用程序时,同时保持...

    scala并发编程开发教程

    Scala并发编程是构建高效、可扩展的分布式系统的关键技术,特别是在大数据处理框架如Spark中。Akka是一个用Scala编写的库,它提供了基于Actor模型的并发解决方案,这使得Akka在构建高性能、高可靠的分布式应用中扮演...

    读书笔记:scala并发编程.zip

    读书笔记:scala并发编程

    Java 和 Scala 并发性基础.doc

    Java 和 Scala 并发性基础

    Learning Concurrent Programming in Scala

    ### Scala并发编程学习指南 #### 一、并发编程的重要性与挑战 随着计算机硬件技术的发展,多核处理器已经成为标准配置,这使得并发编程成为现代软件开发不可或缺的一部分。并发编程旨在利用多核处理器的能力来提高...

    Akka Scala 学习高清原版pdf

    Akka是一个用Scala和Java编写的库,用于构建并发、分布式以及容错的事件驱动应用。Akka框架借鉴了Erlang的并发模型,但它是建立在JVM之上,并且提供了丰富的抽象和工具,能够简化开发工作。 标题“Akka Scala 学习...

    Scala.pdf中文高清第二版

    Scala是一种强大的多范式编程语言,它融合了面向对象和函数式编程的特性,使得它在处理复杂数据和并发问题时表现出色。Scala的名字来源于"Scalable Language"的缩写,表明它能够平滑地扩展从小型脚本到大型应用。 ...

    scala编程 第二版

    经典scala教程 This book is a tutorial for the Scala programming language, written by people directly involved in the development of Scala. Our goal is that by reading this book, you can learn ...

    SCALA编程思想

    7. **Scala并发与Akka模型** 8. **Scala在大数据处理中的应用** #### 1. Scala语言简介 Scala是一种多范式编程语言,结合了面向对象编程和函数式编程的特点。它最初由Martin Odersky设计,并于2003年发布。Scala...

    beginners-guide-to-scala

    Promise和Future是Scala并发编程中非常重要的概念。Promise可以视为一个容器,用于最终保存计算结果,而Future则是这个结果的代理。Future可以被其他线程查询、附加回调或等待其完成。Promise和Future一起,为Scala...

    用Scala写的akka actor简单demo

    总的来说,这个简单的Scala Akka Actor demo提供了一个学习Akka和Scala并发编程的实践平台,可以帮助开发者更好地理解和应用Akka框架。通过这个示例,可以深入理解Actor模型的优势,以及如何在实际项目中利用它来...

    scala in action

    “Scala中的并发编程”章节专注于Scala对并发的支持,介绍了Scala并发工具,包括Actors模型、Futures和Promises等。这些并发工具为编写高并发应用提供了强大的支持,同时避免了传统线程模型中的复杂性和潜在错误。 ...

    Scala语言规范-v2.7.rar

    虽然Spark自身并不直接使用Scala的Actor模型,但理解这个概念对于学习更高级的Scala并发编程是有帮助的。 在学习Scala语言规范v2.7时,应重点关注其与Spark相关的部分,例如如何创建和操作RDD,如何使用DataFrame和...

    twitter scala课堂

    此外,课程还涉及了Scala并发编程的基础知识,包括Runnable和Callable接口、线程(Threads)、Futures等,以及如何在Java和Scala之间进行跨平台交互,展示了如何在Java代码中利用Scala的功能。 最后,课程通过...

    并发需求下的Scala及Erlang语言的比较与使用

    ### 并发需求下的Scala及Erlang语言的比较与使用 在当今的高并发、大数据处理场景下,选择合适的编程语言对于系统性能至关重要。在众多编程语言中,Scala和Erlang因其强大的并发处理能力和函数式编程特性而受到关注...

    scala sdk scala-2.12.3

    2. **标准库**:Scala的标准库提供了大量的类和模块,包括集合操作、I/O、反射、并发处理等,这些是编写Scala程序的基础。 3. **Scala REPL**:Read-Eval-Print Loop,交互式解释器,允许开发者即时测试代码片段,...

Global site tag (gtag.js) - Google Analytics