`
DiaoCow
  • 浏览: 244197 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

TinyMQ学习(2) 源码

阅读更多
上一节(http://diaocow.iteye.com/blog/1734253)我们对TinyMQ进行了概述,这一节我们将着重看一下作者是如何实现消息的发布/订阅


在看源代码之前我们需要了解一些module以及函数的作用:

dict:new()
创建一个 Key-Value Dictionary(可以理解为就是一个map)

dict:find(Key, Dict) -> {ok, Value} | error
从Dict1查询Key对应的Value,若存在返回 {ok, Value} 否则返回error

dict:store(Key, Value, Dict1) -> Dict2
往Dict1中插入一条Key-Value,并且返回新的Dict2(若Key已经存在,则只是更新下相应的Value)

-----------------------------------------------------------------------------
lists:foldl(Fun, Acc0, List) -> Acc1
参数Fun是一个函数,它有两个参数:第一个参数是List中的元素(从左->右依次取),第二个参数是Fun函数执行完后的返回值(第一次调用返回值取Acc0),还是来看一个例子:
> lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
15
执行过程:
首先,Fun第一次执行时,X的值取列表List的第一个元素1,Sum取0,
Fun第二次执行时,X的值取列表List的第二个元素2,Sum取Fun第一次的返回值1
依次轮推,直到List中每个元素执行完,foldl返回最后一次函数调用的结果。

lists:foldr(Fun, Acc0, List) -> Acc1
同lists:foldl,唯一区别就是从List中去元素时从右->左

-----------------------------------------------------------------------------
gb_trees module
gb_trees实现了Prof. Arne Anderssons General Balanced Trees,它是是一种比AVL效率更高的平衡树.作者基于gb_trees写了一个tiny_pq.erl(优先级队列)作为消息的存储结构;该优先级队列中的每一个节点代表一条消息,消息的时间戳代表这个节点的权值或者说是priority


看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:

tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)

tinymq_app.erl 是一个Application Callback Module,用来启动整个应用

start(_StartType, _StartArgs) ->
    tinymq_sup:start_link().   

tinymq_sup:start_link()又做了些什么?

%% 创建supervisor(tinymq_supervisor是一个顶级监控者(可以参看上一节的监控树图))
start_link() ->
    supervisor:start_link(?MODULE, []).

%% tinymq_supervisor监控两个child process:
%% 一个是worker proces 用来控制执行mq的相关功能
%% 另一个是supervisor process 用来监控所有Channel服务
init([]) ->

	%% child process属性(字段具体含义可以查阅erlang文档)
    MqWorker = {mq_controller, {tinymq_controller, start_link, []},
           permanent, 2000, worker, [tinymq_controller]},

    ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []},
                  permanent, 2000, supervisor, [tinymq_channel_sup]},

    Children = [MqWorker, ChannelSup],

    %% 重启策略是:one_for_one,重启极限是10秒中内重启了10次
    RestartStrategy = {one_for_one, 10, 10},
    {ok, {RestartStrategy, Children}}.

至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:

1.worker process

tinymq_controller.erl文件

-record(state, {dict, max_age}).

%% 采用gen_server模式,注册了一个名为tinymq的服务
start_link() ->
    gen_server:start_link({local, tinymq}, ?MODULE, [], []).

init([]) ->
    %% 获取环境变量:消息超时时间(Application Resource File配置的是60s)
    {ok, MaxAgeSeconds} = application:get_env(max_age), 
    %% 初始化tinymq服务状态(这个状态包含两部分,一部分是Channel和ChannelPid关系的一组列表,另一个是消息默认超时时间)
    {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
 
下面我们看一下另一个child process:

2.supervisor process

tinymq_channel_sup.erl文件

%% 创建一个supervisor(tinymq_channel_supervisor)
start_link() ->
    supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []).

%% 该supervior监控的child process 是动态添加的
init(_StartArgs) ->
	%% child process 属性
    {ok, {{simple_one_for_one, 0, 10},
          [
           {mq_channel_controller, {tinymq_channel_controller, start_link, []},
            temporary, 2000, worker, [tinymq_channel_controller]}
          ]}}.


现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)

tinymq_controller文件:

%% 接收到客户端发出的订阅请求
handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) ->    
    %% 获取该Channel服务对应的的pid(若是一个新的Channel则为其创建一个新的Process)
    {ChannelPid, NewState} = find_or_create_channel(Channel, State),
    %% 将客户端的订阅请求交给这个Channle服务处理
    gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}),
    {noreply, NewState};

find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) ->
    %% 检查该Channel名对应的Channel服务是否存在,若存在返回相应的Pid
    case dict:find(Channel, Chan2Pid) of
        {ok, Pid} ->
            {Pid, State};
        _ ->
            %% 若不存在则创建该Channel服务并添加到 tinymq_channel_sup监控树下
            {ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]),
            %% 维护新的Channel和ChannelPid关系列表(add) 
            {ChannelPid, State#state{
                    dict = dict:store(Channel, ChannelPid, Chan2Pid)
                }} 
    end.

接下来我们看看Channel服务是如何处理这个订阅请求的

tinymq_channel_controller文件

%% 启动一个Channel服务
start_link(MaxAge, ChannelSup, Channel) ->
    gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []).

%% 当我们新创建一个Channel服务的时候,它会初始化一些状态,这个状态主要包含以下几部分:
%% ---Channel名(其实可以理解为topic名)
%% ---消息队列(实际上是一个平衡树,每一个节点的权值就是消息的放入时间)
%% ---订阅者列表(通过这个订阅者列表可以将消息派送给每一个订阅者)
%% ---消息超时时间(当消息超时后,将会从消息队列中移除)
%% ---上一次清除超时消息的时间(默认情况下清除超时消息间隔不小于1秒)
%% ---上一次派发消息时间
init([MaxAge, ChannelSup, Channel]) ->
    {ok, #state{
            max_age = MaxAge,
            supervisor = ChannelSup,       
            channel = Channel,             
            messages = gb_trees:empty(),   
            last_pull = now_to_micro_seconds(erlang:now()),    
            last_purge = now_to_micro_seconds(erlang:now()) },  
     MaxAge * 1000}.

当Channel服务接收到订阅请求后:

tinymq_channel_controller文件

%%  处理订阅请求
handle_cast({From, subscribe, 'now', Subscriber}, State) ->
    %% 更新订阅者列表(若是一个新的订阅者则添加到订阅者列表,否则直接返回)
    NewSubscribers = add_subscriber(Subscriber, State#state.subscribers), 
    %% 告诉客户端订阅成功了(至此客户端就可以接受发送到这个Channle(topic)中的消息了)
    gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}),     
    %% 删除消息队列中的超时消息,返回新的Channel state
    %% 文档原文如下:Purging old messages occurs after any channel activity (but no more than once per second)
    {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })};  

%% 添加订阅者
add_subscriber(NewSubscriber, Subscribers) ->
        case lists:keymember(NewSubscriber, 2, Subscribers) of
        %% 若在订阅者列表中存在,则什么也不做
        true -> Subscribers; 
        %% 否则添加到订阅者列表中{monitorSubscriberPid, SubscriberPid}
        false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers]  
    end.

%% 清理消息队列中已经超时的消息(操作间隔不小于1秒)
purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),	%% now_to_micro_seconds,这个方法就是把系统时间转换成秒
    LastPurge = State#state.last_purge,     	%% 上一次清理消息队列时间
    Duration = seconds_to_micro_seconds(1), 	%% 操作间隔(默认是1秒)
    if
        Now - LastPurge > Duration ->    
            State#state{ 
                %% @tiny_pq:prune_old(Tree, Priority) -> Tree1  Remove nodes with priority less than or equal to `Priority'
                %% 这里的Tree就是消息队列messages, Priority就是Time(当前时间-消息超时时间,若某个消息的Priority小于这个,说明已经超时了)
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                %% 更新上一次清理消息队列时间
                last_purge = Now };
        true ->
            State
    end.

