`
cwqcwk1
  • 浏览: 86807 次
文章分类
社区版块
存档分类
最新评论

探讨erlang消息选择性接收特性

 
阅读更多
从 rabbitMQ 代码中找到 gen_server2 , 对gen_server进行了一些优化。看到前辈写的博文也提到这个,引发了我的思考。见gen_server2 - OTP gen_server优化版

gen_server2 引发的思考

正如 litaocheng 所说的:
gen_server 和 gen_server2 最大的不同是:
gen_server2 收到任何一条消息放到外部的队列中,当VM内部消息队列为空后,才进行消息处理,继续循环
gen_server 收到任何一条消息后,立即进行处理,处理完成后继续循环

其次,还有一个很重要的不同点:
gen_server2 使用的外部队列是带优先级排序的,功能模块本身可以定制消息优先级,甚至直接抛弃消息。(导出 prioritise_call/prioritise_cast/prioritise_info 几个函数实现定制,返回的数值越大优先级越高,返回drop就抛弃消息)
最高优先级是 infinity, 在处理 {system, _From, _Req} 和 {'EXIT', Parent, _R} 使用了这个优先级。

但在博文也引用了Joe' Bog 在merle所做的测试,看了merle 的代码,没有用到 prioritise_XXX 函数,说明没有明显利用到 gen_server2 优先级控制的好处,那为什么能获得不错的效果?(见下图)

讨论 gen_server2 的测试

通过阅读 Joe 在Github写的 merle 代码,很快发现问题:
可以看出,merle 在 handle_call 时都会调用 send_generic_cmd 、send_get_cmd 等类似函数。这些函数实现上都会阻塞进程直到接收到某些特定消息。
下面以 send_generic_cmd 为例做说明:
send_generic_cmd(Socket, Cmd) ->
gen_tcp:send(Socket, <<Cmd/binary, "\r\n">>),
Reply = recv_simple_reply(),
Reply.


recv_simple_reply() ->
receive
{tcp,_,Data} ->
string:tokens(binary_to_list(Data), "\r\n");
{error, closed} ->
connection_closed
after ?TIMEOUT -> timeout
end.
另外,gen_tcp:send 在实现上也 receive 等待某个特定信息,见 prim_inet:send(Socket, Packet, Opts)
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
try erlang:port_command(S, Data, OptList) of
false -> % Port busy and nosuspend option passed
{error,busy};
true ->
receive
{inet_reply,S,Status} ->
Status
end
catch
error:_Error ->
{error,einval}
end.
或许可能有读者不明白,这里说的等待某个特定消息是指选择性接收。具体例子如下:
选择性接收:
receive
{ok, Result} ->
Result
end.

非选择性接收:
receive
Info ->
Info
end.
选择性接收只针对某类信件,会一直阻塞住直到找到该信件为止,或者超时。非选择性接收的结果是每条消息都会被消费掉,方式为先进先出,不存在扫描信箱的问题。

前面提到,merle 在 handle_call 时都会 receive 住,等待某个特定消息。这个的代价就是每次receive住,erlang VM都要扫描进程整个信箱队列。特别像 Joe 在做此类测试时,消息处理速度远远低于消息投递速度,换句话说,gen_server进程信箱前面所有大部分的信件都是作者自己发的 gen_server:call 请求消息,然后每次 receive 住都要匹配这些消息。
比如, Joe 测试的是 merle:getkey 操作,那么信箱大部分消息就是 gen_server:call 投递的 getkey 消息,而 handle_call 在处理时就要扫描完前面的getkey消息,才能得到想要的 {tcp,_,Data} 消息。进程信箱消息队列如下所示:
getkey
getkey
getkey
...
getkey
{tcp,_,Data}
...
换成 gen_server2 的方式,gen_server2 会清空消息队列,那么进程信箱消息队列如下所示:
getkey
getkey
{tcp,_,Data}
...
前面还有2个getkey表示 gen_server2清空后在 handle_call 处理过程中 gen_server:call 又投递了新的 getkey 消息,数据量对比 gen_server来说可以说是极少了,所以,消息匹配的次数就少了很多,这就会出现 Joe 测试的结果。

讨论erlang消息选择性接收

在讨论这个问题之前,先引用 learnyousomeerlang 对消息选择性接收的介绍(原文),很生动具体。
When there is no way to match a given message, it is put in asave queueand the next message is tried. If the second message matches, the first message is put back on top of the mailbox to be retried later.

就是说,erlang消息匹配不上,就会把消息放到 Save Queue 的队列中,当匹配到了后再把消息放回进程信箱。以上是形象化的说法,如果是这样就必然存在消息入列出列开销,那VM到底是不是这样实现呢?

所以,对于选择性接收,这里取3个问题出来讲:
1、上面提到的,消息 Save Queue 是否存在入列出列开销
2、当选择性接收时,新消息到来时会不会重复扫描信箱前面匹配不上的消息
3、假设第2点不存在重复扫描,那么如果消息已经匹配到了,再匹配多一次这个消息,会不会重复扫描前面的消息

带着上面的疑问,下面以一个简单的例子做说明
-module(test).
-compile(export_all).
t() ->
receive
ok ->
ok
end.
保存为test.erl,然后编译,生成opcode
1> c(test).
{ok,test}
2> erts_debug:df(test).
ok
在目录下找到生成的 test.dis,t() 函数opcode如下:
04BE84B0: i_func_info_IaaI 0 test t 0
04BE84C4: i_loop_rec_fr f(04BE84EC) x(0)
04BE84CC: i_is_eq_exact_immed_frc f(04BE84E4) x(0) ok
04BE84D8: remove_message
04BE84DC: move_return_cr ok x(0)
04BE84E4: loop_rec_end_f test:t/0
04BE84EC: wait_locked_f test:t/0
逐行解释这段代码:
i_loop_rec_fr
receive接收信息,如果有信息放到 x(0) 寄存器,继续下一条指令;没有消息就跳到地址 04BE84EC,即 wait_locked_f
i_is_eq_exact_immed_frc
匹配 x(0)寄存器的值和ok是否相等,如果相等继续下一条指令;否则跳到04BE84E4,即 loop_rec_end_f
remove_message
移除进程消息队列中“当前”的信息(也就是上一行匹配到的信息)
move_return_cr
将 ok 送到 x(0)寄存器并返回结果
loop_rec_end_f
将“当前消息”指针指向下一个位置,如果指向位置有消息,则跳到test:t/0第一段代码地址继续执行,即 04BE84C4;否则继续执行下一条指令 04BE84EC,即 wait_locked_f
wait_locked_f
阻塞当前进程,等待下一次调度,再检查是否有新的消息到达
以上, i_is_eq_exact_immed_frc 和 move_return_cr 在 beam_hot.h实现,其他在 beam_emu.c 实现,都可以找相关代码。

/* 
 * beam_emu.c process_main() 线程入口函数,实现VM调度 
 * 以下截取 i_loop_rec_fr 处理过程
 * 作用是从信箱取出一条消息放到 x(0) 寄存器;没消息则跳到 wait或者 wait_timeout指令
 */ 
 OpCase(i_loop_rec_fr):
 {
     BeamInstr *next;
     ErlMessage* msgp;

 loop_rec__:

     PROCESS_MAIN_CHK_LOCKS(c_p);

     // 取出“当前位置”的消息
     msgp = PEEK_MESSAGE(c_p);

     if (!msgp) { //如果消息不存在,尝试从SMP下public queue获取消息
#ifdef ERTS_SMP
	 erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
	 if (ERTS_PROC_PENDING_EXIT(c_p)) {
             // 如果进程准备退出,则不处理消息了
	     erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
	     SWAPOUT;
	     goto do_schedule;  // 等待下一次调度
	 }
	 // SMP下把消息移到进程私有堆尾部(纯指针操作)
	 ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
	 
	 // 再尝试取出“当前位置”的消息
	 msgp = PEEK_MESSAGE(c_p);
	 if (msgp)
	     erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
	 else
#endif
	 {
             // 信箱没消息则跳到 wait或者 wait_timeout指令(实际上就是执行下一条执行)
	     SET_I((BeamInstr *) Arg(0));
	     Goto(*I);
	 }
     }
     // 解析分布式消息,把消息附加的数据复制到进程私有堆
     ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS,
				   {
				       SWAPOUT;
				       reg[0] = r(0);
				       PROCESS_MAIN_CHK_LOCKS(c_p);
				   },
				   {
				       ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
				       PROCESS_MAIN_CHK_LOCKS(c_p);
				       r(0) = reg[0];
				       SWAPIN;
				   });
     if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
	 /*
	  * 如果消息损坏就移除(出现这种情况是分布式消息解码出现错误)
	  */
	 ASSERT(!msgp->data.attached);
	 UNLINK_MESSAGE(c_p, msgp); // 移除消息,侧重将“当前”位置指向下一条消息
	 free_message(msgp); // 销毁消息
	 goto loop_rec__; // 跳到上面继续
     }
     PreFetch(1, next); // 标记下一条指令位置
     r(0) = ERL_MESSAGE_TERM(msgp);
     NextPF(1, next);   // 执行下一条指令
 }
来看下这两个宏定义:
/* Get "current" message */
#define PEEK_MESSAGE(p)  (*(p)->msg.save)
从字面上就知道这个宏是取"当前的"消息,取了 msg.save 的值
#define UNLINK_MESSAGE(p,msgp) do { \
     ErlMessage* __mp = (msgp)->next; \
     *(p)->msg.save = __mp; \
     (p)->msg.len--; \
     if (__mp == NULL) \
         (p)->msg.last = (p)->msg.save; \
     (p)->msg.mark = 0; \
} while(0)
这个宏就是移除消息操作,消息队列长度-1,把 msg.save 指向了 msgp的下一条消息;如果 msgp->next 为 NULL,表示这是最后一条消息,就把 msg.last 等于了 msg.save
/* 
 * beam_emu.c process_main() 线程入口函数,实现VM调度 
 * 以下截取 remove_message  处理过程(已删除不必要的代码)
 * 作用是将消息从信箱队列中移除
 */ 
 OpCase(remove_message): {
     BeamInstr *next;
     ErlMessage* msgp;

     PROCESS_MAIN_CHK_LOCKS(c_p);

     PreFetch(0, next);
     msgp = PEEK_MESSAGE(c_p); // 取出当前的消息

     if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
	 save_calls(c_p, &exp_receive);
     }
     if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
	     SEQ_TRACE_TOKEN(c_p) = NIL;
     } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
	     // 追踪调试内容,可以忽略
	     Eterm msg;
	     SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
	     c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
	     if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) {
		 c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
	     }
	     msg = ERL_MESSAGE_TERM(msgp);
	     seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE, 
			      c_p->common.id, c_p);
     }
     UNLINK_MESSAGE(c_p, msgp); // 移除消息,侧重队列长度-1
     JOIN_MESSAGE(c_p); // 重置“当前”位置,指向了队列第一条消息
     CANCEL_TIMER(c_p);
     free_message(msgp); // 销毁消息

     ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
     PROCESS_MAIN_CHK_LOCKS(c_p);

     NextPF(0, next); // 执行下一条指令
 }
所以,当消息匹配时,就会重新指向了信箱第一条消息,这样,第3个问题就有了答案,会重新扫描信箱。
再来看看这个宏:
/* Reset message save point (after receive match) */
#define JOIN_MESSAGE(p) \
     (p)->msg.save = &(p)->msg.first
这个宏就是讲msg.save 指向了 msg.first ,就是第一个消息
下面看下消息不匹配的情况就是loop_rec_end_f
/* 
 * beam_emu.c process_main() 线程入口函数,实现VM调度 
 * 以下截取 loop_rec_end_f   处理过程
 * 作用是继续取出最新的消息匹配
 */ 
 /*
 * Advance the save pointer to the next message (the current
 * message didn't match), then jump to the loop_rec instruction.
 */
 OpCase(loop_rec_end_f): {
     SET_I((BeamInstr *) Arg(0));
     SAVE_MESSAGE(c_p);  // “当前”位置指向下一个位置
     goto loop_rec__;  // 继续取出消息匹配
 }
这个opcode实现了不断取消息出来匹配的过程,直到失去调度机会,等待下一次调度。
也看下这个宏:
/* Save current message */
#define SAVE_MESSAGE(p) \
     (p)->msg.save = &(*(p)->msg.save)->next
这个宏就是将msg.save 指向了下一个位置。
到这里第1个问题和第2个问题都有答案了,前面说到的 Save Queue 只是“形象化”的队列,实际不存在,这里,VM利用一个Save指针,记录当前匹配消息的位置,所以不存在消息入列出列的开销问题。然后第二个问题,消息选择性接收,当消息匹配不上,有新消息到来时不会重复扫描信箱前面匹配不上的消息。

总结

针对erlang选择性接收的问题,gen_server2给我们一个方向,通过外部队列减少了消息的匹配,而且控制优先级来控制消息的处理。
这里也说说 gen_server2 的副作用:
gen_server2会带来一种问题,erlang原来会利用进程信箱长度来抑制发送者进程(通过减少消息发送者进程的调度机会 Reduction,可以参考这篇文章《erlang send剖析及参数意义》)。但是,gen_server2 每次都会清空进程信箱的消息队列,无法利用到 VM 提供的抑制消息队列过快暴涨的保护机制。
针对这个问题,gen_server2 通过 prioritise_XXX 函数向外部模块暴露消息队列长度,使调用者可以根据消息队列长度控制是否丢弃消息,以实现对消息的抑制。

实际上,gen_server 在我们的开发中就够用了,很少需要去考虑erlang选择性接收的问题。rabbitMQ是针对消息队列的处理,必然有成千上万的消息量,那才正好需要 gen_server2 的作用。如果也到了这种消息量,那就建议使用 gen_server2

参考:http://blog.csdn.net/mycwq/article/details/44049749

分享到:
评论

相关推荐

    RabbitMQ+erlang

    API允许创建连接、声明交换机和队列、发布和接收消息等操作。 ### 5. 消息确认机制 RabbitMQ支持消息确认,允许发送者确保消息已被正确处理。当消费者确认消息时,RabbitMQ会释放相应的内存和磁盘空间,防止消息...

    Erlang语言介绍[E文]下

    - **选择性接收**:可以通过模式匹配和守卫表达式来指定想要处理的消息类型。不匹配的消息会保留在队列中。 #### 七、总结 通过以上介绍,我们可以看到 Erlang 在并发方面的强大之处。其独特的进程模型和消息传递...

    RabbitMQ_ErLang.zip

    由于Erlang的并发特性和容错能力,RabbitMQ能够高效地处理大量并发连接和消息,同时保证服务的稳定性。 RabbitMQ支持多种消息协议,其中最常用的是Advanced Message Queuing Protocol (AMQP)。AMQP定义了一套通用的...

    Erlang入门:构建application练习4(进程link的作用)

    在Erlang编程语言中,进程是其核心特性之一,它们是并发执行的实体,类似于其他语言中的线程。在Erlang中,进程间通信(IPC)是通过消息传递来实现的,而`link`机制是这个通信模型中非常重要的一部分。本教程将通过...

    RabbitMQ和Erlang开发应用

    Erlang以其轻量级进程、内置并发支持和热代码替换特性,成为了构建高可用性系统的选择。RabbitMQ正是基于Erlang构建的,利用Erlang的这些优势,提供了稳定、高效的消息传递服务。 **RabbitMQ安装** 安装RabbitMQ...

    RabbitMQ+erlang.zip

    RabbitMQ选择了Erlang作为其基础,是因为Erlang的这些特性恰好满足了消息队列系统的需求。以下是RabbitMQ的一些关键功能和概念: 1. **Exchange**:交换器是RabbitMQ的核心组件,它接收来自生产者的消息并根据预...

    erl_nif 扩展erlang的另外一种方法

    本文将深入探讨`erl_nif`,了解它是如何扩展Erlang的功能,并讨论如何使用它来提升性能。 `erl_nif`(Erlang NIF,Native Implemented Functions)允许开发者用C语言编写函数,这些函数可以无缝地在Erlang虚拟机...

    why_i_choose_Erlang

    本文将详细探讨Erlang为何成为众多开发者的首选,并深入分析其设计理念、优势及其面临的挑战。 #### Erlang为何受到关注? ##### 1. 业界趋势与Erlang的问题域 随着计算能力的提升和技术进步,如多核处理器的普及...

    Erlang-game-server开发实践.zip

    在本文中,我们将深入探讨如何使用Erlang语言进行游戏服务器的开发实践。Erlang是一种功能强大的并行计算和分布式系统编程语言,以其在高并发、容错性和实时性方面的出色性能,被广泛应用于构建大规模、高可用性的...

    Thinking in Erlang

    每个进程都有自己的状态,并通过接收消息来改变状态,类似于面向对象编程中的实例方法调用。 **通用服务器**:通用服务器是一种常见的设计模式,用于创建可复用的服务组件。本书详细介绍了如何构建可靠的通用服务器...

    erlang与delphi多客户端通讯

    Erlang的 OTP(开放电信平台)框架提供了强大的进程间通信机制,如消息传递、轻量级进程和分布式节点通信,这使得它成为构建大规模并发系统的理想选择。在多客户端通信场景下,Erlang 的进程模型可以很好地处理来自...

    erlang与C#一次通信.rar

    Erlang是一种面向并发、容错性强的编程语言,常用于构建高可用性的系统;而C#则是一款广泛应用于Windows平台,具有强大库支持的.NET语言。两者之间的通信可以通过多种方式实现,这里我们主要关注通过网络协议进行...

    erl_nif_rustler_过程宏写法

    Rust以其类型安全和内存管理特性,成为构建NIFs的理想选择,因为它可以防止常见的C/C++中的悬挂指针和内存泄漏问题。 要开始创建一个Rust-based NIF,你需要安装必要的工具链,包括Rust环境和Erlang SDK。一旦安装...

    Erlang中文手册

    - **内容**: 覆盖了基础语法和概念,但未深入探讨复杂结构和高级特性。 **1.1.2 其它方面** - **未涵盖的主题**: - 参考文献、本地错误处理、单向连接、二进制数据处理等。 - 外部通信接口(如与其他语言编写的...

    Erlang模型在坐席规模预测中的研究与改进.pdf

    本文探讨了Erlang模型在呼叫中心坐席规模预测中的应用及改进措施。通过对比不同排队论模型,选择了更适合实际情况的Erlang A模型来进行预测,并详细介绍了预测过程。通过对大量历史数据的深入分析以及对企业业务特点...

    Erlang中的socket编程简单例子

    本文将深入探讨Erlang中的TCP和UDP socket编程,以及如何实现简单的echo服务器和客户端。 首先,我们需要了解Erlang中处理TCP socket的gen_tcp模块。gen_tcp模块提供了用于创建TCP连接的接口,它允许我们监听端口、...

    RabbitMQ实战高效部署分布式消息队列.pdf+rabbitmq学习手册.pdf

    在本文中,我们将深入探讨RabbitMQ的核心概念、功能特性、部署策略以及如何通过实战来高效地部署分布式消息队列。 1. **RabbitMQ核心概念** - **Broker**: RabbitMQ服务器本身就是一个消息broker,它负责接收、...

    gen_server:Erlang 的 gen_server 的(不完整的)OcamlAsync 实现

    在OCaml/Async中,我们可以创建一个接收消息并根据其类型调用适当处理函数的函数。例如: ```ocaml let rec loop state = Async.Core.wait_and_wake (function | Call message -&gt; handle_call message state | ...

Global site tag (gtag.js) - Google Analytics