summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bufferevent.c15
-rw-r--r--event.c16
-rw-r--r--test/regress_thread.c94
3 files changed, 109 insertions, 16 deletions
diff --git a/bufferevent.c b/bufferevent.c
index 9923bbe0..53b07f1f 100644
--- a/bufferevent.c
+++ b/bufferevent.c
@@ -206,10 +206,11 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
#define SCHEDULE_DEFERRED(bevp) \
do { \
+ bufferevent_incref(&(bevp)->bev); \
event_deferred_cb_schedule( \
event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
&(bevp)->deferred); \
- } while (0);
+ } while (0)
void
@@ -222,10 +223,8 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
- if (!p->deferred.queued) {
- bufferevent_incref(bufev);
+ if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
- }
} else {
bufev->readcb(bufev, bufev->cbarg);
}
@@ -241,10 +240,8 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
- if (!p->deferred.queued) {
- bufferevent_incref(bufev);
+ if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
- }
} else {
bufev->writecb(bufev, bufev->cbarg);
}
@@ -261,10 +258,8 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR();
- if (!p->deferred.queued) {
- bufferevent_incref(bufev);
+ if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
- }
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
}
diff --git a/event.c b/event.c
index 8e8c324b..d71932ce 100644
--- a/event.c
+++ b/event.c
@@ -1285,9 +1285,10 @@ event_process_active_single_queue(struct event_base *base,
}
/*
- Process all the defered_cb entries in 'queue'. If *breakptr becomes set to
- 1, stop. Requires that we start out holding the lock on 'queue'; releases
- the lock around 'queue' for each deferred_cb we process.
+ Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
+ *breakptr becomes set to 1, stop. Requires that we start out holding
+ the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
+ we process.
*/
static int
event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
@@ -1295,6 +1296,7 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
int count = 0;
struct deferred_cb *cb;
+#define MAX_DEFERRED 16
while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
@@ -1302,12 +1304,14 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
UNLOCK_DEFERRED_QUEUE(queue);
cb->cb(cb, cb->arg);
- ++count;
- if (*breakptr)
- return -1;
LOCK_DEFERRED_QUEUE(queue);
+ if (*breakptr)
+ return -1;
+ if (++count == MAX_DEFERRED)
+ break;
}
+#undef MAX_DEFERRED
return count;
}
diff --git a/test/regress_thread.c b/test/regress_thread.c
index 34cf64b1..675e350e 100644
--- a/test/regress_thread.c
+++ b/test/regress_thread.c
@@ -33,6 +33,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#ifndef WIN32
+#include <unistd.h>
+#endif
#ifdef _EVENT_HAVE_PTHREADS
#include <pthread.h>
@@ -46,6 +49,7 @@
#include "event2/event_struct.h"
#include "event2/thread.h"
#include "evthread-internal.h"
+#include "defer-internal.h"
#include "regress.h"
#include "tinytest_macros.h"
@@ -312,12 +316,102 @@ end:
;
}
+#define CB_COUNT 128
+#define QUEUE_THREAD_COUNT 8
+
+#ifdef WIN32
+#define SLEEP_MS(ms) Sleep(ms)
+#else
+#define SLEEP_MS(ms) usleep((ms) * 1000)
+#endif
+
+struct deferred_test_data {
+ struct deferred_cb cbs[CB_COUNT];
+ struct deferred_cb_queue *queue;
+};
+
+static time_t timer_start = 0;
+static time_t timer_end = 0;
+static unsigned callback_count = 0;
+static THREAD_T load_threads[QUEUE_THREAD_COUNT];
+static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];
+
+static void
+deferred_callback(struct deferred_cb *cb, void *arg)
+{
+ SLEEP_MS(1);
+ callback_count += 1;
+}
+
+static THREAD_FN
+load_deferred_queue(void *arg)
+{
+ struct deferred_test_data *data = arg;
+ size_t i;
+
+ for (i = 0; i < CB_COUNT; ++i) {
+ event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
+ event_deferred_cb_schedule(data->queue, &data->cbs[i]);
+ SLEEP_MS(1);
+ }
+
+ THREAD_RETURN();
+}
+
+static void
+timer_callback(evutil_socket_t fd, short what, void *arg)
+{
+ timer_end = time(NULL);
+}
+
+static void
+start_threads_callback(evutil_socket_t fd, short what, void *arg)
+{
+ int i;
+
+ for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
+ THREAD_START(load_threads[i], load_deferred_queue,
+ &deferred_data[i]);
+ }
+}
+
+static void
+thread_deferred_cb_skew(void *arg)
+{
+ struct basic_test_data *data = arg;
+ struct timeval tv_timer = {4, 0};
+ struct event event_threads;
+ struct deferred_cb_queue *queue;
+ int i;
+
+ queue = event_base_get_deferred_cb_queue(data->base);
+ tt_assert(queue);
+
+ for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+ deferred_data[i].queue = queue;
+
+ timer_start = time(NULL);
+ event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
+ &tv_timer);
+ event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
+ NULL, NULL);
+ event_base_dispatch(data->base);
+
+ TT_BLATHER(("callback count, %u", callback_count));
+ tt_int_op(timer_end - timer_start, ==, 4);
+
+end:
+ for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
+ THREAD_JOIN(load_threads[i]);
+}
+
#define TEST(name) \
{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
&basic_setup, NULL }
struct testcase_t thread_testcases[] = {
TEST(basic),
TEST(conditions_simple),
+ TEST(deferred_cb_skew),
END_OF_TESTCASES
};