至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。

现在我们来看看tinymq如何响应客户端的发消息请求(publish request).

记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理

tinymq_controller文件:

%% 接受到客户端的发布消息请求
handle_call({push, Channel, Message}, From, State) ->
    %% 获取该Channel服务对应的的pid,同之前分析
    {ChannelPid, NewState} = find_or_create_channel(Channel, State),
    %% 将该消息转发给响应的Channel process处理
    gen_server:cast(ChannelPid, {From, push, Message}),
    {noreply, NewState};

当Channel服务接收到发消息请求后:

tinymq_channel_controller文件
(代码我做了一点修改,作者原来只要订阅者成功接受到一条消息,就自动取消订阅,我改成可以持续订阅):

%% 处理发布消息请求
handle_cast({From, push, Message}, State) ->
    Now = now_to_micro_seconds(erlang:now()),  

    %% 向每个订阅者发送这条消息
    LastPull = lists:foldr(fun({Ref, Sub}, _) -> 
                Sub ! {self(), Now, [Message]},   
                %% erlang:demonitor(Ref), 取消monitor 
                Now    %% 函数调用的返回值:上一次派发消息时间
        end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers是一个List,维护着该Channel的所有订阅者

    %% 告诉发布者(publisher)消息发送成功
    gen_server:reply(From, {ok, Now}),  
    %% 删除消息队列中超时的消息
    State2 = purge_old_messages(State),
    %% 将该条消息插入消息队列(priorit,message, tree)                                    
    NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages), 
    %% 作者原来设计:Subscribers are removed from the channel as soon as the first message is delivered 他会:subscribers = []
    {noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};


