`
liyonghui160com
  • 浏览: 777205 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Stormstarter-RollingTopWords

阅读更多

 

实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码
Topology

这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping

String spoutId = "wordGenerator";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
builder.setSpout(spoutId, new TestWordSpout(), 5);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId, new TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);

 
 
RollingCountBolt

首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的

RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds

    new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes

1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面)
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间)
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间

 

2. TupleHelpers.isTickTuple(tuple), TickTuple

前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性.
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用
"__system" component会定时往task发送 "__tick" stream的tuple
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置
也可以在代码里面通过getComponentConfiguration()来进行配置,

public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;

 

配置完成后, storm就会定期的往task发送ticktuple
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能

public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "__system"
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "__tick"
}

 
最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
obj, count(窗口内的计数和), 实际使用时间

 
SlotBasedCounter

基于slot的counter, 模板类, 可以指定被计数对象的类型T
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作

关键结构为Map<T, long[]> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0
wipeZeros, 删除所有total count为0的obj, 以释放空间

public final class SlotBasedCounter<T> implements Serializable {
private static final long serialVersionUID = 4858185737378394432L;
private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
private final int numSlots;
public SlotBasedCounter(int numSlots) {
if (numSlots <= 0) {
throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
+ ")");
}
this.numSlots = numSlots;
}
public void incrementCount(T obj, int slot) {
long[] counts = objToCounts.get(obj);
if (counts == null) {
counts = new long[this.numSlots];
objToCounts.put(obj, counts);
}
counts[slot]++;
}
public long getCount(T obj, int slot) {
long[] counts = objToCounts.get(obj);
if (counts == null) {
return 0;
}
else {
return counts[slot];
}
}
public Map<T, Long> getCounts() {
Map<T, Long> result = new HashMap<T, Long>();
for (T obj : objToCounts.keySet()) {
result.put(obj, computeTotalCount(obj));
}
return result;
}
private long computeTotalCount(T obj) {
long[] curr = objToCounts.get(obj);
long total = 0;
for (long l : curr) {
total += l;
}
return total;
}
/**
* Reset the slot count of any tracked objects to zero for the given slot.
*
* @param slot
*/
public void wipeSlot(int slot) {
for (T obj : objToCounts.keySet()) {
resetSlotCountToZero(obj, slot);
}
}
private void resetSlotCountToZero(T obj, int slot) {
long[] counts = objToCounts.get(obj);
counts[slot] = 0;
}
private boolean shouldBeRemovedFromCounter(T obj) {
return computeTotalCount(obj) == 0;
}
/**
* Remove any object from the counter whose total count is zero (to free up memory).
*/
public void wipeZeros() {
Set<T> objToBeRemoved = new HashSet<T>();
for (T obj : objToCounts.keySet()) {
if (shouldBeRemovedFromCounter(obj)) {
objToBeRemoved.add(obj);
}
}
for (T obj : objToBeRemoved) {
objToCounts.remove(obj);
}
}
}

 


SlidingWindowCounter

SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念

incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据

核心的操作为, getCountsThenAdvanceWindow
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口
    advanceHead的实现, 如何在数组实现循环的滑动窗口

public final class SlidingWindowCounter<T> implements Serializable {
private static final long serialVersionUID = -2645063988768785810L;
private SlotBasedCounter<T> objCounter;
private int headSlot;
private int tailSlot;
private int windowLengthInSlots;
public SlidingWindowCounter(int windowLengthInSlots) {
if (windowLengthInSlots < 2) {
throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ windowLengthInSlots + ")");
}
this.windowLengthInSlots = windowLengthInSlots;
this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
this.headSlot = 0;
this.tailSlot = slotAfter(headSlot);
}
public void incrementCount(T obj) {
objCounter.incrementCount(obj, headSlot);
}
/**
* Return the current (total) counts of all tracked objects, then advance the window.
*
* Whenever this method is called, we consider the counts of the current sliding window to be available to and
* successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
* objects within the next "chunk" of the sliding window.
*
* @return
*/
public Map<T, Long> getCountsThenAdvanceWindow() {
Map<T, Long> counts = objCounter.getCounts();
objCounter.wipeZeros();
objCounter.wipeSlot(tailSlot);
advanceHead();
return counts;
}
private void advanceHead() {
headSlot = tailSlot;
tailSlot = slotAfter(tailSlot);
}
private int slotAfter(int slot) {
return (slot + 1) % windowLengthInSlots;
}
}

 

IntermediateRankingsBolt

这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重
所以先通过IntermediateRankingsBolt, 过滤掉一些
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task

IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面)
并实现了updateRankingsWithTuple,

void updateRankingsWithTuple(Tuple tuple) {
Rankable rankable = RankableObjectWithFields.from(tuple);
super.getRankings().updateWith(rankable);
}

 

逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表

参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去


Rankable

Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口

public interface Rankable extends Comparable<Rankable> {
Object getObject();
long getCount();
}

 

RankableObjectWithFields

RankableObjectWithFields实现Rankable接口
1. 提供将Tuple转化为RankableObject
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中

2. 实现Rankable定义的getObject()和getCount()接口

3. 实现Comparable接口, 包含compareTo, equals

public class RankableObjectWithFields implements Rankable

public static RankableObjectWithFields from(Tuple tuple) {
List<Object> otherFields = Lists.newArrayList(tuple.getValues());
Object obj = otherFields.remove(0);
Long count = (Long) otherFields.remove(0);
return new RankableObjectWithFields(obj, count, otherFields.toArray());
}

 

Rankings

Rankings维护需要排序的List, 并提供对List相应的操作

核心的数据结构如下, 用来存储rankable对象的list
List<Rankable> rankedItems = Lists.newArrayList();

提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)

核心的操作是,

public void updateWith(Rankable r) {
addOrReplace(r);
rerank();
shrinkRankingsIfNeeded();
}

 

上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的
1. 替换已有的, 或新增rankable对象(包含obj, count)
2. 从新排序(Collections.sort)
3. 由于只需要topN, 所以大于maxsize的需要删除
AbstractRankerBolt

首先以TopN为参数, 创建Rankings对象

private final Rankings rankings;
public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
count = topN;
this.emitFrequencyInSeconds = emitFrequencyInSeconds;
rankings = new Rankings(count);
}

 

在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑

public final void execute(Tuple tuple, BasicOutputCollector collector) {
if (TupleHelpers.isTickTuple(tuple)) {
emitRankings(collector);
}
else {
updateRankingsWithTuple(tuple);
}
}

 

最终将整个rankings列表emit出去

private void emitRankings(BasicOutputCollector collector) {
collector.emit(new Values(rankings));
getLogger().info("Rankings: " + rankings);
}

 


TotalRankingsBolt

该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序.
TotalRankingsBolt同样继承自AbstractRankerBolt

void updateRankingsWithTuple(Tuple tuple) {
Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
super.getRankings().updateWith(rankingsToBeMerged);
}

 

唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历

最终可以得到, 全局的TopN的Rankings列表

 

 

 

分享到:
评论

相关推荐

    java全大撒大撒大苏打

    sdad

    (175820822)基于java的工资管理系统设计与实现

    本课程设计是某公司的工资管理系统。在这个计算机快速发展的世界里,计算机为信息处理提供了物美价廉的手段,对于推动我国管理信息处理现代化起到了重要作用。工资管理是一项琐碎、复杂而又十分细致的工作,工资计算、发放、核算的工作量很大,一般不允许出错,如果实行手工操作,每月发放工资须手工填制大量的表格,这就会耗费工作人员大量的时间和精力,计算机进行工资发放工作,不仅能够保证工资核算准确无误、快速输出,而且还可以利用计算机对有关工资的各种信息进行统计,既方便又快捷地完成员工工资的发放。 本课程设计过程中根据设计中的需求及对工资管理系统采用了模块化的设计思想,在机房我们在Windows XP 操作系统环境下,采用 myeclipse7作为开发工具,主要连接 Access 数据库来实现公司的工资管理系统的主要功能。在设计过程中,我们首先小组首先对整体的思路进行分析,然后进行分工。对数据库和类进行设计,实现了工资管理系统的功能。其功能主要包括公司用户管理、人员管理、部门管理、工资管理等功能.。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    YOLO算法-水泥路面裂纹检测数据集-5005张图像带标签-裂纹.zip

    YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;

    基于鸟鸣声识别的鸟类分类系统项目源代码全套技术资料.zip

    基于鸟鸣声识别的鸟类分类系统项目源代码全套技术资料.zip

    zigbee CC2530无线自组网协议栈系统代码实现协议捕捉与数据分析.zip

    1、嵌入式物联网单片机项目开发例程,简单、方便、好用,节省开发时间。 2、代码使用IAR软件开发,当前在CC2530上运行,如果是其他型号芯片,请自行移植。 3、软件下载时,请注意接上硬件,并确认烧录器连接正常。 4、有偿指导v:wulianjishu666; 5、如果接入其他传感器,请查看账号发布的其他资料。 6、单片机与模块的接线,在代码当中均有定义,请自行对照。 7、若硬件有差异,请根据自身情况调整代码,程序仅供参考学习。 8、代码有注释说明,请耐心阅读。 9、例程具有一定专业性,非专业人士请谨慎操作。

    毕业设计前后端分离博客项目源代码.zip

    毕业设计前后端分离博客项目源代码.zip

    (170644008)Eclipse+MySql+JavaSwing选课成绩管理系统

    Eclipse+MySql+JavaSwing选课成绩管理系统,原文博客在https://blog.csdn.net/qq_50062694/article/details/124649345?spm=1001.2014.3001.5502。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    IBM Process Mining流程挖掘

    什么是流程挖掘?为什么需要流程挖掘?流程挖掘面向的部门是哪些?流程挖掘之后做什么?如果想知道这些,请阅读此文。

    Android程序开发初级教程WORD文档doc格式最新版本

    ### Android程序开发初级教程(一):初识Android **平台概述** Google推出的Android操作系统平台已经正式亮相,这是一个基于Linux内核的开源操作系统。对于开发者而言,了解其架构和支持的开发语言至关重要。以下是Android平台的架构概览: **平台架构及功能** 1. **应用框架(Application Framework)**:包含可重用和可替换的组件,确保所有软件在该层面上的平等性。 2. **Dalvik虚拟机(Dalvik Virtual Machine)**:一个基于Linux的虚拟机,为Android应用提供运行环境。 3. **集成浏览器(Integrated Browser)**:基于开源WebKit引擎的浏览器,位于应用层。 4. **优化图形(Optimized Graphics)**:包括自定义的2D图形库和遵循OpenGL ES 1.0标准的3D实现。 5. **SQLite数据库**:用于数据存储。 6. **多媒体支持(Media Support)**:支持通用音频、视频以及多种图片格式(如MPEG4, H.264

    java毕设项目之ssm小型企业办公自动化系统的设计和开发+vue(完整前后端+说明文档+mysql+lw).zip

    项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    212) Outgrid - 多用途 Elementor WordPress 主题 v2.0.0.zip

    212) Outgrid - 多用途 Elementor WordPress 主题 v2.0.0.zip

    weixin138社区互助养老+ssm(论文+源码)-kaic.zip

    weixin138社区互助养老+ssm(论文+源码)_kaic.zip

    深圳建筑安装公司“高处作业安全技术操作规程”.docx

    深圳建筑安装公司“高处作业安全技术操作规程”

    计算机视觉项目:Swin-Transformer 【tiny、small、base】模型实现的图像识别项目:番茄病害图像分类

    【项目简介】 代码主干网络采用Swin-Transformer 家族系列,包括【tiny、small、base】三种模型。pretrained和freeze_layers参数为是否采用官方预训练模型和是否仅训练分类头。为了做对比消融试验,优化器采用了Adam和SGD、AdamW三种。损失函数采用多类别的交叉熵、学习率优化策略采用cos余弦退火算法 【评估网络】 评估的指标采用loss和准确率(accuracy),分别会在训练集和验证集上进行评估、输出、绘制曲线图像。同时会在训练集、验证集进行一系列评估,包含混淆矩阵、recall、precision、F1 score等等曲线图像,以及recall、precision、F1 score、特异度的输出信息等等。 【具体各类别的指标在json文件中查看】 【如果想要更换数据集训练,参考readme文件】 【本项目为8种番茄病害图片(约4k张数据),包含数据集和标签,可以一键运行】

    城市公交查询-java-基于springBoot的城市公交查询系统设计与实现(毕业论文)

    城市公交查询功能描述 城市公交查询系统的主要目的是为市民提供便捷的公交信息查询服务,帮助用户快速获取公交线路、站点、时刻表等信息,从而提高出行效率。以下是该系统可能具备的功能描述: 1. 公交线路查询 线路搜索:用户可以通过输入公交线路编号或线路名称,快速查询到该线路的详细信息。 线路详情:展示所选线路的起点、终点、途经站点、首末班车时间、发车间隔等信息。 线路图展示:提供线路的可视化地图,显示线路走向及各个站点位置。 2. 站点查询 站点搜索:用户可以通过输入站点名称或编号,查询该站点的相关信息。 站点详情:展示所选站点的上下车线路、周边设施、换乘信息等。 实时到站信息:提供该站点即将到达的公交车信息,包括预计到达时间和车牌号。 3. 实时公交信息 实时位置追踪:用户可以查看公交车的实时位置,了解公交车的行驶状态。 到站预测:根据实时数据,预测公交车到达各个站点的时间,帮助用户合理安排出行。 4. 换乘查询 换乘方案推荐:用户输入起点和终点后,系统提供最佳的换乘方案,包括所需的公交线路、换乘站点及步行距离。 换乘时间估算:计算并展示换乘所需的总时间,包括等车时间和步行时间。 5.

    交通旅游订票-JAVA-基于spring boot的交通旅游订票系统设计与实现(毕业论文)

    交通旅游订票功能描述 交通旅游订票系统是为了简化旅游出行过程,提升用户的预定体验。该系统通常集成了机票、火车票、汽车票、船票、景区门票等多种交通和旅游产品的预订、支付及管理功能。以下是该系统可能具备的功能描述: 1. 用户管理 用户注册与登录:提供游客注册与登录功能,支持邮箱、手机号等多种方式注册,保证用户信息安全。 个人信息管理:用户可以查看和编辑个人信息,如身份证号、联系方式、常用地址等。 乘客信息保存:可保存常用乘客信息,如身份证、护照、儿童票信息,方便快速预定。 2. 交通票务管理 票务查询:提供交通工具的实时查询功能,支持机票、火车票、汽车票、船票等的查询,包含出发时间、到达时间、票价、座位情况等信息。 多种票务类型支持:支持单程票、往返票、联程票、团体票等多种票种,满足不同用户需求。 票价比较:根据日期、交通工具等条件,自动比较票价,帮助用户选择最合适的票务。 票务预订与支付:提供便捷的在线预订和支付功能,支持多种支付方式(如银行卡、支付宝、微信等)。 票务改签与退票:用户可以在线申请改签和退票,并查看相关费用及政策。 3. 旅游产品预订 景点门票预订:用户可以在线选择

    企业数据管理系统项目源代码.zip

    企业数据管理系统项目源代码.zip

    java毕设项目之ssm高校专业信息管理系统设计与实现+jsp(完整前后端+说明文档+mysql+lw).zip

    项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    基于plc的污水处理,组态王动画仿真,带PLC源代码,组态王源代码,图纸,IO地址分配

    基于plc的污水处理,组态王动画仿真,带PLC源代码,组态王源代码,图纸,IO地址分配

    SINAMICS S120驱动第三方直线永磁同步电机系列视频-配置和优化.mp4

    SINAMICS S120驱动第三方直线永磁同步电机系列视频_配置和优化.mp4

Global site tag (gtag.js) - Google Analytics