TopologyContext
利用这一特性,你能够把流划分到多个spouts读取。
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //从context对象获取spout大小 int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size(); //从这个spout得到任务id int myIdx = context.getThisTaskIndex(); String[] tracks = ((String) conf.get("track")).split(","); StringBuffer tracksBuffer = new StringBuffer(); for(int i=0; i< tracks.length;i++){ //Check if this spout must read the track word if( i % spoutsSize == myIdx){ tracksBuffer.append(","); tracksBuffer.append(tracks[i]); } } if(tracksBuffer.length() == 0) { throw new RuntimeException("没有为spout得到track配置" + " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的数量必须高于spout的数量"); this.track =tracksBuffer.substring(1).toString(); } ... }
相关推荐
- **` storm-core`**: 核心库,包含了 Storm 的基本运行时组件,如`TopologyContext`为Bolt和Spout提供了上下文信息。 3. **容错机制** - Storm 使用故障恢复机制确保数据流的可靠性,即使在节点失败的情况下也能...
import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org....
一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。示例如下:此方法用于声明当前Spout的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream...
public void prepare(Map conf, TopologyContext context, List<Integer> targetTasks) { this.prevTasks = targetTasks; } @Override public List<Integer> chooseTasks(int taskId, List< Tuple > tuples) ...