`
woodding2008
  • 浏览: 291102 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm Spout nextTuple策略

 
阅读更多

 

      Storm从0.8.1之后,在Spout调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。

      

       ISpoutWaitStrategy是Spout没有emit时等待策略的接口,目的是合理利用Cpu,默认提供了2个实现,一个什么也没做,一个是sleep 1毫秒,我们可以自己来实现这个接口。

 

storm策略配置

topology.spout.wait.strategy "backtype.storm.spout.SleepSpoutWaitStrategy"

topology.sleep.spout.wait.strategy.time.ms       1   

 

 

ISpoutWaitStrategy接口

 

/**
 * The strategy a spout needs to use when its waiting. Waiting is
 * triggered in one of two conditions:
 * 
 * 1. nextTuple emits no tuples
 * 2. The spout has hit maxSpoutPending and can't emit any more tuples
 * 
 * The default strategy sleeps for one millisecond.
 */
public interface ISpoutWaitStrategy {
    void prepare(Map conf);
    void emptyEmit(long streak);
}
 

 

SleepSpoutWaitStrategy实现

 

public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {

    long sleepMillis;
    
    @Override
    public void prepare(Map conf) {
        sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
    }

    @Override
    public void emptyEmit(long streak) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
 

 

 

 NothingEmptyEmitStrategy实现

public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
    @Override
    public void emptyEmit(long streak) {        
    }

    @Override
    public void prepare(Map conf) {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

 

 扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html

分享到:
评论

相关推荐

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    在Apache Storm分布式流处理框架中,Spout和Bolt是两个核心组件,它们协同工作以处理实时数据流。本篇文章将深入探讨Spout和Bolt之间的数据交互,并提供相关的Java源代码实例,帮助你理解这一过程。 首先,让我们...

    Storm入门到精通

    * 然后,创建一个新的 Java 类,继承自 Storm 的 Spout 或 Bolt。 * 最后,配置 Storm 的操作模式,可以是本地模式或远程模式。 Spout Spout 是 Storm 中的数据输入组件,负责从外部数据源读取数据。Spout 的主要...

    Storm API实现词频统计

    Spout需要实现Storm的`IRichSpout`接口,覆盖`nextTuple()`方法来发布新数据到拓扑中。 2. **数据处理逻辑(Bolt)** - **Split Bolt**: 数据进入系统后,首先需要进行分词。这里可以创建一个名为`WordSplitBolt`...

    storm demo

    编写Spout时,需要实现`nextTuple()`方法来发布新的元组(tuple)到拓扑中。 其次,**Bolt**是处理数据流的逻辑单元。它可以执行各种操作,如过滤、聚合、计算等。Bolt是Storm处理能力的核心,通过`execute(Tuple ...

    StormStorm集成Kafka 从Kafka中读取数据

    1. **配置管理**:确保Kafka和Storm的配置正确无误,包括网络连接、序列化方式、重试策略等。 2. **性能优化**:根据实际需求调整`KafkaSpout`的批处理大小、重试间隔和消费者组大小等参数,以优化性能。 3. **数据...

    storm入门.pdf

    在实践中,开发者应重视对Storm各种术语的准确理解和运用,例如spout、bolt、topology、nimbus和事务性拓扑等。 Storm作为一个开源的实时计算系统,在大数据和云计算领域具有广泛的应用前景。通过阅读和学习Storm...

    storm程序代码示例

    Apache Storm的核心概念包括:拓扑(Topology)、工作者(Worker)、节点(Spout)和 bolt(Bolt)。拓扑是 Storm 应用的基本结构,由多个节点和bolt组成,它们之间通过流(Stream)进行连接。节点负责产生数据流(Spout),而...

    apache-storm-2.4.0.tar.gz

    - **Stream Groupings**:数据流在 Bolt 之间的路由策略,如 Shuffle Grouping、Fields Grouping 等。 - **Nimbus**:主控服务器,负责任务调度和资源分配。 - **Supervisor**:在工作节点上运行,管理 worker 进程...

    apache-storm-0.9.5源码

    在源码中,`backtype.storm.spout`包包含了各种Spout的实现,如`KafkaSpout`用于从Kafka消费消息。 2. Bolt:Bolt是处理数据的组件,可以进行过滤、聚合、计算等操作。`backtype.storm.task`包包含了Bolt的接口和...

    storm 学习资源总结

    Storm 的流分组策略包括洗牌分组、字段分组、Partial key grouping、ALL grouping、Global grouping、None grouping、Direct grouping 等。流分组定义了一个流在一个消费它的 bolt 内多个 task 之间如何分组。 ...

    Storm 源码分析

    4. **任务调度算法**:Storm采用了基于资源利用率的任务调度策略,能够根据当前集群的状态动态调整任务的分配。 5. **容错机制**:为了保证数据处理的可靠性,Storm设计了一套完整的容错机制,包括消息确认机制、...

    Storm的WordCount实例

    Storm由多个组件构成,包括Topology(拓扑结构)、Spout(数据源)和Bolt(处理逻辑)。Topology定义了数据流的处理方式,Spout负责产生数据流,而Bolt则执行实际的数据处理任务。在我们的WordCount例子中,Spout...

    细细品味Storm_Storm简介及安装

    - **Spout**:Spout是数据流的源头,负责读取外部数据并将其发送到Storm中进行处理。 - **Bolt**:Bolt是数据处理单元,负责接收数据流中的数据并执行具体的处理逻辑,可以连接多个Spout或Bolt。 - **Worker**:...

    storm-kafka整合代码

    将 Storm 和 Kafka 结合,主要涉及 Storm 的 Kafka Spout。Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 ...

    Apache Storm-0.9.1 API 参考文档

    Spout需要实现nextTuple()方法以发送 tuples,ack()方法用于确认tuple的处理完成,以及fail()方法处理未被正确处理的tuple。 **4. Bolt** Bolt 执行实际的处理逻辑,通过实现IRichBolt接口创建。Bolt有prepare()、...

    流式计算Storm

    Bolt是Storm的另一个关键组件,它用于处理Spout产生的元组,进行数据清洗、转换、聚合等操作。Bolt可以订阅多个Spout发射的流,并可以生成新的流传递给其他Bolt。这种模型类似水处理系统,Spout像水龙头,Bolt如同水...

    vortex-storm:一个Apache Storm Spout和Bolt,用于使用和生成Vortex数据

    涡旋风暴 一个 和用于使用和生成数据 建筑 mvn install -P cafe-build 构建 为了构建Vortex OpenSplice,必须定义OSPL_HOME环境变量。 mvn install -P ospl-build 涡流概述 PrismTech的Vortex智能数据共享平台提供...

Global site tag (gtag.js) - Google Analytics