`
zhangzhenjj
  • 浏览: 27853 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

(转)理解storm 进程内消息流(很好的一篇文章)

阅读更多

FROM:http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

Understanding the Internal Message Buffers of Storm

JUN 21ST, 2013

Table of Contents
  • Internal messaging within Storm worker processes
  • Illustration
  • Detailed description
    • Worker processes
    • Executors
  • Where to go from here
    • How to configure Storm’s internal message buffers
    • How to configure Storm’s parallelism
    • Understand what’s going on in your Storm topology
    • Advice on performance tuning

When you are optimizing the performance of your Storm topologies it helps to understand how Storm’s internal message queues are configured and put to use. In this short article I will explain and illustrate how Storm version 0.8/0.9 implements the intra-worker communication that happens within a worker process and its associated executor threads.

Internal messaging within Storm worker processes

Terminology: I will use the terms message and (Storm) tuple interchangeably in the following sections.

When I say “internal messaging” I mean the messaging that happens within a worker process in Storm, which is communication that is restricted to happen within the same Storm machine/node. For this communication Storm relies on various message queues backed by LMAX Disruptor, which is a high performance inter-thread messaging library.

Note that this communication within the threads of a worker process is different from Storm’sinter-worker communication, which normally happens across machines and thus over the network. For the latter Storm uses ZeroMQ by default (in Storm 0.9 there is experimental support for Nettyas the network messaging backend). That is, ZeroMQ/Netty are used when a task in one worker process wants to send data to a task that runs in a worker process on different machine in the Storm cluster.

So for your reference:

  • Intra-worker communication in Storm (inter-thread on the same Storm node): LMAX Disruptor
  • Inter-worker communication (node-to-node across the network): ZeroMQ or Netty
  • Inter-topology communication: nothing built into Storm, you must take care of this yourself with e.g. a messaging system such as Kafka/RabbitMQ, a database, etc.

If you do not know what the differences are between Storm’s worker processes, executor threads and tasks please take a look at Understanding the Parallelism of a Storm Topology.

Illustration

Let us start with a picture before we discuss the nitty-gritty details in the next section.

Figure 1: Overview of a worker’s internal message queues in Storm. Queues related to a worker process are colored in red, queues related to the worker’s various executor threads are colored in green. For readability reasons I show only one worker process (though normally a single Storm node runs multiple such processes) and only one executor thread within that worker process (of which, again, there are usually many per worker process).

Detailed description

Now that you got a first glimpse of Storm’s intra-worker messaging setup we can discuss the details.

Worker processes

To manage its incoming and outgoing messages each worker process has a single receive thread that listens on the worker’s TCP port (as configured via supervisor.slots.ports). The parameter topology.receiver.buffer.size determines the batch size that the receive thread uses to place incoming messages into the incoming queues of the worker’s executor threads. Similarly, each worker has a single send thread that is responsible for reading messages from the worker’s transfer queue and sending them over the network to downstream consumers. The size of the transfer queue is configured via topology.transfer.buffer.size.

  • The topology.receiver.buffer.size is the maximum number of messages that are batched together at once for appending to an executor’s incoming queue by the worker receive thread (which reads the messages from the network) Setting this parameter too high may cause a lot of problems (“heartbeat thread gets starved, throughput plummets”). The default value is 8 elements, and the value must be a power of 2 (this requirement comes indirectly from LMAX Disruptor).
1
2
3
// Example: configuring via Java API
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // default is 8
Note that topology.receiver.buffer.size is in contrast to the other buffer size related parameters described in this article actually not configuring the size of an LMAX Disruptor queue. Rather it sets the size of a simple ArrayList that is used to buffer incoming messages because in this specific case the data structure does not need to be shared with other threads, i.e. it is local to the worker’s receive thread. But because the content of this buffer is used to fill a Disruptor-backed queue (executor incoming queues) it must still be a power of 2. See launch-receive-thread! in backtype.storm.messaging.loader for details.
  • Each element of the transfer queue configured with topology.transfer.buffer.size is actually a list of tuples. The various executor send threads will batch outgoing tuples off their outgoing queues onto the transfer queue. The default value is 1024 elements.
1
2
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // default is 1024

Executors

Each worker process controls one or more executor threads. Each executor thread has its ownincoming queue and outgoing queue. As described above, the worker process runs a dedicated worker receive thread that is responsible for moving incoming messages to the appropriate incoming queue of the worker’s various executor threads. Similarly, each executor has its dedicated send thread that moves an executor’s outgoing messages from its outgoing queue to the “parent” worker’s transfer queue. The sizes of the executors’ incoming and outgoing queues are configured via topology.executor.receive.buffer.size and topology.executor.send.buffer.size, respectively.

Each executor thread has a single thread that handles the user logic for the spout/bolt (i.e. your application code), and a single send thread which moves messages from the executor’s outgoing queue to the worker’s transfer queue.

  • The topology.executor.receive.buffer.size is the size of the incoming queue for an executor. Each element of this queue is a list of tuples. Here, tuples are appended in batch. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
1
2
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // batched; default is 1024
  • The topology.executor.send.buffer.size is the size of the outgoing queue for an executor. Each element of this queue will contain a single tuple. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
1
2
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); // individual tuples; default is 1024

Where to go from here

How to configure Storm’s internal message buffers

The various default values mentioned above are defined in conf/defaults.yaml. You can override these values globally in a Storm cluster’s conf/storm.yaml. You can also configure these parameters per individual Storm topology via backtype.storm.Config in Storm’s Java API.

How to configure Storm’s parallelism

The correct configuration of Storm’s message buffers is closely tied to the workload pattern of your topology as well as the configured parallelism of your topologies. See Understanding the Parallelism of a Storm Topology for more details about the latter.

Understand what’s going on in your Storm topology

The Storm UI is a good start to inspect key metrics of your running Storm topologies. For instance, it shows you the so-called “capacity” of a spout/bolt. The various metrics will help you decide whether your changes to the buffer-related configuration parameters described in this article had a positive or negative effect on the performance of your Storm topologies. See Running a Multi-Node Storm Cluster for details.

Apart from that you can also generate your own application metrics and track them with a tool like Graphite. See Installing and Running Graphite via RPM and Supervisord for details. It might also be worth checking out ooyala’s metrics_storm project on GitHub (I haven’t used it yet).

Advice on performance tuning

Watch Nathan Marz’s talk on Tuning and Productionization of Storm.

The TL;DR version is: Try the following settings as a first start and see whether it improves the performance of your Storm topology.

1
2
3
4
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    16384);

 

Interested in more? You can subscribe to this blog, or follow me on Twitter.

Posted by Michael G. Noll Jun 21st, 2013  Filed under ProgrammingStorm

FROM:http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

1
4
分享到:
评论

相关推荐

    storm实时数据处理

    《storm实时数据处理》这本书深入探讨了Apache Storm这一强大的实时计算系统,它是大数据处理领域中的重要工具,尤其在实时流处理方面具有显著优势。Storm设计的核心理念是简单、可扩展和容错性,使得它在处理大规模...

    storm1.2.1-helloword可靠消息

    Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,确保每个事件都能得到精确一次(exactly-once)的处理,从而实现高度可靠的消息传递。 首先,我们需要理解Storm的核心概念。Storm由多个...

    Storm流计算项目

    Storm流计算项目(文档中含有视频下载地址和解压密码),内容包含 storm、trident、kafka、hbase、cdh、hightcharts 等内容

    Flink,Storm,Spark Streaming三种流框架的对比分析

    在Flink中,所有的数据都看作流,是一种很好的抽象,因为这更接近于现实世界。 Flink的基本架构 Flink系统的架构与Spark类似,是一个基于Master-Slave风格的架构。当Flink集群启动后,首先会启动一个JobManager和...

    基于Storm流计算天猫双十一作战室项目实战

    - **Storm Trident**:通过实际案例讲解Storm Trident的核心特性和优势,帮助学习者更好地理解和运用这一强大工具。 **4. 平台搭建与管理** - **CDH5生态环境**:课程指导学员搭建基于Cloudera CDH5的完整生态环境...

    【46】2018年最新java大数据基于storm开发实时流处理器视频教程 .txt

    根据提供的文件信息,本篇文章将围绕“2018年最新Java大数据基于Storm开发实时流处理器”的主题展开,深入解析Storm的基本概念、架构原理、在大数据处理中的应用场景以及如何使用Java进行开发等内容。 ### 一、...

    Storm源码走读笔记

    以上这些知识点对于理解Storm的工作原理至关重要,它们涵盖了从Storm的基本架构、进程启动和初始化、Topology的创建和提交,到消息的接收、传递以及可靠性的保证。通过深入分析这些知识点,可以加深对Storm源码的...

    Storm 源码分析

    Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...

    storm的jar包

    在Storm中,数据流被抽象为持续不断的Tuple(元组)序列,这些Tuple在网络中的worker节点间进行分布式处理,保证每个消息至少被处理一次(At-Least-Once Processing Guarantees)。这使得Storm非常适合于实时数据...

    细细品味Storm_Storm简介及安装

    - **消息处理保证**:Storm提供了一种可靠的机制来确保每个消息至少被处理一次。 - **编程语言的灵活性**:Storm支持多种编程语言,默认支持Clojure、Java、Ruby和Python,也可以通过实现特定通信协议支持其他语言。...

    storm项目-流数据监控系列1《设计文档》

    分别为1、storm项目-流数据监控系列1《设计文档》2、storm项目-流数据监控系列2《代码解析》 3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5...

    storm 学习资源总结

    "storm 学习资源总结" Storm 是一个免费开源的分布式实时计算系统,利用 storm 可以很容易的做到可靠处理无线数据流。Storm 的架构特点包括编程简单、高性能、低...这篇论文很好的解释了 Storm 的工作原理和优势。

    Storm笔记-PPT

    Storm的核心思想是微批处理,即每个数据单元作为一个“小批次”进行处理,确保每个消息只被处理一次,实现了精确一次的处理语义。 **二、Storm起源** Storm最初由Nathan Marz开发,并在2011年由Twitter收购。...

    storm项目-流数据监控系列5《zookeeper统一配置》

    分别为1、storm项目-流数据监控系列1《设计文档》2、storm项目-流数据监控系列2《代码解析》 3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5...

    Storm API实现词频统计

    在大数据处理领域,Apache Storm是一个实时计算系统,它能够持续处理数据流,实现低延迟、高吞吐量的数据分析。在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个...

    storm一个简单实例

    【标题】:“storm一个简单实例”揭示了Apache Storm的基础应用,这是一个实时大数据处理框架,尤其在流处理领域具有广泛的应用。这个实例适用于初次接触Storm的开发者,帮助他们快速上手并理解其工作原理。 【描述...

    storm 流式处理 安装软件(包括zookeeper,jzmq,zeroMQ,storm)

    Storm作为Apache软件基金会的顶级项目,是一个分布式、容错的实时计算系统,能够处理无界数据流,确保每个消息至少被处理一次,从而提供高可靠性的保障。 **Zookeeper在Storm中的角色** Zookeeper是一个分布式的,...

    Twitter storm

    Storm 的设计目标是提供一个简单易用、可扩展且容错性高的平台,用于处理无界数据流(即持续不断地产生数据的数据流),并且能够保证消息的可靠传输。 Storm 与 Hadoop 类似,但主要针对的是实时数据处理,而Hadoop...

    storm入门.pdf

    Storm是一个分布式实时计算系统,能够有效地处理大量数据流。它由Twitter公司开发,最初的目的是为了处理大规模的数据,如社交网络上的实时信息更新。Storm的基本单位是“topology”(拓扑结构),它可以理解为一个...

Global site tag (gtag.js) - Google Analytics