`
wbj0110
  • 浏览: 1619361 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Twitter Storm源代码分析之TimeCacheMap

阅读更多

TimeCacheMap是Twitter Storm里面一个类, Storm使用它来保存那些最近活跃的对象,并且可以自动删除那些已经过期的对象。这个类设计的很巧妙, 我们来看一下。

TimeCacheMap里面的数据是保存在内部变量_bucket里面的:

1
private LinkedList<HashMap<K, V>> _buckets;

在这点上跟ConcurrentHashMap有点类似, ConcurrentHashMap是利用多个bucket来缩小锁的粒度, 从而实现高并发的读写。而TimeCacheMap则是利用多个bucket来使得数据清理线程占用锁的时间最小。

首先来看看TimeCacheMap的构造函数, 它的构造函数首先是生成numBuckets个空的HashMap:

1
2
3
4
_buckets = new LinkedList<HashMap<K, V>>();
for(int i=0; i<numBuckets; i++) {
    _buckets.add(new HashMap<K, V>());
}

然后就是最关键的清理线程部分,TimeCacheMap使用一个单独的线程来清理那些过期的数据:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis/(numBuckets-1);
_cleaner = new Thread(new Runnable() {
    public void run() {
        try {
            while(true) {
                Map<K, V> dead = null;
                //
                Time.sleep(sleepTime);
                synchronized(_lock) {
                  dead = _buckets.removeLast();
                  _buckets.addFirst(
                             new HashMap<K, V>());
                }
                if(_callback!=null) {
                    for(Entry<K, V> entry:
                                dead.entrySet()) {
                      _callback.expire(entry.getKey(),
                                    entry.getValue());
                    }
                }
            }
        } catch (InterruptedException ex) {
 
        }
    }
});
_cleaner.setDaemon(true);
_cleaner.start();

这个线程每隔 expirationSecs / (numBuckets - 1) 秒钟的时间去把最后一个bucket里面的数据全部都删除掉 — 这些被删除掉的数据其实就是过期的数据。(为什么不是每隔expirationSecs就来删除一次呢?我们下面会说)。这里值得注意的是:正是因为这种分成多个桶的机制, 清理线程对于_lock的占用时间极短。只要把最后一个bucket从_buckets解下,并且向头上面添加一个新的bucket就好了:

1
2
3
4
synchronized(_lock) {
    dead = _buckets.removeLast();
    _buckets.addFirst(new HashMap<K, V>());
}

如果不是这种机制的话, 那我能想到的最傻的办法可能就是给条数据一个过期时间字段, 然后清理线程就要遍历每条数据来检查数据是否过期了。那显然要HOLD住这个锁很长时间了。

同时对于每条过期的数据TimeCacheMap会执行我们的callback函数:

1
2
3
4
5
6
if(_callback!=null) {
     for(Entry<K, V> entry: dead.entrySet()) {
          _callback.expire(entry.getKey(),
                           entry.getValue());
     }
}

大致机制就是这样,那么我们现在回过头来看看前面的那个问题: 为什么这个清理线程是每隔expirationSecs / (numBuckets - 1) 秒的时间来检查,这样对吗?TimeCacheMap的内部有多个桶, 当你向这个TimeCacheMap里面添加数据的时候,数据总是添加到第一个桶里面去的。

01
02
03
04
05
06
07
08
09
10
11
12
public void put(K key, V value) {
    synchronized(_lock) {
        Iterator<HashMap<K, V>> it =
                         _buckets.iterator();
        HashMap<K, V> bucket = it.next();
        bucket.put(key, value);
        while(it.hasNext()) {
            bucket = it.next();
            bucket.remove(key);
        }
    }
}

我们看个例子就明白了,假设 numBuckets = 3, expirationSecs = 2
我们先往里面填一条数据{1: 1}, 这条数据被加到第一个桶里面去, 现在TimeCacheMap的状态是:

1
[{1:1}, {}, {}]

过了1秒钟之后(expirationSecs / (numBuckets - 1) = 2 / (3 - 1) = 1)。清理线程干掉最后一个HashMap,并且在头上添加一个新的空HashMap, 现在TimeCacheMap的状态是:

1
[{}, {1:1}, {}]

再过了一秒钟, 同上, TimeCacheMap的状态会变成:

1
[{}, {}, {1:1}]

再过一秒钟, 现在{1:1}是最后一个TimeCacheMap了,就被干掉了。
所以从{1:1}被加入到这个TimeCacheMap到被干掉一共用了3秒,其实这个3秒就等于

1
3 = expirationSecs * ( 1 + 1 / (numBuckets - 1))

它的注释里面也提到了这一点

Expires keys that have not been updated in the configured number of seconds.
The algorithm used will take between expirationSecs and
expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.

那为什么说时间是expirationSecsexpirationSecs * (1 + 1 / (numBuckets-1))之间呢?因为线程调度的不确定性。

转自淘宝

分享到:
评论

相关推荐

    仿Twitter源代码社交网络源码基于脉聊二开版本带详细安装视频.zip

    仿Twitter源代码 社交网络源码 基于脉聊二开版本这是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。...

    仿Twitter源代码 社交网络源码 基于脉聊二开版本 带详细安装视频

    仿Twitter源代码 社交网络源码 基于脉聊二开版本这是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。...

    仿Twitter源代码 社交网络源码 基于脉聊二开版本

    这是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。注册用户可以分享他 仿Twitter源代码 社交网络源码...

    Twitter storm

    ### Twitter Storm:实时计算系统详解 #### 背景与定义 Twitter Storm 是一款由Apache基金会维护的开源分布式实时计算系统。它最初由BackType公司开发,并于2011年开源,随后被Twitter收购并进一步发展。Storm 的...

    基于java的开发源码-开放实时数据处理平台 Twitter Storm.zip

    基于java的开发源码-开放实时数据处理平台 Twitter Storm.zip 基于java的开发源码-开放实时数据处理平台 Twitter Storm.zip 基于java的开发源码-开放实时数据处理平台 Twitter Storm.zip 基于java的开发源码-开放...

    仿Twitter源代码 社交网络源码 基于脉聊二开版本 网站源码

    这是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。注册用户可以分享他 仿Twitter源代码 社交网络源码...

    quakk twitter客户端源代码

    首先,`Quakk.4.0.resharper` 文件是JetBrains Resharper的配置文件,Resharper是一款强大的Visual Studio插件,它提供了代码分析、重构、错误检查等功能,有助于提高开发效率和代码质量。`.resharper`文件存储了...

    twitter源代码

    twitter源代码。。。。。。。。。。。。。。。。。。。。。。。。

    基于Java的实例源码-开放实时数据处理平台 Twitter Storm.zip

    描述"基于Java的实例源码-开放实时数据处理平台 Twitter Storm.zip"进一步确认了这个压缩文件包含的是用于学习和参考的源代码,特别是对于那些想要理解和实施实时数据流处理的开发者来说,这是一个宝贵的资源。...

    基于脉聊二开版本仿Twitter源代码 社交网络平台源码

    这是一款类似于 Twitter 网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。 程序语言是 PHP。数据库采用 Mysql。与此同时系统带有注册,评论,转发等功能。 注册用户可以分享他们的图片视频,或者...

    仿Twitter源代码 社交网络源码

    这是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。注册用户可以分享他 仿Twitter源代码 社交网络源码...

    仿Twitter源代码/社交网络源码/基于脉聊二开版本/带详细安装视频

    仿Twitter源代码 社交网络源码 基于脉聊二开版本这是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。...

    基于Java的开放实时数据处理平台 Twitter Storm.zip

    这个技术源自于Twitter,但现在已经成为Apache软件基金会的一个顶级项目,被广泛用于大数据实时分析、日志处理、社交网络分析等场景。 【知识点详解】 1. **实时数据处理**:Twitter Storm的核心功能是实现实时...

    java源码:开放实时数据处理平台 Twitter Storm.zip

    这个压缩包包含的是Nathan Marz开发的Storm项目的源代码,版本号为9a3e1ec。在深入探讨Storm的核心原理和实现细节之前,我们首先来理解一下什么是实时数据处理以及为什么它如此重要。 实时数据处理是指在数据产生的...

    开放实时数据处理平台 Twitter Storm

    开放实时数据处理平台 Twitter Storm

    apache-storm-0.9.5源码

    1. Spout:Spout是Apache Storm的数据输入组件,负责从外部数据源(如Kafka、Twitter或数据库)读取数据并生成数据流。在源码中,`backtype.storm.spout`包包含了各种Spout的实现,如`KafkaSpout`用于从Kafka消费...

    仿Twitter源代码/社交网络源码/基于脉聊二开版本

    仿Twitter源代码是一款类似于Twitter网站的源代码。利用原代码,你可以快速搭建自己的社交网络平台。 程序语言是PHP。数据库采用Mysql。与此同时系统带有注册,评论,转发等功能。 注册用户可以分享他们的图片视频,...

    基于java的开放实时数据处理平台 Twitter Storm.zip

    Storm是由Nathan Marz开发的,并在2011年被Twitter收购,随后成为了其核心的数据处理技术之一。这个平台的核心理念是实现分布式计算的实时性,提供低延迟和高吞吐量的数据处理能力。 【描述】"基于Java的开放实时...

    twitter storm

    ### 分布式与容错实时计算:Twitter Storm详解 #### 引言 随着大数据时代的到来,实时数据处理的需求日益增长。...无论是社交媒体分析、实时数据分析还是流媒体处理等领域,Storm都展现出了其独特的优势和价值。

    storm大数据相关代码

    在这个压缩包中,我们可以预见到与Storm相关的各种源代码、配置文件或者示例项目。 Storm的核心概念包括: 1. **Spout**:数据源,负责产生数据流。可以是任何数据源,如数据库、消息队列等。 2. **Bolt**:数据...

Global site tag (gtag.js) - Google Analytics