原创作者: mryufeng
阅读:1336次
评论:0条
更新时间:2011-06-01
erlang通过port来spawn外部程序:把外部程序的stdin、stdout重定向到一对pipe上进行通信,并利用poll来检测外部程序的读写事件。但是如果外部程序退出了,erts是如何知道并加以处理的呢?
erts启动时会调用init_smp_sig_notify():创建一个pipe,并开启一个信号分发线程(signal dispatcher),具体的信号处理都在这个线程里完成。
/*
 * Set up signal forwarding for the SMP emulator.
 *
 * Creates the self-pipe sig_notify_fds (signal handlers write into
 * sig_notify_fds[1], the dispatcher thread reads sig_notify_fds[0]) and
 * then starts signal_dispatcher_thread_func() as a detached thread.
 * Aborts the emulator if the pipe cannot be created.
 */
static void
init_smp_sig_notify(void)
{
    erts_smp_thr_opts_t opts = ERTS_SMP_THR_OPTS_DEFAULT_INITER;
    int pipe_res = pipe(sig_notify_fds);

    if (pipe_res < 0)
        erl_exit(ERTS_ABORT_EXIT,
                 "Failed to create signal-dispatcher pipe: %s (%d)\n",
                 erl_errno_id(errno),
                 errno);

    /* Start signal handler thread (detached: it is never joined here). */
    opts.detached = 1;
    erts_smp_thr_create(&sig_dispatcher_tid,
                        signal_dispatcher_thread_func,
                        NULL,
                        &opts);
}
/*
 * Body of the signal-dispatcher thread.
 *
 * Signal handlers (via smp_sig_notify()) write one byte per received
 * signal into sig_notify_fds[1]. This thread blocks reading the other end
 * and dispatches each byte outside of async-signal context:
 *
 *   0    - the emulator has finished initializing
 *   'C'  - SIGCHLD (only when !CHLDWTHR)
 *   'I'  - SIGINT
 *   'Q'  - SIGQUIT
 *   '1'  - SIGUSR1
 *   '2'  - SIGUSR2 (QUANTIFY builds only)
 *
 * The following all abort the emulator:
 *   - an unknown byte;
 *   - a read error other than EINTR;
 *   - end-of-file on the pipe.
 */
static void *
signal_dispatcher_thread_func(void *unused)
{
    int initialized = 0;
#if !CHLDWTHR
    /* Set when a SIGCHLD notification arrives before initialization. */
    int notify_check_children = 0;
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_set_thread_name("signal_dispatcher");
#endif
    erts_thread_init_fp_exception();
    while (1) {
        char buf[32];
        int res, i;
        /* Block on read() waiting for a signal notification to arrive... */
        res = read(sig_notify_fds[0], (void *) &buf[0], sizeof(buf));
        if (res < 0) {
            if (errno == EINTR)
                continue;
            erl_exit(ERTS_ABORT_EXIT,
                     "signal-dispatcher thread got unexpected error: %s (%d)\n",
                     erl_errno_id(errno),
                     errno);
        }
        if (res == 0) {
            /*
             * EOF: all write ends of the pipe are closed. Without this
             * check read() would keep returning 0 immediately and the
             * thread would spin forever at full CPU.
             */
            erl_exit(ERTS_ABORT_EXIT,
                     "signal-dispatcher thread got unexpected end of file "
                     "on signal notification pipe\n");
        }
        for (i = 0; i < res; i++) {
            /*
             * NOTE 1: The signal dispatcher thread should not do work
             *         that takes a substantial amount of time (except
             *         perhaps in test and debug builds). It needs to
             *         be responsive, i.e, it should only dispatch work
             *         to other threads.
             *
             * NOTE 2: The signal dispatcher thread is not a blockable
             *         thread (i.e., it hasn't called
             *         erts_register_blockable_thread()). This is
             *         intentional. We want to be able to interrupt
             *         writing of a crash dump by hitting C-c twice.
             *         Since it isn't a blockable thread it is important
             *         that it doesn't change the state of any data that
             *         a blocking thread expects to have exclusive access
             *         to (unless the signal dispatcher itself explicitly
             *         is blocking all blockable threads).
             */
            switch (buf[i]) {
            case 0: /* Emulator initialized */
                initialized = 1;
#if !CHLDWTHR
                if (!notify_check_children)
#endif
                    break;
#if !CHLDWTHR
                /*
                 * Fall through: a SIGCHLD was noted before initialization
                 * completed; deliver it now that initialized == 1.
                 */
            case 'C': /* SIGCHLD */
                if (initialized)
                    erts_smp_notify_check_children_needed();
                else
                    notify_check_children = 1;
                break;
#endif
            case 'I': /* SIGINT */
                break_requested();
                break;
            case 'Q': /* SIGQUIT */
                quit_requested();
                break;
            case '1': /* SIGUSR1 */
                sigusr1_exit();
                break;
#ifdef QUANTIFY
            case '2': /* SIGUSR2 */
                quantify_save_data(); /* Might take a substantial amount of
                                         time, but this is a test/debug
                                         build */
                break;
#endif
            default:
                erl_exit(ERTS_ABORT_EXIT,
                         "signal-dispatcher thread received unknown "
                         "signal notification: '%c'\n",
                         buf[i]);
            }
        }
        ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING);
    }
    return NULL;
}
/*
 * Turn the calling thread into the signal receiver.
 *
 * First it tells the signal-dispatcher thread that the emulator is
 * initialized (notification byte 0). Then it parks forever in select()
 * with no descriptors and no timeout. Such a select() only returns when a
 * signal interrupts it, so every wake-up is expected to fail with EINTR
 * (checked in DEBUG builds).
 */
void
erts_sys_main_thread(void)
{
#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_set_thread_name("signal_receiver");
#endif
    smp_sig_notify(0); /* Notify initialized */

    for (;;) {
#ifdef DEBUG
        int res = select(0, NULL, NULL, NULL, NULL);
        ASSERT(res < 0);
        ASSERT(errno == EINTR);
#else
        (void) select(0, NULL, NULL, NULL, NULL);
#endif
    }
}
因为外部程序是通过fork+exec启动的子进程,所以它退出时erts进程会收到SIGCHLD信号。spawn_init()中为SIGCHLD注册了处理函数:
/*
 * Excerpt of spawn_init(): only the SIGCHLD setup is shown, the rest is
 * elided ("...").
 */
static int spawn_init()
{
...
/* Install onchld() as the SIGCHLD handler so exiting children are noticed. */
sys_sigset(SIGCHLD, onchld); /* Reap children */
...
}
于是信号处理函数onchld()会被调用。在SMP模式下,它只通过smp_sig_notify()向pipe写入一个字符'C',把真正的工作交给信号分发线程:
/*
 * SIGCHLD handler (installed in spawn_init() via sys_sigset()).
 *
 * Runs in async-signal context, so it only records that children need
 * reaping. The actual waitpid() work is done later by check_children().
 *   - CHLDWTHR builds: this handler must never run (asserts).
 *   - SMP builds: forwards 'C' to the signal-dispatcher thread.
 *   - Other builds: sets children_died and interrupts the I/O poll so it
 *     does not keep sleeping.
 */
static RETSIGTYPE onchld(int signum)
{
#if CHLDWTHR
ASSERT(0); /* We should *never* catch a SIGCHLD signal */
#elif defined(ERTS_SMP)
smp_sig_notify('C');
#else
children_died = 1;
ERTS_CHK_IO_INTR(1); /* Make sure we don't sleep in poll */
#endif
}
/*
 * Forward a one-byte signal notification to the signal-dispatcher thread.
 *
 * Called from signal handlers (e.g. onchld()), so it only uses write(),
 * which POSIX lists as async-signal-safe. It retries on EINTR. If the byte
 * cannot be written, it prints a message to stderr and calls abort().
 *
 * The caller's errno is saved and restored. Otherwise an EINTR retry would
 * leave errno modified in whatever code the signal interrupted.
 */
static void
smp_sig_notify(char c)
{
    int saved_errno = errno;
    int res;
    do {
        /* write() is async-signal safe (according to posix) */
        res = write(sig_notify_fds[1], &c, 1);
    } while (res < 0 && errno == EINTR);
    if (res != 1) {
        char msg[] =
            "smp_sig_notify(): Failed to notify signal-dispatcher thread "
            "about received signal\n";
        /* sizeof(msg) - 1: do not write the terminating NUL byte. */
        (void) write(2, msg, sizeof(msg) - 1);
        abort();
    }
    errno = saved_errno;
}
信号分发线程从pipe中读到'C'后(在模拟器已完成初始化的情况下),会调用erts_smp_notify_check_children_needed()。
/*
 * Ask every scheduler to reap exited child processes.
 *
 * Under the scheduler lock:
 *   - sets check_children on every scheduler in the schedulers list;
 *   - if multi-scheduling is blocked, also sets blocked_check_children
 *     under msched_blk_mtx and broadcasts msched_blk_cnd, so blocked
 *     schedulers notice the request;
 *   - wakes all schedulers.
 */
void
erts_smp_notify_check_children_needed(void)
{
    ErtsSchedulerData *sd;

    erts_smp_sched_lock();

    sd = schedulers;
    while (sd != NULL) {
        sd->check_children = 1;
        sd = sd->next;
    }

    if (block_multi_scheduling) {
        /* Also blocked schedulers need to check children */
        erts_smp_mtx_lock(&msched_blk_mtx);
        for (sd = schedulers; sd != NULL; sd = sd->next)
            sd->blocked_check_children = 1;
        erts_smp_cnd_broadcast(&msched_blk_cnd);
        erts_smp_mtx_unlock(&msched_blk_mtx);
    }

    wake_all_schedulers();
    erts_smp_sched_unlock();
}
这个函数为所有调度器设置check_children标志,并唤醒所有调度器。
接下来看调度器(入口为process_main)在schedule()中是如何处理这个标志的:
/*
 * Excerpt of the scheduler loop: only the child-reaping check is shown,
 * the rest is elided ("...").
 */
Process *schedule(Process *p, int calls)
{
...
/*
 * check_children is set by erts_smp_notify_check_children_needed().
 * Clear it, then release the scheduler lock around erts_check_children()
 * and re-take it afterwards. Presumably this avoids holding the lock during
 * waitpid() and port lookups.
 */
if (esdp->check_children) {
esdp->check_children = 0;
erts_smp_sched_unlock();
erts_check_children();
erts_smp_sched_lock();
}
...
}
调度器清除该标志后调用erts_check_children()。它通过waitpid(WNOHANG)回收所有已退出的子进程,并对每个子进程调用note_child_death();后者找到对应的report-exit记录并上报退出状态:
/*
 * Entry point used by the schedulers: reap any exited children.
 * Callers do not need check_children()'s "did the reaping pass run"
 * result, so it is discarded.
 */
void
erts_check_children(void)
{
    int reaped = check_children();
    (void) reaped;
}
/*
 * Reap every exited child without blocking and report each exit.
 *
 * When it runs:
 *   - SMP builds: always.
 *   - Non-SMP builds: only when onchld() has set children_died
 *     (which is then cleared).
 *
 * While reaping, SIGCHLD is blocked and the child-status lock is held.
 * waitpid(-1, ..., WNOHANG) is called until it returns <= 0 (no more
 * exited children, or an error). Each reaped pid is passed to
 * note_child_death().
 *
 * Returns 1 if the reaping pass ran, 0 if it was skipped.
 */
static int check_children(void)
{
int res = 0;
int pid;
int status;
/* Non-SMP: the block below is the body of `if (children_died)`. */
#ifndef ERTS_SMP
if (children_died)
#endif
{
sys_sigblock(SIGCHLD);
CHLD_STAT_LOCK;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0)
note_child_death(pid, status);
#ifndef ERTS_SMP
children_died = 0;
#endif
CHLD_STAT_UNLOCK;
sys_sigrelease(SIGCHLD);
res = 1;
}
return res;
}
/*
 * Handle the death of child `pid` with raw wait status `status`.
 *
 * Looks up the entry for `pid` in report_exit_list. If found, it unlinks
 * the entry and passes it to ERTS_REPORT_EXIT_STATUS. That is presumably
 * report_exit_status(), which frees the entry, so it is not touched
 * afterwards. Exits of pids with no registered entry are ignored.
 */
static void note_child_death(int pid, int status)
{
    ErtsSysReportExit **link;

    for (link = &report_exit_list; *link != NULL; link = &(*link)->next) {
        ErtsSysReportExit *entry = *link;
        if (entry->pid == pid) {
            *link = entry->next;
            ERTS_REPORT_EXIT_STATUS(entry, status);
            return;
        }
    }
}
/*
 * Deliver a child's exit status to the port that spawned it.
 *
 * In SMP builds the child-status lock is released around the port lookup
 * and re-acquired afterwards.
 *
 * If the port still exists, each of its fds (input, then output, whichever
 * is >= 0) is handled as follows:
 *   - marked not alive;
 *   - the wait status is stored in driver_data[fd];
 *   - driver_select() is called with on=1 (DO_READ for the input fd,
 *     DO_WRITE for the output fd).
 * The port is then released.
 *
 * The report entry is always freed.
 */
static ERTS_INLINE void
report_exit_status(ErtsSysReportExit *rep, int status)
{
    Port *prt;

#ifdef ERTS_SMP
    CHLD_STAT_UNLOCK;
#endif
    prt = erts_id2port_sflgs(rep->port, NULL, 0,
                             ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
#ifdef ERTS_SMP
    CHLD_STAT_LOCK;
#endif

    if (prt != NULL) {
        const int fds[2] = { rep->ifd, rep->ofd };
        const int modes[2] = { DO_READ, DO_WRITE };
        int k;

        for (k = 0; k < 2; k++) {
            int fd = fds[k];
            if (fd < 0)
                continue;
            driver_data[fd].alive = 0;
            driver_data[fd].status = status;
            (void) driver_select((ErlDrvPort) internal_port_index(prt->id),
                                 fd,
                                 modes[k],
                                 1);
        }
        erts_port_release(prt);
    }
    erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep);
}
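这条链的终点就是report_exit_status():如果port仍然存在,它会做三件事:
- 把该port对应的fd标记为已退出(alive = 0);
- 记录退出状态;
- 对这些fd调用driver_select重新打开读/写监听,以便驱动后续处理子进程的退出。
最后释放report-exit记录。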
流程有点复杂,不过很优雅。要记住:信号处理函数里不能做耗时的操作,也不能调用非异步信号安全(async-signal-unsafe)的API。此外,可能同时发生大量子进程退出事件,交给调度器来处理比较公平,可以避免系统在退出处理上投入过多资源。
erts启动时会调用init_smp_sig_notify():创建一个pipe,并开启一个信号分发线程(signal dispatcher),具体的信号处理都在这个线程里完成。
/*
 * Set up signal forwarding for the SMP emulator.
 *
 * Creates the self-pipe sig_notify_fds (signal handlers write into
 * sig_notify_fds[1], the dispatcher thread reads sig_notify_fds[0]) and
 * then starts signal_dispatcher_thread_func() as a detached thread.
 * Aborts the emulator if the pipe cannot be created.
 */
static void
init_smp_sig_notify(void)
{
    erts_smp_thr_opts_t opts = ERTS_SMP_THR_OPTS_DEFAULT_INITER;
    int pipe_res = pipe(sig_notify_fds);

    if (pipe_res < 0)
        erl_exit(ERTS_ABORT_EXIT,
                 "Failed to create signal-dispatcher pipe: %s (%d)\n",
                 erl_errno_id(errno),
                 errno);

    /* Start signal handler thread (detached: it is never joined here). */
    opts.detached = 1;
    erts_smp_thr_create(&sig_dispatcher_tid,
                        signal_dispatcher_thread_func,
                        NULL,
                        &opts);
}
/*
 * Body of the signal-dispatcher thread.
 *
 * Signal handlers (via smp_sig_notify()) write one byte per received
 * signal into sig_notify_fds[1]. This thread blocks reading the other end
 * and dispatches each byte outside of async-signal context:
 *
 *   0    - the emulator has finished initializing
 *   'C'  - SIGCHLD (only when !CHLDWTHR)
 *   'I'  - SIGINT
 *   'Q'  - SIGQUIT
 *   '1'  - SIGUSR1
 *   '2'  - SIGUSR2 (QUANTIFY builds only)
 *
 * The following all abort the emulator:
 *   - an unknown byte;
 *   - a read error other than EINTR;
 *   - end-of-file on the pipe.
 */
static void *
signal_dispatcher_thread_func(void *unused)
{
    int initialized = 0;
#if !CHLDWTHR
    /* Set when a SIGCHLD notification arrives before initialization. */
    int notify_check_children = 0;
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_set_thread_name("signal_dispatcher");
#endif
    erts_thread_init_fp_exception();
    while (1) {
        char buf[32];
        int res, i;
        /* Block on read() waiting for a signal notification to arrive... */
        res = read(sig_notify_fds[0], (void *) &buf[0], sizeof(buf));
        if (res < 0) {
            if (errno == EINTR)
                continue;
            erl_exit(ERTS_ABORT_EXIT,
                     "signal-dispatcher thread got unexpected error: %s (%d)\n",
                     erl_errno_id(errno),
                     errno);
        }
        if (res == 0) {
            /*
             * EOF: all write ends of the pipe are closed. Without this
             * check read() would keep returning 0 immediately and the
             * thread would spin forever at full CPU.
             */
            erl_exit(ERTS_ABORT_EXIT,
                     "signal-dispatcher thread got unexpected end of file "
                     "on signal notification pipe\n");
        }
        for (i = 0; i < res; i++) {
            /*
             * NOTE 1: The signal dispatcher thread should not do work
             *         that takes a substantial amount of time (except
             *         perhaps in test and debug builds). It needs to
             *         be responsive, i.e, it should only dispatch work
             *         to other threads.
             *
             * NOTE 2: The signal dispatcher thread is not a blockable
             *         thread (i.e., it hasn't called
             *         erts_register_blockable_thread()). This is
             *         intentional. We want to be able to interrupt
             *         writing of a crash dump by hitting C-c twice.
             *         Since it isn't a blockable thread it is important
             *         that it doesn't change the state of any data that
             *         a blocking thread expects to have exclusive access
             *         to (unless the signal dispatcher itself explicitly
             *         is blocking all blockable threads).
             */
            switch (buf[i]) {
            case 0: /* Emulator initialized */
                initialized = 1;
#if !CHLDWTHR
                if (!notify_check_children)
#endif
                    break;
#if !CHLDWTHR
                /*
                 * Fall through: a SIGCHLD was noted before initialization
                 * completed; deliver it now that initialized == 1.
                 */
            case 'C': /* SIGCHLD */
                if (initialized)
                    erts_smp_notify_check_children_needed();
                else
                    notify_check_children = 1;
                break;
#endif
            case 'I': /* SIGINT */
                break_requested();
                break;
            case 'Q': /* SIGQUIT */
                quit_requested();
                break;
            case '1': /* SIGUSR1 */
                sigusr1_exit();
                break;
#ifdef QUANTIFY
            case '2': /* SIGUSR2 */
                quantify_save_data(); /* Might take a substantial amount of
                                         time, but this is a test/debug
                                         build */
                break;
#endif
            default:
                erl_exit(ERTS_ABORT_EXIT,
                         "signal-dispatcher thread received unknown "
                         "signal notification: '%c'\n",
                         buf[i]);
            }
        }
        ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING);
    }
    return NULL;
}
/*
 * Turn the calling thread into the signal receiver.
 *
 * First it tells the signal-dispatcher thread that the emulator is
 * initialized (notification byte 0). Then it parks forever in select()
 * with no descriptors and no timeout. Such a select() only returns when a
 * signal interrupts it, so every wake-up is expected to fail with EINTR
 * (checked in DEBUG builds).
 */
void
erts_sys_main_thread(void)
{
#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_set_thread_name("signal_receiver");
#endif
    smp_sig_notify(0); /* Notify initialized */

    for (;;) {
#ifdef DEBUG
        int res = select(0, NULL, NULL, NULL, NULL);
        ASSERT(res < 0);
        ASSERT(errno == EINTR);
#else
        (void) select(0, NULL, NULL, NULL, NULL);
#endif
    }
}
因为外部程序是通过fork+exec启动的子进程,所以它退出时erts进程会收到SIGCHLD信号。spawn_init()中为SIGCHLD注册了处理函数:
/*
 * Excerpt of spawn_init(): only the SIGCHLD setup is shown, the rest is
 * elided ("...").
 */
static int spawn_init()
{
...
/* Install onchld() as the SIGCHLD handler so exiting children are noticed. */
sys_sigset(SIGCHLD, onchld); /* Reap children */
...
}
于是信号处理函数onchld()会被调用。在SMP模式下,它只通过smp_sig_notify()向pipe写入一个字符'C',把真正的工作交给信号分发线程:
/*
 * SIGCHLD handler (installed in spawn_init() via sys_sigset()).
 *
 * Runs in async-signal context, so it only records that children need
 * reaping. The actual waitpid() work is done later by check_children().
 *   - CHLDWTHR builds: this handler must never run (asserts).
 *   - SMP builds: forwards 'C' to the signal-dispatcher thread.
 *   - Other builds: sets children_died and interrupts the I/O poll so it
 *     does not keep sleeping.
 */
static RETSIGTYPE onchld(int signum)
{
#if CHLDWTHR
ASSERT(0); /* We should *never* catch a SIGCHLD signal */
#elif defined(ERTS_SMP)
smp_sig_notify('C');
#else
children_died = 1;
ERTS_CHK_IO_INTR(1); /* Make sure we don't sleep in poll */
#endif
}
/*
 * Forward a one-byte signal notification to the signal-dispatcher thread.
 *
 * Called from signal handlers (e.g. onchld()), so it only uses write(),
 * which POSIX lists as async-signal-safe. It retries on EINTR. If the byte
 * cannot be written, it prints a message to stderr and calls abort().
 *
 * The caller's errno is saved and restored. Otherwise an EINTR retry would
 * leave errno modified in whatever code the signal interrupted.
 */
static void
smp_sig_notify(char c)
{
    int saved_errno = errno;
    int res;
    do {
        /* write() is async-signal safe (according to posix) */
        res = write(sig_notify_fds[1], &c, 1);
    } while (res < 0 && errno == EINTR);
    if (res != 1) {
        char msg[] =
            "smp_sig_notify(): Failed to notify signal-dispatcher thread "
            "about received signal\n";
        /* sizeof(msg) - 1: do not write the terminating NUL byte. */
        (void) write(2, msg, sizeof(msg) - 1);
        abort();
    }
    errno = saved_errno;
}
信号分发线程从pipe中读到'C'后(在模拟器已完成初始化的情况下),会调用erts_smp_notify_check_children_needed()。
/*
 * Ask every scheduler to reap exited child processes.
 *
 * Under the scheduler lock:
 *   - sets check_children on every scheduler in the schedulers list;
 *   - if multi-scheduling is blocked, also sets blocked_check_children
 *     under msched_blk_mtx and broadcasts msched_blk_cnd, so blocked
 *     schedulers notice the request;
 *   - wakes all schedulers.
 */
void
erts_smp_notify_check_children_needed(void)
{
    ErtsSchedulerData *sd;

    erts_smp_sched_lock();

    sd = schedulers;
    while (sd != NULL) {
        sd->check_children = 1;
        sd = sd->next;
    }

    if (block_multi_scheduling) {
        /* Also blocked schedulers need to check children */
        erts_smp_mtx_lock(&msched_blk_mtx);
        for (sd = schedulers; sd != NULL; sd = sd->next)
            sd->blocked_check_children = 1;
        erts_smp_cnd_broadcast(&msched_blk_cnd);
        erts_smp_mtx_unlock(&msched_blk_mtx);
    }

    wake_all_schedulers();
    erts_smp_sched_unlock();
}
这个函数为所有调度器设置check_children标志,并唤醒所有调度器。
接下来看调度器(入口为process_main)在schedule()中是如何处理这个标志的:
/*
 * Excerpt of the scheduler loop: only the child-reaping check is shown,
 * the rest is elided ("...").
 */
Process *schedule(Process *p, int calls)
{
...
/*
 * check_children is set by erts_smp_notify_check_children_needed().
 * Clear it, then release the scheduler lock around erts_check_children()
 * and re-take it afterwards. Presumably this avoids holding the lock during
 * waitpid() and port lookups.
 */
if (esdp->check_children) {
esdp->check_children = 0;
erts_smp_sched_unlock();
erts_check_children();
erts_smp_sched_lock();
}
...
}
调度器清除该标志后调用erts_check_children()。它通过waitpid(WNOHANG)回收所有已退出的子进程,并对每个子进程调用note_child_death();后者找到对应的report-exit记录并上报退出状态:
/*
 * Entry point used by the schedulers: reap any exited children.
 * Callers do not need check_children()'s "did the reaping pass run"
 * result, so it is discarded.
 */
void
erts_check_children(void)
{
    int reaped = check_children();
    (void) reaped;
}
/*
 * Reap every exited child without blocking and report each exit.
 *
 * When it runs:
 *   - SMP builds: always.
 *   - Non-SMP builds: only when onchld() has set children_died
 *     (which is then cleared).
 *
 * While reaping, SIGCHLD is blocked and the child-status lock is held.
 * waitpid(-1, ..., WNOHANG) is called until it returns <= 0 (no more
 * exited children, or an error). Each reaped pid is passed to
 * note_child_death().
 *
 * Returns 1 if the reaping pass ran, 0 if it was skipped.
 */
static int check_children(void)
{
int res = 0;
int pid;
int status;
/* Non-SMP: the block below is the body of `if (children_died)`. */
#ifndef ERTS_SMP
if (children_died)
#endif
{
sys_sigblock(SIGCHLD);
CHLD_STAT_LOCK;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0)
note_child_death(pid, status);
#ifndef ERTS_SMP
children_died = 0;
#endif
CHLD_STAT_UNLOCK;
sys_sigrelease(SIGCHLD);
res = 1;
}
return res;
}
/*
 * Handle the death of child `pid` with raw wait status `status`.
 *
 * Looks up the entry for `pid` in report_exit_list. If found, it unlinks
 * the entry and passes it to ERTS_REPORT_EXIT_STATUS. That is presumably
 * report_exit_status(), which frees the entry, so it is not touched
 * afterwards. Exits of pids with no registered entry are ignored.
 */
static void note_child_death(int pid, int status)
{
    ErtsSysReportExit **link;

    for (link = &report_exit_list; *link != NULL; link = &(*link)->next) {
        ErtsSysReportExit *entry = *link;
        if (entry->pid == pid) {
            *link = entry->next;
            ERTS_REPORT_EXIT_STATUS(entry, status);
            return;
        }
    }
}
/*
 * Deliver a child's exit status to the port that spawned it.
 *
 * In SMP builds the child-status lock is released around the port lookup
 * and re-acquired afterwards.
 *
 * If the port still exists, each of its fds (input, then output, whichever
 * is >= 0) is handled as follows:
 *   - marked not alive;
 *   - the wait status is stored in driver_data[fd];
 *   - driver_select() is called with on=1 (DO_READ for the input fd,
 *     DO_WRITE for the output fd).
 * The port is then released.
 *
 * The report entry is always freed.
 */
static ERTS_INLINE void
report_exit_status(ErtsSysReportExit *rep, int status)
{
    Port *prt;

#ifdef ERTS_SMP
    CHLD_STAT_UNLOCK;
#endif
    prt = erts_id2port_sflgs(rep->port, NULL, 0,
                             ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
#ifdef ERTS_SMP
    CHLD_STAT_LOCK;
#endif

    if (prt != NULL) {
        const int fds[2] = { rep->ifd, rep->ofd };
        const int modes[2] = { DO_READ, DO_WRITE };
        int k;

        for (k = 0; k < 2; k++) {
            int fd = fds[k];
            if (fd < 0)
                continue;
            driver_data[fd].alive = 0;
            driver_data[fd].status = status;
            (void) driver_select((ErlDrvPort) internal_port_index(prt->id),
                                 fd,
                                 modes[k],
                                 1);
        }
        erts_port_release(prt);
    }
    erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep);
}
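这条链的终点就是report_exit_status():如果port仍然存在,它会做三件事:
- 把该port对应的fd标记为已退出(alive = 0);
- 记录退出状态;
- 对这些fd调用driver_select重新打开读/写监听,以便驱动后续处理子进程的退出。
最后释放report-exit记录。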
流程有点复杂,不过很优雅。要记住:信号处理函数里不能做耗时的操作,也不能调用非异步信号安全(async-signal-unsafe)的API。此外,可能同时发生大量子进程退出事件,交给调度器来处理比较公平,可以避免系统在退出处理上投入过多资源。
评论 共 0 条 请登录后发表评论