Storm
1. 信息流处理{Stream processing}
Storm可用来实时处理新数据和更新数据库,兼具容错性和可扩展性。
2. 连续计算{Continuous computation}
Storm可进行连续查询并把结果即时反馈给客户端。比如把Twitter上的热门话题发送到浏览器中。
3. 分布式远程程序调用{Distributed RPC}
Storm可用来并行处理密集查询。Storm的拓扑结构是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并返回查询结果。 举个例子Distributed RPC可以做并行搜索或者处理大集合的数据。
Storm集群的组件介绍:
storm集群是表面上类似于Hadoop集群。而在Hadoop上运行“MapReduce作业”,在storm运行“topologies”。 “jobs”和“topologies”本身有很大的不同 - 一个关键的区别是,一个MapReduce作业最终完成,而一个永远的拓扑信息进行处理(或直到你杀了它)。
有两种类型的节点运行在Storm集群中:
(1)主节点(master)主节点运行着一个守护进程“Nimbus“,这个类似于hadoop的jobtracker,Nimbus 负责将任务分配给worker机器,并监视故障。
(2)工作节点(worker)运行着一个守护进程"Supervisor",他主管侦听分配给它的机器和启动工作,并停止工作进程在必要时依据"Nimbus"已经分配给它的任务。
Nimbus与supervisor之间的所有通信都是通过ZK来传递的,另外Nimbus的守护进程和监事守护进程快速失败和无状态的。所有的状态都是保存在ZK中或者本地的磁盘中的。这就意味着你可以通过Kill -9来杀死 Nimbus与supervisor进程,然后他们启动就像没发生过任何事情一样。这样的设计使得storm集群非常稳定。
如何将你的storm程序在storm集群中跑呢?
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
all-my-code.jar(包名) backtype.storm.MyTopology(类名-默认是定位到main方法) arg1 arg2(参数)
Streams
一个stream是一个没有边界的tuples.storm将一个流分成以分布式和可靠的方式的新的流。提供了一个数据流的转换:“spouts”,“bolts”。spouts和bolts有你实现运行的应用程序特定的逻辑接口。spouts其实就是一个数据源,一个spout去读取数据源,以tuples的形式通过emit()发射出去,然后bolts接受tuples。实际应用中可以有多个spout发射数据,并且由多个bolts接收数据,bolts也可以将接收到的数据再发射给其他bolts。
结构如下图所示:
这个拓扑将永远运行下去,或者直到你杀死它。storm将自动重新分配任何失败的任务。此外,Storm保证不会有数据丢失,即使宕机。
一个简单的例子拓扑:
1、SimpleSpout类继承BaseRichSpout类,用来产生数据并且向topology里面发出消息:tuple。
package com.ljq.helloword; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * Spout起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务 * * @author Administrator * */ @SuppressWarnings("serial") public class SimpleSpout extends BaseRichSpout{ //用来发射数据的工具类 private SpoutOutputCollector collector; private static String[] info = new String[]{ "comaple\t,12424,44w46,654,12424,44w46,654,", "lisi\t,435435,6537,12424,44w46,654,", "lipeng\t,45735,6757,12424,44w46,654,", "hujintao\t,45735,6757,12424,44w46,654,", "jiangmin\t,23545,6457,2455,7576,qr44453", "beijing\t,435435,6537,12424,44w46,654,", "xiaoming\t,46654,8579,w3675,85877,077998,", "xiaozhang\t,9789,788,97978,656,345235,09889,", "ceo\t,46654,8579,w3675,85877,077998,", "cto\t,46654,8579,w3675,85877,077998,", "zhansan\t,46654,8579,w3675,85877,077998,"}; Random random=new Random(); /** * 初始化collector */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 */ @Override public void nextTuple() { try { String msg = info[random.nextInt(11)]; // 调用发射方法 collector.emit(new Values(msg)); // 模拟等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 } }
2、SimpleBolt类继承BaseBasicBolt类,处理一个输入tuple。
package com.ljq.helloword; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。 * * @author Administrator * */ @SuppressWarnings("serial") public class SimpleBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null){ //System.out.println("msg="+msg); collector.emit(new Values(msg + "msg is processed!")); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("info")); } }
3、SimpleTopology类包含一个main函数,是Storm程序执行的入口点,包括一个数据喷发节点spout和一个数据处理节点bolt。
package com.ljq.helloword; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; /** * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。 * * @author Administrator * */ public class SimpleTopology { public static void main(String[] args) { try { // 实例化TopologyBuilder类。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); Config config = new Config(); config.setDebug(true); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 这里是本地模式下运行的启动代码。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, topologyBuilder.createTopology()); } } catch (Exception e) { e.printStackTrace(); } } }
运行结果效果如下:
相关推荐
PC间用ACL和TRP协议
基于西门子S7-1200PLC的立体车库创新设计:融入新能源汽车充电元素,简约而不简单的智能化方案,基于西门子S7-1200PLC的立体车库设计设计 针对古老传统的立体车库进行创新,引入当下流行的新能源汽车充电元素,简约而不简单 包含:程序,图纸,仿真 ,关键词:基于西门子S7-1200PLC; 立体车库设计; 创新; 新能源汽车充电元素; 简约设计; 程序; 图纸; 仿真。,基于新能源充电的西门子S7-1200PLC立体车库创新设计:程序、图纸与仿真,简约而不凡
陕西省国家级非物质文化遗产经纬度数据统计表 统计内容包含以下字段: 1. 项目名称 2. 遗产类别 3. 入选批次 4. 所属地区 5. 申报地区/单位 6. 保护单位 7. 地理经度 8. 地理纬度 该统计表系统记录了陕西省国家级非物质文化遗产的地理空间信息,为文化遗产的数字化保护与研究工作提供了重要的数据支撑。
GBT27023复习备考
vivado MIPS32五级流水CPU设计
基于MBD开发手册的《Application Modeling Guideline:辅助驾驶系统中的数学模型设计与应用》,Application Modeling Guideline 文档57页 基于MBD开发手册包含: 1. MBD概述:文档可能会介绍MBD的基本概念,解释为什么它在辅助驾驶系统开发中至关重要。 MBD允许工程师使用数学模型来描述系统行为,这有助于早期发现问题,减少硬件原型制作的需求。 2. 工具选择:在辅助驾驶开发中,常用的MBD工具有MATLAB Simulink、MathWorks的Simulink Design Verifier、Vector的CANoe等。 文档可能涵盖这些工具的特点和适用场景,帮助开发者选择合适的工具。 3. 模型设计:MBD的核心是模型设计,文档会详细说明如何创建和组织模型结构,包括输入 输出接口定义、状态机设计、算法实现等。 对于辅助驾驶系统,这可能涉及到视觉处理、传感器融合、路径规划、决策制定等多个模块。 4. 仿真与验证:MBD工具支持动态仿真,开发者可以运行模型以验证其功能。 文档会介绍如何设置仿真条件,进行时序分析,以及如
LabVIEW曲线处理与包络线判断功能:准确判断,附带Demo及完整源码,labview做的曲线处理,包络线判断功能,判断准确。 一个demo,有源码 ,Labview曲线处理; 包络线判断功能; 判断准确; 有源码Demo; 包络线功能Demo; 准确度。,LabVIEW曲线处理与包络线精准判断功能Demo源码分享
2021年06月Scratch三级实操
GBT27067复习备考
基于nRF905的实验室温湿度监测系统研究.pdf
2022年12月机器人六级理论
Unity Shader Graph 2D - 角色身上部件高亮Bloom效果
大四-校内实习第四天(五)
2020年09月机器人五级实操
springboot停车场管理系统,含有完整的源码和报告文档
基于MPC模型预测算法的轨迹跟踪控制研究:双仿真比较及侧偏角软约束的重要性分析,基于mpc模型预测轨迹跟踪控制,总共包含两套仿真,一套是不加入四轮侧偏角软约束,一套是加入四轮侧偏角的软约束控制,通过carsim与simulink联合仿真发现加入侧偏角软约束在进行轨迹跟踪时,能够通过控制四轮侧偏角的变化,较好的实现轨迹跟踪;而不加入侧偏角软约束的,发现车辆由于失去稳定性, 轨迹跟踪失败( 该仿真是学习mpc模型预测算法控制和基于车辆动力学轨迹跟踪控制非常好的学习资料)。 文件中参考文献和文件说明。 ,基于mpc模型预测轨迹跟踪控制; 四轮侧偏角软约束; 车辆稳定性; 轨迹跟踪失败; carsim与simulink联合仿真; mpc模型预测算法控制; 车辆动力学轨迹跟踪控制。,MPC模型预测轨迹跟踪控制:侧偏角软约束与车辆稳定性的联合仿真研究
基于扩展卡尔曼滤波算法的车辆多状态精确估计与仿真研究:融合位置、轨迹、速度及加速度的实时动态监测,基于扩展卡尔曼滤波EKF的车辆状态估计 估计的状态包括: 1. 车辆的横纵向位置 2.车辆行驶轨迹、横摆角、 3. 车速、加速度、横摆角速度 4. 相应的估计偏差。 内容附带:Simulink模型与MATLAB代马,以及参考文档。 ,基于扩展卡尔曼滤波; 车辆状态估计; 横纵向位置估计; 行驶轨迹估计; 横摆角估计; 车速估计; 加速度估计; 横摆角速度估计; 估计偏差; Simulink模型; MATLAB代码; 参考文档。,"基于扩展卡尔曼滤波的车辆多状态估计系统及其偏差分析:Simulink模型与MATLAB代码实现"
"基于V-REP与MATLAB联合仿真的小车项目:循迹、避障、走迷宫及路径规划的详细代码与文档说明匹配版",V-REP小车项目+匹配文档,基于V-REP与MATLAB联合仿真,小车能够完成循迹、避障、走迷宫和路径规划,提供详细代码和文档说明。 ,核心关键词:V-REP小车项目; 匹配文档; 联合仿真; 循迹; 避障; 走迷宫; 路径规划; 详细代码; 文档说明。,V-REP小车项目:循迹避障与路径规划联合仿真实现
2022年7月环境管理体系基础