`

ITridentSpout、FirstN(取Top N)实现、 流合并和join

 
阅读更多
一、ITridentSpout
基于事务
static interface ITridentSpout.BatchCoordinator<X>
           
static interface ITridentSpout.Emitter<X>

接口类的实现和之前事务ITransactionalSpout 非常类似。

二、调用链用于执行多个聚合
topology.newDRPCStream("top", drpc).each(new Fields("args"), new Split(“ ”), new Fields("time")).parallelismHint(5).stateQuery(myStates,new Fields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt")).groupBy(new Fields("srcip")).chainedAgg().aggregate(new Fields("byt"), new Sum(), new Fields("yt")).aggregate(new Fields("pkt"), new Sum(), new Fields("kt")).chainEnd().applyAssembly(new FirstN(10, "yt", true));

如果想同事执行多个聚合,可以使用如下的调用链
mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。

三、投影(projection)
投影操作是对数据上进行列裁剪。
如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:

mystream.project(new Fields("b","d"));

输出流将只有【“b”,“d”】两个字段。

四、重分区(repartition)操作

重分区操作是通过一个函数改变元组(tuple)在task之间的分布,   重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:
1.      Shuffle:与hadoop一样,把同步的tuple放在一个分区
2.      Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。 如果你想做在每个分区上做一个statequery。
3.      paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字 段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发 字段(fields)分发到相同的目标分区。
4.      global:所有的tuple分发到相同的分区。
5.      batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分 区。
6.      patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。

五、合并和关联
合并(merge)多个流成为一个流,可以如下:
topology.merge(stream1, stream2, stream3);
Trident合并的流字段会以第一个流的字段命名。

另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段【“key”,“val1”,“val2”】,
另一个流包含字段【“x”,“val1”】:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));

Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
1.      首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
2.      接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。

六、FirstN

取Top N

用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));

Trident适合做汇总型,不大适合做去重型
















分享到:
评论

相关推荐

    Kettle 合并记录和Merge Join组件实现数据增量迁移(数据同步比插入更新快

    在本文中,我们将深入探讨“合并记录”和“Merge Join”这两个组件在实现数据增量迁移中的作用,以及为什么它们在数据同步时比传统的插入更新更快。 首先,我们来理解什么是数据增量迁移。在数据仓库或数据库系统中...

    mysql使用GROUP BY分组实现取前N条记录的方法

    本文实例讲述了mysql使用GROUP BY分组实现取前N条记录的方法。分享给大家供大家参考,具体如下: MySQL中GROUP BY分组取前N条记录实现 mysql分组,取记录 GROUP BY之后如何取每组的前两位下面我来讲述mysql中GROUP BY...

    All Video Joine视频合并

    All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine...

    19、Join操作map side join 和 reduce side join

    本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...

    工作流jbpm中join与fork用法

    总结一下,`Fork`和`Join`是JBPM工作流中的关键概念,它们负责流程的分支和合并,实现并行和同步。了解和熟练掌握这两个节点的用法,对于设计高效、灵活的工作流程至关重要。在实际操作中,开发者可以通过JBPM的图形...

    【MapReduce篇06】MapReduce之MapJoin和ReduceJoin1

    今天,我们将讲解 MapReduce 之 MapJoin 和 ReduceJoin 两种 Join 操作的实现原理和应用场景。 MapJoin 概述 MapJoin 是一种特殊的 Join 操作,通过在 Map 阶段对数据进行 Join 操作,减少了 Reduce 阶段的数据...

    基于开源的flink对其实时sql进行扩展;主要实现了流与维表的join

    在此基础上实现流与维表的join,对于实时数据分析和决策至关重要。 在实时数据处理中,流计算模型允许我们对持续流入的数据流进行即时分析,而维表(也称为查找表)常用于提供额外的维度信息,如客户详情、地理位置...

    sql实现多行合并一行

    总结来说,通过巧妙地运用Oracle的`CONNECT BY`和`SYS_CONNECT_BY_PATH`函数,我们可以实现多行数据到一行的合并,这对于报告展示和数据分析非常实用。不过要注意的是,这种方法在大数据量下可能会有性能问题,因为...

    DataTable实现leftJoin和rightJoi以及innerJoin

    比较实用的方法,已经用到项目里,很好用的一个方法

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    inner join、 left join 、right join、 outer join之间的区别

    ### inner join、left join、right join、outer join之间的区别 在数据库操作中,连接(Join)是一种非常重要的操作,用于组合两个或多个表中的数据。根据连接的方式不同,可以分为几种类型:`INNER JOIN`、`LEFT ...

    分布式系统中Semi-Join算法的实现.pdf

    总体来看,这篇文章对于希望了解和掌握在分布式系统中如何高效实现数据Join操作的读者来说,提供了有价值的理论指导和实践案例。通过理解和应用Semi-Join算法,可以在分布式数据库系统中实现更为高效的数据处理和...

    hadoop Join代码(map join 和reduce join)

    本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...

    S4-开发一个多路合并(join)的示例

    标题 "S4-开发一个多路合并(join)的示例" 暗示了这是一个关于分布式计算系统Apache S4的教程,重点在于演示如何在S4框架中实现多路合并操作,即数据流的join。多路合并是数据处理中的关键步骤,常用于将来自不同...

    java实现innerjoin关联算法

    现在需要根据一个输入的字符"list1.column1=list2.column2,list1.column3=list3.column4"(不是固定的)来实现inner join关系的控制,即list1中的map和list2中map通过key值column1和column2关联,同时list1中的map和...

    Hive练习项目统计各种TOP的实战(数据和代码)

    本项目以"统计各种TOP的实战"为主题,通过Hive进行数据探索和分析,涵盖了各种排名统计,例如Top N销售产品、Top N高收入用户等。这些统计可以帮助企业了解业务关键指标,做出明智的决策。 首先,我们需要理解Hive...

    sql join( inner join, outer join) 分析

    至于标签“源码”和“工具”,这可能暗示在某些数据库管理系统(如MySQL、Oracle、SQL Server等)中实现JOIN操作的特定语法,或者可能涉及到使用某种查询工具(如SQL Developer、SSMS等)来编写和执行JOIN查询。...

    Linq to datable(Left join right join full join)实例

    标题 "Linq to datable(Left join right join full join)实例" 涉及到的是在.NET框架中,使用LINQ(Language Integrated Query)查询语言处理DataTable对象时,执行不同类型的连接操作,包括左连接(Left Join)、右...

    SQL语句left join/right join/inner join 的用法比较

    选择使用哪种JOIN取决于你的查询需求,即你是否希望包含不匹配的记录或者只关注匹配的记录。 在实际数据库设计中,JOIN操作是构建复杂查询的重要工具,尤其在处理多表关系时。理解并熟练掌握LEFT JOIN、RIGHT JOIN...

    用java写的左关联右关联join类

    在Java编程语言中,"左关联"和"右关联"是数据库查询操作中的概念,通常在SQL中使用JOIN语句实现。在这个场景下,我们讨论的是如何使用Java代码来模拟这些数据库操作,以达到高效、便捷地处理数据关联的目的。 首先...

Global site tag (gtag.js) - Google Analytics