1.概述
Akka这样一个scala世界里的明星,给我们提供了各种各样吸引人的功能和特性,尤其在分布式、高并发领域。但就像任何其他优秀的框架,它的实现也必然会有其复杂性,在Roland Kuhn(Akka Tech Lead)的带领下,Akka的实现原理吸收了各个领域内成熟、领先的理论。尤其是Akka里cluster的实现,更是体现了非常多的优秀理论和实战经验。为了更方便大家在实际使用中碰到问题的时候,可以试着进行分析和解决。从今天开始,我将从以下几个方面试着说明cluster的实现(基于2.3.1版本):
#集群的启动
#一个普通节点的一生
2.集群启动
要使用一个cluster首先要启动它,所以我们先从启动这个步骤的实现开始进行分析。
Akka集群的启动首先就是要启动一种叫做种子节点(SeedNode)的节点们。只有种子节点启动成功,其他节点才能选择任意一个种子节点加入集群。
种子节点默认可配置多个,它们之间没有任何区别,种子节点的启动分以下几种情况:
1.某种子节点启动,它首先判断自己的ip是否在种子节点配置列表中,如果在并且是第一个,则它在一个规定时间内(默认是5秒),向其他种子节点发送‘InitJoin’消息,如果有确认消息返回,则加入第一个返回确认的种子节点所在的cluster中,否则,它自己将创建一个新的cluster。(这些任务由FirstSeedNodeProcess这个Actor完成,任务完成后它就销毁自己)
2.某种子节点启动,它首先判断自己的ip是否在种子节点配置中,但不是第一个,则它向其他种子节点发送消息,如果在一个规定时间内(默认是5秒)没有收到任何确认消息,则它将不断重试,直到有一个种子节点返回正确的确认消息,然后就加入这个种子节点所在的cluster中。(这里注意以下,它不会自己创建一个新cluster)。(这些任务由JoinSeedNodeProcess这个Actor完成,任务完成后它就销毁自己)
从上面的分析,我们可以得出下面的一些结论:
#.一个集群第一次启动成功,那一定是种子节点配置列表中排在第一位的节点,由它来创建出集群。但是随着时间的推移,排在第一的种子节点有可能重启了,那这个时候,它将首选加入到其他种子节点去。
#一个种子节点可以加入任何一个其他节点,不用非得都加到排第一位的节点上。
下面我们举例说明,有种子节点1、2、3:
* 1. seed2启动, 但是没有收到seed1 或seed3的确认。
* 2. seed3启动,没有收到seed1 的确认消息(seed2处在’inactive’状态)。
* 3. seed1 启动,创建cluster并加入到自己中。
* 4. seed2 重试加入过程,收到seed1的确认, 加入到seed1。
* 5. seed3重试加入过程,先收到seed2的确认, 加入到seed2。
为了更好说明源码实现,我们先对节点的状态做一个介绍,下面是节点状态变迁图:
下面具体说明状态的变迁:
#节点初始状态是joining (对应源码new Member(uniqueAddress, Int.MaxValue, Joining, roles))
#通过gossip该信息被传递到所有节点上。如果gossip能够收敛(convergence ,后面会详细介绍gossip的实现),那么该节点的状态变成up。
#用户可以人工调用leave、down命令,让节点状态发生改变。
#由于Akka提供基于心跳的故障检测模块fd*(failure detector),所以当故障检测模块发现某个节点offline,则会把该节点置为unreachable*(它不是一种节点状态),当该节点online后,则自动恢复到当前状态(如图所示:unreachable*和其他节点状态之间是用虚线表示,说明其并没有发生状态的变迁)。
#从图中可以看出节点的最终状态都是removed。
#从上图还可以看出一个节点如果状态变更路径是joining-àup-àleaving-àexiting-àremoved,则它要经过4次gossip收敛(图中的leader action,图中漏了一个把removed传递给其他节点的gossip收敛)
3.Gossip协议的实现
为了更好的分析Akka的gossip协议实现, 我将从3方面来进行说明:算法说明、实例演示、源码分析。
3.1 算法说明
由于Akka采用无中性化的集群设计,在这种架构下为了能更好的传递彼此间的消息,Akka使用了Gossip协议。
为了下面更好的理解Gossip执行过程,我们先介绍几个概念:
#Vector Clock(矢量钟)
2. 在每个节点的矢量钟实例里,包含了它能接触到的所有节点对应的计数器。如有可相互通讯的A、B、C3个节点,则A节点的矢量钟也包含了B、C节点的计数器。
3.如果在更新矢量钟某个节点的值时,以max(计数器原值、新值)作为当前值。
它主要用来检测在分布式环境下,是否存在并发的更新(冲突)。如果没有冲突则说明它们满足causality,如果有冲突,解决的方式也较简单:max。
#发起gossip的节点叫gossiper,接收者叫做recipient。
#节点间gossip协议采用请求/应答模式。
#Akka的gossip协议发送的具体内容。
case class Gossip(
members: immutable.SortedSet[Member],
seen: Set[UniqueAddress],
reachability: Reachability,
)
*members:存放该节点知道的所有其他节点。
*seen:已经收到本次gossip的节点们,每个节点当接收到一个新的gossip时,会把自己放到seen这个队列中,作为响应返回给发送者。
*reachability:这个队列由心跳模块来维护,用来判断节点们是否存活。
*version:矢量钟。
3.1.1一个节点加入集群的消息交互
下面我们介绍一下一个节点加入集群的消息交互和状态变化过程,如下图所示:
节点NodeA随机选择SeedNode1作为入口加入到集群:
#它首先处于uninitialized状态(这里的状态并不是具体存在的,只是为了方便算法说明),发送InitJoin消息给SeedNode1;
# SeedNode1接收该消息,返回Ack进行确认;
#NodeA发送Join消息给SeedNode1,SeedNode1接收该消息后,会调用ClusterCoreDaemon实例的joining方法,它会调用另外一个非常重要的方法:updateLatestGossip。updateLatestGossip的作用有两个:1.更新当前gossip的矢量钟;2.清空当前gossip的seen队列,然后把自己加在里面。(后续发起gossip交互时,会优先选择那些没在seen队列中的成员)
#NodeA接收Welcome消息后,用SeedNode1的gossip作为自己当前的gossip,并且把自己加到seen队列中,然后把当前gossip再返回给SeedNode1。到此NodeA就完成了全部加入过程。
3.1.2 Gossip的执行过程
Gossip的执行过程包含这几步:
1、 某节点定时发起一次gossip:
选择gossip接收者的算法具体如下(代码在ClusterCoreDaemon类的gossip方法里):
首先选择那些没有在当前gossip的seen队列中的members,如果存在这样的members则从中随机选择一个节点,向它发起一次gossip交互。如果所有成员都已经在
seen队列中了,则随机选择一个节点向它仅仅发送当前gossip的版本信息(矢量钟),这个是对gossip协议的一个优化,因为在大部分的时候,节点间发送的都是彼此的矢量钟。
2、每个节点都会定时判断本次gossip是否结束(收敛)了(判断收敛的标志是:所有节点都看到了当前的gossip)。
3、节点发现某次gossip结束了,则判断自己是否是‘leader‘,是的话则执行相关的leader动作。
3.2 实例演示
下面我们用一个简单的场景来分析gossip协议的具体交互过程,为了去掉不必要的干扰把gossip-interval这个参数改成30秒,默认是1秒。场景说明:
firstSeedNode(在application.conf配置文件seed-nodes里排在第一)先启动,然后再启动SeedNode2,为了方便说明把firstSeedNode简写成S1,SeedNode2简写成S2。
下面是gossip交互示意图:
上图中的T0、T1、…这些都是表示时间轴,具体说明如下:
#T0时刻:S1启动,这时候的gossip为空,如(members = [], overview = GossipOverview(reachability = [], seen = []), version = VectorClock())。
#T1-T2:这是S1把自己加到cluster中去,具体可以参看‘一个节点加入集群的消息交互’章节。T2时刻结束的时候,版本为2。
#T3:S2启动后S1收到Join消息,经过一系列操作,S1的gossip版本变成3、成员新增一个Joining状态的S2。这个gossip会通过Welcome消息发给S2,版本是3了。
#T3_1:S2收到Welcome中的gossip,用它来初始化自己的gossip。
(这里重点说明一下后续的T4和T5其实它们没有必然的时间前后关系,这只是我测试时发生的,如果大家在测试的时候发现和我的顺序不一致,这没有问题的,因为原理是一样的)
#T4:Leader选举后,S1还是Leader,并且把S2的状态变成Up,版本是4。
#T5:S2在T3_1处理完后,紧接着把自己的gossip再发给S1,版本是3。
#T5_1:S1收到S2的gossip,和自己的进行比较,由于自己版本是4比3大,所以把自己的gossip再发给S2。
#T5_2:S2收到S1版本4的gossip,比自己的版本高,于是更新自己的gossip,同时发现S1的gossip的seen队列里并不包含自己,于是它又把自己更新过后gossip(版本是4)发给S1。
#T5_3:S1发现收到的gossip和自己版本一样,于是只是合并了一下彼此的seen队列。
到现在S1和S2的gossip完全相同了。上面虽然只是2个节点间的gossip交互,但其原理可以适用于任意多个节点。
3.3 源码分析
Akka在Cluster上的实现还算比较清晰,2.3.1版本只有15个主要直接相关的scala类,而文件大小最大的是ClusterDaemon.scala,它也是整个实现的核心类,这里我们重点对它进行一下分析。
这个文件中最重要的类是ClusterCoreDaemon,它的主要功能包括下面几个方面:
#节点状态管理:主要是对InitJoin、JoinTo、Join等消息的处理,这个细节可以参看前面章节的描述。
#gossip协议的管理:
1)发送gossip:Akka采用定时(通过gossip-interval参数配置的,默认1秒)的方式来发起新一轮的Gossip协议,这样做的好处是能有效的减少gossip的交互次数(1秒内的多个成员的状态变化,通过一轮gossip就完成了),Akka里每一轮gossip都是由这个定时器触发的,对应源码gossipTick方法。
2)选择哪个节点发起新一次gossip交互:
Akka采用有偏好的随机选择算法,它首先会选择当前gossip结构体中不在seen队列里的members队列中的成员,如果有这样的成员则随机选择一个进行gossip交互。如果没有这样的成员,则随机选择members队列中的一个成员进行gossip交互。对应源码gossip方法里。
3)接收gossip:首先拿remote gossip(对端)和本地gossip进行版本(矢量钟)比较,对应源码VectorClock类的compareOnlyTo方法。比较的结果有3种:
# Same:相同,则进行seen队列合并就可以了。
# Before:本地新,则向对端发送最新的gossip,本地不变。
# After:对端新,则更新本地gossip。如果remote gossip里的seen队列里没有包含该本地ip,则发送最新的gossip给对端,以减少一次它们两个间的gossip交互。
对应源码receiveGossip方法里。
#Leader的管理:
Akka采用定时(通过leader-actions-interval参数配置的,默认1秒)的方式来判断Leader的产生。这个算法具体又包括几个小部分:
1) 判断本节点是否是Leader:
Akka的Leader’选举’其实比较简单,就是判断当前 members队列(有序队列)里面排第一位置的节点就是当前整个集群的Leader。Leader不一定非得是SeedNode,普通节点也可以。具体member的排序算法在Member类的addressOrdering。举例说明:
如我们有2个seedNode(s1:10.10.10.101:2551,s2:10.10.10.102:2552),1个普通节点(n1:10.10.10.100:20000),当n1加入集群后,n1就成为Leader了,因为它的ip地址最小。
对应源码Gossip类的leaderOf方法。
2)本轮gossip是否收敛:
首先判断所有被故障检测模块fd检测出有问题的节点,它们的状态应该是Down, Exiting。如果不是的话,则等待人工设置问题节点的状态或由Leader自动执行auto-down。
当上面这个条件满足后,再判断是否所有成员状态是Up, Leaving,都在seen队列里,如果这个条件满足的话,则认为本轮gossip收敛。
对应源码Gossip类的convergence方法里。
3)执行Leader的职责:
它重点关注两类成员:changedMembers和removedUnreachable;
changedMembers是指那些有状态变化的成员,如JOINING ---> UP、LEAVING-àEXITING。而removedUnreachable是指被故障检测模块fd检测出有问题的节点,如果它们的状态是Down, Exiting中的一种。只要有满足上述任何一类的成员,Leader就会执行一系列相关操作,这些操作比较直观,请参看源码leaderActionsOnConvergence方法。
相关推荐
**Akka Cluster 简介** Akka 是一个用 Scala 编写的高性能、容错的actor模型库,它为构建可扩展的、反应式的、分布式系统提供了强大的工具。Akka Cluster 是 Akka 库的一部分,它允许应用程序节点通过网络进行通信...
在 Akka 中,"Cluster Singleton" 是一种设计模式,用于确保集群中的某个特定角色仅有一个实例存在。这个特性对于实现全局状态、领导选举或者分布式协调等场景非常有用。下面我们将深入探讨 Akka Cluster 和 Cluster...
Akka集群Kubernetes 该项目是使用Akka Cluster和Kubernetes实现弹性(在的意义上)的工作示例。 弹性是系统根据当前需求向上和向下扩展其资源的能力。 我们只想使用正确的数量:仅此而已。 我们将自定义资源指标与...
8. **Cluster**:Akka集群可以扩展到多台机器,实现分布式计算。Actor可以在集群中的任何节点上运行,系统可以自动处理节点间的通信和故障转移。 9. **Remote Deployment**:Akka支持远程部署,使得Actor可以跨网络...
《构建分布式系统:Akka Typed、Akka Cluster Sharding与Cassandra融合实践》 在现代软件开发中,构建可扩展、容错且高度并行的系统成为了一个重要的议题。Akka,一个基于Scala的框架,提供了强大的工具来解决这些...
基于scala 、akka实现了一个简单的报表工具。该项目是个玩具项目,用于个人学习scala和akka。...失败可异地恢复 (akka-cluster)。使用kryo序列化消息。流控 Back-Pressure, 避免OutOfMemory(akka-stream).
此外,还可以结合Prometheus和Grafana等开源监控工具,对Akka应用进行深度性能分析和警报设置。 至于POC(Proof of Concept),通常是在实际项目实施之前,用来验证技术可行性的小规模实验。在Akka分布式状态管理的...
Akka Cluster ...极小的设置... 运行种子节点: sbt run # default management.port = 19999 运行第二个节点: sbt -Dclustering.port=2553 -Dmanagement.port=20000 run 运行第三个节点: sbt -Dclustering....
本文将深入探讨`akka-cluster-router-example`,通过实例来解析Akka集群路由器的工作原理和使用方法。 首先,我们需要理解Akka集群的基本概念。Akka集群是Akka系统的一个分布式特性,它允许不同的Actor系统通过网络...
Akka群集自定义下降重要告示该项目已弃用,因为其所有权已转移到以下存储库: 从现在开始,将在sisioh/akka-cluster-custom-downing收到捐款介绍Akka群集具有akka.cluster.auto-down-unreachable-after配置属性。...
kubectl create -f k8s/simple-akka-cluster-rbac.yml # create deployment kubectl create -f k8s/simple-akka-cluster-deployment.yml # create service kubectl create -f k8s/simple-akka-cluster-service.yml ...
Akka类型的Java集群分片示例这是一个Akka Cluster项目,其中包括使用 , , , ,cluster仪表板和cluster sharding查看器的示例。 该项目是一系列项目中的一个,该项目以一个简单的Akka Cluster项目开始,并逐步构建...
akka-cluster-example-inloop 简单的 akka 集群示例。 跑步: 安装 cassandra 并启动它。 sbt clean 编译 xitrum-package cd 目标/xitrum/bin ./start.sh 种子1 ./start.sh 种子2 ./start.sh stat1 ./start....
java多人聊天室源码Akka Java 集群感知示例 介绍 这是一个 Java、Maven、Akka 项目,演示了如何设置一个基本的关注集群感知actor。 该项目是一系列项目中的一个,从一个简单的 Akka Cluster 项目开始,逐步构建事件...
该项目是一系列项目中的一个,该项目以一个简单的Akka Cluster项目开始,并逐步构建为事件源和命令查询责任分离的示例。 项目系列由以下项目组成: (此项目) 每个项目都可以独立于其他项目进行克隆,构建和运行。...
《akkajava源码解析:探索并行与并发编程的艺术》 在当今的高性能计算环境中,理解和掌握并行与并发编程技术是至关重要的。Akka,一个基于Actor模型的开源框架,为Java开发者提供了强大的工具来实现高效、容错的...
`zio-akka-cluster`是一个针对Akka Cluster的ZIO库,它将Akka的集群功能与ZIO的类型安全、纯函数式编程模型相结合,为开发者提供了在分布式环境中运行应用程序的强大工具。这个库的目标是帮助开发者利用ZIO的特性,...
AkkaDynoDB(React式存储服务) 使用 Akka Cluster 构建的类似 Dynamo 的分布式数据库介绍AkkaDynoDB是一种React式存储服务,其灵感来自 Amazon dynamo 分布式数据库,该数据库是高度可用、可扩展和有弹性的数据库 ...
akka-docker-cluster-example, 支持 Docker 支持的akka集群项目示例 akka-docker-cluster-example支持 Docker 支持的akka集群项目示例。 请参见博客文章 。 使用 SBT本机打包程序。:如何运行在SBT中,运行 docker:...
6. **源码分析** - **Example代码**: 压缩包中的源码提供了各种使用场景,如简单的生产消费、分组聚合、水印窗口等,通过对这些示例的学习,可以更深入地理解Akka Streams与Kafka的集成。 总结,"akka-streams-...