- 浏览: 244197 次
- 性别:
- 来自: 南京
文章分类
最新评论
-
phplife:
写的很棒,对我很有帮助,thx!
Erlang OTP学习(1):gen_server -
longshaohang:
写的不错,不过最后markLimit的讨论太少,我的理解是bu ...
BufferedInputStream实现原理分析 -
lin_464025910:
Erlang 很牛逼的 开发语言, 最近才关注 性能确实 不 ...
Erlang 文件处理(读书笔记) -
jaychang:
写的很不错MARK下
BufferedInputStream实现原理分析 -
vavi:
while (count == items.length) ...
Java锁相关总结
上一节(http://diaocow.iteye.com/blog/1734253)我们对TinyMQ进行了概述,这一节我们将着重看一下作者是如何实现消息的发布/订阅
在看源代码之前我们需要了解一些module以及函数的作用:
看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)
tinymq_app.erl 是一个Application Callback Module,用来启动整个应用:
tinymq_sup:start_link()又做了些什么?
至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:
1.worker process
下面我们看一下另一个child process:
2.supervisor process
现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)
接下来我们看看Channel服务是如何处理这个订阅请求的
当Channel服务接收到订阅请求后:
至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。
现在我们来看看tinymq如何响应客户端的发消息请求(publish request).
记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理
当Channel服务接收到发消息请求后:
发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解
至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:
订阅者:
start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...
发布者
发布者每个5秒向Channel里发送消息
最后,在看一下执行效果图
关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看
ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....
不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案
在看源代码之前我们需要了解一些module以及函数的作用:
dict:new() 创建一个 Key-Value Dictionary(可以理解为就是一个map) dict:find(Key, Dict) -> {ok, Value} | error 从Dict1查询Key对应的Value,若存在返回 {ok, Value} 否则返回error dict:store(Key, Value, Dict1) -> Dict2 往Dict1中插入一条Key-Value,并且返回新的Dict2(若Key已经存在,则只是更新下相应的Value) ----------------------------------------------------------------------------- lists:foldl(Fun, Acc0, List) -> Acc1 参数Fun是一个函数,它有两个参数:第一个参数是List中的元素(从左->右依次取),第二个参数是Fun函数执行完后的返回值(第一次调用返回值取Acc0),还是来看一个例子: > lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). 15 执行过程: 首先,Fun第一次执行时,X的值取列表List的第一个元素1,Sum取0, Fun第二次执行时,X的值取列表List的第二个元素2,Sum取Fun第一次的返回值1 依次轮推,直到List中每个元素执行完,foldl返回最后一次函数调用的结果。 lists:foldr(Fun, Acc0, List) -> Acc1 同lists:foldl,唯一区别就是从List中去元素时从右->左 ----------------------------------------------------------------------------- gb_trees module gb_trees实现了Prof. Arne Anderssons General Balanced Trees,它是是一种比AVL效率更高的平衡树.作者基于gb_trees写了一个tiny_pq.erl(优先级队列)作为消息的存储结构;该优先级队列中的每一个节点代表一条消息,消息的时间戳代表这个节点的权值或者说是priority
看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)
tinymq_app.erl 是一个Application Callback Module,用来启动整个应用:
start(_StartType, _StartArgs) -> tinymq_sup:start_link().
tinymq_sup:start_link()又做了些什么?
%% 创建supervisor(tinymq_supervisor是一个顶级监控者(可以参看上一节的监控树图)) start_link() -> supervisor:start_link(?MODULE, []). %% tinymq_supervisor监控两个child process: %% 一个是worker proces 用来控制执行mq的相关功能 %% 另一个是supervisor process 用来监控所有Channel服务 init([]) -> %% child process属性(字段具体含义可以查阅erlang文档) MqWorker = {mq_controller, {tinymq_controller, start_link, []}, permanent, 2000, worker, [tinymq_controller]}, ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []}, permanent, 2000, supervisor, [tinymq_channel_sup]}, Children = [MqWorker, ChannelSup], %% 重启策略是:one_for_one,重启极限是10秒中内重启了10次 RestartStrategy = {one_for_one, 10, 10}, {ok, {RestartStrategy, Children}}.
至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:
1.worker process
tinymq_controller.erl文件 -record(state, {dict, max_age}). %% 采用gen_server模式,注册了一个名为tinymq的服务 start_link() -> gen_server:start_link({local, tinymq}, ?MODULE, [], []). init([]) -> %% 获取环境变量:消息超时时间(Application Resource File配置的是60s) {ok, MaxAgeSeconds} = application:get_env(max_age), %% 初始化tinymq服务状态(这个状态包含两部分,一部分是Channel和ChannelPid关系的一组列表,另一个是消息默认超时时间) {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
下面我们看一下另一个child process:
2.supervisor process
tinymq_channel_sup.erl文件 %% 创建一个supervisor(tinymq_channel_supervisor) start_link() -> supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []). %% 该supervior监控的child process 是动态添加的 init(_StartArgs) -> %% child process 属性 {ok, {{simple_one_for_one, 0, 10}, [ {mq_channel_controller, {tinymq_channel_controller, start_link, []}, temporary, 2000, worker, [tinymq_channel_controller]} ]}}.
现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)
tinymq_controller文件: %% 接收到客户端发出的订阅请求 handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) -> %% 获取该Channel服务对应的的pid(若是一个新的Channel则为其创建一个新的Process) {ChannelPid, NewState} = find_or_create_channel(Channel, State), %% 将客户端的订阅请求交给这个Channle服务处理 gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}), {noreply, NewState}; find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) -> %% 检查该Channel名对应的Channel服务是否存在,若存在返回相应的Pid case dict:find(Channel, Chan2Pid) of {ok, Pid} -> {Pid, State}; _ -> %% 若不存在则创建该Channel服务并添加到 tinymq_channel_sup监控树下 {ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]), %% 维护新的Channel和ChannelPid关系列表(add) {ChannelPid, State#state{ dict = dict:store(Channel, ChannelPid, Chan2Pid) }} end.
接下来我们看看Channel服务是如何处理这个订阅请求的
tinymq_channel_controller文件 %% 启动一个Channel服务 start_link(MaxAge, ChannelSup, Channel) -> gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []). %% 当我们新创建一个Channel服务的时候,它会初始化一些状态,这个状态主要包含以下几部分: %% ---Channel名(其实可以理解为topic名) %% ---消息队列(实际上是一个平衡树,每一个节点的权值就是消息的放入时间) %% ---订阅者列表(通过这个订阅者列表可以将消息派送给每一个订阅者) %% ---消息超时时间(当消息超时后,将会从消息队列中移除) %% ---上一次清除超时消息的时间(默认情况下清除超时消息间隔不小于1秒) %% ---上一次派发消息时间 init([MaxAge, ChannelSup, Channel]) -> {ok, #state{ max_age = MaxAge, supervisor = ChannelSup, channel = Channel, messages = gb_trees:empty(), last_pull = now_to_micro_seconds(erlang:now()), last_purge = now_to_micro_seconds(erlang:now()) }, MaxAge * 1000}.
当Channel服务接收到订阅请求后:
tinymq_channel_controller文件 %% 处理订阅请求 handle_cast({From, subscribe, 'now', Subscriber}, State) -> %% 更新订阅者列表(若是一个新的订阅者则添加到订阅者列表,否则直接返回) NewSubscribers = add_subscriber(Subscriber, State#state.subscribers), %% 告诉客户端订阅成功了(至此客户端就可以接受发送到这个Channle(topic)中的消息了) gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}), %% 删除消息队列中的超时消息,返回新的Channel state %% 文档原文如下:Purging old messages occurs after any channel activity (but no more than once per second) {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })}; %% 添加订阅者 add_subscriber(NewSubscriber, Subscribers) -> case lists:keymember(NewSubscriber, 2, Subscribers) of %% 若在订阅者列表中存在,则什么也不做 true -> Subscribers; %% 否则添加到订阅者列表中{monitorSubscriberPid, SubscriberPid} false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers] end. %% 清理消息队列中已经超时的消息(操作间隔不小于1秒) purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), %% now_to_micro_seconds,这个方法就是把系统时间转换成秒 LastPurge = State#state.last_purge, %% 上一次清理消息队列时间 Duration = seconds_to_micro_seconds(1), %% 操作间隔(默认是1秒) if Now - LastPurge > Duration -> State#state{ %% @tiny_pq:prune_old(Tree, Priority) -> Tree1 Remove nodes with priority less than or equal to `Priority' %% 这里的Tree就是消息队列messages, Priority就是Time(当前时间-消息超时时间,若某个消息的Priority小于这个,说明已经超时了) messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), %% 更新上一次清理消息队列时间 last_purge = Now }; true -> State end.
至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。
现在我们来看看tinymq如何响应客户端的发消息请求(publish request).
记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理
tinymq_controller文件: %% 接受到客户端的发布消息请求 handle_call({push, Channel, Message}, From, State) -> %% 获取该Channel服务对应的的pid,同之前分析 {ChannelPid, NewState} = find_or_create_channel(Channel, State), %% 将该消息转发给响应的Channel process处理 gen_server:cast(ChannelPid, {From, push, Message}), {noreply, NewState};
当Channel服务接收到发消息请求后:
tinymq_channel_controller文件 (代码我做了一点修改,作者原来只要订阅者成功接受到一条消息,就自动取消订阅,我改成可以持续订阅): %% 处理发布消息请求 handle_cast({From, push, Message}, State) -> Now = now_to_micro_seconds(erlang:now()), %% 向每个订阅者发送这条消息 LastPull = lists:foldr(fun({Ref, Sub}, _) -> Sub ! {self(), Now, [Message]}, %% erlang:demonitor(Ref), 取消monitor Now %% 函数调用的返回值:上一次派发消息时间 end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers是一个List,维护着该Channel的所有订阅者 %% 告诉发布者(publisher)消息发送成功 gen_server:reply(From, {ok, Now}), %% 删除消息队列中超时的消息 State2 = purge_old_messages(State), %% 将该条消息插入消息队列(priorit,message, tree) NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages), %% 作者原来设计:Subscribers are removed from the channel as soon as the first message is delivered 他会:subscribers = [] {noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};
发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解
至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:
订阅者:
-module(subscriber). -export([start/1]). start(Num) -> create_subscriber(Num). create_subscriber(0) -> ok; create_subscriber(Num) -> spawn(fun subscribe/0), create_subscriber(Num - 1). subscribe() -> tinymq:subscribe("hello_channel", now, self()), loop(). loop() -> receive {_From, _Timestamp, Messages} -> io:format("Pid: ~p received messages: ~p~n", [self(), Messages]) end, loop().
start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...
发布者
-module(publisher). -export([start/0]). start() -> spawn(fun publish/0). publish() -> Messages = {hello_erlang, erlang:now()}, io:format("Pid: ~p send messages: ~p~n", [self(), Messages]), tinymq:push("hello_channel", Messages), receive after 5000 -> true end, publish().
发布者每个5秒向Channel里发送消息
最后,在看一下执行效果图
关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看
ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....
评论
3 楼
standalone
2012-12-28
原来是这样。。。受教了!谢谢!
2 楼
DiaoCow
2012-12-28
standalone 写道
分析的很好。
问一下,我看到
tinymq__channel_controller.erl里面的函数
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
问一下,我看到
tinymq__channel_controller.erl里面的函数
purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), LastPurge = State#state.last_purge, Duration = seconds_to_micro_seconds(1), if Now - LastPurge > Duration -> State#state{ messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), last_purge = Now }; true -> State end.
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案
1 楼
standalone
2012-12-28
分析的很好。
问一下,我看到
tinymq__channel_controller.erl里面的函数
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
问一下,我看到
tinymq__channel_controller.erl里面的函数
purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), LastPurge = State#state.last_purge, Duration = seconds_to_micro_seconds(1), if Now - LastPurge > Duration -> State#state{ messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), last_purge = Now }; true -> State end.
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
发表评论
-
Erlang rebar源码学习(二)
2013-01-28 11:27 2106之前说了rebar编译的核心部分(rebar_base_com ... -
Erlang rebar源码学习(一)
2013-01-27 17:34 5547最近看霸爷的微博(http ... -
Eralng ets学习总结
2013-01-13 13:57 10300ets是什么? ets是Erlang Te ... -
Erlang 文件处理(读书笔记)
2013-01-08 12:57 12332今天看了下erlang file章节,内容感觉比较散,现在做个 ... -
Erlang OTP学习(3):supervisor
2013-01-05 21:11 10571今天细致的看了下supervisor,现在做个总结: 其中, ... -
Erlang 并发错误处理
2013-01-04 19:51 1738这一章有三个关键的概 ... -
Erlang receive代码块
2012-12-31 11:28 7166receive代码块是如何执行的呢? process会尝 ... -
Erlang OTP学习(2):gen_event
2012-12-30 17:26 4551说完了gen_server,今天我们来看看gen_event。 ... -
Erlang OTP学习(1):gen_server
2012-12-28 19:20 12583在《Programming Erlang》的OTP intro ... -
Erlang 单元测试
2012-12-24 23:29 4818今天学习了下Erlang单元测试,发现非常有用,现在做个总结: ... -
TinyMQ学习(1) 概述
2012-11-24 00:27 2569最近在学习erlang,了解了下它的基本语法以及相关特性,但是 ...
相关推荐
2. **源码结构**:从压缩包中的文件名称列表可以看出,源码包含了多个关键组件。例如,`index.php`是网站的入口文件,处理用户请求;`license.php`可能包含软件的授权信息和使用条款;`资源说明.txt`可能提供了关于...
微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)微信小游戏源码 猜数字小游戏源码2(仅用于学习参考)...
总的来说,Mir2源码完整版为游戏开发者提供了一个全面的学习平台,涵盖了网络编程、数据库设计、图形渲染、多线程技术等多个领域,对于提升个人技能和团队项目开发能力都有极大的帮助。同时,通过对比分析不同的Mir...
Java机器学习实例源码,一共有10个源码示例,机器学习入门。
总的来说,这份"完整卡牌游戏源码"涵盖了游戏开发的多个方面,从基础的框架使用到复杂的逻辑设计,对于想要学习游戏开发,特别是卡牌游戏类型的开发者来说,是一份非常有价值的参考资料。通过深入研究源码,可以提升...
本资源是一个基于cocos2d-x 2.1.0版本的闯关类游戏源码,对于想要学习游戏开发,尤其是cocos2d-x平台的开发者来说,这是一个很好的学习材料。 首先,我们要理解cocos2d-x的核心特性。它支持跨平台开发,可以在iOS、...
深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类识别项目源码.zip深度学习大作业基于TensorFlow 2实现手写数字识别和猫狗分类...
exjts4 学习笔记源码,源码包含windws,hbox,vbox和Grid的应用,其中grid介绍比较多。下载解压后,部署后就可以使用,所有代码均在demo文件夹下。更多extjs4教程,请关注http://www.mhzg.net
机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器学习房价预测实战源码.zip机器...
【图像识别】基于yolo v2深度学习检测识别车辆matlab源码.md
在线学习网站DEMO源码基于JAVA技术和J2EE框架,为开发者提供了一个完整的在线教育平台的实例。这个项目是JAVA毕业设计的一部分,旨在帮助学生理解并实践企业级应用的开发流程,同时也适合进行课程设计或者技术研究。...
这个实例源码使用的是Spring 3.2、Struts2 2.3.4和Hibernate 4.2这三个框架的较新版本,提供了一个基础的用户登录和用户管理功能的实现。下面我们将详细探讨这些技术及其在项目中的应用。 **Spring框架**: Spring...
通过这样的示例,你可以学习如何在Windows环境下构建和运行Android游戏,了解cocos2dx在不同平台上的兼容性和调试技巧。 总的来说,cocos2dx提供的源码和Android工程创建能力,使得开发者能够轻松地进行跨平台游戏...
ASP.NET在线学习平台网站源码是一个专为学院内部管理设计的Web应用程序,它利用Microsoft的ASP.NET技术构建,提供了一套完整的在线学习和管理解决方案。ASP.NET是.NET框架的一部分,是一个用于构建动态网站、Web应用...
渗透学习测试网站源码打包带走。asp+access注入源码;cookie注入源码;DVWA漏洞平台测试源码;ewebeditor;fck漏洞源码;fck突破“_”源码;iis6.0解析漏洞源码;phpcms2008;php包含漏洞源码;WEBSHELL箱子系统V2.0...
易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶返回命令方法2源码.rar 易语言学习进阶...
在这个"Unity3D制作的2D游戏"项目中,包含的源码为我们提供了深入理解Unity2D游戏开发的宝贵机会。 Unity3D的2D功能非常强大,它支持精灵(Sprites)、物理引擎、2D光照、UI系统等,这些都是制作2D游戏的关键元素。...
24个scratch小游戏源码,scratch小游戏合集,适合小朋友自学。新颖、有趣的游戏制作可以激发孩子的学习兴趣。 包括: BoatRace.sb2 俄罗斯方块.sb2 养鱼.sb2 坦克大战.sb2 太空迷航.sb2 夹娃娃.sb3 宇宙大战.sb2 小猫...
花了两周时间,参考的商业游戏《武士2》源码,在5.61版本上进行重写,改写,高度还原了原始的作品。学习到了挺多的东西,真正的感受到商业代码框架的优良,重点学习了动画系统,由于是老版的动画,所以参考的写了一...
通过对比分析这些源码,开发者可以学习到不同的游戏设计思路,理解不同游戏机制的实现方式,从而提高自身的游戏开发技能。 此外,"已解密"的标签意味着这些源码已经去除了保护措施,可以直接阅读和学习,这对于初学...