diff options
Diffstat (limited to 'usr/actor.c')
-rw-r--r-- | usr/actor.c | 264 |
1 files changed, 131 insertions, 133 deletions
diff --git a/usr/actor.c b/usr/actor.c index a1373d6..37b5024 100644 --- a/usr/actor.c +++ b/usr/actor.c @@ -1,7 +1,8 @@ /* - * iSCSI usermode single-threaded scheduler + * iSCSI timeout & deferred work handling * * Copyright (C) 2004 Dmitry Yusupov, Alex Aizman + * Copyright (C) 2014 Red Hat Inc. * maintained by open-iscsi@googlegroups.com * * This program is free software; you can redistribute it and/or modify @@ -17,34 +18,25 @@ * See the file COPYING included with this distribution for more details. */ #include <inttypes.h> +#include <time.h> +#include <sys/signalfd.h> +#include <assert.h> +#include <unistd.h> #include "actor.h" #include "log.h" #include "list.h" static LIST_HEAD(pend_list); -static LIST_HEAD(poll_list); -static LIST_HEAD(actor_list); +static LIST_HEAD(ready_list); static volatile int poll_in_progress; -static volatile uint64_t actor_jiffies = 0; - -#define actor_diff(_time1, _time2) ({ \ - uint64_t __ret; \ - if ((_time2) >= (_time1)) \ - __ret = (_time2) - (_time1); \ - else \ - __ret = ((~0ULL) - (_time1)) + (_time2); \ - __ret; \ -}) - -#define ACTOR_MS_TO_TICKS(_a) ((_a)/ACTOR_RESOLUTION) static uint64_t -actor_diff_time(actor_t *thread, uint64_t current_time) +actor_time_left(actor_t *thread, uint64_t current_time) { - uint64_t diff_time = actor_diff(thread->scheduled_at, current_time); - if(diff_time >= thread->ttschedule) + if (current_time > thread->ttschedule) return 0; - return (thread->ttschedule - diff_time); + else + return (thread->ttschedule - current_time); } #define time_after(a,b) \ @@ -65,11 +57,18 @@ actor_delete(actor_t *thread) log_debug(7, "thread %08lx delete: state %d", (long)thread, thread->state); switch(thread->state) { - case ACTOR_SCHEDULED: case ACTOR_WAITING: - case ACTOR_POLL_WAITING: + /* TODO: remove/reset alarm if we were 1st entry in pend_list */ + /* priority: low */ + /* fallthrough */ + case ACTOR_SCHEDULED: log_debug(1, "deleting a scheduled/waiting thread!"); list_del_init(&thread->list); + if (list_empty(&pend_list)) { + log_debug(7, "nothing left on pend_list, deactivating alarm\n"); + alarm(0); + } + break; default: break; @@ -77,73 +76,81 @@ actor_delete(actor_t *thread) thread->state = ACTOR_NOTSCHEDULED; } +/* + * Inserts actor on pend list and sets alarm if new item is + * sooner than previous entries. + */ +static void +actor_insert_on_pend_list(actor_t *thread, uint32_t delay_secs) +{ + struct actor *orig_head; + struct actor *new_head; + struct actor *next_thread; + + orig_head = list_first_entry_or_null(&pend_list, + struct actor, list); + + /* insert new entry in sort order */ + list_for_each_entry(next_thread, &pend_list, list) { + log_debug(7, "thread %p %lld", next_thread, + (long long)next_thread->ttschedule); + + if (time_after(next_thread->ttschedule, thread->ttschedule)) { + list_add(&thread->list, &next_thread->list); + goto inserted; + } + } + + /* Not before any existing entries */ + list_add_tail(&thread->list, &pend_list); + +inserted: + new_head = list_first_entry(&pend_list, struct actor, list); + if (orig_head != new_head) { + int result = alarm(delay_secs); + log_debug(7, "new alarm set for %d seconds, old alarm %d", + delay_secs, result); + } +} + static void -actor_schedule_private(actor_t *thread, uint32_t ttschedule, int head) +actor_schedule_private(actor_t *thread, uint32_t delay_secs, int head) { - uint64_t delay_time, current_time; - actor_t *next_thread; + time_t current_time; + + struct timespec tv; - delay_time = ACTOR_MS_TO_TICKS(ttschedule); - current_time = actor_jiffies; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) { + log_error("clock_getime failed, can't schedule!\n"); + return; + } + + current_time = tv.tv_sec; - log_debug(7, "thread %p schedule: delay %" PRIu64 " state %d", - thread, delay_time, thread->state); + log_debug(7, "thread %p schedule: delay %u state %d", + thread, delay_secs, thread->state); - /* convert ttscheduled msecs in 10s of msecs by dividing for now. - * later we will change param to 10s of msecs */ switch(thread->state) { case ACTOR_WAITING: log_error("rescheduling a waiting thread!"); list_del(&thread->list); + /* fall-through */ case ACTOR_NOTSCHEDULED: INIT_LIST_HEAD(&thread->list); - /* if ttschedule is 0, put in scheduled queue and change - * state to scheduled, else add current time to ttschedule and - * insert in the queue at the correct point */ - if (delay_time == 0) { - /* For head addition, it must go onto the head of the - actor_list regardless if poll is in progress or not - */ - if (poll_in_progress && !head) { - thread->state = ACTOR_POLL_WAITING; - list_add_tail(&thread->list, - &poll_list); - } else { - thread->state = ACTOR_SCHEDULED; - if (head) - list_add(&thread->list, - &actor_list); - else - list_add_tail(&thread->list, - &actor_list); - } + + if (delay_secs == 0) { + thread->state = ACTOR_SCHEDULED; + if (head) + list_add(&thread->list, &ready_list); + else + list_add_tail(&thread->list, &ready_list); } else { thread->state = ACTOR_WAITING; - thread->ttschedule = delay_time; - thread->scheduled_at = current_time; - - /* insert new entry in sort order */ - list_for_each_entry(next_thread, &pend_list, list) { - log_debug(7, "thread %p %" PRIu64 " %"PRIu64, - next_thread, - next_thread->scheduled_at + - next_thread->ttschedule, - current_time + delay_time); - - if (time_after(next_thread->scheduled_at + - next_thread->ttschedule, - current_time + delay_time)) { - list_add(&thread->list, - &next_thread->list); - goto done; - } - } - - list_add_tail(&thread->list, &pend_list); + thread->ttschedule = current_time + delay_secs; + + actor_insert_on_pend_list(thread, delay_secs); } -done: break; - case ACTOR_POLL_WAITING: case ACTOR_SCHEDULED: // don't do anything break; @@ -168,75 +175,82 @@ actor_schedule(actor_t *thread) } void -actor_timer(actor_t *thread, uint32_t timeout, void (*callback)(void *), +actor_timer(actor_t *thread, uint32_t timeout_secs, void (*callback)(void *), void *data) { - actor_new(thread, callback, data); - actor_schedule_private(thread, timeout, 0); + actor_init(thread, callback, data); + actor_schedule_private(thread, timeout_secs, 0); } -int -actor_timer_mod(actor_t *thread, uint32_t timeout, void *data) +/* + * Execute all items that have expired. + * + * Set an alarm if items remain. Caller must catch SIGALRM and + * then re-invoke this function. + */ +void +actor_poll(void) { - if (thread->state == ACTOR_WAITING) { - list_del_init(&thread->list); - thread->data = data; - actor_schedule_private(thread, timeout, 0); - return 1; + struct actor *thread, *tmp; + uint64_t current_time; + struct timespec tv; + + if (poll_in_progress) { + log_error("recursive actor_poll() is not allowed"); + return; } - return 0; -} -static void -actor_check(uint64_t current_time) -{ - struct actor *thread, *tmp; + if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) { + log_error("clock_gettime failed, can't schedule!\n"); + return; + } + current_time = tv.tv_sec; + + /* + * Move items that are ripe from pend_list to ready_list. + * Actors are in sorted order of ascending run time, so + * stop at the first unripe entry. + */ list_for_each_entry_safe(thread, tmp, &pend_list, list) { - if (actor_diff_time(thread, current_time)) { + uint64_t time_left = actor_time_left(thread, current_time); + if (time_left) { log_debug(7, "thread %08lx wait some more", - (long)thread); - /* wait some more */ + (long)thread); + + alarm(time_left); break; } - /* it is time to schedule this entry */ + /* This entry can be run now */ list_del_init(&thread->list); - log_debug(2, "thread %08lx was scheduled at %" PRIu64 ":" - "%" PRIu64 ", curtime %" PRIu64 " q_forw %p " - "&pend_list %p", - (long)thread, thread->scheduled_at, thread->ttschedule, - current_time, pend_list.next, &pend_list); + log_debug(2, "thread %08lx was scheduled for " + "%" PRIu64 ", curtime %" PRIu64 " q_forw %p " + "&pend_list %p", + (long)thread, thread->ttschedule, + current_time, pend_list.next, &pend_list); + list_add_tail(&thread->list, &ready_list); + assert(thread->state == ACTOR_WAITING); thread->state = ACTOR_SCHEDULED; - list_add_tail(&thread->list, &actor_list); - log_debug(7, "thread %08lx now in actor_list", - (long)thread); + log_debug(7, "thread %08lx now in ready_list", + (long)thread); } -} -void -actor_poll(void) -{ - struct actor *thread; - - /* check that there are no any concurrency */ - if (poll_in_progress) { - log_error("concurrent actor_poll() is not allowed"); + /* Disable alarm if nothing else pending */ + if (list_empty(&pend_list)) { + log_debug(7, "nothing on pend_list, deactivating alarm\n"); + alarm(0); } - /* check the wait list */ - actor_check(actor_jiffies); - - /* the following code to check in the main data path */ poll_in_progress = 1; - while (!list_empty(&actor_list)) { - thread = list_entry(actor_list.next, struct actor, list); + while (!list_empty(&ready_list)) { + thread = list_first_entry(&ready_list, struct actor, list); list_del_init(&thread->list); if (thread->state != ACTOR_SCHEDULED) - log_error("actor_list: thread state corrupted! " + log_error("ready_list: thread state corrupted! " "Thread with state %d in actor list.", thread->state); thread->state = ACTOR_NOTSCHEDULED; @@ -245,20 +259,4 @@ actor_poll(void) log_debug(7, "thread removed\n"); } poll_in_progress = 0; - - while (!list_empty(&poll_list)) { - thread = list_entry(poll_list.next, struct actor, list); - list_del_init(&thread->list); - - if (thread->state != ACTOR_POLL_WAITING) - log_error("poll_list: thread state corrupted!" - "Thread with state %d in poll list.", - thread->state); - thread->state = ACTOR_SCHEDULED; - list_add_tail(&thread->list, &actor_list); - log_debug(7, "thread %08lx removed from poll_list", - (long)thread); - } - - actor_jiffies++; } |