`
sunbin
  • 浏览: 354846 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

strom远程drpc函数接口

 
阅读更多

drpc是storm的远程函数接口,实现方式有两种

此处代码仅适用集群运行,本地调试请自行修改local启动方式

1、服务器端代码,服务器代码打jar包后在服务器端运行,请注意“统一写法”

方法1

package bhz.drcp;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ManualDRPC {
    //服务器端
    public static void main(String[] args) throws  Exception   {
        TopologyBuilder builder = new TopologyBuilder();
        DRPCSpout spout = new DRPCSpout("add");
        builder.setSpout("drpc", spout);
        builder.setBolt("add", new AddBolt(),3).shuffleGrouping("drpc");
        builder.setBolt("return", new ReturnResults(),3).shuffleGrouping("add");
        Config conf = new Config();
        StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology());
    }
    public static class AddBolt extends BaseBasicBolt{
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object returnInfo = input.getValue(1);
            String params = input.getString(0);
            String conversValue = params+"!!!!";
            collector.emit(new Values(conversValue,returnInfo));
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result","return-info"));//统一写法,注意
        }
        
    }
}

 

方法2

 

package bhz.drcp;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ManualDRPC {
	public static void main(String[] args) throws Exception {
		LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add1");
		builder.addBolt(new AddBolt(), 1);
		Config conf = new Config();
		conf.setDebug(false);
		conf.setNumWorkers(2);
		StormSubmitter.submitTopology("HelloDRPC", conf, builder.createRemoteTopology());

	}
	public static class AddBolt extends BaseBasicBolt {
		@Override
		public void execute(Tuple input, BasicOutputCollector collector) {
			Object returnInfo = input.getValue(1);
			String params = input.getString(0);
			String conversValue = params + "!!!!";
			collector.emit(new Values(conversValue, returnInfo));
		}
		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
		        declarer.declare(new Fields("id","value"));//统一写法,注意
		}

	}
}
 

 

2、客户端、可以运行在任何环境

package bhz.drcp;

import org.apache.thrift7.TException;
import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.utils.DRPCClient;
public class ManualClientDRPC {
    public static void main(String[] args) throws TException, DRPCExecutionException {
            DRPCClient client = new DRPCClient( "sto1", 3772);
            String result = client.execute("add", "1,asdf2");
            System.err.println(result);
    }
}

 

分享到:
评论

相关推荐

    WebService接口测试工具—Strom

    【Strom:强大的WebService接口测试工具】 WebService接口测试是软件开发过程中不可或缺的一环,它确保了服务间的通信正常且高效。Strom是一款优秀的测试工具,专为开发者设计,用于快速、方便地对WebService接口...

    strom的jar包

    strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的...

    GPU数据库PG_strom的安装及使用

    - 配置信任连接: 修改 `/usr/pgsql-9.5/data/pg_hba.conf` 文件,允许特定服务器进行远程连接。 ##### 6. 设置开机启动 - 开机启动服务: `systemctl enable postgresql-9.5.service`。 - 启动服务: `systemctl start...

    Strom流处理的基础知识总结

    【Strom流处理的基础知识总结】 Strom是一个分布式实时计算系统,由Twitter开源,用于处理大规模数据流。它被设计成可扩展、容错且低延迟的,适用于实时数据分析、在线机器学习、持续计算和大数据处理等多个场景。...

    Strom webService测试工具

    2. **RESTful API测试**:针对RESTful接口,Strom提供了直观的界面来构造HTTP请求(GET, POST, PUT, DELETE等),并能处理JSON和XML格式的数据。用户可以轻松设置请求头,查询参数和请求体,进行端到端的测试。 3. ...

    strom 部署文档资料

    strom zookeeper kafka 部署文档 原理解析

    Strom项目依赖

    9. `rocksdbjni-4.8.0.jar`: RocksDB的Java JNI接口库,RocksDB是一个高性能、嵌入式级别的键值存储系统,常用于大数据存储和日志记录。 10. `kafka_2.11-0.10.0.1-test.jar`: Kafka的测试库,包含了测试用例和相关...

    Strom优化

    在IT行业中,"Strom优化"是一个非常关键的主题,特别是在大数据处理和实时计算领域。Storm是一个开源的分布式实时计算系统,由Twitter开发并贡献给Apache软件基金会。它被设计用来处理无界数据流,能够持续地处理...

    Strom实时流处理大数据框架

    strom介绍,包括出现背景,应用场景,环境搭建,基本架构。

    postgresql数据库插件PG-Strom中Scan算子执行流程分析

    **PostgreSQL数据库插件PG-Strom** PG-Strom是一款针对PostgreSQL数据库的高性能计算扩展,它利用GPU(图形处理器)的并行计算能力,优化数据库的查询处理,尤其是在大数据量和复杂计算场景下表现优越。PG-Strom的...

    pg-strom, PG Strom开发知识库.zip

    pg-strom, PG Strom开发知识库 pgpg strom是PostgreSQL数据库的定制扫描提供程序模块。 它是用于使用GPU设备进行accelarate顺序扫描,hash-基于表的Join 和聚合函数。 它的基本概念是CPU和GPU应该集中在它们具有优势...

    pg_strom:PostgreSQL 的 FDW 模块使用 GPU 进行异步超并行查询执行

    PG-Strom在执行大规模数据集上的聚合、窗口函数和复杂JOIN操作时,性能提升尤其显著。例如,对于包含数亿条记录的数据,使用GPU进行并行处理可以比纯CPU执行快几个数量级。 **挑战与限制** 虽然PG-Strom提供了显著...

    超级简单入门的strom的java代码demo

    【标题】"超级简单入门的storm的java代码demo"提供了对Apache Storm的初步理解与实践。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高容错性和可扩展性。本示例项目适用于Java...

    strom学习笔记.md

    strom学习笔记

    Strom安装手册.pdf

    在安装Strom之前,我们需要对基础环境进行配置。首先,要修改服务器的主机名,这可以通过编辑`/etc/sysconfig/network`文件实现。找到`HOSTNAME`行,并将其值更改为所需的主机名,例如`storm1`、`storm2`或`storm3`...

    前端开发工具strom.zip

    前端开发工具strom.zip

    strom-core-1.0.2

    这个是strom 1.0.2 的jar 包,版本比较老了,但是还是

Global site tag (gtag.js) - Google Analytics