一、 重定向
重定向定义了我们的tuple如何被route到下一个处理层,当然不同的层之间可能会有不同的并行度。storm提供了如下的重定向操作:
shuffle:通过随机分配算法来均衡tuple到各个分区
broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区。
global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream。
batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区。
Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping
二、实战
Main:
pubzlic static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, AuthorizationException {
FixedBatchSpout spout = new FixedBatchSpout(
new Fields("actor", "text"), 2,
new Values("dave", "dave text"), new Values("dave",
"dave text2"), new Values("dave", "dave text3"),
new Values("dave", "dave text4"), new Values(
"tanjie is a very good man", "very very good man"));
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.parallelismHint(5)
.partitionBy(new Fields("actor"))
// .shuffle()
// .batchGlobal()
.each(new Fields("actor", "text"),
new PerActorTweetsFilter("dave")).parallelismHint(5)
.each(new Fields("actor", "text"), new PrintFilter());
Config config = new Config();
config.setNumWorkers(2);
config.setNumAckers(1);
config.setDebug(false);
StormSubmitter.submitTopology("trident_aggregate_partitionBy", config,
topology.build());
}
public class PerActorTweetsFilter extends BaseFilter {
private static final long serialVersionUID = 1L;
private int partitionIndex;
private String actor;
public PerActorTweetsFilter(String actor) {
this.actor = actor;
}
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map conf, TridentOperationContext context) {
this.partitionIndex = context.getPartitionIndex();
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean filter = tuple.getString(0).equals(actor);
if (filter) {
System.out.println("I am partition [" + partitionIndex
+ "] and I have kept a tweet by: " + actor);
}
return filter;
}
}
测试:
1、partitionBy:相同字段hash后到同一个分区,我这个地方根据actor进行分区,hash后肯定到了同一个分区,即使我指定了5个分区
2016-11-14 17:06:50.806 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:06:50.808 STDIO [INFO] first value: dave 2016-11-14 17:06:50.814 STDIO [INFO] seconde value: dave text 2016-11-14 17:06:50.819 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:06:50.826 STDIO [INFO] first value: dave 2016-11-14 17:06:50.832 STDIO [INFO] seconde value: dave text2 2016-11-14 17:06:50.992 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:06:50.993 STDIO [INFO] first value: dave 2016-11-14 17:06:50.998 STDIO [INFO] seconde value: dave text3 2016-11-14 17:06:51.001 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:06:51.004 STDIO [INFO] first value: dave 2016-11-14 17:06:51.007 STDIO [INFO] seconde value: dave text4
2、改成shuffle,会随机分配到某个分区
2016-11-14 17:18:16.019 STDIO [INFO] I am partition [0] and I have kept a tweet by: dave 2016-11-14 17:18:16.027 STDIO [INFO] I am partition [4] and I have kept a tweet by: dave 2016-11-14 17:18:16.028 STDIO [INFO] first value: dave 2016-11-14 17:18:16.030 STDIO [INFO] seconde value: dave text 2016-11-14 17:18:16.037 STDIO [INFO] first value: dave 2016-11-14 17:18:16.037 STDIO [INFO] seconde value: dave text2 2016-11-14 17:18:16.101 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:18:16.102 STDIO [INFO] first value: dave 2016-11-14 17:18:16.102 STDIO [INFO] seconde value: dave text3 2016-11-14 17:18:16.103 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:18:16.105 STDIO [INFO] first value: dave 2016-11-14 17:18:16.107 STDIO [INFO] seconde value: dave text4
3、改成batchGlobal
2016-11-14 17:23:30.333 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:23:30.341 STDIO [INFO] first value: dave 2016-11-14 17:23:30.342 STDIO [INFO] seconde value: dave text 2016-11-14 17:23:30.343 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave 2016-11-14 17:23:30.343 STDIO [INFO] first value: dave 2016-11-14 17:23:30.344 STDIO [INFO] seconde value: dave text2 2016-11-14 17:24:15.683 STDIO [INFO] I am partition [3] and I have kept a tweet by: dave 2016-11-14 17:24:15.684 STDIO [INFO] first value: dave 2016-11-14 17:24:15.685 STDIO [INFO] seconde value: dave text3 2016-11-14 17:24:15.685 STDIO [INFO] I am partition [3] and I have kept a tweet by: dave 2016-11-14 17:24:15.686 STDIO [INFO] first value: dave 2016-11-14 17:24:15.686 STDIO [INFO] seconde value: dave text4
相关推荐
01-storm简介 02-storm部署-1 03-storm部署-2 04-storm部署概念 05-streamgrouping 06-storm组件生命周期 07-storm可靠性1 08-storm可靠性2
《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,不仅包括对基本概念、特性的介绍,也涵盖了一些原理说明。 实战性很强,各章节...
《Storm实战构建大数据实时计算》一书主要涵盖了利用Apache Storm进行大数据实时处理的核心技术和实践案例。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,具有高吞吐量、低延迟和容错...
《Storm实战:构建大数据实时计算 》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,不仅包括对基本概念、特性的介绍,也涵盖了一些原理说明。 实战性很强,各章节都...
【标题】"03、storm项目实战课程-Kafka0.8Storm0.9.1Optr.rar" 提供了一个关于实时大数据处理的实战教程,主要聚焦于Apache Storm和Apache Kafka的整合应用。Apache Storm是一个开源的分布式实时计算系统,而Kafka则...
《Storm实战构建大数据实时计算》是一本专注于大数据领域实时处理技术的专著,主要围绕Apache Storm这一开源流处理系统展开。这本书深入浅出地讲解了如何利用Storm进行实时数据流的处理,为读者揭示了大数据实时计算...
Storm项目实战 之案例优化引入Zookeeper锁控制线程操作,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例 学习此课程需要...
本实战案例将重点介绍如何使用Storm Trident来计算网站的页面浏览量(PV,Page View)。 页面浏览量是衡量一个网站受欢迎程度的重要指标,通常通过记录用户对各个页面的访问次数来计算。在传统的批处理场景下,这...
【标题】"03、storm项目实战课程-Kafka0.8Optr2.rar" 提供的是一个关于Apache Storm和Kafka 0.8版本结合使用的实战教程资源。这个压缩包可能包含了课程讲义、代码示例、配置文件以及其他辅助学习材料,旨在帮助用户...