原创作者: mryufeng
阅读:1896次
评论:0条
更新时间:2011-06-01
erlang的调度有2种: 1. 进程调度 2. IO调度。网络程序的事件来源基本上只有2种:IO和定时器。IO事件有可能是大量的, 不可预期的,所以在设计上要考虑和进程调度平衡。erlang的
erl_port_task就是为这个目标设计的。
poll检查到io时间的时候,会回调iread和oready函数。这2个会把这个队列加到porttask的调度队列去。
static ERTS_INLINE void
iready(Eterm id, ErtsDrvEventState *state)
{
if (erts_port_task_schedule(id,
&state->driver.select->intask,
ERTS_PORT_TASK_INPUT,
(ErlDrvEvent) state->fd,
NULL) != 0) {
stale_drv_select(id, state, DO_READ);
}
}
void
ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
{
...
if ((revents & ERTS_POLL_EV_IN)
|| (!(revents & ERTS_POLL_EV_OUT)
&& state->events & ERTS_POLL_EV_IN))
iready(state->driver.select->inport, state);
else if (state->events & ERTS_POLL_EV_OUT)
oready(state->driver.select->outport, state);
}
...
}
/*
* Run all scheduled tasks for the first port in run queue. If
* new tasks appear while running reschedule port (free task is
* an exception; it is always handled instantly).
*
* erts_port_task_execute() is called by scheduler threads between
* scheduleing of processes. Sched lock should be held by caller.
*/
int erts_port_task_execute(void)
{
...
switch (ptp->type) {
case ERTS_PORT_TASK_FREE: /* May be pushed in q at any time */
erts_smp_tasks_lock();
if (io_tasks_executed) {
ASSERT(erts_port_task_outstanding_io_tasks >= io_tasks_executed);
erts_port_task_outstanding_io_tasks -= io_tasks_executed;
}
goto free_port;
case ERTS_PORT_TASK_TIMEOUT: /*driver层面的timer超时时间*/
erts_port_ready_timeout(pp);
break;
case ERTS_PORT_TASK_INPUT: /*IO input*/
erts_port_ready_input(pp, ptp->event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT: /*IO output*/
erts_port_ready_output(pp, ptp->event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_EVENT:
erts_port_ready_event(pp, ptp->event, ptp->event_data);
io_tasks_executed++;
break;
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
(int) ptp->type);
break;
}
...
}
void
erts_port_ready_input(Port *p, ErlDrvEvent hndl)
{
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
ASSERT((p->status & ERTS_PORT_SFLGS_DEAD) == 0);
if (!p->drv_ptr->ready_input)
missing_drv_callback(p, hndl, DO_READ);
else {
(*p->drv_ptr->ready_input)((ErlDrvData) p->drv_data, hndl); /* 真正干活的地方 */
/* NOTE some windows drivers use ->ready_input for input and output */
if ((p->status & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(p)) {
terminate_port(p);
}
}
}
在erlang的schedule里会在适当的时间执行erts_port_task_execute消耗掉IO事件。执行的时间和次数主要和process平衡。
erl_port_task就是为这个目标设计的。
poll检查到io时间的时候,会回调iread和oready函数。这2个会把这个队列加到porttask的调度队列去。
static ERTS_INLINE void
iready(Eterm id, ErtsDrvEventState *state)
{
if (erts_port_task_schedule(id,
&state->driver.select->intask,
ERTS_PORT_TASK_INPUT,
(ErlDrvEvent) state->fd,
NULL) != 0) {
stale_drv_select(id, state, DO_READ);
}
}
void
ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
{
...
if ((revents & ERTS_POLL_EV_IN)
|| (!(revents & ERTS_POLL_EV_OUT)
&& state->events & ERTS_POLL_EV_IN))
iready(state->driver.select->inport, state);
else if (state->events & ERTS_POLL_EV_OUT)
oready(state->driver.select->outport, state);
}
...
}
/*
* Run all scheduled tasks for the first port in run queue. If
* new tasks appear while running reschedule port (free task is
* an exception; it is always handled instantly).
*
* erts_port_task_execute() is called by scheduler threads between
* scheduleing of processes. Sched lock should be held by caller.
*/
int erts_port_task_execute(void)
{
...
switch (ptp->type) {
case ERTS_PORT_TASK_FREE: /* May be pushed in q at any time */
erts_smp_tasks_lock();
if (io_tasks_executed) {
ASSERT(erts_port_task_outstanding_io_tasks >= io_tasks_executed);
erts_port_task_outstanding_io_tasks -= io_tasks_executed;
}
goto free_port;
case ERTS_PORT_TASK_TIMEOUT: /*driver层面的timer超时时间*/
erts_port_ready_timeout(pp);
break;
case ERTS_PORT_TASK_INPUT: /*IO input*/
erts_port_ready_input(pp, ptp->event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT: /*IO output*/
erts_port_ready_output(pp, ptp->event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_EVENT:
erts_port_ready_event(pp, ptp->event, ptp->event_data);
io_tasks_executed++;
break;
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
(int) ptp->type);
break;
}
...
}
void
erts_port_ready_input(Port *p, ErlDrvEvent hndl)
{
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
ASSERT((p->status & ERTS_PORT_SFLGS_DEAD) == 0);
if (!p->drv_ptr->ready_input)
missing_drv_callback(p, hndl, DO_READ);
else {
(*p->drv_ptr->ready_input)((ErlDrvData) p->drv_data, hndl); /* 真正干活的地方 */
/* NOTE some windows drivers use ->ready_input for input and output */
if ((p->status & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(p)) {
terminate_port(p);
}
}
}
在erlang的schedule里会在适当的时间执行erts_port_task_execute消耗掉IO事件。执行的时间和次数主要和process平衡。
评论 共 0 条 请登录后发表评论