`
农村外出务工男JAVA
  • 浏览: 106000 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

storm 如何编写可靠的spout和bolt

阅读更多

一、前言

   对于不使用trident的人来说,使用基本的storm spout,bolt操作,需要理解storm的ack机制,保证消息的完整性,Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。

   怎样才认为消息被完全处理?每个从 Spout发出的 Tuple可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。

   这里我主要给不使用trident实现业务的同事讲如何实现可靠的spout,bolt。

二、实现可靠的spout

   让我们先来看下ISpout接口的几个方法

public class ISpout接口测试 implements ISpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
       /**
        * 1、在任务集群的工作进程内被初始化,提供spout执行所需要的环境
        * 2、conf参数是这个spout的strom配置,提供给拓扑与这台主机上的集群配置一起合并
        * 3、context主要用来获取这个任务在拓扑中的位置信息,包括该任务的id,该任务的组件id,输入和输出消息等
        * 4、collector是收集器,用于从spout发送元祖,收集器是线程安全的,应该作为这个spout对象的实例变量进行保存。
        *
        */
	}

	@Override
	public void close() { 
		 /**
		  * 1、当ISpout关闭时被调用,不能保证close一定被调用,因为在集群中可以使用kill -9 直接杀死工作进程/本地模式除外
		  */
	}

	@Override
	public void activate() {
        /**
         * 当spout从失效模式中激活的时候被调用
         */
	}

	@Override
	public void deactivate() {
      /**
       * 当spout已经失效的时候被调用,在失效期间,nextTuple()方法不会被调用
       */
	}

	@Override
	public void nextTuple() {

		/**
		 * 1、非阻塞,如果没有元祖可以发送,可休眠,不浪费CPU
		 * 2、发送元祖到输出收集器SpoutOutputCollector
		 */
	}

	@Override
	public void ack(Object msgId) {

	  /**
	   * 1、storm断定该spout发送的标识符msgId的元祖已经被成功处理时调用
	   * 2、ack()方法调用后将消息移除队列(之前的消息是挂起的)
	   */
	}

	@Override
	public void fail(Object msgId) {
		/**
		   * 1、storm断定该spout发送的标识符msgId的元祖没有被成功处理时调用
		   * 2、fail()方法调用后将消息放入队列(之前的消息是挂起的)
		   */
	}

}

      那么我们如何实现可靠的spout呢?

   1. 在 nextTuple 函数中调用 emit 函数时需要带一个msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
   2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护,这点比较坑)

   例子:

 

public class 可靠的spout implements ISpout{
	
    private SpoutOutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector  = collector;
	}

	@Override
	public void close() {
		
	}

	@Override
	public void activate() {
		
	}

	@Override
	public void deactivate() {
		
	}

	@Override
	public void nextTuple() {
		String curMsg = "发送消息";
		String msgId = "发送消息";
		//这里我假设MsgId和发送的消息一样,便于维护msgId和消息之间的对应关系
		collector.emit(new Values(curMsg),msgId);
	}

	@Override
	public void ack(Object msgId) {
		
	}

	@Override
	public void fail(Object msgId) {
		String tmp = (String)msgId;   //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
		//消息进行重发
		collector.emit(new Values(tmp), msgId);
	}

 三、实现可靠的bolt

      同样,先看看IBolt接口提供的几个方法

public class IBolt接口测试  implements IBolt{

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		/**
		 * 1、提供bolt运行的一些环境
		 */
	}

	@Override
	public void execute(Tuple input) {
		/**
		 * 1、一次处理一个输入的元祖,元祖对象包括来自哪个组件/流/任务的元数据
		 * 2、IBolt没有立即处理元祖,而是完整的捕获一个元祖并在以后进行处理
		 * 3、如果实现basicBolt则不用手动ack()
		 */
	}

	@Override
	public void cleanup() {
		/**
		 * 1、当一个bolt即将关闭时调用,不能保证一定被调用,集群的kill -9 不行
		 * 
		 */
	}
	
	/**
	 * bolt的生命周期:在客户端主机上创建Ibolt对象,bolt被序列化到拓扑,并提及到nimbus,然后nimbus
	 * 启动工作进程(worker)进行反序列化,调用其prepare()方法开始处理
	 */
}

    那我们如何实现可靠的bolt呢,主要有2种方式

   3.1 继承 BaseBasicBolt


public final class 第一种可靠的bolt extends BaseBasicBolt {
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String sentence = tuple.getString(0);
		for (String word : sentence.split("\\s+")) {
			//storm自动ack和fail
			collector.emit(new Values(word));
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

  对于继承BaseBasicBolt的Bolt来说,storm内部已经替我们自动ack和fail了,不需我们手动ack,然而这个抽象类不太使用,使用场景单一。

  3.2 继承 BaseRichBolt

package com.storm.bolt.可靠性;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class 可靠的bolt extends BaseRichBolt  {

	private static final long serialVersionUID = 1L;
	
	OutputCollector _collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this._collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		 String sentence = tuple.getString(0);
	     for(String word: sentence.split("\\s+")) {
	    	// 建立 anchor 树
	         _collector.emit(tuple, new Values(word));  
	     }
	     //手动ack
	    _collector.ack(tuple);  
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}

  需要我们自己手动ack,但其适应场景更广泛。

 四、ack原理

    对于每个spout tuple保存一个ack_val值,初始值为0,然后每发射一个tuple或者ack 一个tuple, tuple的 id都要和这个校验值(ack_val)异或,并更新ack_val,如果每个发射出去的tuple都ack了,那么最后ack_val一定是0。
 五、ack流程
    1、spout发射消息生成一个messageId对象{属性Map<RootId,消息ID>}放入pendingMap中,在未超时时间内保留
    2、spout发出消息后给acker bolt(ack其实也是一个特殊的bolt)发射tuple消息 {tuple-id,ack_val,task_id}:
       tupe_id:实际上就是rootId
       ack_val:刚开始为初始值0
       task_id:为spout的id,这样acker才知道是哪个spout发射过来的,如果有多个acker,那么根据task_id哈希取模也能找到对应的acker,保证同一个spout发射出来的消息被同一个acker处理.然后acker bolt从自己的pending对象中新增一条记录{tuple_id,{task_id,ack_val}}
    3、bolt接收到消息后(该bolt可能是第一个也可能是最后一个,原理都一样),发射消息给下一个task的过程中也会构建一个MessageId对象,messageId中会进行消息ID(本身消息id)和接收到的消息ID(上一个bolt或者spout传过来)进行异或得到ack_val发给acker.
    4、acker收的后根据tuple_id从penging中取出旧的ack_val然后进行异或。
    5、继续bolt处理...........
    6、如果最终异或结果为0,调用spout的ack方法,如果失败,调用fail方法。
 5.1 例子:
   1:spout产生一个tuple,初始值0100,同时发送给ack和Bolt1   {acker 值 0100}
    2:bolt1接收spout发送过来的0100消息,经过处理后产生了新消息0010,那么bolt1就讲0100^0010发送给acker  {
             acker值 0100^0010 = 0110
                    0110^0100=0010
           }

   3:bolt2接收bolt1发送过来的消息,没有产生任何消息(直接持久化了),那么Bolt2将bolt1的消息 0010发送给acker  {       
              acker值 0100^0010 = 0110
                    0110^0100=0010
                    0010^0010=0000
                }
   4:acker进行整个流程的异或操作 {acker求最终的异或值}

分享到:
评论

相关推荐

    基于 OpenCV 的魔兽世界钓鱼机器人

    基于 OpenCV 的魔兽世界钓鱼机器人

    供应链管理中信息共享问题的研究.docx

    供应链管理中信息共享问题的研究

    青春文学中的爱情观呈现.doc

    青春文学中的爱情观呈现

    分布式光伏储能系统的优化配置方法 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    Delphi 12.3 控件之XLSReadWriteII6.02.01.7z

    XLSReadWriteII6.02.01.7z

    图解系统-小林coding-v1.0.rar

    图解系统-小林coding-v1.0

    【光伏功率预测】基于EMD-PCA-LSTM的光伏功率预测模型 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    漫画作品与乌托邦理想追求.doc

    漫画作品与乌托邦理想追求

    江苏建筑消防设施维护保养规程.rar

    江苏建筑消防设施维护保养规程.rar

    基于交互式可视化的Transformer模型注意机制探索工具-DODRIO及其应用

    内容概要:论文介绍了一款名为DODRIO的交互式可视化工具,帮助自然语言处理(NLP)研究人员和从业者解析基于转换器架构的语言模型内部工作机理。DODRIO整合了概述图与详尽视图,支持用户比较注意力权重与其输入文本的句法结构和语义特征。具体而言,它包含了依赖关系视图(Dependency View)、语义关注图(Semantic Attention Graph)以及注意力头概览(Attention Head Overview),并利用不同的图形展示方法使复杂的多层多头转换器模型中的注意力模式更容易理解和研究。 适用人群:适用于从事深度学习、自然语言处理的研究人员和技术从业者;尤其适合对基于变换器架构的大规模预训练语言模型感兴趣的开发者们。 使用场景及目标:DODRIO用于探索转换器模型各层级之间的联系、验证已有研究成果,同时激发新假设形成。具体使用时可以选择特定数据集中的句子作为样本输入,观察不同注意力机制如何响应文本内容的变化。此外,还可以用来对比精简版本DistilBERT的表现,评估其相对全量模型BERT的优势与不足。 其他说明:DODRIO为开源项目,提供web端实施方式,使得

    基于机器学习的疾病数据集分析

    该代码使用scikit-learn的乳腺癌数据集,完成分类模型训练与评估全流程。主要功能包括:数据标准化、三类模型(逻辑回归、随机森林、SVM)的训练、模型性能评估(分类报告、混淆矩阵、ROC曲线)、随机森林特征重要性分析及学习曲线可视化。通过`train_test_split`划分数据集,`StandardScaler`标准化特征,循环遍历模型进行统一训练和评估。关键实现细节包含:利用`classification_report`输出精确度/召回率等指标,绘制混淆矩阵和ROC曲线量化模型效果,随机森林的特征重要性通过柱状图展示,学习曲线分析模型随训练样本变化的拟合趋势。最终将原始数据和预测结果保存为CSV文件,便于后续分析,并通过matplotlib进行多维度可视化比较。代码结构清晰,实现了数据处理、模型训练、评估与可视化的整合,适用于乳腺癌分类任务的多模型对比分析。

    数字化智慧园区建设实施PPT(43页).pptx

    在智慧城市建设的大潮中,智慧园区作为其中的璀璨明珠,正以其独特的魅力引领着产业园区的新一轮变革。想象一下,一个集绿色、高端、智能、创新于一体的未来园区,它不仅融合了科技研发、商业居住、办公文创等多种功能,更通过深度应用信息技术,实现了从传统到智慧的华丽转身。 智慧园区通过“四化”建设——即园区运营精细化、园区体验智能化、园区服务专业化和园区设施信息化,彻底颠覆了传统园区的管理模式。在这里,基础设施的数据收集与分析让管理变得更加主动和高效,从温湿度监控到烟雾报警,从消防水箱液位监测到消防栓防盗水装置,每一处细节都彰显着智能的力量。而远程抄表、空调和变配电的智能化管控,更是在节能降耗的同时,极大地提升了园区的运维效率。更令人兴奋的是,通过智慧监控、人流统计和自动访客系统等高科技手段,园区的安全防范能力得到了质的飞跃,让每一位入驻企业和个人都能享受到“拎包入住”般的便捷与安心。 更令人瞩目的是,智慧园区还构建了集信息服务、企业服务、物业服务于一体的综合服务体系。无论是通过园区门户进行信息查询、投诉反馈,还是享受便捷的电商服务、法律咨询和融资支持,亦或是利用云ERP和云OA系统提升企业的管理水平和运营效率,智慧园区都以其全面、专业、高效的服务,为企业的发展插上了腾飞的翅膀。而这一切的背后,是大数据、云计算、人工智能等前沿技术的深度融合与应用,它们如同智慧的大脑,让园区的管理和服务变得更加聪明、更加贴心。走进智慧园区,就像踏入了一个充满无限可能的未来世界,这里不仅有科技的魅力,更有生活的温度,让人不禁对未来充满了无限的憧憬与期待。

    Matlab实现BO贝叶斯优化-Transformer-GRU多特征分类预测的详细项目实例(含完整的程序,GUI设计和代码详解)

    内容概要:本文档介绍了基于MATLAB实现的贝叶斯优化(BO)、Transformer和GRU相结合的多特征分类预测项目实例,涵盖了详细的程序设计思路和具体代码实现。项目旨在应对数据的多样性与复杂性,提供一种更高效的多特征数据分类解决方案。文档主要内容包括:项目背景与意义,技术难点与解决方案,具体的实施流程如数据处理、模型构建与优化、超参数调优、性能评估以及精美的GUI设计;详细说明了Transformer和GRU在多特征数据分类中的应用及其与贝叶斯优化的有效结合,强调了其理论与实际应用中的价值。 适合人群:具备一定机器学习和MATLAB编程基础的研发人员,特别是从事多维数据处理与预测工作的专业人士和技术爱好者。 使用场景及目标:① 适用于金融、医疗、交通等行业,进行复杂的多维数据处理和预测任务;② 提升现有分类任务中复杂数据处理的准确度和效率,为各行业提供智能预测工具,如金融市场预测、患者病情发展跟踪、交通流量管理等。 其他说明:本文档包含了丰富的实战案例和技术细节,不仅限于模型设计本身,还涉及到数据清洗、模型优化等方面的知识,帮助使用者深入理解每一步骤背后的原理与实现方法。通过完整的代码样例和GUI界面设计指导,读者可以从头到尾跟随文档搭建起一套成熟的分类预测系统。

    Hive sql练习题,只是参考作用

    大数据的sql练习题,初级中级高级

    基于自注意力机制的序列转换模型-Transformer的提出及其应用

    内容概要:论文介绍了名为Transformer的新网络架构,它完全基于自注意力机制,在不使用递归或卷积神经网络的情况下建模输入与输出之间的全局依赖关系,尤其适用于长文本处理。通过多头自注意力层和平行化的全连接前馈网络,使得在机器翻译任务上的表现优于当时最佳模型。具体地,作者用此方法实现了对英语-德语和英语-法语翻译、句法解析等任务的高度并行化计算,并取得显著效果。在实验方面,Transformer在较短训练时间内获得了高质量的翻译结果以及新的单一模型基准。除此之外,研究人员还探索了模型变体的效果及其对于不同参数变化时性能的变化。 适用人群:从事自然语言处理领域的研究者、工程师、学生,熟悉深度学习概念尤其是编码器-解码器模型以及关注模型创新的人士。 使用场景及目标:主要适用于序列到序列(seq2seq)转换任务如机器翻译、语法分析、阅读理解和总结等任务的研究和技术开发;目标在于提高计算效率、缩短训练时间的同时确保模型性能达到或超过现有技术。 其他说明:本文不仅提出了一个新的模型思路,更重要的是展示了自注意力机制相较于传统LSTM或其他方式所拥有的优势,例如更好地捕捉远距离上下文关系的能力

    【故障诊断】一种滚动体轴承或齿轮的重复瞬态提取方法研究 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    用于平抑可再生能源功率波动的储能电站建模及评价 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    使用 KTH 数据集进行人类行为识别 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    【深度学习】基于计算机视觉的自动驾驶应用 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    自己写的远控木马,欢迎各位大佬改善

    自己写的远控木马,欢迎各位大佬改善

Global site tag (gtag.js) - Google Analytics