`
wqtn22
  • 浏览: 101054 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

异步gen_server进行port访问时性能严重下降的原因和应对方法(二)成因

 
阅读更多

上文介绍了简单的问题场景,现在来分析下产生的原因。

consumer进程里面调用file:write和gen_tcp:send来处理消息:

 

handle_msg(Type, Msg, #state{file = File, socket = Socket}) when is_binary(Msg) ->

    case Type of

        file -> file:write(File, Msg);

        net -> gen_tcp:send(Socket, Msg)

    end;

handle_msg(Type, Msg, State) ->

    handle_msg(Type, term_to_binary(Msg), State).

这两个函数都是通过erlang的port体系实现的:

file:open返回一个文件描述符,如果选项包含raw,那么这个描述符将是一个record,定义为:

kernel/include/file.hrl

 

-record(file_descriptor,

{module :: module(),     % Module that handles this kind of file

data   :: term()}).     % Module dependent data

通常为#file_descriptor{module = prim_file, data = {Port, Number}},其中Port为文件对应的erlang port,Number为文件在erlang虚拟机进程中的描述符,原进程内部调用prim_file的各个接口;

如果选项不包含raw,这个描述符为pid(),这个描述符Pid对应了一个file_io_server进程,该进程代理原进程调用prim_file的各个接口;

gen_tcp:connect返回一个套接字描述符,这个套接字本身就是一个erlang port。

在erlang虚拟机中,文件对应的port_driver为efile,描述符为efile_driver_entry,而tcp套接字对应的port_driver为tcp_inet,描述符为tcp_inet_driver_entry。

通过port体系实现的接口,都需要先打开port,在实现具体功能时,都需要两步动作完成一个请求:

erlang:port_command(Port, Data).

receive

    {MsgType, MsgBody} -> handle_port_return(MsgType, MsgBody)

end.

上述动作都需要在调用者进程中完成。

例如,file:write最终定位到prim_file:write,其实现如下:

prim_file.erl

write(#file_descriptor{module = ?MODULE, data = {Port, _}}, Bytes) ->

    case drv_command(Port, [?FILE_WRITE,Bytes]) of

{ok, _Size} ->

   ok;

Error ->

   Error

    end.

drv_command(Port, Command) -> drv_command(Port, Command, undefined).

drv_command(Port, Command, R) when is_binary(Command) ->

    drv_command(Port, Command, true, R);

drv_command(Port, Command, R) ->

    try erlang:iolist_size(Command) of

_ -> drv_command(Port, Command, true, R)

    catch error:Reason -> {error, Reason}

    end.

drv_command(Port, Command, Validated, R) when is_port(Port) ->

    try erlang:port_command(Port, Command) of

true -> drv_get_response(Port, R)

    catch

error:badarg when Validated -> {error, einval};

error:badarg ->

   try erlang:iolist_size(Command) of

_ ->  {error, einval}

   catch error:_ -> {error, badarg}

   end;

error:Reason -> {error, Reason}

    end;

drv_command({Driver, Portopts}, Command, Validated, R) ->

    case drv_open(Driver, Portopts) of

{ok, Port} ->

   Result = drv_command(Port, Command, Validated, R),

   drv_close(Port),

   Result;

Error ->

   Error

    end.

 

drv_get_response(Port, R) when is_list(R) ->

    case drv_get_response(Port) of

ok ->

   {ok, R};

{ok, Name} ->

   drv_get_response(Port, [Name|R]);

Error ->

   Error

    end;

drv_get_response(Port, _) ->

    drv_get_response(Port).

drv_get_response(Port) ->

    erlang:bump_reductions(100),

    receive

{Port, {data, [Response|Rest] = Data}} ->

   try translate_response(Response, Rest)

   catch

error:Reason ->

   {error, {bad_response_from_port, Data, {Reason, erlang:get_stacktrace()}}}

   end;

{'EXIT', Port, Reason} -> {error, {port_died, Reason}}

    end.

 

而gen_tcp:send最终定位到prim_inet:send,其实现如下:

 

send(S, Data, OptList) when is_port(S), is_list(OptList) ->

    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),

    try erlang:port_command(S, Data, OptList) of

false -> % Port busy and nosuspend option passed

   ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),

   {error,busy};

true ->

   receive

{inet_reply,S,Status} ->

   ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),

   Status

   end

    catch

error:_Error ->

   ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),

    {error,einval}

    end.

send(S, Data) ->

    send(S, Data, []).

 

他们都遵从了上面的编写模式。这个模式看起来并没有什么,但是应用在异步环境里面,却会引起一些问题。

上述模式最根本的问题在于,port发送完命令后,紧接着进行了一次receive,而该receive过程中包含了一个match过程,这个match过程是对进程的消息队列进行遍历,直到找到可以匹配到的消息。

为了更清晰的看清楚这个问题,编写两段段简单的代码,使用erlc +"'S'" x.erl编译,生成它们的抽象码:

 

receive_match() ->

    receive

        {Type, Data} -> {Type, Data}

    end.

 

receive_fetch() ->

    receive

        Msg -> Msg

    end.

抽象码为:

 

01{function, receive_match, 0, 2}.

02  {label,1}.

03    {func_info,{atom,rc},{atom,receive_match},0}.

04  {label,2}.

05    {loop_rec,{f,4},{x,0}}.

06    {test,is_tuple,{f,3},[{x,0}]}.

07    {test,test_arity,{f,3},[{x,0},2]}.

08    {get_tuple_element,{x,0},0,{x,1}}.

09    {get_tuple_element,{x,0},1,{x,2}}.

10    remove_message.

11    {test_heap,3,3}.

12    {put_tuple,2,{x,0}}.

13    {put,{x,1}}.

14    {put,{x,2}}.

15    return.

16  {label,3}.

17    {loop_rec_end,{f,2}}.

18  {label,4}.

19    {wait,{f,2}}.

20

21{function, receive_fetch, 0, 6}.

22  {label,5}.

23    {func_info,{atom,rc},{atom,receive_fetch},0}.

24  {label,6}.

25    {loop_rec,{f,7},{x,0}}.

26    remove_message.

27    return.

28  {label,7}.

29    {wait,{f,6}}.

首先看receive_match的抽象码,其抽象码解释如下:

05 loop_rec指令接收消息,若进程消息队列没有消息,跳至{label,4}处wait,若有消息,将消息移动到进程堆上,绑定到变量{x,0}

06 测试消息是否为元组tuple,若不是跳至{label,3}处

07 测试消息是否为二元元组,若不是跳至{label,3}处

08-09 此时消息已经匹配,可以做后续处理,将元组的两个元绑定到{x,1}和{x,2}上

10 消息已经匹配,将其从消息队列中移除

11-15 正常处理流程

16-17 此处表示,消息队列中有消息,但是并不是最近一次receive需要的消息,此时要做的是将该消息放回到进程消息队列上,并跳转回05的{label,2}处重新取下一条消息

18-19 此处表示,消息队列中没有消息,需要重新跳转回05的{label,2}处取下一条消息

这里可以发现,receive_match函数需要扫描一遍进程消息队列,从中取出符合要求的消息;

接着看receive_fetch的抽象码,其抽象码解释如下:

24-29 若消息队列中没有消息,则继续等待,若有消息,则取出消息,将消息移动到进程堆上,绑定到变量{x,0};

这里可以发现,receive_fetch不会扫描进程消息队列,直接取出下一条消息。

上述抽象码对应的erlang虚拟机c代码此处不再分析,其执行过程即如上所述,有兴趣的读者可以自行查阅。

这里便可以解释之前场景的问题了:

consumer的进程内部调用file:write或gen_tcp:send,然后将形成一个receive_match的模式,遍历进程消息队列,然后取出需要的消息;另一方面,生产者却没有任何顾忌的往consumer进程中投递消息,如果处理(也即调用file:write或gen_tcp:send)速度慢于消息投递速度(通常情况下总是如此,gen_server:cast的qps在本地可以轻松达到40-80w),则consumer的消息队列将不断增大,receive_match模式却仍然需要遍历整个消息队列,从而导致处理速度进一步下降,消息队列进一步增大,陷入一个恶性循环。

实际应用中,也有会遭遇这种典型的场景:进程允许异步投递, 但进程内部有调用port(receive_match)的模式出现 。

未完待续...

分享到:
评论
1 楼 hoterran 2012-07-09  
嘿嘿,上次就这个问题我看erlang代码看了很长时间.

看你的分析再复习一下,很有收获~~~~~

相关推荐

    gen_server tasting 之超简单名称服务

    它可能包含了自己的`gen_server`回调函数,并在启动时通过名称服务进行注册,以便其他部分的系统可以找到并与其交互。 此外,`gen_server`通常与`gen_event`配合使用,后者提供了事件管理功能,可以用来处理日志、...

    gen_tags.vim, 用来轻松使用 ctags/gtags的vim和neovim的异步插件.zip

    gen_tags.vim, 用来轻松使用 ctags/gtags的vim和neovim的异步插件 gen_tags.vim 为方便用户使用 Vim/ NeoVim,简化了 ctags/ gtags的使用。它用于为你生成和维护多个平台支持的标签,在 Windows/Linux/macOS. 上测试...

    C#实现异步连接Sql Server数据库的方法

    在C#编程中,异步连接SQL Server数据库是提高应用程序性能和用户体验的关键技术。这是因为异步操作允许程序在等待数据库响应时执行其他任务,而不是阻塞主线程,从而避免了UI冻结或整体性能下降。本篇文章将深入探讨...

    Python库 | bareASGI_auth_server-4.0.0-py3-none-any.whl

    ASGI是Python Web服务器和框架之间的一个协议,旨在提供一种更高效、灵活的异步处理方式,与传统的WSGI(Web Server Gateway Interface)相比,ASGI允许Web应用更好地利用多核CPU和非阻塞I/O,从而提高性能和响应...

    iocp_server_client 源代码

    【标题】"IOCPServerClient源代码"是一个关于网络编程的资源,主要涉及了IO完成端口(IOCP,Input/Output Completion Port)技术在服务器...同时,这也是一个很好的实践机会,帮助你更好地理解和应对网络编程中的挑战。

    gen_tags.vim:用于vim和neovim的异步插件,可简化ctagsgtags的使用

    《gen_tags.vim:异步增强Vim与Neovim的ctags与gtags体验》 在编程领域,代码导航工具对于提升开发效率至关重要。Vim 和 Neovim,作为深受程序员喜爱的文本编辑器,提供了强大的ctags和gtags功能,帮助用户快速跳转...

    tcp_server_honorus2_pythonTCP_tcp_tornado_pythonserver_

    例如,可以使用`@gen.coroutine`装饰器和`yield`关键字来编写异步处理函数。 9. **WebSocket扩展**:Tornado的WebSocket支持使得服务器可以直接与浏览器进行双向通信,常用于实时应用。 10. **错误处理和日志记录*...

    C#基于Socket的TcpClient异步实现和基于Socket的TcpServer异步实现(AsyncTcp).rar

    本项目主要探讨了如何利用C#实现基于Socket的TcpClient和TcpServer的异步操作,这对于构建高性能、高并发的网络应用至关重要。下面将详细阐述这两个方面的知识点。 一、TcpClient异步实现 TcpClient类是.NET中的一...

    libserial_port.so Android串口驱动

    这个库文件是C或C++编写的原生代码,通过JNI(Java Native Interface)与Java层进行交互,为Android应用提供访问底层串口硬件的能力。 1. **Android串口驱动原理** Android系统基于Linux内核,因此其串口驱动也...

    C_SERVER_C.rar_c# socket

    本资源“C_SERVER_C.rar_c# socket”聚焦于C#语言中的异步套接字服务器和客户端的实现,这对于构建高性能、高并发的网络服务尤其重要。 首先,我们来深入了解C#中的Socket类。Socket是网络编程的基础,它允许程序...

    Erlang实战

    通过本案例的学习,我们不仅了解了如何使用Erlang构建高性能的在线IP查询服务,而且还深入学习了OTP的关键行为如gen_server、gen_fsm和gen_event的具体应用。这种实践不仅有助于初学者快速掌握Erlang的核心概念,还...

    C# 基于socket实现的异步TcpServer和TcpClient

    本文将深入探讨如何使用C#基于Socket实现异步TcpServer和TcpClient,以及这两个核心组件的工作原理。 首先,让我们了解TCP(传输控制协议)的基本概念。TCP是一种面向连接的、可靠的、基于字节流的传输层通信协议,...

    yafApi:使用yaf和swoole_http_server,专注于高性能api接口服务,异步任务..

    使用swoole_http_server + yaf 专注于网站api接口,同时能处理异步任务,如异步发送邮件,发送短信验证码等耗时相对长一点的任务 本项目兼容php-fpm模式,不过在异步任务调用处要调整 参照 参照 ##使用说明 //1....

    DSF+同步调用与Flower+DSF异步调用对比测试报告1

    总结来说,对比测试结果表明,Flower的DSF异步调用方式在应对服务延迟和异常时,表现出更好的性能和稳定性,为分布式系统的高可用性和容错性提供了有力保障。在实际应用中,根据系统需求和业务场景选择合适的调用...

    baserver服务器框架C++ Tcp server

    bas为boost_asio_server(baserver)的简称,是采用Half-Sync/Half-Async模式的服务器框架,使用c++实现,能够大大简化tcp server的开发工作。bas目前实现了以下功能: 1、底层基于boost及asio实现,支持ssl,跨越...

    C#异步操作 异步查询数据库 异步处理一行一行加载数据

    在C#编程中,异步操作是现代应用开发的关键特性,尤其在处理大量数据或进行I/O密集型任务时,如查询数据库。本主题将深入探讨C#中的异步概念,如何异步查询数据库,以及如何异步处理一行一行加载的数据。 首先,...

    Hessian异步请求访问包

    本知识点主要关注的是在Android平台上,如何使用Hessian进行异步请求访问,以便提高应用程序的性能和用户体验。 Hessian是由Caucho公司开发的一种轻量级、高效的RPC(Remote Procedure Call)协议。它将Java对象...

    C# Socket 异步双工通讯示例

    本示例将深入探讨C#中的Socket异步双工通讯,这种模式允许同时进行发送和接收操作,提高应用程序的效率和响应性。 首先,我们来看“异步”这个词。在C#中,异步编程是一种避免阻塞主线程的技术,它允许可并发执行多...

Global site tag (gtag.js) - Google Analytics