`
woodding2008
  • 浏览: 290386 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm Bolt中读取Tuple数据

 
阅读更多

 

 

Tuple接口有很多方法可以读取从上游组件发送过来的数据,这些方法可以分为2类。

  • 根据下标获取数据
  • 根据字段名获取数据

读取数据方法

public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
    private List<Object> values;
    private int taskId;
    private String streamId;
    private GeneralTopologyContext context;
    private MessageId id;
    private IPersistentMap _meta = null;
    
    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
        this.values = values;
        this.taskId = taskId;
        this.streamId = streamId;
        this.id = id;
        this.context = context;
        
        String componentId = context.getComponentId(taskId);
        Fields schema = context.getComponentOutputFields(componentId, streamId);
        if(values.size()!=schema.size()) {
            throw new IllegalArgumentException(
                    "Tuple created with wrong number of fields. " +
                    "Expected " + schema.size() + " fields but got " +
                    values.size() + " fields");
        }
    }

    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
        this(context, values, taskId, streamId, MessageId.makeUnanchored());
    }    
    
    Long _processSampleStartTime = null;
    Long _executeSampleStartTime = null;
	
	//从任务的上下文【任务创建时定义好的】里获取Tuple定义的Fields
    public Fields getFields() {
        return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
    }
	
	//获取到field的下标,直接在values【List】中获取对应的数据
	public Object getValueByField(String field) {
        return values.get(fieldIndex(field));
    }
	
	public String getStringByField(String field) {
        return (String) values.get(fieldIndex(field));
    }
	
	//根据下标直接在Values[List]中获取数据
	public Object getValue(int i) {
        return values.get(i);
    }
     
    //.......	 
	
}

 

任务上下文

public class GeneralTopologyContext implements JSONAware {
    private StormTopology _topology;
    private Map<Integer, String> _taskToComponent;
    private Map<String, List<Integer>> _componentToTasks;
    private Map<String, Map<String, Fields>> _componentToStreamToFields;
    private String _stormId;
    protected Map _stormConf;
    
    // pass in componentToSortedTasks for the case of running tons of tasks in single executor
    public GeneralTopologyContext(StormTopology topology, Map stormConf,
            Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
            Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) {
        _topology = topology;
        _stormConf = stormConf;
        _taskToComponent = taskToComponent;
        _stormId = stormId;
        _componentToTasks = componentToSortedTasks;
        _componentToStreamToFields = componentToStreamToFields;
    }
	    /**
     * Gets the declared output fields for the specified component/stream.
     */
    public Fields getComponentOutputFields(String componentId, String streamId) {
        Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
        if(ret==null) {
            throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        return ret;
    }
	
     //......	
	
}

 

分享到:
评论

相关推荐

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    元组是Storm中数据的基本单元,包含一组键值对。在Java中,你可以使用`OutputCollector`对象的`emit`方法来实现这一过程: ```java public void nextTuple() { // 生成数据 String data = "your generated data";...

    storm之Tuple元组分词操作Java代码.zip

    在Java代码示例中,可能包含了对输入的tuple中的字符串字段进行分词处理的逻辑,例如使用Java的正则表达式或者Apache Commons Lang的WordUtils类。 5. **Java编程**: 在Storm中,开发人员使用Java API来定义拓扑...

    基于Storm技术的实时数据处理平台研究与实现.pdf

    Storm处理流程中的数据流以 Tuple为单位,数据流被打包成 Tuple后,再进入由Spout和Bolt组成的拓扑进行计算处理。 在业务模型设计方面,需要重点考虑如何在Storm平台上实现网站日志的实时统计分析,以获取网站的...

    Storm的文档详解

    - **Tuple**:Storm 中数据传输的基本单元,通常包含一组数据项。 - **Stream Grouping**:定义了 Tuple 如何从一个 Bolt 分发到另一个 Bolt 的策略。 - **Shuffle Grouping**:随机将 Tuple 发送到下游 Bolt 中的...

    Storm分布式实时计算在物联网系统中的应用.pdf

    - **Spout**:作为数据源,从外部系统获取数据并将其作为数据流(Tuple)发射到处理拓扑中。分为可靠和不可靠两种类型,可靠Spout能确保数据至少被处理一次,即使处理过程中出现错误也会重试。 - **Bolt**:处理...

    storm原理分析

    Storm-Kafka 是 Storm 社区提供的 Kafka 消费者实现,允许 Storm 从 Kafka 中读取数据。offset 是 Kafka 中用来标识消息位置的唯一标识符。Storm-Kafka 通过 offset 管理来确保消息的有序处理和重试机制。通常情况下...

    storm demo

    首先,**Spout**是Storm中的数据源,它负责产生或读取数据流。在实际应用中,Spout可以是从消息队列、数据库或其他数据源拉取数据的组件。在"weekend-storm"项目中,Spout可能会模拟一个实时数据生成器,例如生成...

    storm程序代码示例

    // 从tuple中获取数据并处理 String data1 = tuple.getStringByField("field1"); String data2 = tuple.getStringByField("field2"); // 处理数据... // 如果需要,可以向下游Bolt发送新的tuple collector...

    storm-kakfa使用state例子源码

    在这个"storm-kafka使用state例子源码"中,我们将会探讨如何结合两者,利用 Storm 的 State API 来处理从 Kafka 获取的数据。 首先,`storm-kafka`是Apache Storm的一个扩展,它提供了一种方便的方式去消费Kafka中...

    storm-kafka整合代码

    Kafka Spout 是一个预定义的 spout 类,可以从 Kafka 主题中读取数据,并将数据作为 tuple 发送到 Storm 的数据流中。以下是一般整合过程: 1. **配置 Kafka Spout**:首先,你需要配置 Kafka Spout,指定 Kafka 的...

    storm-wordcount例子

    2. **Split Bolt**:接收到Spout发送的Tuples后,Split Bolt负责对每个Tuple中的文本进行分词操作,将单词分离出来。这个Bolt会创建新的Tuples,每个Tuple包含一个单词,然后将这些Tuples发送到下一个阶段。 3. **...

    Test_Storm_0_java_begun6u4_zookeeper_storm_apachestorm_

    3. **Topology**: Topology是Storm中数据处理的逻辑结构,由Spout和Bolt组成,它们通过流(Stream)连接起来,定义了数据的处理流程。 4. **Stream**: Stream是数据流,表示从Spout发出并由Bolt处理的一系列不可变的...

    Storm 源码分析

    3. **消息传递机制**:Storm通过Tuple来表示数据流中的单个元素,Tuple在Spout和Bolt之间传递。消息传递机制确保了数据流的可靠性和有序性。 4. **任务调度算法**:Storm采用了基于资源利用率的任务调度策略,能够...

    使用Storm实现实时大数据分析.doc

    通过Spout获取数据,Bolt进行处理,Stream Groupings定制数据流向,配合Nimbus和Supervisor的集群管理,以及Zookeeper的协调,Storm能够实现复杂实时分析任务的高效执行。在应对现代大数据挑战时,Storm是一个强大的...

    storm1.2.1-wangzs-可靠单词计数

    在单词计数的例子中,Spout可能是读取文本文件或者从消息队列获取数据的组件。 2. **Bolts**:它们是处理数据的核心,可以执行过滤、转换或聚合操作。在单词计数应用中,Bolt可能包含拆分单词、去重、计数等逻辑。 3...

    02、Storm入门到精通storm3-1.pptx

    1. **Spout**: Spout是数据流的源头,它可以是从Kestrel队列读取数据,或者连接到Twitter API获取推文流。Spout负责产生并发出`Tuple`,即Storm的数据模型,这些数据流无边界且持续不断。 2. **Bolt**: Bolt则消费...

    Getting Started with Storm

    **启动循环并读写元组**:实现非 JVM 语言的 Spout 或 Bolt 如何读取元组并发送到 Storm 集群中。 #### 第八章 事务性 Topologies **设计**:事务性 Topologies 用于处理需要高度一致性的场景,确保数据处理的完整...

    Storm深入学习.pdf

    完全处理意味着该 Message ID 绑定的源 Tuple 和由它生成的所有后续 Tuple 都通过了 Topology 中所有应到达的 Bolt。 实现这一机制的关键组件是 Acker,它跟踪 Spout 发射的每个 Message ID 对应的 Tuple 处理路径...

    apache-storm-2.1.0.tar.gz

    4. **Tuple**:在Storm中,数据是以Tuples的形式传递的,它是一种键值对的结构,可以在Bolt之间进行传递。 5. ** Nimbus**:Nimbus是Storm集群的主节点,它负责分配任务,监控拓扑的状态,并在需要时重新分配工作。...

Global site tag (gtag.js) - Google Analytics