`
AvinDev
  • 浏览: 112395 次
社区版块
存档分类
最新评论

Erlang中频繁发送远程消息要注意的问题

阅读更多
注:这篇文章可能会有争议,欢迎提出意见

在Erlang中,如果要实现两个远程节点之间的通信,就需要通过网络来实现,对于消息发送,是使用TCP。如果要在两个节点间频繁发送消息,比如每秒几百上千条,那样就要注意了。

无论是网游服务器开发的书籍,或是经验老道的工程师,都会告诉你,在发送数据包时,尽可能把小的消息组合为一个比较大的包来发送,毕竟一个TCP包的头也很大,首先是浪费带宽,其次调用底层发送的指令也是有开销的。有工程师告诉我,一般每秒大概是2W次左右。

简单测试一下,先是代码

一个接收消息并马上抛弃的Server:
start() ->
    register(nullserver, self()),
    loop().

loop() ->
    receive
	Any ->
	    loop() %drop message and loop
    end.


一个在循环中向它发送消息的Client:
start() ->
    start_send(100).

start_send(0) ->
    ok;
start_send(N) ->
    {nullserver, 'foo@192.168.0.3'} ! hi,
    start_send(N-1).


然后打开截包工具,运行server和client,截取到接近200个包的发送和接收记录,其中,大部分是这样的数据:

引用
00 14 78 B9 14 BC 00 11-11 9F 91 1A 08 00 45 00
00 45 EE 77 40 00 80 06-80 E4 C0 A8 00 CC DB E8
ED F9 13 58 C1 C6 AA 4E-59 F2 38 CF 22 2D 50 18
FF 19 B9 EE 00 00 00 00-00 19 70 83 68 04 61 06
67 43 CC 00 00 00 01 00-00 00 00 02 43 05 43 BD
83 43 BF                                     


引用
00 14 78 B9 14 BC 00 11-11 9F 91 1A 08 00 45 00
00 45 EE 78 40 00 80 06-80 E3 C0 A8 00 CC DB E8
ED F9 13 58 C1 C6 AA 4E-5A 0F 38 CF 22 2D 50 18
FF 19 B9 D1 00 00 00 00-00 19 70 83 68 04 61 06
67 43 CC 00 00 00 01 00-00 00 00 02 43 05 43 BD
83 43 BF                                      



实际上,只有从 00 00-00 19 这里开始,才是TCP包的内容,前面都是底层协议的数据,就是这样的数据包发送了100次,浪费是巨大的。而且,在消息发送后,还收到同样数目类似

引用
00 11 11 9F 91 1A 00 14-78 B9 14 BC 08 00 45 00
00 28 8C FC 40 00 32 06-30 7D DB E8 ED F9 C0 A8
00 CC C1 C6 13 58 38 CF-22 2D AA 4E 59 F2 50 10
19 20 D7 01 00 00 00 00-00 00 00 00 
         

这样的响应包,也浪费着带宽。


从目前我所阅读过的文档来看,暂时没有有关如何缓存这些消息定期一并发送的参数设置。那么有什么解决办法,我自己有两种。

一种是将要发送的一批Message打包到一个list发送,接收方从list中取出所有message并处理。

另一种是通过一个Proxy,发送方不通过 {Name, Node} ! Message 这种方式来发送,而是通过一个本地的Proxy Process,代理会将所有发送到某个节点的消息累积起来,定时批量发送过去;接收方也有一个Listening Process,它接收批量的Message,遍历后发送给本地的相应进程。

这里是我初步写出来的实现,不太漂亮,仅供参考~

message_agent.erl: 实现消息的批量发送,接收和转发

-module(message_agent).
-export([listen/0, proxy/2, block_exit/1]).
-export([loop_receive/0]).
-define(MAX_BATCH_MESSAGE_SIZE, 50).

listen() ->
    io:format("Message agent server start listen~n"),
    spawn(fun() -> register('MsgServerAgent', self()), loop_receive() end),
    ok.

loop_receive() ->
    receive
	{forward_message, PName, Messages} ->
	    forward_messages(PName, Messages),
            loop_receive();
	Any ->
	    message_agent:loop_receive()
    end.

forward_messages(PName, []) ->
    ok;
forward_messages(PName, [H|T]) ->
    %io:format("Forward message ~w to process ~w~n", [H, PName]),
    catch PName ! H,
    forward_messages(PName, T).


proxy(Node, PName) ->
    spawn_link(fun() -> handle_message_forward(Node, PName, []) end).

block_exit(Agent) ->
    Agent ! {block_wait, self()},
    receive
	{unblock} ->
	    ok
    end.

handle_message_forward(Node, PName, Messages) ->
    receive
	{block_wait, Pid} ->
	    catch send_batch(Node, PName, lists:reverse(Messages)),
	    Pid ! {unblock};
	Any ->
	    NewMessages = [Any|Messages],
	    case length(NewMessages)>=?MAX_BATCH_MESSAGE_SIZE of
		true ->
		    send_batch(Node, PName, lists:reverse(NewMessages)),
		    handle_message_forward(Node, PName, []);
		false ->
		    handle_message_forward(Node, PName, NewMessages)
	    end
    after
	0 ->
	    case length(Messages)>0 of
		true ->
		    catch send_batch(Node, PName, lists:reverse(Messages));
		false ->
		    ok
	    end,
	    handle_message_forward(Node, PName, [])
    end.

send_batch(Node, PName, Messages) ->
    %io:format("Send batch message, size ~p~n", [length(Messages)]),
    {'MsgServerAgent', Node} ! {forward_message, PName, Messages}.




使用方式很简单,在接收Message的一端调用 message_agent:listen() 启动监听代理,客户端使用 register(agent, message_agent:proxy(?NODE, 'MsgServer')) 的方式启动代理进程,消息发送给这个代理进程就可以了。下面是我写的简单例子:

-module(message_server).
-export([start/0]).
-define(TIMEOUT_MS, 1000).

start() ->
    io:format("Message server start~n"),
    register('MsgServer', self()),
    message_agent:listen(),
    loop_receive(0).

loop_receive(Count) ->
    receive
	Any ->
	    %io:format("Receive msg ~w~n", [Any]),
            loop_receive(Count+1)
    after
	?TIMEOUT_MS ->
	    if 
		Count>0 ->
		    io:format("Previous receive msg count: ~p~n", [Count]),
		    loop_receive(0);
		true ->
		    loop_receive(0)
	    end
    end.


-module(message_client).
-define(NODE, 'msgsrv@192.168.0.3').
-define(COUNT, 20000).
-export([start/0]).

start() ->
    statistics(wall_clock),
    register(agent, message_agent:proxy(?NODE, 'MsgServer')),
    send_loop(?COUNT).

send_loop(0) ->
    message_agent:block_exit(agent),
    {_, Interval} = statistics(wall_clock),
    io:format("Finished ~p sends in ~p ms, exiting...~n", [?COUNT, Interval]);
send_loop(Count) ->
    agent ! {self(), lalala},
    send_loop(Count-1).


这里要注意的是,消息发送端和接收端都是由一个单独的进程来处理消息。在Erlang的默认堆实现,是私有堆,本地进程间的消息发送是需要拷贝的,在数据量大的时候,该进程堆的垃圾回收会相当频繁。
分享到:
评论
8 楼 lzy.je 2009-02-25  
mryufeng 写道

to lzy.je:1. 写的时候是先尝试写 对端堵塞的时候 才挂写事件的 等对端解除堵塞马上发起写操作 怎么不提高吞吐量。2. 和nagle毫无关系 完全是inet drv自己实现的!


1. 嗯,一直同意吞吐量会提高,但这种“半异步”的处理方式,对响应时间的帮助不大。个人意见。
2. 了解了。
7 楼 mryufeng 2009-02-25  
to lzy.je:
1. 写的时候是先尝试写 对端堵塞的时候 才挂写事件的 等对端解除堵塞马上发起写操作 怎么不提高吞吐量。
2. 和nagle毫无关系 完全是inet drv自己实现的!
6 楼 lzy.je 2009-02-25  
mryufeng 写道

to  lzy.je: 这个不是关闭 nagle算法 而是在内部缓存数据 登记可写事件  在对端可写的时候 才发生写操作 所以说delay 但是这样极大提高吞吐量!blog.yufeng.info


to mryufeng:
1.这种方式来提高吞吐量并不意味着响应时间的缩短,因此对于socket client来说,是不能真实性能的。这相当于引入缓存机制。
2. 想请教,这种{delay_send, Boolean}方式下,erlang实现的方式不同于这样么?
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, ....)
是完全由erlang driver内部实现?
5 楼 mryufeng 2009-02-24  
to  lzy.je: 这个不是关闭 nagle算法 而是在内部缓存数据 登记可写事件  在对端可写的时候 才发生写操作 所以说delay 但是这样极大提高吞吐量!

blog.yufeng.info
4 楼 AvinDev 2009-02-11  
是的,具体情况还要权衡
3 楼 lzy.je 2009-02-11  
AvinDev 写道

发现 inet:setopts 有一个选项可以解决这个问题:

引用{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

有空试试


这种方式将禁用Nagle算法,通过对数据合并尽管有效减少了发送的次数及数据量,但会引入一些延时。
2 楼 AvinDev 2007-06-11  
发现 inet:setopts 有一个选项可以解决这个问题:

引用
{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.


有空试试
1 楼 potian 2007-05-11  
这在Joe的新书里面叫做 socket based distribution, 代码在
http://media.pragprog.com/titles/jaerlang/code/socket_dist/目录下面,上面的连接对中国人封锁,你可以用Tor上去

相关推荐

    erlang使用post方式发送json数据

    学习erlang的时候尝试编写的小例子,使用post方式发送json数据来进行http请求,希望能帮到大家~

    Erlang并发编程,Erlang程序设计,Erlang中文手册

    使用Erlang编写出的应用运行时通常由成千上万个轻量级进程组成,并通过消息传递相互通讯。进程间上下文切换对于Erlang来说仅仅 只是一两个环节,比起C程序的线程切换要高效得多得多了。 使用Erlang来编写分布式应用...

    erlang 中文基础教程

    Erlang Shell是Erlang编程语言提供的一种交互式编程环境,允许开发者直接在命令行中编写、执行Erlang代码并观察结果。无论是Linux、UNIX还是Windows操作系统,Erlang Shell都能顺利运行。只需在命令行中输入`erl`...

    erlang中文基础教程

    用户可以在 Erlang Shell 中输入命令,例如数学运算符号,函数调用等。Erlang Shell 提供了一个交互式的环境,用户可以实时查看输出结果。 2. 顺序编程 顺序编程是 Erlang 编程语言的基础,用户可以使用 Erlang ...

    erlang编程 Introducing Erlang

    Erlang中的链接(Linking)和监控(Monitoring)机制允许进程间建立关系,以便在另一进程崩溃时得到通知。链接用于追踪相关进程的状态,而监控则可以观察进程的生存状态。 ### 5. 消息传递 Erlang的进程间通信主要...

    硝烟中的erlang

    诊断以及调试生产环境中的Erlang 系统。在程序员学习新的语言和环境时,都需要一个摸索 阶段,也就是学会在社团的帮助下,脱离指南,解决实际问题。 本书假设读者精通基本的Erlang和OTP框架。在本书中,会对一些难以...

    erlang 中进程

    在Erlang中,进程是并发执行的基本单元,它们轻量级且独立,彼此通过消息传递进行通信。下面将详细介绍Erlang中的进程以及如何使用它们进行并发开发。 1. 进程概念 在Erlang中,进程不同于操作系统中的线程或进程。...

    Erlang官网下载过慢

    标题中提到的“Erlang官网下载过慢”可能是因为网络问题或者官方服务器的繁忙导致的,这对于急需安装或更新Erlang的开发者来说是一个常见问题。在这种情况下,用户可以选择通过第三方镜像站点或者从他人分享的安装包...

    erlang9.rar

    Erlang语言的核心特点包括轻量级进程(Erlang中的进程与操作系统进程不同,它们更轻便且能快速切换)、模式匹配、函数式编程和热代码替换等。这些特性使得Erlang在处理高并发场景下表现出色,例如在电信、网络设备和...

    gcm-erlang, 用于Google云消息传递的Erlang应用程序.zip

    gcm-erlang, 用于Google云消息传递的Erlang应用程序 gcm 软件提供了一个用于 Google Cloud Messaging的Erlang客户机。,你可以对 gcm-erlang做什么:你可以以使用 gcm-erlang:启动几个代表由不同 GCM API keys 定义...

    Erlang 中的Module级别热部署

    ### Erlang中的Module级别热部署 #### 一、引言 Erlang 是一种专为构建高并发、容错性强的分布式系统而设计的编程语言。它的独特之处在于支持轻量级进程(也称为协程)和热部署能力。本文将深入探讨Erlang 中的 ...

    Erlang_CNode用户指

    CNode的主要功能包括创建Erlang进程,发送和接收消息,以及调用Erlang的函数。 **Erlang_CNode用户指南** Erlang_CNode用户指南通常会涵盖以下几个核心主题: 1. **安装和配置**:指导用户如何在系统上安装Erlang...

    Rabbit,Erlang,消息队列。

    消息队列的工作原理是生产者将消息发送到队列,消费者从队列中接收并处理消息。RabbitMQ支持多种工作模式,如简单模式、Direct模式、Fanout模式、Topic模式和Header模式,以适应不同的应用场景。例如,Direct模式...

    erlang 中文乱码

    要解决Erlang中的中文乱码问题,你需要确保以下几个方面都正确无误: 1. **文件编码**:确认你要处理的文件是以正确的编码(如UTF-8)保存的。可以使用诸如Notepad++之类的文本编辑器检查和转换文件编码。 2. **...

    erlang中dns解析

    以下是关于Erlang中DNS解析的一些核心知识点: 1. **inet** 模块:Erlang的标准库包含了`inet`模块,它提供了一系列的函数用于网络相关的操作,包括DNS查询。例如,`inet:gethostbyname/1,2`函数可以用来获取主机名...

    erlang 中文,chm参考文档

    4. **模式匹配**:Erlang中的模式匹配是其语法的一大特色,用于在函数定义中解构复杂的数据结构。 5. **错误处理**:讲解Erlang的异常处理机制,如try-catch-finally语句。 6. **模式和类型**:涵盖模式匹配和类型...

    xiandiao_erlang_Erlang课后习题_

    在Erlang中,程序是由一系列相互独立的函数构成的,它们可以并行执行,提高了系统的性能。 2. **并发与并行**:Erlang的轻量级进程(称为Erlang进程)使得并发编程变得简单。每个进程有自己的堆栈和消息队列,通过...

Global site tag (gtag.js) - Google Analytics