- 浏览: 501442 次
- 性别:
- 来自: 广州
文章分类
- 全部博客 (502)
- Java (70)
- Linux (10)
- 数据库 (38)
- 网络 (10)
- WEB (13)
- JSP (4)
- 互联网 (71)
- JavaScript (30)
- Spring MVC (19)
- HTML (13)
- CSS (3)
- AngularJS (18)
- Redis (5)
- Bootstrap CSS (1)
- ZooKeeper (4)
- kafka (6)
- 服务器缓存 (4)
- Storm (1)
- MongoDB (9)
- Spring boot (16)
- log4j (2)
- maven (3)
- nginx (5)
- Tomcat (2)
- Eclipse (4)
- Swagger (2)
- Netty (5)
- Dubbo (1)
- Docker (7)
- Hadoop (12)
- OAuth (1)
- webSocket (4)
- 服务器性能 (7)
- Session共享 (1)
- tieye修改 (1)
- 工作 (1)
- 有用的语录 (0)
- https (2)
- common (5)
- 产品开发管理 (1)
- CDN 工作原理 (1)
- APNS、GCM (1)
- 架构图 (3)
- 功能实现分析 (1)
- JMX (1)
- 服务器相关操作命令 (1)
- img02 (0)
- 服务器环境搭建 (9)
- goodMenuBook (1)
- CEInstantPot (0)
- 有用数据 (1)
- 百度地图WEB API (2)
- 正则表达式 (1)
- 样式例子 (2)
- staticRecipePressureCooker.zip (1)
- jCanvas (1)
- 网站攻击方法原理 (1)
- 架构设计 (3)
- 物联网相关 (3)
- 研发管理 (7)
- 技术需求点 (1)
- 计划 (1)
- spring cloud (11)
- 服务器开发的一些实用工具和方法 (1)
- 每天学到的技术点 (4)
- Guava (1)
- ERP 技术注意要点 (2)
- 微信小程序 (1)
- FineRepor (1)
- 收藏夹 (1)
- temp (5)
- 服务架构 (4)
- 任职资格方案 (0)
- osno_test (1)
- jquery相关 (3)
- mybatis (4)
- ueditor (1)
- VueJS (7)
- python (10)
- Spring EL (1)
- shiro (1)
- 前端开发原理与使用 (7)
- YARN (1)
- Spark (1)
- Hbase (2)
- Pig (2)
- 机器学习 (30)
- matplotlib (1)
- OpenCV (17)
- Hystrix (1)
- 公司 (1)
- miniui (4)
- 前端功能实现 (3)
- 前端插件 (1)
- 钉钉开发 (2)
- Jenkins (1)
- elasticSearch使用 (2)
- 技术规范 (4)
- 技术实现原理 (0)
最新评论
Storm 工作原理
Storm简介
1.Storm是一套分布式的、可靠的,可容错的用于处理流式数据的系统。
2.Storm也是基于C/S架构来进行工作的,C负责将数据处理的方式的jar(Topology)发送给S,S解析C发送过来的jar(Topology),并按一定规则jar变成多个Task((Spout/Bolt)),生成相关的进程和线程运行里面的Task。
相关述语说明:
1.Topology(拓扑):storm中运行的一个实时应用程序(Storm的一个任务单元),因为各个组件间的消息流动形成逻辑上的一个拓扑结构(所以叫Topology)。Topolog是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接组成的图。
2.tuple(元组):一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.
3.Stream:以tuple为单位组成的一条有向无界的数据流。(就是tuple在各个组件中流动时的描述)
4.Spout组件:就是一个继承了某个基类的类,里面有类的方法进行相关的操作,用于获取数据,并传递数据到Bolt。
5.Bolt组件:就是一个继承了某个基类的类,里面有类的方法进行相关的操作,用于对Spout组件发送过来的数据进行处理。
6.Worker进程,用于运行Topology子集(可能Topology的不同组件(Spout/Bolt)会放在不同的Worker进程来运行)的进程。
7.executor线程,为Worker进程中的一个线程,executor可能会同时运行多个组件(Spout/Bolt),当然同一个executor运行的组件类型是一样的。
8.Task,任务,就是组件(Spout/Bolt),一般是一个executor线程运行一个Task
9.Nimbus进程,控制节点(Nimbus节点),主结点运行一个叫做Nimbus的守护进程,它负责在集群内分发代码,为每个工作结点指派任务和监控失败的任务。
10.Supervisor进程,工作节点(Supervisor节点),工作结点运行一个叫做Supervisor的守护进程,每个工作节点都是topology中一个子集的实现。
11.zookeeper,集群协调软件(C/S),是完成nimbus和supervisor之间协调的服务。
12.storm UI,只提供对topology的监控和统计。
架构图:
topology工作原理
1.Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。
2.所有Topology任务的 提交必须在Storm客户端节点上进行(需要配置 storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。
3.Nimbus节点首先将提交的Topology进行分片(Spout/Bolt),分成一个个的Task,并将Task和Supervisor相关的信息提交到 zookeeper集群上。
4.Supervisor会去zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。
topology工作流程
1.提交Topology后,Storm会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个 stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件
2.在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker(Topology的worker配置参数)的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm(随机申请到可用的就OK)本身决定的。
3.Storm看一下那些Worker进程可用,就申请worker(Topology的worker配置参数)的数目给这个Topology。
4.Storm尽量平均的分配这些task到worker。
5.任务分配好之后,Nimbus节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息。
6.Supervisor 节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目 录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行。
7.一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。
8.最后一步会不间断的执行,除非手动结束Topology。
Spout组件
1.Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他Spout类来完成,也可以通过实现IRichSpout接口来实现.
2.nextTuple、ack 和fail 都在spout任务的同一个线程中被循环调用。当没有元组的发射时,应该让nextTuple睡眠一个很短的时间(如一毫秒),以免浪费太多的CPU。
3.继承了BaseRichSpout后,不用实现close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只关心最基本核心的部分。(帮你实现了一个默认的)
4.实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout
open()方法
1.初始化方法
close()方法
1.在该spout将要关闭时调用。
2.但是不保证其一定被调用,因为在集群中supervisor节点,可以使用kill -9来杀死worker进程。
3.只有当Storm是在本地模式下运行,如果是发送停止命令,可以保证close的执行
ack(Object msgId)方法
1.成功处理tuple时回调的方法,通常情况下,此方法的实现是将消息队列中的消息移除,防止消息重放
fail(Object msgId)方法
1.处理tuple失败时回调的方法,通常情况下,此方法的实现是将消息放回消息队列中然后在稍后时间里重放
nextTuple()方法
1.这是Spout类中最重要的一个方法。
2.发射一个Tuple到Topology都是通过这个方法来实现的。
3.调用此方法时,让spout发出元组(tuple)到输出器(ouput collector)。
4.这个方法会不断被调用,所以在没有tuple要处理时最好睡眠一会,让出CPU。
Bolt组件
1.Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。
2.Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口等来完成
3.实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现 IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动实现了collector.emit.ack(inputTuple)
prepare方法
1.此方法和Spout中的open方法类似,在集群中一个worker中的task初始化时调用。 它提供了bolt执行的环境
declareOutputFields方法
1.用于声明当前Bolt发送的Tuple中包含的字段(field),和Spout中类似,就是用new Values(word1,word2)放入到Tuple中时field1->word1,field2->word2的意思,(不用你每次都写MAP中的key)
cleanup方法
1.同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
execute方法
1.这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。
2.execute接受一个 tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。
3.Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。如果你确实要反馈失败,可以抛出FailedException
4.当有Tuple到达时会调用这个接口。
可靠的bolts和不可靠的bolts
1.每个节点都会调用ack(tuple)或fail(tuple),Storm因此知道一条消息在这个bolt是否失败了,并通知那个/那些制造了这些消息的spout(s)。
2.如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。
3.你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。
4.一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。(多数据流)
5.extends BaseBasicBolt,会在执行execute方法之后自动调用ack方法。(内部实现了,不用你实现)
6.ack方法(表示成功)或fail(表示失败),只是为实现可靠提供条件,要实现可靠性还要你在spout进行正确和失败的逻辑处理来实现。
TopologyBuilder(Topology定义类)
1.TopologyBuilder将用来创建拓扑,它决定Storm如何安排各节点,以及它们交换数据的方式。
2.在spout和bolts之间通过shuffleGrouping方法连接。这种分组方式决定了Storm会以随机分配方式从源节点向目标节点发送消息。(其中一种方式,就是消息发送到bolt的方式(因为可能会运行多个bolt))
3.增加组件的方法说明:
4.Bolt还要指定应该接收那个流作为输入,以何种流分组方式进行流分发(就是按何种方式对tuple进行分送到不同的Bolt(这些Bolt是同一个处理Bolt的不同任务实例))
Config
1.就是一个MAP,你可以增加你需要的参数进去,在spout的open中可以得到这个引用
2.也有一些是定义好的,如:TOPOLOGY_MAX_SPOUT_PENDING(默认1个),缓存spout发送出去的tuple,当缓存中的tuple多于这个值时spout就挂起,等待Bolt消费到小于这个值时再运行。(这个属性只对可靠消息处理有用)
Stream Groupings(流分组方式)
1.Stream Grouping定义了一个流在Bolt任务间该如何被切分,说白了就是当有多个处理相同任务的Bolt任务(并行的Bolt)时,将按什么方式分配消息到这些并行的Bolt中的去处理。
2.torm提供的6个Stream Grouping类型:
(1).随机分组(Shuffle grouping):随机分发tuple到并行的Bolt的任务,保证每个任务获得相等数量的tuple。
(2).字段分组(Fields grouping):根据指定字段(key)分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
(a).按那个字段里的值进行分组。不是这个字段(key)的名来分组。
(b).分组后,字段里值相同的都会发送到同一个Bolt的实例中去处理(并行的Bolt)。
(3).全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
(4).全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
(5).无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
(6).直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
Topology运行方式,Storm有两种运行方式
本地运行的提交方式
分布式提交方式
注意:
在Storm代码编写完成之后,需要打包成jar包放到Nimbus中运行,打包的时候,不需要把依赖的jar都打迚去,否则如果把依赖的 storm.jar包打进去的话,运行时会出现重复的配置文件错误导致Topology无法运行。因为Topology运行之前,会加载本地的 storm.yaml 配置文件。
运行的命令如下: storm jar StormTopology.jar mainclass [args]
例子:
参考原文:http://www.open-open.com/lib/view/open1430095563146.html
参考原文:http://ifeve.com/getting-started-with-stom-index/
参考原文:https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter2/Hello%20World%20Storm.md#%E5%88%9B%E5%BB%BA%E6%88%91%E4%BB%AC%E7%9A%84%E7%AC%AC%E4%B8%80%E4%B8%AA%E6%8B%93%E6%89%91
参考原文:http://blog.itpub.net/29754888/viewspace-1260026/
参考原文:https://book.douban.com/reading/33008852/
参考原文:http://shiyanjun.cn/archives/977.html
相关jar包:
Storm简介
1.Storm是一套分布式的、可靠的,可容错的用于处理流式数据的系统。
2.Storm也是基于C/S架构来进行工作的,C负责将数据处理的方式的jar(Topology)发送给S,S解析C发送过来的jar(Topology),并按一定规则jar变成多个Task((Spout/Bolt)),生成相关的进程和线程运行里面的Task。
相关述语说明:
1.Topology(拓扑):storm中运行的一个实时应用程序(Storm的一个任务单元),因为各个组件间的消息流动形成逻辑上的一个拓扑结构(所以叫Topology)。Topolog是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接组成的图。
2.tuple(元组):一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.
3.Stream:以tuple为单位组成的一条有向无界的数据流。(就是tuple在各个组件中流动时的描述)
4.Spout组件:就是一个继承了某个基类的类,里面有类的方法进行相关的操作,用于获取数据,并传递数据到Bolt。
5.Bolt组件:就是一个继承了某个基类的类,里面有类的方法进行相关的操作,用于对Spout组件发送过来的数据进行处理。
6.Worker进程,用于运行Topology子集(可能Topology的不同组件(Spout/Bolt)会放在不同的Worker进程来运行)的进程。
7.executor线程,为Worker进程中的一个线程,executor可能会同时运行多个组件(Spout/Bolt),当然同一个executor运行的组件类型是一样的。
8.Task,任务,就是组件(Spout/Bolt),一般是一个executor线程运行一个Task
9.Nimbus进程,控制节点(Nimbus节点),主结点运行一个叫做Nimbus的守护进程,它负责在集群内分发代码,为每个工作结点指派任务和监控失败的任务。
10.Supervisor进程,工作节点(Supervisor节点),工作结点运行一个叫做Supervisor的守护进程,每个工作节点都是topology中一个子集的实现。
11.zookeeper,集群协调软件(C/S),是完成nimbus和supervisor之间协调的服务。
12.storm UI,只提供对topology的监控和统计。
架构图:
topology工作原理
1.Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。
2.所有Topology任务的 提交必须在Storm客户端节点上进行(需要配置 storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。
3.Nimbus节点首先将提交的Topology进行分片(Spout/Bolt),分成一个个的Task,并将Task和Supervisor相关的信息提交到 zookeeper集群上。
4.Supervisor会去zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。
topology工作流程
1.提交Topology后,Storm会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个 stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件
2.在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker(Topology的worker配置参数)的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm(随机申请到可用的就OK)本身决定的。
3.Storm看一下那些Worker进程可用,就申请worker(Topology的worker配置参数)的数目给这个Topology。
4.Storm尽量平均的分配这些task到worker。
5.任务分配好之后,Nimbus节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息。
6.Supervisor 节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目 录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行。
7.一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。
8.最后一步会不间断的执行,除非手动结束Topology。
Spout组件
1.Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他Spout类来完成,也可以通过实现IRichSpout接口来实现.
2.nextTuple、ack 和fail 都在spout任务的同一个线程中被循环调用。当没有元组的发射时,应该让nextTuple睡眠一个很短的时间(如一毫秒),以免浪费太多的CPU。
3.继承了BaseRichSpout后,不用实现close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只关心最基本核心的部分。(帮你实现了一个默认的)
4.实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout
public interface ISpout extends Serializable { void open(Map conf, TopologyContext context, SpoutOutputCollector collector); void close(); void nextTuple(); void ack(Object msgId); void fail(Object msgId); }
open()方法
1.初始化方法
close()方法
1.在该spout将要关闭时调用。
2.但是不保证其一定被调用,因为在集群中supervisor节点,可以使用kill -9来杀死worker进程。
3.只有当Storm是在本地模式下运行,如果是发送停止命令,可以保证close的执行
ack(Object msgId)方法
1.成功处理tuple时回调的方法,通常情况下,此方法的实现是将消息队列中的消息移除,防止消息重放
fail(Object msgId)方法
1.处理tuple失败时回调的方法,通常情况下,此方法的实现是将消息放回消息队列中然后在稍后时间里重放
nextTuple()方法
1.这是Spout类中最重要的一个方法。
2.发射一个Tuple到Topology都是通过这个方法来实现的。
3.调用此方法时,让spout发出元组(tuple)到输出器(ouput collector)。
4.这个方法会不断被调用,所以在没有tuple要处理时最好睡眠一会,让出CPU。
Bolt组件
1.Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。
2.Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口等来完成
3.实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现 IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动实现了collector.emit.ack(inputTuple)
prepare方法
1.此方法和Spout中的open方法类似,在集群中一个worker中的task初始化时调用。 它提供了bolt执行的环境
declareOutputFields方法
1.用于声明当前Bolt发送的Tuple中包含的字段(field),和Spout中类似,就是用new Values(word1,word2)放入到Tuple中时field1->word1,field2->word2的意思,(不用你每次都写MAP中的key)
cleanup方法
1.同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
execute方法
1.这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。
2.execute接受一个 tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。
3.Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。如果你确实要反馈失败,可以抛出FailedException
4.当有Tuple到达时会调用这个接口。
可靠的bolts和不可靠的bolts
1.每个节点都会调用ack(tuple)或fail(tuple),Storm因此知道一条消息在这个bolt是否失败了,并通知那个/那些制造了这些消息的spout(s)。
class SplitSentence implenents IRichBolt { private OutputCollector collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(tuple, new Values(word)); } collector.ack(tuple); } public void cleanup(){} public void declareOutputFields(OutputFieldsDeclarer declarer){ declar.declare(new Fields("word")); } }
2.如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。
3.你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。
4.一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。(多数据流)
5.extends BaseBasicBolt,会在执行execute方法之后自动调用ack方法。(内部实现了,不用你实现)
class SplitSentence extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String sentence = tuple.getString(0); for(String word : sentence.split(" ")) { collector.emit(new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
6.ack方法(表示成功)或fail(表示失败),只是为实现可靠提供条件,要实现可靠性还要你在spout进行正确和失败的逻辑处理来实现。
TopologyBuilder(Topology定义类)
1.TopologyBuilder将用来创建拓扑,它决定Storm如何安排各节点,以及它们交换数据的方式。
2.在spout和bolts之间通过shuffleGrouping方法连接。这种分组方式决定了Storm会以随机分配方式从源节点向目标节点发送消息。(其中一种方式,就是消息发送到bolt的方式(因为可能会运行多个bolt))
3.增加组件的方法说明:
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) id:这个组件的ID spout:我们定义好的spout组件 parallelism_hint:并行数,就是这个spout组件并列运行多少个(多任务)(默认1个)
TopologyBuilder.setBolt(String id, IBasicBolt bolt, Number parallelism_hint) id:这个组件的ID bolt:我们定义好的Bolt组件 parallelism_hint:并行数,就是这个bolt组件并列运行多少个(多任务)(默认1个) setBolt返回一个InputDeclarer对象,用于定义Bolt的输入。 builder.setBolt("exclaim2", new ExclamationBolt(), 5).shuffleGrouping("words").shuffleGrouping("exclaim1");//指定Bolt的多个来源
4.Bolt还要指定应该接收那个流作为输入,以何种流分组方式进行流分发(就是按何种方式对tuple进行分送到不同的Bolt(这些Bolt是同一个处理Bolt的不同任务实例))
Config
1.就是一个MAP,你可以增加你需要的参数进去,在spout的open中可以得到这个引用
2.也有一些是定义好的,如:TOPOLOGY_MAX_SPOUT_PENDING(默认1个),缓存spout发送出去的tuple,当缓存中的tuple多于这个值时spout就挂起,等待Bolt消费到小于这个值时再运行。(这个属性只对可靠消息处理有用)
常见的配置 可以为每个拓扑设置大量的配置。一个可以设置的所有配置的列表可以在这个类(backtype.storm.Config)找到。那些前缀为TOPOLOGY的属性可以被特定拓扑所覆盖,其他的是集群配置,不能被覆盖。下面是一些常见的拓扑设置。 (1)Config.TOPOLOGY_WORKERS 这个设置执行topology的工作进程的数量。例如,如果你将这个参数设置为25,则将会有25个Java进程跨集群执行所有的任务。如果你有一个跨拓扑中的所有组件的组合150并行度,每个工作进程将有6个任务作为线程运行。 (2)Config.TOPOLOGY_ACKERS 这是设置任务的数量,该任务将跟踪元组树,当Spout元组已经完全处理时进行检测。Acker是Storm的可靠性模型不可或缺的一部分,你可以在2.5小节“可靠性机制——保证消息处理”阅读到关于它们的更多信息。 (3)Config.TOPOLOGY_MAX_SPOUT_PENDING 这是设置一次可以在Spout任务等待Spout元组的最大数量。等待意味着元组还尚未确认(acked)或失败(failed)。强烈推荐设置这个配置项防止队列溢出。 (4)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 这是一个Spout元组在它被认为是失败前必须完全完成的最大超时时间。这个值默认为30秒,这对于大多数拓扑来说是足够的。关于Storm的可靠性模型如何工作可查阅“可靠性机制——保证消息处理”小节以获得更多信息。 (5)Config.TOPOLOGY_SERIALIZATIONS 可以使用这个配置注册更多的序列化器,这样就可以在元组里面使用自定义类型。
Stream Groupings(流分组方式)
1.Stream Grouping定义了一个流在Bolt任务间该如何被切分,说白了就是当有多个处理相同任务的Bolt任务(并行的Bolt)时,将按什么方式分配消息到这些并行的Bolt中的去处理。
2.torm提供的6个Stream Grouping类型:
(1).随机分组(Shuffle grouping):随机分发tuple到并行的Bolt的任务,保证每个任务获得相等数量的tuple。
(2).字段分组(Fields grouping):根据指定字段(key)分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
(a).按那个字段里的值进行分组。不是这个字段(key)的名来分组。
(b).分组后,字段里值相同的都会发送到同一个Bolt的实例中去处理(并行的Bolt)。
(3).全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
(4).全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
(5).无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
(6).直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
public interface InputDeclarer<T extends InputDeclarer> { // 字段分组 public T fieldsGrouping(String componentId, Fields fields); public T fieldsGrouping(String componentId, String streamId, Fields fields); // 全局分组 public T globalGrouping(String componentId); public T globalGrouping(String componentId, String streamId); // 随机分组 public T shuffleGrouping(String componentId); public T shuffleGrouping(String componentId, String streamId); // 本地或者随机分组 public T localOrShuffleGrouping(String componentId); public T localOrShuffleGrouping(String componentId, String streamId); // 无分组 public T noneGrouping(String componentId); public T noneGrouping(String componentId, String streamId); // 广播分组 public T allGrouping(String componentId); public T allGrouping(String componentId, String streamId); // 直接分组 public T directGrouping(String componentId); public T directGrouping(String componentId, String streamId); // 自定义分组 public T customGrouping(String componentId, CustomStreamGrouping grouping); public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); public T grouping(GlobalStreamId id, Grouping grouping); }
Topology运行方式,Storm有两种运行方式
本地运行的提交方式
LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(2000); cluster.shutdown();
分布式提交方式
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
注意:
在Storm代码编写完成之后,需要打包成jar包放到Nimbus中运行,打包的时候,不需要把依赖的jar都打迚去,否则如果把依赖的 storm.jar包打进去的话,运行时会出现重复的配置文件错误导致Topology无法运行。因为Topology运行之前,会加载本地的 storm.yaml 配置文件。
运行的命令如下: storm jar StormTopology.jar mainclass [args]
例子:
pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.book</groupId> <artifactId>Getting-Started</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.7</source> <target>1.7</target> <compilerVersion>1.7</compilerVersion> </configuration> </plugin> </plugins> </build> <repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>2.1.1</version> <scope>provided</scope> <exclusions> <exclusion> <artifactId>slf4j-nop</artifactId> <groupId>org.slf4j</groupId> </exclusion> <exclusion> <artifactId>slf4j-jdk14</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>chill-java</artifactId> <version>0.3.5</version> </dependency> </dependencies> </project>
import spouts.WordReader; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import bolts.WordCounter; import bolts.WordNormalizer; public class TopologyMain { public static void main(String[] args) throws InterruptedException { // Topology definition(定义Topology) TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader", new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word")); // Configuration Config conf = new Config(); conf.put("wordsFile", "src/main/resources/words.txt");//配置参数自定义 conf.setDebug(false); conf.setNumWorkers(1);//分配多少个Worker进程去执行这个Topology // Topology run conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);//SPOUT最大Tuple缓冲区,大于就挂起,等待bolt消费小于这个值再运行 //本地模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); Thread.sleep(20000); cluster.shutdown(); //服务器模式 //StormSubmitter.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); } }
package spouts; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class WordReader extends BaseRichSpout { private SpoutOutputCollector collector; // 保存open时存进来的SpoutOutputCollector,要用它向Bolt发送Tuple private boolean completed = false; private String[] readerString = new String[] { "test10", "test20", "test30" }; private String[] readerCount = new String[] { "10", "20", "30" }; public void ack(Object msgId) { System.out.println("WordReader OK:" + msgId); } public void close() { System.out.println("WordReader close"); } public void fail(Object msgId) { System.out.println("WordReader FAIL:" + msgId); } public void nextTuple() { String string; String count; int messageId; if (completed) { try { Thread.sleep(1000); } catch (InterruptedException e) { } return; } for (int i = 0; i < 3; i++) { string = readerString[i]; count = readerCount[i]; messageId = i; this.collector.emit(new Values(string, count), messageId); } completed = true; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { System.out.println("WordReader open:"); this.collector = collector; } // 定义map中的key,方便new Values时不用再写key了,位置一一对应 public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("WordReader declareOutputFields:"); declarer.declare(new Fields("string", "count")); } }
package bolts; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import clojure.lang.IRecord; public class WordNormalizer extends BaseBasicBolt { public void cleanup() { System.out.println("WordNormalizer cleanup"); } public void prepare(Map stormConf, TopologyContext context) { System.out.println("WordNormalizer prepare"); System.out.println("WordNormalizer prepare wordsFile == " + stormConf.get("wordsFile").toString()); System.out.println("WordNormalizer prepare2"); } public void execute(Tuple input, BasicOutputCollector collector) { String string = input.getString(0); String count = input.getString(1);// 因为WordReader只有两个域 System.out.println("WordNormalizer execute string == " + string); System.out.println("WordNormalizer execute count == " + count); collector.emit(new Values(count, string)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("WordNormalizer declareOutputFields"); declarer.declare(new Fields("count", "string")); } }
package bolts; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class WordCounter extends BaseBasicBolt { Integer id; String name; @Override public void cleanup() { System.out.println("WordCounter cleanup"); } @Override public void prepare(Map stormConf, TopologyContext context) { System.out.println("WordCounter prepare"); System.out.println("WordCounter prepare wordsFile == " + stormConf.get("wordsFile").toString()); this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 这里是最后一个Bolt不用定义了 System.out.println("WordCounter declareOutputFields"); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String string = input.getString(0); String count = input.getString(1);// 因为WordReader只有两个域 System.out.println("WordCounter execute string == " + string); System.out.println("WordCounter execute count == " + count); } }
参考原文:http://www.open-open.com/lib/view/open1430095563146.html
参考原文:http://ifeve.com/getting-started-with-stom-index/
参考原文:https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter2/Hello%20World%20Storm.md#%E5%88%9B%E5%BB%BA%E6%88%91%E4%BB%AC%E7%9A%84%E7%AC%AC%E4%B8%80%E4%B8%AA%E6%8B%93%E6%89%91
参考原文:http://blog.itpub.net/29754888/viewspace-1260026/
参考原文:https://book.douban.com/reading/33008852/
参考原文:http://shiyanjun.cn/archives/977.html
相关jar包:
发表评论
-
选举算法
2022-06-17 08:48 443选举算法 常用的选举 ... -
elasticSearch使用
2022-04-27 08:42 425ElasticSearch 基于Apache Lucene构建 ... -
IDEA 快捷键
2022-03-02 16:55 253大小写转换快捷键 ctr+shift+u IDEA ... -
zookeeper dubbo 安装
2021-12-04 19:27 327docker-machine ssh default d ... -
将博客搬至CSDN
2021-11-18 19:57 198将博客搬至CSDN -
docker mysql 主从安装
2021-11-10 16:55 243docker run -d -p 13306:3306 --n ... -
rocketmq安装部署.txt
2021-11-07 19:10 222docker search rocketmq docke ... -
百度人脸识别
2021-05-21 16:11 369package com.gaojinsoft.htwy.y20 ... -
springBoot tomcat配置参数说明
2021-05-12 09:13 3038#最大连接数 server.tomcat.max-connec ... -
技术选型
2021-01-29 17:34 3001.移动端组件vux,vant,vant好点,文档好的,基于v ... -
方便开发调试和问题跟踪
2021-01-01 10:17 2541.外网最好可以连接数据库 2.关键信息可以在接口返回信息, ... -
Jenkins脚本
2020-03-12 17:55 452#!/bin/bash -ilx echo "开始 ... -
base64与file 相互转换
2019-10-23 18:19 793base64与file 相互转换 import org. ... -
钉钉开发
2019-09-17 20:16 441钉钉开发 开发者帐号 1357047443 x***310* ... -
安卓模拟器使用
2019-07-03 23:13 4逍遥pc版的安卓模拟器 http://www.xyaz.cn/ ... -
ZLTest
2019-03-19 23:41 275ZLTest -
要同步回来的文件
2019-01-25 11:14 0Spring Boot中整合Sharding-JDBC m ... -
画相关图表的工具
2019-01-25 10:59 585制作流程图的工具 1、Visio很好用,很强大,微软出的,水平 ... -
JVM 监控工具
2019-01-21 18:04 388JVM 监控工具 //========== ... -
Hystrix
2019-01-10 17:02 545Hystrix Hystrix的设计原则包括: 资源隔离 ...
相关推荐
### Storm原理分析 #### 一、Storm基本结构 Apache Storm 是一个开源的分布式实时计算系统,主要用于处理流式数据。Storm 提供了一种简单而强大的模型来定义并行计算过程,使得用户能够轻松地处理无限的数据流。...
**1.4 工作原理** 1. **Topology提交**:用户通过编写Topology定义数据流处理逻辑,并将其提交至集群。 2. **任务分配**:Nimbus将Topology分解为一系列任务,并分配给不同的Supervisor。 3. **任务执行**:...
通过分析这些图纸,我们可以更深入地理解云台的工作原理,为DIY爱好者或专业工程师提供了宝贵的参考资料。 总的来说,"storm32-bgc"云台的硬件设计体现了现代电子技术在控制领域的应用,其开源特性更是鼓励了创新和...
标题中的"storm开发jar包以及storm例子源码"表明了我们即将探讨的是关于Apache Storm的开发环境设置和示例代码。...通过学习和实践这些材料,开发者可以深入理解Storm的工作原理,掌握实时数据处理的基本技能。
本笔记主要围绕Storm的核心概念、起源、架构、组件、原理以及应用场景进行详细讲解。 **一、Storm概念** Storm设计的目标是使实时数据处理变得简单易用,它可以连续处理无限的数据流,提供低延迟和高吞吐量的处理...
本文将深入探讨该电路图的主要组成部分,帮助读者理解其工作原理和设计思路。 首先,电路图的核心部分是微控制器,它负责处理所有输入输出信号并控制设备的运行。STorm32BGC V130很可能采用了Microchip公司的PIC或...
"storm 学习资源总结" Storm 是一个免费开源的分布式实时计算系统,利用 storm 可以很容易的做到可靠处理无线数据流。Storm 的架构特点包括编程简单、高性能、低...这篇论文很好的解释了 Storm 的工作原理和优势。
通过实践这些demo,你将能够熟练掌握如何使用Storm进行实时数据处理,理解其工作原理,并为更复杂的应用场景打下坚实基础。记得,学习Storm不仅涉及技术操作,还涉及到对实时计算和大数据处理理念的理解,这将有助于...
通过本文对Storm源码的分析,我们深入了解了Storm的架构、工作原理以及核心组件的实现细节。这对于开发人员来说非常重要,不仅有助于更高效地利用Storm解决实际问题,还能为进一步的研究和优化提供基础。随着大数据...
标题中的“storm”指的是Apache Storm,一个开源的流处理计算系统。...通过阅读这些PDF资料,你可以深入了解Apache Storm的工作原理,掌握实时流处理技术,并具备构建高效、可靠的实时数据处理系统的能力。
总结起来,storm-wordcount实例展示了Storm的基本工作原理和实时流处理能力,通过这个实例,开发者可以快速理解Storm的架构和编程模型,为进一步学习和应用Storm打下坚实基础。在实际项目中,我们可以借鉴storm-...
以上这些知识点对于理解Storm的工作原理至关重要,它们涵盖了从Storm的基本架构、进程启动和初始化、Topology的创建和提交,到消息的接收、传递以及可靠性的保证。通过深入分析这些知识点,可以加深对Storm源码的...
通过学习《storm实时数据处理》,读者不仅可以掌握Storm的基本原理和操作,还能了解到如何利用Storm解决实际业务问题,提升大数据实时处理的能力。在大数据时代,理解和掌握Storm这样的实时处理框架,对于从事相关...
Storm 0.9.0版本相较于后续版本可能有些过时,但它的核心原理和操作方式对于理解Storm的工作机制仍然非常有用。学习和掌握这个版本可以帮助你更好地理解分布式实时计算系统的设计与实现,为进一步研究更新的版本或...
它是一个集成了173个JAR文件的压缩文件,这些JAR文件涵盖了Storm框架的各个组成部分,包括核心库、 Nimbus(主控节点)服务、Supervisor(工作节点)服务、Zookeeper协调服务、配置文件以及各种依赖的第三方库。...
通过深入研究这些代码,开发者可以学习到如何利用Storm处理大数据流,理解实时计算的工作原理,并掌握如何设计和优化Storm拓扑。对于初学者,这是一个很好的实践资源,可以用来提升在大数据实时处理领域的技能。对于...
在 Storm 0.9 源码包中,我们可以深入理解其内部工作原理,以及如何利用 Storm 的 API 进行实时流处理应用开发。源码分析对于开发者来说,是提升技能和优化应用的关键步骤。 1. **核心组件** - **Bolt**: Bolt 是 ...
Apache Storm的工作原理基于一个由多个节点组成的集群,其中包含Supervisor、Nimbus和Worker节点。Supervisor负责管理物理机器上的worker进程,Nimbus是主控节点,负责任务调度和分配,而Worker则执行实际的计算任务...
标题中的“storm调试webservice”指的是使用Apache Storm这个实时计算框架来调试Web Service服务。...然而,要成功实施这一策略,需要对Storm的工作原理有深入理解,以及具备一定的编程和调试技能。
【标题】"StormDemo.tar.gz" 是一个与Apache Storm...通过这个"StormDemo.tar.gz"压缩包,初学者可以一步步地了解并掌握Apache Storm的基本操作和实时处理的核心原理,为后续的进阶学习和实际项目应用打下坚实的基础。