实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码
Topology
这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping
String spoutId = "wordGenerator"; String counterId = "counter"; String intermediateRankerId = "intermediateRanker"; String totalRankerId = "finalRanker"; builder.setSpout(spoutId, new TestWordSpout(), 5); builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); builder.setBolt(totalRankerId, new TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);
RollingCountBolt
首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的
RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds
new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes
1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面)
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间)
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间
2. TupleHelpers.isTickTuple(tuple), TickTuple
前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性.
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用
"__system" component会定时往task发送 "__tick" stream的tuple
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置
也可以在代码里面通过getComponentConfiguration()来进行配置,
public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); return conf;
配置完成后, storm就会定期的往task发送ticktuple
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能
public static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "__system" && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "__tick" }
最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
obj, count(窗口内的计数和), 实际使用时间
SlotBasedCounter
基于slot的counter, 模板类, 可以指定被计数对象的类型T
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作
关键结构为Map<T, long[]> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0
wipeZeros, 删除所有total count为0的obj, 以释放空间
public final class SlotBasedCounter<T> implements Serializable { private static final long serialVersionUID = 4858185737378394432L; private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); private final int numSlots; public SlotBasedCounter(int numSlots) { if (numSlots <= 0) { throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")"); } this.numSlots = numSlots; } public void incrementCount(T obj, int slot) { long[] counts = objToCounts.get(obj); if (counts == null) { counts = new long[this.numSlots]; objToCounts.put(obj, counts); } counts[slot]++; } public long getCount(T obj, int slot) { long[] counts = objToCounts.get(obj); if (counts == null) { return 0; } else { return counts[slot]; } } public Map<T, Long> getCounts() { Map<T, Long> result = new HashMap<T, Long>(); for (T obj : objToCounts.keySet()) { result.put(obj, computeTotalCount(obj)); } return result; } private long computeTotalCount(T obj) { long[] curr = objToCounts.get(obj); long total = 0; for (long l : curr) { total += l; } return total; } /** * Reset the slot count of any tracked objects to zero for the given slot. * * @param slot */ public void wipeSlot(int slot) { for (T obj : objToCounts.keySet()) { resetSlotCountToZero(obj, slot); } } private void resetSlotCountToZero(T obj, int slot) { long[] counts = objToCounts.get(obj); counts[slot] = 0; } private boolean shouldBeRemovedFromCounter(T obj) { return computeTotalCount(obj) == 0; } /** * Remove any object from the counter whose total count is zero (to free up memory). */ public void wipeZeros() { Set<T> objToBeRemoved = new HashSet<T>(); for (T obj : objToCounts.keySet()) { if (shouldBeRemovedFromCounter(obj)) { objToBeRemoved.add(obj); } } for (T obj : objToBeRemoved) { objToCounts.remove(obj); } } }
SlidingWindowCounter
SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念
incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据
核心的操作为, getCountsThenAdvanceWindow
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口
advanceHead的实现, 如何在数组实现循环的滑动窗口
public final class SlidingWindowCounter<T> implements Serializable { private static final long serialVersionUID = -2645063988768785810L; private SlotBasedCounter<T> objCounter; private int headSlot; private int tailSlot; private int windowLengthInSlots; public SlidingWindowCounter(int windowLengthInSlots) { if (windowLengthInSlots < 2) { throw new IllegalArgumentException("Window length in slots must be at least two (you requested " + windowLengthInSlots + ")"); } this.windowLengthInSlots = windowLengthInSlots; this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots); this.headSlot = 0; this.tailSlot = slotAfter(headSlot); } public void incrementCount(T obj) { objCounter.incrementCount(obj, headSlot); } /** * Return the current (total) counts of all tracked objects, then advance the window. * * Whenever this method is called, we consider the counts of the current sliding window to be available to and * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent * objects within the next "chunk" of the sliding window. * * @return */ public Map<T, Long> getCountsThenAdvanceWindow() { Map<T, Long> counts = objCounter.getCounts(); objCounter.wipeZeros(); objCounter.wipeSlot(tailSlot); advanceHead(); return counts; } private void advanceHead() { headSlot = tailSlot; tailSlot = slotAfter(tailSlot); } private int slotAfter(int slot) { return (slot + 1) % windowLengthInSlots; } }
IntermediateRankingsBolt
这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重
所以先通过IntermediateRankingsBolt, 过滤掉一些
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task
IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面)
并实现了updateRankingsWithTuple,
void updateRankingsWithTuple(Tuple tuple) { Rankable rankable = RankableObjectWithFields.from(tuple); super.getRankings().updateWith(rankable); }
逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表
参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去
Rankable
Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口
public interface Rankable extends Comparable<Rankable> { Object getObject(); long getCount(); }
RankableObjectWithFields
RankableObjectWithFields实现Rankable接口
1. 提供将Tuple转化为RankableObject
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中
2. 实现Rankable定义的getObject()和getCount()接口
3. 实现Comparable接口, 包含compareTo, equals
public class RankableObjectWithFields implements Rankable public static RankableObjectWithFields from(Tuple tuple) { List<Object> otherFields = Lists.newArrayList(tuple.getValues()); Object obj = otherFields.remove(0); Long count = (Long) otherFields.remove(0); return new RankableObjectWithFields(obj, count, otherFields.toArray()); }
Rankings
Rankings维护需要排序的List, 并提供对List相应的操作
核心的数据结构如下, 用来存储rankable对象的list
List<Rankable> rankedItems = Lists.newArrayList();
提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)
核心的操作是,
public void updateWith(Rankable r) { addOrReplace(r); rerank(); shrinkRankingsIfNeeded(); }
上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的
1. 替换已有的, 或新增rankable对象(包含obj, count)
2. 从新排序(Collections.sort)
3. 由于只需要topN, 所以大于maxsize的需要删除
AbstractRankerBolt
首先以TopN为参数, 创建Rankings对象
private final Rankings rankings; public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) { count = topN; this.emitFrequencyInSeconds = emitFrequencyInSeconds; rankings = new Rankings(count); }
在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑
public final void execute(Tuple tuple, BasicOutputCollector collector) { if (TupleHelpers.isTickTuple(tuple)) { emitRankings(collector); } else { updateRankingsWithTuple(tuple); } }
最终将整个rankings列表emit出去
private void emitRankings(BasicOutputCollector collector) { collector.emit(new Values(rankings)); getLogger().info("Rankings: " + rankings); }
TotalRankingsBolt
该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序.
TotalRankingsBolt同样继承自AbstractRankerBolt
void updateRankingsWithTuple(Tuple tuple) { Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0); super.getRankings().updateWith(rankingsToBeMerged); }
唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历
最终可以得到, 全局的TopN的Rankings列表
相关推荐
资源内项目源码是来自个人的毕业设计,代码都测试ok,包含源码、数据集、可视化页面和部署说明,可产生核心指标曲线图、混淆矩阵、F1分数曲线、精确率-召回率曲线、验证集预测结果、标签分布图。都是运行成功后才上传资源,毕设答辩评审绝对信服的保底85分以上,放心下载使用,拿来就能用。包含源码、数据集、可视化页面和部署说明一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.txt文件,仅供学习参考, 切勿用于商业用途。
《基于YOLOv8的智慧社区独居老人生命体征监测系统》(包含源码、可视化界面、完整数据集、部署教程)简单部署即可运行。功能完善、操作简单,适合毕设或课程设计
Android Studio Meerkat 2024.3.1 Patch 1(android-studio-2024.3.1.14-mac.dmg)适用于macOS Intel系统,文件使用360压缩软件分割成两个压缩包,必须一起下载使用: part1: https://download.csdn.net/download/weixin_43800734/90557060 part2: https://download.csdn.net/download/weixin_43800734/90557056
侧轴承杯加工工艺编制及夹具设计.zip
NASA数据集锂电池容量特征提取(Matlab完整源码和数据) 作者介绍:机器学习之心,博客专家认证,机器学习领域创作者,2023博客之星TOP50,主做机器学习和深度学习时序、回归、分类、聚类和降维等程序设计和案例分析,文章底部有博主联系方式。从事Matlab、Python算法仿真工作8年,更多仿真源码、数据集定制私信。
板料折弯机液压系统设计.zip
C6150车床的设计.zip
机器学习之KNN实现手写数字
python爬虫;智能切换策略,反爬检测机制
mpls-vpn-optionA-all
56tgyhujikolp[
GB 6442-86企业职工伤亡事故调查分析规则.pdf
汽车液压式主动悬架系统的设计().zip
2000-2024年各省专利侵权案件结案数数据 1、时间:2000-2024年 2、来源:国家知识产权J 3、指标:专利侵权案件结案数 4、范围:31省 5、用途:可用于衡量知识产权保护水平
资源内项目源码是来自个人的毕业设计,代码都测试ok,包含源码、数据集、可视化页面和部署说明,可产生核心指标曲线图、混淆矩阵、F1分数曲线、精确率-召回率曲线、验证集预测结果、标签分布图。都是运行成功后才上传资源,毕设答辩评审绝对信服的保底85分以上,放心下载使用,拿来就能用。包含源码、数据集、可视化页面和部署说明一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.txt文件,仅供学习参考, 切勿用于商业用途。
内容概要:本文档详细复现了金融数学课程作业,涵盖欧式看涨期权定价和投资组合优化两大部分。对于欧式看涨期权定价,分别采用Black-Scholes模型和蒙特卡洛方法进行了计算,并对彩虹期权进行了基于最大值的看涨期权定价。投资组合优化部分则探讨了最小方差组合、给定收益的最小方差组合、最大效用组合以及给定风险的最大收益组合四种情形,还对比了拉格朗日乘数法和二次规划求解器两种方法。文中不仅提供了详细的MATLAB代码,还有详尽的中文解释,确保每一步骤清晰明了。 适合人群:金融工程专业学生、量化分析师、金融数学爱好者。 使用场景及目标:①帮助学生理解和掌握金融衍生品定价的基本原理和方法;②为从事量化分析的专业人士提供实用工具和技术支持;③作为教学材料辅助高校教师讲授相关内容。 其他说明:文档还包括了完整的论文结构建议,从封面页到结论,再到附录,涵盖了所有必要元素,确保提交的作业符合学术规范。此外,还特别强调了数据预处理步骤,确保代码可以顺利运行。
脉冲电解射流加工喷射装置设计(1)
ThinkPad S1 (2nd Generation) 和ThinkPad Yoga 260 用户指南V3.0,包含如何拆机更换硬件
charles描述文件下载
python代码-使用人类对话数据集lora微调deepseek