`
xumingyong
  • 浏览: 182525 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

当你net_adm:ping(Node)的时候发生了什么?流程复杂 但是对理解dist工作原理至关重

阅读更多
    当你net_adm:ping(Node)的时候发生了什么? 这个涉及到很复杂的流程。让我为你解刨:
    这个流程很长而且在erlang代码和c代码里面窜来窜去,重要的点 我用红字标注 请各位耐心。

    1. net_adm.erl:
    ping(Node) when is_atom(Node) ->
        case catch gen:call({net_kernel, Node},
                '$gen_call',
                {is_auth, node()},
                infinity)
    of
        {ok, yes} -> pong;
        _ ->
            erlang:disconnect_node(Node),
            pang
        end.

    2. gen.erl:
    %% Remote by name
    call({_Name, Node}=Process, Label, Request, Timeout)
      when is_atom(Node), Timeout =:= infinity;
           is_atom(Node), is_integer(Timeout), Timeout >= 0 ->
        if
         node() =:= nonode@nohost ->
             exit({nodedown, Node});
         true ->
             do_call(Process, Label, Request, Timeout)
        end.

    do_call(Process, Label, Request, Timeout) ->
        %% We trust the arguments to be correct, i.e
        %% Process is either a local or remote pid,
        %% or a {Name, Node} tuple (of atoms) and in this
        %% case this node (node()) _is_ distributed and Node =/= node().
        Node = case Process of
               {_S, N} ->
               N;
               _ when is_pid(Process) ->
               node(Process);
               _ ->
               node()
           end,
        case catch erlang:monitor(process, Process) of
        Mref when is_reference(Mref) ->
            receive
            {'DOWN', Mref, _, Pid1, noconnection} when is_pid(Pid1) ->
                exit({nodedown, node(Pid1)});
            {'DOWN', Mref, _, _, noconnection} ->
                exit({nodedown, Node});
            {'DOWN', Mref, _, _, _} ->
                exit(noproc)
            after 0 ->
                Process ! {Label, {self(), Mref}, Request},
                wait_resp_mon(Process, Mref, Timeout)
            end;
        {'EXIT', _} ->
            %% Old node is not supporting the monitor.
            %% The other possible case -- this node is not distributed
            %% -- should have been handled earlier.
            %% Do the best possible with monitor_node/2.
            %% This code may hang indefinitely if the Process
            %% does not exist. It is only used for old remote nodes.
            monitor_node(Node, true),
            receive
            {nodedown, Node} ->
                monitor_node(Node, false),
                exit({nodedown, Node})
            after 0 ->
                Mref = make_ref(),
                Process ! {Label, {self(),Mref}, Request},   
                Res = wait_resp(Node, Mref, Timeout),
                monitor_node(Node, false),
                Res
            end
        end.

    3 . Process ! {Label, {self(),Mref}, Request},     相当于erlang:send(Process,  {Label, {self(),Mref}, Request});

    4. bif.tab
     bif 'erl.lang.proc':send/2    ebif_send_2 send_2 bif erlang:send/2

    5. bif.c
    Eterm send_2(Process *p, Eterm to, Eterm msg) {
        Sint result = do_send(p, to, msg, !0);
       
        if (result > 0) {
        BUMP_REDS(p, result);
        BIF_RET(msg);
        } else switch (result) {
        case 0:
        BIF_RET(msg);
        break;
        case SEND_TRAP:
        BIF_TRAP2(dsend2_trap, p, to, msg);

        break;
        case SEND_RESCHEDULE:
        BIF_ERROR(p, RESCHEDULE);
        break;
        case SEND_BADARG:
        BIF_ERROR(p, BADARG);
        break;
        case SEND_USER_ERROR:
        BIF_ERROR(p, EXC_ERROR);
        break;
        default:
        ASSERT(! "Illegal send result");
        break;
        }
        ASSERT(! "Can not arrive here");
        BIF_ERROR(p, BADARG);
    }


    6. bif.c
     Sint
    do_send(Process *p, Eterm to, Eterm msg, int suspend) {
        Eterm portid;
        Port *pt;
        Process* rp;
        DistEntry *dep;
        Eterm* tp;

        if (is_internal_pid(to)) {
        if (IS_TRACED(p))
            trace_send(p, to, msg);
        if (p->ct != NULL)
           save_calls(p, &exp_send);
       
        if (internal_pid_index(to) >= erts_max_processes)
            return SEND_BADARG;
        rp = erts_pid2proc(p, ERTS_PROC_LOCK_MAIN, to, ERTS_PROC_LOCKS_MSG_SEND);
       
        if (!rp) {
            ERTS_SMP_ASSERT_IS_NOT_EXITING(p);
            return 0;
        }
        } else if (is_external_pid(to)) {
        Sint res;
        dep = external_pid_dist_entry(to);
        if(dep == erts_this_dist_entry) {
            erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
            erts_dsprintf(dsbufp,
                  "Discarding message %T from %T to %T in an old "
                  "incarnation (%d) of this node (%d)\n",
                  msg,
                  p->id,
                  to,
                  external_pid_creation(to),
                  erts_this_node->creation);
            erts_send_error_to_logger(p->group_leader, dsbufp);
            return 0;
        }

        erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);

        /* Send to remote process */
        if (is_nil(dep->cid))
            res = SEND_TRAP;
        else if (dist_send(p, ERTS_PROC_LOCK_MAIN, dep, to, msg) == 1) {

            if (is_internal_port(dep->cid)) {
            if (suspend) {
                erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
                if (erts_system_monitor_flags.busy_dist_port) {
                monitor_generic(p, am_busy_dist_port, dep->cid);
                }
            }
            res = SEND_RESCHEDULE;
            }
            else {
            res = SEND_TRAP;
            }
        }
        else {
            res = 50;
            if (IS_TRACED(p))
            trace_send(p, to, msg);
            if (p->ct != NULL)
            save_calls(p, &exp_send);
        }
       
        erts_dist_op_finalize(dep);

        return res;
        } else if (is_atom(to)) {
        erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
                  to,
                  &rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
                  &pt);

        if (pt) {
            portid = pt->id;
            goto port_common;
        }

        if (IS_TRACED(p))
            trace_send(p, to, msg);
        if (p->ct != NULL)
           save_calls(p, &exp_send);
       
        if (!rp) {
            return SEND_BADARG;
        }
        } else if (is_external_port(to)
               && (external_port_dist_entry(to)
               == erts_this_dist_entry)) {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp,
                  "Discarding message %T from %T to %T in an old "
                  "incarnation (%d) of this node (%d)\n",
                  msg,
                  p->id,
                  to,
                  external_port_creation(to),
                  erts_this_node->creation);
        erts_send_error_to_logger(p->group_leader, dsbufp);
        return 0;
        } else if (is_internal_port(to)) {
        portid = to;
        pt = erts_id2port(to, p, ERTS_PROC_LOCK_MAIN);
          port_common:
        ERTS_SMP_LC_ASSERT(!pt || erts_lc_is_port_locked(pt));
        /* XXX let port_command handle the busy stuff !!! */
        if (pt && (pt->status & ERTS_PORT_S_PORT_BUSY)) {
            if (suspend) {
            erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(p, am_busy_port, portid);
            }
            }
            erts_port_release(pt);
            return SEND_RESCHEDULE;
        }
       
        if (IS_TRACED(p))     /* trace once only !! */
            trace_send(p, portid, msg);
        if (p->ct != NULL)
           save_calls(p, &exp_send);
       
        if (SEQ_TRACE_TOKEN(p) != NIL) {
            seq_trace_update_send(p);
            seq_trace_output(SEQ_TRACE_TOKEN(p), msg,
                     SEQ_TRACE_SEND, portid, p);
        }       
       
        /* XXX NO GC in port command */
        erts_port_command(p, p->id, pt, msg);
        if (pt)
            erts_port_release(pt);
        if (ERTS_PROC_IS_EXITING(p)) {
            KILL_CATCHES(p); /* Must exit */
            return SEND_USER_ERROR;
        }
        return 0;
        } else if (is_tuple(to)) { /* Remote send */
        int ret;
        tp = tuple_val(to);
        if (*tp != make_arityval(2))
            return SEND_BADARG;
        if (is_not_atom(tp[1]) || is_not_atom(tp[2]))
            return SEND_BADARG;
       
        /* sysname_to_connected_dist_entry will return NULL if there
           is no dist_entry or the dist_entry has no port*/
        if ((dep = erts_sysname_to_connected_dist_entry(tp[2])) == NULL) {
            return SEND_TRAP;
        }

        if (dep == erts_this_dist_entry) {
            erts_deref_dist_entry(dep);
            if (IS_TRACED(p))
            trace_send(p, to, msg);
            if (p->ct != NULL)
               save_calls(p, &exp_send);

            erts_whereis_name(p, ERTS_PROC_LOCK_MAIN,
                      tp[1],
                      &rp, ERTS_PROC_LOCKS_MSG_SEND, 0,
                      &pt);
            if (pt) {
            portid = pt->id;
            goto port_common;
            }

            if (!rp) {
            return 0;
            }
            goto send_message;
        }

        erts_dist_op_prepare(dep, p, ERTS_PROC_LOCK_MAIN);
        if (is_nil(dep->cid))
            ret = SEND_TRAP;
        else if (dist_reg_send(p, ERTS_PROC_LOCK_MAIN, dep, tp[1], msg) == 1) {
            if (is_internal_port(dep->cid)) {
            if (suspend) {
                erts_suspend(p, ERTS_PROC_LOCK_MAIN, dep->port);
                if (erts_system_monitor_flags.busy_dist_port) {
                monitor_generic(p, am_busy_dist_port, dep->cid);
                }
            }
            ret = SEND_RESCHEDULE;
            }
            else {
            ret = SEND_TRAP;
            }

        }
        else {
            ret = 0;
            if (IS_TRACED(p))
            trace_send(p, to, msg);
            if (p->ct != NULL)
            save_calls(p, &exp_send);
        }

        erts_dist_op_finalize(dep);
        erts_deref_dist_entry(dep);
        return ret;
        } else {
        if (IS_TRACED(p)) /* XXX Is this really neccessary ??? */
            trace_send(p, to, msg);
        if (p->ct != NULL)
           save_calls(p, &exp_send);
        return SEND_BADARG;
        }
       
     send_message: {
        Uint32 rp_locks = ERTS_PROC_LOCKS_MSG_SEND;
        Sint res;
    #ifdef ERTS_SMP
        if (p == rp)
            rp_locks |= ERTS_PROC_LOCK_MAIN;
    #endif
        /* send to local process */
        erts_send_message(p, rp, &rp_locks, msg, 0);
    #ifdef ERTS_SMP
        res = rp->msg_inq.len*4;
        if (ERTS_PROC_LOCK_MAIN & rp_locks)
            res += rp->msg.len*4;
    #else
        res = rp->msg.len*4;
    #endif
        erts_smp_proc_unlock(rp,
                     p == rp
                     ? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
                     : rp_locks);
        return res;
        }
    }

    8. 如果能找到节点的话 就调用dist_send  否者发生SEND_TRAP

    9. dist.c


    void init_dist(void)
    {
        init_alive();
        init_nodes_monitors();

        no_caches = 0;

        /* Lookup/Install all references to trap functions */
        dsend2_trap = trap_function(am_dsend,2);
        dsend3_trap = trap_function(am_dsend,3);
        /*    dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
        dlink_trap = trap_function(am_dlink,1);
        dunlink_trap = trap_function(am_dunlink,1);
        dmonitor_node_trap = trap_function(am_dmonitor_node,3);
        dgroup_leader_trap = trap_function(am_dgroup_leader,2);
        dexit_trap = trap_function(am_dexit, 2);
        dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
    }

    static Export*
    trap_function(Eterm func, int arity)
    {
        return erts_export_put(am_erlang, func, arity);
    }
    也就是说
    dsend2_trap 就是erlang:dsend这个函数

    当send 失败 的时候 参考5 的 send_2   将执行 BIF_TRAP2(dsend2_trap, p, to, msg);

    10。 bif.h
    #define BIF_TRAP2(Trap_, p, A0, A1) do {    \
          (p)->arity = 2;                \
          (p)->def_arg_reg[0] = (A0);        \
          (p)->def_arg_reg[1] = (A1);        \
          (p)->def_arg_reg[3] = (Eterm) (Trap_);    \
          (p)->freason = TRAP;            \
          return THE_NON_VALUE;            \
     } while(0)

    11. beam_emu.c

     OpCase(call_bif2_e):
        {
        Eterm (*bf)(Process*, Eterm, Eterm, Uint*) = GET_BIF_ADDRESS(Arg(0));
        Eterm result;
        Eterm* next;

        SWAPOUT;
        c_p->fcalls = FCALLS - 1;
        if (FCALLS <= 0) {
           save_calls(c_p, (Export *) Arg(0));
        }
        PreFetch(1, next);
        CHECK_TERM(r(0));
        CHECK_TERM(x(1));
        PROCESS_MAIN_CHK_LOCKS(c_p);
        ASSERT(!ERTS_PROC_IS_EXITING(c_p));
        result = (*bf)(c_p, r(0), x(1), I);
        ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
        PROCESS_MAIN_CHK_LOCKS(c_p);
        ERTS_HOLE_CHECK(c_p);
        POST_BIF_GC_SWAPIN(c_p, result);
        FCALLS = c_p->fcalls;
        if (is_value(result)) {
            r(0) = result;
            CHECK_TERM(r(0));
            NextPF(1, next);
        } else if (c_p->freason == RESCHEDULE) {
            c_p->arity = 2;
            goto suspend_bif;
        } else if (c_p->freason == TRAP) {
            goto call_bif_trap3;

        }

        call_bif_trap3:
            SET_CP(c_p, I+2);
            SET_I(((Export *)(c_p->def_arg_reg[3]))->address);
            SWAPIN;
            r(0) = c_p->def_arg_reg[0];
            x(1) = c_p->def_arg_reg[1];
            x(2) = c_p->def_arg_reg[2];
            Dispatch();
    也就是说这时候trap erlang:dsend函数被调用。

    12. erlang.erl

    dsend({Name, Node}, Msg, Opts) ->
        case net_kernel:connect(Node) of
        true -> erlang:send({Name,Node}, Msg, Opts);
        false -> ok;
        ignored -> ok                % Not distributed.
        end.

    先链接对方节点 成功 发数据包 转8 实际上是调用dist_send.
    整个流程完毕。

    这里面trap技术用的很巧妙。

    net_kernel:connect 也很复杂 另文分析,请期待。
分享到:
评论

相关推荐

    中国边界矢量数据.rar_CHN_adm1矢量图_shp数据_site:www.pudn.com_中国边界矢量_全国 shp

    标题中的“中国边界矢量数据.rar_CHN_adm1矢量图_shp数据_site:www.pudn.com_中国边界矢量_全国 shp”揭示...同时,数据的更新也是关键,因为行政区划可能会随着政策调整而发生变化,及时获取最新的矢量数据至关重要。

    SSRS 中国空间数据地图CHN_adm.zip

    CHN_adm0.dbf、CHN_adm1.dbf、CHN_adm2.dbf和CHN_adm3.dbf分别代表了中国不同级别的行政区划数据。具体来说: 1. CHN_adm0.dbf:这是最高级别的行政区域数据,通常对应于中国的省份。 2. CHN_adm1.dbf:代表次一级...

    CHN_adm.zip

    "CHN_adm.zip"是一个压缩包文件,主要用于在Python环境中进行地理数据的可视化。这个压缩包中的数据可能包含了中国行政区域的详细划分,通常这种数据格式是为地图绘制准备的,尤其适用于地理信息系统(GIS)和数据...

    AFG_adm.zip

    《GIS数据:AFG_adm.zip的深度解析与应用》 在信息技术的众多领域中,地理信息系统(GIS)因其在地图制作、空间分析和决策支持等方面的重要作用而备受关注。对于初学者而言,掌握GIS的基本操作和理解相关数据格式至...

    Android_ADM_

    Android_ADM

    student_adm_system.rar_Admission_adm_asp.net_student

    【标题】"student_adm_system.rar" 是一个包含学生管理系统源代码的压缩文件,而 "Admission_adm_asp.net_student" 暗示这个系统是关于学生入学管理的,采用了 ASP.NET 技术进行开发。 【描述】"Student admission ...

    adm3053brwz.rar_ADM3053BRWZ_adm3053_labview can收发

    标题中的“ADM3053BRWZ.rar_ADM3053BRWZ_adm3053_labview can收发”表明该压缩包包含了关于ADM3053芯片在CAN(Controller Area Network)总线应用中的LabVIEW(Laboratory Virtual Instrument Engineering Workbench...

    ADM-LDM.rar_ADM matlab_adm_adm matlab_增量调制_增量调制matlab

    基于自适应增量调制和线性增量调制的原理,用matlab实现LDM和ADM,并比较两者之间的优缺点

    zw_USA_adm.rar

    标题“zw_USA_adm.rar”暗示这可能是一个包含与美国行政区域或管理相关的数据的压缩文件。"adm"标签进一步支持了这一假设,因为"adm"经常用于表示行政区域或行政区划的数据。在IT领域,这样的数据集可能包含关于美国...

    ADM2587E.rar_ADM2582E _adm2587e_rs422

    ADM2582E-ADM2587E-完全集成式隔离数据收发器 for RS485,RS422

    oracle streams

    在 Oracle 10g R2 中,Streams 进行了重大改进,简化了配置流程,通过一系列 PL/SQL 过程封装了原本复杂的配置步骤,使得设置更加便捷,同时也保留了手动调整脚本的灵活性。 Streams 的核心原理基于日志解析...

    ADM515Database_adm_ORA.7z.002

    ADM515Database_adm_ORA.7z.002ADM515Database_adm_ORA.7z.002

    ADM515Database_adm_ORA.7z.001

    ADM515Database_adm_ORA.7z.001ADM515Database_adm_ORA.7z.001

    ADMtek_ADM8511_drv_USB_LAN

    标题“ADMtek_ADM8511_drv_USB_LAN”涉及的是一个专为ADMtek ADM8511芯片设计的USB到LAN驱动程序。这款驱动程序使得计算机可以通过USB接口连接到局域网(LAN),从而实现网络通信。ADMtek ADM8511是一款流行的USB到...

    adm6996.tar.gz_adm6996_交换机

    标题中的"adm6996.tar.gz_adm6996_交换机"指的是一个针对ADM6996芯片的交换机驱动程序的压缩包文件。这个文件采用tar.gz格式,是Linux和Unix环境中常见的归档和压缩方式,用于收集多个文件并将其压缩成一个单一的可...

    ADM2587应用_ADM2587应用

    在面临复杂环境,如高共模电压的工业控制系统中,非隔离型RS485可能无法满足需求。此时,采用隔离型RS485电路可以解决问题。隔离型电路通过DC-DC隔离电源和RS485收发器,利用光耦或隔离芯片如ADM2483、ADM2587E等...

    CHN_adm_shp_2016.7z

    标题 "CHN_adm_shp_2016.7z" 暗示着这是一个包含中国行政区域划分数据的压缩文件,而描述中的 "GADM CHN ADM SHP 2016.07.14" 提供了更多关于文件内容的信息。GADM,全称为Global Administrative Areas,是一个全球...

    sap_ora_adm

    根据提供的文件信息,我们深入探讨“sap_ora_adm”这一主题,即SAP系统与Oracle数据库的管理。 ### SAP与Oracle的整合 #### 1. **SAP NetWeaver 04** SAP NetWeaver是SAP公司推出的一系列集成技术平台之一,旨在...

    CHN_adm_2016.7z

    标题 "CHN_adm_2016.7z" 和描述 "GADM CHN ADM 2016.07.14" 暗示这是一个地理数据集,其中包含中国行政区域的2016年数据。GADM(Global Administrative Areas)是一个全球知名的项目,专注于提供全球各地的行政区划...

    ADM.rar_adm matlab_交替方向_交替方向ADM_交替方向法_变分不等式

    交替方向法(Alternating Direction Method, 简称ADM)是一种强大的数值计算方法,尤其在处理结构型单调变分不等式问题时表现卓越。它源于优化理论,旨在解决那些包含多个变量且彼此关联的复杂优化问题。在MATLAB环境...

Global site tag (gtag.js) - Google Analytics