`
liyonghui160com
  • 浏览: 771836 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

storm数据流组

阅读更多

自定义数据流组

你可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,让你自己决定哪些bolt接收哪些元组。

让我们修改单词计数器示例,使首字母相同的单词由同一个bolt接收。

    public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
        int numTasks = 0;

        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }

        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

 

这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

 

按下述方式word-normalizer修改即可使用这个自定义数据流组。

    builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());

 

 

 

 

 

 

直接数据流组

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。

   

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //对元组做出应答
        collector.ack(input);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

 

在prepare方法中计算任务数

    public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

 

在拓扑定义中指定数据流将被直接分组:

    builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

 

 

 

 

分享到:
评论
2 楼 liyonghui160com 2014-09-22  
AKka 写道
谢谢分享!最近在学Storm使用,一直不明白这个流组,看了你的贴子现在有点明白了。


能帮到你就好。
1 楼 AKka 2014-09-19  
谢谢分享!最近在学Storm使用,一直不明白这个流组,看了你的贴子现在有点明白了。

相关推荐

    StormStorm集成Kafka 从Kafka中读取数据

    在大数据实时处理领域,Apache Storm与Apache Kafka经常被结合使用,形成高效的数据流处理系统。本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一...

    storm流数据处理开发应用实战(linux实验环境,storm搭建完毕后的开发)

    Storm是一个开源的分布式实时计算系统,它允许开发者定义一个数据流处理逻辑,然后在一组分布式服务器上以容错的方式执行。其核心组件包括Spout(数据源)、Bolt(数据处理)和拓扑(topology)。 ### 4. Storm组件...

    Flink,Storm,Spark Streaming三种流框架的对比分析

    * Storm:Storm是一个分布式实时计算系统,主要用于处理大规模的数据流。Storm的架构主要包括Topology、Spout、Bolt、acker等组件。Storm具有良好的可扩展性和高性能,但是Storm的学习成本较高。 * Spark Streaming...

    Storm的WordCount实例

    在分布式计算领域,Apache Storm是一个实时计算框架,用于处理无界数据流。它以其高吞吐量、容错性和灵活性而受到广泛关注。在这个“Storm的WordCount实例”中,我们将深入探讨如何利用Storm来实现经典的WordCount...

    storm0.9.0jar包

    Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高度容错性和高吞吐量的特点。这个JAR包是Storm的核心组件,包含了运行Storm集群所需的库和类。 在描述中,"storm0.9.0jar包"的简单...

    基于Storm构建实时热力分布项目实战-地址.txt

    Storm的设计理念是将复杂的实时数据流处理任务拆解成一系列简单但高度灵活的组件,并通过这些组件之间的组合来完成复杂的数据处理流程。Storm支持各种编程语言,使得开发者可以轻松地用自己熟悉的语言开发实时处理...

    storm-kafka整合代码

    总的来说,storm-kafka 整合是实时大数据处理中常见的一种架构,它能够利用 Storm 的实时处理能力和 Kafka 的消息队列特性,实现高效的数据流处理。在实际开发中,还需要关注性能优化、容错机制、数据一致性等问题,...

    Storm 源码分析

    Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...

    storm 学习资源总结

    Storm 是一个免费开源的分布式实时计算系统,利用 storm 可以很容易的做到可靠处理无线数据流。Storm 的架构特点包括编程简单、高性能、低延迟、分布式、可扩展、容错、消息不丢失等。 Storm 的应用场景包括实时...

    storm学习入门《Getting started with Storm》中英文版

    2. **拓扑结构**:在Storm中,处理逻辑被组织成拓扑结构,其中Spouts和Bolts通过流组连接。拓扑定义了数据流的处理路径,可以持久化,便于恢复和扩展。 3. **容错机制**:Storm具有强大的容错能力,当某个worker或...

    storm_jars.zip

    这173个JAR文件的组合,确保了Storm能够正常运行,并且可以处理各种复杂的数据流任务。 对于新手开发者来说,理解Storm的运行机制至关重要。Storm采用Master-Slave架构,Nimbus作为Master负责任务调度,Supervisor...

    Storm编程实例

    Storm是一个开源的分布式实时计算系统,它被设计用来处理大规模的数据流,具有高吞吐量和容错性。在这个“Storm编程实例”中,我们将深入理解如何利用Maven构建Storm项目,以及Storm的核心概念和运行流程。 **1. ...

    storm集成kafka插demo.zip

    通过学习这个"storm集成kafka插demo.zip",开发者可以了解到如何在实际项目中构建一个实时数据处理系统,将实时数据流从Kafka导入到Storm进行复杂计算,并了解相关的配置和编程模型。这将对理解实时大数据处理和...

    Storm :商品订单频繁项集挖掘,组合查找源码

    它能够处理无界数据流,即源源不断的实时数据,并保证每个消息只被处理一次(Exactly Once语义)。在商品订单频繁项集挖掘中,`Storm` 可以实时地接收、处理订单数据,快速找出频繁出现的商品组合。 频繁项集挖掘是...

    Apache+Storm+快速起步.pdf 亲测 好评哦

    Apache Storm是一个开源的实时计算系统,它能够有效地处理大量的数据流,是构建大规模实时数据处理应用程序的优秀平台。Storm的快速起步涉及多个方面,从环境搭建到具体应用,再到与其他系统比如ZooKeeper的集成,...

    Apache Hadoop---Storm.docx

    Apache Hadoop---Storm Apache Storm 是一个专为实时大...通过Nimbus、Supervisor和Worker的协同工作,Storm能够在分布式环境中有效地处理大规模数据流,确保数据的实时性和完整性,满足现代大数据应用的苛刻需求。

    Apache Storm.pdf

    Storm在大数据领域以其流处理能力著称,能以高吞吐量和低延迟处理实时数据流,这一点对于需要实时分析和决策支持的场景尤为重要。 Storm的核心优势包括其开源特性、强大的容错能力、灵活性、可靠性以及对编程语言的...

Global site tag (gtag.js) - Google Analytics