`
m635674608
  • 浏览: 5027467 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Storm 实现滑动窗口计数和TopN排序

 
阅读更多

计算top N words的topology, 用于比如trending topics or trending images on Twitter.

  

实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码    

  

Topology

  

这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping

  

   
 String spoutId = "wordGenerator";
 String counterId = "counter";
 String intermediateRankerId = "intermediateRanker";
 String totalRankerId = "finalRanker";
 builder.setSpout(spoutId, new TestWordSpout(), 5);
 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
 builder.setBolt(totalRankerId, new TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);

 

RollingCountBolt

首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的 

RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds

  

new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes

1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面)     
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);   
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot   
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数   
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间)   
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间

 

2. TupleHelpers.isTickTuple(tuple), TickTuple

前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性.   
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用   
"__system" component会定时往task发送 "__tick" stream的tuple   
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置   
也可以在代码里面通过getComponentConfiguration()来进行配置,

  
public Map<String, Object> getComponentConfiguration() {
     Map<String, Object> conf = new HashMap<String, Object>();
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
     return conf;

配置完成后, storm就会定期的往task发送ticktuple   
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能

  
public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "__system"
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "__tick"
}

 

最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds));   
obj, count(窗口内的计数和), 实际使用时间

 

SlotBasedCounter

基于slot的counter, 模板类, 可以指定被计数对象的类型T   
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作 

关键结构为Map<T, long[]> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数   
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组   
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和   
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0   
wipeZeros, 删除所有total count为0的obj, 以释放空间

  
public final class SlotBasedCounter<T> implements Serializable {

    private static final long serialVersionUID = 4858185737378394432L;

    private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
    private final int numSlots;

    public SlotBasedCounter(int numSlots) {
        if (numSlots <= 0) {
            throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
                + ")");
        }
        this.numSlots = numSlots;
    }

    public void incrementCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            counts = new long[this.numSlots];
            objToCounts.put(obj, counts);
        }
        counts[slot]++;
    }

    public long getCount(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        if (counts == null) {
            return 0;
        }
        else {
            return counts[slot];
        }
    }

    public Map<T, Long> getCounts() {
        Map<T, Long> result = new HashMap<T, Long>();
        for (T obj : objToCounts.keySet()) {
            result.put(obj, computeTotalCount(obj));
        }
        return result;
    }

    private long computeTotalCount(T obj) {
        long[] curr = objToCounts.get(obj);
        long total = 0;
        for (long l : curr) {
            total += l;
        }
        return total;
    }

    /**
     * Reset the slot count of any tracked objects to zero for the given slot.
     * 
     * @param slot
     */

    public void wipeSlot(int slot) {
        for (T obj : objToCounts.keySet()) {
            resetSlotCountToZero(obj, slot);
        }
    }

    private void resetSlotCountToZero(T obj, int slot) {
        long[] counts = objToCounts.get(obj);
        counts[slot] = 0;
    }

    private boolean shouldBeRemovedFromCounter(T obj) {
        return computeTotalCount(obj) == 0;
    }

    /**
     * Remove any object from the counter whose total count is zero (to free up memory).
     */

    public void wipeZeros() {
        Set<T> objToBeRemoved = new HashSet<T>();
        for (T obj : objToCounts.keySet()) {
            if (shouldBeRemovedFromCounter(obj)) {
                objToBeRemoved.add(obj);
            }
        }
        for (T obj : objToBeRemoved) {
            objToCounts.remove(obj);
        }
    }
}

 

SlidingWindowCounter

SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念

incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据

核心的操作为, getCountsThenAdvanceWindow   
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map   
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间   
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口   
    advanceHead的实现, 如何在数组实现循环的滑动窗口

  
public final class SlidingWindowCounter<T> implements Serializable {

    private static final long serialVersionUID = -2645063988768785810L;

    private SlotBasedCounter<T> objCounter;
    private int headSlot;
    private int tailSlot;
    private int windowLengthInSlots;

    public SlidingWindowCounter(int windowLengthInSlots) {
        if (windowLengthInSlots < 2) {
            throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
                + windowLengthInSlots + ")");
        }
        this.windowLengthInSlots = windowLengthInSlots;
        this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);

        this.headSlot = 0;
        this.tailSlot = slotAfter(headSlot);
    }

    public void incrementCount(T obj) {
        objCounter.incrementCount(obj, headSlot);
    }

    /**
     * Return the current (total) counts of all tracked objects, then advance the window.
     * 
     * Whenever this method is called, we consider the counts of the current sliding window to be available to and
     * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
     * objects within the next "chunk" of the sliding window.
     * 
     * @return
     */

    public Map<T, Long> getCountsThenAdvanceWindow() {
        Map<T, Long> counts = objCounter.getCounts();
        objCounter.wipeZeros();
        objCounter.wipeSlot(tailSlot);
        advanceHead();
        return counts;
    }

    private void advanceHead() {
        headSlot = tailSlot;
        tailSlot = slotAfter(tailSlot);
    }

    private int slotAfter(int slot) {
        return (slot + 1) % windowLengthInSlots;
    }
}
  
 

IntermediateRankingsBolt

这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重   
所以先通过IntermediateRankingsBolt, 过滤掉一些   
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task

IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面)   
并实现了updateRankingsWithTuple,

  
void updateRankingsWithTuple(Tuple tuple) {
    Rankable rankable = RankableObjectWithFields.from(tuple);
    super.getRankings().updateWith(rankable);
}
  
逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表
  
参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去

 

Rankable

Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口

  
public interface Rankable extends Comparable<Rankable> {
    Object getObject();
    long getCount();
}

RankableObjectWithFields

RankableObjectWithFields实现Rankable接口   
1. 提供将Tuple转化为RankableObject   
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中

2. 实现Rankable定义的getObject()和getCount()接口

3. 实现Comparable接口, 包含compareTo, equals

  
public class RankableObjectWithFields implements Rankable
  
public static RankableObjectWithFields from(Tuple tuple) {
    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
    Object obj = otherFields.remove(0);
    Long count = (Long) otherFields.remove(0);
    return new RankableObjectWithFields(obj, count, otherFields.toArray());
}

Rankings

Rankings维护需要排序的List, 并提供对List相应的操作

核心的数据结构如下, 用来存储rankable对象的list   
List<Rankable> rankedItems = Lists.newArrayList();

提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)

核心的操作是, 

  
public void updateWith(Rankable r) {
    addOrReplace(r);
    rerank();
    shrinkRankingsIfNeeded();
}

上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的 
1. 替换已有的, 或新增rankable对象(包含obj, count) 
2. 从新排序(Collections.sort) 
3. 由于只需要topN, 所以大于maxsize的需要删除 

AbstractRankerBolt

首先以TopN为参数, 创建Rankings对象

  
private final Rankings rankings;
public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
    count = topN;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    rankings = new Rankings(count);
}

在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple   
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings   
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑

  
public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleHelpers.isTickTuple(tuple)) {
        emitRankings(collector);
    }
    else {
        updateRankingsWithTuple(tuple);
    }
}

最终将整个rankings列表emit出去 

  
private void emitRankings(BasicOutputCollector collector) {
    collector.emit(new Values(rankings));
    getLogger().info("Rankings: " + rankings);
}

 

TotalRankingsBolt 

该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序.   
TotalRankingsBolt同样继承自AbstractRankerBolt

  
void updateRankingsWithTuple(Tuple tuple) {
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    super.getRankings().updateWith(rankingsToBeMerged);
}

唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历 

最终可以得到, 全局的TopN的Rankings列表

 

 

http://www.51studyit.com/html/notes/20140329/49.html

分享到:
评论

相关推荐

    storm实时单词计数

    Apache Storm 是一个分布式实时计算系统,它被设计用于处理无界数据流,提供高吞吐量和低延迟的数据处理...通过学习这个例子,我们可以深入理解Storm如何处理实时数据,以及如何设计和实现一个简单的实时数据分析应用。

    Storm流计算项目:1号店电商实时数据分析系统-29.项目2-省份销售排行-前台和图表交互开发和Top N实现.pptx

    在本项目中,可能使用了优先队列或者排序算法来实时维护销售排行的Top N,确保在数据流变化时,排行榜能够迅速更新。 **7. HBase存储** HBase是一个分布式的、面向列的NoSQL数据库,适合存储海量半结构化数据。在...

    storm实现井字棋游戏源码

    井字棋游戏(Tic-Tac-Toe)是一个简单的两人对战游戏,这里用Storm实现,可能是为了展示如何在分布式环境中实时处理和分析游戏状态。 描述中提到的“详情见博文”,指的是CSDN博客上的一篇文章,通过这篇文章我们...

    storm统计单词数的demo

    Apache Storm是一个开源的流处理框架,它允许开发者处理无界数据流,实现高吞吐量、低延迟的数据分析。在《Learning Storm》这本书中,作者深入浅出地介绍了如何利用Storm进行实时数据处理,而本demo正是对书中教程...

    storm自定义计数小案例

    在这个小案例中,我们将学习如何使用Storm来实现一个简单的单词计数功能。 描述中提到的“自定义storm计数小程序”,是指开发人员根据Storm的工作原理编写的一段代码,这个小程序的主要任务是接收输入的文本流,...

    Storm API实现词频统计

    在这个“Storm API实现词频统计”的案例中,我们将深入探讨如何利用Java编程语言和Storm API来构建一个实时的词频统计应用。 首先,我们需要理解Storm的基本架构。Storm由多个组件构成,包括Spout(数据源)、Bolt...

    Storm实现的应用模型研究

    实验结果通常显示,基于Storm实现的数据分析处理系统在性能和可伸缩性方面明显优于传统的大数据分析处理系统。 #### 五、结论 综上所述,Storm作为一种分布式实时计算框架,在大数据实时处理领域展现出了显著的...

    Storm流计算项目:1号店电商实时数据分析系统-30.项目2-省份销售排行-Top N展示优化和项目开发思路总结.pptx

    通过优化Top N展示,前后端分离,以及合理的数据存储和传输策略,实现了高效稳定的数据分析系统。同时,项目还涉及了大数据生态中的多个组件,如Kafka、HBase等,以及Web开发和数据可视化技术。

    storm 实战笔记

    可能会涉及到Top N、滑动窗口等时间窗口操作,以及如何在高并发环境下保证数据的准确性和实时性。 五、Storm之电商交易风控项目实战 在电商交易风控场景中,Storm扮演了实时风险评估的角色。读者将学习如何通过实时...

    中国移动storm项目代码

    8. **数据分析与算法**:计算掉话率可能涉及特定的统计算法,例如滑动窗口计数、比率计算等。理解并实现这些算法对于项目至关重要。 9. **结果存储与展示**:处理后的数据可能需要存储到数据库中,或者通过前端界面...

    storm1.2.1-wangzs-可靠单词计数

    《storm1.2.1-wangzs-可靠单词计数》是基于Apache Storm的一个实践项目,专注于演示如何在分布式环境中实现可靠且精确的单词计数。Apache Storm是一个开源的流处理系统,它允许实时处理数据流,确保每个事件都能得到...

    storm实时数据处理

    书中可能涉及的高阶应用开发包括:窗口化操作(时间窗口、滑动窗口、会话窗口),用于处理数据流的分组策略,如全局分组、字段分组、shuffle分组等,以及复杂的流处理逻辑如状态管理和分布式协调。 六、与其他...

    Storm实时数据处理-超清文字版.pdf

    Storm支持滑动窗口、滚动窗口和会话窗口等多种类型,这些窗口机制让开发者可以根据业务需求灵活控制数据处理的时间范围。 六、 storm Trident Trident是Storm的一个高级接口,它提供了一种更强大的状态管理和精确一...

    storm-wordcount例子

    在实现storm-wordcount时,我们还需要配置拓扑结构,定义各个组件之间的连接关系和数据流方向。Storm提供了灵活的API来创建和提交拓扑,这使得我们可以根据实际需求调整并行度,优化性能。 此外,Storm的容错机制也...

    Storm入门教程 之Storm原理和概念详解

    Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例; Storm视频教程亮点: 1、Storm...

    storm企业应用 实战 运维和调优

    - 利用Storm的实时分析能力进行数据聚合、窗口计算和去重等操作。 - 结合Hadoop生态系统中的其他工具(如HBase、Hive)进行大数据的实时分析与批处理整合。 Storm框架在企业级应用中扮演着重要角色,运维和调优...

    Storm-Windowing-Analysis:这个 Repository 有零碎的部分,当然是代码,从分析到在 STORM 中实现窗口机制

    标题中的“Storm-Windowing-Analysis”指的是对Apache Storm中窗口操作的深入理解和实现。Apache Storm是一个开源的分布式实时计算系统,它允许开发者处理无界数据流,而窗口操作是实时流处理中的关键概念,用于将...

    Storm的WordCount实例

    总结,通过这个“Storm的WordCount实例”,我们可以了解到如何使用Apache Storm来处理实时数据流,并实现一个简单但功能强大的计数系统。这不仅展示了Storm的核心概念,也为我们提供了实践分布式实时计算的一个起点...

    storm深入学习.pdf

    深入学习Storm,我们需要理解其核心概念,包括数据模型(topology)、容错机制(ack和fail)、批处理、TOPN算法、流程聚合、DRPC(Direct Remote Procedure Call)以及executor、worker和task的关系及优化,以及如何...

Global site tag (gtag.js) - Google Analytics