strom DRPC服务调用例子
DRPC服务端代码
import java.util.Map; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; @SuppressWarnings("deprecation") public class My_storm_drpc { public static class storm_drpc_Bolt implements IBasicBolt { private static final long serialVersionUID = 3812791870691350630L; StringBuilder Str = new StringBuilder(); public void prepare(Map conf, TopologyContext context) { } public void execute(Tuple tuple, BasicOutputCollector collector) { Str.append("@"); String input = tuple.getString(1); collector .emit(new Values(tuple.getValue(0), input + Str.toString())); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } } @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder( "My_storm_drpc"); builder.addBolt(new storm_drpc_Bolt(), 3).allGrouping(); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(4); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } }
把代码打包到了一个mystorm.jar的包里
storm测试环境集群只有两台
mynode001 部署了nimbus、zookeeper 以及 drpc server
mynode002 部署了supervisor
storm版本是twitter storm 0.9.0.1最新版
把mystorm.jar copy到mynode001上,
执行以下命令:storm jar /data/storm/storm-yarn-master/lib/mystorm.jar My_storm_drpc My_storm_drpc -c nimbus.host=mynode001
DRPC客户端调用
import backtype.storm.utils.DRPCClient; public class My_storm_drpc_client { public static void main(String[] args) throws Exception { DRPCClient client = null; client = new DRPCClient("mynode001", 3772); System.out.println("开始执行DRPC客户端调用"); for (int i = 0; i < 10; i++) { String tt = client.execute("My_storm_drpc", "你好"); System.out.println("tt = " + tt); } } }
客户机执行以下命令:storm jar /data/storm/storm-yarn-master/lib/mystorm.jar My_storm_drpc_client -c nimbus.host=mynode001
客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。
storm_drpc_Bolt类中的execute方法中参数tuple,第一个field是request-id,第二个field是这个请求的参数。LinearDRPCTopologyBuilder同时要求我们topology的最后一个bolt发射一个二维tuple: 第一个field是request-id, 第二个field是这个函数的结果。最后所有中间tuple的第一个field必须是request-id,和客户端的
request-id匹配,匹配上就传回这个客户端结果。
详细介绍看徐明明介绍:http://xumingming.sinaapp.com/756/twitter-storm-drpc/
相关推荐
综上所述,"storm DRPC简单例程"是一个关于如何在Apache Storm集群上搭建和使用DRPC服务的教程,它涵盖了实时计算、分布式调用和集群部署等多个重要概念,对于理解和实践Storm的实时处理能力具有很高的价值。...
DRPC(Distributed Remote Procedure Call)是Storm中的一个特性,它允许我们执行分布式远程过程调用,使得实时计算变得更加灵活和强大。本示例将通过一个具体的DRPC操作Demo来深入理解这一功能。 首先,DRPC的基本...
这是storm中drpc应用的一个例子。
Storm-drpc节点适用于Node.js的Apache Storm DRPC客户端受启发,但不同之处在于可以选择将其设置为保持活动状态,它不需要在每个execute()调用中都创建连接,并且可以喜欢的传统方式或promise方式使用它。...
01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解
2、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,简单易懂; 3、分享积累的经验和技巧,从架构的角度剖析场景和设计实现...
- **Storm DRPC**:DRPC允许用户在Storm拓扑中直接执行远程过程调用,提供实时计算服务。 - **Executor、Worker、Task之间的关系**:Executor是线程池,负责执行Bolt或Spout的实例。Worker是JVM进程,可以包含多个...
- **`drpc.port`**:设置Storm DRPC的服务端口,DRPC(Distributed RPC)允许外部系统调用Storm中定义的函数。 #### Supervisor 配置 - **`supervisor.slots.ports`**:定义supervisor上能够运行workers的端口列表...
Storm DRPC是一个强大的特性,它支持远程直接调用,允许客户端发送请求到Storm集群,然后由Storm处理请求并返回结果。DRPC使得Storm能够执行复杂的分布式计算任务,同时保持低延迟和高吞吐量。 在理解了Storm的基本...
1. **易用性**:dRPC通过简洁的API设计,使得在服务之间进行RPC调用变得直观且易于理解。它支持异步和同步调用模式,适应不同的应用场景。 2. **高性能**:dRPC利用高效的序列化和反序列化机制,如protobuf或...
- **DRPC**:讨论Distributed Remote Procedure Call(分布式远程过程调用)的实现方法和应用场景,为复杂系统的构建提供新的思路。 - **Storm Trident**:通过实际案例讲解Storm Trident的核心特性和优势,帮助学习...
20. **drpc.port**:Storm的DRPC服务端口。 21. **supervisor.slots.ports**:supervisor上可用于运行workers的端口列表,每个worker占用一个端口。 22. **supervisor.childopts**:配置supervisor守护进程的JVM...
第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对Storm有了直观的认识;第3章深入讲解了Storm的基本概念,同时实现一个Topology运行;第4章和第5章阐述了Storm的并发度、可靠处理的...
DRPC允许进行分布式远程过程调用,使得在Storm集群中可以实时执行函数式计算。 4. 基于Storm的报文系统初探 4.1 报文系统需求分析 报文系统需要处理高并发、业务隔离等需求,同时确保低延迟和高可用性。 4.2 ...
**DRPC**(Distributed RPC):一种特殊的 Spout,支持分布式远程过程调用,使得客户端可以直接向 Storm 集群发送请求,并获得响应。 #### 第五章 Bolts **Bolt 生命周期**: - **初始化**:在 Bolt 创建时调用。 ...
- **DRPC(Distributed RPC)**:分布式远程过程调用,允许多个客户端向集群发送请求并获取结果,支持实时计算服务。 - **Executor、Worker 和 Task**:Executor 是在 Worker 进程中执行的线程,负责处理 Tuple。...
17. drpc.servers和drpc.ports:DRPC(分布式远程过程调用)服务器列表和服务端口。 18. supervisor.slots.ports:Supervisor上能运行workers的端口列表。每个worker占用一个端口,并且每个端口只运行一个worker。...