`
wbj0110
  • 浏览: 1614058 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

scala akka 修炼之路1(使用actor实现一个job的并发计算和task失败重启)

    博客分类:
  • Akka
阅读更多

package cn.yangg.scala.akka.init

 

import akka.actor.Actor

import akka.actor.Props

import akka.actor.ActorRef

import akka.actor.Terminated

import akka.event.Logging

import akka.actor.ActorSystem

import akka.pattern.ask

import scala.util.Success

import scala.util.Failure

import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.ExecutionContext

import akka.actor.Terminated

 

 

 

case class HeartBeat(taskid:Int,parent:String)

case class TaskFinished(taskid:Int)

case object JobFinished

case object JobStart

case class TaskFailure(taskid:Int)

case class TaskMessage

case class TaskRestartMessage

case class TaskFinishedMessage

 

class Parent(jobid:String,tasknum:Int) extends Actor{

  val log=Logging(this.context.system,this)

  var tasks=Array[ActorRef]()

  var replySender=this.context.system.deadLetters

  varcount=0;//存在问题 

  def receive={

    case JobStart=>{

       this.replySender=this.context.sender

       tasks=(1 to tasknum).map(id=>this.context.actorOf(Props(new Child(id)))).toArray

      tasks.foreach(actor=>(actor ! TaskMessage))

    }

    case heartBeat:HeartBeat=>{

      println("taskid-0000"+heartBeat.taskid+",finished:"+heartBeat.parent)

    }

    case TaskFinished(taskid)=>{

      println("taskid-0000"+taskid+"finished...")

      this.self ! TaskFinishedMessage

    }

    case Terminated(actor)=>

      println(actor.path.toString()+"stop")

    case TaskFailure(taskid)=>{

      //restart task

      valrestartActor=this.context.actorOf(Props(new Child(taskid*10)))

      restartActor ! TaskRestartMessage

    } 

    case TaskFinishedMessage=>{

      this.count+=1

      if(this.count==tasknum){

        this.replySender ! akka.actor.Status.Success("all task finished")

      }

      println(this.count)

    }

  }

}

class Child(taskid:Int) extends Actor{

  val log=Logging(this.context.system,this)

  def receive={

    case TaskMessage=>{

      Thread.sleep(1000)

      this.context.parent ! HeartBeat(taskid,"10%")

      Thread.sleep(2000)

      this.context.parent ! HeartBeat(taskid,"70%")

      

      //task failed 

      this.context.stop(this.self)

      if(taskid%3==0){

        this.context.parent ! TaskFailure(this.taskid)

        log.info("taskid="+taskid+" task failed")

      }else{

        this.context.parent ! TaskFinished(this.taskid)

      }     

    }

    case TaskRestartMessage=>{

      log.info(taskid+" restart...")

      this.context.parent ! TaskFinished(this.taskid)

    }

  }

}

object StartMoon {

def main(args:Array[String]){

  val system=ActorSystem("actorSystem")

  val jobActor=system.actorOf(Props(new Parent("DataSplit-job",10)),"DataSplitJob")

  val jobListener=ask(jobActor,JobStart)(10000)

  jobListener.onComplete(result => resultmatch{

    case Success(result)=>{

      println("job finished...,message:"+result)

    }

    case Failure(result)=>{

      println("job failed...,message:"+result.getMessage())

    }

  })

}

}

 

http://blog.csdn.net/yangguo_2011/article/details/27399431

 
分享到:
评论

相关推荐

    响应式架构 消息模式Actor实现与Scala.Akka应用集成

    1. Actor系统的创建和配置:理解如何在Scala中初始化一个Akka ActorSystem,以及如何设置配置参数,如调度器、内存管理等。 2. Actor的定义和交互:学习如何定义一个新的Actor类,以及如何在Actor之间发送消息,...

    用Scala写的akka actor简单demo

    标题中的“用Scala写的akka actor简单demo”指的是一个使用Scala编程语言编写的Akka Actor系统的基本示例。Akka是轻量级、基于actor模型的框架,它用于构建高度并发、分布式和容错的应用程序。Scala是多范式编程语言...

    响应式架构++消息模式Actor实现与Scala.Akka应用集成+,沃恩·弗农+

    Akka是实现响应式架构的一个强大工具,尤其在Scala编程语言中广泛使用。 Akka是一个由Lightbend公司开发的开源框架,它基于Actor模型,为构建高度并发、分布式和容错的系统提供了基础。Actor模型是一种并发计算的...

    scala akka

    - **Akka**: 是一个基于Actor模型的并发框架,由Typesafe Inc.开发。它为构建高度并发、分布式、容错的应用程序提供了强大的工具。 - **Scala**: 是一种多范式编程语言,结合了面向对象和函数式的特性,广泛应用于大...

    响应式架构 消息模式Actor实现与Scala.Akka应用集成 ,沃恩·弗农

    在Scala中,创建和使用Actor非常直观,可以利用ActorSystem来启动和管理Actor,通过定义消息类型和Actor的行为来实现业务逻辑。 在响应式架构中,Actor模型扮演着关键角色,因为它们能够有效地处理异步事件,这正是...

    Scala Akka项目源码

    Scala Akka是一个强大的工具包,用于构建高度并发、分布式和反应式的应用程序。它基于actor模型,使得在Java虚拟机(JVM)上编写这类程序变得简单高效。Akka的使用通常涉及以下几个核心概念: 1. **Actor系统**:...

    响应式架构 消息模式Actor实现与Scala.Akka应用集成.rar

    《响应式架构:消息模式Actor实现与Scala、Akka应用集成》由10章构成,详细介绍了使用Actor模型中的响应式消息传输模式的理论和实用技巧。其中包括:Actor模型和响应式软件的主要概念、Scala语言的基础知识、Akka...

    响应式架构 消息模式Actor实现与Scala.Akka应用集成 高清扫描版

    响应式架构 消息模式Actor实现与Scala.Akka应用集成 高清扫描版

    Akka Scala 学习高清原版pdf

    Akka是一个用Scala和Java编写的库,用于构建并发、分布式以及容错的事件驱动应用。Akka框架借鉴了Erlang的并发模型,但它是建立在JVM之上,并且提供了丰富的抽象和工具,能够简化开发工作。 标题“Akka Scala 学习...

    akka框架,应用于scala

    至于"akka-quickstart-scala"这个文件,很可能是Akka的Scala快速入门示例,其中可能包含了如何创建和交互Actor、配置Actor系统、处理消息以及实现简单的并发和分布式应用的代码示例。通过学习和实践这些示例,开发者...

    akka-actor-1.0-RC2.jar.zip

    Akka的核心组件之一就是Actor系统,这是一个设计模式,用于处理并发和并行计算。在Akka中,Actors通过消息传递进行通信,而非共享状态,这种模型极大地简化了多线程编程中的同步问题。`akka-actor-1.0-RC2.jar`是这...

    akka scala分布式计算

    akka scala 实现求连续平方和,分布式计算,快速理解分布式计算原理!

    scala并发编程开发教程

    Akka是一个用Scala编写的库,它提供了基于Actor模型的并发解决方案,这使得Akka在构建高性能、高可靠的分布式应用中扮演着重要角色。 首先,我们要理解什么是Actor模型。Actor模型是一种并行计算模型,它将Actor视...

    基于Akka的加密消息通信.rar_scala akka_基于Akka通信框架的加密通信项目_消息通信

    Akka是一个用于构建高度并发、分布式和容错的应用程序的开源工具包,它基于actor模型,运行在Java虚拟机(JVM)上。Scala是一种函数式和面向对象编程语言,它与Java无缝集成,是开发Akka应用的首选语言。尽管描述中...

    akka-actor-1.1.2.jar.zip

    在现代软件开发中,处理并发和分布式计算已经成为一个重要的挑战。Akka,一个由Lightbend公司维护的开源框架,提供了强大的工具来解决这些问题。尤其值得一提的是其核心组件——Akka Actor,它是实现异步、反应式...

    响应式架构小模式Actor实现与scala

    Akka是一个用Scala编写的开源工具包和运行时,用于构建并发、分布式、容错的事件驱动应用程序。 1. **Scala语言特性**:Scala将面向对象和函数式编程结合在一起,并有强大的类型系统,可以编译成Java字节码,在JVM...

Global site tag (gtag.js) - Google Analytics