diff options
-rw-r--r-- | ChangeLog | 2 | ||||
-rw-r--r-- | WIN32-Code/win32.c | 26 | ||||
-rw-r--r-- | devpoll.c | 6 | ||||
-rw-r--r-- | epoll.c | 6 | ||||
-rw-r--r-- | event.c | 25 | ||||
-rw-r--r-- | evport.c | 10 | ||||
-rw-r--r-- | kqueue.c | 84 | ||||
-rw-r--r-- | poll.c | 47 | ||||
-rw-r--r-- | select.c | 32 |
9 files changed, 179 insertions, 59 deletions
@@ -28,6 +28,8 @@ Changes in 2.0.3-alpha: o Fix some bugs when using the old evdns interfaces to initialize the evdns module. o Detect errors during bufferevent_connect(). Patch from Christopher Davis. o Fix compilation for listener.h for C++ - missing extern "C". Patch from Ferenc Szalai. + o Make the event_base_loop() family of functions respect thread-safety better. This should clear up a few hard-to-debug race conditions. + o Fix a bug when using a specialized memory allocator on win32. Changes in 2.0.2-alpha: diff --git a/WIN32-Code/win32.c b/WIN32-Code/win32.c index 47055885..cb47ced4 100644 --- a/WIN32-Code/win32.c +++ b/WIN32-Code/win32.c @@ -65,6 +65,7 @@ struct idx_info { struct win32op { int fd_setsz; + int resize_out_sets; struct win_fd_set *readset_in; struct win_fd_set *writeset_in; struct win_fd_set *readset_out; @@ -103,16 +104,11 @@ realloc_fd_sets(struct win32op *op, size_t new_size) assert(new_size >= 1); size = FD_SET_ALLOC_SIZE(new_size); - if (!(op->readset_in = realloc(op->readset_in, size))) + if (!(op->readset_in = mm_realloc(op->readset_in, size))) return (-1); - if (!(op->writeset_in = realloc(op->writeset_in, size))) - return (-1); - if (!(op->readset_out = realloc(op->readset_out, size))) - return (-1); - if (!(op->exset_out = realloc(op->exset_out, size))) - return (-1); - if (!(op->writeset_out = realloc(op->writeset_out, size))) + if (!(op->writeset_in = mm_realloc(op->writeset_in, size))) return (-1); + op->resize_out_sets = 1; op->fd_setsz = new_size; return (0); } @@ -286,6 +282,16 @@ win32_dispatch(struct event_base *base, struct timeval *tv) int fd_count; SOCKET s; + if (op->resize_out_sets) { + if (!(op->readset_out = mm_realloc(op->readset_out, size))) + return (-1); + if (!(op->exset_out = mm_realloc(op->exset_out, size))) + return (-1); + if (!(op->writeset_out = mm_realloc(op->writeset_out, size))) + return (-1); + op->resize_out_sets = 0; + } + fd_set_copy(win32op->readset_out, win32op->readset_in); fd_set_copy(win32op->exset_out, win32op->readset_in); fd_set_copy(win32op->writeset_out, win32op->writeset_in); @@ -301,11 +307,15 @@ win32_dispatch(struct event_base *base, struct timeval *tv) return (0); } + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + res = select(fd_count, (struct fd_set*)win32op->readset_out, (struct fd_set*)win32op->writeset_out, (struct fd_set*)win32op->exset_out, tv); + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + event_debug(("%s: select returned %d", __func__, res)); if(res <= 0) { @@ -140,6 +140,8 @@ devpoll_init(struct event_base *base) devpollop->dpfd = dpfd; /* Initialize fields */ + /* FIXME: allocating 'nfiles' worth of space here can be + * expensive and unnecessary. See how epoll.c does it instead. */ devpollop->events = mm_calloc(nfiles, sizeof(struct pollfd)); if (devpollop->events == NULL) { mm_free(devpollop); @@ -179,8 +181,12 @@ devpoll_dispatch(struct event_base *base, struct timeval *tv) dvp.dp_nfds = devpollop->nevents; dvp.dp_timeout = timeout; + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + res = ioctl(devpollop->dpfd, DP_POLL, &dvp); + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + if (res == -1) { if (errno != EINTR) { event_warn("ioctl: DP_POLL"); @@ -51,6 +51,8 @@ #include "event-internal.h" #include "evsignal-internal.h" +#include "event2/thread.h" +#include "evthread-internal.h" #include "log-internal.h" #include "evmap-internal.h" @@ -148,8 +150,12 @@ epoll_dispatch(struct event_base *base, struct timeval *tv) timeout = MAX_EPOLL_TIMEOUT_MSEC; } + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + if (res == -1) { if (errno != EINTR) { event_warn("epoll_wait"); @@ -615,6 +615,7 @@ event_base_priority_init(struct event_base *base, int npriorities) static int event_haveevents(struct event_base *base) { + /* Caller must hold th_base_lock */ return (base->event_count > 0); } @@ -737,17 +738,16 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr) static void event_process_active(struct event_base *base) { + /* Caller must hold th_base_lock */ struct event_list *activeq = NULL; int i, c; - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); - for (i = 0; i < base->nactivequeues; ++i) { if (TAILQ_FIRST(base->activequeues[i]) != NULL) { activeq = base->activequeues[i]; c = event_process_active_single_queue(base, activeq); if (c < 0) - goto unlock; + return; else if (c > 0) break; /* Processed a real event; do not * consider lower-priority events */ @@ -757,9 +757,6 @@ event_process_active(struct event_base *base) } event_process_deferred_callbacks(&base->defer_queue,&base->event_break); - -unlock: - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); } /* @@ -866,6 +863,10 @@ event_base_loop(struct event_base *base, int flags) struct timeval *tv_p; int res, done; + /* Grab the lock. We will release it inside evsel.dispatch, and again + * as we invoke user callbacks. */ + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + /* clear time cache */ base->tv_cache.tv_sec = 0; @@ -933,6 +934,8 @@ event_base_loop(struct event_base *base, int flags) /* clear time cache */ base->tv_cache.tv_sec = 0; + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + event_debug(("%s: asked to terminate loop.", __func__)); return (0); } @@ -1496,12 +1499,12 @@ event_deferred_cb_schedule(struct deferred_cb_queue *queue, static int timeout_next(struct event_base *base, struct timeval **tv_p) { + /* Caller must hold th_base_lock */ struct timeval now; struct event *ev; struct timeval *tv = *tv_p; int res = 0; - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); ev = min_heap_top(&base->timeheap); if (ev == NULL) { @@ -1527,7 +1530,6 @@ timeout_next(struct event_base *base, struct timeval **tv_p) event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec)); out: - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); return (res); } @@ -1540,6 +1542,7 @@ out: static void timeout_correct(struct event_base *base, struct timeval *tv) { + /* Caller must hold th_base_lock. */ struct event **pev; unsigned int size; struct timeval off; @@ -1549,11 +1552,9 @@ timeout_correct(struct event_base *base, struct timeval *tv) /* Check if time is running backwards */ gettime(base, tv); - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); if (evutil_timercmp(tv, &base->event_tv, >=)) { base->event_tv = *tv; - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); return; } @@ -1573,16 +1574,15 @@ timeout_correct(struct event_base *base, struct timeval *tv) } /* Now remember what the new time turned out to be. */ base->event_tv = *tv; - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); } static void timeout_process(struct event_base *base) { + /* Caller must hold lock. */ struct timeval now; struct event *ev; - EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); if (min_heap_empty(&base->timeheap)) { EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); return; @@ -1601,7 +1601,6 @@ timeout_process(struct event_base *base) ev->ev_callback)); event_active_internal(ev, EV_TIMEOUT, 1); } - EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); } static void @@ -303,8 +303,14 @@ evport_dispatch(struct event_base *base, struct timeval *tv) } } - if ((res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN, - (unsigned int *) &nevents, ts_p)) == -1) { + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + + res = port_getn(epdp->ed_port, pevtlist, EVENTS_PER_GETN, + (unsigned int *) &nevents, ts_p); + + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + + if (res == -1) { if (errno == EINTR || errno == EAGAIN) { evsig_process(base); return (0); @@ -67,8 +67,13 @@ struct kqop { struct kevent *changes; int nchanges; + int changes_size; + struct kevent *pend_changes; + int n_pend_changes; + int pend_changes_size; + struct kevent *events; - int nevents; + int events_size; int kq; pid_t pid; }; @@ -133,13 +138,21 @@ kq_init(struct event_base *base) mm_free (kqueueop); return (NULL); } + kqueueop->pend_changes = mm_malloc(NEVENT * sizeof(struct kevent)); + if (kqueueop->pendchanges == NULL) { + mm_free (kqueueop->changes); + mm_free (kqueueop); + return (NULL); + } kqueueop->events = mm_malloc(NEVENT * sizeof(struct kevent)); if (kqueueop->events == NULL) { mm_free (kqueueop->changes); + mm_free (kqueueop->pend_changes); mm_free (kqueueop); return (NULL); } - kqueueop->nevents = NEVENT; + kqueueop->events_size = kqueueop->changes_size = + kqueueop->pend_changes_size = NEVENT; /* Check for Mac OS X kqueue bug. */ kqueueop->changes[0].ident = -1; @@ -171,36 +184,21 @@ kq_init(struct event_base *base) static int kq_insert(struct kqop *kqop, struct kevent *kev) { - int nevents = kqop->nevents; + int size = kqop->changes_size; - if (kqop->nchanges == nevents) { + if (kqop->nchanges == size) { struct kevent *newchange; - struct kevent *newresult; - nevents *= 2; + size *= 2; newchange = mm_realloc(kqop->changes, - nevents * sizeof(struct kevent)); + size * sizeof(struct kevent)); if (newchange == NULL) { event_warn("%s: malloc", __func__); return (-1); } kqop->changes = newchange; - - newresult = mm_realloc(kqop->events, - nevents * sizeof(struct kevent)); - - /* - * If we fail, we don't have to worry about freeing, - * the next realloc will pick it up. - */ - if (newresult == NULL) { - event_warn("%s: malloc", __func__); - return (-1); - } - kqop->events = newresult; - - kqop->nevents = nevents; + kqop->changes_size = size; } memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent)); @@ -219,11 +217,17 @@ kq_sighandler(int sig) /* Do nothing here */ } +#define SWAP(tp,a,b) \ + do { \ + tp tmp_swap_var = (a); \ + a = b; \ + b = tmp_swap_var; \ + } while (0); + static int kq_dispatch(struct event_base *base, struct timeval *tv) { struct kqop *kqop = base->evbase; - struct kevent *changes = kqop->changes; struct kevent *events = kqop->events; struct timespec ts, *ts_p = NULL; int i, res; @@ -233,9 +237,23 @@ kq_dispatch(struct event_base *base, struct timeval *tv) ts_p = &ts; } - res = kevent(kqop->kq, changes, kqop->nchanges, - events, kqop->nevents, ts_p); - kqop->nchanges = 0; + /* We can't hold the lock while we're calling kqueue, so another + * thread might potentially mess with changes before the kernel has a + * chance to read it. Therefore, we need to keep the change list + * we're looking at in pend_changes, and let other threads mess with + * changes. */ + SWAP(struct kevent *, kqop->changes, kqop->pend_changes); + SWAP(int, kqop->nchanges, kqop->npend_changes); + SWAP(int, kqop->changes_size, kqop->pend_changes_size); + + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + + res = kevent(kqop->kq, kqop->pend_changes, kqop->npend_changes, + events, kqop->events_size, ts_p); + + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + + kqop->npend_changes = 0; if (res == -1) { if (errno != EINTR) { event_warn("kevent"); @@ -289,6 +307,20 @@ kq_dispatch(struct event_base *base, struct timeval *tv) } } + if (res == kqop->nevents) { + struct kevent *newresult; + int size = kqop->events_size; + /* We used all the events space that we have. Maybe we should + make it bigger. */ + size *= 2; + newresult = mm_realloc(kqop->events, + size * sizeof(struct kevent)); + if (newresult) { + kqop->events = newresult; + kqop->events_size = size; + } + } + return (0); } @@ -50,6 +50,8 @@ #include "evsignal-internal.h" #include "log-internal.h" #include "evmap-internal.h" +#include "event2/thread.h" +#include "evthread-internal.h" struct pollidx { int idxplus1; @@ -57,8 +59,11 @@ struct pollidx { struct pollop { int event_count; /* Highest number alloc */ - int nfds; /* Size of event_* */ + int nfds; /* Highest number used */ + int realloc_copy; /* True iff we must realloc + * event_set_copy */ struct pollfd *event_set; + struct pollfd *event_set_copy; }; static void *poll_init (struct event_base *); @@ -119,14 +124,43 @@ poll_dispatch(struct event_base *base, struct timeval *tv) { int res, i, j, msec = -1, nfds; struct pollop *pop = base->evbase; + struct pollfd *event_set; poll_check_ok(pop); + nfds = pop->nfds; + + if (base->th_base_lock) { + /* If we're using this backend in a multithreaded setting, + * then we need to work on a copy of event_set, so that we can + * let other threads modify the main event_set while we're + * polling. If we're not multithreaded, then we'll skip the + * copy step here to save memory and time. */ + if (pop->realloc_copy) { + struct pollfd *tmp = mm_realloc(pop->event_set_copy, + pop->event_count * sizeof(struct pollfd)); + if (tmp == NULL) { + event_warn("realloc"); + return -1; + } + pop->event_set_copy = tmp; + pop->realloc_copy = 0; + } + memcpy(pop->event_set_copy, pop->event_set, + sizeof(struct pollfd)*nfds); + event_set = pop->event_set_copy; + } else { + event_set = pop->event_set; + } + if (tv != NULL) msec = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000; - nfds = pop->nfds; - res = poll(pop->event_set, nfds, msec); + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + + res = poll(event_set, nfds, msec); + + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); if (res == -1) { if (errno != EINTR) { @@ -150,7 +184,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv) int what; if (++i == nfds) i = 0; - what = pop->event_set[i].revents; + what = event_set[i].revents; if (!what) continue; @@ -166,7 +200,7 @@ poll_dispatch(struct event_base *base, struct timeval *tv) if (res == 0) continue; - evmap_io_active(base, pop->event_set[i].fd, res); + evmap_io_active(base, event_set[i].fd, res); } return (0); @@ -204,6 +238,7 @@ poll_add(struct event_base *base, int fd, short old, short events, void *_idx) pop->event_set = tmp_event_set; pop->event_count = tmp_event_count; + pop->realloc_copy = 1; } i = idx->idxplus1 - 1; @@ -289,6 +324,8 @@ poll_dealloc(struct event_base *base) evsig_dealloc(base); if (pop->event_set) mm_free(pop->event_set); + if (pop->event_set_copy) + mm_free(pop->event_set_copy); memset(pop, 0, sizeof(struct pollop)); mm_free(pop); @@ -50,6 +50,8 @@ #include "event-internal.h" #include "evsignal-internal.h" +#include "event2/thread.h" +#include "evthread-internal.h" #include "log-internal.h" #include "evmap-internal.h" @@ -67,6 +69,7 @@ typedef unsigned long fd_mask; struct selectop { int event_fds; /* Highest fd in fd set */ int event_fdsz; + int resize_out_sets; fd_set *event_readset_in; fd_set *event_writeset_in; fd_set *event_readset_out; @@ -121,19 +124,38 @@ check_selectop(struct selectop *sop) static int select_dispatch(struct event_base *base, struct timeval *tv) { - int res, i, j; + int res=0, i, j, nfds; struct selectop *sop = base->evbase; check_selectop(sop); + if (sop->resize_out_sets) { + fd_set *readset_out=NULL, *writeset_out=NULL; + size_t sz = sop->event_fdsz; + if (!(readset_out = mm_realloc(sop->event_readset_out, sz))) + return (-1); + if (!(writeset_out = mm_realloc(sop->event_writeset_out, sz))) { + mm_free(readset_out); + return (-1); + } + sop->event_readset_out = readset_out; + sop->event_writeset_out = writeset_out; + sop->resize_out_sets = 0; + } memcpy(sop->event_readset_out, sop->event_readset_in, sop->event_fdsz); memcpy(sop->event_writeset_out, sop->event_writeset_in, sop->event_fdsz); - res = select(sop->event_fds + 1, sop->event_readset_out, + nfds = sop->event_fds+1; + + EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + + res = select(nfds, sop->event_readset_out, sop->event_writeset_out, NULL, tv); + EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock); + check_selectop(sop); if (res == -1) { @@ -151,9 +173,9 @@ select_dispatch(struct event_base *base, struct timeval *tv) event_debug(("%s: select reports %d", __func__, res)); check_selectop(sop); - i = random() % (sop->event_fds+1); - for (j = 0; j <= sop->event_fds; ++j) { - if (++i >= sop->event_fds+1) + i = random() % (nfds+1); + for (j = 0; j <= nfds; ++j) { + if (++i >= nfds+1) i = 0; res = 0; if (FD_ISSET(i, sop->event_readset_out)) |