简介:
今天打算写一个关于Redis压测的小程序,想来反正也是花时间写,不如顺便研究一下Akka这个比较火的开源库。
代码是scala写得,建议大家尝试新技术!
hello word 小例子:
pom.xml
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.3.9</version> </dependency>
HelloActor.scala
/** * Created by zz on 2016/9/27. */ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props case object Person extends Per{ val name:String="job" val age:Int=18 } class RunRedisActor extends Actor { def receive = { case "hello" => {println("hello word")} case Person => println("ren") case _ => println("您是?") } } object Main extends App { val system = ActorSystem("TestRedisSystem") // 缺省的Actor构造函数 val runRedisActor = system.actorOf(Props[RunRedisActor], name = "helloactor") runRedisActor ! "hello" runRedisActor ! Person runRedisActor ! "喂" }
运行结果:
hello word
ren
您是?
Process finished with exit code -1
ren
您是?
Process finished with exit code -1
注意:akka2.4.X 版本会以后错误,原因是jdk1.7及以下版本不兼容。
报错信息 写道
Exception in thread "main" java.lang.UnsupportedClassVersionError: JVMCFRE003 主要版本错误;类=akka/actor/ActorSystem$,偏移量=6
at java.lang.ClassLoader.defineClassImpl(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:295)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:154)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:711)
at java.net.URLClassLoader.access$400(URLClassLoader.java:92)
at java.net.URLClassLoader$ClassFinder.run(URLClassLoader.java:1159)
at java.security.AccessController.doPrivileged(AccessController.java:314)
at java.net.URLClassLoader.findClass(URLClassLoader.java:594)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:743)
at java.lang.ClassLoader.loadClass(ClassLoader.java:711)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:690)
at com.test.lus.akka.Main$.delayedEndpoint$com$test$lus$akka$Main$1(HelloActor.scala:25)
at com.test.lus.akka.Main$delayedInit$body.apply(HelloActor.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.test.lus.akka.Main$.main(HelloActor.scala:24)
at com.test.lus.akka.Main.main(HelloActor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:613)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
at java.lang.ClassLoader.defineClassImpl(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:295)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:154)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:711)
at java.net.URLClassLoader.access$400(URLClassLoader.java:92)
at java.net.URLClassLoader$ClassFinder.run(URLClassLoader.java:1159)
at java.security.AccessController.doPrivileged(AccessController.java:314)
at java.net.URLClassLoader.findClass(URLClassLoader.java:594)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:743)
at java.lang.ClassLoader.loadClass(ClassLoader.java:711)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:690)
at com.test.lus.akka.Main$.delayedEndpoint$com$test$lus$akka$Main$1(HelloActor.scala:25)
at com.test.lus.akka.Main$delayedInit$body.apply(HelloActor.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.test.lus.akka.Main$.main(HelloActor.scala:24)
at com.test.lus.akka.Main.main(HelloActor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:613)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
-------------------------------------------分割线----------------------------------------------------
进阶:我们实现简单分布式,远程调用actor
pom.xml 中添加远程依赖:
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.11</artifactId> <version>2.3.9</version> </dependency>
application.conf akka默认使用com.typesafe读取配置文件
MyRemoteServerSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } } } MyRemoteClientSideActor { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } } }
RemoteActor
import akka.actor.{Actor} /** * Created by zz on 2016/9/27. */ class RemoteActor extends Actor{ def receive = { case "remote" => println("remote") sender() ! "ok" } }
AkkaServerApplication
import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory /** * Created by zz on 2016/9/27. */ object AkkaServerApplication extends App { val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor")) // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容 val log = system.log log.info("Remote server actor started: " + system) system.actorOf(Props[RemoteActor], "remoteActor") // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值 }
ClientActor
import akka.actor.{Actor, ActorLogging} /** * Created by zz on 2016/9/27. */ class ClientActor extends Actor with ActorLogging { // akka.<protocol>://<actor system>@<hostname>:<port>/<actor path> val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用 val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息 def receive = { case "remote" => remoteServerRef ! "remote" println("please go to request client ....") case "ok" =>println("server is ok") } }
AkkaClientApplication
import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory /** * Created byzz on 2016/9/27. */ object AkkaClientApplication extends App { val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor")) // 通过配置文件application.conf配置创建ActorSystem系统 val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用 clientActor ! "remote" }
AkkaServerApplication输出 写道
[INFO] [09/27/2016 17:01:52.580] [main] [Remoting] Starting remoting
[INFO] [09/27/2016 17:01:52.917] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [09/27/2016 17:01:52.919] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [09/27/2016 17:01:52.931] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system
remote
[INFO] [09/27/2016 17:01:52.917] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [09/27/2016 17:01:52.919] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552]
[INFO] [09/27/2016 17:01:52.931] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system
remote
AkkaClientApplication输出 写道
[INFO] [09/27/2016 17:02:00.080] [main] [Remoting] Starting remoting
[INFO] [09/27/2016 17:02:00.511] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@22.7.16.98:2552]
[INFO] [09/27/2016 17:02:00.513] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@22.7.16.98:2552]
please go to request client ....
server is ok
[INFO] [09/27/2016 17:02:00.511] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://client-system@22.7.16.98:2552]
[INFO] [09/27/2016 17:02:00.513] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://client-system@22.7.16.98:2552]
please go to request client ....
server is ok
-------------------------------------------------分割线-----------------------------------------------
再进阶:ping pong demo
PingPong.scala
import akka.actor.{Actor, ActorRef} /** * Created by zz on 2016/9/27. */ case object PingMessage case object PongMessage case object StartMessage case object StopMessage class Ping(pong: ActorRef) extends Actor { var count = 0 def incrementAndPrint { count += 1; println("ping") } def receive = { case StartMessage => incrementAndPrint pong ! PingMessage case PongMessage => if (count > 9) { sender ! StopMessage println("ping stopped") context.stop(self) } else { incrementAndPrint sender ! PingMessage } } } class Pong extends Actor { def receive = { case PingMessage => println(" pong") sender ! PongMessage case StopMessage => println("pong stopped") context.stop(self) context.system.shutdown() } }
PingPongApp
import akka.actor.{ActorSystem, Props} /** * Created by zz on 2016/9/27. */ object PingPongApp extends App { val system = ActorSystem("PingPongSystem") val pong = system.actorOf(Props[Pong], name = "pong") val ping = system.actorOf(Props(new Ping(pong)), name = "ping") // start them going ping ! StartMessage }
输出 写道
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping stopped
pong stopped
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping stopped
pong stopped
后记:例子比较简单,但是骨架基本如此,后续再做完善。。。
相关推荐
为了快速理解 Akka 的基本工作原理,通常会从实现一个简单的“Hello World”示例开始。这个例子展示了如何定义 Actor 类、创建 Actor 系统、发送消息以及处理消息响应。 ##### 1.5 使用场景与部署方案 本节介绍了 ...
一个简单的 Akka Java HelloWorld 示例通常包括: 1. 定义一个 Actor 类,重写 `createReceive` 方法来接收消息。 2. 创建 Actor 实例。 3. 发送消息给 Actor 并接收响应。 ##### 1.5 Use-case and Deployment ...
Akka是用Scala编写的,但在Java中也可以方便地使用,它提供了一个强大的actor模型来处理并发问题,使得在高度并行和分布式系统中编程变得更加简单。在这个框架中,协程(或称为actors)被用来替代传统的多线程,以...
Akka 提供了一种简单而强大的方式来处理这些问题,它允许开发者利用 Actor 模型来构建高并发、分布式的系统。通过 Akka,开发者可以更容易地构建出能够自动恢复故障、适应负载变化的系统。 **1.3 如何开始使用 Akka...
在 AKKA 中创建 Actor 非常简单,可以通过继承 `Actor` 类或者使用 `Props` 对象来创建 Actor 实例。例如: ```scala import akka.actor.Actor import akka.actor.Props class SimpleActor extends Actor { def ...
它基于actor模型,使得在Java虚拟机(JVM)上编写这类程序变得简单高效。Akka的使用通常涉及以下几个核心概念: 1. **Actor系统**:Akka的基础是Actor系统,这是一个全局的管理结构,负责创建、监控和调度Actor。它...
- **Example代码**: 压缩包中的源码提供了各种使用场景,如简单的生产消费、分组聚合、水印窗口等,通过对这些示例的学习,可以更深入地理解Akka Streams与Kafka的集成。 总结,"akka-streams-kafka-examples-源码...
**1.6 Akka的使用案例实例** - **实时数据分析**:Akka可以用来构建高效的数据流处理系统,实现实时分析大量数据。 - **微服务架构**:Akka支持轻量级的服务间通信,非常适合构建分布式的微服务架构。 - **游戏...
- **编写代码**:从简单的 Actor 开始,逐步学习 Akka 的核心概念和技术。 ##### 1.4 必不可少的 HelloWorld 示例 - **创建 Actor**:定义一个 Actor 类继承自 `Actor`。 - **发送消息**:使用 `tell` 方法向 Actor...
6. **Singletons**:Akka Cluster 提供了 Singleton 模式,确保集群中只有一个特定 Actor 实例。 7. **Distributed Data**:集群中的节点可以共享和同步数据,通过 `DistributedPubSub` 和 `At-Least-Once Delivery...
在Scala中使用Akka Actor可以利用其强大的类型系统和并发能力,使得编写高性能的并发程序变得更加简单。 **Actor系统** Akka中的Actor系统是所有Actor的容器和管理器。它负责Actor的生命周期,包括创建、调度、...
4. **Keep the Error Kernel Simple(保持错误内核简单)**:建议在Actor的错误处理中保持核心逻辑简单,避免复杂的错误处理逻辑。 5. **Failure Zones(故障区域)**:提出了在系统中划分故障区域的概念,以便于...
Java 中 Akka 的简单生产者消费者示例 此存储库包含 3 个简单网络爬虫的示例: 一个连续的例子 将逻辑拆分为 3 个 Actor 的示例 页面的检索由多个 Actor 并行处理的示例。 检索失败且应用程序挂起的示例 重新发送...
该项目是一系列项目中的一个,该项目从一个简单的Akka Cluster项目开始,逐步构建为事件源和命令查询责任隔离的示例。 该项目系列由以下GitHub存储库组成: (此项目) 一个的例子 的和一个例子 每个项目都可以...
在Scala中使用Akka时,可以通过定义actor的特质(trait)来创建具体的actor类型,然后在运行时实例化这些actor,并向它们发送消息来交互。为了支持actor之间的消息传递,Akka提供了一套消息传递协议,但并不强制要求...
在这个例子中,聊天室中的每个人本质上都是一个简单的演员,遵循一组简单的指令。 此外,每个人都知道聊天室中还有谁,并且所有其他参与者都遵循相同的指令集。 此示例场景类似于感知集群参与者使用的基本方法。 ...
如果Actor需要在初始化时接收参数,可以使用`Props`工厂类结合构造函数参数来创建Actor实例。 2.3 Actor停止监视 通过`context.watch()`方法,一个Actor可以监视另一个Actor,当被监视的Actor停止时,监视者会收到...
Akka的这种设计使得构建分布式系统变得更加简单,因为它提供了内置的故障恢复机制和负载均衡策略。 Spray是一个基于Scala的HTTP工具包,它主要用于构建RESTful API和服务。Spray提供了一套完整的工具,包括路由定义...
《Akka in Action》是一本深入探讨Akka框架的书籍,其随附的源代码提供了丰富的实例和示例,帮助读者更好地理解和应用Akka在实际项目中的功能。Akka是一个用Scala编写的开源系统,主要设计用于构建高度并发、分布式...