Storm Study Notes
Written by: 王宇
2016-10-20
Storm Concepts
Let us now have a closer look at the components of Apache Storm:
| Component | Description |
| --- | --- |
| Tuple | Tuple is the main data structure in Storm: an ordered list of elements. By default a tuple supports all data types. It is generally modelled as a set of comma-separated values passed through a Storm cluster. |
| Stream | A stream is an unordered sequence of tuples. |
| Spout | The source of a stream. Generally, Storm accepts input from raw data sources such as the Twitter Streaming API, an Apache Kafka queue, or a Kestrel queue, but you can also write a spout to read data from any data source. "ISpout" is the core interface for implementing spouts; some of the specific interfaces are IRichSpout, BaseRichSpout, KafkaSpout, etc. |
| Bolt | Bolts are the logical processing units. Spouts pass data to bolts, which process it and produce new output streams. Bolts can perform filtering, aggregation, joins, and interaction with data sources and databases. A bolt receives data and emits it to one or more downstream bolts. "IBolt" is the core interface for implementing bolts; common interfaces include IRichBolt, IBasicBolt, etc. |
| Topology | Spouts and bolts connected together form a topology, which holds the real-time application logic. In simple words, a topology is a directed graph where the vertices are computation and the edges are streams of data. |
| Tasks | In simple words, a task is one execution of a spout or a bolt. |
| Nimbus | Nimbus is the master node of a Storm cluster; all other nodes in the cluster are called worker nodes. The master node is responsible for distributing data among the worker nodes, assigning tasks to them, and monitoring failures. |
| Supervisor | The nodes that follow the instructions given by Nimbus are called supervisors. A supervisor has multiple worker processes and governs them to complete the tasks assigned by Nimbus. |
| Worker process | A worker process executes tasks related to one specific topology. It does not run tasks by itself; instead it creates executors and asks them to perform the tasks. A worker process can have multiple executors. |
| Executor | An executor is a single thread spawned by a worker process. An executor runs one or more tasks, but only for a specific spout or bolt. |
| Task | A task performs the actual data processing, so it is an instance of a spout or a bolt. |
| ZooKeeper framework | Apache ZooKeeper is a service used by a cluster (group of nodes) to coordinate among themselves and maintain shared data with robust synchronization techniques. Nimbus is stateless, so it depends on ZooKeeper to monitor the status of the working nodes. ZooKeeper also helps the supervisors interact with Nimbus and is responsible for maintaining the state of Nimbus and the supervisors. |
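As a small self-contained illustration of the tuple/field pairing described above (the field names and values here are examples only, not part of the call-log program below):

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TupleSketch {
    public static void main(String[] args) {
        // The schema is declared once per component...
        Fields schema = new Fields("caller", "duration");
        // ...and every emitted tuple is an ordered list matching it positionally.
        Values tuple = new Values("1234123401", 42);
        // Storm resolves name-based access through the field index:
        System.out.println(tuple.get(schema.fieldIndex("duration")));  // prints 42
    }
}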
- Stream groupings (how tuples are routed between component tasks; a declaration sketch follows this list)
  - Shuffle grouping: tuples are distributed across the target bolt's tasks randomly and evenly.
  - Fields grouping: tuples are partitioned by the values of the named fields, so equal field values always reach the same task.
  - All grouping: broadcast; every task of the receiving bolt gets a copy of each tuple.
  - Global grouping: the entire stream goes to a single task of the receiving bolt.
  - None grouping: currently behaves the same as shuffle grouping.
  - Direct grouping: the emitting component decides which task of the consumer receives each tuple.
  - Local or shuffle grouping: prefers tasks in the same worker process; otherwise falls back to shuffle grouping.
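A minimal sketch of how these groupings are declared on a TopologyBuilder, reusing the spout and bolts defined later in this note (the class name GroupingSketch, component ids, and parallelism hints are illustrative):

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class GroupingSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new FakeCallLogReaderSpout());
        // Shuffle grouping: tuples spread randomly and evenly over two tasks.
        builder.setBolt("creator", new CallLogCreatorBolt(), 2)
               .shuffleGrouping("spout");
        // Fields grouping: tuples with the same "call" value always reach the
        // same task, which is what makes per-key counting correct.
        builder.setBolt("counter", new CallLogCounterBolt(), 2)
               .fieldsGrouping("creator", new Fields("call"));
        // All grouping: every task of a bolt would get a copy of each tuple, e.g.
        // builder.setBolt("monitor", new CallLogCounterBolt(), 2).allGrouping("creator");
        builder.createTopology();  // build the StormTopology object
    }
}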
Storm Workflow
- Local mode: the topology runs inside a single JVM via LocalCluster, which is convenient for development and debugging (see the sketch below).
- Production mode: the topology is packaged as a jar and submitted to a running cluster, where Nimbus distributes the work across the supervisors.
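A minimal sketch of the two modes, assuming the FakeCallLogReaderSpout and bolts defined later in this note; the class name ModeSketch and the command-line convention are illustrative, not part of the original example:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class ModeSketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
               .shuffleGrouping("call-log-reader-spout");
        builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
               .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
        Config config = new Config();

        if (args.length == 0) {
            // Local mode: the whole topology runs inside this JVM.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        } else {
            // Production mode: submit to a running cluster, typically launched via
            //   storm jar <jar> ModeSketch <topology-name>
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}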
Storm Installation and Configuration
- Step 1: Install the JDK and set the JAVA_HOME and CLASSPATH environment variables.
- Step 2: Install ZooKeeper
Download ZooKeeper and unpack it:
$ tar xzvf zookeeper-3.5.2-alpha.tar.gz
$ mv ./zookeeper-3.5.2-alpha /opt/zookeeper
$ cd /opt/zookeeper
$ mkdir data
Create the configuration file:
$ cd /opt/zookeeper
$ vim conf/zoo.cfg
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
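In this file, tickTime is ZooKeeper's basic time unit in milliseconds, clientPort is the port clients connect on, and dataDir points at the data directory created above. initLimit and syncLimit are measured in ticks: how long followers may take to connect to the leader, and how far they may lag behind it, respectively.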
Start the ZooKeeper server:
$ bin/zkServer.sh start
- Step 3: Install and configure Storm
Download Storm and unpack it:
$ tar xvfz apache-storm-1.0.2.tar.gz
$ mv ./apache-storm-1.0.2 /opt/storm
$ cd /opt/storm
$ mkdir data
Edit the Storm configuration:
$ cd /opt/storm
$ vim conf/storm.yaml
storm.zookeeper.servers:
  - "localhost"
storm.local.dir: "/opt/storm/data"
nimbus.seeds: ["localhost"]
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
ui.port: 6969
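Here storm.local.dir is scratch space for the daemons (the data directory created above), and supervisor.slots.ports determines how many worker processes each supervisor may run, one JVM per listed port (four here). Note that the nimbus.host key from pre-1.0 releases was replaced by the nimbus.seeds list in Storm 1.0, which is why the configuration above uses nimbus.seeds.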
Start Nimbus:
$ cd /opt/storm
$ ./bin/storm nimbus
Start the Supervisor:
$ cd /opt/storm
$ ./bin/storm supervisor
Start the UI:
$ cd /opt/storm
$ ./bin/storm ui
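With ui.port set to 6969 above, the Storm UI is then reachable in a browser at http://localhost:6969.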
Developing a Call-Counting Task on Storm
- Scenario: count the calls made between mobile phones.
  In the spout, prepare four phone numbers and generate a random number of calls between them.
  Create separate bolts to build and to count the call records.
  Use a topology to wire the spout and bolts together. The program below compiled and ran successfully on Ubuntu 16.04 64-bit with JDK 1.8.
- Create the spout component
A spout implements the IRichSpout interface, whose methods are:
open − Provides the spout with an environment to execute; the executors run this method to initialize the spout.
nextTuple − Emits the generated data through the collector.
close − Called when the spout is going to shut down.
declareOutputFields − Declares the output schema of the tuples.
ack − Acknowledges that a specific tuple has been processed.
fail − Signals that a specific tuple failed to process, so the spout can decide whether to replay it.

import java.util.*;
//import storm tuple packages
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
//import Spout interface packages
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
    //SpoutOutputCollector passes tuples on to the bolts.
    private SpoutOutputCollector collector;
    private boolean completed = false;
    //TopologyContext contains topology data.
    private TopologyContext context;
    //Random generator used to fabricate call records.
    private Random randomGenerator = new Random();
    private Integer idx = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (this.idx <= 1000) {
            List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");
            Integer localIdx = 0;
            //Emit at most 100 calls per invocation, 1000 calls in total.
            while (localIdx++ < 100 && this.idx++ < 1000) {
                String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                //Re-draw the callee until it differs from the caller
                //(compare strings with equals, not ==).
                while (fromMobileNumber.equals(toMobileNumber)) {
                    toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                }
                Integer duration = randomGenerator.nextInt(60);
                this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("from", "to", "duration"));
    }

    //The remaining interface methods are no-ops in this example.
    @Override
    public void close() {}

    //Leftover from the pre-1.0 spout interface; unused in Storm 1.x.
    public boolean isDistributed() {
        return false;
    }

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
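Note that most of the overrides above are empty. Extending BaseRichSpout (mentioned in the concepts table earlier) instead of implementing IRichSpout directly provides no-op defaults for close, activate, deactivate, ack, and fail, which would shorten the class considerably.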
- Create the bolt components
A bolt implements the IRichBolt interface, whose methods are:
prepare − Provides the bolt with an environment to execute; the executors run this method to initialize the bolt.
execute − Processes a single tuple of input.
cleanup − Called when the bolt is going to shut down.
declareOutputFields − Declares the output schema of the tuples.

The first bolt joins the caller and callee numbers into a single "call" field:
//import util packages
import java.util.Map;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
//import Storm IRichBolt package
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
    //OutputCollector collects processed tuples and emits them downstream.
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        //Join caller and callee into a single "call" key.
        collector.emit(new Values(from + " - " + to, duration));
        //Acknowledge the input tuple, as IRichBolt implementations must do themselves.
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
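The bolt reads the incoming tuple positionally (getString(0), getInteger(2)). Tuple also supports name-based access such as getStringByField("from") and getIntegerByField("duration"), which survives upstream field reordering; the positional form is kept here to match the original example.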
The second bolt keeps an in-memory count per distinct call pair:

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.tuple.Fields;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
    //Counter keyed by the "call" field produced by CallLogCreatorBolt.
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);  //read but not used in the count
        if (!counterMap.containsKey(call)) {
            counterMap.put(call, 1);
        } else {
            Integer c = counterMap.get(call) + 1;
            counterMap.put(call, c);
        }
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        //Print the final counts when the bolt shuts down.
        for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("call"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
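Two details worth noting: the counter declares an output field "call" but never emits downstream tuples, and it only prints its totals in cleanup. Storm calls cleanup reliably when a LocalCluster shuts down, but there is no such guarantee on a real cluster where workers can be killed, so production code would emit or persist the counts instead.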
- Create the topology and a LocalCluster
import org.apache.storm.tuple.Fields;
//import storm configuration packages
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

//Create the main class LogAnalyserStorm, which builds and submits the topology.
public class LogAnalyserStorm {
    public static void main(String[] args) throws Exception {
        //Create a Config instance for cluster configuration
        Config config = new Config();
        config.setDebug(true);

        //Wire the spout and the two bolts into a topology.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
               .shuffleGrouping("call-log-reader-spout");
        builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
               .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

        //Run locally for ten seconds, then stop the topology.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
- Remote mode
  http://storm.apache.org/releases/current/Distributed-RPC.html
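In remote mode, the main class calls StormSubmitter.submitTopology instead of LocalCluster.submitTopology (see the mode sketch earlier in this note); the compiled classes are packaged into a jar and handed to the cluster with the storm client, e.g. storm jar LogAnalyser.jar ModeSketch my-topology (the jar and topology names here are hypothetical).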
- Compile and run the application
$ cd /opt/storm/my-example
The Storm jars must be on the classpath when compiling and running; here they are taken from the lib directory of the /opt/storm installation above:
$ javac -cp "/opt/storm/lib/*" *.java
$ java -cp "/opt/storm/lib/*:." LogAnalyserStorm
- Output
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
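Since the spout draws caller, callee, and duration at random, the exact counts differ from run to run, but the twelve from-to pairs should add up to the 1000 calls emitted by the spout, as they do in this sample output.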
References
Storm website: http://storm.apache.org
Tutorial: https://www.tutorialspoint.com/apache_storm/index.htm
Storm Java docs: http://storm.apache.org/releases/current/javadocs/index.html
- PDF
《Storm Applied》
《Getting Started with Storm》
《Storm Real-time Processing Cookbook》
《Learning Storm》
《Storm Blueprints: Patterns for Distributed Real-time Computation》
《Hadoop The Definitive Guide》