- 浏览: 101071 次
- 性别:
- 来自: 杭州
最新评论
-
johncan:
但是如果绑定错误,反而会有反效果,请问如何看是否绑定错误?是否 ...
erlang程序优化点的总结(持续更新) -
mahengyang:
如何使用mnesia:select/4分页查询%%对事务封装了 ...
mnesia监控项目 -
zjjxxl:
mysql代码高手,我们这里mysql5.6长时间运行内存过高 ...
innodb对B树游标的定位过程以及对“小于(等于)B树最小记录”的特殊处理 -
wqtn22:
是原创啊,霸爷mryufeng认证的,增加引用计数和减少引用计 ...
erlang NIF部分接口实现(三)持久资源 -
magicxiao:
enif_release_resource当引用计数为0的时候 ...
erlang NIF部分接口实现(三)持久资源
最近4399的同学遇到一个问题,以下是他的描述:
“用erlang:ports得出来的port列表里,很多port的port_info都是undefined,实际上这些ports应该都已经被关闭了,手动调用close去关闭这些port的话会抛出异常。”
首先重现他的场景:
在时刻t1调用erlang:ports()时可以得到erlang虚拟机在t1的所有port,在t1时刻之后的t2时刻,再次调用erlang:port_info()得到每个port的信息,两次调用间有一个时间差tv。
根据这个场景,有两种假设:
1.在时间差tv内,可能有一部分端口被关闭,导致t1时刻有效的port不一定在t2时刻有效;
2.在时间差tv内,没有任何端口被关闭。
很显然,因为端口的关闭随时可能发生,第一种假设是绝对有可能存在的,我们仅仅需要分析第二种假设即可。
在第二种假设下,我们开始分析erlang虚拟机的部分代码:
bif.c
BIF_RETTYPE ports_0(BIF_ALIST_0)
{
Eterm res = NIL;
Eterm* port_buf = erts_alloc(ERTS_ALC_T_TMP, sizeof(Eterm)*erts_max_ports);
/* 分配一个记录port的缓冲区,最大尺寸为erts_max_ports,该数值可由虚拟机环境变量ERL_MAX_PORTS控制,指定虚拟机最大可拥有的port数目 */
Eterm* pp = port_buf;
Eterm* dead_ports;
int alive, dead;
Uint32 next_ss;
int i;
/* To get a consistent snapshot...
* We add alive ports from start of the buffer
* while dying ports are added from the other end by the killing threads.
*/
/* 这段话看起来挺奇怪,为了取得一个一致的快照,我们从buffer的头部开始添加活port,从尾部添加退出port */
erts_smp_mtx_lock(&ports_snapshot_mtx); /* One snapshot at a time */
erts_smp_atomic_set(&erts_dead_ports_ptr, (erts_aint_t) (port_buf + erts_max_ports));
/* 又是一段奇怪的代码,将一个全局指针设为此次分配的port记录缓冲区的尾部 */
next_ss = erts_smp_atomic32_inctest(&erts_ports_snapshot);
for (i = erts_max_ports-1; i >= 0; i--) {
Port* prt = &erts_port[i];
erts_smp_port_state_lock(prt);
if (!(prt->status & ERTS_PORT_SFLGS_DEAD)
&& prt->snapshot != next_ss) {
ASSERT(prt->snapshot == next_ss - 1);
*pp++ = prt->id;
prt->snapshot = next_ss; /* Consumed by this snapshot */
}
erts_smp_port_state_unlock(prt);
}
/*
这里遍历所有的port,将那些不是ERTS_PORT_SFLGS_DEAD的port记录到port记录缓冲区内,ERTS_PORT_SFLGS_DEAD是一个宏,其定义如下:
#define ERTS_PORT_SFLGS_DEAD \
(ERTS_PORT_SFLG_FREE \
| ERTS_PORT_SFLG_FREE_SCHEDULED \
| ERTS_PORT_SFLG_INITIALIZING)
也即将那些活的port记录到这个缓冲区内,为了保持该函数的可重入性,使用了一个简单的区分标识next_ss和port的成员snapshot共同作用,读者可细细品味
*/
dead_ports = (Eterm*)erts_smp_atomic_xchg(&erts_dead_ports_ptr, (erts_aint_t) NULL);
/* 这段代码的含义是,原子的进行一次读+写操作,该操作将之前设为port记录缓冲区尾部的全局指针erts_dead_ports_ptr,设置为NULL,同时取出其原值,这段操作和之前设置erts_dead_ports_ptr的操作遥相呼应,等价于为全局指针erts_dead_ports_ptr分配了一个临时缓冲区,共给其它部分使用,然后又在此处收回,稍侯来观察谁会使用这个由erts_dead_ports_ptr指向的临时缓冲区 */
ASSERT(pp <= dead_ports);
alive = pp - port_buf;
dead = port_buf + erts_max_ports - dead_ports;
/* 此处终于显现了一些端倪,alive比较好理解,即port记录缓冲区头部包含的port,而dead则是port记录缓冲区尾部包含的port */
ASSERT((alive+dead) <= erts_max_ports);
if (alive+dead > 0) {
erts_aint_t i;
Eterm *hp = HAlloc(BIF_P, (alive+dead)*2);
for (i = 0; i < alive; i++) {
res = CONS(hp, port_buf[i], res);
hp += 2;
}
for (i = 0; i < dead; i++) {
res = CONS(hp, dead_ports[i], res);
hp += 2;
}
}
/* 这里构造返回结果,将记录缓冲区头尾两端的port统统加入返回结果中去 */
erts_free(ERTS_ALC_T_TMP, port_buf);
/* 释放临时分配的port记录缓冲区 */
BIF_RET(res);
}
让我们再来看看是谁使用了erts_dead_ports_ptr记录的临时port记录缓冲区:
global.h
ERTS_GLB_INLINE void erts_may_save_closed_port(Port *prt)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&prt->state_lck));
if (prt->snapshot != erts_smp_atomic32_read_acqb(&erts_ports_snapshot)) {
/* Dead ports are added from the end of the snapshot buffer */
Eterm* tombstone = (Eterm*) erts_smp_atomic_addtest(&erts_dead_ports_ptr,
-(erts_aint_t)sizeof(Eterm));
ASSERT(tombstone+1 != NULL);
ASSERT(prt->snapshot == erts_smp_atomic32_read(&erts_ports_snapshot) - 1);
*tombstone = prt->id;
/* 注意此处,仅当发现port的snapshot标识变化时,才将port加入到erts_ports_snapshot所指的临时缓冲区,加入的顺序是从缓冲区的尾部向前,而erts_may_save_closed_port函数应该有可能在ports_0调用期间被别处调用 */
}
/*else no ongoing snapshot or port was already included or created after snapshot */
}
再来看看erts_may_save_closed_port的调用经历:
io.c terminate_port
io.c kill_port
erl_port_task.c erts_may_save_closed_port
terminate_port主要在port退出时调用,调用时,会产生一个额外的效果,即调用erts_may_save_closed_port,将退出的port记录到全局指针erts_ports_snapshot指向的临时缓冲区内,如果这个缓冲区存在,则一定有某个进程调用了ports_0,也即erlang:ports(),此时erts_ports_snapshot将这些已经退出的port也加入到ports_0的返回结果内。
为了验证这个过程,我们需要做一个实验,即调用ports_0时,也调用一些能够导致port关闭的函数,若已经关闭的port仍然出现在ports_0的返回结果内,就表明我们的猜测是正确的。
这需要对对ports_0做一点小小的hack:
在ports_0更改每个port的snapshot标识之后,加入一些延迟,保证关闭端口的函数可以在ports_0释放erts_ports_snapshot指向的临时缓冲区之前能够关闭port,以触发terminate_port最终调用erts_may_save_closed_port记录这些已经关闭的port。
改动如下:
bif.c
BIF_RETTYPE ports_0(BIF_ALIST_0)
{
....
for (i = erts_max_ports-1; i >= 0; i--) {
Port* prt = &erts_port[i];
erts_smp_port_state_lock(prt);
if (!(prt->status & ERTS_PORT_SFLGS_DEAD)
&& prt->snapshot != next_ss) {
ASSERT(prt->snapshot == next_ss - 1);
*pp++ = prt->id;
prt->snapshot = next_ss; /* Consumed by this snapshot */
}
erts_smp_port_state_unlock(prt);
}
sleep(10);
/* 加入一个延迟,能够保证ports_0在返回前,某些关闭端口的函数得到执行 */
dead_ports = (Eterm*)erts_smp_atomic_xchg(&erts_dead_ports_ptr, (erts_aint_t) NULL);
....
}
以简单的打开文件为例(都是port,套接字也同理),测试过程如下:
1.打开10个文件描述符:
FDList = [begin {ok, FD} = file:open(OCFile, [raw, append]), FD end||_I <- lists:seq(1, 10)].
2.启动一个独立的进程,不要与控制台进程在同一个调度器上,否则看不到并发执行的效果:
spawn(fun() -> process_flag(scheduler, 10), io:format("all ports: ~p~n", [erlang:ports()]) end).
3.我们有10秒的时间可以去关闭这些打开的文件句柄:
[file:close(FD)||FD <- FDList ].
静候10秒,可以发现erlang:ports()返回的结果中包含了已经关闭的文件描述符,再次运行erlang:ports(),发现返回的结果中不包含已经关闭的文件描述符,而实际上两次erlang:ports()时间间隔内没有关闭任何的port,由于第一次的erlang:ports()返回的结果中包含了已经关闭的port,对其调用erlang:port_info自然返回undefined。
由于贴图神马的还没有用过,就直接告诉大家结果了,读者也可以自行验证一下。
对于erlang:processes(),文档中就直接说明了这种情况,即即使进程在调用erlang:processes()期间退出,仍然会包含在最终的返回结果集里面。
这样的场景在进程/端口数少的时候,体现的不太明显,但若进程/端口数很多时(尤其按照霸爷所经历过的场景,并发百万级进程),erlang:processes()/erlang:ports()将执行的很慢(erlang:processes()还对这样的情况做了特殊处理,有兴趣的读者可以看看它的代码,就是erl_process.c的processes_0函数),期间如果有任何进程退出,都将包含在最终的返回结果内,有时可能会引起误解,但erlang官方可能是想给用户一个更为一致的瞬时快照结果吧。
对于这些已经退出却仍然包含在返回结果内的进程/端口,其本身是不会产生资源泄露的,这里简单分析下port的释放过程:
static void terminate_port(Port *prt)
{
Eterm send_closed_port_id;
Eterm connected_id = NIL /* Initialize to silence compiler */;
erts_driver_t *drv;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT(!prt->nlinks);
ASSERT(!prt->monitors);
if (prt->status & ERTS_PORT_SFLG_SEND_CLOSED) {
erts_port_status_band_set(prt, ~ERTS_PORT_SFLG_SEND_CLOSED);
send_closed_port_id = prt->id;
connected_id = prt->connected;
}
else {
send_closed_port_id = NIL;
}
#ifdef ERTS_SMP
erts_cancel_smp_ptimer(prt->ptimer);
#else
erts_cancel_timer(&prt->tm);
#endif
drv = prt->drv_ptr;
if ((drv != NULL) && (drv->stop != NULL)) {
int fpe_was_unmasked = erts_block_fpe();
(*drv->stop)((ErlDrvData)prt->drv_data);
/* 若有则调用port的driver的stop函数 */
erts_unblock_fpe(fpe_was_unmasked);
#ifdef ERTS_SMP
if (prt->xports)
erts_smp_xports_unlock(prt);
ASSERT(!prt->xports);
#endif
}
if(drv->handle != NULL) {
erts_smp_mtx_lock(&erts_driver_list_lock);
erts_ddll_decrement_port_count(drv->handle);
/* 若driver使用了动态链接库或共享库,则减少其引用计数 */
erts_smp_mtx_unlock(&erts_driver_list_lock);
}
stopq(prt); /* clear queue memory */
if(prt->linebuf != NULL){
erts_free(ERTS_ALC_T_LINEBUF, (void *) prt->linebuf);
/* 释放用于保存未集齐的数据的线性缓冲区 */
prt->linebuf = NULL;
}
if (prt->bp != NULL) {
free_message_buffer(prt->bp);
/* 释放堆分片 */
prt->bp = NULL;
prt->data = am_undefined;
}
if (prt->psd)
erts_free(ERTS_ALC_T_PRTSD, prt->psd);
/* 释放port特定数据结构占用的内存 */
kill_port(prt);
/*
* We don't want to send the closed message until after the
* port has been removed from the port table (in kill_port()).
*/
if (is_internal_port(send_closed_port_id))
deliver_result(send_closed_port_id, connected_id, am_closed);
ASSERT(prt->dist_entry == NULL);
}
static ERTS_INLINE void kill_port(Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
erts_port_task_free_port(pp);
ASSERT(pp->status & ERTS_PORT_SFLGS_DEAD);
}
void erts_port_task_free_port(Port *pp)
{
ErtsRunQueue *runq;
int port_is_dequeued = 0;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));
runq = erts_port_runq(pp);
ASSERT(runq);
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
if (pp->sched.exe_taskq) {
/* I (this thread) am currently executing this port, free it
when scheduled out... */
ErtsPortTask *ptp = port_task_alloc();
erts_smp_port_state_lock(pp);
pp->status &= ~ERTS_PORT_SFLG_CLOSING;
pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
erts_may_save_closed_port(pp);
erts_smp_port_state_unlock(pp);
ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&pp->refc) > 1);
ptp->type = ERTS_PORT_TASK_FREE;
ptp->event = (ErlDrvEvent) -1;
ptp->event_data = NULL;
set_handle(ptp, NULL);
push_task(pp->sched.exe_taskq, ptp);
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
erts_smp_runq_unlock(runq);
}
else {
/* 仅仅分析这个简单的场景以说明问题,另外一个场景类似 */
ErtsPortTaskQueue *ptqp = pp->sched.taskq;
if (ptqp) {
dequeue_port(runq, pp);
ERTS_PORT_NOT_IN_RUNQ(pp);
port_is_dequeued = 1;
}
erts_smp_port_state_lock(pp);
pp->status &= ~ERTS_PORT_SFLG_CLOSING;
pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
/* port的状态被更改为了ERTS_PORT_SFLG_FREE_SCHEDULED,它也是ERTS_PORT_SFLGS_DEAD的一种 */
erts_may_save_closed_port(pp);
/* 能够让erts_dead_ports_ptr保存已经退出的port,则port在退出时一定走到了这里,我们其实仅需要关注在这里之后是否有port资源泄露即可 */
erts_smp_port_state_unlock(pp);
#ifdef ERTS_SMP
erts_smp_atomic_dec(&pp->refc); /* Not alive */
#endif
ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&pp->refc) > 0); /* Lock */
handle_remaining_tasks(runq, pp); /* May release runq lock */
/*这个函数将释放挂在port上的所有ErtsPortTask,port能够执行的各项任务,也被像消息一样发给port,由port异步执行,这里将释放port的任务队列上的所有ErtsPortTask任务的数据结构*/
ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
pp->sched.taskq = NULL;
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
#ifndef ERTS_SMP
ASSERT(pp->status & ERTS_PORT_SFLG_PORT_DEBUG);
erts_port_status_set(pp, ERTS_PORT_SFLG_FREE);
/* port的状态又被改为了ERTS_PORT_SFLG_FREE,它也是ERTS_PORT_SFLGS_DEAD的一种,但设置为这个状态后,表名port原先的描述符Port数据结构可以被重新分配给一个新建立的port了,因为之前已经触发了erts_may_save_closed_port,因此按照顺序执行流的执行,除非发生异常,否则必然会到此处 */
#endif
erts_smp_runq_unlock(runq);
if (erts_system_profile_flags.runnable_ports && port_is_dequeued) {
profile_runnable_port(pp, am_inactive);
}
if (ptqp)
port_taskq_free(ptqp);
/*释放port的任务队列*/
}
}
由此可见port的释放其实没有那么复杂,虚拟机本身就有port数量限制,每次的port释放都仅仅将port的描述符设置为ERTS_PORT_SFLG_FREE以进行复用,而不会真正释放数据结构。
再来看看用于获取空闲port描述符的get_free_port:
io.c
static int get_free_port(void)
{
Uint num;
Uint tries = erts_max_ports;
Port* port;
erts_smp_spin_lock(&get_free_port_lck);
num = last_port_num + 1;
for (;; ++num) {
port = &erts_port[num & erts_port_tab_index_mask];
erts_smp_port_state_lock(port);
if (port->status & ERTS_PORT_SFLG_FREE) {
last_port_num = num;
erts_smp_spin_unlock(&get_free_port_lck);
break;
}
erts_smp_port_state_unlock(port);
if (--tries == 0) {
erts_smp_spin_unlock(&get_free_port_lck);
return -1;
}
}
port->status = ERTS_PORT_SFLG_INITIALIZING;
#ifdef ERTS_SMP
ERTS_SMP_LC_ASSERT(erts_smp_atomic_read(&port->refc) == 0);
erts_smp_atomic_set(&port->refc, 2); /* Port alive + lock */
#endif
erts_smp_port_state_unlock(port);
return num & port_num_mask;
}
get_free_port用于取得一个空闲port描述符,它将遍历erts_port记录的所有port描述符,然后从中取得一个状态为ERTS_PORT_SFLG_FREE的描述符。
由此可见port的分配与释放都不会引发port描述符的内存分配与释放,仅仅会复用一个而已。
至此,问题原因已经基本清楚了,erlang:ports()和erlang:processes()将返回在某个时刻的端口和进程的快照,这样的结果更加一致,因为时刻的快照比时间间隔的快照更加精准。
发表评论
-
erlang虚拟机topology不符导致启动后crash
2013-12-24 15:49 2010线上有一台t4的机器,这些机器的cpu topo是经过伪造 ... -
erlang程序优化点的总结(持续更新)
2013-03-03 15:40 11603转载请注明出处 注意,这里只是给出一个总结,具体性能 ... -
erlang NIF部分接口实现(五)复用driver功能的接口
2012-07-22 22:08 2369NIF除了自身提供的功能外,还封装了一系列driver的功能, ... -
erlang NIF部分接口实现(四)消息发送
2012-07-22 21:42 2655erlang中不能没有消息和异步过程,NIF也必须有此项能力, ... -
erlang NIF部分接口实现(三)持久资源
2012-07-22 21:05 2573持久资源是NIF中一类非常有用接口,可以把资源看成各种数据结构 ... -
erlang NIF部分接口实现(二)类型系统和内存分配接口
2012-07-22 19:12 3582NIF的内存管理接口为enif_alloc/enif_free ... -
erlang NIF部分接口实现(一)加载过程及编写框架
2012-07-22 18:18 7880最近在项目中频繁用到erlang的NIF接口,以扩展erlan ...
相关推荐
**Erlang: 并行计算与云计算** Erlang是一种动态类型的函数式编程语言,由爱立信在1986年为电话交换系统设计,后来因其在处理并发、分布式和容错性上的优秀特性,逐渐在并行计算和云计算领域崭露头角。 ### 1. ...
Erlang:RabbitMQ 是用 Erlang 编写的,因此需要 Erlang 运行时。确保安装了兼容的 Erlang 版本;Erlang:RabbitMQ 是用 Erlang 编写的,因此需要 Erlang 运行时。确保安装了兼容的 Erlang 版本;Erlang:RabbitMQ ...
Introducing Erlang: Getting Started in Functional Programming by Simon St. Laurent English | 6 Mar. 2017 | ASIN: B06XHSP5SH | 212 Pages | AZW3 | 1.85 MB If you’re new to Erlang, its functional style...
Erlang的设计灵感来源于通信领域的实际需求,其并发模型基于轻量级进程(processes),这些进程之间的通信是通过消息传递来实现的,这使得Erlang在处理大量并发连接时表现出色。OTP则在此基础上,提供了强大的行为...
8. **Erlang与其他技术的集成**:Erlang可以与其他语言如Java、Python等集成,用于构建混合系统。例如,使用Erlang的Ranch和Cowboy库可以构建高性能的Web服务器和API。 9. **实时性与并发性**:Erlang的实时性使其...
内容概要:本文档详细介绍了Erlang编程语言及其并行编程模型,并重点阐述了Erlang OTP框架的特性和应用场景。首先,文档简述了Erlang语言的特点,包括轻量级进程、模式匹配和热代码升级等特性,以及它在构建高可用、...
在Erlang编程语言中,进程是其核心特性之一,它们是并发执行的实体,类似于其他语言中的线程。在Erlang中,进程间通信(IPC)是通过消息传递来实现的,而`link`机制是这个通信模型中非常重要的一部分。本教程将通过...
1. **并发模型**:Erlang的并发基于轻量级进程(Lightweight Processes, LSPs),这些进程间的通信通过消息传递实现,这与传统的线程模型不同,具有更好的隔离性和容错性。 2. ** OTP(Open Telecom Platform)**:...
1. **并发处理**:Erlang是一种面向并发的编程语言,其虚拟机(BEAM)设计支持轻量级进程,使得Erlang系统能同时处理大量并发任务,这正是RabbitMQ处理高并发消息需求的理想选择。 2. **容错性**:Erlang的错误恢复...
RabbitMQ基于Erlang编程语言,因此在安装RabbitMQ之前,我们需要先安装Erlang环境。本文将涵盖以下几个关键知识点: 1. **Erlang安装**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编写的。首先,我们需要...
win64位系统 。 erlang24.2.2。
1. **并发处理**:Erlang的轻量级进程(Lightweight Processes)机制使得它能够轻松处理大量并发连接,每个进程占用资源少,互不影响。在游戏后端中,这种特性尤为重要,因为游戏通常需要同时处理成千上万的玩家请求...
Erlang/OTP 19.1 is a service release containing mostly bug fixes, as well as a number of new features and characteristics improvements. Some highlights of the release are: erts: Improved dirty ...
总的来说,Erlang和RabbitMQ是构建高可用、高性能分布式系统的关键工具,它们的结合为开发者提供了强大的基础设施,以处理复杂的数据通信和系统集成问题。正确安装和配置这两个组件是实现这一目标的基础。
在Erlang中,你可以使用`erlang:monitor_node/2`和`erlang:demonitor/1`函数来监控和取消监控其他节点的状态。当需要两个未连通的节点C和D进行通信时,可以在节点C上执行: ```erlang erlang:monitor_node(node(d),...
包括码头工人集装箱图像 ,与沿和构建和发布工具。 Erlang/OTP 的最新版本以标签的形式提供。 #我可以用来做什么? 使用它来试用 erlang shell,作为 erlang 开发环境,或将其用作您自己的 erlang 应用程序的基础...
3. **启动过程**:Erlang应用通过`start/2`函数启动,通常在`bank1_app.erl`中的`start/2`回调实现。这个函数负责启动应用的监督树,即`Supervisor`,它管理应用的所有进程。 4. **Supervisor**:在Erlang OTP...
- ** Debugger**:Erlang的内置调试器,帮助定位和修复代码问题。 - ** Dialyzer**:静态分析工具,可以检测出程序中的类型错误和潜在问题。 通过这个"awesome-erlang"列表,你可以逐步探索Erlang的世界,从基础...
3. 分布式设计:Erlang天生适合构建分布式系统,与MongoDB的分布式存储和复制集特性相结合,可构建大规模分布式应用。 三、驱动程序的核心组件 1. `mongodb_connection`模块:负责建立和管理与MongoDB服务器的连接...