浏览 2160 次
锁定老帖子 主题:erlang对port子进程退出的处理
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2008-12-08
最后修改:2009-04-20
erts运行的时候会初始化smp_sig_notify,开启一个信号处理线程,在这个线程里面做具体的信号处理。 static void init_smp_sig_notify(void) { erts_smp_thr_opts_t thr_opts = ERTS_SMP_THR_OPTS_DEFAULT_INITER; thr_opts.detached = 1; if (pipe(sig_notify_fds) < 0) { erl_exit(ERTS_ABORT_EXIT, "Failed to create signal-dispatcher pipe: %s (%d)\n", erl_errno_id(errno), errno); } /* Start signal handler thread */ erts_smp_thr_create(&sig_dispatcher_tid, signal_dispatcher_thread_func, NULL, &thr_opts); } static void * signal_dispatcher_thread_func(void *unused) { int initialized = 0; #if !CHLDWTHR int notify_check_children = 0; #endif #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_set_thread_name("signal_dispatcher"); #endif erts_thread_init_fp_exception(); while (1) { char buf[32]; int res, i; /* Block on read() waiting for a signal notification to arrive... */ res = read(sig_notify_fds[0], (void *) &buf[0], 32); if (res < 0) { if (errno == EINTR) continue; erl_exit(ERTS_ABORT_EXIT, "signal-dispatcher thread got unexpected error: %s (%d)\n", erl_errno_id(errno), errno); } for (i = 0; i < res; i++) { /* * NOTE 1: The signal dispatcher thread should not do work * that takes a substantial amount of time (except * perhaps in test and debug builds). It needs to * be responsive, i.e, it should only dispatch work * to other threads. * * NOTE 2: The signal dispatcher thread is not a blockable * thread (i.e., it hasn't called * erts_register_blockable_thread()). This is * intentional. We want to be able to interrupt * writing of a crash dump by hitting C-c twice. * Since it isn't a blockable thread it is important * that it doesn't change the state of any data that * a blocking thread expects to have exclusive access * to (unless the signal dispatcher itself explicitly * is blocking all blockable threads). 
*/ switch (buf[i]) { case 0: /* Emulator initialized */ initialized = 1; #if !CHLDWTHR if (!notify_check_children) #endif break; #if !CHLDWTHR case 'C': /* SIGCHLD */ if (initialized) erts_smp_notify_check_children_needed(); else notify_check_children = 1; break; #endif case 'I': /* SIGINT */ break_requested(); break; case 'Q': /* SIGQUIT */ quit_requested(); break; case '1': /* SIGUSR1 */ sigusr1_exit(); break; #ifdef QUANTIFY case '2': /* SIGUSR2 */ quantify_save_data(); /* Might take a substantial amount of time, but this is a test/debug build */ break; #endif default: erl_exit(ERTS_ABORT_EXIT, "signal-dispatcher thread received unknown " "signal notification: '%c'\n", buf[i]); } } ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING); } return NULL; } void erts_sys_main_thread(void) { /* Become signal receiver thread... */ #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_set_thread_name("signal_receiver"); #endif smp_sig_notify(0); /* Notify initialized */ while (1) { /* Wait for a signal to arrive... */ #ifdef DEBUG int res = #else (void) #endif select(0, NULL, NULL, NULL, NULL); ASSERT(res < 0); ASSERT(errno == EINTR); } } 因为外部的程序是fork exec来执行的,所以退出的时候erts进程就会收到SIGCHLD信号。 static int spawn_init() { ... sys_sigset(SIGCHLD, onchld); /* Reap children */ ... 
} onchld就会被调用 static RETSIGTYPE onchld(int signum) { #if CHLDWTHR ASSERT(0); /* We should *never* catch a SIGCHLD signal */ #elif defined(ERTS_SMP) smp_sig_notify('C'); #else children_died = 1; ERTS_CHK_IO_INTR(1); /* Make sure we don't sleep in poll */ #endif } static void smp_sig_notify(char c) { int res; do { /* write() is async-signal safe (according to posix) */ res = write(sig_notify_fds[1], &c, 1); } while (res < 0 && errno == EINTR); if (res != 1) { char msg[] = "smp_sig_notify(): Failed to notify signal-dispatcher thread " "about received signal"; (void) write(2, msg, sizeof(msg)); abort(); } } 于是erts_smp_notify_check_children_needed()被调用。 void erts_smp_notify_check_children_needed(void) { ErtsSchedulerData *esdp; erts_smp_sched_lock(); for (esdp = schedulers; esdp; esdp = esdp->next) esdp->check_children = 1; if (block_multi_scheduling) { /* Also blocked schedulers need to check children */ erts_smp_mtx_lock(&msched_blk_mtx); for (esdp = schedulers; esdp; esdp = esdp->next) esdp->blocked_check_children = 1; erts_smp_cnd_broadcast(&msched_blk_cnd); erts_smp_mtx_unlock(&msched_blk_mtx); } wake_all_schedulers(); erts_smp_sched_unlock(); } 这个函数设置调度器的check_children的标志 并且唤醒所有的调度器。 调度器的入口process_main我们来看下如何处理的: Process *schedule(Process *p, int calls) { ... if (esdp->check_children) { esdp->check_children = 0; erts_smp_sched_unlock(); erts_check_children(); erts_smp_sched_lock(); } ... 
} 调用erts_check_children。 void erts_check_children(void) { (void) check_children(); } static int check_children(void) { int res = 0; int pid; int status; #ifndef ERTS_SMP if (children_died) #endif { sys_sigblock(SIGCHLD); CHLD_STAT_LOCK; while ((pid = waitpid(-1, &status, WNOHANG)) > 0) note_child_death(pid, status); #ifndef ERTS_SMP children_died = 0; #endif CHLD_STAT_UNLOCK; sys_sigrelease(SIGCHLD); res = 1; } return res; } static void note_child_death(int pid, int status) { ErtsSysReportExit **repp = &report_exit_list; ErtsSysReportExit *rep = report_exit_list; while (rep) { if (pid == rep->pid) { *repp = rep->next; ERTS_REPORT_EXIT_STATUS(rep, status); break; } repp = &rep->next; rep = rep->next; } } static ERTS_INLINE void report_exit_status(ErtsSysReportExit *rep, int status) { Port *pp; #ifdef ERTS_SMP CHLD_STAT_UNLOCK; #endif pp = erts_id2port_sflgs(rep->port, NULL, 0, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); #ifdef ERTS_SMP CHLD_STAT_LOCK; #endif if (pp) { if (rep->ifd >= 0) { driver_data[rep->ifd].alive = 0; driver_data[rep->ifd].status = status; (void) driver_select((ErlDrvPort) internal_port_index(pp->id), rep->ifd, DO_READ, 1); } if (rep->ofd >= 0) { driver_data[rep->ofd].alive = 0; driver_data[rep->ofd].status = status; (void) driver_select((ErlDrvPort) internal_port_index(pp->id), rep->ofd, DO_WRITE, 1); } erts_port_release(pp); } erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep); } 移除对该port的监视 销毁port. static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd) { ... res = read(ready_fd, read_buf, ERTS_SYS_READ_BUF_SZ); if (res < 0) { if ((errno != EINTR) && (errno != ERRNO_BLOCK)) port_inp_failure(port_num, ready_fd, res); } else if (res == 0) port_inp_failure(port_num, ready_fd, res); else driver_output(port_num, (char*) read_buf, res); erts_free(ERTS_ALC_T_SYS_READ_BUF, (void *) read_buf); }.... 
} static int port_inp_failure(int port_num, int ready_fd, int res) /* Result: 0 (eof) or -1 (error) */ { int err = errno; ASSERT(res <= 0); (void) driver_select(port_num, ready_fd, ERL_DRV_READ|ERL_DRV_WRITE, 0); clear_fd_data(ready_fd); if (res == 0) { if (driver_data[ready_fd].report_exit) { CHLD_STAT_LOCK; if (driver_data[ready_fd].alive) { /* * We have eof and want to report exit status, but the process * hasn't exited yet. When it does report_exit_status() will * driver_select() this fd which will make sure that we get * back here with driver_data[ready_fd].alive == 0 and * driver_data[ready_fd].status set. */ CHLD_STAT_UNLOCK; return 0; } else { int status = driver_data[ready_fd].status; CHLD_STAT_UNLOCK; /* We need not be prepared for stopped/continued processes. */ if (WIFSIGNALED(status)) status = 128 + WTERMSIG(status); else status = WEXITSTATUS(status); driver_report_exit(driver_data[ready_fd].port_num, status); } } driver_failure_eof(port_num); } else { driver_failure_posix(port_num, err); } return 0; } void driver_report_exit(int ix, int status) { Port* prt = erts_drvport2port(ix); Eterm* hp; Eterm tuple; Process *rp; Eterm pid; ErlHeapFragment *bp = NULL; ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); pid = prt->connected; ASSERT(is_internal_pid(pid)); rp = erts_pid2proc_opt(NULL, 0, pid, 0, ERTS_P2P_FLG_SMP_INC_REFC); if (!rp) return; hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks); tuple = TUPLE2(hp, am_exit_status, make_small(status)); hp += 3; tuple = TUPLE2(hp, prt->id, tuple); erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); erts_smp_proc_dec_refc(rp); } 于是我们收到{Port, {exit_status, Status}}事件。 有点复杂吧,不过挺优雅的。记住信号处理函数里面不能做太耗时的操作,也不能调用非异步信号安全的api。另外可能会有大量的退出事件发生,让调度器来调度这件事情比较公平,避免系统在退出处理上投入过多! 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |