前言:
本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述。避免干巴巴的讲理论。
1. 建立Maven项目
我们用Maven来管理项目,方便lib依赖的引用和版本控制。
建立最基本的pom.xml如下:
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.edi.storm</groupId>
- <artifactId>storm-samples</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <repositories>
- <repository>
- <id>clojars.org</id>
- <url>http://clojars.org/repo</url>
- </repository>
- </repositories>
- <build>
- <finalName>storm-samples</finalName>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>storm</groupId>
- <artifactId>storm</artifactId>
- <version>0.9.0-rc2</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </project>
这里我额外添加了两个build 插件:
maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的.
和
maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。
2. 建立Spout
前文提到过,Storm中的spout负责发射数据。
我们来实现这样一个spout:
它会随机发射一系列的句子,句子的格式是 谁:说的话
代码如下:
- public class RandomSpout extends BaseRichSpout {
- private SpoutOutputCollector collector;
- private Random rand;
- private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.rand = new Random();
- }
- @Override
- public void nextTuple() {
- String toSay = sentences[rand.nextInt(sentences.length)];
- this.collector.emit(new Values(toSay));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("sentence"));
- }
- }
这里要先理解Tuple的概念。
Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。
该Spout代码里面最核心的部分有两个:
a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] }
b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。
3. 建立Bolt
既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。
两个功能,两个Bolt。
先看添加"!"的Bolt
- public class ExclaimBasicBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- //String sentence = tuple.getString(0);
- String sentence = (String) tuple.getValue(0);
- String out = sentence + "!";
- collector.emit(new Values(out));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("excl_sentence"));
- }
- }
在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。
Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。
取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"
打印Bolt
- public class PrintBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String rec = tuple.getString(0);
- System.err.println("String recieved: " + rec);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // do nothing
- }
- }
仍然是取第一个,因为我们并没有定义过第二个value
4. 建立Topology
现在我们建立拓扑结构的主要组件都有了,可以创建topology了。
- public class ExclaimBasicTopo {
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomSpout());
- builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");
- builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");
- Config conf = new Config();
- conf.setDebug(false);
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(100000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
- }
- }
很简单,对吧。
其中,
- builder.setSpout("spout", new RandomSpout());
定义一个spout,id为"spout"
- builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");
定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple
- builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");
定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple
- .shuffleGrouping
是指明Storm按照何种策略将tuple分配到后续的bolt去。
可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。
把项目用M2E插件导入Eclipse直接运行试试
- String recieved: marry:I'm angry!
- String recieved: edi:I'm happy!
- String recieved: john:I'm sad!
- String recieved: edi:I'm happy!
- String recieved: ted:I'm excited!
- String recieved: laden:I'm dangerous!
- String recieved: edi:I'm happy!
- String recieved: edi:I'm happy!
这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。
我们修改下代码,指定并行数
- builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
- builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");
由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。
为了方便体现确实是并行,我们修改PrintBolt代码如下:
- public class PrintBolt extends BaseBasicBolt {
- private int indexId;
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- this.indexId = context.getThisTaskIndex();
- }
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String rec = tuple.getString(0);
- System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // do nothing
- }
- }
这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。
运行下看看:
- Bolt[0] String recieved: marry:I'm angry!
- Bolt[2] String recieved: john:I'm sad!
- Bolt[2] String recieved: ted:I'm excited!
- Bolt[2] String recieved: john:I'm sad!
- Bolt[2] String recieved: john:I'm sad!
证实确实是并发了。
本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行
- ./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test
在StormUI里面,点进 test
看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。
截止到这里,一个简单的Storm的topology已经完成了。
但是,这里依然有些问题:
1. 什么是acker?
2. Bolt为什么有两个继承类和接口?
3. Topology的提交方式到底有几种?
4. 除了随机分组,还有哪些分组策略?
5. Storm是如何保证tuple不被丢失的?
6. 我看到spout发送数据比bolt处理的速度快太多了,我能不能在spout里面sleep?
7. 并发数要如何指定呢?
http://blog.csdn.net/xeseo/article/details/17683049
相关推荐
这种软件基本上用自制的dll(因为暴雪只提供了读mpq的storm.dll,没有写入),按照mpq文件格式非常循规蹈矩地一步步读出内容。问题在于mpq数据稍有不对就会导致崩溃。例如header中mpq文件大小这项数据,war3读地图的...
总结来说,董西成在“Hadoop英雄会——暨Hadoop 10周年生日大趴”中详细介绍了Hadoop YARN的架构、API、通信协议以及如何在YARN上设计和运行应用程序。通过其演讲内容,我们可以看到YARN在构建现代大数据处理和分析...
《数据分析基础——基于Excel和SPSS》这本教材的习题答案涵盖了数据分析的基本概念和实践操作,主要针对使用Excel和SPSS这两种常见的数据分析工具。在学习这个知识点时,我们需要理解以下几个核心要点: 1. **数据...
- 例子1:原句"The keeper who had lived in the light tower for a long time, so he could keep calm in the storm." 纠正后:"The keeper, who had lived in the light tower for a long time, could keep calm ...
- **使用Storm开发一个WordCount例子**:通过WordCount示例来演示Storm的应用开发过程。 - **Storm程序本地模式debug、Storm程序远程debug**:指导如何调试Storm程序。 - **Storm事务处理**:介绍Storm如何支持事务...
文档中提到的例子包括“销量实时指标-某东”、“用户实时指标-某观”等,这些都是典型的实时指标应用场景。以销量实时指标为例,通过Spark Streaming,电商平台可以实时监控商品销售情况,快速调整库存和促销策略。 ...
《大数据导论》是南开大学的一门重要课程,涵盖了大数据的基本概念、技术体系和面临的挑战。本课程涉及的知识点广泛,主要包括以下几个方面: 1. **NoSQL数据库**:课程提到了MongoDB、HBase和Cassandra作为NoSQL...
- **非结构化数据**:与之相对的是非结构化数据,这类数据不遵循固定的格式或模型,难以直接存储在传统的表格形式中。例如,电子邮件、社交媒体帖子、图片和视频等。由于其复杂性和多样性,处理非结构化数据通常需要...
大数据的特点,即“4V”——Volume(大量)、Variety(多样)、Velocity(快速)和Value(价值),使得它能有效地处理大规模、多类型、高速度的数据,从而提高安全分析的效率和准确性。 1. **大数据分析在信息安全...
例如,TMF列出了34个具体的应用场景,如基于位置的实时个性化推荐、网络故障检测和恢复,以及基于价值的网络规划等,这些都是大数据在实际业务中发挥价值的例子。 总的来说,大数据时代为企业带来了前所未有的机遇...
大数据的4V特征——Volume(大量)、Velocity(高速)、Variety(多样)、Value(价值)——意味着你需要处理的数据不仅量大,而且类型多样,价值密度低。因此,了解并掌握各种大数据工具至关重要。例如,Hadoop ...
下面,我们通过分析题目中的例子来深入理解这个结构。 1. 题目中的句子"The thief stood before the policeman, _______ admitting what he had done." 选项D "with his head down" 是正确的。这里"with his head ...
例如:"She dressed like a man so that/in order that she could join the army." 在这个例子中,"so that/in order that she could join the army" 表明了她装扮成男人的原因——为了参军。 接着,结果状语从句...