Original article: http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

Understanding the Internal Message Buffers of Storm

Jun 21st, 2013

 

When you are optimizing the performance of your Storm topologies it helps to understand how Storm’s internal message queues are configured and put to use. In this short article I will explain and illustrate how Storm version 0.8/0.9 implements the intra-worker communication that happens within a worker process and its associated executor threads.

Internal messaging within Storm worker processes

Terminology: I will use the terms message and (Storm) tuple interchangeably in the following sections.

When I say “internal messaging” I mean the messaging that happens within a worker process in Storm, which is communication that is restricted to happen within the same Storm machine/node. For this communication Storm relies on various message queues backed by LMAX Disruptor, which is a high performance inter-thread messaging library.
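
To make the queueing model concrete, here is a minimal, self-contained sketch of inter-thread messaging with the LMAX Disruptor. It is illustrative only (it assumes Disruptor 3.x and Java 8 lambdas) and is not Storm's actual internal code; Storm wires up its Disruptor-backed queues itself.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;

public class DisruptorSketch {
    // Mutable event slot; the ring buffer pre-allocates all of these up front.
    static class TupleEvent {
        Object value;
    }

    public static void main(String[] args) throws Exception {
        // The ring buffer size must be a power of 2 -- the same constraint
        // that surfaces in Storm's buffer-size parameters below.
        Disruptor<TupleEvent> disruptor = new Disruptor<>(
                TupleEvent::new, 1024, Executors.defaultThreadFactory());

        // Consumer: runs on its own thread, managed by the Disruptor.
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("consumed: " + event.value));
        RingBuffer<TupleEvent> ringBuffer = disruptor.start();

        // Producer: claim a slot, fill it, publish.
        for (int i = 0; i < 10; i++) {
            final int n = i;
            ringBuffer.publishEvent((event, sequence) -> event.value = n);
        }

        Thread.sleep(100); // give the consumer a moment before shutting down
        disruptor.shutdown();
    }
}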

Note that this communication within the threads of a worker process is different from Storm’s inter-worker communication, which normally happens across machines and thus over the network. For the latter Storm uses ZeroMQ by default (in Storm 0.9 there is experimental support for Netty as the network messaging backend). That is, ZeroMQ/Netty are used when a task in one worker process wants to send data to a task that runs in a worker process on a different machine in the Storm cluster.

So for your reference:

  • Intra-worker communication in Storm (inter-thread on the same Storm node): LMAX Disruptor
  • Inter-worker communication (node-to-node across the network): ZeroMQ or Netty
  • Inter-topology communication: nothing built into Storm, you must take care of this yourself with e.g. a messaging system such as Kafka/RabbitMQ, a database, etc.

If you do not know what the differences are between Storm’s worker processes, executor threads and tasks please take a look at Understanding the Parallelism of a Storm Topology.

Illustration

Let us start with a picture before we discuss the nitty-gritty details in the next section.

Figure 1: Overview of a worker’s internal message queues in Storm. Queues related to a worker process are colored in red, queues related to the worker’s various executor threads are colored in green. For readability reasons I show only one worker process (though normally a single Storm node runs multiple such processes) and only one executor thread within that worker process (of which, again, there are usually many per worker process).

Detailed description

Now that you have gotten a first glimpse of Storm’s intra-worker messaging setup, we can discuss the details.

Worker processes

To manage its incoming and outgoing messages each worker process has a single receive thread that listens on the worker’s TCP port (as configured via supervisor.slots.ports). The parameter topology.receiver.buffer.size determines the batch size that the receive thread uses to place incoming messages into the incoming queues of the worker’s executor threads. Similarly, each worker has a single send thread that is responsible for reading messages from the worker’s transfer queue and sending them over the network to downstream consumers. The size of the transfer queue is configured via topology.transfer.buffer.size.

  • The topology.receiver.buffer.size is the maximum number of messages that are batched together at once for appending to an executor’s incoming queue by the worker receive thread (which reads the messages from the network). Setting this parameter too high may cause a lot of problems (“heartbeat thread gets starved, throughput plummets”). The default value is 8 elements, and the value must be a power of 2 (this requirement comes indirectly from LMAX Disruptor).
// Example: configuring via Java API
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // default is 8
Note that, in contrast to the other buffer-size related parameters described in this article, topology.receiver.buffer.size does not actually configure the size of an LMAX Disruptor queue. Rather, it sets the size of a simple ArrayList that is used to buffer incoming messages; an ArrayList suffices here because the data structure is local to the worker’s receive thread and does not need to be shared with other threads. But because the content of this buffer is used to fill a Disruptor-backed queue (an executor’s incoming queue), it must still be a power of 2. See launch-receive-thread! in backtype.storm.messaging.loader for details.
  • Each element of the transfer queue (whose size is configured via topology.transfer.buffer.size) is actually a list of tuples. The various executor send threads batch outgoing tuples off their outgoing queues onto the transfer queue. The default value is 1024 elements.
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // default is 1024

Executors

Each worker process controls one or more executor threads. Each executor thread has its own incoming queue and outgoing queue. As described above, the worker process runs a dedicated worker receive thread that is responsible for moving incoming messages to the appropriate incoming queue of the worker’s various executor threads. Similarly, each executor has its dedicated send thread that moves an executor’s outgoing messages from its outgoing queue to the “parent” worker’s transfer queue. The sizes of the executors’ incoming and outgoing queues are configured via topology.executor.receive.buffer.size and topology.executor.send.buffer.size, respectively.

Each executor runs a single thread that handles the user logic for the spout/bolt (i.e. your application code) and a single send thread that moves messages from the executor’s outgoing queue to the “parent” worker’s transfer queue.

  • The topology.executor.receive.buffer.size is the size of the incoming queue for an executor. Each element of this queue is a list of tuples. Here, tuples are appended in batch. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor).
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // batched; default is 1024
  • The topology.executor.send.buffer.size is the size of the outgoing queue for an executor. Each element of this queue will contain a single tuple. The default value is 1024 elements, and the value must be a power of 2 (this requirement comes from LMAX Disruptor; a small validation helper is sketched after this list).
// Example: configuring via Java API
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); // individual tuples; default is 1024

Where to go from here

How to configure Storm’s internal message buffers

The various default values mentioned above are defined in conf/defaults.yaml. You can override these values globally in a Storm cluster’s conf/storm.yaml. You can also configure these parameters per individual Storm topology via backtype.storm.Config in Storm’s Java API.
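
For example, a cluster-wide override in conf/storm.yaml could look like the following sketch (the keys are the parameter names used throughout this article; the values shown are just placeholders):

topology.receiver.buffer.size: 8
topology.transfer.buffer.size: 32
topology.executor.receive.buffer.size: 16384
topology.executor.send.buffer.size: 16384

Per topology, the same settings travel with the topology at submission time (the topology name and builder below are hypothetical):

Config conf = new Config();
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());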

How to configure Storm’s parallelism

The correct configuration of Storm’s message buffers is closely tied to the workload pattern of your topology as well as the configured parallelism of your topologies. See Understanding the Parallelism of a Storm Topology for more details about the latter.

Understand what’s going on in your Storm topology

The Storm UI is a good start to inspect key metrics of your running Storm topologies. For instance, it shows you the so-called “capacity” of a spout/bolt. The various metrics will help you decide whether your changes to the buffer-related configuration parameters described in this article had a positive or negative effect on the performance of your Storm topologies. See Running a Multi-Node Storm Cluster for details.

Apart from that you can also generate your own application metrics and track them with a tool like Graphite. See my articles Sending Metrics From Storm to Graphite and Installing and Running Graphite via RPM and Supervisord for details. It might also be worth checking out ooyala’s metrics_storm project on GitHub (I haven’t used it yet).
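
As a sketch of what application-level metrics can look like, the bolt below registers a counter via Storm's built-in metrics API (available as of Storm 0.9) and increments it per tuple; the class and metric names are made up for this example:

import java.util.Map;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class CountingBolt extends BaseRichBolt {
    private OutputCollector collector;
    private transient CountMetric tuplesProcessed;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Report the counter to registered metrics consumers every 60 seconds.
        tuplesProcessed = context.registerMetric("tuples-processed", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple tuple) {
        tuplesProcessed.incr();
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This example bolt only counts; it emits no output stream.
    }
}

A metrics consumer registered via topology.metrics.consumer.register then picks these values up, e.g. to forward them to Graphite.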

Advice on performance tuning

Watch Nathan Marz’s talk on Tuning and Productionization of Storm.

The TL;DR version is: Try the following settings as a first start and see whether they improve the performance of your Storm topology.

conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8);
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    16384);

 

 

Interested in more? You can subscribe to this blog, or follow me on Twitter.

Jun 21st, 2013 Filed under Programming, Storm
