DRPC是建立在Storm基本概念(Topology、Spout、Bolt、Stream等)之上的高层抽象,个人理解它的目标是在Storm 集群之上提供一种分布式的RPC框架,以便能够利用Storm快速的实现RPC请求的分布式计算过程,即发起一次RPC请求,多个worker计算节点参与计算,最后汇总后将计算结果返回给客户端。
DRPC实现框架
Storm中使用Thrift作为其RPC框架,同样地,DRPC的实现也是构建在Thrift协议之上,相关的源码文件如下:
1. storm-core/src/storm.thrift,定义了Storm中实现的Thrift协议,其中有两个service是与DRPC相关的:DistributedRPC和DistributedRPCInvocations,它们的接口定义如下:
- DistributedRPC.Iface:定义了execute方法,用于客户端发起RPC请求;
- DistributedRPCInvocations.Iface:定义了fetchRequest、failRequest、result方法,分别用于获取RPC请求、将RPC请求标记为失败、返回RPC请求的处理结果。
2. storm-core/src/clj/backtype/storm/daemon/drpc.clj,实现了DRPC的Thrift服务端(即DRPC Server),使用Clojure语言实现。
3. storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java 和storm-core/src/jvm/backtype/storm/utils/DRPCClient.java,作为RPC客户端,实现了 DistributedRPC.Iface接口,用于客户端向DRPC Server发起RPC请求。
4. storm-core/src/jvm/backtype/storm/generated /DistributedRPCInvocations.java和storm-core/src/jvm/backtype/storm/drpc /DRPCInvocationsClient.java,作为RPC客户端,实现了DistributedRPCInvocations.Iface接 口,用于DRPC Topology触发执行DRPC Request并返回结果给DRPC Server。
下图所示是Storm DRPC的整体实现情况。从中可以看出,对于DRPC Server来说,DRPC Client和DRPC Topology都是Thrift的客户端,只是分别调用了不同的Thrift服务而已。
1. 首先,前提是集群上已经运行了DRPC Topology,每个DRPC服务注册了一个RPC方法,包含方法名称和参数形式(上图中假设Topology已经启动运行);
2. 接下来是处理流程,客户端通过DRPCClient调用execute方法,发起一次RPC调用给DRPC Server,目前受限的是只支持一个String类型的DRPC方法调用参数,社区中正在讨论对此进行扩展;
3. 然后,DRPC Server中有一个handler-server pool,用于接收RPC请求,并为每个请求生成唯一的request id,生成一条DRPC Request记录,并放到request queue中等待被消费(计算);
4. 最后,DRPC Topology中的相关模块(DRPC Spout、ReturnResults Bolt,后面会介绍)通过invoke-server pool从request queue中取出该方法的RPC请求,并将处理结果(成功/失败)返回给DRPC Server,直到最终返回给阻塞着的DRPC Client。
DRPC拓扑实现细节
下面以storm-starter项目中的ReachTopology为例,详细介绍Storm中通过LinearDRPCTopologyBuilder构造的DRPC Topology的数据流关系:
其中,DRPC Topology由1个DRPCSpout、1个Prepare-Request Bolt、若干个User Bolts(即用户通过LinearDRPCTopologyBuilder添加的Bolts)、1个JoinResult Bolt和1个ReturnResults Bolt组成。除了User Bolts以外,其他的都是由LinearDRPCTopologyBuilder内置添加到Topology中的。接下来,我们从数据流的流动关系来 看,这些Spout和Bolts是如何工作的:
1. DRPCSpout中维护了若干个DRPCInvocationsClient,通过fetchRequest方法从DRPC Server读取需要提交到Topology中计算的RPC请求,然后发射一条数据流给Prepare-Request Bolt:<”args”, ‘”return-info”>,其中args表示RPC请求的参数,而return-info中则包含了发起这次RPC请求的RPC Server信息(host、port、request id),用于后续在ReturnResults Bolt中返回计算结果时使用。
2. Prepare-Request Bolt接收到数据流后,会新生成三条数据流:
- <”request”, ”args”>:发给用户定义的User Bolts,提取args后进行DRPC的实际计算过程;
- <”request”, ”return-info”>:发给JoinResult Bolt,用于和User Bolts的计算结果做join以后将结果返回给客户端;
- <”request”>:在用户自定义Bolts实现了FinishedCallback接口的情况下,作为ID流发给用户定义的最后一级Bolt,用于判断batch是否处理完成。
3. User Bolts按照用户定义的计算逻辑,以及RPC调用的参数args,进行业务计算,并最终输出一条数据流给JoinResult Bolt:<”request”, ”result”>。
4. JoinResult Bolt将上游发来的<”request”, ”return-info”>和<”request”, ”result”>两条数据流做join,然后输出一条新的数据流给ReturnResults Bolt: <”result”, ”return-info”>。
5. ReturnResults Bolt接收到数据流后,从return-info中提取出host、port、request id,根据host和port生成DRPCInvocationsClient对象,并调用result方法将request id及result返回给DRPC Server,如果result方法调用成功,则对tuple进行ack,否则对tuple进行fail,并最终在DRPCSpout中检测到tuple 失败后,调用failRequest方法通知DRPC Server该RPC请求执行失败。
DRPC整体工作流程
最后,将以上两个小节的内容合并起来就可以看到Storm中DRPC的整体工作流程了,如下图所示:
参考资料链接
1. https://github.com/apache/incubator-storm
2. https://github.com/nathanmarz/storm-starter
相关推荐
7. **stormdemo**:这个文件可能是storm项目的示例代码,包含了一个使用DRPC的简单示例,可能包括了拓扑定义、DRPC服务端实现、客户端调用的代码示例。 综上所述,"storm DRPC简单例程"是一个关于如何在Apache ...
Storm可以实现TOPN功能,即找出数据流中的前N个最大或最小元素,这在实时数据分析中非常常见,如发现热门话题。 5. **Storm 流程聚合** 流程聚合是指在数据流处理过程中,通过多个步骤和多个Bolts来实现复杂转换...
- **案例1:Storm + Kafka 数据流处理**:利用Storm和Kafka构建实时数据处理管道,实现从数据采集到清洗、过滤直至最终分析结果展示的全过程。 - **案例2:HighCharts实现实时数据可视化**:结合HighCharts等前端...
Storm通过Ack机制确保每个数据元被正确处理,即使在网络故障或节点失效的情况下,也能保证数据不丢失。 3. Storm中的高层机制 3.1 事务处理 Storm支持事务处理,保证数据处理的原子性和一致性,尤其适用于金融等...
- **DRPC(Distributed Remote Procedure Call)**:支持分布式远程过程调用,允许实时查询和响应,增强了Storm的交互性。 4. **基于Storm的报文系统** - **报文系统需求分析**:报文系统通常需要处理高并发、低...
**DRPC**(Distributed RPC):一种特殊的 Spout,支持分布式远程过程调用,使得客户端可以直接向 Storm 集群发送请求,并获得响应。 #### 第五章 Bolts **Bolt 生命周期**: - **初始化**:在 Bolt 创建时调用。 ...
- **Storm DRPC(分布式远程调用)介绍**:介绍Storm DRPC的功能及其实现原理。 - **Storm DRPC实战讲解**:通过具体案例演示如何使用Storm DRPC。 - **Storm和Hadoop2.x的整合**:指导如何将Storm与Hadoop2.x进行...