Akka 中的有类型 Actor 是 Active Objects 模式的一种实现.
Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。
有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。
对普通actor来说,你拥有一个外部API (public接口的实例) 来将方法调用异步地委托给其实现类的私有实例。
有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息,
它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome.
有类型Actor是使用 JDK Proxies 实现的,JDK Proxies提供了非常简单的api来拦截方法调用。
注意
和普通Akka actor一样,有类型actor也一次处理一个消息。
什么时候使用有类型的Actor
有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。
工具箱
返回有类型actor扩展 Returns the Typed Actor Extension
TypedActorExtension
extension =
TypedActor.get(system); //system is an instance of
ActorSystem
判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or
not
TypedActor.get(system).isTypedActor(someReference);
返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an
external Typed Actor
Proxy
TypedActor.get(system).getActorRefFor(someReference);
返回当前的ActorContext//Returns the current ActorContext,
此方法仅在一个TypedActor
实现的方法中有效 // method only valid within methods of a TypedActor
implementation
ActorContext context = TypedActor.context();
返回当前有类型actor的外部代理//Returns the external proxy of the current Typed
Actor,
此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a
TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();
返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor
Extension
这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if
you create other Typed Actors with this,
//they will become children to the
current Typed Actor.
TypedActor.get(TypedActor.context());
具体例子及说明
package practise.akka.typedactors
import akka.dispatch.Future
import akka.japi.Option
/**
* 这个就是对外的接口,各函数就是Typed Actor的接口方法
*/
public interface Squarer {
void squareDontCare(int i); //fire-forget
Future<Integer> square(int i); //non-blocking send-request-reply
Option<Integer> squareNowPlease(int i);//blocking send-request-reply
int squareNow(int i); //blocking send-request-reply
}
package practise.akka.typedactors
import akka.dispatch.Future
import akka.dispatch.Futures
import akka.actor.TypedActor
import akka.japi.Option
import akka.actor.ActorContext
import groovy.util.logging.Log4j
import akka.actor.ActorRef
/**
* 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。)
*/
@Log4j
class SquarerImpl implements Squarer, akka.actor.TypedActor.Receiver {
private String name;
public SquarerImpl() {
this.name = "default";
}
public SquarerImpl(String name) {
this.name = name;
}
public void squareDontCare(int i) {
log.debug("squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致----" + i) //可以从线程号看出是异步处理的
int sq = i * i; //Nobody cares :(
//返回当前的ActorContext,
// 此方法仅在一个TypedActor 实现的方法中有效
ActorContext context = TypedActor.context();
println "context ---- " + context
//返回当前有类型actor的外部代理,
// 此方法仅在一个TypedActor 实现的方法中有效
Squarer mysq = TypedActor.<Squarer> self();
println "--self --" + mysq
}
public Future<Integer> square(int i) {
log.debug("square send-request-reply Future----" + i) //可以从线程号看出是异步处理的
return Futures.successful(i * i, TypedActor.dispatcher());
}
public Option<Integer> squareNowPlease(int i) {
log.debug("squareNowPlease send-request-reply Option----" + i) //可以从线程号看出是异步处理的
return Option.some(i * i);
}
public int squareNow(int i) {
log.debug("squareNow send-request-reply result----" + i) //可以从线程号看出是异步处理的
return i * i;
}
@Override
void onReceive(Object o, ActorRef actorRef) {
log.debug("TypedActor收到消息----${o}---from:${actorRef}")
}
}
package practise.akka.typedactors
import akka.actor.ActorSystem
import akka.actor.TypedActor
import akka.actor.TypedProps
import com.typesafe.config.ConfigFactory
import akka.japi.Creator
import groovy.util.logging.Log4j
import akka.actor.ActorContext
/**
* 这里创建Typed Actor.
*/
@Log4j
class TypedActorsFactory {
ActorSystem system
private final String config = """akka {
loglevel = "${log?.debugEnabled ? "DEBUG" : "INFO"}"
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.hostname = "127.0.0.1"
remote.netty.port = 2552
remote.log-received-messages = on
remote.log-sent-messages = on
}"""
TypedActorsFactory(String sysName) {
this.system = ActorSystem.create(sysName, ConfigFactory.parseString(config))
}
Squarer getTypedActorDefault() {
Squarer mySquarer =
TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
//这里创建的是代理类型
return mySquarer
}
Squarer getTypedActor(String name) {
Squarer otherSquarer =
TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,
new Creator<SquarerImpl>() {
public SquarerImpl create() { return new SquarerImpl(name); } //这里创建的是具体的实现类型
}),
name); //这个name是actor的name:akka//sys@host:port/user/name
return otherSquarer
}
}
下面用几个测试用例实验一下
package practise.akka.typedactors
import akka.actor.ActorRef
import akka.actor.TypedActor
import akka.actor.UntypedActorContext
import akka.dispatch.Future
import com.baoxian.akka.AkkaClientNoReply
import com.baoxian.akka.AkkaServerApp
class TestTypedActors extends GroovyTestCase {
def testTypeActor() {
println("----")
TypedActorsFactory factory = new TypedActorsFactory("typedServer")
// Squarer squarer = factory?.getTypedActorDefault() //创建代理
Squarer squarer = factory?.getTypedActor("serv") //具体实现
squarer?.squareDontCare(10)
Future future = squarer?.square(10)
AkkaServerApp app = new AkkaServerApp("tmp", "127.0.0.1", 6666, "result") //这是我自己构建的接收器
app.messageProcessor = {msg, UntypedActorContext context ->
log.info("结果为" + msg)
}
app.startup()
akka.pattern.Patterns.pipe(future).to(app.serverActor) //Future的返回结果pipe到接收器中了,在log中能看到结果
println "----" + squarer?.squareNowPlease(10)?.get()
println "----" + squarer?.squareNow(10)
//返回有类型actor扩展
TypedActor.get(factory.system)
//返回一个外部有类型actor代理所代表的Akka actor
ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer);
actor.tell("消息") //这个消息将会在SquarerImpl的onReceive方法中接收到
sleep(1000 * 60 * 10)
// TypedActor.get(factory.system).stop(squarer); //这将会尽快地异步终止与指定的代理关联的有类型Actor
TypedActor.get(factory.system).poisonPill(squarer);//这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它
}
def testRemoteTypedActor() {
AkkaClientNoReply client = new AkkaClientNoReply("akka://typedServer@127.0.0.1:2552/user/serv")
client.send("远程消息") //这将会在SquarerImpl的onReceive方法中接收到
sleep(1000)
client.shutdown()
}
}
分享到:
相关推荐
《Akka Typed Actor详解——基于1.0-RC2.jar版本》 在Java和Scala的世界里,Akka是一个强大的工具库,它...对于那些寻求高性能、高可用性的分布式系统开发者来说,Akka Typed Actor是一个值得深入了解和使用的工具。
typed-actors, 编译时间 TypeChecked akka参与者 注释: 对于 Akka 2.4,你必须向版本号中添加 -a24 。 例如 1.1.0 变成 1.1.0-a24 。 Akka 2.4可用的第一个版本是 1.4.0 。libraryDependencies
5. **Interaction Patterns**: Akka Typed支持多种交互模式,如请求-响应、订阅发布、事件 sourcing等,这些模式可以帮助构建复杂的应用架构。 **使用Scala编写Akka Typed** 在Scala中,我们可以使用`ActorSystem`...
Akka Typed的活动来源 该库通过事件源实现演员持久性。 它提供: 演员定义API, 与Akka Typed集成在一起, 静态类型安全, 并支持参与者的持久性(带有事件源和快照); 和此API的实现基于akka-typed和akka-...
在这个未完成的项目中,开发者可能已经探索了如何创建一个Akka Typed Actor系统,并在集群中使用Cluster Sharding策略分布actor。同时,他们可能已经实现了与Cassandra的数据交互,包括数据的存取、查询以及事务管理...
在 Akka 中,"akka-typed" 是一个专门针对类型安全的 Actor API 的模块,它旨在提供一种更严谨、更易于理解和维护的方式来处理 Actors。 **Akka Typed 基本概念** 1. **Actor**: 在 Akka 中,Actor 是并发的基本...
Typed Actors 是一种更高级的 Actor 实现方式,它使用类型安全的 API 来定义 Actor 的行为。 ##### 3.3 Fault Tolerance 介绍了 Akka 如何处理 Actor 的故障,包括: - 监督策略。 - 复原策略。 - 监控 Actor 状态...
本教学内容将详细介绍 Akka 的基本概念、核心功能以及如何在 Java 中使用 Akka 来构建强大的应用程序。 ##### 1.1 什么是 Akka? Akka 是一个基于 Actor 模型的并发框架,由 Typesafe 开发,旨在帮助开发者构建高度...
##### 3.2 Typed Actors 介绍如何使用类型安全的方式来定义 Actor。 ##### 3.3 容错性 讨论 Akka 如何处理 Actor 的失败情况,包括重启、终止等策略。 ##### 3.4 Dispatchers Dispatchers 控制 Actor 的执行线程,...
Typed Actors是Akka中的一种高级用法,它允许开发者使用类型安全的方式来定义Actor的行为。这种方式可以减少出错的可能性,并提高代码的可读性。 **3.3 Fault Tolerance** 容错性是Akka的重要特性之一,这部分介绍...
总的来说,通过使用Akka Cluster、Akka Persistence、Akka Typed Actors以及Cluster Sharding,开发人员可以构建出能够在大规模分布式环境中高效运行、容错性强的应用程序。同时,结合现代云原生技术如Docker和...
**4.2 Typed Actors (Scala)** 除了传统 Actor 外,Akka 还支持类型化的 Actor。类型化 Actor 可以通过静态类型检查来提高代码的安全性和可读性。 **4.3 Logging (Scala)** 日志记录是调试和监控应用程序的重要...
* Typed Actors:Typed Actors 是 Akka 中的一种actor 类型,提供了类型安全的actor。 * Untyped Actors:Untyped Actors 是 Akka 中的一种actor 类型,不提供类型安全的actor。 7. Akka 的消息传递 Akka 的消息...
**Typed Actors** 是一种更高级的Actor形式,它们使用类型安全的方式进行消息处理,提高了代码的可读性和安全性。 ##### 4.3 Logging (Scala) Akka提供了内置的日志记录功能,可以帮助开发者轻松记录应用程序的...
8. **Typed Actors**: Akka v2.9.1可能包含了对类型安全的Actor的支持,这使得代码更加健壮,因为类型检查在编译时就能完成,减少了运行时错误的可能性。 9. **Akka Streams**: Akka不仅限于Actor模型,还提供了...
##### 1.3 如何开始使用 Akka? - **安装环境**:确保安装了 Scala 和 sbt(Scala 构建工具)。 - **创建项目**:使用 sbt 创建一个新的 Scala 项目。 - **引入依赖**:在项目中添加 Akka 相关库的依赖。 - **编写...