`
wqtn22
  • 浏览: 101071 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

erlang:ports与erlang:processes引发的问题

    博客分类:
  • erts
 
阅读更多

最近4399的同学遇到一个问题,以下是他的描述:

 

“用erlang:ports得出来的port列表里,很多port的port_info都是undefined,实际上这些ports应该都已经被关闭了,手动调用close去关闭这些port的话会抛出异常。”

首先重现他的场景:

在时刻t1调用erlang:ports()时可以得到erlang虚拟机在t1的所有port,在t1时刻之后的t2时刻,再次调用erlang:port_info()得到每个port的信息,两次调用间有一个时间差tv。

根据这个场景,有两种假设:

1.在时间差tv内,可能有一部分端口被关闭,导致t1时刻有效的port不一定在t2时刻有效;

2.在时间差tv内,没有任何端口被关闭。

很显然,因为端口的关闭随时可能发生,第一种假设是绝对有可能存在的,我们仅仅需要分析第二种假设即可。

在第二种假设下,我们开始分析erlang虚拟机的部分代码:

bif.c

 

BIF_RETTYPE ports_0(BIF_ALIST_0)

{

    Eterm res = NIL;

    Eterm* port_buf = erts_alloc(ERTS_ALC_T_TMP, sizeof(Eterm)*erts_max_ports);

    /* 分配一个记录port的缓冲区,最大尺寸为erts_max_ports,该数值可由虚拟机环境变量ERL_MAX_PORTS控制,指定虚拟机最大可拥有的port数目 */

 

    Eterm* pp = port_buf;

    Eterm* dead_ports;

    int alive, dead;

    Uint32 next_ss;

    int i;

 

    /* To get a consistent snapshot... 

     * We add alive ports from start of the buffer

     * while dying ports are added from the other end by the killing threads.

     */

    /* 这段话看起来挺奇怪,为了取得一个一致的快照,我们从buffer的头部开始添加活port,从尾部添加退出port */

 

    erts_smp_mtx_lock(&ports_snapshot_mtx); /* One snapshot at a time */

 

    erts_smp_atomic_set(&erts_dead_ports_ptr, (erts_aint_t) (port_buf + erts_max_ports));

 

    /* 又是一段奇怪的代码,将一个全局指针设为此次分配的port记录缓冲区的尾部 */

 

 

    next_ss = erts_smp_atomic32_inctest(&erts_ports_snapshot);

 

    for (i = erts_max_ports-1; i >= 0; i--) {

Port* prt = &erts_port[i];

erts_smp_port_state_lock(prt);

if (!(prt->status & ERTS_PORT_SFLGS_DEAD)

   && prt->snapshot != next_ss) {

   ASSERT(prt->snapshot == next_ss - 1);

   *pp++ = prt->id;

   prt->snapshot = next_ss; /* Consumed by this snapshot */

}

erts_smp_port_state_unlock(prt);

    }

    /*

    这里遍历所有的port,将那些不是ERTS_PORT_SFLGS_DEAD的port记录到port记录缓冲区内,ERTS_PORT_SFLGS_DEAD是一个宏,其定义如下:

    #define ERTS_PORT_SFLGS_DEAD \

      (ERTS_PORT_SFLG_FREE \

       | ERTS_PORT_SFLG_FREE_SCHEDULED \

       | ERTS_PORT_SFLG_INITIALIZING)

     也即将那些活的port记录到这个缓冲区内,为了保持该函数的可重入性,使用了一个简单的区分标识next_ss和port的成员snapshot共同作用,读者可细细品味

    */

 

    dead_ports = (Eterm*)erts_smp_atomic_xchg(&erts_dead_ports_ptr,  (erts_aint_t) NULL);

    /* 这段代码的含义是,原子的进行一次读+写操作,该操作将之前设为port记录缓冲区尾部的全局指针erts_dead_ports_ptr,设置为NULL,同时取出其原值,这段操作和之前设置erts_dead_ports_ptr的操作遥相呼应,等价于为全局指针erts_dead_ports_ptr分配了一个临时缓冲区,共给其它部分使用,然后又在此处收回,稍侯来观察谁会使用这个由erts_dead_ports_ptr指向的临时缓冲区 */

 

 

 

    ASSERT(pp <= dead_ports);

 

    alive = pp - port_buf;

    dead = port_buf + erts_max_ports - dead_ports;

 

    /* 此处终于显现了一些端倪,alive比较好理解,即port记录缓冲区头部包含的port,而dead则是port记录缓冲区尾部包含的port */

 

    ASSERT((alive+dead) <= erts_max_ports);

 

 

    if (alive+dead > 0) {

erts_aint_t i;

Eterm *hp = HAlloc(BIF_P, (alive+dead)*2);

 

for (i = 0; i < alive; i++) {

   res = CONS(hp, port_buf[i], res);    

   hp += 2;

}

for (i = 0; i < dead; i++) {

   res = CONS(hp, dead_ports[i], res);

   hp += 2;

}

    }

    /* 这里构造返回结果,将记录缓冲区头尾两端的port统统加入返回结果中去 */

 

 

 

 

    erts_free(ERTS_ALC_T_TMP, port_buf);

    /* 释放临时分配的port记录缓冲区 */

 

    BIF_RET(res);

}

 

让我们再来看看是谁使用了erts_dead_ports_ptr记录的临时port记录缓冲区:

global.h

 

ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt)

{

    ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck));

    if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) {

/* Dead ports are added from the end of the snapshot buffer */

Eterm* tombstone = (Eterm*) erts_smp_atomic_addtest(&erts_dead_ports_ptr,

   -(erts_aint_t)sizeof(Eterm));

ASSERT(tombstone+1 != NULL);

ASSERT(prt->snapshot == erts_smp_atomic32_read(&erts_ports_snapshot) - 1);

*tombstone = prt->id;

        /* 注意此处,仅当发现port的snapshot标识变化时,才将port加入到erts_ports_snapshot所指的临时缓冲区,加入的顺序是从缓冲区的尾部向前,而erts_may_save_closed_port函数应该有可能在ports_0调用期间被别处调用 */

    }

    /*else no ongoing snapshot or port was already included or created after snapshot */

}

再来看看erts_may_save_closed_port的调用经历:

io.c terminate_port

io.c kill_port

erl_port_task.c erts_may_save_closed_port

terminate_port主要在port退出时调用,调用时,会产生一个额外的效果,即调用erts_may_save_closed_port,将退出的port记录到全局指针erts_ports_snapshot指向的临时缓冲区内,如果这个缓冲区存在,则一定有某个进程调用了ports_0,也即erlang:ports(),此时erts_ports_snapshot将这些已经退出的port也加入到ports_0的返回结果内。

为了验证这个过程,我们需要做一个实验,即调用ports_0时,也调用一些能够导致port关闭的函数,若已经关闭的port仍然出现在ports_0的返回结果内,就表明我们的猜测是正确的。

这需要对对ports_0做一点小小的hack:

在ports_0更改每个port的snapshot标识之后,加入一些延迟,保证关闭端口的函数可以在ports_0释放erts_ports_snapshot指向的临时缓冲区之前能够关闭port,以触发terminate_port最终调用erts_may_save_closed_port记录这些已经关闭的port。

改动如下:

bif.c

BIF_RETTYPE ports_0(BIF_ALIST_0)

{

    ....

 

    for (i = erts_max_ports-1; i >= 0; i--) {

Port* prt = &erts_port[i];

erts_smp_port_state_lock(prt);

if (!(prt->status & ERTS_PORT_SFLGS_DEAD)

   && prt->snapshot != next_ss) {

   ASSERT(prt->snapshot == next_ss - 1);

   *pp++ = prt->id;

   prt->snapshot = next_ss; /* Consumed by this snapshot */

}

erts_smp_port_state_unlock(prt);

    }

 

    sleep(10);

 

    /* 加入一个延迟,能够保证ports_0在返回前,某些关闭端口的函数得到执行 */

    dead_ports = (Eterm*)erts_smp_atomic_xchg(&erts_dead_ports_ptr, (erts_aint_t) NULL);

    ....

}

 

以简单的打开文件为例(都是port,套接字也同理),测试过程如下:

1.打开10个文件描述符:

FDList = [begin {ok, FD} = file:open(OCFile, [raw, append]), FD end||_I <- lists:seq(1, 10)].

2.启动一个独立的进程,不要与控制台进程在同一个调度器上,否则看不到并发执行的效果:

spawn(fun() -> process_flag(scheduler, 10), io:format("all ports: ~p~n", [erlang:ports()]) end).

3.我们有10秒的时间可以去关闭这些打开的文件句柄:

[file:close(FD)||FD <-  FDList ].

静候10秒,可以发现erlang:ports()返回的结果中包含了已经关闭的文件描述符,再次运行erlang:ports(),发现返回的结果中不包含已经关闭的文件描述符,而实际上两次erlang:ports()时间间隔内没有关闭任何的port,由于第一次的erlang:ports()返回的结果中包含了已经关闭的port,对其调用erlang:port_info自然返回undefined。

由于贴图神马的还没有用过,就直接告诉大家结果了,读者也可以自行验证一下。

对于erlang:processes(),文档中就直接说明了这种情况,即即使进程在调用erlang:processes()期间退出,仍然会包含在最终的返回结果集里面。

这样的场景在进程/端口数少的时候,体现的不太明显,但若进程/端口数很多时(尤其按照霸爷所经历过的场景,并发百万级进程),erlang:processes()/erlang:ports()将执行的很慢(erlang:processes()还对这样的情况做了特殊处理,有兴趣的读者可以看看它的代码,就是erl_process.c的processes_0函数),期间如果有任何进程退出,都将包含在最终的返回结果内,有时可能会引起误解,但erlang官方可能是想给用户一个更为一致的瞬时快照结果吧。

对于这些已经退出却仍然包含在返回结果内的进程/端口,其本身是不会产生资源泄露的,这里简单分析下port的释放过程:

 

static void terminate_port(Port *prt)

{

    Eterm send_closed_port_id;

    Eterm connected_id = NIL /* Initialize to silence compiler */;

    erts_driver_t *drv;

 

    ERTS_SMP_CHK_NO_PROC_LOCKS;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

 

    ASSERT(!prt->nlinks);

    ASSERT(!prt->monitors);

 

    if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) {

erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED);

send_closed_port_id = prt->id;

connected_id = prt->connected;

    }

    else {

send_closed_port_id = NIL;

    }

 

#ifdef ERTS_SMP

    erts_cancel_smp_ptimer(prt->ptimer);

#else

    erts_cancel_timer(&prt->tm);

#endif

 

    drv = prt->drv_ptr;

    if ((drv != NULL) && (drv->stop != NULL)) {

int fpe_was_unmasked = erts_block_fpe();

(*drv->stop)((ErlDrvData)prt->drv_data);

        /* 若有则调用port的driver的stop函数 */

erts_unblock_fpe(fpe_was_unmasked);

#ifdef ERTS_SMP

if (prt->xports)

   erts_smp_xports_unlock(prt);

ASSERT(!prt->xports);

#endif

    }

    if(drv->handle != NULL) {

erts_smp_mtx_lock(&erts_driver_list_lock);

erts_ddll_decrement_port_count(drv->handle); 

        /* 若driver使用了动态链接库或共享库,则减少其引用计数 */

erts_smp_mtx_unlock(&erts_driver_list_lock);

    }

    stopq(prt);        /* clear queue memory */

    if(prt->linebuf != NULL){

erts_free(ERTS_ALC_T_LINEBUF, (void *) prt->linebuf);

        /* 释放用于保存未集齐的数据的线性缓冲区 */

prt->linebuf = NULL;

    }

    if (prt->bp != NULL) {

free_message_buffer(prt->bp);

        /* 释放堆分片 */

prt->bp = NULL;

prt->data = am_undefined;

    }

 

    if (prt->psd)

erts_free(ERTS_ALC_T_PRTSD, prt->psd);

        /* 释放port特定数据结构占用的内存 */

 

    kill_port(prt);

 

    /*

     * We don't want to send the closed message until after the

     * port has been removed from the port table (in kill_port()).

     */

    if (is_internal_port(send_closed_port_id))

deliver_result(send_closed_port_id, connected_id, am_closed);

 

    ASSERT(prt->dist_entry == NULL);

}

 

static ERTS_INLINE void kill_port(Port *pp)

{

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_free_port(pp);

    ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD);

}

 

void erts_port_task_free_port(Port *pp)

{

    ErtsRunQueue *runq;

    int port_is_dequeued = 0;

 

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));

    runq = erts_port_runq(pp);

    ASSERT(runq);

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);

    if (pp->sched.exe_taskq) {

/* I (this thread) am currently executing this port, free it

  when scheduled out... */

ErtsPortTask *ptp = port_task_alloc();

erts_smp_port_state_lock(pp);

pp->status &= ~ERTS_PORT_SFLG_CLOSING;

pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;

erts_may_save_closed_port(pp);

erts_smp_port_state_unlock(pp);

ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&pp->refc) > 1);

ptp->type = ERTS_PORT_TASK_FREE;

ptp->event = (ErlDrvEvent) -1;

ptp->event_data = NULL;

set_handle(ptp, NULL);

push_task(pp->sched.exe_taskq, ptp);

ERTS_PT_CHK_PRES_PORTQ(runq, pp);

erts_smp_runq_unlock(runq);

    }

    else {

        /* 仅仅分析这个简单的场景以说明问题,另外一个场景类似 */

ErtsPortTaskQueue *ptqp = pp->sched.taskq;

if (ptqp) {

   dequeue_port(runq, pp);

   ERTS_PORT_NOT_IN_RUNQ(pp);

   port_is_dequeued = 1;

}

erts_smp_port_state_lock(pp);

pp->status &= ~ERTS_PORT_SFLG_CLOSING;

pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;

        /* port的状态被更改为了ERTS_PORT_SFLG_FREE_SCHEDULED,它也是ERTS_PORT_SFLGS_DEAD的一种 */

erts_may_save_closed_port(pp);

        /* 能够让erts_dead_ports_ptr保存已经退出的port,则port在退出时一定走到了这里,我们其实仅需要关注在这里之后是否有port资源泄露即可 */

erts_smp_port_state_unlock(pp);

#ifdef ERTS_SMP

erts_smp_atomic_dec(&pp->refc); /* Not alive */

#endif

ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&pp->refc) > 0); /* Lock */

handle_remaining_tasks(runq, pp); /* May release runq lock */

        /*这个函数将释放挂在port上的所有ErtsPortTask,port能够执行的各项任务,也被像消息一样发给port,由port异步执行,这里将释放port的任务队列上的所有ErtsPortTask任务的数据结构*/

ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));

pp->sched.taskq = NULL;

ERTS_PT_CHK_PRES_PORTQ(runq, pp);

#ifndef ERTS_SMP

ASSERT(pp->status & ERTS_PORT_SFLG_PORT_DEBUG);

erts_port_status_set(pp, ERTS_PORT_SFLG_FREE);

        /* port的状态又被改为了ERTS_PORT_SFLG_FREE,它也是ERTS_PORT_SFLGS_DEAD的一种,但设置为这个状态后,表名port原先的描述符Port数据结构可以被重新分配给一个新建立的port了,因为之前已经触发了erts_may_save_closed_port,因此按照顺序执行流的执行,除非发生异常,否则必然会到此处 */

#endif

erts_smp_runq_unlock(runq);

 

if (erts_system_profile_flags.runnable_ports && port_is_dequeued) {

       profile_runnable_port(pp, am_inactive);

    }

 

if (ptqp)

   port_taskq_free(ptqp);

        /*释放port的任务队列*/

    }

}

由此可见port的释放其实没有那么复杂,虚拟机本身就有port数量限制,每次的port释放都仅仅将port的描述符设置为ERTS_PORT_SFLG_FREE以进行复用,而不会真正释放数据结构。

再来看看用于获取空闲port描述符的get_free_port:

io.c

 

static int get_free_port(void)

{

    Uint num;

    Uint tries = erts_max_ports;

    Port* port;    

 

    erts_smp_spin_lock(&get_free_port_lck);

    num = last_port_num + 1;

    for (;; ++num) {

port = &erts_port[num & erts_port_tab_index_mask];

 

erts_smp_port_state_lock(port);

if (port->status & ERTS_PORT_SFLG_FREE) {

   last_port_num = num;

   erts_smp_spin_unlock(&get_free_port_lck);

   break;

}

erts_smp_port_state_unlock(port);

 

if (--tries == 0) {

   erts_smp_spin_unlock(&get_free_port_lck);

   return -1;

}

    }

    port->status = ERTS_PORT_SFLG_INITIALIZING;

#ifdef ERTS_SMP

    ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&port->refc) == 0);

    erts_smp_atomic_set(&port->refc, 2); /* Port alive + lock */

#endif

    erts_smp_port_state_unlock(port);

    return num & port_num_mask;

}

get_free_port用于取得一个空闲port描述符,它将遍历erts_port记录的所有port描述符,然后从中取得一个状态为ERTS_PORT_SFLG_FREE的描述符。

由此可见port的分配与释放都不会引发port描述符的内存分配与释放,仅仅会复用一个而已。

至此,问题原因已经基本清楚了,erlang:ports()和erlang:processes()将返回在某个时刻的端口和进程的快照,这样的结果更加一致,因为时刻的快照比时间间隔的快照更加精准。

分享到:
评论

相关推荐

    Erlang:并行计算和云计算

    **Erlang: 并行计算与云计算** Erlang是一种动态类型的函数式编程语言,由爱立信在1986年为电话交换系统设计,后来因其在处理并发、分布式和容错性上的优秀特性,逐渐在并行计算和云计算领域崭露头角。 ### 1. ...

    erlang-23.2.1-1.el7.x86-64.rpm

    Erlang:RabbitMQ 是用 Erlang 编写的,因此需要 Erlang 运行时。确保安装了兼容的 Erlang 版本;Erlang:RabbitMQ 是用 Erlang 编写的,因此需要 Erlang 运行时。确保安装了兼容的 Erlang 版本;Erlang:RabbitMQ ...

    Introducing Erlang: Getting Started in Functional Programming

    Introducing Erlang: Getting Started in Functional Programming by Simon St. Laurent English | 6 Mar. 2017 | ASIN: B06XHSP5SH | 212 Pages | AZW3 | 1.85 MB If you’re new to Erlang, its functional style...

    分布式应用Erlang:Erlang_OTP_19_win64

    Erlang的设计灵感来源于通信领域的实际需求,其并发模型基于轻量级进程(processes),这些进程之间的通信是通过消息传递来实现的,这使得Erlang在处理大量并发连接时表现出色。OTP则在此基础上,提供了强大的行为...

    erlang programming

    8. **Erlang与其他技术的集成**:Erlang可以与其他语言如Java、Python等集成,用于构建混合系统。例如,使用Erlang的Ranch和Cowboy库可以构建高性能的Web服务器和API。 9. **实时性与并发性**:Erlang的实时性使其...

    并行编程语言Erlang:Erlang OTP框架及其应用开发指南

    内容概要:本文档详细介绍了Erlang编程语言及其并行编程模型,并重点阐述了Erlang OTP框架的特性和应用场景。首先,文档简述了Erlang语言的特点,包括轻量级进程、模式匹配和热代码升级等特性,以及它在构建高可用、...

    Erlang入门:构建application练习4(进程link的作用)

    在Erlang编程语言中,进程是其核心特性之一,它们是并发执行的实体,类似于其他语言中的线程。在Erlang中,进程间通信(IPC)是通过消息传递来实现的,而`link`机制是这个通信模型中非常重要的一部分。本教程将通过...

    erlang资源

    1. **并发模型**:Erlang的并发基于轻量级进程(Lightweight Processes, LSPs),这些进程间的通信通过消息传递实现,这与传统的线程模型不同,具有更好的隔离性和容错性。 2. ** OTP(Open Telecom Platform)**:...

    RabbitMQ3.9.13和ErLang24.2版本

    1. **并发处理**:Erlang是一种面向并发的编程语言,其虚拟机(BEAM)设计支持轻量级进程,使得Erlang系统能同时处理大量并发任务,这正是RabbitMQ处理高并发消息需求的理想选择。 2. **容错性**:Erlang的错误恢复...

    Centos7安装RabbitMQ的文档和安装包(包含erlang安装包).rar

    RabbitMQ基于Erlang编程语言,因此在安装RabbitMQ之前,我们需要先安装Erlang环境。本文将涵盖以下几个关键知识点: 1. **Erlang安装**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编写的。首先,我们需要...

    win64_erlang24.2.2

    win64位系统 。 erlang24.2.2。

    某流水过千W的erlang游戏后端

    1. **并发处理**:Erlang的轻量级进程(Lightweight Processes)机制使得它能够轻松处理大量并发连接,每个进程占用资源少,互不影响。在游戏后端中,这种特性尤为重要,因为游戏通常需要同时处理成千上万的玩家请求...

    erlang19安装包

    Erlang/OTP 19.1 is a service release containing mostly bug fixes, as well as a number of new features and characteristics improvements. Some highlights of the release are: erts: Improved dirty ...

    Erlang_win64_19.1和rabbitMQ综合安装包

    总的来说,Erlang和RabbitMQ是构建高可用、高性能分布式系统的关键工具,它们的结合为开发者提供了强大的基础设施,以处理复杂的数据通信和系统集成问题。正确安装和配置这两个组件是实现这一目标的基础。

    erlang节点连通测试

    在Erlang中,你可以使用`erlang:monitor_node/2`和`erlang:demonitor/1`函数来监控和取消监控其他节点的状态。当需要两个未连通的节点C和D进行通信时,可以在节点C上执行: ```erlang erlang:monitor_node(node(d),...

    docker-erlang:Erlang 的 Docker 镜像打包

    包括码头工人集装箱图像 ,与沿和构建和发布工具。 Erlang/OTP 的最新版本以标签的形式提供。 #我可以用来做什么? 使用它来试用 erlang shell,作为 erlang 开发环境,或将其用作您自己的 erlang 应用程序的基础...

    Erlang入门:构建application练习2

    3. **启动过程**:Erlang应用通过`start/2`函数启动,通常在`bank1_app.erl`中的`start/2`回调实现。这个函数负责启动应用的监督树,即`Supervisor`,它管理应用的所有进程。 4. **Supervisor**:在Erlang OTP...

    awesome-erlang:精选的Erlang库,资源和闪亮内容的精选列表

    - ** Debugger**:Erlang的内置调试器,帮助定位和修复代码问题。 - ** Dialyzer**:静态分析工具,可以检测出程序中的类型错误和潜在问题。 通过这个"awesome-erlang"列表,你可以逐步探索Erlang的世界,从基础...

    mongodb-erlang:Erlang的MongoDB驱动程序

    3. 分布式设计:Erlang天生适合构建分布式系统,与MongoDB的分布式存储和复制集特性相结合,可构建大规模分布式应用。 三、驱动程序的核心组件 1. `mongodb_connection`模块:负责建立和管理与MongoDB服务器的连接...

Global site tag (gtag.js) - Google Analytics