`
woodding2008
  • 浏览: 289593 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm 反压机制

 
阅读更多

反压机制

       Storm的反压机制不成熟直接带来的后果是洪峰流量或者流量预估不准确导致任务的worker OOM,频繁漂移。Storm1.0版本已经使用新的反压机制,社区解决方案:https://issues.apache.org/jira/browse/STORM-886

https://github.com/apache/storm/pull/700 

 

 

 

 

反压过程

  • worker executor的接收队列大于高水位,通知反压线程 
  • worker反压线程通知zookeeper,executor繁忙事件 
  • 所有worker监听zookeeper executor繁忙的事件 
  • worker spouts降低发送tuple速度 

 

storm 1.0以前的反压

 

   Spout tuples 不使用message id, TOPOLOGY_MAX_SPOUT_PENDING是不生效的。

public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;

The maximum number of tuples that can be pending on a spout task at any given time. 
This config applies to individual tasks, not to spouts or topologies as a whole. 

A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
Note that this config parameter has no effect for unreliable spouts that don't tag their tuples with a message id.



   spout执行nextTupe逻辑

 

(fn []
          ;; This design requires that spouts be non-blocking
          (disruptor/consume-batch receive-queue event-handler) ;;从recieve-queue取出batch tuples, 并使用tuple-action-fn处理
          
          ;; try to clear the overflow-buffer, 将overflow-buffer里面的数据放到发送的缓存queue里面
          (try-cause
            (while (not (.isEmpty overflow-buffer))
              (let [[out-task out-tuple] (.peek overflow-buffer)]
                (transfer-fn out-task out-tuple false nil)
                (.removeFirst overflow-buffer)))
          (catch InsufficientCapacityException e
            ))
          
          (let [active? @(:storm-active-atom executor-data)
                curr-count (.get emitted-count)]
            (if (and (.isEmpty overflow-buffer)  ;;只有当overflow-buffer为空, 并且pending没有达到上限的时候, spout可以继续emit tuple
                     (or (not max-spout-pending)
                         (< (.size pending) max-spout-pending)))
              (if active?  ;;storm集群是否active
                (do  ;;storm active
                  (when-not @last-active  ;;如果当前spout出于unactive状态
                    (reset! last-active true)
                    (log-message "Activating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.activate spout))) ;;先active spout
               
                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) ;;调用nextTuple,产生新的tuple
                (do ;;storm unactive
                  (when @last-active ;;如果spout出于active状态
                    (reset! last-active false)
                    (log-message "Deactivating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) ;;deactive spout并休眠
                  ;; TODO: log that it's getting throttled
                  (Time/sleep 100))))
            (if (and (= curr-count (.get emitted-count)) active?) ;;没有能够emit新的tuple(前后emitted-count没有变化)
              (do (.increment empty-emit-streak)
                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak))) ;;调用spout-wait-strategy进行sleep
              (.set empty-emit-streak 0)
              ))           
          0)) ;;返回0, 表示async-loop的sleep时间为0
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))
 tuple pending的个数是有限制
p*num-tasks 
p是TOPOLOGY-MAX-SPOUT-PENDING, num-tasks是spout的task数

max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
(defn executor-max-spout-pending [storm-conf num-tasks]
  (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
    (if p (* p num-tasks))))
 

 

反压不成熟带来的问题

fieldsGrouping不合理或者洪峰流量,bolt接收队列暴涨导致OOM,完善反压后可以解决这个问题。

 

 

 

 

扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html

  • 大小: 199.4 KB
  • 大小: 92.5 KB
  • 大小: 112.7 KB
分享到:
评论

相关推荐

    Storm Acker机制

    Storm,是于90年代突然崛起,颇受年轻消费者追崇的,一个专注于时尚生活的品牌,主要产品集中在时尚腕表、珠宝首饰、箱包、雨伞以及香水等。她是由其创始人Steve Sun于1989年创立。 自诞生之日起,Storm就以独特的...

    Apache Storm Buffer内部机制简介Prezi幻灯片

    本Prezi幻灯片将带你初步了解Storm中的Buffer内部机制。 Buffer是Storm处理数据的核心组件之一,它负责存储待处理的数据,直到这些数据可以被有效地转发到下一个处理阶段。Buffer的设计灵感来源于LMAX Disruptor,...

    storm利用ack保证数据的可靠性源码

    在Storm中,"ack机制"是实现数据可靠传输的关键特性,它确保了每个消息至少被处理一次(At-Least-Once Delivery)。下面我们将详细探讨Storm的ack机制以及它如何保证数据的可靠性。 1. **什么是Storm的Ack机制?** ...

    大数据处理框架:Storm:Storm的容错机制.docx

    大数据处理框架:Storm:Storm的容错机制.docx

    storm-ui:Apache Storm 的用户界面

    主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...

    storm入门.pdf

    Storm是一个分布式实时计算系统,能够有效地处理大量数据流。它由Twitter公司开发,最初的目的是为了处理大规模的数据,如社交网络上的实时信息更新。Storm的基本单位是“topology”(拓扑结构),它可以理解为一个...

    细细品味Storm_Storm简介及安装

    - **消息处理保证**:Storm提供了一种可靠的机制来确保每个消息至少被处理一次。 - **编程语言的灵活性**:Storm支持多种编程语言,默认支持Clojure、Java、Ruby和Python,也可以通过实现特定通信协议支持其他语言。...

    Storm入门到精通

    Storm入门到精通 Storm 是一个分布式实时计算系统,主要用于处理大规模数据流。它的核心组件包括Spout和Bolt,分别负责数据的输入和处理。下面是对 Storm 的一个概述,从基础知识到实践应用。 Storm 组件 Storm ...

    实时计算:Apache Storm:ApacheStorm的容错机制与状态管理.docx

    实时计算:Apache Storm:ApacheStorm的容错机制与状态管理.docx

    StormStorm集成Kafka 从Kafka中读取数据

    1. 添加依赖:在项目中引入Storm和Kafka相关的库,如storm-kafka或storm-kafka-client。 2. 配置KafkaSpout:设置KafkaSpout的配置,包括Zookeeper地址、Kafka的Group ID、要消费的主题等。 3. 创建Spout实例:基于...

    Storm 源码分析

    5. **容错机制**:为了保证数据处理的可靠性,Storm设计了一套完整的容错机制,包括消息确认机制、任务重启机制等。 #### 六、Storm与Hadoop的集成 虽然Storm和Hadoop分别针对实时计算和批处理两个不同的场景,但...

    storm剖析(pdf)

    Storm的机制部分涉及到几个关键点:ACK机制、调度器(Scheduler)、多语言支持(Multilang)、CGroup资源隔离、Metrics收集以及Tick Tuple的使用。ACK机制用于确保消息的可靠性处理,调度器则负责在Worker之间分发...

    Storm入门教程 之Storm原理和概念详解

    Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例; Storm视频教程亮点: 1、Storm...

    storm的jar包

    此外,Storm支持容错机制,即使部分节点失败,系统也能自动恢复,保证服务的高可用性。 总结来说,"storm的jar包"是Storm实时处理框架的基础,包含了运行和开发Storm应用所需的所有组件和库。通过深入理解和使用这...

    storm开发jar包以及storm例子源码

    7. **容错机制**:Storm通过检查点和故障恢复来保证容错性,即使节点失败,也能从最近的检查点恢复。 8. **Zookeeper**:作为协调服务,存储元数据和状态信息,确保集群的一致性和高可用性。 9. **nimbus**和**...

    storm_jars.zip

    此外,Storm支持容错机制,当某个Worker节点故障时,Nimbus会自动重新分配任务,确保数据处理的连续性。 在开发过程中,使用Storm JAR包创建拓扑(Topology)是关键步骤。开发者可以通过编写Java或Clojure代码定义...

    Storm源码走读笔记

    本文档是关于Storm源码的详细走读笔记,主要分析了Storm的启动场景、Topology提交过程、worker进程中的线程使用情况、消息传递机制以及 TridentTopology的创建和ack机制等多个方面。 首先,文档提到了Storm集群中的...

    storm on yarn概念架构消息机制概述

    为了保证每条消息都能被完全处理,Storm提供了消息不丢失机制,这在容错性方面是非常关键的。Storm确保一旦消息被处理,它就不会丢失,并且可以通过追踪acknowledgments(确认信号)来处理失败的情况。 Storm的系统...

Global site tag (gtag.js) - Google Analytics