- 浏览: 56638 次
- 性别:
- 来自: 北京
文章分类
最新评论
一、ITridentSpout
基于事务
static interface ITridentSpout.BatchCoordinator<X>
static interface ITridentSpout.Emitter<X>
接口类的实现和之前事务ITransactionalSpout 非常类似。
二、调用链用于执行多个聚合
如果想同事执行多个聚合,可以使用如下的调用链
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。
三、投影(projection)
投影操作是对数据上进行列裁剪。
如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:
mystream.project(new Fields("b","d"));
输出流将只有【“b”,“d”】两个字段。
四、重分区(repartition)操作
重分区操作是通过一个函数改变元组(tuple)在task之间的分布, 重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:
1. Shuffle:与hadoop一样,把同步的tuple放在一个分区
2. Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。 如果你想做在每个分区上做一个statequery。
3. paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字 段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发 字段(fields)分发到相同的目标分区。
4. global:所有的tuple分发到相同的分区。
5. batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分 区。
6. patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。
五、合并和关联
合并(merge)多个流成为一个流,可以如下:
topology.merge(stream1, stream2, stream3);
Trident合并的流字段会以第一个流的字段命名。
另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段【“key”,“val1”,“val2”】,
另一个流包含字段【“x”,“val1”】:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));
Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
1. 首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
2. 接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。
六、FirstN
取Top N
用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));
Trident适合做汇总型,不大适合做去重型
基于事务
static interface ITridentSpout.BatchCoordinator<X>
static interface ITridentSpout.Emitter<X>
接口类的实现和之前事务ITransactionalSpout 非常类似。
二、调用链用于执行多个聚合
topology.newDRPCStream("top", drpc).each(new Fields("args"), new Split(“ ”), new Fields("time")).parallelismHint(5).stateQuery(myStates,new Fields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt")).groupBy(new Fields("srcip")).chainedAgg().aggregate(new Fields("byt"), new Sum(), new Fields("yt")).aggregate(new Fields("pkt"), new Sum(), new Fields("kt")).chainEnd().applyAssembly(new FirstN(10, "yt", true));
如果想同事执行多个聚合,可以使用如下的调用链
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。
三、投影(projection)
投影操作是对数据上进行列裁剪。
如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:
mystream.project(new Fields("b","d"));
输出流将只有【“b”,“d”】两个字段。
四、重分区(repartition)操作
重分区操作是通过一个函数改变元组(tuple)在task之间的分布, 重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:
1. Shuffle:与hadoop一样,把同步的tuple放在一个分区
2. Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。 如果你想做在每个分区上做一个statequery。
3. paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字 段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发 字段(fields)分发到相同的目标分区。
4. global:所有的tuple分发到相同的分区。
5. batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分 区。
6. patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。
五、合并和关联
合并(merge)多个流成为一个流,可以如下:
topology.merge(stream1, stream2, stream3);
Trident合并的流字段会以第一个流的字段命名。
另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段【“key”,“val1”,“val2”】,
另一个流包含字段【“x”,“val1”】:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));
Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
1. 首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
2. 接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。
六、FirstN
取Top N
用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));
Trident适合做汇总型,不大适合做去重型
发表评论
-
Trident实战之计算网站PV
2017-05-24 13:24 6481、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 750一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 516英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 416一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6791、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5821.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4861、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 8191、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 613Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2101事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4481、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1130统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 895汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 685一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10681、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 701一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 589并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5331、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 395本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 671一、安装Storm wget ...
相关推荐
在本文中,我们将深入探讨“合并记录”和“Merge Join”这两个组件在实现数据增量迁移中的作用,以及为什么它们在数据同步时比传统的插入更新更快。 首先,我们来理解什么是数据增量迁移。在数据仓库或数据库系统中...
本文实例讲述了mysql使用GROUP BY分组实现取前N条记录的方法。分享给大家供大家参考,具体如下: MySQL中GROUP BY分组取前N条记录实现 mysql分组,取记录 GROUP BY之后如何取每组的前两位下面我来讲述mysql中GROUP BY...
All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine...
本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...
总结一下,`Fork`和`Join`是JBPM工作流中的关键概念,它们负责流程的分支和合并,实现并行和同步。了解和熟练掌握这两个节点的用法,对于设计高效、灵活的工作流程至关重要。在实际操作中,开发者可以通过JBPM的图形...
今天,我们将讲解 MapReduce 之 MapJoin 和 ReduceJoin 两种 Join 操作的实现原理和应用场景。 MapJoin 概述 MapJoin 是一种特殊的 Join 操作,通过在 Map 阶段对数据进行 Join 操作,减少了 Reduce 阶段的数据...
在此基础上实现流与维表的join,对于实时数据分析和决策至关重要。 在实时数据处理中,流计算模型允许我们对持续流入的数据流进行即时分析,而维表(也称为查找表)常用于提供额外的维度信息,如客户详情、地理位置...
总结来说,通过巧妙地运用Oracle的`CONNECT BY`和`SYS_CONNECT_BY_PATH`函数,我们可以实现多行数据到一行的合并,这对于报告展示和数据分析非常实用。不过要注意的是,这种方法在大数据量下可能会有性能问题,因为...
比较实用的方法,已经用到项目里,很好用的一个方法
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
### inner join、left join、right join、outer join之间的区别 在数据库操作中,连接(Join)是一种非常重要的操作,用于组合两个或多个表中的数据。根据连接的方式不同,可以分为几种类型:`INNER JOIN`、`LEFT ...
总体来看,这篇文章对于希望了解和掌握在分布式系统中如何高效实现数据Join操作的读者来说,提供了有价值的理论指导和实践案例。通过理解和应用Semi-Join算法,可以在分布式数据库系统中实现更为高效的数据处理和...
本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...
标题 "S4-开发一个多路合并(join)的示例" 暗示了这是一个关于分布式计算系统Apache S4的教程,重点在于演示如何在S4框架中实现多路合并操作,即数据流的join。多路合并是数据处理中的关键步骤,常用于将来自不同...
现在需要根据一个输入的字符"list1.column1=list2.column2,list1.column3=list3.column4"(不是固定的)来实现inner join关系的控制,即list1中的map和list2中map通过key值column1和column2关联,同时list1中的map和...
本项目以"统计各种TOP的实战"为主题,通过Hive进行数据探索和分析,涵盖了各种排名统计,例如Top N销售产品、Top N高收入用户等。这些统计可以帮助企业了解业务关键指标,做出明智的决策。 首先,我们需要理解Hive...
至于标签“源码”和“工具”,这可能暗示在某些数据库管理系统(如MySQL、Oracle、SQL Server等)中实现JOIN操作的特定语法,或者可能涉及到使用某种查询工具(如SQL Developer、SSMS等)来编写和执行JOIN查询。...
标题 "Linq to datable(Left join right join full join)实例" 涉及到的是在.NET框架中,使用LINQ(Language Integrated Query)查询语言处理DataTable对象时,执行不同类型的连接操作,包括左连接(Left Join)、右...
选择使用哪种JOIN取决于你的查询需求,即你是否希望包含不匹配的记录或者只关注匹配的记录。 在实际数据库设计中,JOIN操作是构建复杂查询的重要工具,尤其在处理多表关系时。理解并熟练掌握LEFT JOIN、RIGHT JOIN...
在Java编程语言中,"左关联"和"右关联"是数据库查询操作中的概念,通常在SQL中使用JOIN语句实现。在这个场景下,我们讨论的是如何使用Java代码来模拟这些数据库操作,以达到高效、便捷地处理数据关联的目的。 首先...