public static void initConnectDB(){ primaryKey = "id"; rdbmsUrl = "jdbc:mysql://hadoop/DB" ; rdbmsUserName = ""; rdbmsPassword = ""; connector = new RDBMSConnector(); try { con = connector.getConnection(rdbmsUrl, rdbmsUserName, rdbmsPassword); communicator = new RDBMSCommunicator2UFN(con); } catch (Exception e){ System.out.println("connect to db exception in initConnectDB()"); e.printStackTrace(); } } public static class GetUserID extends BaseBasicBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; @Override public void prepare(Map stormConf, TopologyContext context) { System.out.println("in prepare con : "+con); //this.communicator = new RDBMSCommunicator(con); System.out.println("in pretpare communicator :"+communicator); } public void execute(Tuple input, BasicOutputCollector collector) { Object id = input.getValue(0); String userName = input.getString(1); String sql = String.format("select userID from usersinfo where username='%s'", userName); System.out.println("sql in get-user-id: "+sql); rs = communicator.selecteExec(sql); String userID = null; if (rs != null){ try { rs.next(); userID = rs.getString("userID"); } catch (Exception e){ e.printStackTrace(); } collector.emit(new Values(id, userID)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "userID")); } } public static class GetUserFunctionsID extends BaseBasicBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; @Override public void prepare(Map stormConf, TopologyContext context) { //communicator = new RDBMSCommunicator(con); } public void execute(Tuple input, BasicOutputCollector collector) { Object id = input.getValue(0); String userID = input.getString(1); if (userID == null || userID.trim().length() == 0){ return; } String sql = String.format("select functionID from userfunctions where userID='%s'", userID); System.out.println("sql in get-user-functionid : "+sql); rs = communicator.selecteExec(sql); String functionID = null; if (rs != null){ try { while(rs.next()){ functionID = rs.getString("functionID"); collector.emit(new Values(id,functionID)); } } catch(Exception e){ e.printStackTrace(); } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","functionID")); } } public static class GetUserFunctionsName extends BaseBatchBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; List<String> functionsName = new ArrayList<String>(); BatchOutputCollector _collector; Object _id; public void execute(Tuple tuple) { String functionID = tuple.getString(1); if (functionID == null || functionID.trim().length() == 0){ return ; } String sql = String.format("select functionName from functionsinfo where functionID='%s'",functionID); System.out.println("sql in get-user-functionname : "+sql ); rs = communicator.selecteExec(sql); String functionName = null; if(rs != null){ try { rs.next(); functionName = rs.getString("functionName"); functionsName.add(functionName); } catch (Exception e){ e.printStackTrace(); } } } public void finishBatch() { _collector.emit(new Values(_id,functionsName.toString())); } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "user-funcions-name")); } } public static LinearDRPCTopologyBuilder construct(){ initConnectDB(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("user-functions-name"); builder.addBolt(new GetUserID(), 2); builder.addBolt(new GetUserFunctionsID(),2).shuffleGrouping(); builder.addBolt(new GetUserFunctionsName(),2).fieldsGrouping(new Fields("id","functionID")); return builder; } public static void main(String[] args) throws Exception{ LinearDRPCTopologyBuilder builder = construct(); Config conf = new Config(); if(args==null || args.length==0) { conf.setMaxTaskParallelism(3); LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("user-fn-drpc", conf, builder.createLocalTopology(drpc)); String[] userNames = new String[] { "qingwu.fu"}; for(String un: userNames) { System.out.println("Functions name of : " + un + ": " + drpc.execute("user-functions-name", un)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(6); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } }
相关推荐
这是storm中drpc应用的一个例子。
"storm DRPC简单例程" 指的是使用Apache Storm分布式实时计算系统中的DRPC(Distributed Remote Procedure Calls)功能的一个基础示例。DRPC允许用户在Storm集群上执行分布式远程过程调用,使得在分布式环境中进行高...
DRPC(Distributed Remote Procedure Call)是Storm中的一个特性,它允许我们执行分布式远程过程调用,使得实时计算变得更加灵活和强大。本示例将通过一个具体的DRPC操作Demo来深入理解这一功能。 首先,DRPC的基本...
Storm-drpc节点适用于Node.js的Apache Storm DRPC客户端受启发,但不同之处在于可以选择将其设置为保持活动状态,它不需要在每个execute()调用中都创建连接,并且可以喜欢的传统方式或promise方式使用它。...
2、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,简单易懂; 3、分享积累的经验和技巧,从架构的角度剖析场景和设计实现...
第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对Storm有了直观的认识;第3章深入讲解了Storm的基本概念,同时实现一个Topology运行;第4章和第5章阐述了Storm的并发度、可靠处理的...
DRPC允许外部应用向Storm集群提交一个远程调用请求,请求执行一个特定的topology任务,然后返回结果。这种方式提供了灵活的交互式查询能力。 7. **Storm executor、worker、task之间的关系和调优** - **Executor...
Storm Trident作为Storm的一个高级API,提供了更高级别的抽象和事务支持,使得复杂的数据处理变得更加简单和可靠。 **2. Storm的全面讲解** - **深度解析**:课程不仅覆盖了Storm的基础概念和架构,还深入探讨了其...
Storm是一个分布式实时计算系统,它允许开发者处理无界数据流,具有高容错性和高性能的特点。在本节中,我们将深入探讨Storm的核心概念和特性,包括它的记录级容错原理、配置详解、批处理、TOPN操作、流程聚合、DRPC...
01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解
8. **简单拓扑示例**: 一个简单的例子包括一个Spout和两个Bolt。Spout产生单词,第一个Bolt添加"!!!",然后传递给第二个Bolt。这种线性排列展示了数据如何在拓扑中流动。 在深入学习Storm时,还需要关注其他概念,...
**结论**:通过本章的学习,我们了解到 Storm 的基本架构和组件,以及如何构建一个简单的 Topology。 #### 第三章 Topologies **流分组**: - **Shuffle 分组**:随机将 tuple 发送到 bolt 的实例。 - **Fields ...
Storm是一个分布式实时计算系统,能够处理大规模数据流,确保每个事件都得到正确的处理。以下是搭建Storm集群的详细步骤: 1. **设置Zookeeper集群**: Storm依赖Zookeeper进行集群协调,确保高可用性和容错性。...
- **`storm.id`**:标识正在运行的拓扑ID,由拓扑名称和一个唯一随机数组成。 #### Nimbus 服务配置 - **`nimbus.host`**:指定Nimbus服务器的地址,Nimbus是Storm集群的核心管理节点。 - **`nimbus.thrift.port`*...
Storm是一个分布式实时计算系统,允许开发者编写处理无限数据流的程序。配置项是Storm的核心组成部分,它们用于定制Storm集群的行为,以适应不同的环境和需求。以下是对文档中提及的一些主要配置项的详细解释: 1. ...
Storm 是一个分布式实时计算系统,它允许开发者处理无界数据流,具有高可用性、容错性和低延迟的特点。在深入理解 Storm 的核心概念和特性之前,首先需要知道它的记录级容错原理,这是 Storm 强大功能的基础。 **...
利用Storm的可扩展性和灵活性,设计了一个原型系统,通过定制Spout和Bolt来处理特定的报文格式和业务逻辑,同时应用合适的Grouping策略以满足并发和隔离要求。 4.3 原型系统实现 实现过程中,关注点在于优化数据流...