Original article: http://www.trapexit.org.nyud.net:8080/Building_a_Non-blocking_TCP_server_using_OTP_principles
This article nicely demonstrates several features:
1. supervisor: dynamically adding children
2. async accept
3. gen_fsm
4. flow control
5. application
Well worth a read.
Building a Non-blocking TCP server using OTP principles
From Erlang Community
Contents
1 Author
2 Overview
3 Server Design
4 Application and Supervisor behaviours
5 Listener Process
6 Client Socket Handling Process
7 Application File
8 Compiling
9 Running
10 Conclusion
Author
Serge Aleynikov <saleyn at gmail.com>
Overview
A reader of this tutorial is assumed to be familiar with gen_server and gen_fsm behaviours, TCP socket communications using gen_tcp module, active and passive socket modes, and OTP supervision principles.
OTP provides a convenient framework for building reliable applications. This is in part accomplished by abstracting common functionality into a set of reusable behaviours such as gen_server and gen_fsm that are linked to OTP's supervision hierarchy.
There are several known TCP server designs. The one we are going to cover involves one process listening for client connections and spawning an FSM process per connecting client. While there is support for TCP communications available in OTP through the gen_tcp module, there is no standard behaviour for building non-blocking TCP servers using OTP standard guidelines. By non-blocking we imply that the listening process and the client-handling FSMs should not make any blocking calls and be readily responsive to incoming control messages (such as changes in system configuration, restart requests, etc.) without causing timeouts. Note that blocking in the context of Erlang means blocking an Erlang process rather than the emulator's OS process(es).
In this tutorial we will show how to build a non-blocking TCP server using gen_server and gen_fsm behaviours that offers flow control and is fully compliant with OTP application design principles.
A reader who is new to the OTP framework is encouraged to read Joe Armstrong's tutorial on how to build A Fault-tolerant Server using blocking gen_tcp:connect/3 and gen_tcp:accept/1 calls without involving OTP.
This tutorial was inspired by several threads (e.g. one, two) on the Erlang Questions mailing list mentioning an approach to building non-blocking asynchronous TCP servers.
Server Design
The design of our server will include the main application supervisor, tcp_server_app, with a one_for_one restart strategy and two child specifications. The first is a listening process, implemented as a gen_server behaviour, that waits for asynchronous notifications of client socket connections. The second is another supervisor, tcp_client_sup, responsible for starting the client-handling FSMs and logging abnormal disconnects via standard SASL error reports.
For the sake of simplicity, the client-handling FSM (tcp_echo_fsm) will implement an echo server that echoes the client's requests back to the client.
         +----------------+
         | tcp_server_app |
         +--------+-------+
                  | (one_for_one)
         +--------+--------+
         |                 |
 +-------+------+  +-------+--------+
 | tcp_listener |  | tcp_client_sup |
 +--------------+  +-------+--------+
                           | (simple_one_for_one)
                   +-------+--------+
                   |  tcp_echo_fsm  |+
                   +----------------+|
                     +---------------+
Application and Supervisor behaviours
In order to build an OTP application we need to construct modules implementing the application and supervisor behaviour callback functions. While traditionally these are implemented in separate modules, given their succinctness we'll combine them in one module.
As an added bonus we implement a get_app_env function that illustrates how to process configuration options as well as command-line options given to the emulator at start-up.
The two clauses of the init/1 function correspond to the two tiers of the supervision hierarchy. Since the two supervisors need different restart strategies, we implement them at different tiers.
Upon application startup, the tcp_server_app:start/2 callback calls supervisor:start_link/3, which creates the main application supervisor by invoking the tcp_server_app:init([Port, Module]) callback. This supervisor creates a tcp_listener process and a child supervisor tcp_client_sup responsible for spawning client connections. The Module argument in the init function is the name of the client-connection handling FSM (in this case tcp_echo_fsm).
TCP Server Application (tcp_server_app.erl)
-module(tcp_server_app).
-author('saleyn@gmail.com').
-behaviour(application).
%% Internal API
-export([start_client/0]).
%% Application and Supervisor callbacks
-export([start/2, stop/1, init/1]).
-define(MAX_RESTART, 5).
-define(MAX_TIME, 60).
-define(DEF_PORT, 2222).
%% A startup function for spawning new client connection handling FSM.
%% To be called by the TCP listener process.
start_client() ->
supervisor:start_child(tcp_client_sup, []).
%%----------------------------------------------------------------------
%% Application behaviour callbacks
%%----------------------------------------------------------------------
start(_Type, _Args) ->
ListenPort = get_app_env(listen_port, ?DEF_PORT),
supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]).
stop(_S) ->
ok.
%%----------------------------------------------------------------------
%% Supervisor behaviour callbacks
%%----------------------------------------------------------------------
init([Port, Module]) ->
{ok,
{_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
[
% TCP Listener
{ tcp_server_sup, % Id = internal id
{tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A}
permanent, % Restart = permanent | transient | temporary
2000, % Shutdown = brutal_kill | int() >= 0 | infinity
worker, % Type = worker | supervisor
[tcp_listener] % Modules = [Module] | dynamic
},
% Client instance supervisor
{ tcp_client_sup,
{supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]},
permanent, % Restart = permanent | transient | temporary
infinity, % Shutdown = brutal_kill | int() >= 0 | infinity
supervisor, % Type = worker | supervisor
[] % Modules = [Module] | dynamic
}
]
}
};
init([Module]) ->
{ok,
{_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
[
% TCP Client
{ undefined, % Id = internal id
{Module,start_link,[]}, % StartFun = {M, F, A}
temporary, % Restart = permanent | transient | temporary
2000, % Shutdown = brutal_kill | int() >= 0 | infinity
worker, % Type = worker | supervisor
[] % Modules = [Module] | dynamic
}
]
}
}.
%%----------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------
get_app_env(Opt, Default) ->
    %% Look the option up in the current application's environment first,
    %% then fall back to a plain emulator flag (e.g. "erl -listen_port 9999").
    %% Note: application:get_env/1 uses the calling process's application,
    %% and init:get_argument/1 returns {ok, Values} | error.
    case application:get_env(Opt) of
        {ok, Val} -> Val;
        _ ->
            case init:get_argument(Opt) of
                {ok, [[Val | _]]} -> Val;
                error             -> Default
            end
    end.
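One caveat worth noting: values found via init:get_argument/1 arrive as strings, so a caller expecting an integer port would have to convert them. As a sketch (the helper name below is hypothetical, not part of the tutorial's code), a port-specific variant might normalize the result:

```erlang
%% Hypothetical variant of get_app_env/2 that normalizes command-line
%% string values (e.g. "erl -listen_port 9999") to integers.
get_integer_env(Opt, Default) when is_integer(Default) ->
    case application:get_env(Opt) of
        {ok, Val} when is_integer(Val) -> Val;
        _ ->
            case init:get_argument(Opt) of
                {ok, [[Str | _]]} -> list_to_integer(Str);
                error             -> Default
            end
    end.
```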
Listener Process
One of the shortcomings of the gen_tcp module is that it only exports an interface to a blocking accept call. This leads most developers implementing a TCP server to build a custom process linked to a supervisor using proc_lib, or to come up with some other proprietary design.
Examining the prim_inet module reveals an interesting fact: the actual call to the inet driver to accept a client socket is asynchronous. While this is an undocumented property, which means the OTP team is free to change the implementation, we will exploit this functionality in the construction of our server.
The listener process is implemented as a gen_server behaviour:
TCP Listener Process (tcp_listener.erl)
-module(tcp_listener).
-author('saleyn@gmail.com').
-behaviour(gen_server).
%% External API
-export([start_link/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {
listener, % Listening socket
acceptor, % Asynchronous acceptor's internal reference
module % FSM handling module
}).
%%--------------------------------------------------------------------
%% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason}
%%
%% @doc Called by a supervisor to start the listening process.
%% @end
%%----------------------------------------------------------------------
start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).
%%%------------------------------------------------------------------------
%%% Callback functions from gen_server
%%%------------------------------------------------------------------------
%%----------------------------------------------------------------------
%% @spec (Port::integer()) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%
%% @doc Called by gen_server framework at process startup.
%% Create listening socket.
%% @end
%%----------------------------------------------------------------------
init([Port, Module]) ->
process_flag(trap_exit, true),
Opts = [binary, {packet, 2}, {reuseaddr, true},
{keepalive, true}, {backlog, 30}, {active, false}],
case gen_tcp:listen(Port, Opts) of
{ok, Listen_socket} ->
%%Create first accepting process
{ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
{ok, #state{listener = Listen_socket,
acceptor = Ref,
module = Module}};
{error, Reason} ->
{stop, Reason}
end.
%%-------------------------------------------------------------------------
%% @spec (Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @doc Callback for synchronous server calls. If `{stop, ...}' tuple
%% is returned, the server is stopped and `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_call(Request, _From, State) ->
{stop, {unknown_call, Request}, State}.
%%-------------------------------------------------------------------------
%% @spec (Msg, State) ->{noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @doc Callback for asynchronous server calls. If `{stop, ...}' tuple
%% is returned, the server is stopped and `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%-------------------------------------------------------------------------
%% @spec (Msg, State) ->{noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @doc Callback for messages sent directly to server's mailbox.
%% If `{stop, ...}' tuple is returned, the server is stopped and
%% `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
#state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
try
case set_sockopt(ListSock, CliSocket) of
ok -> ok;
{error, Reason} -> exit({set_sockopt, Reason})
end,
%% New client connected - spawn a new process using the simple_one_for_one
%% supervisor.
{ok, Pid} = tcp_server_app:start_client(),
gen_tcp:controlling_process(CliSocket, Pid),
%% Instruct the new FSM that it owns the socket.
Module:set_socket(Pid, CliSocket),
%% Signal the network driver that we are ready to accept another connection
case prim_inet:async_accept(ListSock, -1) of
{ok, NewRef} -> ok;
{error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
end,
{noreply, State#state{acceptor=NewRef}}
catch exit:Why ->
error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
{stop, Why, State}
end;
handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->
error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
{stop, Error, State};
handle_info(_Info, State) ->
{noreply, State}.
%%-------------------------------------------------------------------------
%% @spec (Reason, State) -> any
%% @doc Callback executed on server shutdown. It is only invoked if
%% `process_flag(trap_exit, true)' is set by the server process.
%% The return value is ignored.
%% @end
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, State) ->
gen_tcp:close(State#state.listener),
ok.
%%-------------------------------------------------------------------------
%% @spec (OldVsn, State, Extra) -> {ok, NewState}
%% @doc Convert process state when code is changed.
%% @end
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%------------------------------------------------------------------------
%%% Internal functions
%%%------------------------------------------------------------------------
%% Taken from prim_inet. We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
true = inet_db:register_socket(CliSocket, inet_tcp),
case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
{ok, Opts} ->
case prim_inet:setopts(CliSocket, Opts) of
ok -> ok;
Error -> gen_tcp:close(CliSocket), Error
end;
Error ->
gen_tcp:close(CliSocket), Error
end.
In this module the init/1 call takes two parameters: the port number the TCP listener should listen on, and the name of a protocol-handling module for client connections. The initialization function opens a listening socket in passive ({active, false}) mode. This is done so that we have flow control over data received on the connected client sockets, which inherit this option from the listening socket.
The most interesting part of this code is the prim_inet:async_accept/2 call, along with the handling of asynchronous inet_async messages. In order to get this working, we also needed to copy some internal OTP code, encapsulated in the set_sockopt/2 function, which handles socket registration with the inet database and copies some options to the client socket.
As soon as a client socket is connected, the inet driver notifies the listening process with an {inet_async, ListSock, Ref, {ok, CliSocket}} message. At this point we instantiate a new client-socket handling process and give it ownership of CliSocket.
Client Socket Handling Process
While tcp_listener is a generic implementation, tcp_echo_fsm is a mere stub FSM illustrating how to write TCP servers. This module needs to export two functions: start_link/0 for the tcp_client_sup supervisor, and set_socket/2 for the listener process to notify the client-connection handling FSM process that it is now the owner of the socket and can begin receiving messages by setting the {active, once} or {active, true} option.
We would like to highlight the synchronization pattern used between the listening process and the client connection-handling FSM, which avoids possible message loss caused by data from the socket being dispatched to the wrong (listening) process. The process owning the listening socket has it open with {active, false}. After accepting a client socket, which inherits its socket options (including {active, false}) from the listener, the listening process transfers ownership of the socket to the newly spawned client connection-handling FSM by calling gen_tcp:controlling_process/2, and then calls Module:set_socket/2 to notify the FSM that it can start receiving messages from the socket. Until the FSM process enables message delivery by calling inet:setopts(Socket, [{active, once}]), the data sent by the TCP sender stays in the socket buffer.
When socket ownership is transferred to the FSM in the 'WAIT_FOR_SOCKET' state, the FSM sets the {active, once} option to let the inet driver send it one TCP message at a time. This is the OTP way of preserving flow control and avoiding flooding of the process message queue with TCP data, which could crash the system in a fast-producer/slow-consumer scenario.
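The steps above can be condensed into the following sequence (a sketch restating the calls already shown in tcp_listener and tcp_echo_fsm, in the order that prevents message loss):

```erlang
%% Inside tcp_listener's handle_info/2, after a successful accept.
%% CliSocket has inherited {active, false}, so no data is delivered yet.
{ok, Pid} = tcp_server_app:start_client(),        % 1. spawn FSM under tcp_client_sup
ok = gen_tcp:controlling_process(CliSocket, Pid), % 2. hand over ownership while passive
tcp_echo_fsm:set_socket(Pid, CliSocket),          % 3. tell the FSM it owns the socket
%% 4. Only the FSM, in 'WAIT_FOR_SOCKET', finally enables delivery of a
%%    single message with inet:setopts(Socket, [{active, once}]).
```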
The FSM states are implemented as specially named functions in the tcp_echo_fsm module, following a convention of upper-case state names enclosed in single quotes. The FSM consists of two states. 'WAIT_FOR_SOCKET' is the initial state, in which the FSM awaits assignment of socket ownership; 'WAIT_FOR_DATA' is the state in which it awaits a TCP message from the client. In this state the FSM also handles a special timeout message, which signifies no activity from the client and causes the process to stop and close the client connection.
TCP Client Socket Handling FSM (tcp_echo_fsm.erl)
-module(tcp_echo_fsm).
-author('saleyn@gmail.com').
-behaviour(gen_fsm).
-export([start_link/0, set_socket/2]).
%% gen_fsm callbacks
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
%% FSM States
-export([
'WAIT_FOR_SOCKET'/2,
'WAIT_FOR_DATA'/2
]).
-record(state, {
socket, % client socket
addr % client address
}).
-define(TIMEOUT, 120000).
%%%------------------------------------------------------------------------
%%% API
%%%------------------------------------------------------------------------
%%-------------------------------------------------------------------------
%% @spec () -> {ok,Pid} | ignore | {error,Error}
%% @doc To be called by the supervisor in order to start the server.
%% If init/1 fails with Reason, the function returns {error,Reason}.
%% If init/1 returns {stop,Reason} or ignore, the process is
%% terminated and the function returns {error,Reason} or ignore,
%% respectively.
%% @end
%%-------------------------------------------------------------------------
start_link() ->
gen_fsm:start_link(?MODULE, [], []).
set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
gen_fsm:send_event(Pid, {socket_ready, Socket}).
%%%------------------------------------------------------------------------
%%% Callback functions from gen_server
%%%------------------------------------------------------------------------
%%-------------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
%% @private
%%-------------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
{ok, 'WAIT_FOR_SOCKET', #state{}}.
%%-------------------------------------------------------------------------
%% Func: StateName/2
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
% Now we own the socket
inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
{ok, {IP, _Port}} = inet:peername(Socket),
{next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};
'WAIT_FOR_SOCKET'(Other, State) ->
error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),
%% Allow receiving async messages
{next_state, 'WAIT_FOR_SOCKET', State}.
%% Notification event coming from client
'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
ok = gen_tcp:send(S, Data),
{next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
'WAIT_FOR_DATA'(timeout, State) ->
error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
{stop, normal, State};
'WAIT_FOR_DATA'(Data, State) ->
io:format("~p Ignoring data: ~p\n", [self(), Data]),
{next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.
%%-------------------------------------------------------------------------
%% Func: handle_event/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
{stop, {StateName, undefined_event, Event}, StateData}.
%%-------------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_sync_event(Event, _From, StateName, StateData) ->
{stop, {StateName, undefined_event, Event}, StateData}.
%%-------------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
% Flow control: enable forwarding of next TCP message
inet:setopts(Socket, [{active, once}]),
?MODULE:StateName({data, Bin}, StateData);
handle_info({tcp_closed, Socket}, _StateName,
#state{socket=Socket, addr=Addr} = StateData) ->
error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
{stop, normal, StateData};
handle_info(_Info, StateName, StateData) ->
%% Note: gen_fsm expects {next_state, StateName, StateData} here;
%% {noreply, ...} is a gen_server return value and would crash the FSM.
{next_state, StateName, StateData}.
%%-------------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, _StateName, #state{socket=Socket}) ->
(catch gen_tcp:close(Socket)),
ok.
%%-------------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
Application File
Another required part of building an OTP application is the creation of an application file that includes the application name, version, startup module, and environment.
Application File (tcp_server.app)
{application, tcp_server,
[
{description, "Demo TCP server"},
{vsn, "1.0"},
{id, "tcp_server"},
{modules, [tcp_listener, tcp_echo_fsm]},
{registered, [tcp_server_sup, tcp_listener]},
{applications, [kernel, stdlib]},
%%
%% mod: Specify the module name to start the application, plus args
%%
{mod, {tcp_server_app, []}},
{env, []}
]
}.
Compiling
Create the following directory structure for this application:
./tcp_server
./tcp_server/ebin/
./tcp_server/ebin/tcp_server.app
./tcp_server/src/tcp_server_app.erl
./tcp_server/src/tcp_listener.erl
./tcp_server/src/tcp_echo_fsm.erl
$ cd tcp_server/src
$ for f in tcp*.erl ; do erlc +debug_info -o ../ebin $f ; done
Running
We are going to start an Erlang shell with SASL support so that we can view all progress and error reports for our TCP application. We will also start the appmon application in order to examine the supervision hierarchy visually.
$ cd ../ebin
$ erl -boot start_sasl
...
1> appmon:start().
{ok,<0.44.0>}
2> application:start(tcp_server).
ok
Now click on the tcp_server button in appmon's window to display the supervision hierarchy of the tcp_server application.
3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).
{ok,#Port<0.150>}
The step above initiated a new client connection to the echo server.
4> gen_tcp:send(S,<<"hello">>).
ok
5> f(M), receive M -> M end.
{tcp,#Port<0.150>,"hello"}
We have verified that the echo server works as expected. Now let's crash the client connection on the server and watch the supervisor generate an error report entry on screen.
6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).
[{undefined,<0.64.0>,worker,[]}]
7> exit(Pid,kill).
true
=SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 ===
Supervisor: {local,tcp_client_sup}
Context: child_terminated
Reason: killed
Offender: [{pid,<0.77.0>},
{name,undefined},
{mfa,{tcp_echo_fsm,start_link,[]}},
{restart_type,temporary},
{shutdown,2000},
{child_type,worker}]
Note that if you put this server under a stress test with many incoming connections, the listener process may fail to accept new connections once the number of open file descriptors reaches the limit set by the operating system. In that case you will see the error:
"too many open files"
If you are running Linux/UNIX, search the web for a solution; it ultimately boils down to raising the per-process limit with "ulimit -n ...".
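For example (a sketch; the exact limit and the mechanism for raising it vary by operating system, and raising it may require root or a higher hard limit):

```shell
# Inspect the current per-process limit on open file descriptors:
ulimit -n
# To raise it for the current shell session before starting erl, one
# would run e.g. "ulimit -n 65536".
```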
Conclusion
OTP provides building blocks for constructing non-blocking TCP servers. This tutorial showed how to create a simple TCP server with flow control using standard OTP behaviours. As an exercise, the reader is encouraged to try abstracting the generic non-blocking TCP server functionality into a stand-alone behaviour.
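As a starting point for that exercise, a custom behaviour in Erlang can be declared by exporting behaviour_info/1; the module name and callback set below are hypothetical, chosen to mirror the two functions this tutorial requires of every handler module:

```erlang
-module(gen_nb_tcp).   % hypothetical behaviour module name
-export([behaviour_info/1]).

%% A module implementing this behaviour must export start_link/0 and
%% set_socket/2, mirroring what tcp_listener expects of tcp_echo_fsm.
behaviour_info(callbacks) ->
    [{start_link, 0}, {set_socket, 2}];
behaviour_info(_Other) ->
    undefined.
```

A handler module would then add `-behaviour(gen_nb_tcp).` and get compile-time warnings if either callback is missing.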
Perhaps I didn't explain it clearly: the process calling prim_inet:async_accept here doesn't seem to have any other work to do in the meantime, so I don't see the benefit of doing the accept asynchronously.
这篇文章很好的演示了几个特性:
1. superbisor 动态添加child.
2. async accept.
3. gen_fsm
4. 流控制
5. application
很值得一看。
Building a Non-blocking TCP server using OTP principles
From Erlang Community
Contents
[hide]
1 Author
2 Overview
3 Server Design
4 Application and Supervisor behaviours
5 Listener Process
6 Client Socket Handling Process
7 Application File
8 Compiling
9 Running
10 Conclusion
[edit] Author
Serge Aleynikov <saleyn at gmail.com>
[edit] Overview
A reader of this tutorial is assumed to be familiar with gen_server and gen_fsm behaviours, TCP socket communications using gen_tcp module, active and passive socket modes, and OTP supervision principles.
OTP provides a convenient framework for building reliable applications. This is in part accomplished by abstracting common functionality into a set of reusable behaviours such as gen_server and gen_fsm that are linked to OTP's supervision hierarchy.
There are several known TCP server designs. The one we are going to cover involves one process listening for client connections and spawning an FSM process per connecting client. While there is support for TCP communications available in OTP through the gen_tcp module, there is no standard behavior for building non-blocking TCP servers using OTP standard guidelines. By non-blocking we imply that the listening process and the client-handling FSMs should not make any blocking calls and be readily responsive to incoming control messages (such as changes in system configuration, restart requests, etc.) without causing timeouts. Note that blocking in the context of Erlang means blocking an Erlang process rather than the emulator's OS process(es).
In this tutorial we will show how to build a non-blocking TCP server using gen_server and gen_fsm behaviours that offers flow control and is fully compliant with OTP application design principles.
A reader who is new to the OTP framework is encouraged to read Joe Armstrong's tutorial on how to build A Fault-tolerant Server using blocking gen_tcp:connect/3 and gen_tcp:accept/1 calls without involving OTP.
This tutorial was inspired by several threads (e.g. one, two) on the Erlang Questions mailing list mentioning an approach to building non-blocking asynchronous TCP servers.
[edit] Server Design
The design of our server will include the main application's supervisor tcp_server_app process with one_for_one restart strategy and two child specifications. The first one being a listening process implemented as a gen_server behaviour that will wait for asynchronous notifications of client socket connections. The second one is another supervisor tcp_client_sup responsible for starting client handling FSMs and logging abnormal disconnects via standard SASL error reports.
For the sake of simplicity of this tutorial, the client handling FSM (tcp_echo_fsm) will implement an echo server that will echo client's requests back to the client.
+----------------+
| tcp_server_app |
+--------+-------+
| (one_for_one)
+----------------+---------+
| |
+-------+------+ +-------+--------+
| tcp_listener | + tcp_client_sup |
+--------------+ +-------+--------+
| (simple_one_for_one)
+-----|---------+
+-------|--------+|
+--------+-------+|+
| tcp_echo_fsm |+
+----------------+
[edit] Application and Supervisor behaviours
In order to build an OTP application we need to construct modules implementing an application and supervisor behaviour callback functions. While traditionally these functionalities are implemented in separate modules, given their succinctness we'll combine them in one module.
As an added bonus we implement a get_app_env function that illustrates how to process configuration options as well as command-line options given to the emulator at start-up.
The two instances of init/1 function are for two tiers of supervision hierarchy. Since two different restart strategies for each supervisor are needed, we implement them at different tiers.
Upon application's startup the tcp_server_app:start/2 callback function calls supervisor:start_link/2 that creates main application's supervisor calling tcp_server_app:init([Port, Module]) callback. This supervisor creates a tcp_listener process and a child supervisor tcp_client_sup responsible for spawning client connections. The Module argument in the init function is the name of client-connection handling FSM (in this case tcp_echo_fsm).
TCP Server Application (tcp_server_app.erl)
-module(tcp_server_app).
-author('saleyn@gmail.com').

-behaviour(application).

%% Internal API
-export([start_client/0]).

%% Application and Supervisor callbacks
-export([start/2, stop/1, init/1]).

-define(MAX_RESTART,    5).
-define(MAX_TIME,      60).
-define(DEF_PORT,    2222).

%% A startup function for spawning new client connection handling FSM.
%% To be called by the TCP listener process.
start_client() ->
    supervisor:start_child(tcp_client_sup, []).

%%----------------------------------------------------------------------
%% Application behaviour callbacks
%%----------------------------------------------------------------------
start(_Type, _Args) ->
    ListenPort = get_app_env(listen_port, ?DEF_PORT),
    supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]).

stop(_S) ->
    ok.

%%----------------------------------------------------------------------
%% Supervisor behaviour callbacks
%%----------------------------------------------------------------------
init([Port, Module]) ->
    {ok,
        {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
            [
              %% TCP Listener
              {   tcp_server_sup,                          % Id       = internal id
                  {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A}
                  permanent,                               % Restart  = permanent | transient | temporary
                  2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
                  worker,                                  % Type     = worker | supervisor
                  [tcp_listener]                           % Modules  = [Module] | dynamic
              },
              %% Client instance supervisor
              {   tcp_client_sup,
                  {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]},
                  permanent,                               % Restart  = permanent | transient | temporary
                  infinity,                                % Shutdown = brutal_kill | int() >= 0 | infinity
                  supervisor,                              % Type     = worker | supervisor
                  []                                       % Modules  = [Module] | dynamic
              }
            ]
        }
    };

init([Module]) ->
    {ok,
        {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
            [
              %% TCP Client
              {   undefined,                               % Id       = internal id
                  {Module,start_link,[]},                  % StartFun = {M, F, A}
                  temporary,                               % Restart  = permanent | transient | temporary
                  2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
                  worker,                                  % Type     = worker | supervisor
                  []                                       % Modules  = [Module] | dynamic
              }
            ]
        }
    }.

%%----------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------
get_app_env(Opt, Default) ->
    case application:get_env(Opt) of
        {ok, Val} -> Val;
        _ ->
            case init:get_argument(Opt) of
                {ok, [[Val | _]]} -> Val;   % command-line values arrive as strings
                error             -> Default
            end
    end.
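As a sketch of how `get_app_env/2` can be fed configuration (the port value 2345 and the `-pa ebin` path are examples, not part of the tutorial), the port can be overridden either through the application environment or through a plain emulator flag:

```shell
# Override listen_port via the application environment.
# The value is parsed as an Erlang term, so 2345 arrives as an integer:
erl -pa ebin -boot start_sasl -tcp_server listen_port 2345

# Plain emulator flag, read by the init:get_argument/1 fallback.
# Note: this value arrives as the string "2345", not an integer:
erl -pa ebin -boot start_sasl -listen_port 2345
```

Because `init:get_argument/1` returns its values as strings, the application-environment form is the safer of the two for an integer port.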
Listener Process
One of the shortcomings of the gen_tcp module is that it only exports an interface to a blocking accept call. This leads most developers implementing TCP servers to build a custom acceptor process linked to a supervisor using proc_lib, or to come up with some other proprietary design.
Examining the prim_inet module reveals an interesting fact: the actual call to the inet driver to accept a client socket is asynchronous. While this is an undocumented property, which means that the OTP team is free to change the implementation, we will exploit this functionality in the construction of our server.
The listener process is implemented as a gen_server behaviour:
TCP Listener Process (tcp_listener.erl)
-module(tcp_listener).
-author('saleyn@gmail.com').

-behaviour(gen_server).

%% External API
-export([start_link/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-record(state, {
          listener,       % Listening socket
          acceptor,       % Asynchronous acceptor's internal reference
          module          % FSM handling module
         }).

%%--------------------------------------------------------------------
%% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason}
%%
%% @doc Called by a supervisor to start the listening process.
%% @end
%%----------------------------------------------------------------------
start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).

%%%------------------------------------------------------------------------
%%% Callback functions from gen_server
%%%------------------------------------------------------------------------

%%----------------------------------------------------------------------
%% @spec (Port::integer()) -> {ok, State}           |
%%                            {ok, State, Timeout}  |
%%                            ignore                |
%%                            {stop, Reason}
%%
%% @doc Called by gen_server framework at process startup.
%%      Create listening socket.
%% @end
%%----------------------------------------------------------------------
init([Port, Module]) ->
    process_flag(trap_exit, true),
    Opts = [binary, {packet, 2}, {reuseaddr, true},
            {keepalive, true}, {backlog, 30}, {active, false}],
    case gen_tcp:listen(Port, Opts) of
        {ok, Listen_socket} ->
            %% Create first accepting process
            {ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
            {ok, #state{listener = Listen_socket,
                        acceptor = Ref,
                        module   = Module}};
        {error, Reason} ->
            {stop, Reason}
    end.

%%-------------------------------------------------------------------------
%% @spec (Request, From, State) -> {reply, Reply, State}          |
%%                                 {reply, Reply, State, Timeout} |
%%                                 {noreply, State}               |
%%                                 {noreply, State, Timeout}      |
%%                                 {stop, Reason, Reply, State}   |
%%                                 {stop, Reason, State}
%% @doc Callback for synchronous server calls.  If `{stop, ...}' tuple
%%      is returned, the server is stopped and `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_call(Request, _From, State) ->
    {stop, {unknown_call, Request}, State}.

%%-------------------------------------------------------------------------
%% @spec (Msg, State) -> {noreply, State}          |
%%                       {noreply, State, Timeout} |
%%                       {stop, Reason, State}
%% @doc Callback for asynchronous server calls.  If `{stop, ...}' tuple
%%      is returned, the server is stopped and `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%-------------------------------------------------------------------------
%% @spec (Msg, State) -> {noreply, State}          |
%%                       {noreply, State, Timeout} |
%%                       {stop, Reason, State}
%% @doc Callback for messages sent directly to server's mailbox.
%%      If `{stop, ...}' tuple is returned, the server is stopped and
%%      `terminate/2' is called.
%% @end
%% @private
%%-------------------------------------------------------------------------
handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
            #state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
    try
        case set_sockopt(ListSock, CliSocket) of
            ok              -> ok;
            {error, Reason} -> exit({set_sockopt, Reason})
        end,

        %% New client connected - spawn a new process using the
        %% simple_one_for_one supervisor.
        {ok, Pid} = tcp_server_app:start_client(),
        gen_tcp:controlling_process(CliSocket, Pid),

        %% Instruct the new FSM that it owns the socket.
        Module:set_socket(Pid, CliSocket),

        %% Signal the network driver that we are ready to accept
        %% another connection
        case prim_inet:async_accept(ListSock, -1) of
            {ok,    NewRef} -> ok;
            {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
        end,

        {noreply, State#state{acceptor=NewRef}}
    catch exit:Why ->
        error_logger:error_msg("Error in async accept: ~p.\n", [Why]),
        {stop, Why, State}
    end;

handle_info({inet_async, ListSock, Ref, Error},
            #state{listener=ListSock, acceptor=Ref} = State) ->
    error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),
    {stop, Error, State};

handle_info(_Info, State) ->
    {noreply, State}.

%%-------------------------------------------------------------------------
%% @spec (Reason, State) -> any
%% @doc Callback executed on server shutdown.  It is only invoked if
%%      `process_flag(trap_exit, true)' is set by the server process.
%%      The return value is ignored.
%% @end
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, State) ->
    gen_tcp:close(State#state.listener),
    ok.

%%-------------------------------------------------------------------------
%% @spec (OldVsn, State, Extra) -> {ok, NewState}
%% @doc Convert process state when code is changed.
%% @end
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%------------------------------------------------------------------------
%%% Internal functions
%%%------------------------------------------------------------------------

%% Taken from prim_inet.  We are merely copying some socket options from the
%% listening socket to the new client socket.
set_sockopt(ListSock, CliSocket) ->
    true = inet_db:register_socket(CliSocket, inet_tcp),
    case prim_inet:getopts(ListSock,
                           [active, nodelay, keepalive, delay_send, priority, tos]) of
        {ok, Opts} ->
            case prim_inet:setopts(CliSocket, Opts) of
                ok    -> ok;
                Error -> gen_tcp:close(CliSocket), Error
            end;
        Error ->
            gen_tcp:close(CliSocket), Error
    end.
In this module the init/1 callback takes two parameters: the port number the TCP listener should be started on, and the name of the protocol-handling module for client connections. The initialization function opens the listening socket in passive ({active, false}) mode. This is done so that we have flow control over the data received on connected client sockets, which inherit this option from the listening socket.
The most interesting part of this code is the prim_inet:async_accept/2 call, together with the handling of the asynchronous inet_async messages. To get this working we also needed to copy some internal OTP code, encapsulated in the set_sockopt/2 function, that registers the accepted socket with the inet database and copies some options from the listening socket to the client socket.
As soon as a client socket is connected, the inet driver notifies the listening process with an {inet_async, ListSock, Ref, {ok, CliSocket}} message. At this point we instantiate a new client-handling process and transfer ownership of CliSocket to it.
Client Socket Handling Process
While tcp_listener is a generic implementation, tcp_echo_fsm is a mere stub FSM illustrating how to write TCP servers. This module needs to export two functions: start_link/0 for the tcp_client_sup supervisor, and set_socket/2 for the listener process to notify the client connection-handling FSM process that it is now the owner of the socket and can begin receiving messages by setting the {active, once} or {active, true} option.
We would like to highlight the synchronization pattern used between the listening process and the client connection-handling FSM, which avoids possible message loss caused by the socket dispatching messages to the wrong (listening) process. The process owning the listening socket opens it with {active, false}. An accepted client socket inherits its socket options (including {active, false}) from the listener. The listener then transfers ownership of the socket to the newly spawned FSM by calling gen_tcp:controlling_process/2, and calls Module:set_socket/2 to notify the FSM that it can start receiving messages from the socket. Until the FSM process enables message delivery by calling inet:setopts(Socket, [{active, once}]), any data sent by the TCP peer stays in the socket buffer.
When socket ownership is transferred to the FSM in the 'WAIT_FOR_SOCKET' state, the FSM sets the {active, once} option to let the inet driver send it one TCP message at a time. This is the OTP way of preserving flow control and avoiding flooding the process message queue with TCP data, which could crash the system in a fast-producer/slow-consumer scenario.
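The same {active, once} flow-control idea can be illustrated outside gen_fsm with a bare receive loop. The sketch below is not part of the tutorial code (the module and handle/1 function are ours for illustration): the socket is re-armed for exactly one message after each delivery, so a fast sender can never flood the process mailbox; data arriving in the meantime waits in the socket buffer, subject to TCP's own backpressure.

```erlang
-module(active_once_demo).
-export([loop/1]).

%% Flow-control sketch: ask the driver for exactly one TCP message,
%% process it at our own pace, then re-arm the socket.
loop(Socket) ->
    ok = inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, Data} ->
            handle(Data),       % consume the message
            loop(Socket);       % only now request the next one
        {tcp_closed, Socket} ->
            ok
    end.

handle(_Data) ->
    ok.
```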
The FSM states are implemented by functions in the tcp_echo_fsm module, using a naming convention of upper-case state names enclosed in single quotes. The FSM consists of two states: 'WAIT_FOR_SOCKET' is the initial state, in which the FSM awaits assignment of socket ownership, and 'WAIT_FOR_DATA' is the state in which it awaits a TCP message from the client. In this state the FSM also handles a special timeout message that signals a lack of activity from the client and causes the process to stop and close the client connection.
TCP Client Socket Handling FSM (tcp_echo_fsm.erl)
-module(tcp_echo_fsm).
-author('saleyn@gmail.com').

-behaviour(gen_fsm).

-export([start_link/0, set_socket/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
         handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

%% FSM States
-export([
         'WAIT_FOR_SOCKET'/2,
         'WAIT_FOR_DATA'/2
        ]).

-record(state, {
          socket,    % client socket
          addr       % client address
         }).

-define(TIMEOUT, 120000).

%%%------------------------------------------------------------------------
%%% API
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}
%% @doc To be called by the supervisor in order to start the server.
%%      If init/1 fails with Reason, the function returns {error,Reason}.
%%      If init/1 returns {stop,Reason} or ignore, the process is
%%      terminated and the function returns {error,Reason} or ignore,
%%      respectively.
%% @end
%%-------------------------------------------------------------------------
start_link() ->
    gen_fsm:start_link(?MODULE, [], []).

set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
    gen_fsm:send_event(Pid, {socket_ready, Socket}).

%%%------------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData}          |
%%          {ok, StateName, StateData, Timeout} |
%%          ignore                              |
%%          {stop, StopReason}
%% @private
%%-------------------------------------------------------------------------
init([]) ->
    process_flag(trap_exit, true),
    {ok, 'WAIT_FOR_SOCKET', #state{}}.

%%-------------------------------------------------------------------------
%% Func: StateName/2
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
    %% Now we own the socket
    inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
    {ok, {IP, _Port}} = inet:peername(Socket),
    {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};
'WAIT_FOR_SOCKET'(Other, State) ->
    error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n",
                           [Other]),
    %% Allow receiving async messages
    {next_state, 'WAIT_FOR_SOCKET', State}.

%% Notification event coming from client
'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
    ok = gen_tcp:send(S, Data),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};

'WAIT_FOR_DATA'(timeout, State) ->
    error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
    {stop, normal, State};

'WAIT_FOR_DATA'(Data, State) ->
    io:format("~p Ignoring data: ~p\n", [self(), Data]),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.

%%-------------------------------------------------------------------------
%% Func: handle_event/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%%-------------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_sync_event(Event, _From, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%%-------------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
    %% Flow control: enable forwarding of next TCP message
    inet:setopts(Socket, [{active, once}]),
    ?MODULE:StateName({data, Bin}, StateData);

handle_info({tcp_closed, Socket}, _StateName,
            #state{socket=Socket, addr=Addr} = StateData) ->
    error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
    {stop, normal, StateData};

handle_info(_Info, StateName, StateData) ->
    {next_state, StateName, StateData}.

%%-------------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, _StateName, #state{socket=Socket}) ->
    (catch gen_tcp:close(Socket)),
    ok.

%%-------------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, StateName, StateData}
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.
Application File
Another required part of building an OTP application is the creation of an application file that specifies the application name, version, startup module, and environment.
Application File (tcp_server.app)
{application, tcp_server,
 [
  {description, "Demo TCP server"},
  {vsn, "1.0"},
  {id, "tcp_server"},
  {modules,      [tcp_server_app, tcp_listener, tcp_echo_fsm]},
  {registered,   [tcp_server_sup, tcp_listener]},
  {applications, [kernel, stdlib]},
  %%
  %% mod: Specify the module name to start the application, plus args
  %%
  {mod, {tcp_server_app, []}},
  {env, []}
 ]
}.
Compiling
Create the following directory structure for this application:
./tcp_server
./tcp_server/ebin/
./tcp_server/ebin/tcp_server.app
./tcp_server/src/tcp_server_app.erl
./tcp_server/src/tcp_listener.erl
./tcp_server/src/tcp_echo_fsm.erl
$ cd tcp_server/src
$ for f in tcp*.erl ; do erlc +debug_info -o ../ebin $f ; done
Running
We are going to start an Erlang shell with SASL support so that we can view all progress and error reports for our TCP application. We will also start the appmon application in order to examine the supervision hierarchy visually.
$ cd ../ebin
$ erl -boot start_sasl
...
1> appmon:start().
{ok,<0.44.0>}
2> application:start(tcp_server).
ok
Now click on the tcp_server button in appmon's window to display the supervision hierarchy of the tcp_server application.
3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).
{ok,#Port<0.150>}
The step above initiated a new client connection to the echo server.
4> gen_tcp:send(S,<<"hello">>).
ok
5> f(M), receive M -> M end.
{tcp,#Port<0.150>,"hello"}
We have verified that the echo server works as expected. Now let's crash the client connection on the server and watch the supervisor generate an error report entry on screen.
6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).
[{undefined,<0.64.0>,worker,[]}]
7> exit(Pid,kill).
true
=SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 ===
Supervisor: {local,tcp_client_sup}
Context: child_terminated
Reason: killed
Offender: [{pid,<0.77.0>},
{name,undefined},
{mfa,{tcp_echo_fsm,start_link,[]}},
{restart_type,temporary},
{shutdown,2000},
{child_type,worker}]
Note that if you put this server under a stress test with many incoming connections, the listener process may fail to accept new connections once the number of open file descriptors reaches the limit set by the operating system. In that case you will see the error:
"too many open files"
If you are running Linux/UNIX, search for a solution online; it ultimately boils down to raising the per-process descriptor limit with the shell's "ulimit -n ..." command.
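As a sketch (exact limits and defaults vary by system, and 65536 is just an example value), the current limit can be inspected, and raised for the current shell, like this:

```shell
# Show the current soft limit on open file descriptors
ulimit -n

# Show the hard limit; a non-root user may raise the soft limit up to this value
ulimit -Hn

# Raise the soft limit for this shell and its children (example value;
# going beyond the hard limit requires root privileges):
# ulimit -n 65536
```

The change applies only to the shell session and the processes it spawns, so it must be done before starting the Erlang VM.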
Conclusion
OTP provides building blocks for constructing non-blocking TCP servers. This tutorial showed how to create a simple TCP server with flow control using standard OTP behaviours. As an exercise, the reader is encouraged to try abstracting the generic non-blocking TCP server functionality into a stand-alone behaviour.
Comments
#9
rain2005
2009-12-23
Heh, this time I read it through carefully. The FSM is very nice and powerful, and here I also got to see what handle_info is for. Really, all we need on top of this is a handler process that receives and processes term messages, and we have a network server.
#8
rain2005
2009-12-23
Heh, so it is done to save the overhead of those N process switches.
#7
mryufeng
2009-12-18
Using the asynchronous accept in this program has two benefits:
1. A synchronous accept works like this: ask inet_tcp to accept, then wait for the reply message - a blocking operation. The whole sequence costs two process switches, so if your system has tens of thousands of accepts to handle, the number of scheduling events is 2*N. With the asynchronous version, the notification messages come back in batches, so the number of scheduling events is N+1. For large N the saving is significant.
2. Because the handling is batched, it can take advantage of the CPU cache.
#6
rain2005
2009-12-18
mryufeng wrote:
accept supports multiple accepting processes inside inet_driver, so with the synchronous version you can usually only have one accept outstanding, while with the asynchronous version you can issue several, and while waiting for the accept reply you can do other work...
Maybe I didn't put it clearly enough: the process calling prim_inet:async_accept here doesn't seem to have any other task to perform either, so I don't see the benefit of going asynchronous.
#5
mryufeng
2009-12-17
accept supports multiple accepting processes inside inet_driver, so with the synchronous version you can usually only have one accept outstanding, while with the asynchronous version you can issue several, and while waiting for the accept reply you can do other work...
#4
rain2005
2009-12-17
Sorry, I commented rashly after reading only the title.
Still, what is the advantage of the asynchronous prim_inet:async_accept here over a synchronous accept? After all, the accepting process doesn't seem to have anything else to do.
#3
mryufeng
2009-12-16
This example is exactly one process per TCP connection - a gen_server is a process. You don't need to worry about the lower layers; just use it.
#2
rain2005
2009-12-13
A question: process creation and scheduling in Erlang are both very cheap, so why doesn't the TCP server use a process-per-connection model? The underlying Erlang driver should be non-blocking anyway - is it to save process-scheduling overhead?
#1
美洲豹
2009-05-30
I have read the original article, and I now write all my servers this way. With a framework like this you only need to implement your own logic - very convenient.