发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解

至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:

订阅者:

-module(subscriber).
-export([start/1]).

start(Num) ->
	create_subscriber(Num).

create_subscriber(0) -> ok;
create_subscriber(Num) ->
	spawn(fun subscribe/0),
	create_subscriber(Num - 1).

subscribe() ->
	tinymq:subscribe("hello_channel", now, self()),
	loop().

loop() ->
	receive
   		{_From, _Timestamp, Messages} ->
        	io:format("Pid: ~p received messages: ~p~n", [self(), Messages])
	end,
	loop().

start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...

发布者

-module(publisher).
-export([start/0]).

start() ->
	spawn(fun publish/0).

publish() ->
	Messages = {hello_erlang, erlang:now()},
    io:format("Pid: ~p send messages: ~p~n", [self(), Messages]),
	tinymq:push("hello_channel", Messages),

	receive
	after 5000 ->
		true
	end,

	publish().

发布者每个5秒向Channel里发送消息

最后,在看一下执行效果图



关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看



ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....






  • 大小: 85.8 KB
0
0
分享到:
评论
3 楼 standalone 2012-12-28  
原来是这样。。。受教了!谢谢!
2 楼 DiaoCow 2012-12-28  
standalone 写道
分析的很好。

问一下,我看到
tinymq__channel_controller.erl里面的函数

purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),
    LastPurge = State#state.last_purge,
    Duration = seconds_to_micro_seconds(1),
    if
        Now - LastPurge > Duration ->
            State#state{ 
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                last_purge = Now };
        true ->
            State
    end.


用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?



不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案 
1 楼 standalone 2012-12-28  
分析的很好。

问一下,我看到
tinymq__channel_controller.erl里面的函数

purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),
    LastPurge = State#state.last_purge,
    Duration = seconds_to_micro_seconds(1),
    if
        Now - LastPurge > Duration ->
            State#state{ 
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                last_purge = Now };
        true ->
            State
    end.


用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?

