自定义数据流组
你可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,让你自己决定哪些bolt接收哪些元组。
让我们修改单词计数器示例,使首字母相同的单词由同一个bolt接收。
public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } }
这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。
按下述方式word-normalizer修改即可使用这个自定义数据流组。
builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping());
直接数据流组
这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。
public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //对元组做出应答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } }
在prepare方法中计算任务数
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); }
在拓扑定义中指定数据流将被直接分组:
builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer");
相关推荐
在大数据实时处理领域,Apache Storm与Apache Kafka经常被结合使用,形成高效的数据流处理系统。本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一...
Storm是一个开源的分布式实时计算系统,它允许开发者定义一个数据流处理逻辑,然后在一组分布式服务器上以容错的方式执行。其核心组件包括Spout(数据源)、Bolt(数据处理)和拓扑(topology)。 ### 4. Storm组件...
Storm 是一个分布式实时计算系统,主要用于处理大规模数据流。它的核心组件包括Spout和Bolt,分别负责数据的输入和处理。下面是对 Storm 的一个概述,从基础知识到实践应用。 Storm 组件 Storm 的核心组件包括: ...
* Storm:Storm是一个分布式实时计算系统,主要用于处理大规模的数据流。Storm的架构主要包括Topology、Spout、Bolt、acker等组件。Storm具有良好的可扩展性和高性能,但是Storm的学习成本较高。 * Spark Streaming...
在分布式计算领域,Apache Storm是一个实时计算框架,用于处理无界数据流。它以其高吞吐量、容错性和灵活性而受到广泛关注。在这个“Storm的WordCount实例”中,我们将深入探讨如何利用Storm来实现经典的WordCount...
Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高度容错性和高吞吐量的特点。这个JAR包是Storm的核心组件,包含了运行Storm集群所需的库和类。 在描述中,"storm0.9.0jar包"的简单...
Storm的设计理念是将复杂的实时数据流处理任务拆解成一系列简单但高度灵活的组件,并通过这些组件之间的组合来完成复杂的数据处理流程。Storm支持各种编程语言,使得开发者可以轻松地用自己熟悉的语言开发实时处理...
总的来说,storm-kafka 整合是实时大数据处理中常见的一种架构,它能够利用 Storm 的实时处理能力和 Kafka 的消息队列特性,实现高效的数据流处理。在实际开发中,还需要关注性能优化、容错机制、数据一致性等问题,...
Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时计算领域的Hadoop,支持多种编程语言,并且能够很好地...
Storm 是一个免费开源的分布式实时计算系统,利用 storm 可以很容易的做到可靠处理无线数据流。Storm 的架构特点包括编程简单、高性能、低延迟、分布式、可扩展、容错、消息不丢失等。 Storm 的应用场景包括实时...
2. **拓扑结构**:在Storm中,处理逻辑被组织成拓扑结构,其中Spouts和Bolts通过流组连接。拓扑定义了数据流的处理路径,可以持久化,便于恢复和扩展。 3. **容错机制**:Storm具有强大的容错能力,当某个worker或...
这173个JAR文件的组合,确保了Storm能够正常运行,并且可以处理各种复杂的数据流任务。 对于新手开发者来说,理解Storm的运行机制至关重要。Storm采用Master-Slave架构,Nimbus作为Master负责任务调度,Supervisor...
Storm是一个开源的分布式实时计算系统,它被设计用来处理大规模的数据流,具有高吞吐量和容错性。在这个“Storm编程实例”中,我们将深入理解如何利用Maven构建Storm项目,以及Storm的核心概念和运行流程。 **1. ...
通过学习这个"storm集成kafka插demo.zip",开发者可以了解到如何在实际项目中构建一个实时数据处理系统,将实时数据流从Kafka导入到Storm进行复杂计算,并了解相关的配置和编程模型。这将对理解实时大数据处理和...
它能够处理无界数据流,即源源不断的实时数据,并保证每个消息只被处理一次(Exactly Once语义)。在商品订单频繁项集挖掘中,`Storm` 可以实时地接收、处理订单数据,快速找出频繁出现的商品组合。 频繁项集挖掘是...
Apache Storm是一个开源的实时计算系统,它能够有效地处理大量的数据流,是构建大规模实时数据处理应用程序的优秀平台。Storm的快速起步涉及多个方面,从环境搭建到具体应用,再到与其他系统比如ZooKeeper的集成,...
Apache Hadoop---Storm Apache Storm 是一个专为实时大...通过Nimbus、Supervisor和Worker的协同工作,Storm能够在分布式环境中有效地处理大规模数据流,确保数据的实时性和完整性,满足现代大数据应用的苛刻需求。
Storm在大数据领域以其流处理能力著称,能以高吞吐量和低延迟处理实时数据流,这一点对于需要实时分析和决策支持的场景尤为重要。 Storm的核心优势包括其开源特性、强大的容错能力、灵活性、可靠性以及对编程语言的...