- 浏览: 982233 次
- 性别:
- 来自: 广州
最新评论
-
qingchuwudi:
有用,非常感谢!
erlang进程的优先级 -
zfjdiamond:
你好 这条命令 在那里输入??
你们有yum 我有LuaRocks -
simsunny22:
这个是在linux下运行的吧,在window下怎么运行escr ...
escript的高级特性 -
mozhenghua:
http://www.erlang.org/doc/apps/ ...
mnesia 分布协调的几个细节 -
fxltsbl:
A new record of 108000 HTTP req ...
Haproxy 1.4-dev2: barrier of 100k HTTP req/s crossed
当你net_adm:ping(Node)的时候发生了什么? 这个涉及到很复杂的流程。让我为你解刨:
这个流程很长而且在erlang代码和c代码里面窜来窜去,重要的点 我用红字标注 请各位耐心。
1. net_adm.erl:
ping(Node) when is_atom(Node) ->
case catch gen:call({net_kernel, Node},
'$gen_call',
{is_auth, node()},
infinity) of
{ok, yes} -> pong;
_ ->
erlang:disconnect_node(Node),
pang
end.
2. gen.erl:
%% Remote by name
call({_Name, Node}=Process, Label, Request, Timeout)
when is_atom(Node), Timeout =:= infinity;
is_atom(Node), is_integer(Timeout), Timeout >= 0 ->
if
node() =:= nonode@nohost ->
exit({nodedown, Node});
true ->
do_call(Process, Label, Request, Timeout)
end.
do_call(Process, Label, Request, Timeout) ->
%% We trust the arguments to be correct, i.e
%% Process is either a local or remote pid,
%% or a {Name, Node} tuple (of atoms) and in this
%% case this node (node()) _is_ distributed and Node =/= node().
Node = case Process of
{_S, N} ->
N;
_ when is_pid(Process) ->
node(Process);
_ ->
node()
end,
case catch erlang:monitor(process, Process) of
Mref when is_reference(Mref) ->
receive
{'DOWN', Mref, _, Pid1, noconnection} when is_pid(Pid1) ->
exit({nodedown, node(Pid1)});
{'DOWN', Mref, _, _, noconnection} ->
exit({nodedown, Node});
{'DOWN', Mref, _, _, _} ->
exit(noproc)
after 0 ->
Process ! {Label, {self(), Mref}, Request},
wait_resp_mon(Process, Mref, Timeout)
end;
{'EXIT', _} ->
%% Old node is not supporting the monitor.
%% The other possible case -- this node is not distributed
%% -- should have been handled earlier.
%% Do the best possible with monitor_node/2.
%% This code may hang indefinitely if the Process
%% does not exist. It is only used for old remote nodes.
monitor_node(Node, true),
receive
{nodedown, Node} ->
monitor_node(Node, false),
exit({nodedown, Node})
after 0 ->
Mref = make_ref(),
Process ! {Label, {self(),Mref}, Request},
Res = wait_resp(Node, Mref, Timeout),
monitor_node(Node, false),
Res
end
end.
3 . Process ! {Label, {self(),Mref}, Request}, 相当于erlang:send(Process, {Label, {self(),Mref}, Request});
4. bif.tab
bif 'erl.lang.proc':send/2 ebif_send_2 send_2 bif erlang:send/2
5. bif.c
Eterm send_2(Process *p, Eterm to, Eterm msg) {
Sint result = do_send(p, to, msg, !0);
if (result > 0) {
BUMP_REDS(p, result);
BIF_RET(msg);
} else switch (result) {
case 0:
BIF_RET(msg);
break;
case SEND_TRAP:
BIF_TRAP2(dsend2_trap, p, to, msg);
break;
case SEND_RESCHEDULE:
BIF_ERROR(p, RESCHEDULE);
break;
case SEND_BADARG:
BIF_ERROR(p, BADARG);
break;
case SEND_USER_ERROR:
BIF_ERROR(p, EXC_ERROR);
break;
default:
ASSERT(! "Illegal send result");
break;
}
ASSERT(! "Can not arrive here");
BIF_ERROR(p, BADARG);
}
6. bif.c
Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend) {
Eterm portid;
Port *pt;
Process* rp;
DistEntry *dep;
Eterm* tp;
if (is_internal_pid(to)) {
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
if (internal_pid_index(to) >= erts_max_processes)
return SEND_BADARG;
rp = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN, to, ERTS_PROC_LOCKS_MSG_SEND);
if (!rp) {
ERTS_SMP_ASSERT_IS_NOT_EXITING(p);
return 0;
}
} else if (is_external_pid(to)) {
Sint res;
dep = external_pid_dist_entry(to);
if(dep == erts_this_dist_entry) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp,
"Discarding message %T from %T to %T in an old "
"incarnation (%d) of this node (%d)\n",
msg,
p->id,
to,
external_pid_creation(to),
erts_this_node->creation);
erts_send_error_to_logger(p->group_leader, dsbufp);
return 0;
}
erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);
/* Send to remote process */
if (is_nil(dep->cid))
res = SEND_TRAP;
else if (dist_send(p, ERTS_PROC_LOCK_MAIN, dep, to, msg) == 1) {
if (is_internal_port(dep->cid)) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
if (erts_system_monitor_flags.busy_dist_port) {
monitor_generic(p, am_busy_dist_port, dep->cid);
}
}
res = SEND_RESCHEDULE;
}
else {
res = SEND_TRAP;
}
}
else {
res = 50;
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
}
erts_dist_op_finalize(dep);
return res;
} else if (is_atom(to)) {
erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
to,
&rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
&pt);
if (pt) {
portid = pt->id;
goto port_common;
}
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
if (!rp) {
return SEND_BADARG;
}
} else if (is_external_port(to)
&& (external_port_dist_entry(to)
== erts_this_dist_entry)) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp,
"Discarding message %T from %T to %T in an old "
"incarnation (%d) of this node (%d)\n",
msg,
p->id,
to,
external_port_creation(to),
erts_this_node->creation);
erts_send_error_to_logger(p->group_leader, dsbufp);
return 0;
} else if (is_internal_port(to)) {
portid = to;
pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN);
port_common:
ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt));
/* XXX let port_command handle the busy stuff !!! */
if (pt && (pt->status & ERTS_PORT_S_PORT_BUSY)) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
if (erts_system_monitor_flags.busy_port) {
monitor_generic(p, am_busy_port, portid);
}
}
erts_port_release(pt);
return SEND_RESCHEDULE;
}
if (IS_TRACED(p)) /* trace once only !! */
trace_send(p, portid, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
if (SEQ_TRACE_TOKEN(p) != NIL) {
seq_trace_update_send(p);
seq_trace_output(SEQ_TRACE_TOKEN(p), msg,
SEQ_TRACE_SEND, portid, p);
}
/* XXX NO GC in port command */
erts_port_command(p, p->id, pt, msg);
if (pt)
erts_port_release(pt);
if (ERTS_PROC_IS_EXITING(p)) {
KILL_CATCHES(p); /* Must exit */
return SEND_USER_ERROR;
}
return 0;
} else if (is_tuple(to)) { /* Remote send */
int ret;
tp = tuple_val(to);
if (*tp != make_arityval(2))
return SEND_BADARG;
if (is_not_atom(tp[1]) || is_not_atom(tp[2]))
return SEND_BADARG;
/* sysname_to_connected_dist_entry will return NULL if there
is no dist_entry or the dist_entry has no port*/
if ((dep = erts_sysname_to_connected_dist_entry(tp[2])) == NULL) {
return SEND_TRAP;
}
if (dep == erts_this_dist_entry) {
erts_deref_dist_entry(dep);
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
tp[1],
&rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
&pt);
if (pt) {
portid = pt->id;
goto port_common;
}
if (!rp) {
return 0;
}
goto send_message;
}
erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);
if (is_nil(dep->cid))
ret = SEND_TRAP;
else if (dist_reg_send(p, ERTS_PROC_LOCK_MAIN, dep, tp[1], msg) == 1) {
if (is_internal_port(dep->cid)) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
if (erts_system_monitor_flags.busy_dist_port) {
monitor_generic(p, am_busy_dist_port, dep->cid);
}
}
ret = SEND_RESCHEDULE;
}
else {
ret = SEND_TRAP;
}
}
else {
ret = 0;
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
}
erts_dist_op_finalize(dep);
erts_deref_dist_entry(dep);
return ret;
} else {
if (IS_TRACED(p)) /* XXX Is this really neccessary ??? */
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
return SEND_BADARG;
}
send_message: {
Uint32 rp_locks = ERTS_PROC_LOCKS_MSG_SEND;
Sint res;
#ifdef ERTS_SMP
if (p == rp)
rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
/* send to local process */
erts_send_message(p, rp, &rp_locks, msg, 0);
#ifdef ERTS_SMP
res = rp->msg_inq.len*4;
if (ERTS_PROC_LOCK_MAIN & rp_locks)
res += rp->msg.len*4;
#else
res = rp->msg.len*4;
#endif
erts_smp_proc_unlock(rp,
p == rp
? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
: rp_locks);
return res;
}
}
8. 如果能找到节点的话 就调用dist_send 否者发生SEND_TRAP
9. dist.c
void init_dist(void)
{
init_alive();
init_nodes_monitors();
no_caches = 0;
/* Lookup/Install all references to trap functions */
dsend2_trap = trap_function(am_dsend,2);
dsend3_trap = trap_function(am_dsend,3);
/* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
dlink_trap = trap_function(am_dlink,1);
dunlink_trap = trap_function(am_dunlink,1);
dmonitor_node_trap = trap_function(am_dmonitor_node,3);
dgroup_leader_trap = trap_function(am_dgroup_leader,2);
dexit_trap = trap_function(am_dexit, 2);
dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
}
static Export*
trap_function(Eterm func, int arity)
{
return erts_export_put(am_erlang, func, arity);
}
也就是说dsend2_trap 就是erlang:dsend这个函数
当send 失败 的时候 参考5 的 send_2 将执行 BIF_TRAP2(dsend2_trap, p, to, msg);
10。 bif.h
#define BIF_TRAP2(Trap_, p, A0, A1) do { \
(p)->arity = 2; \
(p)->def_arg_reg[0] = (A0); \
(p)->def_arg_reg[1] = (A1); \
(p)->def_arg_reg[3] = (Eterm) (Trap_); \
(p)->freason = TRAP; \
return THE_NON_VALUE; \
} while(0)
11. beam_emu.c
OpCase(call_bif2_e):
{
Eterm (*bf)(Process*, Eterm, Eterm, Uint*) = GET_BIF_ADDRESS(Arg(0));
Eterm result;
Eterm* next;
SWAPOUT;
c_p->fcalls = FCALLS - 1;
if (FCALLS <= 0) {
save_calls(c_p, (Export *) Arg(0));
}
PreFetch(1, next);
CHECK_TERM(r(0));
CHECK_TERM(x(1));
PROCESS_MAIN_CHK_LOCKS(c_p);
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
result = (*bf)(c_p, r(0), x(1), I);
ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
PROCESS_MAIN_CHK_LOCKS(c_p);
ERTS_HOLE_CHECK(c_p);
POST_BIF_GC_SWAPIN(c_p, result);
FCALLS = c_p->fcalls;
if (is_value(result)) {
r(0) = result;
CHECK_TERM(r(0));
NextPF(1, next);
} else if (c_p->freason == RESCHEDULE) {
c_p->arity = 2;
goto suspend_bif;
} else if (c_p->freason == TRAP) {
goto call_bif_trap3;
}
call_bif_trap3:
SET_CP(c_p, I+2);
SET_I(((Export *)(c_p->def_arg_reg[3]))->address);
SWAPIN;
r(0) = c_p->def_arg_reg[0];
x(1) = c_p->def_arg_reg[1];
x(2) = c_p->def_arg_reg[2];
Dispatch();
也就是说这时候trap erlang:dsend函数被调用。
12. erlang.erl
dsend({Name, Node}, Msg, Opts) ->
case net_kernel:connect(Node) of
true -> erlang:send({Name,Node}, Msg, Opts);
false -> ok;
ignored -> ok % Not distributed.
end.
先链接对方节点 成功 发数据包 转8 实际上是调用dist_send.
整个流程完毕。
这里面trap技术用的很巧妙。
net_kernel:connect 也很复杂 另文分析,请期待。
这个流程很长而且在erlang代码和c代码里面窜来窜去,重要的点 我用红字标注 请各位耐心。
1. net_adm.erl:
ping(Node) when is_atom(Node) ->
case catch gen:call({net_kernel, Node},
'$gen_call',
{is_auth, node()},
infinity) of
{ok, yes} -> pong;
_ ->
erlang:disconnect_node(Node),
pang
end.
2. gen.erl:
%% Remote by name
call({_Name, Node}=Process, Label, Request, Timeout)
when is_atom(Node), Timeout =:= infinity;
is_atom(Node), is_integer(Timeout), Timeout >= 0 ->
if
node() =:= nonode@nohost ->
exit({nodedown, Node});
true ->
do_call(Process, Label, Request, Timeout)
end.
do_call(Process, Label, Request, Timeout) ->
%% We trust the arguments to be correct, i.e
%% Process is either a local or remote pid,
%% or a {Name, Node} tuple (of atoms) and in this
%% case this node (node()) _is_ distributed and Node =/= node().
Node = case Process of
{_S, N} ->
N;
_ when is_pid(Process) ->
node(Process);
_ ->
node()
end,
case catch erlang:monitor(process, Process) of
Mref when is_reference(Mref) ->
receive
{'DOWN', Mref, _, Pid1, noconnection} when is_pid(Pid1) ->
exit({nodedown, node(Pid1)});
{'DOWN', Mref, _, _, noconnection} ->
exit({nodedown, Node});
{'DOWN', Mref, _, _, _} ->
exit(noproc)
after 0 ->
Process ! {Label, {self(), Mref}, Request},
wait_resp_mon(Process, Mref, Timeout)
end;
{'EXIT', _} ->
%% Old node is not supporting the monitor.
%% The other possible case -- this node is not distributed
%% -- should have been handled earlier.
%% Do the best possible with monitor_node/2.
%% This code may hang indefinitely if the Process
%% does not exist. It is only used for old remote nodes.
monitor_node(Node, true),
receive
{nodedown, Node} ->
monitor_node(Node, false),
exit({nodedown, Node})
after 0 ->
Mref = make_ref(),
Process ! {Label, {self(),Mref}, Request},
Res = wait_resp(Node, Mref, Timeout),
monitor_node(Node, false),
Res
end
end.
3 . Process ! {Label, {self(),Mref}, Request}, 相当于erlang:send(Process, {Label, {self(),Mref}, Request});
4. bif.tab
bif 'erl.lang.proc':send/2 ebif_send_2 send_2 bif erlang:send/2
5. bif.c
Eterm send_2(Process *p, Eterm to, Eterm msg) {
Sint result = do_send(p, to, msg, !0);
if (result > 0) {
BUMP_REDS(p, result);
BIF_RET(msg);
} else switch (result) {
case 0:
BIF_RET(msg);
break;
case SEND_TRAP:
BIF_TRAP2(dsend2_trap, p, to, msg);
break;
case SEND_RESCHEDULE:
BIF_ERROR(p, RESCHEDULE);
break;
case SEND_BADARG:
BIF_ERROR(p, BADARG);
break;
case SEND_USER_ERROR:
BIF_ERROR(p, EXC_ERROR);
break;
default:
ASSERT(! "Illegal send result");
break;
}
ASSERT(! "Can not arrive here");
BIF_ERROR(p, BADARG);
}
6. bif.c
Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend) {
Eterm portid;
Port *pt;
Process* rp;
DistEntry *dep;
Eterm* tp;
if (is_internal_pid(to)) {
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
if (internal_pid_index(to) >= erts_max_processes)
return SEND_BADARG;
rp = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN, to, ERTS_PROC_LOCKS_MSG_SEND);
if (!rp) {
ERTS_SMP_ASSERT_IS_NOT_EXITING(p);
return 0;
}
} else if (is_external_pid(to)) {
Sint res;
dep = external_pid_dist_entry(to);
if(dep == erts_this_dist_entry) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp,
"Discarding message %T from %T to %T in an old "
"incarnation (%d) of this node (%d)\n",
msg,
p->id,
to,
external_pid_creation(to),
erts_this_node->creation);
erts_send_error_to_logger(p->group_leader, dsbufp);
return 0;
}
erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);
/* Send to remote process */
if (is_nil(dep->cid))
res = SEND_TRAP;
else if (dist_send(p, ERTS_PROC_LOCK_MAIN, dep, to, msg) == 1) {
if (is_internal_port(dep->cid)) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
if (erts_system_monitor_flags.busy_dist_port) {
monitor_generic(p, am_busy_dist_port, dep->cid);
}
}
res = SEND_RESCHEDULE;
}
else {
res = SEND_TRAP;
}
}
else {
res = 50;
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
}
erts_dist_op_finalize(dep);
return res;
} else if (is_atom(to)) {
erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
to,
&rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
&pt);
if (pt) {
portid = pt->id;
goto port_common;
}
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
if (!rp) {
return SEND_BADARG;
}
} else if (is_external_port(to)
&& (external_port_dist_entry(to)
== erts_this_dist_entry)) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp,
"Discarding message %T from %T to %T in an old "
"incarnation (%d) of this node (%d)\n",
msg,
p->id,
to,
external_port_creation(to),
erts_this_node->creation);
erts_send_error_to_logger(p->group_leader, dsbufp);
return 0;
} else if (is_internal_port(to)) {
portid = to;
pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN);
port_common:
ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt));
/* XXX let port_command handle the busy stuff !!! */
if (pt && (pt->status & ERTS_PORT_S_PORT_BUSY)) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
if (erts_system_monitor_flags.busy_port) {
monitor_generic(p, am_busy_port, portid);
}
}
erts_port_release(pt);
return SEND_RESCHEDULE;
}
if (IS_TRACED(p)) /* trace once only !! */
trace_send(p, portid, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
if (SEQ_TRACE_TOKEN(p) != NIL) {
seq_trace_update_send(p);
seq_trace_output(SEQ_TRACE_TOKEN(p), msg,
SEQ_TRACE_SEND, portid, p);
}
/* XXX NO GC in port command */
erts_port_command(p, p->id, pt, msg);
if (pt)
erts_port_release(pt);
if (ERTS_PROC_IS_EXITING(p)) {
KILL_CATCHES(p); /* Must exit */
return SEND_USER_ERROR;
}
return 0;
} else if (is_tuple(to)) { /* Remote send */
int ret;
tp = tuple_val(to);
if (*tp != make_arityval(2))
return SEND_BADARG;
if (is_not_atom(tp[1]) || is_not_atom(tp[2]))
return SEND_BADARG;
/* sysname_to_connected_dist_entry will return NULL if there
is no dist_entry or the dist_entry has no port*/
if ((dep = erts_sysname_to_connected_dist_entry(tp[2])) == NULL) {
return SEND_TRAP;
}
if (dep == erts_this_dist_entry) {
erts_deref_dist_entry(dep);
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
tp[1],
&rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
&pt);
if (pt) {
portid = pt->id;
goto port_common;
}
if (!rp) {
return 0;
}
goto send_message;
}
erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);
if (is_nil(dep->cid))
ret = SEND_TRAP;
else if (dist_reg_send(p, ERTS_PROC_LOCK_MAIN, dep, tp[1], msg) == 1) {
if (is_internal_port(dep->cid)) {
if (suspend) {
erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
if (erts_system_monitor_flags.busy_dist_port) {
monitor_generic(p, am_busy_dist_port, dep->cid);
}
}
ret = SEND_RESCHEDULE;
}
else {
ret = SEND_TRAP;
}
}
else {
ret = 0;
if (IS_TRACED(p))
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
}
erts_dist_op_finalize(dep);
erts_deref_dist_entry(dep);
return ret;
} else {
if (IS_TRACED(p)) /* XXX Is this really neccessary ??? */
trace_send(p, to, msg);
if (p->ct != NULL)
save_calls(p, &exp_send);
return SEND_BADARG;
}
send_message: {
Uint32 rp_locks = ERTS_PROC_LOCKS_MSG_SEND;
Sint res;
#ifdef ERTS_SMP
if (p == rp)
rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
/* send to local process */
erts_send_message(p, rp, &rp_locks, msg, 0);
#ifdef ERTS_SMP
res = rp->msg_inq.len*4;
if (ERTS_PROC_LOCK_MAIN & rp_locks)
res += rp->msg.len*4;
#else
res = rp->msg.len*4;
#endif
erts_smp_proc_unlock(rp,
p == rp
? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
: rp_locks);
return res;
}
}
8. 如果能找到节点的话 就调用dist_send 否者发生SEND_TRAP
9. dist.c
void init_dist(void)
{
init_alive();
init_nodes_monitors();
no_caches = 0;
/* Lookup/Install all references to trap functions */
dsend2_trap = trap_function(am_dsend,2);
dsend3_trap = trap_function(am_dsend,3);
/* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
dlink_trap = trap_function(am_dlink,1);
dunlink_trap = trap_function(am_dunlink,1);
dmonitor_node_trap = trap_function(am_dmonitor_node,3);
dgroup_leader_trap = trap_function(am_dgroup_leader,2);
dexit_trap = trap_function(am_dexit, 2);
dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
}
static Export*
trap_function(Eterm func, int arity)
{
return erts_export_put(am_erlang, func, arity);
}
也就是说dsend2_trap 就是erlang:dsend这个函数
当send 失败 的时候 参考5 的 send_2 将执行 BIF_TRAP2(dsend2_trap, p, to, msg);
10。 bif.h
#define BIF_TRAP2(Trap_, p, A0, A1) do { \
(p)->arity = 2; \
(p)->def_arg_reg[0] = (A0); \
(p)->def_arg_reg[1] = (A1); \
(p)->def_arg_reg[3] = (Eterm) (Trap_); \
(p)->freason = TRAP; \
return THE_NON_VALUE; \
} while(0)
11. beam_emu.c
OpCase(call_bif2_e):
{
Eterm (*bf)(Process*, Eterm, Eterm, Uint*) = GET_BIF_ADDRESS(Arg(0));
Eterm result;
Eterm* next;
SWAPOUT;
c_p->fcalls = FCALLS - 1;
if (FCALLS <= 0) {
save_calls(c_p, (Export *) Arg(0));
}
PreFetch(1, next);
CHECK_TERM(r(0));
CHECK_TERM(x(1));
PROCESS_MAIN_CHK_LOCKS(c_p);
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
result = (*bf)(c_p, r(0), x(1), I);
ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
PROCESS_MAIN_CHK_LOCKS(c_p);
ERTS_HOLE_CHECK(c_p);
POST_BIF_GC_SWAPIN(c_p, result);
FCALLS = c_p->fcalls;
if (is_value(result)) {
r(0) = result;
CHECK_TERM(r(0));
NextPF(1, next);
} else if (c_p->freason == RESCHEDULE) {
c_p->arity = 2;
goto suspend_bif;
} else if (c_p->freason == TRAP) {
goto call_bif_trap3;
}
call_bif_trap3:
SET_CP(c_p, I+2);
SET_I(((Export *)(c_p->def_arg_reg[3]))->address);
SWAPIN;
r(0) = c_p->def_arg_reg[0];
x(1) = c_p->def_arg_reg[1];
x(2) = c_p->def_arg_reg[2];
Dispatch();
也就是说这时候trap erlang:dsend函数被调用。
12. erlang.erl
dsend({Name, Node}, Msg, Opts) ->
case net_kernel:connect(Node) of
true -> erlang:send({Name,Node}, Msg, Opts);
false -> ok;
ignored -> ok % Not distributed.
end.
先链接对方节点 成功 发数据包 转8 实际上是调用dist_send.
整个流程完毕。
这里面trap技术用的很巧妙。
net_kernel:connect 也很复杂 另文分析,请期待。
发表评论
-
OTP R14A今天发布了
2010-06-17 14:36 2676以下是这次发布的亮点,没有太大的性能改进, 主要是修理了很多B ... -
R14A实现了EEP31,添加了binary模块
2010-05-21 15:15 3030Erlang的binary数据结构非常强大,而且偏向底层,在作 ... -
如何查看节点的可用句柄数目和已用句柄数
2010-04-08 03:31 4814很多同学在使用erlang的过程中, 碰到了很奇怪的问题, 后 ... -
获取Erlang系统信息的代码片段
2010-04-06 21:49 3475从lib/megaco/src/tcp/megaco_tcp_ ... -
iolist跟list有什么区别?
2010-04-06 20:30 6529看到erlang-china.org上有个 ... -
erlang:send_after和erlang:start_timer的使用解释
2010-04-06 18:31 8386前段时间arksea 同学提出这个问题, 因为文档里面写的很不 ... -
Latest news from the Erlang/OTP team at Ericsson 2010
2010-04-05 19:23 2013参考Talk http://www.erlang-factor ... -
对try 异常 运行的疑问,为什么出现两种结果
2010-04-05 19:22 2842郎咸武<langxianzhe@163.com> ... -
Erlang ERTS Async基础设施
2010-03-19 00:03 2517其实Erts的Async做的很不错的, 相当的完备, 性能又高 ... -
CloudI 0.0.9 Released, A Cloud as an Interface
2010-03-09 22:32 2476基于Erlang的云平台 看了下代码 质量还是不错的 完成了不 ... -
Memory matters - even in Erlang (再次说明了了解内存如何工作的必要性)
2010-03-09 20:26 3439原文地址:http://www.lshift.net/blog ... -
Some simple examples of using Erlang’s XPath implementation
2010-03-08 23:30 2050原文地址 http://www.lshift.net/blog ... -
lcnt 环境搭建
2010-02-26 16:19 2614抄书:otp_doc_html_R13B04/lib/tool ... -
Erlang强大的代码重构工具 tidier
2010-02-25 16:22 2486Jan 29, 2010 We are very happy ... -
[Feb 24 2010] Erlang/OTP R13B04 has been released
2010-02-25 00:31 1387Erlang/OTP R13B04 has been rele ... -
R13B04 Installation
2010-01-28 10:28 1390R13B04后erlang的源码编译为了考虑移植性,就改变了编 ... -
Running tests
2010-01-19 14:51 1486R13B03以后 OTP的模块加入了大量的测试模块,这些模块都 ... -
R13B04在细化Binary heap
2010-01-14 15:11 1508从github otp的更新日志可以清楚的看到otp R13B ... -
R13B03 binary vheap有助减少binary内存压力
2009-11-29 16:07 1668R13B03 binary vheap有助减少binary内存 ... -
erl_nif 扩展erlang的另外一种方法
2009-11-26 01:02 3218我们知道扩展erl有2种方法, driver和port. 这2 ...
相关推荐
Erlang OTP 19_win64是一款专为Windows 64位系统设计的Erlang软件开发工具包,它包含Erlang编程语言和OTP(Open Telecom Platform)框架。Erlang是一种强大的、动态类型的函数式编程语言,特别适合构建高可用性、...
erlang_23.0.2-1版本 centos7 64bit esl-erlang_23.0.2-1_centos_7_amd64.rpm
esl-erlang_23.0和rabbitmq-3.8.4windows版本 直接下载安装就行,可以直接下载就可安装,非常的方便 ,欢迎大家下载 注意事项: 1. Erlang版本和RabbitMQ版本要配套 (Erlang23.0, RabbitMQ3.8.4) 2. amd芯片请乖乖...
在Erlang中,你可以使用`erlang:monitor_node/2`和`erlang:demonitor/1`函数来监控和取消监控其他节点的状态。当需要两个未连通的节点C和D进行通信时,可以在节点C上执行: ```erlang erlang:monitor_node(node(d),...
erlang-sd_notify-1.0-2.el7.x86_64.rpm
Erlang OTP (Open Telephony Platform) 是一种高级并发编程语言和框架,主要由瑞典电信设备制造商Ericsson开发,用于构建高度可靠、可扩展和实时的分布式系统。Erlang OTP 25.0是该平台的一个版本,特别针对Windows ...
"xiandiao_erlang_Erlang课后习题_"这个压缩包文件包含了Erlang程序设计第二版的课后习题源码,这对于学习和深入理解Erlang编程至关重要。 Erlang的特点: 1. **函数式编程**:Erlang基于函数式编程范式,强调无副...
这个erlang23.0版本,根据rabbitMQ官网的介绍,可以和下面这几个版本的rabbitMQ配合使用: 3.8.9 3.8.8 3.8.7 3.8.6 3.8.5 3.8.4 其他版本的rabbit,请移步其他资源下载
这两个公式由丹麦工程师 Agner Krarup Erlang 在20世纪初开发,对于理解通信系统中的呼叫占用率、阻塞率和系统容量具有重要意义。 Erlang B 公式主要用来计算在给定的呼叫到达速率和服务器数量下,系统不会发生阻塞...
Erlang是一种强大的编程语言,尤其在分布式计算、并发处理和实时系统中有着广泛的应用。OTP(Open Telecom Platform)是Erlang的核心组件,提供了一系列的库和设计原则,用于构建可靠、可扩展的系统。在Linux CentOS...
Erlang是一种面向并发的、函数式编程语言,被广泛应用于分布式系统和高可用性服务。在Erlang中,为了实现与C或其他低级语言的高效交互,Erlang提供了一个名为`erl_nif`的...因此,明智地选择何时使用NIFs是至关重要的。
其源代码的可用性让开发者有机会深入理解其工作原理,并根据需要自定义和扩展 OTP 的功能。通过学习和掌握 Erlang 语言以及 OTP 框架,开发者可以构建出高效、健壮的系统,应对现代软件工程的挑战。
Erlang opt_win64 20.2 windows exe 安装包 Erlang 20.2 is the upcoming version of Erlang For Windows x64 installer 截至2018.01.25 groovy最新最稳定版本
理解并熟练应用Erlang B模型,对于优化通信系统的性能和成本效益至关重要。通过深入研究"Erlang_B_model.pdf",读者可以更全面地了解这一模型的原理和应用,从而提升在通信工程领域的专业素养。
otp_win64_24.1.7.exe
标题“esl-erlang_19.3_osx_10.10_amd64.dmg”指的是一个特定版本的Erlang编程环境的安装镜像文件,由Erlang Solutions Limited(ESL)发布,适用于Apple macOS 10.10 Yosemite操作系统,并且是为64位AMD处理器设计...
标题中的"erlang_otp_22.2_win64"和"rabbitmq-server-3.8.3"提到了两个关键的IT技术:Erlang OTP和RabbitMQ服务器,这两个都是在分布式系统和消息队列领域非常重要的组件。 **Erlang OTP** Erlang是一种函数式编程...
Erlang OTP(Open Telephony Platform)是一种开源的编程语言和框架,专为构建高度并发、分布式和容错系统而设计。"Erlang otp_win64_21.2.exe" 是Erlang OTP在Windows 64位平台上的一个特定版本,编号为21.2。这个...
erlang_otp_20.3_man开发手册,erlang_otp_20.3_man开发手册,erlang_otp_20.3_man开发手册
Erlang OTP (Open Telephony Platform) 是一个用于构建高度并发、分布式和容错系统的软件框架,由瑞典的Ericsson公司开发。...了解并熟练掌握Erlang OTP和RabbitMQ的使用,对开发分布式系统和微服务架构至关重要。