summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--WIN32-Code/win32.c26
-rw-r--r--devpoll.c6
-rw-r--r--epoll.c6
-rw-r--r--event.c25
-rw-r--r--evport.c10
-rw-r--r--kqueue.c84
-rw-r--r--poll.c47
-rw-r--r--select.c32
9 files changed, 179 insertions, 59 deletions
diff --git a/ChangeLog b/ChangeLog
index b844de5c..f896936a 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -28,6 +28,8 @@ Changes in 2.0.3-alpha:
o Fix some bugs when using the old evdns interfaces to initialize the evdns module.
o Detect errors during bufferevent_connect(). Patch from Christopher Davis.
o Fix compilation for listener.h for C++ - missing extern "C". Patch from Ferenc Szalai.
+ o Make the event_base_loop() family of functions respect thread-safety better. This should clear up a few hard-to-debug race conditions.
+ o Fix a bug when using a specialized memory allocator on win32.
Changes in 2.0.2-alpha:
diff --git a/WIN32-Code/win32.c b/WIN32-Code/win32.c
index 47055885..cb47ced4 100644
--- a/WIN32-Code/win32.c
+++ b/WIN32-Code/win32.c
@@ -65,6 +65,7 @@ struct idx_info {
struct win32op {
int fd_setsz;
+ int resize_out_sets;
struct win_fd_set *readset_in;
struct win_fd_set *writeset_in;
struct win_fd_set *readset_out;
@@ -103,16 +104,11 @@ realloc_fd_sets(struct win32op *op, size_t new_size)
assert(new_size >= 1);
size = FD_SET_ALLOC_SIZE(new_size);
- if (!(op->readset_in = realloc(op->readset_in, size)))
+ if (!(op->readset_in = mm_realloc(op->readset_in, size)))
return (-1);
- if (!(op->writeset_in = realloc(op->writeset_in, size)))
- return (-1);
- if (!(op->readset_out = realloc(op->readset_out, size)))
- return (-1);
- if (!(op->exset_out = realloc(op->exset_out, size)))
- return (-1);
- if (!(op->writeset_out = realloc(op->writeset_out, size)))
+ if (!(op->writeset_in = mm_realloc(op->writeset_in, size)))
return (-1);
+ op->resize_out_sets = 1;
op->fd_setsz = new_size;
return (0);
}
@@ -286,6 +282,16 @@ win32_dispatch(struct event_base *base, struct timeval *tv)
int fd_count;
SOCKET s;
+ if (op->resize_out_sets) {
+ if (!(op->readset_out = mm_realloc(op->readset_out, size)))
+ return (-1);
+ if (!(op->exset_out = mm_realloc(op->exset_out, size)))
+ return (-1);
+ if (!(op->writeset_out = mm_realloc(op->writeset_out, size)))
+ return (-1);
+ op->resize_out_sets = 0;
+ }
+
fd_set_copy(win32op->readset_out, win32op->readset_in);
fd_set_copy(win32op->exset_out, win32op->readset_in);
fd_set_copy(win32op->writeset_out, win32op->writeset_in);
@@ -301,11 +307,15 @@ win32_dispatch(struct event_base *base, struct timeval *tv)
return (0);
}
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
res = select(fd_count,
(struct fd_set*)win32op->readset_out,
(struct fd_set*)win32op->writeset_out,
(struct fd_set*)win32op->exset_out, tv);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
event_debug(("%s: select returned %d", __func__, res));
if(res <= 0) {
diff --git a/devpoll.c b/devpoll.c
index eaaac4ac..cf8e5c6d 100644
--- a/devpoll.c
+++ b/devpoll.c
@@ -140,6 +140,8 @@ devpoll_init(struct event_base *base)
devpollop->dpfd = dpfd;
/* Initialize fields */
+ /* FIXME: allocating 'nfiles' worth of space here can be
+ * expensive and unnecessary. See how epoll.c does it instead. */
devpollop->events = mm_calloc(nfiles, sizeof(struct pollfd));
if (devpollop->events == NULL) {
mm_free(devpollop);
@@ -179,8 +181,12 @@ devpoll_dispatch(struct event_base *base, struct timeval *tv)
dvp.dp_nfds = devpollop->nevents;
dvp.dp_timeout = timeout;
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
res = ioctl(devpollop->dpfd, DP_POLL, &dvp);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
if (res == -1) {
if (errno != EINTR) {
event_warn("ioctl: DP_POLL");
diff --git a/epoll.c b/epoll.c
index b6fa28ed..a01e501f 100644
--- a/epoll.c
+++ b/epoll.c
@@ -51,6 +51,8 @@
#include "event-internal.h"
#include "evsignal-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
@@ -148,8 +150,12 @@ epoll_dispatch(struct event_base *base, struct timeval *tv)
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
diff --git a/event.c b/event.c
index 5f1cd909..3c907139 100644
--- a/event.c
+++ b/event.c
@@ -615,6 +615,7 @@ event_base_priority_init(struct event_base *base, int npriorities)
static int
event_haveevents(struct event_base *base)
{
+ /* Caller must hold th_base_lock */
return (base->event_count > 0);
}
@@ -737,17 +738,16 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
static void
event_process_active(struct event_base *base)
{
+ /* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
-
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
activeq = base->activequeues[i];
c = event_process_active_single_queue(base, activeq);
if (c < 0)
- goto unlock;
+ return;
else if (c > 0)
break; /* Processed a real event; do not
* consider lower-priority events */
@@ -757,9 +757,6 @@ event_process_active(struct event_base *base)
}
event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
-
-unlock:
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
/*
@@ -866,6 +863,10 @@ event_base_loop(struct event_base *base, int flags)
struct timeval *tv_p;
int res, done;
+ /* Grab the lock. We will release it inside evsel.dispatch, and again
+ * as we invoke user callbacks. */
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
/* clear time cache */
base->tv_cache.tv_sec = 0;
@@ -933,6 +934,8 @@ event_base_loop(struct event_base *base, int flags)
/* clear time cache */
base->tv_cache.tv_sec = 0;
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}
@@ -1496,12 +1499,12 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue,
static int
timeout_next(struct event_base *base, struct timeval **tv_p)
{
+ /* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
ev = min_heap_top(&base->timeheap);
if (ev == NULL) {
@@ -1527,7 +1530,6 @@ timeout_next(struct event_base *base, struct timeval **tv_p)
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
@@ -1540,6 +1542,7 @@ out:
static void
timeout_correct(struct event_base *base, struct timeval *tv)
{
+ /* Caller must hold th_base_lock. */
struct event **pev;
unsigned int size;
struct timeval off;
@@ -1549,11 +1552,9 @@ timeout_correct(struct event_base *base, struct timeval *tv)
/* Check if time is running backwards */
gettime(base, tv);
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (evutil_timercmp(tv, &base->event_tv, >=)) {
base->event_tv = *tv;
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
@@ -1573,16 +1574,15 @@ timeout_correct(struct event_base *base, struct timeval *tv)
}
/* Now remember what the new time turned out to be. */
base->event_tv = *tv;
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
static void
timeout_process(struct event_base *base)
{
+ /* Caller must hold lock. */
struct timeval now;
struct event *ev;
- EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (min_heap_empty(&base->timeheap)) {
EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
@@ -1601,7 +1601,6 @@ timeout_process(struct event_base *base)
ev->ev_callback));
event_active_internal(ev, EV_TIMEOUT, 1);
}
- EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
static void
diff --git a/evport.c b/evport.c
index 55bbe800..32e373c0 100644
--- a/evport.c
+++ b/evport.c
@@ -303,8 +303,14 @@ evport_dispatch(struct event_base *base, struct timeval *tv)
}
}
- if ((res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
- (unsigned int *) &nevents, ts_p)) == -1) {
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN,
+ (unsigned int *) &nevents, ts_p);
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ if (res == -1) {
if (errno == EINTR || errno == EAGAIN) {
evsig_process(base);
return (0);
diff --git a/kqueue.c b/kqueue.c
index be10dd69..aa9fc1c2 100644
--- a/kqueue.c
+++ b/kqueue.c
@@ -67,8 +67,13 @@
struct kqop {
struct kevent *changes;
int nchanges;
+ int changes_size;
+ struct kevent *pend_changes;
+ int n_pend_changes;
+ int pend_changes_size;
+
struct kevent *events;
- int nevents;
+ int events_size;
int kq;
pid_t pid;
};
@@ -133,13 +138,21 @@ kq_init(struct event_base *base)
mm_free (kqueueop);
return (NULL);
}
+ kqueueop->pend_changes = mm_malloc(NEVENT * sizeof(struct kevent));
+ if (kqueueop->pendchanges == NULL) {
+ mm_free (kqueueop->changes);
+ mm_free (kqueueop);
+ return (NULL);
+ }
kqueueop->events = mm_malloc(NEVENT * sizeof(struct kevent));
if (kqueueop->events == NULL) {
mm_free (kqueueop->changes);
+ mm_free (kqueueop->pend_changes);
mm_free (kqueueop);
return (NULL);
}
- kqueueop->nevents = NEVENT;
+ kqueueop->events_size = kqueueop->changes_size =
+ kqueueop->pend_changes_size = NEVENT;
/* Check for Mac OS X kqueue bug. */
kqueueop->changes[0].ident = -1;
@@ -171,36 +184,21 @@ kq_init(struct event_base *base)
static int
kq_insert(struct kqop *kqop, struct kevent *kev)
{
- int nevents = kqop->nevents;
+ int size = kqop->changes_size;
- if (kqop->nchanges == nevents) {
+ if (kqop->nchanges == size) {
struct kevent *newchange;
- struct kevent *newresult;
- nevents *= 2;
+ size *= 2;
newchange = mm_realloc(kqop->changes,
- nevents * sizeof(struct kevent));
+ size * sizeof(struct kevent));
if (newchange == NULL) {
event_warn("%s: malloc", __func__);
return (-1);
}
kqop->changes = newchange;
-
- newresult = mm_realloc(kqop->events,
- nevents * sizeof(struct kevent));
-
- /*
- * If we fail, we don't have to worry about freeing,
- * the next realloc will pick it up.
- */
- if (newresult == NULL) {
- event_warn("%s: malloc", __func__);
- return (-1);
- }
- kqop->events = newresult;
-
- kqop->nevents = nevents;
+ kqop->changes_size = size;
}
memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
@@ -219,11 +217,17 @@ kq_sighandler(int sig)
/* Do nothing here */
}
+#define SWAP(tp,a,b) \
+ do { \
+ tp tmp_swap_var = (a); \
+ a = b; \
+ b = tmp_swap_var; \
+ } while (0);
+
static int
kq_dispatch(struct event_base *base, struct timeval *tv)
{
struct kqop *kqop = base->evbase;
- struct kevent *changes = kqop->changes;
struct kevent *events = kqop->events;
struct timespec ts, *ts_p = NULL;
int i, res;
@@ -233,9 +237,23 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
ts_p = &ts;
}
- res = kevent(kqop->kq, changes, kqop->nchanges,
- events, kqop->nevents, ts_p);
- kqop->nchanges = 0;
+ /* We can't hold the lock while we're calling kqueue, so another
+ * thread might potentially mess with changes before the kernel has a
+ * chance to read it. Therefore, we need to keep the change list
+ * we're looking at in pend_changes, and let other threads mess with
+ * changes. */
+ SWAP(struct kevent *, kqop->changes, kqop->pend_changes);
+ SWAP(int, kqop->nchanges, kqop->npend_changes);
+ SWAP(int, kqop->changes_size, kqop->pend_changes_size);
+
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = kevent(kqop->kq, kqop->pend_changes, kqop->npend_changes,
+ events, kqop->events_size, ts_p);
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ kqop->npend_changes = 0;
if (res == -1) {
if (errno != EINTR) {
event_warn("kevent");
@@ -289,6 +307,20 @@ kq_dispatch(struct event_base *base, struct timeval *tv)
}
}
+ if (res == kqop->nevents) {
+ struct kevent *newresult;
+ int size = kqop->events_size;
+ /* We used all the events space that we have. Maybe we should
+ make it bigger. */
+ size *= 2;
+ newresult = mm_realloc(kqop->events,
+ size * sizeof(struct kevent));
+ if (newresult) {
+ kqop->events = newresult;
+ kqop->events_size = size;
+ }
+ }
+
return (0);
}
diff --git a/poll.c b/poll.c
index 08e148cb..e7b9941e 100644
--- a/poll.c
+++ b/poll.c
@@ -50,6 +50,8 @@
#include "evsignal-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
struct pollidx {
int idxplus1;
@@ -57,8 +59,11 @@ struct pollidx {
struct pollop {
int event_count; /* Highest number alloc */
- int nfds; /* Size of event_* */
+ int nfds; /* Highest number used */
+ int realloc_copy; /* True iff we must realloc
+ * event_set_copy */
struct pollfd *event_set;
+ struct pollfd *event_set_copy;
};
static void *poll_init (struct event_base *);
@@ -119,14 +124,43 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
{
int res, i, j, msec = -1, nfds;
struct pollop *pop = base->evbase;
+ struct pollfd *event_set;
poll_check_ok(pop);
+ nfds = pop->nfds;
+
+ if (base->th_base_lock) {
+ /* If we're using this backend in a multithreaded setting,
+ * then we need to work on a copy of event_set, so that we can
+ * let other threads modify the main event_set while we're
+ * polling. If we're not multithreaded, then we'll skip the
+ * copy step here to save memory and time. */
+ if (pop->realloc_copy) {
+ struct pollfd *tmp = mm_realloc(pop->event_set_copy,
+ pop->event_count * sizeof(struct pollfd));
+ if (tmp == NULL) {
+ event_warn("realloc");
+ return -1;
+ }
+ pop->event_set_copy = tmp;
+ pop->realloc_copy = 0;
+ }
+ memcpy(pop->event_set_copy, pop->event_set,
+ sizeof(struct pollfd)*nfds);
+ event_set = pop->event_set_copy;
+ } else {
+ event_set = pop->event_set;
+ }
+
if (tv != NULL)
msec = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
- nfds = pop->nfds;
- res = poll(pop->event_set, nfds, msec);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = poll(event_set, nfds, msec);
+
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
@@ -150,7 +184,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
int what;
if (++i == nfds)
i = 0;
- what = pop->event_set[i].revents;
+ what = event_set[i].revents;
if (!what)
continue;
@@ -166,7 +200,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv)
if (res == 0)
continue;
- evmap_io_active(base, pop->event_set[i].fd, res);
+ evmap_io_active(base, event_set[i].fd, res);
}
return (0);
@@ -204,6 +238,7 @@ poll_add(struct event_base *base, int fd, short old, short events, void *_idx)
pop->event_set = tmp_event_set;
pop->event_count = tmp_event_count;
+ pop->realloc_copy = 1;
}
i = idx->idxplus1 - 1;
@@ -289,6 +324,8 @@ poll_dealloc(struct event_base *base)
evsig_dealloc(base);
if (pop->event_set)
mm_free(pop->event_set);
+ if (pop->event_set_copy)
+ mm_free(pop->event_set_copy);
memset(pop, 0, sizeof(struct pollop));
mm_free(pop);
diff --git a/select.c b/select.c
index ace8037c..9164ac91 100644
--- a/select.c
+++ b/select.c
@@ -50,6 +50,8 @@
#include "event-internal.h"
#include "evsignal-internal.h"
+#include "event2/thread.h"
+#include "evthread-internal.h"
#include "log-internal.h"
#include "evmap-internal.h"
@@ -67,6 +69,7 @@ typedef unsigned long fd_mask;
struct selectop {
int event_fds; /* Highest fd in fd set */
int event_fdsz;
+ int resize_out_sets;
fd_set *event_readset_in;
fd_set *event_writeset_in;
fd_set *event_readset_out;
@@ -121,19 +124,38 @@ check_selectop(struct selectop *sop)
static int
select_dispatch(struct event_base *base, struct timeval *tv)
{
- int res, i, j;
+ int res=0, i, j, nfds;
struct selectop *sop = base->evbase;
check_selectop(sop);
+ if (sop->resize_out_sets) {
+ fd_set *readset_out=NULL, *writeset_out=NULL;
+ size_t sz = sop->event_fdsz;
+ if (!(readset_out = mm_realloc(sop->event_readset_out, sz)))
+ return (-1);
+ if (!(writeset_out = mm_realloc(sop->event_writeset_out, sz))) {
+ mm_free(readset_out);
+ return (-1);
+ }
+ sop->event_readset_out = readset_out;
+ sop->event_writeset_out = writeset_out;
+ sop->resize_out_sets = 0;
+ }
memcpy(sop->event_readset_out, sop->event_readset_in,
sop->event_fdsz);
memcpy(sop->event_writeset_out, sop->event_writeset_in,
sop->event_fdsz);
- res = select(sop->event_fds + 1, sop->event_readset_out,
+ nfds = sop->event_fds+1;
+
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
+ res = select(nfds, sop->event_readset_out,
sop->event_writeset_out, NULL, tv);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+
check_selectop(sop);
if (res == -1) {
@@ -151,9 +173,9 @@ select_dispatch(struct event_base *base, struct timeval *tv)
event_debug(("%s: select reports %d", __func__, res));
check_selectop(sop);
- i = random() % (sop->event_fds+1);
- for (j = 0; j <= sop->event_fds; ++j) {
- if (++i >= sop->event_fds+1)
+ i = random() % (nfds+1);
+ for (j = 0; j <= nfds; ++j) {
+ if (++i >= nfds+1)
i = 0;
res = 0;
if (FD_ISSET(i, sop->event_readset_out))