- 浏览: 244793 次
- 性别:
- 来自: 南京
文章分类
最新评论
-
phplife:
写的很棒,对我很有帮助,thx!
Erlang OTP学习(1):gen_server -
longshaohang:
写的不错,不过最后markLimit的讨论太少,我的理解是bu ...
BufferedInputStream实现原理分析 -
lin_464025910:
Erlang 很牛逼的 开发语言, 最近才关注 性能确实 不 ...
Erlang 文件处理(读书笔记) -
jaychang:
写的很不错MARK下
BufferedInputStream实现原理分析 -
vavi:
while (count == items.length) ...
Java锁相关总结
上一节(http://diaocow.iteye.com/blog/1734253)我们对TinyMQ进行了概述,这一节我们将着重看一下作者是如何实现消息的发布/订阅
在看源代码之前我们需要了解一些module以及函数的作用:
看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)
tinymq_app.erl 是一个Application Callback Module,用来启动整个应用:
tinymq_sup:start_link()又做了些什么?
至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:
1.worker process
下面我们看一下另一个child process:
2.supervisor process
现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)
接下来我们看看Channel服务是如何处理这个订阅请求的
当Channel服务接收到订阅请求后:
至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。
现在我们来看看tinymq如何响应客户端的发消息请求(publish request).
记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理
当Channel服务接收到发消息请求后:
发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解
至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:
订阅者:
start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...
发布者
发布者每个5秒向Channel里发送消息
最后,在看一下执行效果图
关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看
ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....
不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案
在看源代码之前我们需要了解一些module以及函数的作用:
dict:new() 创建一个 Key-Value Dictionary(可以理解为就是一个map) dict:find(Key, Dict) -> {ok, Value} | error 从Dict1查询Key对应的Value,若存在返回 {ok, Value} 否则返回error dict:store(Key, Value, Dict1) -> Dict2 往Dict1中插入一条Key-Value,并且返回新的Dict2(若Key已经存在,则只是更新下相应的Value) ----------------------------------------------------------------------------- lists:foldl(Fun, Acc0, List) -> Acc1 参数Fun是一个函数,它有两个参数:第一个参数是List中的元素(从左->右依次取),第二个参数是Fun函数执行完后的返回值(第一次调用返回值取Acc0),还是来看一个例子: > lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). 15 执行过程: 首先,Fun第一次执行时,X的值取列表List的第一个元素1,Sum取0, Fun第二次执行时,X的值取列表List的第二个元素2,Sum取Fun第一次的返回值1 依次轮推,直到List中每个元素执行完,foldl返回最后一次函数调用的结果。 lists:foldr(Fun, Acc0, List) -> Acc1 同lists:foldl,唯一区别就是从List中去元素时从右->左 ----------------------------------------------------------------------------- gb_trees module gb_trees实现了Prof. Arne Anderssons General Balanced Trees,它是是一种比AVL效率更高的平衡树.作者基于gb_trees写了一个tiny_pq.erl(优先级队列)作为消息的存储结构;该优先级队列中的每一个节点代表一条消息,消息的时间戳代表这个节点的权值或者说是priority
看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)
tinymq_app.erl 是一个Application Callback Module,用来启动整个应用:
start(_StartType, _StartArgs) -> tinymq_sup:start_link().
tinymq_sup:start_link()又做了些什么?
%% 创建supervisor(tinymq_supervisor是一个顶级监控者(可以参看上一节的监控树图)) start_link() -> supervisor:start_link(?MODULE, []). %% tinymq_supervisor监控两个child process: %% 一个是worker proces 用来控制执行mq的相关功能 %% 另一个是supervisor process 用来监控所有Channel服务 init([]) -> %% child process属性(字段具体含义可以查阅erlang文档) MqWorker = {mq_controller, {tinymq_controller, start_link, []}, permanent, 2000, worker, [tinymq_controller]}, ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []}, permanent, 2000, supervisor, [tinymq_channel_sup]}, Children = [MqWorker, ChannelSup], %% 重启策略是:one_for_one,重启极限是10秒中内重启了10次 RestartStrategy = {one_for_one, 10, 10}, {ok, {RestartStrategy, Children}}.
至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:
1.worker process
tinymq_controller.erl文件 -record(state, {dict, max_age}). %% 采用gen_server模式,注册了一个名为tinymq的服务 start_link() -> gen_server:start_link({local, tinymq}, ?MODULE, [], []). init([]) -> %% 获取环境变量:消息超时时间(Application Resource File配置的是60s) {ok, MaxAgeSeconds} = application:get_env(max_age), %% 初始化tinymq服务状态(这个状态包含两部分,一部分是Channel和ChannelPid关系的一组列表,另一个是消息默认超时时间) {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
下面我们看一下另一个child process:
2.supervisor process
tinymq_channel_sup.erl文件 %% 创建一个supervisor(tinymq_channel_supervisor) start_link() -> supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []). %% 该supervior监控的child process 是动态添加的 init(_StartArgs) -> %% child process 属性 {ok, {{simple_one_for_one, 0, 10}, [ {mq_channel_controller, {tinymq_channel_controller, start_link, []}, temporary, 2000, worker, [tinymq_channel_controller]} ]}}.
现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)
tinymq_controller文件: %% 接收到客户端发出的订阅请求 handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) -> %% 获取该Channel服务对应的的pid(若是一个新的Channel则为其创建一个新的Process) {ChannelPid, NewState} = find_or_create_channel(Channel, State), %% 将客户端的订阅请求交给这个Channle服务处理 gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}), {noreply, NewState}; find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) -> %% 检查该Channel名对应的Channel服务是否存在,若存在返回相应的Pid case dict:find(Channel, Chan2Pid) of {ok, Pid} -> {Pid, State}; _ -> %% 若不存在则创建该Channel服务并添加到 tinymq_channel_sup监控树下 {ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]), %% 维护新的Channel和ChannelPid关系列表(add) {ChannelPid, State#state{ dict = dict:store(Channel, ChannelPid, Chan2Pid) }} end.
接下来我们看看Channel服务是如何处理这个订阅请求的
tinymq_channel_controller文件 %% 启动一个Channel服务 start_link(MaxAge, ChannelSup, Channel) -> gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []). %% 当我们新创建一个Channel服务的时候,它会初始化一些状态,这个状态主要包含以下几部分: %% ---Channel名(其实可以理解为topic名) %% ---消息队列(实际上是一个平衡树,每一个节点的权值就是消息的放入时间) %% ---订阅者列表(通过这个订阅者列表可以将消息派送给每一个订阅者) %% ---消息超时时间(当消息超时后,将会从消息队列中移除) %% ---上一次清除超时消息的时间(默认情况下清除超时消息间隔不小于1秒) %% ---上一次派发消息时间 init([MaxAge, ChannelSup, Channel]) -> {ok, #state{ max_age = MaxAge, supervisor = ChannelSup, channel = Channel, messages = gb_trees:empty(), last_pull = now_to_micro_seconds(erlang:now()), last_purge = now_to_micro_seconds(erlang:now()) }, MaxAge * 1000}.
当Channel服务接收到订阅请求后:
tinymq_channel_controller文件 %% 处理订阅请求 handle_cast({From, subscribe, 'now', Subscriber}, State) -> %% 更新订阅者列表(若是一个新的订阅者则添加到订阅者列表,否则直接返回) NewSubscribers = add_subscriber(Subscriber, State#state.subscribers), %% 告诉客户端订阅成功了(至此客户端就可以接受发送到这个Channle(topic)中的消息了) gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}), %% 删除消息队列中的超时消息,返回新的Channel state %% 文档原文如下:Purging old messages occurs after any channel activity (but no more than once per second) {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })}; %% 添加订阅者 add_subscriber(NewSubscriber, Subscribers) -> case lists:keymember(NewSubscriber, 2, Subscribers) of %% 若在订阅者列表中存在,则什么也不做 true -> Subscribers; %% 否则添加到订阅者列表中{monitorSubscriberPid, SubscriberPid} false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers] end. %% 清理消息队列中已经超时的消息(操作间隔不小于1秒) purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), %% now_to_micro_seconds,这个方法就是把系统时间转换成秒 LastPurge = State#state.last_purge, %% 上一次清理消息队列时间 Duration = seconds_to_micro_seconds(1), %% 操作间隔(默认是1秒) if Now - LastPurge > Duration -> State#state{ %% @tiny_pq:prune_old(Tree, Priority) -> Tree1 Remove nodes with priority less than or equal to `Priority' %% 这里的Tree就是消息队列messages, Priority就是Time(当前时间-消息超时时间,若某个消息的Priority小于这个,说明已经超时了) messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), %% 更新上一次清理消息队列时间 last_purge = Now }; true -> State end.
至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。
现在我们来看看tinymq如何响应客户端的发消息请求(publish request).
记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理
tinymq_controller文件: %% 接受到客户端的发布消息请求 handle_call({push, Channel, Message}, From, State) -> %% 获取该Channel服务对应的的pid,同之前分析 {ChannelPid, NewState} = find_or_create_channel(Channel, State), %% 将该消息转发给响应的Channel process处理 gen_server:cast(ChannelPid, {From, push, Message}), {noreply, NewState};
当Channel服务接收到发消息请求后:
tinymq_channel_controller文件 (代码我做了一点修改,作者原来只要订阅者成功接受到一条消息,就自动取消订阅,我改成可以持续订阅): %% 处理发布消息请求 handle_cast({From, push, Message}, State) -> Now = now_to_micro_seconds(erlang:now()), %% 向每个订阅者发送这条消息 LastPull = lists:foldr(fun({Ref, Sub}, _) -> Sub ! {self(), Now, [Message]}, %% erlang:demonitor(Ref), 取消monitor Now %% 函数调用的返回值:上一次派发消息时间 end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers是一个List,维护着该Channel的所有订阅者 %% 告诉发布者(publisher)消息发送成功 gen_server:reply(From, {ok, Now}), %% 删除消息队列中超时的消息 State2 = purge_old_messages(State), %% 将该条消息插入消息队列(priorit,message, tree) NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages), %% 作者原来设计:Subscribers are removed from the channel as soon as the first message is delivered 他会:subscribers = [] {noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};
发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解
至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:
订阅者:
-module(subscriber). -export([start/1]). start(Num) -> create_subscriber(Num). create_subscriber(0) -> ok; create_subscriber(Num) -> spawn(fun subscribe/0), create_subscriber(Num - 1). subscribe() -> tinymq:subscribe("hello_channel", now, self()), loop(). loop() -> receive {_From, _Timestamp, Messages} -> io:format("Pid: ~p received messages: ~p~n", [self(), Messages]) end, loop().
start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...
发布者
-module(publisher). -export([start/0]). start() -> spawn(fun publish/0). publish() -> Messages = {hello_erlang, erlang:now()}, io:format("Pid: ~p send messages: ~p~n", [self(), Messages]), tinymq:push("hello_channel", Messages), receive after 5000 -> true end, publish().
发布者每个5秒向Channel里发送消息
最后,在看一下执行效果图
关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看
ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....
评论
3 楼
standalone
2012-12-28
原来是这样。。。受教了!谢谢!
2 楼
DiaoCow
2012-12-28
standalone 写道
分析的很好。
问一下,我看到
tinymq__channel_controller.erl里面的函数
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
问一下,我看到
tinymq__channel_controller.erl里面的函数
purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), LastPurge = State#state.last_purge, Duration = seconds_to_micro_seconds(1), if Now - LastPurge > Duration -> State#state{ messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), last_purge = Now }; true -> State end.
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案
1 楼
standalone
2012-12-28
分析的很好。
问一下,我看到
tinymq__channel_controller.erl里面的函数
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
问一下,我看到
tinymq__channel_controller.erl里面的函数
purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), LastPurge = State#state.last_purge, Duration = seconds_to_micro_seconds(1), if Now - LastPurge > Duration -> State#state{ messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), last_purge = Now }; true -> State end.
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
发表评论
-
Erlang rebar源码学习(二)
2013-01-28 11:27 2127之前说了rebar编译的核心部分(rebar_base_com ... -
Erlang rebar源码学习(一)
2013-01-27 17:34 5555最近看霸爷的微博(http ... -
Eralng ets学习总结
2013-01-13 13:57 10307ets是什么? ets是Erlang Te ... -
Erlang 文件处理(读书笔记)
2013-01-08 12:57 12339今天看了下erlang file章节,内容感觉比较散,现在做个 ... -
Erlang OTP学习(3):supervisor
2013-01-05 21:11 10585今天细致的看了下supervisor,现在做个总结: 其中, ... -
Erlang 并发错误处理
2013-01-04 19:51 1742这一章有三个关键的概 ... -
Erlang receive代码块
2012-12-31 11:28 7176receive代码块是如何执行的呢? process会尝 ... -
Erlang OTP学习(2):gen_event
2012-12-30 17:26 4564说完了gen_server,今天我们来看看gen_event。 ... -
Erlang OTP学习(1):gen_server
2012-12-28 19:20 12595在《Programming Erlang》的OTP intro ... -
Erlang 单元测试
2012-12-24 23:29 4826今天学习了下Erlang单元测试,发现非常有用,现在做个总结: ... -
TinyMQ学习(1) 概述
2012-11-24 00:27 2576最近在学习erlang,了解了下它的基本语法以及相关特性,但是 ...
相关推荐
基于python爬虫学习项目源码.zip基于python爬虫学习项目源码.zip基于python爬虫学习项目源码.zip基于python爬虫学习项目源码.zip基于python爬虫学习项目源码.zip基于python爬虫学习项目源码.zip基于python爬虫学习...
微信小游戏源码 猫咪游戏源码(仅用于学习参考)微信小游戏源码 猫咪游戏源码(仅用于学习参考)微信小游戏源码 猫咪游戏源码(仅用于学习参考)微信小游戏源码 猫咪游戏源码(仅用于学习参考)微信小游戏源码 猫咪...
MySQL学习源码(MySQL入门教程).zipMySQL学习源码(MySQL入门教程).zipMySQL学习源码(MySQL入门教程).zipMySQL学习源码(MySQL入门教程).zipMySQL学习源码(MySQL入门教程).zipMySQL学习源码(MySQL入门教程).zipMySQL...
[178]VC源码:串口通讯.zip上位机开发VC串口学习资料源码下载[178]VC源码:串口通讯.zip上位机开发VC串口学习资料源码下载[178]VC源码:串口通讯.zip上位机开发VC串口学习资料源码下载[178]VC源码:串口通讯.zip...
美赛备战学习资料及源码.zip美赛备战学习资料及源码.zip美赛备战学习资料及源码.zip美赛备战学习资料及源码.zip美赛备战学习资料及源码.zip美赛备战学习资料及源码.zip美赛备战学习资料及源码.zip美赛备战学习资料及...
通过分析和学习这些源码,开发者能够提升自己的编程技能,进一步掌握.NET Framework 应用程序开发的深层次知识。随着软件界面的日趋复杂,理解并能够运用这些源码中的概念和技巧,对于任何有意深入 Windows Forms ...
5. **图形化界面设计**:FreeScada2的用户界面是其重要组成部分,源码中会包含用C#和WPF(Windows Presentation Foundation)或者WinForms创建的控件和布局,可以学习到如何设计可视化图表、趋势曲线、报警窗口等。...
本教程将深入探讨如何使用Camera2 API创建一个自定义相机应用,通过分析提供的OneSelfCamera源码,我们可以学习到以下几个关键知识点: 1. **Camera2 API基础**:Camera2 API取代了旧版的Camera API,提供了更多的...
易语言源码神经网络学习易语言源码.rar 易语言源码神经网络学习易语言源码.rar 易语言源码神经网络学习易语言源码.rar 易语言源码神经网络学习易语言源码.rar 易语言源码神经网络学习易语言源码.rar 易语言源码...
总的来说,这份"完整卡牌游戏源码"涵盖了游戏开发的多个方面,从基础的框架使用到复杂的逻辑设计,对于想要学习游戏开发,特别是卡牌游戏类型的开发者来说,是一份非常有价值的参考资料。通过深入研究源码,可以提升...
深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类...
**Angular2实例源码解析** ...通过分析和研究这个Angular2实例源码,我们可以提升对框架的理解,学习最佳实践,以及如何解决实际项目中的问题。同时,也可以借鉴其结构设计,为自己的项目提供参考。
微信小游戏源码 2048游戏源码(仅用于学习参考)微信小游戏源码 2048游戏源码(仅用于学习参考)微信小游戏源码 2048游戏源码(仅用于学习参考)微信小游戏源码 2048游戏源码(仅用于学习参考)微信小游戏源码 2048...
微信小游戏源码 捕鱼游戏源码(仅用于学习参考)微信小游戏源码 捕鱼游戏源码(仅用于学习参考)微信小游戏源码 捕鱼游戏源码(仅用于学习参考)微信小游戏源码 捕鱼游戏源码(仅用于学习参考)微信小游戏源码 捕鱼...
2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设项目,作为参考资料学习借鉴。 3、本资源作为“参考资料”如果需要实现其他功能,需要能看懂代码,并且热爱钻研,自行调试。 基于...
【图像识别】基于yolo v2深度学习检测识别车辆matlab源码.md
2、机器学习数据处理库-pandas示例教程(源码示例) 3、机器学习可视化库-Matplotlib示例教程(源码示例) 4、机器学习可视化库-Seaborn示例教程(源码示例) 5、K近邻算法实战(源码示例) 6、常用机器学习算法原理...
这个实例源码使用的是Spring 3.2、Struts2 2.3.4和Hibernate 4.2这三个框架的较新版本,提供了一个基础的用户登录和用户管理功能的实现。下面我们将详细探讨这些技术及其在项目中的应用。 **Spring框架**: Spring...
通过这个源码学习指南,你可以深入了解 Spring MVC 的内部机制,学习如何创建控制器、配置处理器映射器和适配器、编写拦截器,以及如何利用各种注解实现数据绑定和验证。此外,还能掌握视图解析和异常处理策略,这...
在这个"Unity3D制作的2D游戏"项目中,包含的源码为我们提供了深入理解Unity2D游戏开发的宝贵机会。 Unity3D的2D功能非常强大,它支持精灵(Sprites)、物理引擎、2D光照、UI系统等,这些都是制作2D游戏的关键元素。...