public static void initConnectDB(){ primaryKey = "id"; rdbmsUrl = "jdbc:mysql://hadoop/DB" ; rdbmsUserName = ""; rdbmsPassword = ""; connector = new RDBMSConnector(); try { con = connector.getConnection(rdbmsUrl, rdbmsUserName, rdbmsPassword); communicator = new RDBMSCommunicator2UFN(con); } catch (Exception e){ System.out.println("connect to db exception in initConnectDB()"); e.printStackTrace(); } } public static class GetUserID extends BaseBasicBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; @Override public void prepare(Map stormConf, TopologyContext context) { System.out.println("in prepare con : "+con); //this.communicator = new RDBMSCommunicator(con); System.out.println("in pretpare communicator :"+communicator); } public void execute(Tuple input, BasicOutputCollector collector) { Object id = input.getValue(0); String userName = input.getString(1); String sql = String.format("select userID from usersinfo where username='%s'", userName); System.out.println("sql in get-user-id: "+sql); rs = communicator.selecteExec(sql); String userID = null; if (rs != null){ try { rs.next(); userID = rs.getString("userID"); } catch (Exception e){ e.printStackTrace(); } collector.emit(new Values(id, userID)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "userID")); } } public static class GetUserFunctionsID extends BaseBasicBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; @Override public void prepare(Map stormConf, TopologyContext context) { //communicator = new RDBMSCommunicator(con); } public void execute(Tuple input, BasicOutputCollector collector) { Object id = input.getValue(0); String userID = input.getString(1); if (userID == null || userID.trim().length() == 0){ return; } String sql = String.format("select functionID from userfunctions where userID='%s'", userID); System.out.println("sql in get-user-functionid : "+sql); rs = communicator.selecteExec(sql); String functionID = null; if (rs != null){ try { while(rs.next()){ functionID = rs.getString("functionID"); collector.emit(new Values(id,functionID)); } } catch(Exception e){ e.printStackTrace(); } } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","functionID")); } } public static class GetUserFunctionsName extends BaseBatchBolt{ //private RDBMSCommunicator communicator = null; private ResultSet rs = null; List<String> functionsName = new ArrayList<String>(); BatchOutputCollector _collector; Object _id; public void execute(Tuple tuple) { String functionID = tuple.getString(1); if (functionID == null || functionID.trim().length() == 0){ return ; } String sql = String.format("select functionName from functionsinfo where functionID='%s'",functionID); System.out.println("sql in get-user-functionname : "+sql ); rs = communicator.selecteExec(sql); String functionName = null; if(rs != null){ try { rs.next(); functionName = rs.getString("functionName"); functionsName.add(functionName); } catch (Exception e){ e.printStackTrace(); } } } public void finishBatch() { _collector.emit(new Values(_id,functionsName.toString())); } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "user-funcions-name")); } } public static LinearDRPCTopologyBuilder construct(){ initConnectDB(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("user-functions-name"); builder.addBolt(new GetUserID(), 2); builder.addBolt(new GetUserFunctionsID(),2).shuffleGrouping(); builder.addBolt(new GetUserFunctionsName(),2).fieldsGrouping(new Fields("id","functionID")); return builder; } public static void main(String[] args) throws Exception{ LinearDRPCTopologyBuilder builder = construct(); Config conf = new Config(); if(args==null || args.length==0) { conf.setMaxTaskParallelism(3); LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("user-fn-drpc", conf, builder.createLocalTopology(drpc)); String[] userNames = new String[] { "qingwu.fu"}; for(String un: userNames) { System.out.println("Functions name of : " + un + ": " + drpc.execute("user-functions-name", un)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(6); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } }
相关推荐
这是storm中drpc应用的一个例子。
第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对Storm有了直观的认识;第3章深入讲解了Storm的基本概念,同时实现一个Topology运行;第4章和第5章阐述了Storm的并发度、可靠处理的...
8. **简单拓扑示例**: 一个简单的例子包括一个Spout和两个Bolt。Spout产生单词,第一个Bolt添加"!!!",然后传递给第二个Bolt。这种线性排列展示了数据如何在拓扑中流动。 在深入学习Storm时,还需要关注其他概念,...
- **使用Storm开发一个WordCount例子**:通过WordCount示例来演示Storm的应用开发过程。 - **Storm程序本地模式debug、Storm程序远程debug**:指导如何调试Storm程序。 - **Storm事务处理**:介绍Storm如何支持事务...
资源内项目源码是来自个人的毕业设计,代码都测试ok,包含源码、数据集、可视化页面和部署说明,可产生核心指标曲线图、混淆矩阵、F1分数曲线、精确率-召回率曲线、验证集预测结果、标签分布图。都是运行成功后才上传资源,毕设答辩评审绝对信服的保底85分以上,放心下载使用,拿来就能用。包含源码、数据集、可视化页面和部署说明一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.txt文件,仅供学习参考, 切勿用于商业用途。
wrf转mp4播放器1.1.1
内容概要:本文档详细介绍了如何在Simulink中设计一个满足特定规格的音频带ADC(模数转换器)。首先选择了三阶单环多位量化Σ-Δ调制器作为设计方案,因为这种结构能在音频带宽内提供高噪声整形效果,并且多位量化可以降低量化噪声。接着,文档展示了具体的Simulink建模步骤,包括创建模型、添加各个组件如积分器、量化器、DAC反馈以及连接它们。此外,还进行了参数设计与计算,特别是过采样率和信噪比的估算,并引入了动态元件匹配技术来减少DAC的非线性误差。性能验证部分则通过理想和非理想的仿真实验评估了系统的稳定性和各项指标,最终证明所设计的ADC能够达到预期的技术标准。 适用人群:电子工程专业学生、从事数据转换器研究或开发的技术人员。 使用场景及目标:适用于希望深入了解Σ-Δ调制器的工作原理及其在音频带ADC应用中的具体实现方法的人群。目标是掌握如何利用MATLAB/Simulink工具进行复杂电路的设计与仿真。 其他说明:文中提供了详细的Matlab代码片段用于指导读者完成整个设计流程,同时附带了一些辅助函数帮助分析仿真结果。
国网台区终端最新规范
《基于YOLOv8的智慧农业水肥一体化控制系统》(包含源码、可视化界面、完整数据集、部署教程)简单部署即可运行。功能完善、操作简单,适合毕设或课程设计
GSDML-V2.33-LEUZE-AMS3048i-20170622.xml
微信小程序项目课程设计,包含LW+ppt
微信小程序项目课程设计,包含LW+ppt
终端运行进度条脚本
幼儿园预防肺结核教育培训课件资料
python,python相关资源
《基于YOLOv8的智慧校园电动车充电桩状态监测系统》(包含源码、可视化界面、完整数据集、部署教程)简单部署即可运行。功能完善、操作简单,适合毕设或课程设计
deepseek 临床之理性软肋.pdf
SM2258XT量产工具(包含16种程序),固态硬盘量产工具使用
RecyclerView.zip
水务大脑让水务运营更智能(23页)