drpc是storm的远程函数接口,实现方式有两种
此处代码仅适用集群运行,本地调试请自行修改local启动方式
1、服务器端代码,服务器代码打jar包后在服务器端运行,请注意“统一写法”
方法1
package bhz.drcp; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.drpc.DRPCSpout; import backtype.storm.drpc.ReturnResults; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class ManualDRPC { //服务器端 public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); DRPCSpout spout = new DRPCSpout("add"); builder.setSpout("drpc", spout); builder.setBolt("add", new AddBolt(),3).shuffleGrouping("drpc"); builder.setBolt("return", new ReturnResults(),3).shuffleGrouping("add"); Config conf = new Config(); StormSubmitter.submitTopology("ManualDRPC", conf, builder.createTopology()); } public static class AddBolt extends BaseBasicBolt{ @Override public void execute(Tuple input, BasicOutputCollector collector) { Object returnInfo = input.getValue(1); String params = input.getString(0); String conversValue = params+"!!!!"; collector.emit(new Values(conversValue,returnInfo)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("result","return-info"));//统一写法,注意 } } }
方法2
package bhz.drcp; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class ManualDRPC { public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add1"); builder.addBolt(new AddBolt(), 1); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(2); StormSubmitter.submitTopology("HelloDRPC", conf, builder.createRemoteTopology()); } public static class AddBolt extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { Object returnInfo = input.getValue(1); String params = input.getString(0); String conversValue = params + "!!!!"; collector.emit(new Values(conversValue, returnInfo)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","value"));//统一写法,注意 } } }
2、客户端、可以运行在任何环境
package bhz.drcp; import org.apache.thrift7.TException; import backtype.storm.generated.DRPCExecutionException; import backtype.storm.utils.DRPCClient; public class ManualClientDRPC { public static void main(String[] args) throws TException, DRPCExecutionException { DRPCClient client = new DRPCClient( "sto1", 3772); String result = client.execute("add", "1,asdf2"); System.err.println(result); } }
相关推荐
【Strom:强大的WebService接口测试工具】 WebService接口测试是软件开发过程中不可或缺的一环,它确保了服务间的通信正常且高效。Strom是一款优秀的测试工具,专为开发者设计,用于快速、方便地对WebService接口...
strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的...
- 配置信任连接: 修改 `/usr/pgsql-9.5/data/pg_hba.conf` 文件,允许特定服务器进行远程连接。 ##### 6. 设置开机启动 - 开机启动服务: `systemctl enable postgresql-9.5.service`。 - 启动服务: `systemctl start...
【Strom流处理的基础知识总结】 Strom是一个分布式实时计算系统,由Twitter开源,用于处理大规模数据流。它被设计成可扩展、容错且低延迟的,适用于实时数据分析、在线机器学习、持续计算和大数据处理等多个场景。...
2. **RESTful API测试**:针对RESTful接口,Strom提供了直观的界面来构造HTTP请求(GET, POST, PUT, DELETE等),并能处理JSON和XML格式的数据。用户可以轻松设置请求头,查询参数和请求体,进行端到端的测试。 3. ...
strom zookeeper kafka 部署文档 原理解析
9. `rocksdbjni-4.8.0.jar`: RocksDB的Java JNI接口库,RocksDB是一个高性能、嵌入式级别的键值存储系统,常用于大数据存储和日志记录。 10. `kafka_2.11-0.10.0.1-test.jar`: Kafka的测试库,包含了测试用例和相关...
在IT行业中,"Strom优化"是一个非常关键的主题,特别是在大数据处理和实时计算领域。Storm是一个开源的分布式实时计算系统,由Twitter开发并贡献给Apache软件基金会。它被设计用来处理无界数据流,能够持续地处理...
strom介绍,包括出现背景,应用场景,环境搭建,基本架构。
**PostgreSQL数据库插件PG-Strom** PG-Strom是一款针对PostgreSQL数据库的高性能计算扩展,它利用GPU(图形处理器)的并行计算能力,优化数据库的查询处理,尤其是在大数据量和复杂计算场景下表现优越。PG-Strom的...
pg-strom, PG Strom开发知识库 pgpg strom是PostgreSQL数据库的定制扫描提供程序模块。 它是用于使用GPU设备进行accelarate顺序扫描,hash-基于表的Join 和聚合函数。 它的基本概念是CPU和GPU应该集中在它们具有优势...
【标题】"超级简单入门的storm的java代码demo"提供了对Apache Storm的初步理解与实践。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高容错性和可扩展性。本示例项目适用于Java...
PG-Strom在执行大规模数据集上的聚合、窗口函数和复杂JOIN操作时,性能提升尤其显著。例如,对于包含数亿条记录的数据,使用GPU进行并行处理可以比纯CPU执行快几个数量级。 **挑战与限制** 虽然PG-Strom提供了显著...
strom学习笔记
在安装Strom之前,我们需要对基础环境进行配置。首先,要修改服务器的主机名,这可以通过编辑`/etc/sysconfig/network`文件实现。找到`HOSTNAME`行,并将其值更改为所需的主机名,例如`storm1`、`storm2`或`storm3`...
前端开发工具strom.zip
这个是strom 1.0.2 的jar 包,版本比较老了,但是还是