`
liyonghui160com
  • 浏览: 771832 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

自己创建DRPC服务

阅读更多

 



先讲下DRPC的原理:

    客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结 果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

    知道这个原理之后我们可以自己创建DRPC服务。

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class ManualDRPC {
  public static class ExclamationBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("result", "return-info"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String arg = tuple.getString(0);
      Object retInfo = tuple.getValue(1);
      collector.emit(new Values(arg + "!!!", retInfo));
    }

  }
   
 
  public static void main(String[] args) {
    //自定义DRPC
    TopologyBuilder builder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();

    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("exclaim", conf, builder.createTopology());

    System.out.println(drpc.execute("exclamation", "aaa"));
    System.out.println(drpc.execute("exclamation", "bbb"));

  }
}

 

分享到:
评论

相关推荐

    storm DRPC简单例程

    4. **DRPC的使用**:创建一个DRPC服务需要定义一个Storm拓扑,其中包含DRPCSpout(用于接收请求)和DRPCWorker(用于执行请求)。客户端可以通过HTTP或者Thrift协议与DRPC服务器通信。 5. **集群环境**:Storm集群...

    storm之drpc操作demo示例.zip

    在分布式计算领域,Apache Storm是一个实时计算系统,用于处理无界数据流。DRPC(Distributed Remote Procedure Call)是...通过实际操作这个示例,你将能够深入理解DRPC的工作流程,并具备构建自己DRPC应用的能力。

    dRPC

    安装完成后,可以导入dRPC库并按照文档或示例代码进行服务的创建和调用。 **示例代码:** ```python from dRPC import Client, Server # 创建服务端 class MyService: def add(self, a, b): return a + b ...

    drpc:纯Java实现基础rpc框架

    2. 示例代码:演示如何使用drpc框架创建和调用服务。 3. 测试用例:用于验证框架功能的正确性和性能。 4. 文档:详细解释框架的使用方法、配置选项和API说明。 5. 构建脚本:如build.gradle或pom.xml,帮助用户构建...

    DRPC:简单的Discord RPC程序

    使用DRPC程序,开发者可以为自己的应用添加专业且富有特色的Discord状态,增加社区的互动性和趣味性。同时,了解并掌握DRPC的工作原理,对于想要深入学习Discord API和事件驱动编程的开发者来说,也是宝贵的实践经验...

    storm-drpc-node:适用于Node.js的Apache Storm DRPC客户端

    Storm-drpc节点适用于Node.js的Apache Storm DRPC客户端受启发,但不同之处在于可以选择将其设置为保持活动状态,它不需要在每个execute()调用中都创建连接,并且可以喜欢的传统方式或promise方式使用它。...

    chill-zone-DRPC:寒带-申请Dicord Rich Presence

    - Discord API是Discord提供的一套接口,允许开发者创建自定义的插件、机器人和服务,以扩展Discord的功能。 - Rich Presence是Discord API的一部分,它允许开发者向Discord客户端展示更加详细的应用程序状态,...

    cqrs-topology

    设置DRPC服务器意味着在Storm拓扑中集成DRPC服务,以便其他系统可以向Storm提交任务,进行分布式计算。这通常涉及配置DRPC服务器,定义DRPC函数,并在Storm拓扑中创建相应的bolt来处理这些请求。 最后,"在DH中实施...

    grpc-android-demo:一个最小的GRPC演示项目,可以从原始文件自动生成GRPC实现。 还包括一个简约的服务器实现。 专为您在Android实验上的第一个GRPC设计的简单死机场

    GRPC Android演示关于由于目前似乎没有与GRPC服务器通信的Android应用程序的现成示例,该示例不涉及设置大量工具,因此我们决定创建一个简单的演示项目,其中包含一个简单的GRPC Java服务器和一个匹配的Android...

    trident-demo:三叉戟示威

    我希望通过在本地创建的多节点群集中运行此拓扑来演示这些功能。 当拓扑运行时,我们将杀死所有工作节点,并通过查看redis服务器中存储的最终计数来得出结论,三叉戟只有一次语义和状态处理。 要遵循本教程,我...

    storm深入学习.pdf

    本篇深入探讨了Storm的核心概念和使用技巧,包括基本Bolt的实现、批处理策略、TOPN功能、流程聚合、DRPC(Direct Remote Procedure Call)以及优化与异常处理。 首先,我们来看BasicBolt。BasicBolt是Storm中设计...

    Storm流计算项目:1号店电商实时数据分析系统-24.项目1-地区销售额-Trident代码开发二.pptx

    在数据验证方面,DRPC(Direct RPC)被用来提供服务,允许客户端发起特定查询并获得实时结果。DRPC客户端的开发使得可以从外部系统触发实时计算请求,增强了系统的交互性和灵活性。 然而,内存State在处理大量数据...

    Storm流计算项目:1号店电商实时数据分析系统-30.项目2-省份销售排行-Top N展示优化和项目开发思路总结.pptx

    尽管DRPC提供了服务访问,但其稳定性可能不及数据库。 4. **其他知识点**: - **CDH5**:一个集成的大数据平台,包括Hadoop、HBase、Zookeeper等组件,用于搭建和管理大数据集群。 - **Kafka**:分布式消息系统,...

    1号店电商实时数据分析系统-22.项目1-地区销售额-项目需求分析和分区Trident Spout开发.pptx

    - **DRPC(Distributed Remote Procedure Call)**:分布式远程过程调用,允许Web端通过DRPC向Storm集群发起请求,获取Top N的结果数据,提供了一种灵活的交互方式。 - **Trident State**:Trident状态管理机制,...

    M2H_Networking_Tutorial(unity3d网络入门教程源文件)

    通过阅读文档和逐步跟随示例,你可以深入理解Unity3D的网络机制,并建立起自己的网络应用。 总之,这个教程对于想要涉足Unity3D网络开发的初学者来说是一个很好的起点。通过学习,你可以掌握如何使用Unity3D的网络...

    大数据课程体系.docx

    - **Eclipse开发环境**:介绍Eclipse集成开发环境的安装与配置方法,包括如何在Eclipse中创建Java项目、编写代码、编译及运行程序。 - **多线程技术**:深入理解Java中的线程概念、线程生命周期以及线程间通信机制,...

    Kanade-s-Soundcloud-Discord-Rich-Presence:Soundcloud和Disord之间的丰富状态集成可以正常工作

    安装: 在计算机上的任何位置创建一个空文件夹。 将所有文件从“ Kanade-s-Soundcloud-Discord-Rich-Presence-master.zip”复制到该文件夹​​。 打开“ setup.bat”,然后等待其关闭。 下载tampermonkey- //tamper...

    storm集群部署和配置过程详解

    根据具体需求,可能还需要配置其他的组件,如drpc(分布式RPC)或logviewer。 在实际部署中,还需要考虑网络拓扑,确保nimbus和worker之间的通信畅通。同时,为了保证系统的稳定性和性能,需要监控资源使用情况,如...

Global site tag (gtag.js) - Google Analytics