Storm从0.8.1之后,在Spout调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。
ISpoutWaitStrategy是Spout没有emit时等待策略的接口,目的是合理利用Cpu,默认提供了2个实现,一个什么也没做,一个是sleep 1毫秒,我们可以自己来实现这个接口。
storm策略配置
topology.spout.wait.strategy "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms 1
ISpoutWaitStrategy接口
/** * The strategy a spout needs to use when its waiting. Waiting is * triggered in one of two conditions: * * 1. nextTuple emits no tuples * 2. The spout has hit maxSpoutPending and can't emit any more tuples * * The default strategy sleeps for one millisecond. */ public interface ISpoutWaitStrategy { void prepare(Map conf); void emptyEmit(long streak); }
SleepSpoutWaitStrategy实现
public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy { long sleepMillis; @Override public void prepare(Map conf) { sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); } @Override public void emptyEmit(long streak) { try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
NothingEmptyEmitStrategy实现
public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy { @Override public void emptyEmit(long streak) { } @Override public void prepare(Map conf) { throw new UnsupportedOperationException("Not supported yet."); } }
扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html
相关推荐
在Apache Storm分布式流处理框架中,Spout和Bolt是两个核心组件,它们协同工作以处理实时数据流。本篇文章将深入探讨Spout和Bolt之间的数据交互,并提供相关的Java源代码实例,帮助你理解这一过程。 首先,让我们...
* 然后,创建一个新的 Java 类,继承自 Storm 的 Spout 或 Bolt。 * 最后,配置 Storm 的操作模式,可以是本地模式或远程模式。 Spout Spout 是 Storm 中的数据输入组件,负责从外部数据源读取数据。Spout 的主要...
大数据处理框架:Storm:Spout与Bolt设计模式.docx
Spout需要实现Storm的`IRichSpout`接口,覆盖`nextTuple()`方法来发布新数据到拓扑中。 2. **数据处理逻辑(Bolt)** - **Split Bolt**: 数据进入系统后,首先需要进行分词。这里可以创建一个名为`WordSplitBolt`...
编写Spout时,需要实现`nextTuple()`方法来发布新的元组(tuple)到拓扑中。 其次,**Bolt**是处理数据流的逻辑单元。它可以执行各种操作,如过滤、聚合、计算等。Bolt是Storm处理能力的核心,通过`execute(Tuple ...
1. **配置管理**:确保Kafka和Storm的配置正确无误,包括网络连接、序列化方式、重试策略等。 2. **性能优化**:根据实际需求调整`KafkaSpout`的批处理大小、重试间隔和消费者组大小等参数,以优化性能。 3. **数据...
在实践中,开发者应重视对Storm各种术语的准确理解和运用,例如spout、bolt、topology、nimbus和事务性拓扑等。 Storm作为一个开源的实时计算系统,在大数据和云计算领域具有广泛的应用前景。通过阅读和学习Storm...
Apache Storm的核心概念包括:拓扑(Topology)、工作者(Worker)、节点(Spout)和 bolt(Bolt)。拓扑是 Storm 应用的基本结构,由多个节点和bolt组成,它们之间通过流(Stream)进行连接。节点负责产生数据流(Spout),而...
在源码中,`backtype.storm.spout`包包含了各种Spout的实现,如`KafkaSpout`用于从Kafka消费消息。 2. Bolt:Bolt是处理数据的组件,可以进行过滤、聚合、计算等操作。`backtype.storm.task`包包含了Bolt的接口和...
- **Stream Groupings**:数据流在 Bolt 之间的路由策略,如 Shuffle Grouping、Fields Grouping 等。 - **Nimbus**:主控服务器,负责任务调度和资源分配。 - **Supervisor**:在工作节点上运行,管理 worker 进程...
Storm 的流分组策略包括洗牌分组、字段分组、Partial key grouping、ALL grouping、Global grouping、None grouping、Direct grouping 等。流分组定义了一个流在一个消费它的 bolt 内多个 task 之间如何分组。 ...
4. **任务调度算法**:Storm采用了基于资源利用率的任务调度策略,能够根据当前集群的状态动态调整任务的分配。 5. **容错机制**:为了保证数据处理的可靠性,Storm设计了一套完整的容错机制,包括消息确认机制、...
Storm由多个组件构成,包括Topology(拓扑结构)、Spout(数据源)和Bolt(处理逻辑)。Topology定义了数据流的处理方式,Spout负责产生数据流,而Bolt则执行实际的数据处理任务。在我们的WordCount例子中,Spout...
- **Spout**:Spout是数据流的源头,负责读取外部数据并将其发送到Storm中进行处理。 - **Bolt**:Bolt是数据处理单元,负责接收数据流中的数据并执行具体的处理逻辑,可以连接多个Spout或Bolt。 - **Worker**:...
将 Storm 和 Kafka 结合,主要涉及 Storm 的 Kafka Spout。Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 ...
Spout需要实现nextTuple()方法以发送 tuples,ack()方法用于确认tuple的处理完成,以及fail()方法处理未被正确处理的tuple。 **4. Bolt** Bolt 执行实际的处理逻辑,通过实现IRichBolt接口创建。Bolt有prepare()、...
Bolt是Storm的另一个关键组件,它用于处理Spout产生的元组,进行数据清洗、转换、聚合等操作。Bolt可以订阅多个Spout发射的流,并可以生成新的流传递给其他Bolt。这种模型类似水处理系统,Spout像水龙头,Bolt如同水...
涡旋风暴 一个 和用于使用和生成数据 建筑 mvn install -P cafe-build 构建 为了构建Vortex OpenSplice,必须定义OSPL_HOME环境变量。 mvn install -P ospl-build 涡流概述 PrismTech的Vortex智能数据共享平台提供...