前言:
本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述。避免干巴巴的讲理论。
1. 建立Maven项目
我们用Maven来管理项目,方便lib依赖的引用和版本控制。
建立最基本的pom.xml如下:
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.edi.storm</groupId>
- <artifactId>storm-samples</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <repositories>
- <repository>
- <id>clojars.org</id>
- <url>http://clojars.org/repo</url>
- </repository>
- </repositories>
- <build>
- <finalName>storm-samples</finalName>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>storm</groupId>
- <artifactId>storm</artifactId>
- <version>0.9.0-rc2</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </project>
这里我额外添加了两个build 插件:
maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的.
和
maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。
2. 建立Spout
前文提到过,Storm中的spout负责发射数据。
我们来实现这样一个spout:
它会随机发射一系列的句子,句子的格式是 谁:说的话
代码如下:
- public class RandomSpout extends BaseRichSpout {
- private SpoutOutputCollector collector;
- private Random rand;
- private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.rand = new Random();
- }
- @Override
- public void nextTuple() {
- String toSay = sentences[rand.nextInt(sentences.length)];
- this.collector.emit(new Values(toSay));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
- }
这里要先理解Tuple的概念。
Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。
该Spout代码里面最核心的部分有两个:
a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] }
b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。
3. 建立Bolt
既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。
两个功能,两个Bolt。
先看添加"!"的Bolt
- public class ExclaimBasicBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- //String sentence = tuple.getString(0);
- String sentence = (String) tuple.getValue(0);
- String out = sentence + "!";
- collector.emit(new Values(out));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("excl_sentence"));
- }
- }
在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。
Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。
取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"
打印Bolt
- public class PrintBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String rec = tuple.getString(0);
- System.err.println("String recieved: " + rec);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // do nothing
- }
- }
仍然是取第一个,因为我们并没有定义过第二个value
4. 建立Topology
现在我们建立拓扑结构的主要组件都有了,可以创建topology了。
- public class ExclaimBasicTopo {
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomSpout());
- builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");
- builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");
- Config conf = new Config();
- conf.setDebug(false);
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(100000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
- }
很简单,对吧。
其中,
- builder.setSpout("spout", new RandomSpout());
定义一个spout,id为"spout"
- builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");
定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple
- builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");
定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple
- .shuffleGrouping
是指明Storm按照何种策略将tuple分配到后续的bolt去。
可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。
把项目用M2E插件导入Eclipse直接运行试试
- String recieved: marry:I'm angry!
- String recieved: edi:I'm happy!
- String recieved: john:I'm sad!
- String recieved: edi:I'm happy!
- String recieved: ted:I'm excited!
- String recieved: laden:I'm dangerous!
- String recieved: edi:I'm happy!
- String recieved: edi:I'm happy!
这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。
我们修改下代码,指定并行数
- builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
- builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");
由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。
为了方便体现确实是并行,我们修改PrintBolt代码如下:
- public class PrintBolt extends BaseBasicBolt {
- private int indexId;
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- this.indexId = context.getThisTaskIndex();
- }
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String rec = tuple.getString(0);
- System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // do nothing
- }
- }
这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。
运行下看看:
- Bolt[0] String recieved: marry:I'm angry!
- Bolt[2] String recieved: john:I'm sad!
- Bolt[2] String recieved: ted:I'm excited!
- Bolt[2] String recieved: john:I'm sad!
- Bolt[2] String recieved: john:I'm sad!
证实确实是并发了。
本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行
- ./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test
在StormUI里面,点进 test
看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。
截止到这里,一个简单的Storm的topology已经完成了。
但是,这里依然有些问题:
1. 什么是acker?
2. Bolt为什么有两个继承类和接口?
3. Topology的提交方式到底有几种?
4. 除了随机分组,还有哪些分组策略?
5. Storm是如何保证tuple不被丢失的?
6. 我看到spout发送数据比bolt处理的速度快太多了,我能不能在spout里面sleep?
7. 并发数要如何指定呢?
http://blog.csdn.net/xeseo/article/details/17683049
相关推荐
区块链_智能合约_Solidity_保险应用_基于以太坊的技_1744433266
内容概要:本文档详细介绍了在Windows系统上安装MySQL数据库的具体步骤。首先,需要配置系统环境变量,包括新建MYSQL_HOME变量并将其添加到PATH中;其次,创建并编辑my.ini配置文件,设置MySQL的基本参数如端口、字符集、数据存放目录等;接着,在命令行工具中通过一系列指令完成MySQL的初始化、服务安装、启动以及root用户的密码设置和权限调整。整个流程涵盖了从环境搭建到最终确保MySQL服务正常运行的所有关键环节。 适合人群:适用于有一定计算机操作基础,尤其是对数据库管理有一定兴趣或需求的技术人员。 使用场景及目标:①帮助用户在本地机器上成功部署MySQL数据库环境;②确保用户能够掌握MySQL的基本配置与管理技能,如环境变量配置、服务安装与卸载、用户权限管理等。 其他说明:在安装过程中可能会遇到一些常见问题,例如由于之前版本残留导致的服务安装失败,此时可以通过命令行删除旧服务(sc delete mysql)来解决。此外,为了保证安全性,务必及时修改root用户的初始密码。
内容概要:`STARTUP.A51` 是 Keil C51 编译器自带的启动文件,用于初始化 8051 单片机的硬件和软件环境。该文件主要完成三个任务:初始化堆栈指针、清零内部数据存储器、跳转到主程序。文件中定义了内存模式(如 SMALL),并设置了堆栈指针的初始值为 0x60。接着通过循环将内部数据存储器的所有字节清零,确保程序开始时数据存储器的状态是确定的。此外,文件还列出了 8051 单片机的各个中断向量地址,并为每个中断提供占位符,实际的中断处理程序需要在其他文件中实现。最后,启动代码段初始化堆栈指针和数据段后,跳转到 `MAIN` 函数开始执行主程序。; 适合人群:对嵌入式系统开发有一定了解,尤其是使用 8051 单片机的开发者。; 使用场景及目标:①理解 8051 单片机启动文件的工作原理;②掌握如何初始化堆栈指针和数据段;③熟悉中断向量表的设置及其作用。; 其他说明:此文件为程序正常运行提供了必要的初始化操作,开发者可以根据具体需求修改该文件以适应不同的硬件和软件环境。
内容概要:该论文研究了一种基于行波理论的输电线路故障诊断方法。当输电线路发生故障时,故障点会产生向两侧传播的电流和电压行波。通过相模变换对三相电流行波解耦,利用解耦后独立模量间的关系确定故障类型和相别,再采用小波变换模极大值法标定行波波头,从而计算故障点距离。仿真结果表明,该方法能准确识别故障类型和相别,并对故障点定位具有高精度。研究使用MATLAB进行仿真验证,为输电线路故障诊断提供了有效解决方案。文中详细介绍了三相电流信号生成、相模变换(Clarke变换)、小波变换波头检测、故障诊断主流程以及结果可视化等步骤,并通过多个实例验证了方法的有效性和准确性。 适合人群:具备一定电力系统基础知识和编程能力的专业人士,特别是从事电力系统保护与控制领域的工程师和技术人员。 使用场景及目标:①适用于电力系统的故障检测与诊断;②能够快速准确地识别输电线路的故障类型、相别及故障点位置;③为电力系统的安全稳定运行提供技术支持,减少停电时间和损失。 其他说明:该方法不仅在理论上进行了深入探讨,还提供了完整的Python代码实现,便于读者理解和实践。此外,文中还讨论了行波理论的核心公式、三相线路行波解耦、行波测距实现等关键技术点,并针对工程应用给出了注意事项,如波速校准、采样率要求、噪声处理等。这使得该方法不仅具有学术价值,也具有很强的实际应用前景。
内容概要:本文详细介绍了光伏-混合储能微电网能量管理系统的模型架构及其控制策略。首先探讨了光伏发电模块中的MPPT(最大功率点跟踪)控制,采用扰动观察法和改进型变步长策略来提高光伏板的发电效率。接着重点讲解了混合储能系统的功率分配,利用一阶低通滤波算法将功率需求分为低频和高频两部分,分别由蓄电池和超级电容处理。此外,文中还深入讨论了SOC(荷电状态)管理策略,确保电池和超级电容在不同工作状态下保持最佳性能。仿真结果显示,在光伏出力剧烈波动的情况下,系统能够有效地维持稳定的电压水平,并显著提高了储能设备的使用寿命。 适合人群:对光伏微电网、储能技术和能量管理系统感兴趣的科研人员、工程师和技术爱好者。 使用场景及目标:适用于研究和开发高效、可靠的光伏-混合储能微电网系统,旨在优化能量管理和提高系统稳定性。具体应用场景包括但不限于家庭光伏系统、小型微电网以及工业能源管理系统。 其他说明:文中提供了详细的代码实现和仿真结果,便于读者理解和复现实验。同时,模型设计采用了模块化思路,方便进行个性化修改和扩展。
内容概要:本文详细介绍了基于MATLAB和CVX平台实现的储能调峰调频联合优化模型。该模型不仅涵盖了储能的基本参数设定、负荷不确定性处理、充放电策略制定,还包括了调峰调频的联合调度、功率约束处理、鲁棒优化等方面的内容。通过构建考虑电池退化成本、充放电功率约束以及用户负荷不确定性的储能优化模型,展现了储能系统在电力系统中的高效协同工作。文中提供了详细的代码示例,解释了各个部分的功能和实现方法,强调了模型的深度与创新性。 适合人群:适用于具有一定编程基础和技术背景的研究人员、工程师以及希望深入了解储能系统优化的学生。 使用场景及目标:该模型主要用于电力系统中储能设备的优化调度,旨在提高储能系统的经济效益和社会效益。通过联合调峰调频,能够显著提升储能系统的收益,实现1+1>2的超线性增益效果。此外,该模型还可以用于教学和科研,帮助初学者理解和掌握储能优化的相关技术和理论。 其他说明:代码中包含了丰富的注释和模块化的子程序,使得整个模型易于理解和扩展。对于有经验的开发者,可以在现有基础上进一步改进和定制,以适应不同的应用场景。
大模型技术白皮书2023版
图像增广 PyTorch 版
批量修改文件名可以帮助用户节省大量时间,提高工作效率 里面附带使用教程
《计算机应用基础》第2章--Windows-XP操作系统.ppt
包括:源程序工程文件、Proteus仿真工程文件、电路原理图文件、配套技术手册、论文资料等 1、采用51/52单片机(通用)作为主控芯片; 2、采用1602液晶显示使用过程及状态,液晶屏亮度会随光线自动调整; 3、按键输入6位密码,输入密码正确则锁打开,显示open!输入密码错误次数超过3次,蜂鸣器报警并且锁定键盘; 4、密码可以自己修改,必须是锁打开时才能改密,为防止误操作,修改密码得输入两次; 5、采用24C02保存密码,掉电不丢失; 6、可通过红外遥控器输入密码操作锁的状态;
内容概要:本文深入剖析了2025年全球感知技术的十大发展趋势,涵盖多模态感知融合、3D感知与空间计算、脑机接口中的感知反馈技术、5G/6G赋能的超低延迟感知、语音与情感识别的高级化、生物感知与数字健康、环境感知与自适应智能、增强现实(AR)与触觉反馈技术、气味与化学感知、量子感知与极端条件测量。文章详细介绍了每项技术的技术原理、关键算法、实现方式、商业案例及未来前景,强调了感知技术在智慧城市、自动驾驶、智慧医疗、工业自动化等领域的深刻影响。报告指出,感知技术正从单一传感模式向多模态融合、从二维数据向三维空间重建、从传统网络通信向超低延迟和高可靠性网络升级,实现全场景、全维度的智能感知。; 适合人群:对感知技术感兴趣的科技爱好者、研究人员、决策者、企业管理层和投资人。; 使用场景及目标:①了解感知技术的最新进展和未来发展方向;②为技术研究提供全面、深入的参考;③为商业应用提供具体的案例和前景分析;④推动跨领域协同创新,构建开放共赢的产业生态。; 其他说明:报告基于近年来技术研发的最新进展、业界前沿的技术路线以及各大科技企业在商业落地方面的丰富实践。随着感知技术的不断成熟,数据隐私与安全保护问题也需高度重视,以确保技术进步与社会伦理和谐统一。未来,感知技术将成为推动社会进步和产业升级的重要力量,为实现万物互联、智慧决策和智能体验提供无限可能。
本论文主要论述了如何使用JAVA语言开发一个校园新闻网站 ,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S架构,面向对象编程思想进行项目开发。在引言中,作者将论述校园新闻网站的当前背景以及系统开发的目的,后续章节将严格按照软件开发流程,对系统进行各个阶段分析设计。 校园新闻网站的主要使用者分为管理员和用户,实现功能包括管理员:首页、个人中心、用户管理、新闻类型管理、校园新闻管理、留言板管理、论坛交流、系统管理,用户前台:首页、校园新闻、论坛交流、留言反馈、个人中心、后台管理等功能。由于本网站的功能模块设计比较全面,所以使得整个校园新闻网站信息管理的过程得以实现。 本系统的使用可以实现本校园新闻网站管理的信息化,可以方便管理员进行更加方便快捷的管理,可以提高管理人员的工作效率。 基于Springboot+vue的校园新闻网站【源码+数据库+参考论文】 感兴趣自行下载学习!
内容概要:本文详细探讨了三相三电平PWM整流器的闭环控制策略及其核心技术——三电平SVPWM算法。文章首先介绍了三相三电平PWM整流器的基本概念和优势,如输出三种电平以降低谐波含量并减少滤波器体积和成本。接着阐述了闭环控制策略的重要性,强调了电压外环和电流内环的双闭环控制机制。随后,文章深入讲解了三电平SVPWM算法的工作原理,包括空间电压矢量的选择、扇区判断、矢量作用时间和死区补偿等关键技术环节。此外,还讨论了中点电位平衡的问题以及PI参数的整定方法。最后,通过示波器测试验证了系统的性能指标,如THD低于3%,直流电压纹波小于1%。 适合人群:从事电力电子领域的工程师和技术人员,尤其是对三相三电平PWM整流器及其控制策略感兴趣的读者。 使用场景及目标:适用于高压大功率场合,旨在提高整流器的性能,降低谐波含量,实现单位功率因数运行。通过合理设计闭环控制策略和优化SVPWM算法,确保整流器在各种工况下都能稳定、高效地工作。 其他说明:文中提供了大量MATLAB和C语言代码片段,帮助读者更好地理解和实现相关算法。同时,针对实际调试过程中遇到的问题给出了实用的解决方案,如中点电位平衡和死区补偿等。
全新红娘本地交友系统定制版源码 相亲婚恋交友小程序源码
内容概要:文章探讨了AI技术,特别是DeepSeek,如何驱动地图生成的变革。首先介绍了地图制图在AI时代的背景与挑战,强调了DeepSeek与地图融合的两种主要方式:嵌入地图制图链和研发地图语言自身的预训练模型。随后详细描述了DeepSeek在地图生成中的具体应用,包括智能化地图生成器DoMapAI的整体框架,地图制图链中的知识图谱推理路径,以及地图语言的Token化过程。最后,文章总结了AI时代地图制图的职业变化和技术变革,指出地图制图正经历“大变局”。 适合人群:从事地图制图及相关领域的研究人员、工程师,以及对AI与地图生成感兴趣的学者。 使用场景及目标:①理解AI技术在地图生成中的应用,特别是DeepSeek的作用;②掌握智能化地图生成器DoMapAI的工作原理及其应用场景;③学习地图语言Token化的方法及其在地图生成中的应用;④探索AI时代地图制图的职业发展方向和技术变革。 阅读建议:本文内容较为专业,建议读者先了解基本的AI技术和地图制图知识。重点关注DeepSeek与地图融合的具体方法和应用场景,理解智能化地图生成器DoMapAI的工作流程,以及地图语言Token化的实现过程。在阅读过程中,可以结合实际案例进行思考,以更好地理解AI技术对地图制图的影响。
chromedriver-mac-arm64-135.0.7049.114.zip
《网络布线与小型局域网搭建(第2版)》第3章-布线系统的设计.ppt
内容概要:本文详细介绍了使用Abaqus软件进行子弹穿钢板模型的模拟方法,重点探讨了CAE文件的作用和创建过程。首先概述了子弹穿钢板模拟的重要性和应用场景,接着深入讲解了CAE文件的概念及其作为模拟‘大脑’的关键地位。文中提供了详细的Python代码示例,涵盖创建部件、定义材料属性、划分网格、设置接触条件以及显式动力学分析步骤等方面的内容。此外,还讨论了网格划分的艺术、接触设置的注意事项、求解器参数的选择以及后处理技巧,强调了每个环节的具体操作和优化建议。 适合人群:从事工程模拟领域的研究人员和技术人员,尤其是对Abaqus软件有一定了解并希望深入掌握其高级特性的用户。 使用场景及目标:适用于需要模拟高速冲击条件下材料行为的研究项目,如防护材料研发、结构抗冲击设计等。通过学习本文,读者能够掌握创建复杂工程模拟模型的方法,提高模拟效率和准确性。 其他说明:文章不仅提供了理论指导,还包括大量实用的操作提示和代码片段,有助于读者快速上手并在实践中不断改进模型。同时,文中提到的一些优化技巧对于提升计算性能和结果可靠性具有重要价值。
内容概要:本文详细介绍了机器视觉系统的关键技术及其应用,涵盖相机标定、OpenCV图像处理以及QT界面开发。首先,文章讲解了相机标定的基本概念和实现方法,通过OpenCV的camera_calibration工具进行标定,确保图像处理和识别的准确性。接着,探讨了图像处理的各种技术,如边缘检测、阈值处理和轮廓检测,展示了如何利用OpenCV库对图像进行预处理。随后,介绍了QT界面开发,通过PyQt5创建了一个直观友好的界面,使用户能够实时查看处理结果并控制设备。最后,讨论了视觉识别与抓取的具体实现,包括物体识别、坐标转换和机械臂控制,强调了多传感器融合的重要性。 适合人群:具备一定编程基础,尤其是对机器视觉感兴趣的开发者和技术爱好者。 使用场景及目标:适用于工业自动化、智能制造等领域,旨在帮助读者理解和实现完整的机器视觉系统,提高生产效率和精度。 其他说明:文中不仅提供了详细的代码示例,还分享了许多实践经验,如标定板制作、图像格式转换等,有助于读者避免常见错误并优化系统性能。