`
liyonghui160com
  • 浏览: 778596 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

TopologyContext

阅读更多

TopologyContext
利用这一特性,你能够把流划分到多个spouts读取。



public void open(Map conf, TopologyContext context,
          SpoutOutputCollector collector) {
    //从context对象获取spout大小
    int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();
    //从这个spout得到任务id
    int myIdx = context.getThisTaskIndex();
    String[] tracks = ((String) conf.get("track")).split(",");
    StringBuffer tracksBuffer = new StringBuffer();
    for(int i=0; i< tracks.length;i++){
        //Check if this spout must read the track word
        if( i % spoutsSize == myIdx){
            tracksBuffer.append(",");
            tracksBuffer.append(tracks[i]);
        }
    }
    if(tracksBuffer.length() == 0) {
        throw new RuntimeException("没有为spout得到track配置" +
 " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的数量必须高于spout的数量");
 this.track =tracksBuffer.substring(1).toString();
    }
 ...
 }

 

分享到:
评论

相关推荐

    java-sdk-storm

    - **` storm-core`**: 核心库,包含了 Storm 的基本运行时组件,如`TopologyContext`为Bolt和Spout提供了上下文信息。 3. **容错机制** - Storm 使用故障恢复机制确保数据流的可靠性,即使在节点失败的情况下也能...

    storm程序代码示例

    import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org....

    storm-编程入门

    一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。示例如下:此方法用于声明当前Spout的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream...

    partial-key-grouping:Apache Storm的部分密钥分组的实现和示例。 部分密钥分组是分布式流处理系统的负载平衡策略

    public void prepare(Map conf, TopologyContext context, List&lt;Integer&gt; targetTasks) { this.prevTasks = targetTasks; } @Override public List&lt;Integer&gt; chooseTasks(int taskId, List&lt; Tuple &gt; tuples) ...

Global site tag (gtag.js) - Google Analytics