`
liyonghui160com
  • 浏览: 777229 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

strom DRPC服务调用例子

阅读更多

 


strom DRPC服务调用例子


DRPC服务端代码

import java.util.Map;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
@SuppressWarnings("deprecation")
public class My_storm_drpc {
    public static class storm_drpc_Bolt implements IBasicBolt {

        private static final long serialVersionUID = 3812791870691350630L;
 
        StringBuilder Str = new StringBuilder();
 
        public void prepare(Map conf, TopologyContext context) {
        }
 
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            Str.append("@");
            String input = tuple.getString(1);
            collector
                    .emit(new Values(tuple.getValue(0), input + Str.toString()));
        }
 
        public void cleanup() {
        }
 
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
 
        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
 
    }
 
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
 
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(
                "My_storm_drpc");
        builder.addBolt(new storm_drpc_Bolt(), 3).allGrouping();
        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(4);
        StormSubmitter.submitTopology(args[0], conf,
                builder.createRemoteTopology());
    }
}

 


把代码打包到了一个mystorm.jar的包里
storm测试环境集群只有两台
mynode001 部署了nimbus、zookeeper 以及 drpc server
mynode002 部署了supervisor
storm版本是twitter storm 0.9.0.1最新版

 


把mystorm.jar copy到mynode001上,

执行以下命令:storm jar /data/storm/storm-yarn-master/lib/mystorm.jar My_storm_drpc My_storm_drpc -c nimbus.host=mynode001


DRPC客户端调用

import backtype.storm.utils.DRPCClient;
 
public class My_storm_drpc_client {
    public static void main(String[] args) throws Exception {

        DRPCClient client = null;
        client = new DRPCClient("mynode001", 3772);
        System.out.println("开始执行DRPC客户端调用");
        for (int i = 0; i < 10; i++) {
            String tt = client.execute("My_storm_drpc", "你好");
            System.out.println("tt = " + tt);
        }
    }
}

 


客户机执行以下命令:storm jar /data/storm/storm-yarn-master/lib/mystorm.jar My_storm_drpc_client -c nimbus.host=mynode001

 

 

 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

 

storm_drpc_Bolt类中的execute方法中参数tuple,第一个field是request-id,第二个field是这个请求的参数。LinearDRPCTopologyBuilder同时要求我们topology的最后一个bolt发射一个二维tuple: 第一个field是request-id, 第二个field是这个函数的结果。最后所有中间tuple的第一个field必须是request-id,和客户端的

request-id匹配,匹配上就传回这个客户端结果。

 

详细介绍看徐明明介绍:http://xumingming.sinaapp.com/756/twitter-storm-drpc/

 

 

分享到:
评论

相关推荐

    storm DRPC简单例程

    综上所述,"storm DRPC简单例程"是一个关于如何在Apache Storm集群上搭建和使用DRPC服务的教程,它涵盖了实时计算、分布式调用和集群部署等多个重要概念,对于理解和实践Storm的实时处理能力具有很高的价值。...

    storm之drpc操作demo示例.zip

    DRPC(Distributed Remote Procedure Call)是Storm中的一个特性,它允许我们执行分布式远程过程调用,使得实时计算变得更加灵活和强大。本示例将通过一个具体的DRPC操作Demo来深入理解这一功能。 首先,DRPC的基本...

    Storm的drpc应用

    这是storm中drpc应用的一个例子。

    storm-drpc-node:适用于Node.js的Apache Storm DRPC客户端

    Storm-drpc节点适用于Node.js的Apache Storm DRPC客户端受启发,但不同之处在于可以选择将其设置为保持活动状态,它不需要在每个execute()调用中都创建连接,并且可以喜欢的传统方式或promise方式使用它。...

    大数据平台Storm入门到精通

    01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解

    Storm入门教程 之Storm原理和概念详解

    2、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,简单易懂; 3、分享积累的经验和技巧,从架构的角度剖析场景和设计实现...

    02、Storm入门到精通storm3-0.pptx

    - **Storm DRPC**:DRPC允许用户在Storm拓扑中直接执行远程过程调用,提供实时计算服务。 - **Executor、Worker、Task之间的关系**:Executor是线程池,负责执行Bolt或Spout的实例。Worker是JVM进程,可以包含多个...

    Storm配置项详解

    - **`drpc.port`**:设置Storm DRPC的服务端口,DRPC(Distributed RPC)允许外部系统调用Storm中定义的函数。 #### Supervisor 配置 - **`supervisor.slots.ports`**:定义supervisor上能够运行workers的端口列表...

    storm深入学习.pdf

    Storm DRPC是一个强大的特性,它支持远程直接调用,允许客户端发送请求到Storm集群,然后由Storm处理请求并返回结果。DRPC使得Storm能够执行复杂的分布式计算任务,同时保持低延迟和高吞吐量。 在理解了Storm的基本...

    dRPC

    1. **易用性**:dRPC通过简洁的API设计,使得在服务之间进行RPC调用变得直观且易于理解。它支持异步和同步调用模式,适应不同的应用场景。 2. **高性能**:dRPC利用高效的序列化和反序列化机制,如protobuf或...

    基于Storm流计算天猫双十一作战室项目实战

    - **DRPC**:讨论Distributed Remote Procedure Call(分布式远程过程调用)的实现方法和应用场景,为复杂系统的构建提供新的思路。 - **Storm Trident**:通过实际案例讲解Storm Trident的核心特性和优势,帮助学习...

    Storm配置项详解.docx

    20. **drpc.port**:Storm的DRPC服务端口。 21. **supervisor.slots.ports**:supervisor上可用于运行workers的端口列表,每个worker占用一个端口。 22. **supervisor.childopts**:配置supervisor守护进程的JVM...

    Storm实战:构建大数据实时计算

    第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对Storm有了直观的认识;第3章深入讲解了Storm的基本概念,同时实现一个Topology运行;第4章和第5章阐述了Storm的并发度、可靠处理的...

    实时计算Storm核心技术及其在报文系统中的应用.pdf

    DRPC允许进行分布式远程过程调用,使得在Storm集群中可以实时执行函数式计算。 4. 基于Storm的报文系统初探 4.1 报文系统需求分析 报文系统需要处理高并发、业务隔离等需求,同时确保低延迟和高可用性。 4.2 ...

    Getting Started with Storm

    **DRPC**(Distributed RPC):一种特殊的 Spout,支持分布式远程过程调用,使得客户端可以直接向 Storm 集群发送请求,并获得响应。 #### 第五章 Bolts **Bolt 生命周期**: - **初始化**:在 Bolt 创建时调用。 ...

    Storm深入学习.pdf

    - **DRPC(Distributed RPC)**:分布式远程过程调用,允许多个客户端向集群发送请求并获取结果,支持实时计算服务。 - **Executor、Worker 和 Task**:Executor 是在 Worker 进程中执行的线程,负责处理 Tuple。...

    Storm配置详解

    17. drpc.servers和drpc.ports:DRPC(分布式远程过程调用)服务器列表和服务端口。 18. supervisor.slots.ports:Supervisor上能运行workers的端口列表。每个worker占用一个端口,并且每个端口只运行一个worker。...

Global site tag (gtag.js) - Google Analytics