summaryrefslogtreecommitdiff
path: root/usr/actor.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr/actor.c')
-rw-r--r--usr/actor.c264
1 files changed, 131 insertions, 133 deletions
diff --git a/usr/actor.c b/usr/actor.c
index a1373d6..37b5024 100644
--- a/usr/actor.c
+++ b/usr/actor.c
@@ -1,7 +1,8 @@
/*
- * iSCSI usermode single-threaded scheduler
+ * iSCSI timeout & deferred work handling
*
* Copyright (C) 2004 Dmitry Yusupov, Alex Aizman
+ * Copyright (C) 2014 Red Hat Inc.
* maintained by open-iscsi@googlegroups.com
*
* This program is free software; you can redistribute it and/or modify
@@ -17,34 +18,25 @@
* See the file COPYING included with this distribution for more details.
*/
#include <inttypes.h>
+#include <time.h>
+#include <sys/signalfd.h>
+#include <assert.h>
+#include <unistd.h>
#include "actor.h"
#include "log.h"
#include "list.h"
static LIST_HEAD(pend_list);
-static LIST_HEAD(poll_list);
-static LIST_HEAD(actor_list);
+static LIST_HEAD(ready_list);
static volatile int poll_in_progress;
-static volatile uint64_t actor_jiffies = 0;
-
-#define actor_diff(_time1, _time2) ({ \
- uint64_t __ret; \
- if ((_time2) >= (_time1)) \
- __ret = (_time2) - (_time1); \
- else \
- __ret = ((~0ULL) - (_time1)) + (_time2); \
- __ret; \
-})
-
-#define ACTOR_MS_TO_TICKS(_a) ((_a)/ACTOR_RESOLUTION)
static uint64_t
-actor_diff_time(actor_t *thread, uint64_t current_time)
+actor_time_left(actor_t *thread, uint64_t current_time)
{
- uint64_t diff_time = actor_diff(thread->scheduled_at, current_time);
- if(diff_time >= thread->ttschedule)
+ if (current_time > thread->ttschedule)
return 0;
- return (thread->ttschedule - diff_time);
+ else
+ return (thread->ttschedule - current_time);
}
#define time_after(a,b) \
@@ -65,11 +57,18 @@ actor_delete(actor_t *thread)
log_debug(7, "thread %08lx delete: state %d", (long)thread,
thread->state);
switch(thread->state) {
- case ACTOR_SCHEDULED:
case ACTOR_WAITING:
- case ACTOR_POLL_WAITING:
+ /* TODO: remove/reset alarm if we were 1st entry in pend_list */
+ /* priority: low */
+ /* fallthrough */
+ case ACTOR_SCHEDULED:
log_debug(1, "deleting a scheduled/waiting thread!");
list_del_init(&thread->list);
+ if (list_empty(&pend_list)) {
+ log_debug(7, "nothing left on pend_list, deactivating alarm\n");
+ alarm(0);
+ }
+
break;
default:
break;
@@ -77,73 +76,81 @@ actor_delete(actor_t *thread)
thread->state = ACTOR_NOTSCHEDULED;
}
+/*
+ * Inserts actor on pend list and sets alarm if new item is
+ * sooner than previous entries.
+ */
+static void
+actor_insert_on_pend_list(actor_t *thread, uint32_t delay_secs)
+{
+ struct actor *orig_head;
+ struct actor *new_head;
+ struct actor *next_thread;
+
+ orig_head = list_first_entry_or_null(&pend_list,
+ struct actor, list);
+
+ /* insert new entry in sort order */
+ list_for_each_entry(next_thread, &pend_list, list) {
+ log_debug(7, "thread %p %lld", next_thread,
+ (long long)next_thread->ttschedule);
+
+ if (time_after(next_thread->ttschedule, thread->ttschedule)) {
+ list_add(&thread->list, &next_thread->list);
+ goto inserted;
+ }
+ }
+
+ /* Not before any existing entries */
+ list_add_tail(&thread->list, &pend_list);
+
+inserted:
+ new_head = list_first_entry(&pend_list, struct actor, list);
+ if (orig_head != new_head) {
+ int result = alarm(delay_secs);
+ log_debug(7, "new alarm set for %d seconds, old alarm %d",
+ delay_secs, result);
+ }
+}
+
static void
-actor_schedule_private(actor_t *thread, uint32_t ttschedule, int head)
+actor_schedule_private(actor_t *thread, uint32_t delay_secs, int head)
{
- uint64_t delay_time, current_time;
- actor_t *next_thread;
+ time_t current_time;
+
+ struct timespec tv;
- delay_time = ACTOR_MS_TO_TICKS(ttschedule);
- current_time = actor_jiffies;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) {
+ log_error("clock_getime failed, can't schedule!\n");
+ return;
+ }
+
+ current_time = tv.tv_sec;
- log_debug(7, "thread %p schedule: delay %" PRIu64 " state %d",
- thread, delay_time, thread->state);
+ log_debug(7, "thread %p schedule: delay %u state %d",
+ thread, delay_secs, thread->state);
- /* convert ttscheduled msecs in 10s of msecs by dividing for now.
- * later we will change param to 10s of msecs */
switch(thread->state) {
case ACTOR_WAITING:
log_error("rescheduling a waiting thread!");
list_del(&thread->list);
+ /* fall-through */
case ACTOR_NOTSCHEDULED:
INIT_LIST_HEAD(&thread->list);
- /* if ttschedule is 0, put in scheduled queue and change
- * state to scheduled, else add current time to ttschedule and
- * insert in the queue at the correct point */
- if (delay_time == 0) {
- /* For head addition, it must go onto the head of the
- actor_list regardless if poll is in progress or not
- */
- if (poll_in_progress && !head) {
- thread->state = ACTOR_POLL_WAITING;
- list_add_tail(&thread->list,
- &poll_list);
- } else {
- thread->state = ACTOR_SCHEDULED;
- if (head)
- list_add(&thread->list,
- &actor_list);
- else
- list_add_tail(&thread->list,
- &actor_list);
- }
+
+ if (delay_secs == 0) {
+ thread->state = ACTOR_SCHEDULED;
+ if (head)
+ list_add(&thread->list, &ready_list);
+ else
+ list_add_tail(&thread->list, &ready_list);
} else {
thread->state = ACTOR_WAITING;
- thread->ttschedule = delay_time;
- thread->scheduled_at = current_time;
-
- /* insert new entry in sort order */
- list_for_each_entry(next_thread, &pend_list, list) {
- log_debug(7, "thread %p %" PRIu64 " %"PRIu64,
- next_thread,
- next_thread->scheduled_at +
- next_thread->ttschedule,
- current_time + delay_time);
-
- if (time_after(next_thread->scheduled_at +
- next_thread->ttschedule,
- current_time + delay_time)) {
- list_add(&thread->list,
- &next_thread->list);
- goto done;
- }
- }
-
- list_add_tail(&thread->list, &pend_list);
+ thread->ttschedule = current_time + delay_secs;
+
+ actor_insert_on_pend_list(thread, delay_secs);
}
-done:
break;
- case ACTOR_POLL_WAITING:
case ACTOR_SCHEDULED:
// don't do anything
break;
@@ -168,75 +175,82 @@ actor_schedule(actor_t *thread)
}
void
-actor_timer(actor_t *thread, uint32_t timeout, void (*callback)(void *),
+actor_timer(actor_t *thread, uint32_t timeout_secs, void (*callback)(void *),
void *data)
{
- actor_new(thread, callback, data);
- actor_schedule_private(thread, timeout, 0);
+ actor_init(thread, callback, data);
+ actor_schedule_private(thread, timeout_secs, 0);
}
-int
-actor_timer_mod(actor_t *thread, uint32_t timeout, void *data)
+/*
+ * Execute all items that have expired.
+ *
+ * Set an alarm if items remain. Caller must catch SIGALRM and
+ * then re-invoke this function.
+ */
+void
+actor_poll(void)
{
- if (thread->state == ACTOR_WAITING) {
- list_del_init(&thread->list);
- thread->data = data;
- actor_schedule_private(thread, timeout, 0);
- return 1;
+ struct actor *thread, *tmp;
+ uint64_t current_time;
+ struct timespec tv;
+
+ if (poll_in_progress) {
+ log_error("recursive actor_poll() is not allowed");
+ return;
}
- return 0;
-}
-static void
-actor_check(uint64_t current_time)
-{
- struct actor *thread, *tmp;
+ if (clock_gettime(CLOCK_MONOTONIC_COARSE, &tv)) {
+ log_error("clock_gettime failed, can't schedule!\n");
+ return;
+ }
+ current_time = tv.tv_sec;
+
+ /*
+ * Move items that are ripe from pend_list to ready_list.
+ * Actors are in sorted order of ascending run time, so
+ * stop at the first unripe entry.
+ */
list_for_each_entry_safe(thread, tmp, &pend_list, list) {
- if (actor_diff_time(thread, current_time)) {
+ uint64_t time_left = actor_time_left(thread, current_time);
+ if (time_left) {
log_debug(7, "thread %08lx wait some more",
- (long)thread);
- /* wait some more */
+ (long)thread);
+
+ alarm(time_left);
break;
}
- /* it is time to schedule this entry */
+ /* This entry can be run now */
list_del_init(&thread->list);
- log_debug(2, "thread %08lx was scheduled at %" PRIu64 ":"
- "%" PRIu64 ", curtime %" PRIu64 " q_forw %p "
- "&pend_list %p",
- (long)thread, thread->scheduled_at, thread->ttschedule,
- current_time, pend_list.next, &pend_list);
+ log_debug(2, "thread %08lx was scheduled for "
+ "%" PRIu64 ", curtime %" PRIu64 " q_forw %p "
+ "&pend_list %p",
+ (long)thread, thread->ttschedule,
+ current_time, pend_list.next, &pend_list);
+ list_add_tail(&thread->list, &ready_list);
+ assert(thread->state == ACTOR_WAITING);
thread->state = ACTOR_SCHEDULED;
- list_add_tail(&thread->list, &actor_list);
- log_debug(7, "thread %08lx now in actor_list",
- (long)thread);
+ log_debug(7, "thread %08lx now in ready_list",
+ (long)thread);
}
-}
-void
-actor_poll(void)
-{
- struct actor *thread;
-
- /* check that there are no any concurrency */
- if (poll_in_progress) {
- log_error("concurrent actor_poll() is not allowed");
+ /* Disable alarm if nothing else pending */
+ if (list_empty(&pend_list)) {
+ log_debug(7, "nothing on pend_list, deactivating alarm\n");
+ alarm(0);
}
- /* check the wait list */
- actor_check(actor_jiffies);
-
- /* the following code to check in the main data path */
poll_in_progress = 1;
- while (!list_empty(&actor_list)) {
- thread = list_entry(actor_list.next, struct actor, list);
+ while (!list_empty(&ready_list)) {
+ thread = list_first_entry(&ready_list, struct actor, list);
list_del_init(&thread->list);
if (thread->state != ACTOR_SCHEDULED)
- log_error("actor_list: thread state corrupted! "
+ log_error("ready_list: thread state corrupted! "
"Thread with state %d in actor list.",
thread->state);
thread->state = ACTOR_NOTSCHEDULED;
@@ -245,20 +259,4 @@ actor_poll(void)
log_debug(7, "thread removed\n");
}
poll_in_progress = 0;
-
- while (!list_empty(&poll_list)) {
- thread = list_entry(poll_list.next, struct actor, list);
- list_del_init(&thread->list);
-
- if (thread->state != ACTOR_POLL_WAITING)
- log_error("poll_list: thread state corrupted!"
- "Thread with state %d in poll list.",
- thread->state);
- thread->state = ACTOR_SCHEDULED;
- list_add_tail(&thread->list, &actor_list);
- log_debug(7, "thread %08lx removed from poll_list",
- (long)thread);
- }
-
- actor_jiffies++;
}