`
willvvv
  • 浏览: 333086 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

erlang分布式入门(四)-TCP Server的详解与改进

阅读更多

在上一篇实现的erlang分布式入门(三)-TCP Server-Client 中的accept函数如下:

 

accept(LSocket) ->  
    {ok, Socket} = gen_tcp:accept(LSocket),  
    spawn(fun() -> loop(Socket) end),  
    accept(LSocket). 

 

使用BIF的spawn方法,创建了一个新的进程loop来处理客户端连接,主要业务在loop函数中实现,然后继续accept新的客户端连接。

spawn的说明如下:

spawn(Fun) -> pid()

Types:
Fun = function()

Returns the pid of a new process started by the application of Fun to the empty list []. Otherwise works like spawn/3.

spawn(Node, Fun) -> pid()

Types:
Node = node()
Fun = function()

Returns the pid of a new process started by the application of Fun to the empty list [] on Node. If Node does not exist, a useless pid is returned. Otherwise works like spawn/3.

spawn(Module, Function, Args) -> pid()

Types:
Module = Function = atom()
Args = [term()]

Returns the pid of a new process started by the application of Module:Function to Args. The new process created will be placed in the system scheduler queue and be run some time later.

error_handler:undefined_function(Module, Function, Args) is evaluated by the new process if Module:Function/Arity does not exist (where Arity is the length of Args). The error handler can be redefined (see process_flag/2). If error_handler is undefined, or the user has redefined the default error_handler its replacement is undefined, a failure with the reason undef will occur.

> spawn(speed, regulator, [high_speed, thin_cut]).
<0.13.1>

spawn(Node, Module, Function, Args) -> pid()

Types:
Node = node()
Module = module()
Function = atom()
Args = [term()]

Returns the pid of a new process started by the application of Module:Function to Args on Node. If Node does not exists, a useless pid is returned. Otherwise works like spawn/3.

 

但是由于gen_tcp:accept是阻塞的,所以我们在服务端启动tcp_server之后没有返回值,erlang的命令行一直阻塞在那里,一直等到有客户端连接上去才有反应。

以下是accept函数的说明:

accept(ListenSocket) -> {ok, Socket} | {error, Reason}
accept(ListenSocket, Timeout) -> {ok, Socket} | {error, Reason}

Types:
ListenSocket = socket()
Returned by listen/2.
Timeout = timeout()
Socket = socket()
Reason = closed | timeout | system_limit | inet:posix()

Accepts an incoming connection request on a listen socket. Socket must be a socket returned from listen/2. Timeout specifies a timeout value in ms, defaults to infinity.

Returns {ok, Socket} if a connection is established, or {error, closed} if ListenSocket is closed, or {error, timeout} if no connection is established within the specified time, or {error, system_limit} if all available ports in the Erlang emulator are in use. May also return a POSIX error value if something else goes wrong, see inet(3) for possible error values.

Packets can be sent to the returned socket Socket using send/2. Packets sent from the peer are delivered as messages:

{tcp, Socket, Data}

unless {active, false} was specified in the option list for the listen socket, in which case packets are retrieved by calling recv/2.

 

我们知道nginx的Master进程只用于接收客户端连接,然后把连接交给worker子进程去处理。这是高并发服务器处理大量连接的很有效的方式。


以下是改进版的TCP服务端:

 

echo_server.erl

-module(echo_server).

-export([start/2, loop/1]).

%% @spec start(Port::integer(), Max::integer())  
%% @doc echo server
start(Port, Max) ->
        generic_server:start(echo_server, Port, Max, {?MODULE, loop}).

%% @spec loop(Sock::port())
%% @doc echo_server

loop(Sock) ->
        case gen_tcp:recv(Sock, 0) of
                {ok, Data} ->
                        io:format("server recv data close~p~n",[Data]),
                        gen_tcp:send(Sock, Data),
                        loop(Sock);
                {error, closed} ->
                        io:format("client sock close~n"),
                        gen_server:cast(echo_server, {connect_close, self()})
        end.

 

generic_server.erl

%% a generic tcp server
-module(generic_server).
-author('cheng litaocheng@gmail.com').
-vsn('0.1').
-behaviour(gen_server).

-define(TCP_OPTIONS, [binary, {packet, raw}, {active, false}, {reuseaddr, true}]).
-record(server_state, {
                        port,                   % listen port
                        loop,                   % the logic fun
                        ip=any,                 % ip
                        lsocket=null,           % listen socket
                        conn=0,                 % curent connect
                        maxconn                 % max connect
                        }).

%% internal export
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

%% start tcp server 
-export([start/4]).

-export([accept_loop/5]).

%% start the generic server
start(Name, Port, Max, Loop) ->
        State = #server_state{port=Port, loop=Loop, maxconn=Max},
        io:format("max connection is ~p~n", [Max]),
        gen_server:start_link({local, Name}, ?MODULE, State, []).

%% create listen socket
init(State = #server_state{port=Port}) ->
        case gen_tcp:listen(Port, ?TCP_OPTIONS) of
                {ok, LSocket} ->
                        {ok, accept(State#server_state{lsocket=LSocket})};
                {error, Reason} ->
                        {stop, {create_listen_socket, Reason}}
        end.


%% accept spawn a new process
accept(State =#server_state{lsocket=LSocket, loop=Loop, conn=Conn, maxconn=Max}) ->
        proc_lib:spawn(generic_server, accept_loop, [self(), LSocket, Loop, Conn, Max]),
        State.

%% accept the new connection
accept_loop(Server, LSocket, {M, F}, Conn, Max) ->
        {ok, Sock} = gen_tcp:accept(LSocket),
        if
                Conn + 1 > Max ->
                        io:format("reach the max connection~n"),
                        gen_tcp:close(Sock);
                true ->
                        gen_server:cast(Server, {accept_new, self()}),
                        M:F(Sock)
        end.

%% the server receive the notify that a connect has construct
handle_cast({accept_new, _Pid}, State=#server_state{conn=Cur}) ->
        io:format("current connect:~p~n", [Cur+1]),
        {noreply, accept(State#server_state{conn=Cur+1})};

%% someone connect has been close, so change the max connect
handle_cast({connect_close, _Pid}, State=#server_state{conn=Cur}) ->
        io:format("current connect:~p~n", [Cur-1]),
        {noreply, State#server_state{conn=Cur-1}}.

handle_call(_From, _Request, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

terminate(_Reason, _State) -> ok.
 

主要使用了erlang的gen_server内置函数,详解:http://www.erlang.org/doc/design_principles/gen_server_concepts.html

分享到:
评论

相关推荐

    erlang server源码

    Erlang Server源码分析与详解 Erlang是一种面向并发的、函数式编程语言,以其在分布式系统、高可用性和容错性方面的优势而受到广泛关注。尤其在构建大规模聊天室服务器这样的实时通信系统中,Erlang的性能表现突出...

    rabbitmq-install-3.8.3-centos7-1908.zip

    在这个案例中,文件名分别为 "rabbitmq-server-3.8.3-1.el7.noarch.rpm" 和 "erlang-22.3.2-1.el7.x86_64.rpm"。 安装步骤如下: 1. **安装 Erlang**:Erlang 是 RabbitMQ 的运行时环境,首先需要安装。在命令行中...

    erlang学习4.pdf

    ### Erlang分布式调用与TableServer支持详解 #### 前言 Erlang作为一种专为构建高并发、高可用性系统而设计的编程语言,其核心特性之一就是强大的分布式处理能力。本篇文档旨在深入解析Erlang的分布式调用机制及...

    远古封神Server(erlang源码)

    《远古封神Server(Erlang源码)详解及技术要点》 “远古封神Server”是一款基于Erlang编程语言构建的游戏服务器,它以其高效、并发能力强的特点,为游戏开发提供强大的支撑。Erlang作为一种面向分布式计算的编程...

    RabbitMq 集群搭建linux

    - **25672**:Erlang分布式的内部通信端口,默认计算公式为 AMQP 端口 + 20000。 - **15672 (Management Plugin)**:Web管理界面端口。 - **61613, 61614 (STOMP)**:STOMP协议端口。 - **1883, 8883 (MQTT)**:MQTT...

    erlang websocket

    Erlang Websocket详解:构建实时通信应用 在IT领域,Erlang作为一种并发和分布式计算的编程语言,因其在高可用性和实时性上的优势,常用于构建电信、实时系统和互联网服务。Websocket作为互联网协议,允许双向通信...

    RabbitMQ技术帮助文档

    ### RabbitMQ 技术帮助文档知识点详解 #### 一、RabbitMQ 概述 RabbitMQ 是一种基于 AMQP(高级消息队列协议)的消息中间件,它提供了一个健壮的消息传递系统,用于在分布式系统中处理消息的发布、订阅和路由。...

    tsung+erlang包

    **Tsung + Erlang 包详解** Tsung 是一个开源的多协议负载和性能测试工具,它可以模拟大量用户并发访问服务器,从而评估系统的负载能力和稳定性。Tsung 的设计目标是提供一个灵活、可扩展的解决方案,能够测试各种...

    RabbitMQ Linux 安装

    ### RabbitMQ Linux 安装详解 #### 一、概述 RabbitMQ 是一款开源的消息中间件,基于 AMQP(Advanced Message Queuing Protocol)协议。它能够帮助开发者在分布式系统之间传递消息,支持多种消息传递模式,包括...

    erlang高性能网络库esockd的编译和使用(二)

    **Erlang与ESockd库详解** Erlang是一种为构建高并发、分布式和容错系统而设计的编程语言,特别适用于实时通信系统。在Erlang中,`esockd`是一个高效的网络库,它提供了一种简单且强大的方式来处理套接字操作,使得...

    Tsung负载测试tigase

    &lt;server host="192.168.3.242" port="5222" type="tcp"&gt;&lt;/server&gt; &lt;!-- 负载配置 --&gt; &lt;users maxnumber="12000" interarrival="0.1" unit="second"&gt;&lt;/users&gt; &lt;!-- XMPP 参数配置 --&gt; &lt;!-- 其他参数...

    介绍 Erlang binary 和 bit string 数据类型的经典文章

    ### Erlang Binary与Bit String 数据类型详解 #### 引言 Erlang 是一种功能强大且灵活的编程语言,尤其适合开发高并发、分布式及容错性应用。自 R12B 版本以来,Erlang 在处理二进制数据方面引入了两项重大更新:...

    rabbitMq安装包

    7. **安装步骤**:在Linux环境下,可以通过`rpm`命令安装这些包,例如`sudo rpm -ivh erlang-*.rpm socat-*.rpm rabbitmq-server-*.rpm`。安装后,还需要启动RabbitMQ服务并配置相关参数。 8. **管理工具**:...

    hackney:Erlang中的简单HTTP客户端

    - **erlang**:Erlang是一种面向并发的、函数式编程语言,常用于构建高可用性、分布式系统,尤其适合处理大量并发连接和实时性需求。 - **HTTPErlang**:可能是指Erlang中的HTTP相关库或工具,强调了这个客户端是用...

    MQ技术分享-RabbitMQ-11

    - **Connection**:TCP连接,应用程序与Broker之间的网络通道。 - **Channel**:信道,基于TCP连接的逻辑通道,减少系统资源消耗。 - **Message**:消息,包含Properties和Body,是实际传输的数据。 - **...

    rabbitmq学习笔记和软件和插件

    **RabbitMQ学习笔记与软件插件详解** RabbitMQ是一种广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,提供可靠的消息传递服务。在分布式系统中,RabbitMQ扮演着数据交换中心的角色,...

    emqttd-centos7-v2.3.11.tar.gz

    《emqttd在CentOS7系统上的安装与应用详解》 emqttd是一款基于MQTT协议的开源消息中间件,其稳定性和高性能在物联网(IoT)领域得到了广泛应用。本篇将详细介绍如何在CentOS7 64位系统上安装并使用emqttd-centos7-v...

Global site tag (gtag.js) - Google Analytics