自定义Grouping测试
Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。
这是我写的一个自定义分组,总是把数据分到第一个Task:
public classMyFirstStreamGroupingimplementsCustomStreamGrouping{
private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);
private List<Integer> tasks;
@Override
publicvoidprepare(WorkerTopologyContext context, GlobalStreamId stream,
List<Integer> targetTasks){
this.tasks = targetTasks;
log.info(tasks.toString());
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values){
log.info(values.toString());
return Arrays.asList(tasks.get(0));
}
}
从上面的代码可以看出,该自定义分组会把数据归并到第一个Task<code>Arrays.asList(tasks.get(0));</code>,也就是数据到达后总是被派发到第一组。
测试代码:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2);
//自定义分组,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
.customGrouping("words", new MyFirstStreamGrouping());
和之前的测试用例一样,Spout总是发送<code>new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}</code>列表的字符串。我们运行验证一下:
11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。
理解自定义分组实现
自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。
Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:
public classPartitioner<K, V> {
@Override
publicintgetPartition(K key, V value, int numReduceTasks){
return 0;
}
}
上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:
publicintgetPartition(K key, V value, int numReduceTasks) {
return hash(key) % numReduceTasks;
}
其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。
搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。
这是CustomStreamGrouping类的源码:
public interfaceCustomStreamGroupingextendsSerializable{
voidprepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
List<Integer> chooseTasks(int taskId, List<Object> values);
}
一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 <code> chooseTasks(int taskId, List<Object> values); </code> 就是让你选择,你的这条数据values,是要哪几个目标Task处理?
如上文文章开头的自定义分组器实现的代码,我选择的总是让第一个Task来处理数据,<code> return Arrays.asList(tasks.get(0)); </code> 。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List<Integer>.就是让你来在提供的 'List<Integer> targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。
由此,Storm的自定义分组策略也就不那么麻烦了吧?
转载:https://my.oschina.net/zhzhenqin/blog/223304
相关推荐
4. Stream Groupings:流分组是决定数据如何在Bolt之间传递的策略,包括Shuffle Grouping(随机分组)、Fields Grouping(字段分组)等。这些策略在`backtype.storm.task`包下的`Grouping`类中定义。 在源码中,你...
7. **Grouping(分组)**:Storm 提供多种分组策略,如 shuffle grouping(随机分组)、fields grouping(字段分组)、global grouping(全局分组)和 all grouping(全分组),用于控制流如何在 bolts 之间进行分布...
Stream的分组方式称为Stream grouping,它定义了流中的tuple如何分发给Bolts中的tasks。Storm提供了多种不同的Stream grouping策略,例如Shuffle grouping、Fields grouping、All grouping等,这些策略决定了数据在...
Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream ...
- **Stream Groupings**:数据流在 Bolt 之间的路由策略,如 Shuffle Grouping、Fields Grouping 等。 - **Nimbus**:主控服务器,负责任务调度和资源分配。 - **Supervisor**:在工作节点上运行,管理 worker 进程...
- **2.1.8 流分组(Stream Grouping)** - 流分组定义了如何将元组从一个组件发送到另一个组件。 - **2.1.9 工作进程(Worker)** - Worker 是实际执行任务的进程,每个 Supervisor 上可以运行多个 Worker。 - **...
Storm提供了多种分组方式,如**shuffle grouping**(随机分组)、**fields grouping**(按字段分组)、**global grouping**(全局分组)等,以满足不同类型的并行处理需求。 5. **容错机制**:Storm具有强大的容错...
- **Stream Groupings**:数据流在Bolt之间传输时,需要指定Grouping策略,如shuffle grouping(随机分发)、fields grouping(按字段分组)等。 4. **部署与运维** - **本地模式**:Storm提供本地模式用于开发和...
有多种分组策略,如字段分组(field grouping)、全局分组(global grouping)、shuffle分组(shuffle grouping)等。 5. **Topology提交**:编写好Toplogy后,需要通过Storm客户端将其提交到运行的Storm集群上。这...
- **Storm分组策略(stream groupings)**:解释不同的数据分发策略,如shuffle grouping、fields grouping等。 - **使用Storm开发一个WordCount例子**:通过WordCount示例来演示Storm的应用开发过程。 - **Storm...