`

Scala学习笔记(Scala编程第30章 Actors and Concurrency 例子分析)

阅读更多
Scala编程第30章 Actors and Concurrency 例子分析。

源程序:
/**
 * 《Programming In Scala》第30章 Actors and Concurrency 例子
 */

package org.stairwaybook.simulation2
import scala.actors.Actor
import scala.actors.Actor._
//Ping消息 Clock -> Simulant
case class Ping(time: Int)
//Pong消息 Simulat -> Clock
case class Pong(time: Int, from: Actor)
//工作项目
case class WorkItem(time: Int, msg: Any, target: Actor)
//延时启动消息
case class AfterDelay(delay: Int, msg: Any, target: Actor)
//开始消息
case object Start
//终了消息
case object Stop

//时钟类,用于调度工作项目。必须保证前面的工作项目没有完成前(收到Pong)不能开始
//后面的工作项目(发送Ping)。
class Clock extends Actor {
    //是否已启动并且没有被停止
    private var running = false
    //当前时间
    private var currentTime = 0
    //工作日程表,按时间的升序排列
    private var agenda: List[WorkItem] = List()
    //所有将要处理的Simulant
    private var allSimulants: List[Actor] = List()
    //所有正在处理中的Simulat(等待Pong中)
    private var busySimulants: Set[Actor] = Set.empty
    //加入处理目标Simulant
    def add(sim: Simulant) {
        allSimulants = sim :: allSimulants
    }
    //业务方法
    def act() {
        loop {
            //在工作状态中并且无未处理Simulant情况下推进时钟
            if (running && busySimulants.isEmpty)
                advance()
            //接受并处理一个消息
            reactToOneMessage()
        }
    }
    //推进时钟并处理当前工作
    def advance() {
        if (agenda.isEmpty && currentTime > 0) {
            println("** Agenda empty. Clock exiting at time " + currentTime+".")
            self ! Stop
            return
        }
        currentTime += 1
        println("Advancing to time "+currentTime)
        //处理当前工作
        processCurrentEvents()
        //向所有Simulant发送Ping,通知当前时间
        for (sim <- allSimulants)
            sim ! Ping(currentTime)
        //设置所有Simulant为忙
        busySimulants = Set.empty ++ allSimulants
    }
    //处理当前工作
    private def processCurrentEvents() {
        val todoNow = agenda.takeWhile(_.time <= currentTime)
        agenda = agenda.drop(todoNow.length)
        for (WorkItem(time, msg, target) <- todoNow) {
            assert(time == currentTime) //由于advance每次只推进1
            //向Simulant发送动作消息
            target ! msg
        }
    }
    //接受并处理一个消息
    def reactToOneMessage() {
        react {
            //延迟启动
            case AfterDelay(delay, msg, target) =>
                val item = WorkItem(currentTime + delay, msg, target)
                agenda = insert(agenda, item)
            //Pong响应
            case Pong(time, sim) =>
                assert(time == currentTime)
                assert(busySimulants contains sim)
                busySimulants -= sim
            //开始时钟
            case Start => running = true
            //结束时钟
            case Stop =>
                //结束所有Simulant
                for (sim <- allSimulants)
                    sim ! Stop
                exit()
        }
    }
    //按升序加入一个工作项目
    private def insert(ag: List[WorkItem], item: WorkItem): List[WorkItem] = {
        if (ag.isEmpty || item.time < ag.head.time) item :: ag
        else ag.head :: insert(ag.tail, item)
    }
    //启动时钟的消息处理
    start()
}
//模拟节点
trait Simulant extends Actor {
    //所处的时钟,虚属性
    val clock: Clock
    //消息处理方法,虚方法
    def handleSimMessage(msg: Any)
    //模拟开始方法
    def simStarting() { }
    //业务方法
    def act() {
        loop {
            react {
                //结束模拟节点
                case Stop => exit()
                //响应Ping消息
                case Ping(time) =>
                    //在时间为1的开始模拟节点
                    if (time == 1) simStarting()
                    //通知Pong
                    clock ! Pong(time, self)
                //动作消息
                case msg => handleSimMessage(msg)
            }
        }
    }
    //启动模拟节点的消息处理
    start()
}

//模拟电路
class Circuit {
    //所处的时钟
    val clock = new Clock
    //设置线路值动作消息,目标模拟节点为Wire
    case class SetSignal(sig: Boolean)
    //线路值改变通知动作消息,目标模拟节点为Gate
    case class SignalChanged(wire: Wire, sig: Boolean)
    //延时常量
    val WireDelay = 1
    val InverterDelay = 2
    val OrGateDelay = 3
    val AndGateDelay = 3
    //模拟线路
    class Wire(name: String, init: Boolean) extends Simulant {
        def this(name: String) { this(name, false) }
        def this() { this("unnamed") }
        val clock = Circuit.this.clock
        //将自己加入时钟
        clock.add(this)
        //状态值
        private var sigVal = init
        //观察者例表(Gate)
        private var observers: List[Actor] = List()

        //处理动作消息
        def handleSimMessage(msg: Any) {
            msg match {
                case SetSignal(s) =>
                    //通知值改变
                    if (s != sigVal) {
                        sigVal = s
                        signalObservers()
                    }
            }
        }
        //向时钟发送AfterDelay消息,由时钟调度来通知各个观察者线路的状态值已改变
        def signalObservers() {
            for (obs <- observers)
                clock ! AfterDelay(
                    WireDelay,
                    SignalChanged(this, sigVal),
                    obs)
        }
        //开始处理,通知所有观察者
        override def simStarting() { signalObservers() }
        //加入观察者
        def addObserver(obs: Actor) {
            observers = obs :: observers
        }
        //线路的文本描述
        override def toString = "Wire("+ name +")"
    }

    //假线路,用于OrGate来表示无效的线路
    private object DummyWire extends Wire("dummy")

    //门类,in1和in2是输入线路,out是输出线路
    abstract class Gate(in1: Wire, in2: Wire, out: Wire)
            extends Simulant {
        //输出计算虚方法
        def computeOutput(s1: Boolean, s2: Boolean): Boolean
        //门延时,虚属性
        val delay: Int
        //所处的时钟
        val clock = Circuit.this.clock
        //向时钟加入自己
        clock.add(this)
        //作为in1和in2的观察者
        in1.addObserver(this)
        in2.addObserver(this)
        //in1和in2的状态值
        var s1, s2 = false

        //处理动作消息
        def handleSimMessage(msg: Any) {
            msg match {
                //线路状态改变消息
                case SignalChanged(w, sig) =>
                    if (w == in1)
                        s1 = sig
                    if (w == in2)
                        s2 = sig
                    //向时钟发送延时处理消息,由时钟来调度通知out线路改变状态
                    clock ! AfterDelay(delay,
                                SetSignal(computeOutput(s1, s2)),
                                out)
            }
        }

    }
    //构建or门
    def orGate(in1: Wire, in2: Wire, output: Wire) =
        new Gate(in1, in2, output) {
            val delay = OrGateDelay
            def computeOutput(s1: Boolean, s2: Boolean) = s1 || s2
        }
    //构建and门
    def andGate(in1: Wire, in2: Wire, output: Wire) =
        new Gate(in1, in2, output) {
            val delay = AndGateDelay
            def computeOutput(s1: Boolean, s2: Boolean) = s1 && s2
        }
    //构建not门
    def inverter(input: Wire, output: Wire) =
        new Gate(input, DummyWire, output) {
            val delay = InverterDelay
            def computeOutput(s1: Boolean, ignored: Boolean) = !s1
        }
    //加入探针
    def probe(wire: Wire) = new Simulant {
        val clock = Circuit.this.clock
        clock.add(this)
        wire.addObserver(this)
        def handleSimMessage(msg: Any) {
            msg match {
                case SignalChanged(w, s) =>
                    println("signal "+ w +" changed to "+ s)
            }
        }
    }
    //启动门
    def start() { clock ! Start }
}
//包含半加和全加的特征
trait Adders extends Circuit {
    def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire) {
        val d, e = new Wire
        orGate(a, b, d)
        andGate(a, b, c)
        inverter(c, e)
        andGate(d, e, s)
    }
    def fullAdder(a: Wire, b: Wire, cin: Wire,
            sum: Wire, cout: Wire) {
        val s, c1, c2 = new Wire
        halfAdder(a, cin, s, c1)
        halfAdder(b, s, sum, c2)
        orGate(c1, c2, cout)
    }
}
//主单例对象
object Demo {
    def main(args: Array[String]) {
        val circuit = new Circuit with Adders
        import circuit._
        val ain = new Wire("ain", true)
        val bin = new Wire("bin", false)
        val cin = new Wire("cin", true)
        val sout = new Wire("sout")
        val cout = new Wire("cout")
        probe(ain)
        probe(bin)
        probe(cin)
        probe(sout)
        probe(cout)
        fullAdder(ain, bin, cin, sout, cout)
        circuit.start()
    }
}


执行结果:
Advancing to time 1
Advancing to time 2
signal Wire(cout) changed to false
signal Wire(sout) changed to false
signal Wire(cin) changed to true
signal Wire(bin) changed to false
signal Wire(ain) changed to true
Advancing to time 3
Advancing to time 4
Advancing to time 5
Advancing to time 6
Advancing to time 7
Advancing to time 8
Advancing to time 9
Advancing to time 10
signal Wire(cout) changed to true
Advancing to time 11
Advancing to time 12
Advancing to time 13
Advancing to time 14
Advancing to time 15
Advancing to time 16
Advancing to time 17
Advancing to time 18
signal Wire(sout) changed to true
Advancing to time 19
Advancing to time 20
Advancing to time 21
signal Wire(sout) changed to false
** Agenda empty. Clock exiting at time 21.
2
0
分享到:
评论

相关推荐

    Scala编程完整版

    这份"Scala编程完整版"文档提供了全面的Scala学习资源,覆盖了从基础概念到高级特性的各个方面。以下是一些主要的知识点: 1. **基本语法**:Scala的基础语法类似于Java,但更为简洁。它包括变量定义(var和val)、...

    atomic-scala-examples

    7. ** Actors 和 Concurrency**:Scala对Akka框架的支持,利用actors进行并发和分布式计算。 8. **泛型**:类型参数化,使代码更具通用性和可复用性。 9. **隐式转换**:Scala的隐式转换可以将一种类型的对象透明...

    Actors in Scala

    #### 三、Scala与Actors模型 Scala语言以其强大的元编程能力而闻名,其内置对Actors的支持更是使其成为了构建高度并发应用的理想选择。Scala中的Actors库提供了高级抽象,使得开发者能够轻松地创建和管理Actors。 ...

    Scala 程序设计 中+英文

    5. ** Actors 和 Concurrency**:Scala内置了Actors模型,用于构建并发和分布式系统,帮助开发者避免线程同步问题。 6. **高级特性**:涵盖类型参数、抽象类型、特质、包对象、混合类型和匿名函数等,这些特性使...

    scala-2.10.2

    scala.actors - Concurrency framework inspired by Erlang. scala.io - Input and output. scala.math - Basic math functions and additional numeric types. scala.sys - Interaction with other processes and ...

    scala2.10.6

    10. **社区支持**:Scala 2.10.6有广泛的社区支持,包括各种第三方库和框架,如Play Framework、Spray和Slick等,这些都极大地扩展了Scala的应用领域。 由于“scala2.10.6”这个标签下的压缩包文件名为“scala.msi...

    Scala 基础.zip

    6. ** Actors 和 Concurrency**:Scala的Actors模型提供了一种安全的并发处理方式,它通过消息传递来管理共享状态,降低了线程同步的复杂性。 7. **集合库**:Scala的集合库非常强大,提供了各种高效的集合实现,如...

    快学Scala-[中文]

    6. ** Actors 和 Concurrency**:Scala 支持 Akka 框架,通过 Actors 模型实现并发,这是一种轻量级的并发机制,有助于构建高度并行和分布式系统。 7. **模式匹配**:Scala 的模式匹配可以用于处理枚举、数组、列表...

    scala for the impatient

    Working with higher-order functions and the powerful Scala collections library * Leveraging Scala's powerful pattern matching and case classes * Creating concurrent programs with Scala actors * ...

    Programming Scala

    * Know how to use mixin composition with traits, pattern matching, concurrency with Actors, and other essential features, * Take advantage of Scala's built-in support for XML, * Learn how to develop ...

    Learning Concurrent Programming in Scala, 2nd Edition

    Title: Learning Concurrent Programming in Scala, 2nd Edition Author: Aleksandar Prokopec Length: 382 pages Edition: 2nd Revised edition Language: English Publisher: Packt Publishing - ebooks Account ...

    Learning+Concurrent+Programming+in+Scala,+2nd+Edition-2017.mobi

    Concurrent and parallel programming have progressed from niche disciplines, of interest only to kernel programming and high-performance computing, to something that every competent programmer must ...

    programminginscala:Scala编程练习

    7. ** Actors 和 Concurrency**:Scala通过Akka库提供了强大的并发处理能力,Actors模型可以帮助开发者编写线程安全的代码。这方面的练习可能会涵盖Actor的创建、消息传递和系统管理。 8. **Type System**:Scala的...

    rockjvm-scala-初学者

    5. ** Actors 和 Concurrency**:Scala 的 Akka 框架提供了一种基于 Actor 模型的并发处理机制,有助于构建可扩展的、容错的系统。 6. **集合库**:Scala 的集合库非常强大,提供了丰富的操作和优化,如转换、并行...

    AKKA concurrency

    Akka并发是Scala语言生态系统中的一个核心库,它基于actor模型来处理并发。Actor模型是一种并行计算框架,它允许创建许多相互独立的、封装良好的并发实体,称为actors。这些actors通过消息传递来协作,但它们之间...

    sc-interActive-openSrc-functionalProgramming-Scala-1:探索功能性编程的第二个存储库

    10. ** Actors 与 Concurrency**:Scala 中的 Actor 模型及其在并发编程中的应用。 通过这个项目,学习者可以深入理解 Scala 语言的函数式编程特性,并提升实际编程能力。同时,通过参与开源项目,他们还可以了解...

    Python algorithm

    5. 并发模型(Concurrency Model):Scala提供了高级的并发构建,如actors,它们为并发和分布式计算提供了一种更加简洁的模型。 虽然文档内容并未直接提供关于Python算法的具体知识点,但是上述提到的函数式编程和...

    Reactive Web Applications(Manning,2016)

    Java and Scala developers can use the Play Framework and the Akka concurrency toolkit to easily implement reactive applications without building everything from scratch. Reactive Web Applications ...

    Akka Concurrency Framework

    ##### 4.7 Dataflow Concurrency (Scala) **Dataflow Concurrency** 是一种基于数据流的并发模型,它可以自动处理并发和同步问题,使得编写并行代码变得更加简单。 ##### 4.8 Fault Tolerance (Scala) Akka提供了...

Global site tag (gtag.js) - Google Analytics