Tuple接口有很多方法可以读取从上游组件发送过来的数据,这些方法可以分为2类。
- 根据下标获取数据
- 根据字段名获取数据
读取数据方法
public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple { private List<Object> values; private int taskId; private String streamId; private GeneralTopologyContext context; private MessageId id; private IPersistentMap _meta = null; public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) { this.values = values; this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; String componentId = context.getComponentId(taskId); Fields schema = context.getComponentOutputFields(componentId, streamId); if(values.size()!=schema.size()) { throw new IllegalArgumentException( "Tuple created with wrong number of fields. " + "Expected " + schema.size() + " fields but got " + values.size() + " fields"); } } public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) { this(context, values, taskId, streamId, MessageId.makeUnanchored()); } Long _processSampleStartTime = null; Long _executeSampleStartTime = null; //从任务的上下文【任务创建时定义好的】里获取Tuple定义的Fields public Fields getFields() { return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId()); } //获取到field的下标,直接在values【List】中获取对应的数据 public Object getValueByField(String field) { return values.get(fieldIndex(field)); } public String getStringByField(String field) { return (String) values.get(fieldIndex(field)); } //根据下标直接在Values[List]中获取数据 public Object getValue(int i) { return values.get(i); } //....... }
任务上下文
public class GeneralTopologyContext implements JSONAware { private StormTopology _topology; private Map<Integer, String> _taskToComponent; private Map<String, List<Integer>> _componentToTasks; private Map<String, Map<String, Fields>> _componentToStreamToFields; private String _stormId; protected Map _stormConf; // pass in componentToSortedTasks for the case of running tons of tasks in single executor public GeneralTopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) { _topology = topology; _stormConf = stormConf; _taskToComponent = taskToComponent; _stormId = stormId; _componentToTasks = componentToSortedTasks; _componentToStreamToFields = componentToStreamToFields; } /** * Gets the declared output fields for the specified component/stream. */ public Fields getComponentOutputFields(String componentId, String streamId) { Fields ret = _componentToStreamToFields.get(componentId).get(streamId); if(ret==null) { throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId); } return ret; } //...... }
相关推荐
元组是Storm中数据的基本单元,包含一组键值对。在Java中,你可以使用`OutputCollector`对象的`emit`方法来实现这一过程: ```java public void nextTuple() { // 生成数据 String data = "your generated data";...
在Java代码示例中,可能包含了对输入的tuple中的字符串字段进行分词处理的逻辑,例如使用Java的正则表达式或者Apache Commons Lang的WordUtils类。 5. **Java编程**: 在Storm中,开发人员使用Java API来定义拓扑...
Storm处理流程中的数据流以 Tuple为单位,数据流被打包成 Tuple后,再进入由Spout和Bolt组成的拓扑进行计算处理。 在业务模型设计方面,需要重点考虑如何在Storm平台上实现网站日志的实时统计分析,以获取网站的...
- **Tuple**:Storm 中数据传输的基本单元,通常包含一组数据项。 - **Stream Grouping**:定义了 Tuple 如何从一个 Bolt 分发到另一个 Bolt 的策略。 - **Shuffle Grouping**:随机将 Tuple 发送到下游 Bolt 中的...
- **Spout**:作为数据源,从外部系统获取数据并将其作为数据流(Tuple)发射到处理拓扑中。分为可靠和不可靠两种类型,可靠Spout能确保数据至少被处理一次,即使处理过程中出现错误也会重试。 - **Bolt**:处理...
Storm-Kafka 是 Storm 社区提供的 Kafka 消费者实现,允许 Storm 从 Kafka 中读取数据。offset 是 Kafka 中用来标识消息位置的唯一标识符。Storm-Kafka 通过 offset 管理来确保消息的有序处理和重试机制。通常情况下...
首先,**Spout**是Storm中的数据源,它负责产生或读取数据流。在实际应用中,Spout可以是从消息队列、数据库或其他数据源拉取数据的组件。在"weekend-storm"项目中,Spout可能会模拟一个实时数据生成器,例如生成...
// 从tuple中获取数据并处理 String data1 = tuple.getStringByField("field1"); String data2 = tuple.getStringByField("field2"); // 处理数据... // 如果需要,可以向下游Bolt发送新的tuple collector...
在这个"storm-kafka使用state例子源码"中,我们将会探讨如何结合两者,利用 Storm 的 State API 来处理从 Kafka 获取的数据。 首先,`storm-kafka`是Apache Storm的一个扩展,它提供了一种方便的方式去消费Kafka中...
Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 Kafka Spout**:首先,你需要配置 Kafka Spout,指定 Kafka 的...
2. **Split Bolt**:接收到Spout发送的Tuples后,Split Bolt负责对每个Tuple中的文本进行分词操作,将单词分离出来。这个Bolt会创建新的Tuples,每个Tuple包含一个单词,然后将这些Tuples发送到下一个阶段。 3. **...
3. **Topology**: Topology是Storm中数据处理的逻辑结构,由Spout和Bolt组成,它们通过流(Stream)连接起来,定义了数据的处理流程。 4. **Stream**: Stream是数据流,表示从Spout发出并由Bolt处理的一系列不可变的...
3. **消息传递机制**:Storm通过Tuple来表示数据流中的单个元素,Tuple在Spout和Bolt之间传递。消息传递机制确保了数据流的可靠性和有序性。 4. **任务调度算法**:Storm采用了基于资源利用率的任务调度策略,能够...
通过Spout获取数据,Bolt进行处理,Stream Groupings定制数据流向,配合Nimbus和Supervisor的集群管理,以及Zookeeper的协调,Storm能够实现复杂实时分析任务的高效执行。在应对现代大数据挑战时,Storm是一个强大的...
在单词计数的例子中,Spout可能是读取文本文件或者从消息队列获取数据的组件。 2. **Bolts**:它们是处理数据的核心,可以执行过滤、转换或聚合操作。在单词计数应用中,Bolt可能包含拆分单词、去重、计数等逻辑。 3...
1. **Spout**: Spout是数据流的源头,它可以是从Kestrel队列读取数据,或者连接到Twitter API获取推文流。Spout负责产生并发出`Tuple`,即Storm的数据模型,这些数据流无边界且持续不断。 2. **Bolt**: Bolt则消费...
**启动循环并读写元组**:实现非 JVM 语言的 Spout 或 Bolt 如何读取元组并发送到 Storm 集群中。 #### 第八章 事务性 Topologies **设计**:事务性 Topologies 用于处理需要高度一致性的场景,确保数据处理的完整...
完全处理意味着该 Message ID 绑定的源 Tuple 和由它生成的所有后续 Tuple 都通过了 Topology 中所有应到达的 Bolt。 实现这一机制的关键组件是 Acker,它跟踪 Spout 发射的每个 Message ID 对应的 Tuple 处理路径...
4. **Tuple**:在Storm中,数据是以Tuples的形式传递的,它是一种键值对的结构,可以在Bolt之间进行传递。 5. ** Nimbus**:Nimbus是Storm集群的主节点,它负责分配任务,监控拓扑的状态,并在需要时重新分配工作。...