`
cwqcwk1
  • 浏览: 87388 次
文章分类
社区版块
存档分类
最新评论

erlang send剖析及参数意义

 
阅读更多
erlang send是一个很基础的消息发送函数,用于进程把一个消息发给另外一个进程。这个函数可以同时用于本地节点进程通信,或者和远程节点进程之间的通信。

前言

最近有同事遇到erlang:send导致消息堆积问题,这个引起了我的强烈关注。我也看了这块的代码,这里简单做个分享。

函数原型:
erlang:send(Dest, Msg, Options) -> Res
Options可以是以下2个:
nosuspend
If the sender would have to be suspended to do the send, nosuspend is returned instead.

noconnect
If the destination node would have to be auto-connected before doing the send, noconnect is returned instead.

字面上意思是说:
nosuspend 遇到会挂起进程的场合不挂起进程,直接返回nosuspend
noconnect 遇到远程节点没有连接时不会自动连接发送消息,直接返回noconnect

但是,erlang直接在文档说慎用nosuspend
Warning

As witherlang:send_nosuspend/2,3: Use with extreme care!

到底为什么这么说,而且返回nosuspend 时这个消息是发送出去了,还是没发送出去?

源码剖析

看了erlang代码,erlang:send是bif实现的,这里以R16B02做说明:
/**
 *   bif.c send_3()函数,实现erlang:send/3
 */
BIF_RETTYPE send_3(BIF_ALIST_3)
{
    Eterm ref;
    Process *p = BIF_P;
    Eterm to = BIF_ARG_1;
    Eterm msg = BIF_ARG_2;
    Eterm opts = BIF_ARG_3;

    int connect = !0; // 初始值设1,表示非0值
    int suspend = !0; // 同上
    Eterm l = opts;
    Sint result;
    
    while (is_list(l)) { //遍历参数列表
	if (CAR(list_val(l)) == am_noconnect) {
	    connect = 0; // 参数带 noconnect,则 connect 取值0
	} else if (CAR(list_val(l)) == am_nosuspend) {
	    suspend = 0; // 同上
	} else {
	    BIF_ERROR(p, BADARG);
	}
	l = CDR(list_val(l));
    }
    if(!is_nil(l)) {
	BIF_ERROR(p, BADARG);
    }

    // 调用 do_send 发送消息;result 大于0表示本次消息发送要扣除的reds,其他则表示错误码
    result = do_send(p, to, msg, suspend, &ref); 
    if (result > 0) {
	ERTS_VBUMP_REDS(p, result); // 扣除本次消息发送的 reds
	if (ERTS_IS_PROC_OUT_OF_REDS(p))
	    goto yield_return;
	BIF_RET(am_ok);
    }

    switch (result) {
    case 0:
	/* May need to yield even though we do not bump reds here... */
	if (ERTS_IS_PROC_OUT_OF_REDS(p))
	    goto yield_return;
	BIF_RET(am_ok); 
	break;

    // 遇到 SEND_TRAP 错误
    case SEND_TRAP: 
	if (connect) {
            // connect 不等于 0
	    BIF_TRAP3(dsend3_trap, p, to, msg, opts); 
	} else {
            // connect 等于 0,直接返回 noconnect
	    BIF_RET(am_noconnect);
	}
	break;
    // 遇到 SEND_YIELD 错误
    case SEND_YIELD: 
	if (suspend) {
            // suspend 不等于 0
	    ERTS_BIF_YIELD3(bif_export[BIF_send_3], p, to, msg, opts);
	} else {
            // suspend 等于 0,直接返回 nosuspend
	    BIF_RET(am_nosuspend);
	}
	break;

    // 遇到 SEND_YIELD_RETURN 错误
    case SEND_YIELD_RETURN: 
        // suspend 等于 0,直接返回 nosuspend
	if (!suspend)
	    BIF_RET(am_nosuspend);
    yield_return:
	ERTS_BIF_YIELD_RETURN(p, am_ok);
    case SEND_AWAIT_RESULT:
	ASSERT(is_internal_ref(ref));
	BIF_TRAP3(await_port_send_result_trap, p, ref, am_nosuspend, am_ok);
    case SEND_BADARG:
	BIF_ERROR(p, BADARG); 
	break;
    case SEND_USER_ERROR:
	BIF_ERROR(p, EXC_ERROR); 
	break;
    case SEND_INTERNAL_ERROR:
	BIF_ERROR(p, EXC_INTERNAL_ERROR);
	break;
    default:
	ASSERT(! "Illegal send result"); 
	break;
    }
    ASSERT(! "Can not arrive here");
    BIF_ERROR(p, BADARG);
}

再来看看 do_send() 函数:
/*
 * bif.c do_send()函数,实现发送到其他进程,或端口,或远程进程
 * 返回消息发送的reds,或错误码
 */
//以下几个是 do_send 可能返回的错误码
#define SEND_TRAP		(-1)
#define SEND_YIELD		(-2)
#define SEND_YIELD_RETURN	(-3)
#define SEND_BADARG		(-4)
#define SEND_USER_ERROR		(-5)
#define SEND_INTERNAL_ERROR	(-6)
#define SEND_AWAIT_RESULT	(-7)

Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp) {
    Eterm portid;
    Port *pt;
    Process* rp;
    DistEntry *dep;
    Eterm* tp;

	// 如果目标进程在本地节点
    if (is_internal_pid(to)) {
	if (IS_TRACED(p))
	    trace_send(p, to, msg);
	if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
	    save_calls(p, &exp_send);

	rp = erts_proc_lookup_raw(to);	
	if (!rp)
	    return 0;
	// 找到这个进程则执行最后的 send_message

    } else if (is_external_pid(to)) {
	// 如果目标进程在远程节点
	dep = external_pid_dist_entry(to);
	if(dep == erts_this_dist_entry) {
	    erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
	    erts_dsprintf(dsbufp,
			  "Discarding message %T from %T to %T in an old "
			  "incarnation (%d) of this node (%d)\n",
			  msg,
			  p->common.id,
			  to,
			  external_pid_creation(to),
			  erts_this_node->creation);
	    erts_send_error_to_logger(p->group_leader, dsbufp);
	    return 0;
	}
	// 远程消息调用 remote_send 发送
	return remote_send(p, dep, to, to, msg, suspend);
    } else if (is_atom(to)) {
	// 如果传参是原子,尝试从进程表找到这个进程
	Eterm id = erts_whereis_name_to_id(p, to);

	rp = erts_proc_lookup(id);
	if (rp)
	    goto send_message;

	// 找不到这个进程,检查目标是不是端口
	pt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP);
	if (pt) {
	    portid = id;
	    goto port_common;
	}

	if (IS_TRACED(p))
	    trace_send(p, to, msg);
	if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
	    save_calls(p, &exp_send);
	
	return SEND_BADARG;
    } else if (is_external_port(to)
	       && (external_port_dist_entry(to)
		   == erts_this_dist_entry)) {
	erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
	erts_dsprintf(dsbufp,
		      "Discarding message %T from %T to %T in an old "
		      "incarnation (%d) of this node (%d)\n",
		      msg,
		      p->common.id,
		      to,
		      external_port_creation(to),
		      erts_this_node->creation);
	erts_send_error_to_logger(p->group_leader, dsbufp);
	return 0;
    } else if (is_internal_port(to)) {
	// 如果是本地端口
	int ret_val;
	portid = to;

	pt = erts_port_lookup(portid, ERTS_PORT_SFLGS_INVALID_LOOKUP);

      port_common:
	ret_val = 0;
        
	if (pt) {
	    int ps_flags = suspend ? 0 : ERTS_PORT_SIG_FLG_NOSUSPEND;
	    *refp = NIL;

		// 执行端口操作
	    switch (erts_port_command(p, ps_flags, pt, msg, refp)) {
	    case ERTS_PORT_OP_CALLER_EXIT:
		/* We are exiting... */
		return SEND_USER_ERROR;
	    case ERTS_PORT_OP_BUSY:
		/* Nothing has been sent */
		if (suspend)
		    erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
		// 如果 nosuspend 返回 SEND_YIELD,消息还没发送
		return SEND_YIELD;
	    case ERTS_PORT_OP_BUSY_SCHEDULED:
		/* Message was sent */
		if (suspend) {
		    erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
		    ret_val = SEND_YIELD_RETURN;
		    break;
		}
		// 这里没有break,如果 nosuspend 将执行下一步操作,消息已发送
		/* Fall through */
	    case ERTS_PORT_OP_SCHEDULED:
		if (is_not_nil(*refp)) {
		    ASSERT(is_internal_ref(*refp));
		    ret_val = SEND_AWAIT_RESULT;
		}
		break;
	    case ERTS_PORT_OP_DROPPED:
	    case ERTS_PORT_OP_BADARG:
	    case ERTS_PORT_OP_DONE:
		break;
	    default:
		ERTS_INTERNAL_ERROR("Unexpected erts_port_command() result");
		break;
	    }
	}
	
	if (IS_TRACED(p)) 	/* trace once only !! */
	    trace_send(p, portid, msg);
	if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
	    save_calls(p, &exp_send);
	
	if (SEQ_TRACE_TOKEN(p) != NIL
#ifdef USE_VM_PROBES
	    && SEQ_TRACE_TOKEN(p) != am_have_dt_utag
#endif
	    ) {
	    seq_trace_update_send(p);
	    seq_trace_output(SEQ_TRACE_TOKEN(p), msg, 
			     SEQ_TRACE_SEND, portid, p);
	}	    
	
	if (ERTS_PROC_IS_EXITING(p)) {
	    KILL_CATCHES(p); /* Must exit */
	    return SEND_USER_ERROR;
	}
	return ret_val;
    } else if (is_tuple(to)) { /* Remote send */
	// 如果to是原子,走到这里只有是发送远程消息的情况了
	int ret;
	tp = tuple_val(to);
	if (*tp != make_arityval(2))
	    return SEND_BADARG;
	if (is_not_atom(tp[1]) || is_not_atom(tp[2]))
	    return SEND_BADARG;
	
	/* sysname_to_connected_dist_entry will return NULL if there
	   is no dist_entry or the dist_entry has no port,
	   but remote_send() will handle that. */

	// 找到 dist_entry 就用本地进程消息或端口形式发送
	dep = erts_sysname_to_connected_dist_entry(tp[2]);

	if (dep == erts_this_dist_entry) {
	    Eterm id;
	    erts_deref_dist_entry(dep);
	    if (IS_TRACED(p))
		trace_send(p, to, msg);
	    if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
		save_calls(p, &exp_send);

	    id = erts_whereis_name_to_id(p, tp[1]);

	    rp = erts_proc_lookup_raw(id);
	    if (rp)
		goto send_message;
	    pt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP);
	    if (pt) {
		portid = id;
		goto port_common;
	    }
	    return 0;
	}

	// 找不到 dist_entry 就用 remote_send 发送
	ret = remote_send(p, dep, tp[1], to, msg, suspend);
	if (dep)
	    erts_deref_dist_entry(dep);
	return ret;
    } else {
	if (IS_TRACED(p)) /* XXX Is this really neccessary ??? */
	    trace_send(p, to, msg);
	if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
	    save_calls(p, &exp_send);
	return SEND_BADARG;
    }
    
	// 以下过程是处理本节点进程消息
 send_message: {
	ErtsProcLocks rp_locks = 0;
	Sint res;
#ifdef ERTS_SMP
	if (p == rp)
	    rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
	/* send to local process */
	res = erts_send_message(p, rp, &rp_locks, msg, 0);
	if (erts_use_sender_punish)
	    res *= 4;
	else
	    res = 0;
	erts_smp_proc_unlock(rp,
			     p == rp
			     ? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
			     : rp_locks);
	return res;
    }
}
再看下本地进程消息处理erts_send_message() 函数:
/*
 * erl_message.c erts_send_message()函数 实现发送本地消息给进程
 */

Sint
erts_send_message(Process* sender,
		  Process* receiver,
		  ErtsProcLocks *receiver_locks,
		  Eterm message,
		  unsigned flags)
{
    Uint msize;
    ErlHeapFragment* bp = NULL;
    Eterm token = NIL;
    Sint res = 0;
#ifdef USE_VM_PROBES
    DTRACE_CHARBUF(sender_name, 64);
    DTRACE_CHARBUF(receiver_name, 64);
    Sint tok_label = 0;
    Sint tok_lastcnt = 0;
    Sint tok_serial = 0;
#endif
    BM_STOP_TIMER(system);
    BM_MESSAGE(message,sender,receiver);
    BM_START_TIMER(send);

 #ifdef USE_VM_PROBES
    *sender_name = *receiver_name = '\0';
   if (DTRACE_ENABLED(message_send)) {
        erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
               "%T", sender->common.id);
        erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), 
               "%T", receiver->common.id);
    }
#endif
    if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
	// 发送进程打了跟踪标记 sequential_trace_token;后面处理进程跟踪过程
	// 见 http://www.erlang.org/doc/man/erlang.html#process_flag-3

    /*
	 * 篇幅有限,这里省略了部分无关代码
	 */

    } else if (sender == receiver) {
	// 进程发送消息给自己
	
	/* 如果进程正在关闭,则丢弃消息 */
#ifdef ERTS_SMP
	ErtsProcLocks need_locks = (~(*receiver_locks)
				    & (ERTS_PROC_LOCK_MSGQ
				       | ERTS_PROC_LOCK_STATUS));
	if (need_locks) {
	    *receiver_locks |= need_locks;
	    if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
		if (need_locks == ERTS_PROC_LOCK_MSGQ) {
		    erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
		    need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
		}
		erts_smp_proc_lock(receiver, need_locks);
	    }
	}
	if (!ERTS_PROC_PENDING_EXIT(receiver))
#endif
	{
	    ErlMessage* mp = message_alloc();

            DTRACE6(message_send, sender_name, receiver_name,
                    size_object(message), tok_label, tok_lastcnt, tok_serial);
	    mp->data.attached = NULL;
	    ERL_MESSAGE_TERM(mp) = message;
	    ERL_MESSAGE_TOKEN(mp) = NIL;
#ifdef USE_VM_PROBES
	    ERL_MESSAGE_DT_UTAG(mp) = NIL;
#endif
	    mp->next = NULL;

		// SMP下把消息移到进程私有堆尾部(纯指针操作)
	    ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);

		// 把消息追加到消息队列尾部(纯指针操作)
	    LINK_MESSAGE_PRIVQ(receiver, mp);

		// res 取进程消息队列长度
	    res = receiver->msg.len;

	    if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
		trace_receive(receiver, message);
	    }
	}
        BM_SWAP_TIMER(send,system);
    } else {
	// 进程发送消息给别的进程
	
#ifdef ERTS_SMP
	ErlOffHeap *ohp;
        Eterm *hp;
	erts_aint32_t state;

	BM_SWAP_TIMER(send,size);
	msize = size_object(message);
	BM_SWAP_TIMER(size,send);
	
	// 接收者进程分配一个大小为 msize 的堆空间,用于存放这个消息
	hp = erts_alloc_message_heap_state(msize,
					   &bp,
					   &ohp,
					   receiver,
					   receiver_locks,
					   &state);
	BM_SWAP_TIMER(send,copy);
	
	// 复制消息到接受者进程,返回值 message 有可能是引用二进制 refc binary
	message = copy_struct(message, msize, &hp, ohp);
	BM_MESSAGE_COPIED(msz);
	BM_SWAP_TIMER(copy,send);
        DTRACE6(message_send, sender_name, receiver_name,
                msize, tok_label, tok_lastcnt, tok_serial);

	// res 取接收者进程消息队列长度
	res = queue_message(sender,
			    receiver,
			    receiver_locks,
			    &state,
			    bp,
			    message,
			    token
#ifdef USE_VM_PROBES
			    , NIL
#endif
	    );
        BM_SWAP_TIMER(send,system);
#else
	ErlMessage* mp = message_alloc();
        Eterm *hp;
        BM_SWAP_TIMER(send,size);
	msize = size_object(message);
        BM_SWAP_TIMER(size,send);
	
	// 检查接收者进程内存不足,执行GC
	if (receiver->stop - receiver->htop <= msize) {
            BM_SWAP_TIMER(send,system);
	    erts_garbage_collect(receiver, msize, receiver->arg_reg, receiver->arity);
            BM_SWAP_TIMER(system,send);
	}
	hp = receiver->htop;
	receiver->htop = hp + msize;
        BM_SWAP_TIMER(send,copy);
		
	// 处理引用二进制的数据(修改引用计数)
	message = copy_struct(message, msize, &hp, &receiver->off_heap);
	BM_MESSAGE_COPIED(msize);
        BM_SWAP_TIMER(copy,send);
        DTRACE6(message_send, sender_name, receiver_name,
                (uint32_t)msize, tok_label, tok_lastcnt, tok_serial);
	ERL_MESSAGE_TERM(mp) = message;
	ERL_MESSAGE_TOKEN(mp) = NIL;
#ifdef USE_VM_PROBES
	ERL_MESSAGE_DT_UTAG(mp) = NIL;
#endif
	mp->next = NULL;
	mp->data.attached = NULL;
	LINK_MESSAGE(receiver, mp);
	
	// res 取接收者进程消息队列长度
	res = receiver->msg.len;
	
	/* 将接收者进程添加到调度队列
	 * 接收者进程可能receive消息导致失去调度,在新消息到来时需要将进程加到调度队列。
	 */
	erts_proc_notify_new_message(receiver);

	if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
	    trace_receive(receiver, message);
	}
        BM_SWAP_TIMER(send,system);
#endif /* #ifndef ERTS_SMP */
    }
   return res;
}
再来看看 remote_send() 函数:
/** 
 *   bif.c remote_send()函数,实现远程消息发送
 *   dist_entry是erlang分布式接口
 */
static Sint remote_send(Process *p, DistEntry *dep,
			Eterm to, Eterm full_to, Eterm msg, int suspend)
{
    Sint res;
    int code;
    ErtsDSigData dsd;

    ASSERT(is_atom(to) || is_external_pid(to));

	//检查dist_entry状态
    code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, !suspend);
    switch (code) {
    case ERTS_DSIG_PREP_NOT_ALIVE:
    case ERTS_DSIG_PREP_NOT_CONNECTED: // 连接不存在直接返回 SEND_TRAP
	res = SEND_TRAP;
	break;
	
	// suspend 走这里,返回 SEND_YIELD
    case ERTS_DSIG_PREP_WOULD_SUSPEND:
	ASSERT(!suspend);
	res = SEND_YIELD;
	break;
	
	// nosuspend 在dist_entry连接非稳定状态时会走这里,强制把消息压入发送队列,返回 0,即发送完成
    case ERTS_DSIG_PREP_CONNECTED: {

	if (is_atom(to))
	    code = erts_dsig_send_reg_msg(&dsd, to, msg);
	else
	    code = erts_dsig_send_msg(&dsd, to, msg);
	/*
	 * Note that reductions have been bumped on calling
	 * process by erts_dsig_send_reg_msg() or
	 * erts_dsig_send_msg().
	 */
	if (code == ERTS_DSIG_SEND_YIELD)
	    res = SEND_YIELD_RETURN;
	else
	    res = 0;
	break;
    }
    default:
	ASSERT(! "Invalid dsig prepare result");
	res = SEND_INTERNAL_ERROR;
    }

    if (res >= 0) {
	if (IS_TRACED(p))
	    trace_send(p, full_to, msg);
	if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
	    save_calls(p, &exp_send);
    }

    return res;
}
其中,erts_dsig_send_msg()底层调用了 dsig_send,把消息放到发送队列,再由 port_task 工作线程负责把消息投递到其他节点。这里面篇幅较大,主要是dist.c,erl_port_task.c, erl_process.c,erl_node_tables.c, io.c 这几个模块,以后找时间再讲。


问题讨论

1、erlang:send 不带 nosuspend/ noconnect, 会导致消息堆积?
erlang被挂起时(也就是进程是{status,suspended}),就会导致当前这个进程消息堆积。
30> Pid = spawn(fun() -> receive M -> M end end).
<0.68.0>
31> process_info(Pid,messages).
{messages,[]}
32> erlang:suspend_process(Pid).
true
33> Pid ! hello.
hello
34> process_info(Pid,status).
{status,suspended}
35> process_info(Pid,messages).
{messages,[hello]}

2、为什么erlang文档说慎用 nosuspend ?
因为,当消息无法发送时,原本会导致发送者进程失去调度权,但是这种方法则会以返回值的方式通知调用者端口忙,不会抑制发送者进程。特别是在分布式环境下,当erlang:send 使用了nosuspend时,当端口繁忙堆积了很多消息时,还会强制把消息压入端口发送队列,如果端口一直处于不稳定的状态,就会导致消息不停的堆积,撑爆内存。所以,还是建议使用 noconnect,当连接异常时就会返回 noconnect ,程序这边再做异常处理

3、erlang:send返回nosuspend 时这个消息是发送出去了,还是没发送出去?
当返回nosuspend 时这个消息肯定是没发送出去。(文章这里最开始写错了,所以网上搜到这篇文章都是错的,建议还是少上那些复制网站)


扩展延伸

reductions
可以理解为 Erlang的基本调度计量单位,Erlang VM基于reduction来进行调度,用来保证调度实现的准实时性。进程的 reduction 值越高,得到的调度机会就越多。

trap_send
前面代码提到了SEND_TRAP 错误,也就是这里
    case SEND_TRAP:   
    if (connect) {  
        BIF_TRAP3(dsend3_trap, p, to, msg, opts);   
    } else {  
        BIF_RET(am_noconnect);  
    }  
    break; 
看下BIF_TRAP3的代码,实际是个宏,修改当前进程的'寄存器'信息,关键是设置 freason 为 TRAP
#define BIF_TRAP3(Trap_, p, A0, A1, A2) do {			\
      Eterm* reg = ERTS_PROC_GET_SCHDATA((p))->x_reg_array;	\
      (p)->arity = 3;						\
      reg[0] = (A0);						\
      reg[1] = (A1);						\
      reg[2] = (A2);						\
      (p)->i = (BeamInstr*) ((Trap_)->addressv[erts_active_code_ix()]); \
      (p)->freason = TRAP;					\
      return THE_NON_VALUE;					\
 } while(0)
现在看下这个过程是怎么工作的:
/*
 * beam_emu.c process_main() 线程入口函数,实现VM调度
 * 以下截取 bif 处理过程
 */
OpCase(call_bif_e):
    {
	Eterm (*bf)(Process*, Eterm*, BeamInstr*) = GET_BIF_ADDRESS(Arg(0)); // 根据参数获取bif实际执行函数
	Eterm result;
	BeamInstr *next;

	PRE_BIF_SWAPOUT(c_p);
	c_p->fcalls = FCALLS - 1;
	if (FCALLS <= 0) {
	   save_calls(c_p, (Export *) Arg(0));
	}
	PreFetch(1, next);
	ASSERT(!ERTS_PROC_IS_EXITING(c_p));
	reg[0] = r(0);
	result = (*bf)(c_p, reg, I); // 执行bif函数
	ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
	ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
	ERTS_HOLE_CHECK(c_p);
	ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
	PROCESS_MAIN_CHK_LOCKS(c_p);
	if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) {
	    Uint arity = ((Export *)Arg(0))->code[2];
	    result = erts_gc_after_bif_call(c_p, result, reg, arity);
	    E = c_p->stop;
	}
	HTOP = HEAP_TOP(c_p);
	FCALLS = c_p->fcalls;
	if (is_value(result)) {
	    r(0) = result;
	    CHECK_TERM(r(0));
	    NextPF(1, next);
	} else if (c_p->freason == TRAP) { // 当 freason 设置为 TRAP 时
	    SET_CP(c_p, I+2);
	    SET_I(c_p->i);
	    SWAPIN;
	    r(0) = reg[0];
	    Dispatch(); // 到这一步之后就会调用 dsend3_trap 指向的函数,涉及VM实现,这里不多讲了

这里也说下dsend3_trap 指向哪个函数:
/* dist.c erlang分布式上层实现函数
 */
static Export* trap_function(Eterm func, int arity)
{
    return erts_export_put(am_erlang, func, arity); // 从导出函数表获取函数地址
}

void init_dist(void)
{
    init_nodes_monitors();

    nodedown.reason = NIL;
    nodedown.bp = NULL;

    erts_smp_atomic_init_nob(&no_nodes, 0);
    erts_smp_atomic_init_nob(&no_caches, 0);

    /* Lookup/Install all references to trap functions */
    dsend2_trap = trap_function(am_dsend,2);
    dsend3_trap = trap_function(am_dsend,3);    // dsend3_trap 指向的函数就是 erlang:dsend/3
    /*    dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
    dlink_trap = trap_function(am_dlink,1);
    dunlink_trap = trap_function(am_dunlink,1);
    dmonitor_node_trap = trap_function(am_dmonitor_node,3);
    dgroup_leader_trap = trap_function(am_dgroup_leader,2);
    dexit_trap = trap_function(am_dexit, 2);
    dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
}
为什么erlang要大费周章搞trap?erlang:send() bif化后又执行非bif函数(erlang:dsend/3)
erlang:send/3 只有在节点连接失败的情况下才会执行erlang:dsend/3,这个时候,当前进程就会失去CPU调度权,放到了下一次调度去执行这个函数。而且,这一来一回就扣除了不少reductions

时间不早了,有点困了。找时间再研究下reductions 的实际影响,怎么用来实现软实时

2015/1/25 修正erlang:send 返回nosuspend时消息没发送出去
2015/5/22 修正erts_proc_notify_new_message 的注释说明
参考:http://blog.csdn.net/mycwq/article/details/42845385

分享到:
评论

相关推荐

    python入门-30.寻找列表中只出现一次的数字-寻找单身狗.py

    python入门-30.寻找列表中只出现一次的数字——寻找单身狗.py

    布尔教育linux优化笔记

    linux优化笔记,配套视频:https://www.bilibili.com/list/474327672?sid=4496133&spm_id_from=333.999.0.0&desc=1

    知识付费系统-直播+讲师入驻+课程售卖+商城系统-v2.1.9版本搭建以及资源分享下载

    知识付费系统-直播+讲师入驻+课程售卖+商城系统-v2.1.9版本搭建以及资源分享下载,CRMEB知识付费分销与直播营销系统是由西安众邦科技自主开发的一款在线教育平台,该系统不仅拥有独立的知识产权,还采用了先进的ThinkPhp5.0框架和Vue前端技术栈,集成了在线直播教学及课程分销等多种功能,旨在为用户提供全方位的学习体验,默认解压密码youyacaocom

    美妆神域-JAVA-基于springBoot美妆神域设计与实现

    美妆神域-JAVA-基于springBoot美妆神域设计与实现

    原生js制作Google粘土logo动画涂鸦代码.zip

    原生js制作Google粘土logo动画涂鸦代码.zip

    golin 扫描工具使用, 检查系统漏洞、web程序漏洞

    golin 扫描工具使用, 检查系统漏洞、web程序漏洞

    原生态纯js图片网格鼠标悬停放大显示特效代码下载.zip

    原生态纯js图片网格鼠标悬停放大显示特效代码下载.zip

    用AWLUM进行灰色编码2^2n-QAM调制的精确率Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    去水印web端独立版web

    去水印web端独立版web

    原生js制作左侧浮动可折叠在线客服代码.zip

    原生js制作左侧浮动可折叠在线客服代码.zip

    Chrome 谷歌浏览器下载

    Chrome 谷歌浏览器下载

    亲测全新完整版H5商城系统源码 附教程

    全新完整版H5商城系统源码 自己花钱买的,亲测可用,需要自行下载 H5商城系统设置是实现商城基本功能的核心部分,涵盖了从网站配置、短信和支付配置,到商品、工单、订单、分站和提现管理等多个模块的设置。以下是详细的设置指南,帮助您快速上手并高效管理商城系统。 测试环境:Nginx+PHP7.0+MySQL5.6 1. 网站配置 设置商城名称、LOGO、标题、联系方式和SEO关键词等,确保商城专业和易于搜索。 2. 短信配置 配置短信接口和模板,用于发送订单通知、验证码等,提升用户体验。 3. 支付接口配置 配置微信、支付宝等支付接口,填写API密钥和回调地址,确保支付流畅。 4. 商品分类管理 对商品进行分类和排序,设置分类名称和图标,便于用户查找商品。 5. 商品管理 添加和管理商品信息、规格、图片等,确保商品信息准确丰富。 6. 工单管理 查看和回复用户工单,记录售后问题,提升用户服务质量。 7. 订单管理 查看订单详情,更新订单状态,支持批量导出,方便订单跟踪。 8. 分站管理 创建不同区域分站,设置权限,统一管理各区域市场。 9. 提现管理

    短信3.141592672893982398674234

    apk安装包

    原生js选项卡插件自定义图片滑动选项卡切换.zip

    原生js选项卡插件自定义图片滑动选项卡切换.zip

    1-宗教信息佛教佛寺寺庙庵堂相关数据-社科数据.zip

    宗教信息佛教佛寺寺庙庵堂相关数据集提供了全国各个地区省市县各个佛教寺庙的详细信息。这些数据不仅包括寺庙的名称和负责人姓名,还涵盖了所属省份、地级市、区县、具体地址、建立日期以及支派类别等关键信息。该数据集整理了超过3万条样本,为研究中国佛教寺庙的分布、历史和文化提供了丰富的第一手资料。这些信息有助于了解佛教在中国的传播和发展,以及寺庙在社会和文化中的作用。数据的整理和提供,对于宗教学、社会学、历史学和文化研究等领域的学者来说,是一个宝贵的资源。

    线性电阻网络的等效电阻计算Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    简单的 Python 版本管理.zip

    简单的 Python 版本管理pyenvpyenv 可让您轻松在多个 Python 版本之间切换。它简单、不引人注目,并遵循 UNIX 传统,即使用单一用途的工具来做好一件事。该项目由rbenv和 ruby​​-build分叉而来,并针对 Python 进行了修改。pyenv 的作用是什么......允许您根据每个用户更改全局 Python 版本。为每个项目的 Python 版本提供支持。允许您使用环境变量覆盖 Python 版本。一次搜索多个 Python 版本的命令。这可能有助于使用tox跨 Python 版本进行测试。与 pythonbrew 和 pythonz 相比,pyenv没有……依赖于Python本身。pyenv由纯shell脚本制作。不存在Python的引导问题。需要加载到你的 shell 中。相反,pyenv 的 shim 方法通过向你的 中添加目录来工作PATH。管理虚拟环境。当然,你可以自己创建虚拟环境 ,或者使用pyenv-virtualenv 来自动化该过程。目录安装获取 PyenvLinux/UNIX自动安装程序基本

    Notepad-v2.20工具,是替代Notepad++的首选工具

    Notepad-v2.20工具,是替代Notepad++的首选工具

    原生js随机图片拖拽排序代码.zip

    原生js随机图片拖拽排序代码.zip

    更快、更好、更稳定的Redis桌面(GUI)管理客户端,兼容Windows、Mac、Linux,性能出众,轻松加载海量键值

    更快、更好、更稳定的Redis桌面(GUI)管理客户端,兼容Windows、Mac、Linux,性能出众,轻松加载海量键值

Global site tag (gtag.js) - Google Analytics