相关推荐

    在线学习平台(易学堂学习系统)源码

    2. **源码结构**:从压缩包中的文件名称列表可以看出,源码包含了多个关键组件。例如,`index.php`是网站的入口文件,处理用户请求;`license.php`可能包含软件的授权信息和使用条款;`资源说明.txt`可能提供了关于...

    微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)

    微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)...

    Mir2源码完整版(服务端+客户端)

    总的来说,Mir2源码完整版为游戏开发者提供了一个全面的学习平台,涵盖了网络编程、数据库设计、图形渲染、多线程技术等多个领域,对于提升个人技能和团队项目开发能力都有极大的帮助。同时,通过对比分析不同的Mir...

    机器学习实例源码

    Java机器学习实例源码,一共有10个源码示例,机器学习入门。

    完整卡牌游戏源码(学习参考)

    总的来说,这份"完整卡牌游戏源码"涵盖了游戏开发的多个方面,从基础的框架使用到复杂的逻辑设计,对于想要学习游戏开发,特别是卡牌游戏类型的开发者来说,是一份非常有价值的参考资料。通过深入研究源码,可以提升...

    cocos2d-x游戏源码

    本资源是一个基于cocos2d-x 2.1.0版本的闯关类游戏源码,对于想要学习游戏开发,尤其是cocos2d-x平台的开发者来说,这是一个很好的学习材料。 首先,我们要理解cocos2d-x的核心特性。它支持跨平台开发,可以在iOS、...

    深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip

    深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类...

    extjs4 学习笔记源码

    exjts4 学习笔记源码,源码包含windws,hbox,vbox和Grid的应用,其中grid介绍比较多。下载解压后,部署后就可以使用,所有代码均在demo文件夹下。更多extjs4教程,请关注http://www.mhzg.net

    机器学习房价预测实战源码.zip

    机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器...

    【图像识别】基于yolo v2深度学习检测识别车辆matlab源码.md

    【图像识别】基于yolo v2深度学习检测识别车辆matlab源码.md

    在线学习网站DEMO源码(JAVA 毕业设计 J2EE)

    在线学习网站DEMO源码基于JAVA技术和J2EE框架,为开发者提供了一个完整的在线教育平台的实例。这个项目是JAVA毕业设计的一部分,旨在帮助学生理解并实践企业级应用的开发流程,同时也适合进行课程设计或者技术研究。...

    SSH2框架搭建实例源码

    这个实例源码使用的是Spring 3.2、Struts2 2.3.4和Hibernate 4.2这三个框架的较新版本,提供了一个基础的用户登录和用户管理功能的实现。下面我们将详细探讨这些技术及其在项目中的应用。 **Spring框架**: Spring...

    cocos2dx 游戏开发系列之三 源码

    通过这样的示例,你可以学习如何在Windows环境下构建和运行Android游戏,了解cocos2dx在不同平台上的兼容性和调试技巧。 总的来说,cocos2dx提供的源码和Android工程创建能力,使得开发者能够轻松地进行跨平台游戏...

    ASP.NET在线学习平台网站源码

    ASP.NET在线学习平台网站源码是一个专为学院内部管理设计的Web应用程序,它利用Microsoft的ASP.NET技术构建,提供了一套完整的在线学习和管理解决方案。ASP.NET是.NET框架的一部分,是一个用于构建动态网站、Web应用...

    渗透学习测试网站源码打包带走

    渗透学习测试网站源码打包带走。asp+access注入源码;cookie注入源码;DVWA漏洞平台测试源码;ewebeditor;fck漏洞源码;fck突破“_”源码;iis6.0解析漏洞源码;phpcms2008;php包含漏洞源码;WEBSHELL箱子系统V2.0...

    易语言学习进阶返回命令方法2源码.rar

    易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶...

    Unity3D制作的2D游戏)[含项目源码]

    在这个"Unity3D制作的2D游戏"项目中,包含的源码为我们提供了深入理解Unity2D游戏开发的宝贵机会。 Unity3D的2D功能非常强大,它支持精灵(Sprites)、物理引擎、2D光照、UI系统等,这些都是制作2D游戏的关键元素。...

    24个scratch小游戏源码.zip

    24个scratch小游戏源码,scratch小游戏合集,适合小朋友自学。新颖、有趣的游戏制作可以激发孩子的学习兴趣。 包括: BoatRace.sb2 俄罗斯方块.sb2 养鱼.sb2 坦克大战.sb2 太空迷航.sb2 夹娃娃.sb3 宇宙大战.sb2 小猫...

    ACT动作游戏《武士2》源码

    花了两周时间,参考的商业游戏《武士2》源码,在5.61版本上进行重写,改写,高度还原了原始的作品。学习到了挺多的东西,真正的感受到商业代码框架的优良,重点学习了动画系统,由于是老版的动画,所以参考的写了一...

    网狐6.6完整源码+内核源码+105款游戏源码(已解密).zip

    通过对比分析这些源码,开发者可以学习到不同的游戏设计思路,理解不同游戏机制的实现方式,从而提高自身的游戏开发技能。 此外,"已解密"的标签意味着这些源码已经去除了保护措施,可以直接阅读和学习,这对于初学...

Global site tag (gtag.js) - Google Analytics