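Many people have asked in forum posts how to write a port so that Erlang can talk to the outside world. David King wrote an article on exactly this a long time ago, and it covers the problem in detail and in real depth. See http://www.ketralnis.com/roller/dking/entry/20070903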
Supervision Trees and Ports in Erlang
08:31PM Sep 03, 2007 in category Erlang by David King
Erlang provides a whole lot of infrastructure for doing tasks commonly associated with building giant fault-tolerant systems. But what about when you have to talk to an external non-Erlang program? Can we make our communication to that program fault-tolerant using only native Erlang/OTP components?
The problem
I'm writing a blogging engine using Erlyweb and Mnesia, and one of my tasks is to pass the user's blog entry through a formatter. I use two: one for Markdown, and one to sanitise for XSS attacks. Since there's no Markdown or HTML-sanitiser implementation for Erlang, we need to communicate with the Perl implementations (markdown, HTML-TagFilter).
We'll want some fault-tolerance (if one of our external programs crashes, we want to re-launch it), and we don't want to introduce a bottleneck by serialising access to our external programs, so we'll launch several copies of each to spread the load and allow some concurrency. In addition, we'll want to leave room to move all of the processing to its own node in the event that our blog gets very popular.
We'll communicate using an Erlang Port. A Port is a connection to an external program, implemented using Unix pipes, so our external program just has to know how to read from stdin and write to stdout. We're going to keep all of this pretty generic with respect to the actual external program. You should be able to do the same for any other program that can take data in length-encoded packets (or can be wrapped in one that does, like our Perl wrapper) and for which you want to launch multiple workers.
To launch our workers, we'll use Erlang's gen_server behaviour, which is a way to write a generic server without having to write the implementation details of the server (keeping state, handling and responding to messages, etc.). Instead, you write a gen_server callback module, and all you write is the code to actually handle the messages that you want to respond to. Basically, gen_server is a module that can take another module with appropriate callbacks and handle all of the tasks of making it a server that looks more or less like this (pseudo-code):
loop(State) ->
    Message = receive Some_Message
    if Message is an internal message, like shutdown or startup or code-replacement
        let gen_server handle it
        loop(State)
    else it's a message that the programmer wants to respond to
        pass it to the programmer, let them handle it, get new state
        loop(NewState)
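To make the pseudo-code concrete, here is a minimal hand-written server loop of the kind gen_server generalises. It is only a sketch: the module name hand_loop and its {call, From, Ref, Request} message shape are made up for illustration, and it leaves out everything gen_server adds on top (monitors, system messages, code upgrade, proper error handling).

```erlang
-module(hand_loop).
-export([start/1, call/2, stop/1]).

%% Start a server process whose state is an initial counter value.
start(InitialState) ->
    spawn(fun() -> loop(InitialState) end).

%% Synchronous request: tag the message with a unique reference so the
%% reply can be matched unambiguously.
call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {call, self(), Ref, Request},
    receive
        {Ref, Reply} -> Reply
    after 5000 ->
        exit(timeout)
    end.

stop(Pid) ->
    Pid ! stop,
    ok.

loop(State) ->
    receive
        stop ->
            %% "internal" message: handled by the loop itself
            ok;
        {call, From, Ref, Request} ->
            %% application message: hand it to the "callback", reply,
            %% and recurse with whatever new state it returns
            {Reply, NewState} = handle(Request, State),
            From ! {Ref, Reply},
            loop(NewState)
    end.

%% The only part a gen_server user would actually write.
handle({add, N}, State) -> {State + N, State + N};
handle(get, State)      -> {State, State}.
```

With gen_server, loop/1, call/2 and the reference-matching all come for free, and handle/2 becomes handle_call/3.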
Our gen_server will have to keep track of its connection to our external program, and on request it will have to send data to the external program and retrieve the result. It will also create its process group (pg2) if it doesn't exist yet and add itself to it.
Erlang does something similar for supervisors. A supervisor is a process whose entire job is to watch another process or set of processes. In our case, we just want it to make sure that our external processes are up and running, to re-launch them in case they crash, and to bring up all of our workers at once so that we don't have to launch them individually. We'll have one supervisor for Markdown which will watch all of our Markdown gen_servers, and another for all of our sanitiser gen_servers. To make managing them easier, we'll use another supervisor that will watch those two supervisors, re-launching them if they crash, and enabling us to bring both up, and all of the associated gen_servers, at once. So the plan is to have a formatter supervisor that will launch a markdown supervisor and a sanitiser supervisor, each of which will launch N workers, like this:

formatter_supervisor
    markdown_supervisor
        markdown1, markdown2, ... markdownN
    sanitiser_supervisor
        sanitiser1, sanitiser2, ... sanitiserN
I'll show you working code, and try to explain the fuzzy bits, but I encourage you to research the functions that compose it in Erlang's own docs so that you really know what is going on.
external_program.pl
Since all of this relies on an actual external program, here's a skeleton to use for a Perl program:
#!/usr/bin/env perl
use strict;
use warnings;    # "-w" on the #! line breaks under env on Linux

# treat stdin/stdout as raw bytes: the data is length-framed binary
binmode STDIN;
binmode STDOUT;

# turn on auto-flushing
my $cfh = select(STDOUT);
$| = 1;
select($cfh);

sub process {
    return $_[0];
}

until (eof(STDIN)) {
    my ($length_bytes, $length, $data, $return);

    read STDIN, $length_bytes, 4;    # four-byte length-header (network byte order)
    $length = unpack("N", $length_bytes);

    read STDIN, $data, $length;

    $return = process($data);

    print STDOUT pack("N", length($return));
    print STDOUT $return;
}
It communicates with Erlang via stdin/stdout, and handles packets with 4-byte length headers (with network byte-order, since that's how Erlang sends it). That is just a stub program that returns the data coming in to it, but you can replace process with whatever you want to process/format the data. I have two versions of this, one that applies markdown, and one that sanitises input with HTML-TagFilter.
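The framing both sides agree on is simple enough to spell out. The sketch below is a hypothetical helper module (packet4, not part of the original code) that builds and parses the same four-byte, big-endian length header that Erlang's {packet,4} port option adds and strips for you, and that the Perl program handles with pack("N") and unpack("N").

```erlang
-module(packet4).
-export([encode/1, decode/1]).

%% Prefix Data with its length as a 32-bit unsigned big-endian integer,
%% the same layout as Perl's pack("N", ...). A four-byte header can't
%% describe more than 2^32 - 1 bytes, hence the guard.
encode(Data) when is_binary(Data), byte_size(Data) < (1 bsl 32) ->
    <<(byte_size(Data)):32/big-unsigned, Data/binary>>.

%% Split one complete packet off the front of Bin.
%% Returns {Packet, Rest}, or the atom more if it hasn't fully arrived.
decode(<<Len:32/big-unsigned, Packet:Len/binary, Rest/binary>>) ->
    {Packet, Rest};
decode(_Incomplete) ->
    more.
```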
external_program.erl
Now for the Erlang bits. First, we'll construct the gen_server that actually communicates with the Perl program. We'll communicate using a Port (which uses a Unix pipe). We want to send data in atomic packets so that the Perl program knows when we're done sending data and so that we know when the Perl program is done responding. To do this, we'll send and receive data in packets of up to 2^32 - 1 bytes (the largest length a four-byte header can express). That is, we'll send our Perl program a four-byte header (in network byte-order) indicating how much data is to be sent, then we'll send the data, and Perl will send back a four-byte header indicating how much data it is about to send, and then it will send that data. Erlang will handle the packets and headers on its side with the {packet,4} option to open_port/2, but we have to handle the packetising ourselves on the Perl side (already done in the Perl program above). This module is pretty generic, so you could use it to communicate with any outside program that can send data in packets, not just a formatter. Here's the code:
-module(external_program).
-behaviour(gen_server).

% API
-export([start_link/3,
         filter/2]).

% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         code_change/3,
         terminate/2]).

-record(state,{port}).

start_link(Type,Id,ExtProg) ->
    gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).

handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State}.

terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port} = _State) ->
    port_close(Port).

handle_cast(_Msg, State) ->
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

init({Type,ExtProg}) ->
    process_flag(trap_exit, true),
    % pg2 was removed in OTP 24; newer releases provide the pg module instead
    ok=pg2:create(Type), % idempotent
    ok=pg2:join(Type,self()),
    Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
    {ok, #state{port = Port}}.

handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
    port_command(Port,Data),
    % wait for the framed reply from the external program
    receive
        {Port,{data,Data2}} ->
            {reply,Data2,State}
    end.

filter(Type,Data) ->
    gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
And here's the blow-by-blow. Skip ahead to externalprogram_supervisor.erl if you understand it. First, we start our module with
-behaviour(gen_server).
This tells the compiler to warn us if we forget any of the callback functions that gen_server expects to find. I won't go through the exports here; I'll introduce the functions as we come to them.
We use a record to encode our state, even though the only item in our state is the connection to the external program
-record(state,{port}).
We do this in case we later decide to add anything to the state; it saves us from modifying all of the function heads that refer to the state. Remember that our entire state between calls must go into this record, so if we later want to add information like the time the program was launched (for statistical tracking), or the system user that launched it, this is where it goes.
start_link/3 is how we'll start our gen_server. It's the function that we call from another module (our supervisor), not one called as a gen_server callback, so it can look however we want, but our supervisor is expecting it to return {ok,Pid}.
start_link(Type,Id,ExtProg) ->
    gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).
It calls gen_server:start_link/4, which takes:
* {local,Id}: This is how gen_server will register the PID of the server. local means that it will only be registered on the local node, and Id is an atom passed to us by our supervisor that will appear in process listings. We don't use this anywhere, it's just nice to have for viewing in appmon and other tools, and it's optional (gen_server:start_link/3 takes only the other three arguments)
* the call-back module (i.e. this module)
* the arguments to the gen_server's init/1 function (which we'll get to)
* any special options to gen_server (we don't use any)
It returns {ok,Pid}, where Pid is the Pid of the launched process. Although we used gen_server:start_link/4, which takes a registered name for the gen_server, we don't use that name to address the gen_server. Instead, we use the pg2 group name that we join in init/1 below. This is because we're going to launch several copies of this gen_server, so communicating directly with any one of them isn't practical.
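A note for current Erlang/OTP releases: pg2 was deprecated in OTP 23 and removed in OTP 24 in favour of the pg module. The following is a hedged sketch (the module name pg_group is hypothetical) of the same group-addressing idea with pg: a worker joins a group by name, and a caller picks a member, preferring local ones the way pg2:get_closest_pid/1 did. Unlike pg2, pg needs no create step, but its default scope process must be running, for example via pg:start_link/0 or the kernel start_pg parameter.

```erlang
-module(pg_group).
-export([join/1, closest_pid/1]).

%% Add the calling process to Group in the default pg scope.
%% pg:join/2 only accepts local pids.
join(Group) ->
    ok = pg:join(Group, self()).

%% Pick a random local member if there is one, otherwise a random member
%% on any connected node, mirroring pg2:get_closest_pid/1.
closest_pid(Group) ->
    case pg:get_local_members(Group) of
        [] ->
            case pg:get_members(Group) of
                []   -> {error, {no_process, Group}};
                Pids -> pick(Pids)
            end;
        Local ->
            pick(Local)
    end.

pick(Pids) ->
    lists:nth(rand:uniform(length(Pids)), Pids).
```

With this, external_program's init/1 would call pg_group:join(Type) and filter/2 would call pg_group:closest_pid(Type).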
handle_info/2 is a callback function that is called whenever our server receives a message that gen_server doesn't understand.
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State}.
The only message that we handle is the Port indicating that it has shut down (for instance, if it crashes). In this case there's nothing more we can do (our whole reason for being is to communicate with this Port), so we tell gen_server that we intend to stop, and gen_server shuts us down, which includes calling our terminate/2 function and doing other shutdown tasks. Our supervisor then notices the exit and starts a fresh worker.
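To see the message this clause depends on, the sketch below opens a port to a program that exits immediately (the Unix true command, assumed to be on the PATH) from a helper process that traps exits, and reports the 'EXIT' message that process receives. The module name port_exit_demo is hypothetical.

```erlang
-module(port_exit_demo).
-export([run/0]).

%% Open a port to a program that exits straight away, from a helper
%% process that traps exits, and return what that helper receives.
run() ->
    Parent = self(),
    Helper = spawn(fun() ->
                       process_flag(trap_exit, true),
                       Port = open_port({spawn, "true"}, [binary, {packet, 4}]),
                       Result = receive
                                    {'EXIT', Port, Reason} -> {port_exited, Reason}
                                after 5000 ->
                                    timeout
                                end,
                       Parent ! {self(), Result}
                   end),
    receive
        {Helper, Outcome} -> Outcome
    end.
```

The helper process is there only so that trap_exit isn't switched on for whoever calls run/0.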
terminate/2 is a callback function that gen_server calls whenever it is shutting us down for any reason so that we can free up any resources that we have (like our Port). It passes us the Reason for shutdown and the last State of the server
terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port} = _State) ->
    port_close(Port).
We have two clauses to handle a shutdown. If we are shutting down because our port terminated (because handle_info said so), then we just let gen_server finish shutting us down. If we shut down for any other reason, then we close the Port before we die so that it's not left open forever. terminate's return value is ignored.
handle_cast/2 handles asynchronous messages to our server
handle_cast(_Msg, State) ->
    {noreply, State}.
gen_servers can handle two types of in-band messages: synchronous messages generated by calls to gen_server:call/2, and asynchronous messages generated by gen_server:cast/2. (gen_server handles the difference.) Since we only offer a synchronous interface, we simply ignore any asynchronous messages that arrive; a cast never expects a reply. (If you really wanted an asynchronous interface, handle_cast could do the work and send the result back as an ordinary message to a Pid included in the request.)
code_change/3 is called when our code is being dynamically replaced
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
Erlang supports replacing code while the server is running, with no downtime. In the event that the code is replaced, a gen_server temporarily stops responding to messages (enqueueing them as they come in), waits for the code to be replaced, calls the call-back module's new code_change function, picks up the new state, and starts running again. This could be used if we were to upgrade our code to one that required a new member of the state record, for instance, to upgrade the running state to the state that the new version uses. (If the function returns anything but {ok,NewState}, the gen_server crashes, which would be fine in our case since our supervisor would just restart it under the new code anyway.) Unless we plan to convert the state of the server, which for now we don't, we just return {ok,State}
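As an illustration of the state conversion described above, suppose a later version adds a started_at field to the record (a hypothetical field, not in the original). Records are tagged tuples at runtime, so the old state arrives as {state, Port} and code_change/3 can rebuild it in the new shape. The module name is hypothetical and only the relevant callback is shown.

```erlang
-module(state_upgrade_demo).
-export([code_change/3]).

%% New version of the record: started_at is the hypothetical new field.
-record(state, {port, started_at}).

%% The old version's state was the 2-tuple {state, Port}: convert it.
code_change(_OldVsn, {state, Port}, _Extra) ->
    {ok, #state{port = Port, started_at = erlang:system_time(second)}};
%% Already in the new shape: keep it as-is.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
```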
init/1 is called by gen_server just before the server-loop is run.
init({Type,ExtProg}) ->
    process_flag(trap_exit, true),
    ok=pg2:create(Type), % idempotent
    ok=pg2:join(Type,self()),
    Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
    {ok, #state{port = Port}}.
Its argument comes from gen_server:start_link, which we called in our own start_link/3 function, and consists of our Type (which becomes our pg2 group name) and the path to the external program that we want to launch (the Perl programs mentioned above). We first indicate that we want to trap exits, so that we receive an 'EXIT' message from our Port in case it dies. These messages are not recognised by gen_server, so they get passed to our handle_info/2 callback function. Then we create our pg2 group (this operation is idempotent, which means that doing it several times has the same effect as doing it once), and join this process to the group. Then we open up the external program itself using open_port/2. Its first argument names the port: {spawn, ExtProg} says that we are launching an external program and where to find it. Its second argument is a list of options, which we use to say that we want to communicate using binaries instead of lists, and that Erlang should frame all communication with our Port as packets with a four-byte length header. Finally we tell gen_server that all is well and to start the server with a state record containing a reference to our Port.
handle_call/3 is called whenever we receive a synchronous request from an outside caller; this is the actual guts of our server.
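You can try the {packet,4} port mechanism on its own, before any gen_server is involved. This sketch (hypothetical module port_echo_demo) uses the Unix cat command as a stand-in external program and assumes cat is on the PATH: cat copies the framed bytes straight back, so the port hands us the original payload.

```erlang
-module(port_echo_demo).
-export([roundtrip/1]).

%% Send Data through a {packet,4} port to cat and return what comes back.
%% Erlang adds the 4-byte length header on the way out and strips it on
%% the way in; cat simply echoes the framed bytes.
roundtrip(Data) ->
    Port = open_port({spawn, "cat"}, [binary, {packet, 4}]),
    true = port_command(Port, Data),
    Reply = receive
                {Port, {data, Bin}} -> Bin
            after 5000 ->
                timeout
            end,
    port_close(Port),
    Reply.
```

Because the port was opened with the binary option, the reply is a binary even when the request is a list.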
handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
    port_command(Port,Data),
    receive
        {Port,{data,Data2}} ->
            {reply,Data2,State}
    end.
We just receive the command (which is a {filter,Data} tuple; it doesn't have to be, but that's how we're going to call it later), send it to the Port, get the reply from the Port, and tell gen_server to reply that back to the caller. We didn't have to write any of the server code, just the implementation of our code.
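One weakness of this handle_call/3 is that its receive has no way out: if the external program dies mid-request, the {'EXIT', Port, Reason} message sits unmatched in the mailbox and the worker blocks forever, while pg2 keeps routing callers to it. A hedged sketch of a more defensive version matches the exit inside the same receive and adds a timeout (5000 ms is an arbitrary choice). It is written as a small standalone gen_server with a hypothetical module name, no pg2 group, and cat as a stand-in external program so that it can be tried anywhere.

```erlang
-module(safe_external_program).
-behaviour(gen_server).

-export([start_link/1, filter/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {port}).

start_link(ExtProg) ->
    gen_server:start_link(?MODULE, ExtProg, []).

filter(Pid, Data) ->
    gen_server:call(Pid, {filter, Data}).

stop(Pid) ->
    gen_server:stop(Pid).

init(ExtProg) ->
    process_flag(trap_exit, true),
    Port = open_port({spawn, ExtProg}, [binary, {packet, 4}]),
    {ok, #state{port = Port}}.

handle_call({filter, Data}, _From, #state{port = Port} = State) ->
    true = port_command(Port, Data),
    receive
        {Port, {data, Reply}} ->
            {reply, Reply, State};
        {'EXIT', Port, Reason} ->
            %% The program died mid-request: stop rather than hang. The
            %% caller's gen_server:call exits, and a supervisor (if any)
            %% restarts the worker.
            {stop, {port_terminated, Reason}, State}
    after 5000 ->
        %% No answer in time. Stop so that a late reply can't be mistaken
        %% for the answer to the next request.
        {stop, port_timeout, State}
    end.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State};
handle_info(_Other, State) ->
    %% Ignore anything else rather than crashing on a stray message.
    {noreply, State}.

terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port}) ->
    %% port_close/1 raises badarg if the port is already gone.
    catch port_close(Port),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
```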
Finally, here's how we actually communicate with our server after it's launched:
filter(Type,Data) ->
    gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
We make a synchronous call to the "closest" Pid that is a member of the pg2 group that our programs join when launching. ("Closest" just means that pg2 will first look on the local node before looking on other nodes.) pg2:get_closest_pid randomises which worker it returns within a group, so this should spread the load if we get many simultaneous requests. This could be expanded to try again in the event of a timeout (assuming that the timed-out server will crash and be re-launched, or that pg2:get_closest_pid will return a different Pid next time)
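The retry idea in the last sentence could look something like this hedged sketch. The module name filter_retry and its arguments are hypothetical: PickPid stands for whatever chooses a worker (pg2:get_closest_pid/1 in the original, or a pg-based equivalent on current OTP). It retries only on call timeouts and lets every other error propagate.

```erlang
-module(filter_retry).
-export([call/4]).

%% Call a worker chosen by PickPid(), retrying up to Attempts times in
%% total if the call times out. Each retry asks PickPid() again, so a
%% different (possibly freshly restarted) worker may be chosen.
call(PickPid, Request, Timeout, Attempts) when Attempts > 0 ->
    try
        gen_server:call(PickPid(), Request, Timeout)
    catch
        exit:{timeout, _} when Attempts > 1 ->
            call(PickPid, Request, Timeout, Attempts - 1)
    end.
```

Usage with the original code would be something like filter_retry:call(fun() -> pg2:get_closest_pid(markdown) end, {filter, Data}, 5000, 3).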
At this point you should be able to create and call a gen_server.
externalprogram_supervisor.erl
Now for the supervisor. Again, we'll keep this generic. It handles multiple external_program workers, launching NumWorkers of them (passed to start_link/3). We'll use the (aptly named) supervisor behaviour. A supervisor launches a group of processes, watches them, and re-launches them if they die. It also provides a way to bring a group of processes up or down all at once, just by starting or shutting down the supervisor. I'm going to explain the functions out of order for clarity.
-module(externalprogram_supervisor).
-behavior(supervisor).

% API
-export([start_link/3]).

% supervisor callbacks
-export([init/1]).

start_link(Type,NumWorkers,ExtProg) ->
    supervisor:start_link({local,
                           list_to_atom(atom_to_list(Type) ++ "_supervisor")},
                          ?MODULE,
                          {Type,NumWorkers,ExtProg}).

init({Type, NumWorkers,ExtProg}) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [begin
           Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
           {Id,
            {external_program,
             start_link,
             [Type,Id,ExtProg]},
            permanent,
            _Timeout=10*1000,
            worker,
            [external_program]}
       end
       || Which <- lists:seq(1,NumWorkers)]}}.
Again, -behavior(supervisor). (Erlang accepts both the -behaviour and -behavior spellings) tells the compiler to warn us if we forget any callback functions required by supervisor.
supervisor only has one callback, init/1:
init({Type, NumWorkers,ExtProg}) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [begin
           Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
           {Id,
            {external_program,
             start_link,
             [Type,Id,ExtProg]},
            permanent,
            _Timeout=10*1000,
            worker,
            [external_program]}
       end
       || Which <- lists:seq(1,NumWorkers)]}}.
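It's only called once when the supervisor is created (or re-started after crashing). It gets its arguments from supervisor:start_link, which we call from our local start_link (which we'll explain in a moment). init is expected to return a restart strategy and a list of Childspecs. Our restart strategy, {one_for_one,3,10}, means that only the child that crashed is restarted, and that if more than 3 restarts happen within 10 seconds the supervisor gives up and terminates itself, leaving the problem to its own supervisor. A Childspec looks like this (straight from the Erlang supervisor documentation):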
{Id,StartFunc,Restart,Shutdown,Type,Modules},
Id = term()
StartFunc = {M,F,A}
M = F = atom()
A = [term()]
Restart = permanent | transient | temporary
Shutdown = brutal_kill | int()>=0 | infinity
Type = worker | supervisor
Modules = [Module] | dynamic
Module = atom()
You can read up on what all of that means in the supervisor documentation, but we return a list of NumWorkers of them (see the list comprehension), such that each is launched by calling external_program:start_link/3 (specified by the StartFunc in the Childspec), passing each worker its Type, Id and ExtProg. The Id is just an atom that we generate from the Type and the worker number (which is just determined by launch order). As you'll recall, it is passed to gen_server to register the worker's PID, so that in debug listings we see markdown3 instead of <0.15.3>.
The supervisor itself is created by calling our local start_link/3, which calls supervisor:start_link/3 (which actually creates the supervisor, registers it under a name like markdown_supervisor, calls our init, etc.). This supervisor will be started by formatter_supervisor, defined below, but you could stop at this point and have a working implementation of an N-worker external program:
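Since OTP 18, supervisor also accepts maps for both the restart strategy and the child specs, which makes each field self-describing. The sketch below (hypothetical module worker_specs) builds the same NumWorkers child specs and supervisor flags as init/1 above, in map form; the values are taken straight from the tuple version.

```erlang
-module(worker_specs).
-export([sup_flags/0, child_specs/3]).

%% {one_for_one, 3, 10} in map form: restart only the crashed child, and
%% give up if more than 3 restarts happen within 10 seconds.
sup_flags() ->
    #{strategy => one_for_one, intensity => 3, period => 10}.

%% One child spec per worker; ids are markdown1, markdown2, ... for markdown.
child_specs(Type, NumWorkers, ExtProg) ->
    [begin
         Id = list_to_atom(atom_to_list(Type) ++ integer_to_list(N)),
         #{id       => Id,
           start    => {external_program, start_link, [Type, Id, ExtProg]},
           restart  => permanent,
           shutdown => 10000,          % milliseconds, same as 10*1000 above
           type     => worker,
           modules  => [external_program]}
     end
     || N <- lists:seq(1, NumWorkers)].
```

init/1 could then return {ok, {worker_specs:sup_flags(), worker_specs:child_specs(Type, NumWorkers, ExtProg)}}.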
1> externalprogram_supervisor:start_link(my_server,5,"/usr/local/bin/external_program.pl").
{ok,<0.1976.0>}
2> pg2:get_members(my_server).
[<0.1981.0>,<0.1980.0>,<0.1979.0>,<0.1978.0>,<0.1977.0>]
3> gen_server:call(pg2:get_closest_pid(my_server),{filter,<<"This is some random text!">>}).
<<"This is some random text!">>
formatter_supervisor.erl
Our original goal was to have N workers running for each of two external programs. So far all of the code we've written has been generic to the external program, but now we're going to get into (very slightly) more specific stuff. The formatter_supervisor will be a supervisor that launches and watches two supervisors, one for Markdown and one for our sanitiser. They'll both run the same externalprogram_supervisor code; they'll just be launched with different arguments.
-module(formatter_supervisor).
-behavior(supervisor).

% External exports
-export([start_link/0]).

% supervisor callbacks
-export([init/1]).

start_link() ->
    supervisor:start_link(?MODULE, '_').

% formatters are assumed to create pg2s registered under their
% type-names (at the moment, markdown and sanitiser)
init(_Args) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [{markdown_supervisor,
        {externalprogram_supervisor,
         start_link,
         [markdown,
          myapp_config:markdown_workers(),
          myapp_config:markdown_program()]},
        permanent,
        _Timeout1=infinity,
        supervisor,
        [externalprogram_supervisor]},
       {sanitiser_supervisor,
        {externalprogram_supervisor,
         start_link,
         [sanitiser,
          myapp_config:sanitiser_workers(),
          myapp_config:sanitiser_program()]},
        permanent,
        _Timeout2=infinity,
        supervisor,
        [externalprogram_supervisor]}]}}.
That looks almost the same as our externalprogram_supervisor, except that the ChildSpecs returned by init are different. There's no local start function for the children here: each child spec points straight at externalprogram_supervisor:start_link (since there's nothing we need to do with it after launching it). The ChildSpec list looks like this:
[{markdown_supervisor,
  {externalprogram_supervisor,
   start_link,
   [markdown,
    myapp_config:markdown_workers(),
    myapp_config:markdown_program()]},
  permanent,
  _Timeout1=infinity,
  supervisor,
  [externalprogram_supervisor]},
 {sanitiser_supervisor,
  {externalprogram_supervisor,
   start_link,
   [sanitiser,
    myapp_config:sanitiser_workers(),
    myapp_config:sanitiser_program()]},
  permanent,
  _Timeout2=infinity,
  supervisor,
  [externalprogram_supervisor]}]}}.
Nothing too special there, except that we pass in the Type atom to externalprogram_supervisor:start_link (markdown or sanitiser), which becomes the name of the pg2 group. The shutdown timeout is set to infinity for supervisors of supervisors (as recommended by the Erlang docs) to give all of the indirect children time to shut down, and we've indicated that the child type is supervisor, so that Erlang knows that we're building a supervision tree. Of course, the arguments to externalprogram_supervisor:start_link assume that myapp_config:markdown_workers/0 and friends have been defined, but you could just replace those with literal values, too. myapp_config:markdown_workers/0 is assumed to return an integer (in my case, five), which is how many copies of the program you want to launch, and myapp_config:markdown_program/0 returns the full path to the program to launch (like "/usr/local/myapp/lib/markdown.pl"). formatter_supervisor is the supervisor that will be started from my application's main supervisor.
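myapp_config is the author's own module and its implementation isn't shown. One hedged way to write it, assuming an application named myapp whose environment may set the keys markdown_workers, markdown_program and so on, is to read those keys with application:get_env/3 and fall back to defaults. The application name, key names and the sanitiser path are hypothetical; the markdown path is the example from the text.

```erlang
-module(myapp_config).
-export([markdown_workers/0, markdown_program/0,
         sanitiser_workers/0, sanitiser_program/0]).

%% Number of worker processes per formatter (five in the article).
markdown_workers()  -> env(markdown_workers, 5).
sanitiser_workers() -> env(sanitiser_workers, 5).

%% Full paths to the external programs; the defaults are placeholders.
markdown_program()  -> env(markdown_program, "/usr/local/myapp/lib/markdown.pl").
sanitiser_program() -> env(sanitiser_program, "/usr/local/myapp/lib/sanitiser.pl").

%% Read Key from the myapp application environment, or use Default.
env(Key, Default) ->
    application:get_env(myapp, Key, Default).
```

Values can then be overridden in a sys.config file or with application:set_env/3 without touching the code.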
format.erl
The actual communication with the programs should be pretty easy now, we just ask external_program to do its magic. Here it is:
-module(format).

% formatters are defined and launched by the formatter_supervisor

-export([markdown/1,
         sanitise/1]).

markdown(Data) ->
    external_program:filter(markdown,Data).

sanitise(Data) ->
    external_program:filter(sanitiser,Data).
It should work with lists or binaries as input (the reply always comes back as a binary, because we opened the port with the binary option). Add a use Text::Markdown; to the top of our Perl program (at the beginning of this entry) and replace its process function with:
sub process {
    return Text::Markdown::markdown($_[0]);
}
Launch our formatter_supervisor with formatter_supervisor:start_link() and now we have a working Markdown implementation:
(name@node)106> format:markdown("I *really* like cookies").
<<"<p>I <em>really</em> like cookies</p>\n">>
Supervision Trees and Ports in Erlang
08:31PM Sep 03, 2007 in category Erlang by David King
Erlang provides a whole lot of infrastructure for doing tasks commonly associated with building giant fault-tolerant systems. But what about when you have to talk to an external non-Erlang program? Can we make our communication to that program fault-tolerant using only native Erlang/OTP components?
The problem
I'm writing a blogging engine using Erlyweb and Mnesia, and one of my tasks is to pass the user's blog entry through a formatter. I use two: one for Markdown, and one to sanitise for XSS attacks. Since there's no Markdown or HTML-sanitiser implementation for Erlang, we need to communicate with the Perl implementations (markdown, HTML-TagFilter).
We'll want some fault-tolerance (in case one of our external programs crash, we want to re-launch it), and we don't want to introduce a bottleneck by serialising access to our external programs, so we'll launch several copies of it to spread the load and allow some concurrency. In addition, we'll want to leave room to move all of the processing to its own node in the event that our blog gets very popular.
We'll communicate using an Erlang Port. A Port is a connection to an external program, implemented using Unix pipes. So our external program just has to know how to read/write to stdout/stdin. We're going to keep this all pretty generic to the actual external program. You should be able to do the same for any other program that can take data in length-encoded packets (or can be wrapped in one that does, like our Perl wrapper) for which you want to launch mutliple workers.
To launch our workers, we'll use Erlang's gen_server interface, which is a way to write a generic server without having to write the implementation details of the server, keeping state, message-handling and responding, etc. Instead, you write a gen_server with a set of callbacks, and all you write is the code to actually handle the messages that you want to respond to. Bascially, gen_server is a module that can take another module with appropriate callbacks and handle all of the tasks of making it a server that looks more or less like (psuedo-code)
loop(State) ->
Message=receive Some_Message
if Message is an internal message, like shutdown or startup or code-replacement
let gen_server handle it
loop(State)
else it's a message that the programmer wants to respond to
pass it to the programmer, let them handle it, get new state
loop(NewState)
Our gen_server will have to keep track of its connection to our external program, and on request it will have to send data to the external program and retrieve the result. It will also keep track of its process group (pg2) and adding itself to it
Erlang does something similar for supervisors. A supervisor is a process whose entire job is to watch another process or set of processes. In our case, we just want it to make sure that our external processes are up and running, and to re-launch them in case they crash, and to bring up all of our workers at once so that we don't have to launch them individually. We'll have one supervisor for Markdown which will watch all of our Markdown gen_servers, and another for all of our sanitiser gen_servers. To make managing them easier, we'll use another supervisor that will watch those two supervisors, re-launching them if they crash, and enabling us to bring both up, and all of the associated gen_servers, at once. So the plan is to have a formatter supervisor that will launch a markdown supervisor and a sanitiser supervisor, each of which will launch N workers, like this:
I'll show you working code, and try to explain the fuzzy bits, but I encourage you to research the functions that compose it in Erlang's own docs so that you really know what is going on.
external_program.pl
Since all of this relies on an actual external program, here's a shell to use for a Perl program:
1 #!/usr/bin/env perl -w
2
3 # turn on auto-flushing
4 $cfh = select (STDOUT);
5 $| = 1;
6 select ($cfh);
7
8 sub process {
9 return $_[0];
10 }
11
12 until(eof(STDIN)) {
13 my ($length_bytes,$length,$data,$return);
14
15 read STDIN,$length_bytes,4; # four-byte length-header
16 $length=unpack("N",$length_bytes);
17
18 read STDIN,$data,$length;
19
20 $return=&process($data);
21
22 print STDOUT pack("N",length($return));
23 print STDOUT $return;
24 }
It communicates with Erlang via stdin/stdout, and handles packets with 4-byte length headers (with network byte-order, since that's how Erlang sends it). That is just a stub program that returns the data coming in to it, but you can replace process with whatever you want to process/format the data. I have two versions of this, one that applies markdown, and one that sanitises input with HTML-TagFilter.
external_program.erl
Now the for the Erlang bits. First, we'll construct the gen_server to actually communicate with the Perl program. We'll communicate using a Port (which uses a unix pipe). We want to send data in atomic packets so that the Perl program knows when we're done sending data and so that we know when the Perl program is done responding. To do this, we'll send and receive data in packets of up to 232 bytes. That is, we'll send our Perl program a four-byte header (in network byte-order) indicating how much data is to be sent, then we'll send the data, and Perl will send back a four-byte header indicating how much data it is about to send, and then it will send that data. Erlang will handle the packets and headers on its side with the {packet,4} option to open_port/2, but we'll have to handle the packetising ourselves from the Perl side (already done in the above Perl program). This one is pretty generic, so you could use it to communicate with any outside program that can send data in packets, not just a formatter. Here's the code:
1 -module(external_program).
2 -behaviour(gen_server).
3
4 % API
5 -export([start_link/3,
6 filter/2]).
7
8 % gen_server callbacks
9 -export([init/1,
10 handle_call/3,
11 handle_cast/2,
12 handle_info/2,
13 code_change/3,
14 terminate/2]).
15
16 -record(state,{port}).
17
18 start_link(Type,Id,ExtProg) ->
19 gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).
20
21 handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
22 {stop, {port_terminated, Reason}, State}.
23 terminate({port_terminated, _Reason}, _State) ->
24 ok;
25 terminate(_Reason, #state{port = Port} = _State) ->
26 port_close(Port).
27 handle_cast(_Msg, State) ->
28 {noreply, State}.
29 code_change(_OldVsn, State, _Extra) ->
30 {ok, State}.
31
32 init({Type,ExtProg}) ->
33 process_flag(trap_exit, true),
34 ok=pg2:create(Type), % idempotent
35 ok=pg2:join(Type,self()),
36 Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
37 {ok, #state{port = Port}}.
38
39 handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
40 port_command(Port,Data),
41 receive {Port,{data,Data2}} ->
42 {reply,Data2,State}
43 end.
44
45 filter(Type,Data) ->
46 gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
And here's the blow-by-blow. Skip ahead to externalprogram_supervisor.erl if you understand it. First, we start our module with
2 -behaviour(gen_server)
This tells the compiler to warn us if we forget any functions for call-back that gen_server is expecting to find. I won't introduce the exports here, I'll introduce the functions we come to them.
We use a record to encode our state, even though the only item in our state is the connection to the external program
16 -record(state,{port}).
We do this in case we later decide to add anything to the state, it will save us from modifying all of the function headers that refer to the state. Remember that our entire state between calls must go into this record, so if we want to later add information like the time the program was launched (for statistical tracking), or the system user that launched it, this is where it goes.
start_link/3 is how we'll start our gen_server. It's the function that we call from another module (our supervisor), not one called as a gen_server callback, so it can look however we want, but our supervisor is expecting it to return {ok,Pid}.
18 start_link(Type,Id,ExtProg) ->
19 gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).
It calls gen_server:start_link/4, which takes:
* {local,Id}: This is how gen_server will register the PID of the server. local means that it will only be registered on the local node, and Id is an atom passed to us by our supervisor that will appear in process listings. We don't use this anywhere, it's just nice to have for viewing in appmon and other tools, and it's optional (gen_server:start_link/3 takes only the other three arguments)
* the call-back module (i.e. this module)
* the arguments to the gen_server's init/1 function (which we'll get to)
* any special options to gen_server (we don't use any)
It returns {ok,Pid}, where Pid is the Pid of the launched process. Although gen_server:start_link/4 takes a registered name for the gen_server, we don't use that name to address it. Instead, we're going to use the pg2 group name that we set in init/1 below. This is because we're going to launch several copies of this gen_server, so communicating directly with any one of them isn't practical.
handle_info/2 is a callback function that gen_server calls whenever our server receives a message that is neither a call nor a cast (those go to handle_call/3 and handle_cast/2):
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State}.
The only message that we handle is the Port's 'EXIT' message indicating that it has shut down (for instance, if the program crashes). In that case there's nothing more we can do (since our whole reason for being is to communicate with this Port), so we return {stop,...} to tell gen_server that we intend to shut down. gen_server then shuts us down, which includes calling our terminate/2 function and doing other cleanup.
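To see the messages this clause relies on, the sketch below opens a port while trapping exits and reports what arrives when the program ends. It assumes a Unix system with sh available, and it adds the exit_status option (which external_program does not use) so the program's exit code is visible too. The module name port_exit_demo is hypothetical. Note that a port whose program simply terminates sends an exit signal with reason normal, even when the program exits with an error code; a process that did not trap exits would silently ignore such a signal, which is one reason init/1 traps exits.

```erlang
-module(port_exit_demo).
-export([watch/1]).

%% Runs Cmd as a port while trapping exits, then reports how the port
%% ended as {ExitReason, ExitStatus}.
watch(Cmd) ->
    OldFlag = process_flag(trap_exit, true),
    Port = open_port({spawn, Cmd}, [binary, {packet, 4}, exit_status]),
    Result = wait_for_exit(Port, undefined),
    process_flag(trap_exit, OldFlag),
    Result.

%% Collects the exit status (if it arrives first) and the 'EXIT' signal.
wait_for_exit(Port, Status) ->
    receive
        {Port, {exit_status, Code}} ->
            wait_for_exit(Port, Code);
        {'EXIT', Port, Reason} ->
            {Reason, late_status(Port, Status)}
    after 5000 ->
        timeout
    end.

%% Defensive: pick up an exit_status message that arrived after the 'EXIT'.
late_status(Port, undefined) ->
    receive {Port, {exit_status, Code}} -> Code after 100 -> undefined end;
late_status(_Port, Status) ->
    Status.
```

On a typical Unix system, port_exit_demo:watch("sh -c 'exit 3'") should return {normal, 3}.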
terminate/2 is a callback function that gen_server calls whenever it is shutting us down for any reason, so that we can free any resources that we hold (like our Port). It passes us the Reason for the shutdown and the last State of the server:
terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port} = _State) ->
    port_close(Port).
We have two clauses to handle a shutdown. If we are shutting down because our port terminated (because handle_info said so), then we just let gen_server finish shutting us down. If we shut down for any other reason, then we close the Port before we die so that it's not left open forever. terminate's return value is ignored.
handle_cast/2 handles asynchronous messages to our server:
handle_cast(_Msg, State) ->
    {noreply, State}.
gen_servers can handle two types of in-band messages: synchronous messages generated by calls to gen_server:call/2, and asynchronous messages generated by gen_server:cast/2. (gen_server handles the difference.) Since we're only handling synchronous messages, we simply ignore any asynchronous ones and don't reply, but if you really wanted to, you could do the same work in response to a cast and send the result back to the sender as an ordinary message.
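The asynchronous alternative could look roughly like this hypothetical async_filter module (not part of the article): the cast carries the caller's Pid and a fresh reference, and the server sends the result back as a plain message. To keep the sketch self-contained, a plain fun stands in for the round trip to the Port.

```erlang
-module(async_filter).
-behaviour(gen_server).

-export([start_link/1, filter_async/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% FilterFun stands in for the Port round trip in external_program.
start_link(FilterFun) ->
    gen_server:start_link(?MODULE, FilterFun, []).

%% Returns immediately; the result arrives later as {filtered, Ref, Result}.
filter_async(Pid, Data) ->
    Ref = make_ref(),
    gen_server:cast(Pid, {filter, Data, self(), Ref}),
    Ref.

init(FilterFun) ->
    {ok, FilterFun}.

%% Do the work, then send the result straight to the requester.
handle_cast({filter, Data, ReplyTo, Ref}, FilterFun) ->
    ReplyTo ! {filtered, Ref, FilterFun(Data)},
    {noreply, FilterFun};
handle_cast(_Other, State) ->
    {noreply, State}.

handle_call(_Request, _From, State) ->
    {reply, {error, unsupported}, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
```

The reference lets the caller match the reply to its request. The trade-off is that the caller now has to write its own receive and timeout handling, which gen_server:call would otherwise do for it.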
code_change/3 is called when our code is being dynamically replaced:
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
Erlang supports replacing code while the server is running, with no downtime. When the code is replaced, a gen_server temporarily stops responding to messages (queueing them as they come in), waits for the code to be replaced, calls the callback module's new code_change function, picks up the new state, and starts running again. We could use this, for instance, if we upgraded to a version of our code that needed a new member in the state record: code_change would convert the running state to the one the new version uses. (If the function returns anything but {ok,NewState}, the gen_server crashes, which would be fine in our case since our supervisor would just restart it under the new code anyway.) Since we don't plan to convert the state of the server for now, we just return {ok,State}.
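As a sketch of that upgrade case, assume a hypothetical version 2 of the module whose state record gains a started_at field. Because a record is just a tuple tagged with the record name, the old {state, Port} value can be matched directly and rebuilt in the new shape:

```erlang
-module(code_change_demo).
-export([code_change/3]).

%% Hypothetical version 2 state: started_at is new.
-record(state, {port, started_at}).

%% Version 1 stored #state{port = Port}, i.e. the 2-tuple {state, Port}.
code_change(_OldVsn, {state, Port}, _Extra) ->
    {ok, #state{port = Port, started_at = undefined}};
%% Already in the new shape: keep it as it is.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
```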
init/1 is called by gen_server just before the server-loop is run.
init({Type,ExtProg}) ->
    process_flag(trap_exit, true),
    ok=pg2:create(Type), % idempotent
    ok=pg2:join(Type,self()),
    Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
    {ok, #state{port = Port}}.
Its argument comes from gen_server:start_link, which we called in our own start_link/3 function, and consists of our Type (which becomes our pg2 group name) and the path to the external program that we want to launch (the Perl programs mentioned above). We first indicate that we want to trap exits, so that we receive 'EXIT' messages from our Port in case it dies. These messages are not recognised by gen_server, so they get passed to our handle_info/2 callback function. Then we create our pg2 group (this operation is idempotent, which means that doing it several times has the same effect as doing it once), and join this process to the group. Then we open the external program itself using open_port. Its first argument, {spawn,ExtProg}, says that we are launching an external program and how to invoke it. Its second argument is a list of options, which we use to say that we want to communicate using binaries instead of lists, and that every message to and from the Port should be framed as a packet with a four-byte length header. Finally, we tell gen_server that all is well and to start the server with a state record containing a reference to our Port.
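With {packet,4}, every message in both directions is preceded by its length as a 4-byte big-endian unsigned integer, and the Perl wrapper has to read and write that header itself. The hypothetical packet4 module below spells out that framing by hand in Erlang, for illustration only: the port does this for us, so external_program never needs these functions.

```erlang
-module(packet4).
-export([encode/1, decode/1]).

%% Prefix a payload with its length, the way {packet, 4} does.
encode(Payload) when is_binary(Payload) ->
    <<(byte_size(Payload)):32/big-unsigned, Payload/binary>>.

%% Split one complete frame off the front of a buffer, or report that
%% more bytes are needed.
decode(<<Len:32/big-unsigned, Rest/binary>>) when byte_size(Rest) >= Len ->
    <<Payload:Len/binary, Tail/binary>> = Rest,
    {ok, Payload, Tail};
decode(Buffer) when is_binary(Buffer) ->
    {more, Buffer}.
```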
handle_call/3 is called whenever we receive a synchronous request from an outside caller; this is the actual guts of our server.
handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
    port_command(Port,Data),
    receive {Port,{data,Data2}} ->
        {reply,Data2,State}
    end.
We just receive the command (a {filter,Data} tuple; it doesn't have to be, but that's how we're going to call it later), send Data to the Port, wait for the Port's reply, and tell gen_server to send that back to the caller. We didn't have to write any of the server code, just our own logic.
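One thing to watch: the receive in handle_call has no timeout, so if the external program hangs, the worker blocks indefinitely (the caller's gen_server:call gives up after its default 5 seconds, but the worker stays stuck). The hypothetical port_filter module below sketches the same server with an after clause. It is addressed by Pid instead of through pg2 so that it runs on its own, and it assumes the Unix cat command as the external program: cat echoes the length-prefixed packets back unchanged, so it behaves like an identity filter.

```erlang
-module(port_filter).
-behaviour(gen_server).

-export([start_link/1, filter/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {port}).

start_link(ExtProg) ->
    gen_server:start_link(?MODULE, ExtProg, []).

filter(Pid, Data) ->
    gen_server:call(Pid, {filter, Data}).

init(ExtProg) ->
    process_flag(trap_exit, true),
    Port = open_port({spawn, ExtProg}, [binary, {packet, 4}]),
    {ok, #state{port = Port}}.

handle_call({filter, Data}, _From, #state{port = Port} = State) ->
    true = port_command(Port, Data),
    receive
        {Port, {data, Reply}} ->
            {reply, Reply, State}
    after 4000 ->
        %% Assumed policy: a silent program is treated as broken. Reply with
        %% an error, then stop so that a supervisor can start a fresh worker.
        {stop, port_timeout, {error, port_timeout}, State}
    end.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State};
handle_info(_Msg, State) ->
    %% Ignore anything else instead of crashing with function_clause.
    {noreply, State}.

terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port}) ->
    port_close(Port).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
```

The 4 second limit is an assumption, chosen to be shorter than the default call timeout so that the caller gets {error, port_timeout} back instead of an exit.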
Finally, here's how we actually communicate with our server after it's launched:
filter(Type,Data) ->
    gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
We make a synchronous call to the "closest" Pid that is a member of the pg2 group that our programs join when launching. ("Closest" just means that pg2 will first look on the local node before looking on other nodes.) pg2:get_closest_pid randomises which worker it returns within a group, so this should spread the load if we get many simultaneous requests. This could be expanded to try again in the event of a timeout (assuming that the timed-out server will crash and be re-launched, or that pg2:get_closest_pid will return a different Pid next time).
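The retry idea could be sketched with a hypothetical helper that takes the worker-picking step as a fun, so it can wrap pg2:get_closest_pid/1 or any other lookup (pg2 was removed in OTP 24 in favour of the pg module, which is one more reason to keep the picker pluggable), plus an explicit timeout. Only timeouts are retried; any other failure propagates unchanged.

```erlang
-module(filter_retry).
-export([call_with_retry/4]).

%% PickPid: zero-arity fun returning a worker Pid, e.g.
%%   fun() -> pg2:get_closest_pid(markdown) end
%% Attempts is the total number of tries, each on a freshly picked worker.
call_with_retry(PickPid, Request, Timeout, Attempts) when Attempts > 0 ->
    try
        gen_server:call(PickPid(), Request, Timeout)
    catch
        exit:{timeout, _} when Attempts > 1 ->
            call_with_retry(PickPid, Request, Timeout, Attempts - 1)
    end.
```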
At this point you should be able to create and call a gen_server.
externalprogram_supervisor.erl
Now for the supervisor. Again, we'll keep this generic. It handles multiple external_program workers, launching NumWorkers of them (passed to start_link/3). We'll use the (aptly named) supervisor behaviour. A supervisor launches a group of processes, watches them, and re-launches them if they die. It also provides a way to bring a group of processes up or down all at once, just by launching or shutting down the supervisor. I'm going to explain the functions out of order for clarity.
-module(externalprogram_supervisor).
-behavior(supervisor).

% API
-export([start_link/3]).

% supervisor callbacks
-export([init/1]).

start_link(Type,NumWorkers,ExtProg) ->
    supervisor:start_link({local,
                           list_to_atom(atom_to_list(Type) ++ "_supervisor")},
                          ?MODULE,
                          {Type,NumWorkers,ExtProg}).

init({Type, NumWorkers,ExtProg}) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [begin
           Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
           {Id,
            {external_program,
             start_link,
             [Type,Id,ExtProg]},
            permanent,
            _Timeout=10*1000,
            worker,
            [external_program]}
       end
       || Which <- lists:seq(1,NumWorkers)]}}.
Again, -behaviour(supervisor). tells the compiler to warn us if we forget any callback functions required by supervisor.
supervisor only has one callback, init/1, which is the second function in the listing above.
It's only called once, when the supervisor is created (or re-started after crashing). It gets its arguments from supervisor:start_link, which we call from our local start_link (which we'll explain in a moment). init is expected to return a restart strategy and a list of Childspecs. Here the strategy is {one_for_one,3,10}: only the child that dies is restarted, and if more than 3 restarts happen within 10 seconds, the supervisor gives up and exits itself. A Childspec looks like this (straight from the Erlang supervisor documentation):
{Id,StartFunc,Restart,Shutdown,Type,Modules}
    Id = term()
    StartFunc = {M,F,A}
        M = F = atom()
        A = [term()]
    Restart = permanent | transient | temporary
    Shutdown = brutal_kill | int()>=0 | infinity
    Type = worker | supervisor
    Modules = [Module] | dynamic
        Module = atom()
You can read up on what all of that means in the supervisor documentation, but we return a list of NumWorkers of them (see the list comprehension), each launched by calling external_program:start_link/3 (the StartFunc in the Childspec), which passes each worker its Type, Id and ExtProg. The Id is just an atom that we generate from the Type and the worker number (which is determined by launch order). As you'll recall, it is passed to gen_server to register the worker's PID, so that in debug listings we can see markdown3 instead of <0.15.3>.
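To check what that list comprehension produces, the same expression can be pulled out into a small function (the module name childspec_demo is hypothetical) and inspected from the shell. supervisor:check_childspecs/1 is a standard OTP function that returns ok when every spec in the list is well-formed:

```erlang
-module(childspec_demo).
-export([worker_specs/3]).

%% Same list comprehension as externalprogram_supervisor:init/1, pulled
%% out so the generated child specs can be inspected directly.
worker_specs(Type, NumWorkers, ExtProg) ->
    [begin
         Id = list_to_atom(atom_to_list(Type) ++ integer_to_list(N)),
         {Id,
          {external_program, start_link, [Type, Id, ExtProg]},
          permanent,
          10 * 1000,            % shutdown timeout in milliseconds
          worker,
          [external_program]}
     end
     || N <- lists:seq(1, NumWorkers)].
```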
The supervisor itself is created by calling our local start_link/3, which calls supervisor:start_link/3 (which actually creates the supervisor, calls our init, and so on) and registers it locally under a name built from the Type, such as markdown_supervisor. This supervisor will be started from formatter_supervisor, defined below, but you could call it done at this point and have a working implementation of an N-worker external program.
1> externalprogram_supervisor:start_link(my_server,5,"/usr/local/bin/external_program.pl").
{ok,<0.1976.0>}
2> pg2:get_members(my_server).
[<0.1981.0>,<0.1980.0>,<0.1979.0>,<0.1978.0>,<0.1977.0>]
3> gen_server:call(pg2:get_closest_pid(my_server),{filter,<<"This is some random text!">>}).
<<"This is some random text!">>
formatter_supervisor.erl
Our original goal was to have N workers running for each of two external programs. So far all of the code we've written has been generic to the external program, but now we're going to get into (very slightly) more specific stuff. The formatter_supervisor will be a supervisor that launches and watches two supervisors, one for Markdown and one for our sanitiser. They'll both run the same externalprogram_supervisor code; they'll just be launched with different arguments.
-module(formatter_supervisor).
-behavior(supervisor).

% External exports
-export([start_link/0]).

% supervisor callbacks
-export([init/1]).

start_link() ->
    supervisor:start_link(?MODULE, '_').

% formatters are assumed to create pg2s registered under their
% type-names (at the moment, markdown and sanitiser)
init(_Args) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [{markdown_supervisor,
        {externalprogram_supervisor,
         start_link,
         [markdown,
          myapp_config:markdown_workers(),
          myapp_config:markdown_program()]},
        permanent,
        _Timeout1=infinity,
        supervisor,
        [externalprogram_supervisor]},
       {sanitiser_supervisor,
        {externalprogram_supervisor,
         start_link,
         [sanitiser,
          myapp_config:sanitiser_workers(),
          myapp_config:sanitiser_program()]},
        permanent,
        _Timeout2=infinity,
        supervisor,
        [externalprogram_supervisor]}]}}.
That looks almost the same as our externalprogram_supervisor, except that the ChildSpecs returned by init are different. We don't need a local start function for the children; the ChildSpecs call externalprogram_supervisor:start_link/3 directly (since there's nothing we need to do with the child supervisors after launching them).
Nothing too special there, except that we pass the Type atom (markdown or sanitiser) to externalprogram_supervisor:start_link, and it becomes the name of the pg2 group. The shutdown timeout is set to infinity for supervisors of supervisors (as recommended by the Erlang docs) to give all of the indirect children time to shut down, and we've set the child type to supervisor, so that Erlang knows that we're building a supervision tree. Of course, the arguments to externalprogram_supervisor:start_link assume that myapp_config:markdown_workers/0 and friends have been defined, but you could just replace those with literal values, too. myapp_config:markdown_workers/0 is assumed to return an integer (in my case, five), which is how many copies of the program you want to launch, and myapp_config:markdown_program/0 returns the full path to the program to launch (like "/usr/local/myapp/lib/markdown.pl"). formatter_supervisor is the supervisor that will be started from my application's main supervisor.
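myapp_config itself is not shown in the article. A minimal sketch, assuming the settings live in the environment of a hypothetical myapp application and fall back to defaults when unset, might look like this. Only the worker count of five and the markdown.pl path come from the text above; the sanitiser values are placeholders.

```erlang
-module(myapp_config).
-export([markdown_workers/0, markdown_program/0,
         sanitiser_workers/0, sanitiser_program/0]).

%% Hypothetical: read each setting from the `myapp` application
%% environment, falling back to the defaults given here.
markdown_workers() ->
    application:get_env(myapp, markdown_workers, 5).

markdown_program() ->
    application:get_env(myapp, markdown_program,
                        "/usr/local/myapp/lib/markdown.pl").

%% Placeholder values; the article does not give these.
sanitiser_workers() ->
    application:get_env(myapp, sanitiser_workers, 5).

sanitiser_program() ->
    application:get_env(myapp, sanitiser_program,
                        "/usr/local/myapp/lib/sanitiser.pl").
```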
format.erl
The actual communication with the programs should be pretty easy now: we just ask external_program to do its magic. Here it is:
-module(format).

% formatters are defined and launched by the formatter_supervisor

-export([markdown/1,
         sanitise/1]).

markdown(Data) ->
    external_program:filter(markdown,Data).

sanitise(Data) ->
    external_program:filter(sanitiser,Data).
It should accept either lists or binaries as input (the result always comes back as a binary, because the Port was opened in binary mode). Add a use Text::Markdown; line to the top of our Perl module (at the beginning of this entry) and replace its process function with:
sub process {
    return Text::Markdown::markdown($_[0]);
}
Launch our formatter_supervisor with formatter_supervisor:start_link() and now we have a working Markdown implementation:
(name@node)106> format:markdown("I *really* like cookies").
<<"<p>I <em>really</em> like cookies</p>\n">>