浏览 4359 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2007-09-05
这个流程很长而且在erlang代码和c代码里面窜来窜去,重要的点 我用红字标注 请各位耐心。 1. net_adm.erl: ping(Node) when is_atom(Node) -> case catch gen:call({net_kernel, Node}, '$gen_call', {is_auth, node()}, infinity) of {ok, yes} -> pong; _ -> erlang:disconnect_node(Node), pang end. 2. gen.erl: %% Remote by name call({_Name, Node}=Process, Label, Request, Timeout) when is_atom(Node), Timeout =:= infinity; is_atom(Node), is_integer(Timeout), Timeout >= 0 -> if node() =:= nonode@nohost -> exit({nodedown, Node}); true -> do_call(Process, Label, Request, Timeout) end. do_call(Process, Label, Request, Timeout) -> %% We trust the arguments to be correct, i.e %% Process is either a local or remote pid, %% or a {Name, Node} tuple (of atoms) and in this %% case this node (node()) _is_ distributed and Node =/= node(). Node = case Process of {_S, N} -> N; _ when is_pid(Process) -> node(Process); _ -> node() end, case catch erlang:monitor(process, Process) of Mref when is_reference(Mref) -> receive {'DOWN', Mref, _, Pid1, noconnection} when is_pid(Pid1) -> exit({nodedown, node(Pid1)}); {'DOWN', Mref, _, _, noconnection} -> exit({nodedown, Node}); {'DOWN', Mref, _, _, _} -> exit(noproc) after 0 -> Process ! {Label, {self(), Mref}, Request}, wait_resp_mon(Process, Mref, Timeout) end; {'EXIT', _} -> %% Old node is not supporting the monitor. %% The other possible case -- this node is not distributed %% -- should have been handled earlier. %% Do the best possible with monitor_node/2. %% This code may hang indefinitely if the Process %% does not exist. It is only used for old remote nodes. monitor_node(Node, true), receive {nodedown, Node} -> monitor_node(Node, false), exit({nodedown, Node}) after 0 -> Mref = make_ref(), Process ! {Label, {self(),Mref}, Request}, Res = wait_resp(Node, Mref, Timeout), monitor_node(Node, false), Res end end. 3 . Process ! {Label, {self(),Mref}, Request}, 相当于erlang:send(Process, {Label, {self(),Mref}, Request}); 4. bif.tab bif 'erl.lang.proc':send/2 ebif_send_2 send_2 bif erlang:send/2 5. bif.c Eterm send_2(Process *p, Eterm to, Eterm msg) { Sint result = do_send(p, to, msg, !0); if (result > 0) { BUMP_REDS(p, result); BIF_RET(msg); } else switch (result) { case 0: BIF_RET(msg); break; case SEND_TRAP: BIF_TRAP2(dsend2_trap, p, to, msg); break; case SEND_RESCHEDULE: BIF_ERROR(p, RESCHEDULE); break; case SEND_BADARG: BIF_ERROR(p, BADARG); break; case SEND_USER_ERROR: BIF_ERROR(p, EXC_ERROR); break; default: ASSERT(! "Illegal send result"); break; } ASSERT(! "Can not arrive here"); BIF_ERROR(p, BADARG); } 6. bif.c Sint do_send(Process *p, Eterm to, Eterm msg, int suspend) { Eterm portid; Port *pt; Process* rp; DistEntry *dep; Eterm* tp; if (is_internal_pid(to)) { if (IS_TRACED(p)) trace_send(p, to, msg); if (p->ct != NULL) save_calls(p, &exp_send); if (internal_pid_index(to) >= erts_max_processes) return SEND_BADARG; rp = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN, to, ERTS_PROC_LOCKS_MSG_SEND); if (!rp) { ERTS_SMP_ASSERT_IS_NOT_EXITING(p); return 0; } } else if (is_external_pid(to)) { Sint res; dep = external_pid_dist_entry(to); if(dep == erts_this_dist_entry) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "Discarding message %T from %T to %T in an old " "incarnation (%d) of this node (%d)\n", msg, p->id, to, external_pid_creation(to), erts_this_node->creation); erts_send_error_to_logger(p->group_leader, dsbufp); return 0; } erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN); /* Send to remote process */ if (is_nil(dep->cid)) res = SEND_TRAP; else if (dist_send(p, ERTS_PROC_LOCK_MAIN, dep, to, msg) == 1) { if (is_internal_port(dep->cid)) { if (suspend) { erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port); if (erts_system_monitor_flags.busy_dist_port) { monitor_generic(p, am_busy_dist_port, dep->cid); } } res = SEND_RESCHEDULE; } else { res = SEND_TRAP; } } else { res = 50; if (IS_TRACED(p)) trace_send(p, to, msg); if (p->ct != NULL) save_calls(p, &exp_send); } erts_dist_op_finalize(dep); return res; } else if (is_atom(to)) { erts_whereis_name(p, ERTS_PROC_LOCK_MAIN, to, &rp, ERTS_PROC_LOCKS_MSG_SEND, 0, &pt); if (pt) { portid = pt->id; goto port_common; } if (IS_TRACED(p)) trace_send(p, to, msg); if (p->ct != NULL) save_calls(p, &exp_send); if (!rp) { return SEND_BADARG; } } else if (is_external_port(to) && (external_port_dist_entry(to) == erts_this_dist_entry)) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "Discarding message %T from %T to %T in an old " "incarnation (%d) of this node (%d)\n", msg, p->id, to, external_port_creation(to), erts_this_node->creation); erts_send_error_to_logger(p->group_leader, dsbufp); return 0; } else if (is_internal_port(to)) { portid = to; pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN); port_common: ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt)); /* XXX let port_command handle the busy stuff !!! */ if (pt && (pt->status & ERTS_PORT_S_PORT_BUSY)) { if (suspend) { erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt); if (erts_system_monitor_flags.busy_port) { monitor_generic(p, am_busy_port, portid); } } erts_port_release(pt); return SEND_RESCHEDULE; } if (IS_TRACED(p)) /* trace once only !! */ trace_send(p, portid, msg); if (p->ct != NULL) save_calls(p, &exp_send); if (SEQ_TRACE_TOKEN(p) != NIL) { seq_trace_update_send(p); seq_trace_output(SEQ_TRACE_TOKEN(p), msg, SEQ_TRACE_SEND, portid, p); } /* XXX NO GC in port command */ erts_port_command(p, p->id, pt, msg); if (pt) erts_port_release(pt); if (ERTS_PROC_IS_EXITING(p)) { KILL_CATCHES(p); /* Must exit */ return SEND_USER_ERROR; } return 0; } else if (is_tuple(to)) { /* Remote send */ int ret; tp = tuple_val(to); if (*tp != make_arityval(2)) return SEND_BADARG; if (is_not_atom(tp[1]) || is_not_atom(tp[2])) return SEND_BADARG; /* sysname_to_connected_dist_entry will return NULL if there is no dist_entry or the dist_entry has no port*/ if ((dep = erts_sysname_to_connected_dist_entry(tp[2])) == NULL) { return SEND_TRAP; } if (dep == erts_this_dist_entry) { erts_deref_dist_entry(dep); if (IS_TRACED(p)) trace_send(p, to, msg); if (p->ct != NULL) save_calls(p, &exp_send); erts_whereis_name(p, ERTS_PROC_LOCK_MAIN, tp[1], &rp, ERTS_PROC_LOCKS_MSG_SEND, 0, &pt); if (pt) { portid = pt->id; goto port_common; } if (!rp) { return 0; } goto send_message; } erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN); if (is_nil(dep->cid)) ret = SEND_TRAP; else if (dist_reg_send(p, ERTS_PROC_LOCK_MAIN, dep, tp[1], msg) == 1) { if (is_internal_port(dep->cid)) { if (suspend) { erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port); if (erts_system_monitor_flags.busy_dist_port) { monitor_generic(p, am_busy_dist_port, dep->cid); } } ret = SEND_RESCHEDULE; } else { ret = SEND_TRAP; } } else { ret = 0; if (IS_TRACED(p)) trace_send(p, to, msg); if (p->ct != NULL) save_calls(p, &exp_send); } erts_dist_op_finalize(dep); erts_deref_dist_entry(dep); return ret; } else { if (IS_TRACED(p)) /* XXX Is this really neccessary ??? */ trace_send(p, to, msg); if (p->ct != NULL) save_calls(p, &exp_send); return SEND_BADARG; } send_message: { Uint32 rp_locks = ERTS_PROC_LOCKS_MSG_SEND; Sint res; #ifdef ERTS_SMP if (p == rp) rp_locks |= ERTS_PROC_LOCK_MAIN; #endif /* send to local process */ erts_send_message(p, rp, &rp_locks, msg, 0); #ifdef ERTS_SMP res = rp->msg_inq.len*4; if (ERTS_PROC_LOCK_MAIN & rp_locks) res += rp->msg.len*4; #else res = rp->msg.len*4; #endif erts_smp_proc_unlock(rp, p == rp ? (rp_locks & ~ERTS_PROC_LOCK_MAIN) : rp_locks); return res; } } 8. 如果能找到节点的话 就调用dist_send 否者发生SEND_TRAP 9. dist.c void init_dist(void) { init_alive(); init_nodes_monitors(); no_caches = 0; /* Lookup/Install all references to trap functions */ dsend2_trap = trap_function(am_dsend,2); dsend3_trap = trap_function(am_dsend,3); /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ dlink_trap = trap_function(am_dlink,1); dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); } static Export* trap_function(Eterm func, int arity) { return erts_export_put(am_erlang, func, arity); } 也就是说dsend2_trap 就是erlang:dsend这个函数 当send 失败 的时候 参考5 的 send_2 将执行 BIF_TRAP2(dsend2_trap, p, to, msg); 10。 bif.h #define BIF_TRAP2(Trap_, p, A0, A1) do { \ (p)->arity = 2; \ (p)->def_arg_reg[0] = (A0); \ (p)->def_arg_reg[1] = (A1); \ (p)->def_arg_reg[3] = (Eterm) (Trap_); \ (p)->freason = TRAP; \ return THE_NON_VALUE; \ } while(0) 11. beam_emu.c OpCase(call_bif2_e): { Eterm (*bf)(Process*, Eterm, Eterm, Uint*) = GET_BIF_ADDRESS(Arg(0)); Eterm result; Eterm* next; SWAPOUT; c_p->fcalls = FCALLS - 1; if (FCALLS <= 0) { save_calls(c_p, (Export *) Arg(0)); } PreFetch(1, next); CHECK_TERM(r(0)); CHECK_TERM(x(1)); PROCESS_MAIN_CHK_LOCKS(c_p); ASSERT(!ERTS_PROC_IS_EXITING(c_p)); result = (*bf)(c_p, r(0), x(1), I); ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result)); PROCESS_MAIN_CHK_LOCKS(c_p); ERTS_HOLE_CHECK(c_p); POST_BIF_GC_SWAPIN(c_p, result); FCALLS = c_p->fcalls; if (is_value(result)) { r(0) = result; CHECK_TERM(r(0)); NextPF(1, next); } else if (c_p->freason == RESCHEDULE) { c_p->arity = 2; goto suspend_bif; } else if (c_p->freason == TRAP) { goto call_bif_trap3; } call_bif_trap3: SET_CP(c_p, I+2); SET_I(((Export *)(c_p->def_arg_reg[3]))->address); SWAPIN; r(0) = c_p->def_arg_reg[0]; x(1) = c_p->def_arg_reg[1]; x(2) = c_p->def_arg_reg[2]; Dispatch(); 也就是说这时候trap erlang:dsend函数被调用。 12. erlang.erl dsend({Name, Node}, Msg, Opts) -> case net_kernel:connect(Node) of true -> erlang:send({Name,Node}, Msg, Opts); false -> ok; ignored -> ok % Not distributed. end. 先链接对方节点 成功 发数据包 转8 实际上是调用dist_send. 整个流程完毕。 这里面trap技术用的很巧妙。 net_kernel:connect 也很复杂 另文分析,请期待。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |