diff options
author | Nick Mathewson <nickm@torproject.org> | 2010-09-23 16:49:58 -0400 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2010-10-07 18:11:19 -0400 |
commit | 127d4f2195638bbfa8b693558f3eb408abdcdf24 (patch) | |
tree | 889c6e77acc7561ee477e1347f3406e692dff6f6 /listener.c | |
parent | 5b7a37063647760646823215361df8605c36acd6 (diff) | |
download | libevent-127d4f2195638bbfa8b693558f3eb408abdcdf24.tar.gz |
Add a LEV_OPT_THREADSAFE option for threadsafe evconnlisteners
Diffstat (limited to 'listener.c')
-rw-r--r-- | listener.c | 233 |
1 files changed, 196 insertions, 37 deletions
@@ -51,6 +51,7 @@ #include "mm-internal.h" #include "util-internal.h" #include "log-internal.h" +#include "evthread-internal.h" #ifdef WIN32 #include "iocp-internal.h" #include "defer-internal.h" @@ -60,16 +61,19 @@ struct evconnlistener_ops { int (*enable)(struct evconnlistener *); int (*disable)(struct evconnlistener *); void (*destroy)(struct evconnlistener *); + void (*shutdown)(struct evconnlistener *); evutil_socket_t (*getfd)(struct evconnlistener *); struct event_base *(*getbase)(struct evconnlistener *); }; struct evconnlistener { const struct evconnlistener_ops *ops; + void *lock; evconnlistener_cb cb; evconnlistener_errorcb errorcb; void *user_data; unsigned flags; + int refcnt; }; struct evconnlistener_event { @@ -83,12 +87,15 @@ struct evconnlistener_iocp { evutil_socket_t fd; struct event_base *event_base; struct event_iocp_port *port; - CRITICAL_SECTION lock; - int n_accepting; + short n_accepting; + short shutting_down; struct accepting_socket **accepting; }; #endif +#define LOCK(listener) EVLOCK_LOCK((listener)->lock, 0) +#define UNLOCK(listener) EVLOCK_UNLOCK((listener)->lock, 0) + struct evconnlistener * evconnlistener_new_async(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, @@ -100,10 +107,36 @@ static void event_listener_destroy(struct evconnlistener *); static evutil_socket_t event_listener_getfd(struct evconnlistener *); static struct event_base *event_listener_getbase(struct evconnlistener *); +#if 0 +static void +listener_incref_and_lock(struct evconnlistener *listener) +{ + LOCK(listener); + ++listener->refcnt; +} +#endif + +static int +listener_decref_and_unlock(struct evconnlistener *listener) +{ + int refcnt = --listener->refcnt; + if (refcnt == 0) { + listener->ops->destroy(listener); + UNLOCK(listener); + EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE); + mm_free(listener); + return 1; + } else { + UNLOCK(listener); + return 0; + } +} + static const struct evconnlistener_ops evconnlistener_event_ops = { event_listener_enable, event_listener_disable, event_listener_destroy, + NULL, /* shutdown */ event_listener_getfd, event_listener_getbase }; @@ -143,6 +176,11 @@ evconnlistener_new(struct event_base *base, lev->base.cb = cb; lev->base.user_data = ptr; lev->base.flags = flags; + lev->base.refcnt = 1; + + if (flags & LEV_OPT_THREADSAFE) { + EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE); + } event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST, listener_read_cb, lev); @@ -204,8 +242,12 @@ evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void evconnlistener_free(struct evconnlistener *lev) { - lev->ops->destroy(lev); - mm_free(lev); + LOCK(lev); + lev->cb = NULL; + lev->errorcb = NULL; + if (lev->ops->shutdown) + lev->ops->shutdown(lev); + listener_decref_and_unlock(lev); } static void @@ -223,13 +265,21 @@ event_listener_destroy(struct evconnlistener *lev) int evconnlistener_enable(struct evconnlistener *lev) { - return lev->ops->enable(lev); + int r; + LOCK(lev); + r = lev->ops->enable(lev); + UNLOCK(lev); + return r; } int evconnlistener_disable(struct evconnlistener *lev) { - return lev->ops->disable(lev); + int r; + LOCK(lev); + r = lev->ops->disable(lev); + UNLOCK(lev); + return r; } static int @@ -251,7 +301,11 @@ event_listener_disable(struct evconnlistener *lev) evutil_socket_t evconnlistener_get_fd(struct evconnlistener *lev) { - return lev->ops->getfd(lev); + evutil_socket_t fd; + LOCK(lev); + fd = lev->ops->getfd(lev); + UNLOCK(lev); + return fd; } static evutil_socket_t @@ -265,7 +319,11 @@ event_listener_getfd(struct evconnlistener *lev) struct event_base * evconnlistener_get_base(struct evconnlistener *lev) { - return lev->ops->getbase(lev); + struct event_base *base; + LOCK(lev); + base = lev->ops->getbase(lev); + UNLOCK(lev); + return base; } static struct event_base * @@ -279,7 +337,9 @@ event_listener_getbase(struct evconnlistener *lev) void evconnlistener_set_error_cb(struct evconnlistener *lev, evconnlistener_errorcb errorcb) { + LOCK(lev); lev->errorcb = errorcb; + UNLOCK(lev); } static void @@ -287,6 +347,10 @@ listener_read_cb(evutil_socket_t fd, short what, void *p) { struct evconnlistener *lev = p; int err; + evconnlistener_cb cb; + evconnlistener_errorcb errorcb; + void *user_data; + LOCK(lev); while (1) { struct sockaddr_storage ss; #ifdef WIN32 @@ -301,16 +365,40 @@ listener_read_cb(evutil_socket_t fd, short what, void *p) if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING)) evutil_make_socket_nonblocking(new_fd); - lev->cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen, - lev->user_data); + if (lev->cb == NULL) { + UNLOCK(lev); + return; + } + ++lev->refcnt; + cb = lev->cb; + user_data = lev->user_data; + UNLOCK(lev); + cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen, + user_data); + LOCK(lev); + if (lev->refcnt == 1) { + int freed = listener_decref_and_unlock(lev); + EVUTIL_ASSERT(freed); + return; + } + --lev->refcnt; } err = evutil_socket_geterror(fd); - if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) + if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) { + UNLOCK(lev); return; - if (lev->errorcb != NULL) - lev->errorcb(lev, lev->user_data); - else + } + if (lev->errorcb != NULL) { + ++lev->refcnt; + errorcb = lev->errorcb; + user_data = lev->user_data; + UNLOCK(lev); + errorcb(lev, user_data); + LOCK(lev); + listener_decref_and_unlock(lev); + } else { event_sock_warn(fd, "Error from accept() call"); + } } #ifdef WIN32 @@ -318,6 +406,7 @@ struct accepting_socket { CRITICAL_SECTION lock; struct event_overlapped overlapped; SOCKET s; + int error; struct deferred_cb deferred; struct evconnlistener_iocp *lev; ev_uint8_t buflen; @@ -382,8 +471,11 @@ start_accepting(struct accepting_socket *as) const struct win32_extension_fns *ext = event_get_win32_extension_fns(); DWORD pending = 0; SOCKET s = socket(as->family, SOCK_STREAM, 0); - if (s == INVALID_SOCKET) - return -1; + int error = 0; + if (s == INVALID_SOCKET) { + error = WSAGetLastError(); + goto report_err; + } setsockopt(s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&as->lev->fd, sizeof(&as->lev->fd)); @@ -404,14 +496,20 @@ start_accepting(struct accepting_socket *as) /* Immediate success! */ accepted_socket_cb(&as->overlapped, 1, 0, 1); } else { - int err = WSAGetLastError(); - if (err != ERROR_IO_PENDING) { - event_warnx("AcceptEx: %s", evutil_socket_error_to_string(err)); - return -1; + error = WSAGetLastError(); + if (error != ERROR_IO_PENDING) { + goto report_err; } } return 0; + +report_err: + as->error = error; + event_deferred_cb_schedule( + event_base_get_deferred_cb_queue(as->lev->event_base), + &as->deferred); + return 0; } static void @@ -424,32 +522,63 @@ stop_accepting(struct accepting_socket *as) } static void -accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg) +accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg) { struct accepting_socket *as = arg; struct sockaddr *sa_local=NULL, *sa_remote=NULL; int socklen_local=0, socklen_remote=0; const struct win32_extension_fns *ext = event_get_win32_extension_fns(); + struct evconnlistener *lev = &as->lev->base; + evutil_socket_t sock=-1; + void *data; + evconnlistener_cb cb=NULL; + evconnlistener_errorcb errorcb=NULL; + int error; EVUTIL_ASSERT(ext->GetAcceptExSockaddrs); + LOCK(lev); EnterCriticalSection(&as->lock); if (as->free_on_cb) { free_and_unlock_accepting_socket(as); + listener_decref_and_unlock(lev); return; } - ext->GetAcceptExSockaddrs( - as->addrbuf, 0, as->buflen/2, as->buflen/2, - &sa_local, &socklen_local, &sa_remote, &socklen_remote); + ++lev->refcnt; - as->lev->base.cb(&as->lev->base, as->s, sa_remote, - socklen_remote, as->lev->base.user_data); + error = as->error; + if (error) { + as->error = 0; + errorcb = lev->errorcb; + } else { + ext->GetAcceptExSockaddrs( + as->addrbuf, 0, as->buflen/2, as->buflen/2, + &sa_local, &socklen_local, &sa_remote, + &socklen_remote); + sock = as->s; + cb = lev->cb; + as->s = INVALID_SOCKET; + } + data = lev->user_data; - as->s = INVALID_SOCKET; + LeaveCriticalSection(&as->lock); + UNLOCK(lev); + + if (errorcb) { + WSASetLastError(error); + errorcb(lev, data); + } else { + cb(lev, sock, sa_remote, socklen_remote, data); + } + + LOCK(lev); + if (listener_decref_and_unlock(lev)) + return; - start_accepting(as); /* XXXX handle error */ + EnterCriticalSection(&as->lock); + start_accepting(as); LeaveCriticalSection(&as->lock); } @@ -459,6 +588,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i struct accepting_socket *as = EVUTIL_UPCAST(o, struct accepting_socket, overlapped); + LOCK(&as->lev->base); EnterCriticalSection(&as->lock); if (ok) { /* XXXX Don't do this if some EV_MT flag is set. */ @@ -467,16 +597,32 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i &as->deferred); LeaveCriticalSection(&as->lock); } else if (as->free_on_cb) { + struct evconnlistener *lev = &as->lev->base; free_and_unlock_accepting_socket(as); + listener_decref_and_unlock(lev); + return; } else if (as->s == INVALID_SOCKET) { /* This is okay; we were disabled by iocp_listener_disable. */ LeaveCriticalSection(&as->lock); } else { /* Some error on accept that we couldn't actually handle. */ + BOOL ok; + DWORD transfer = 0, flags=0; event_sock_warn(as->s, "Unexpected error on AcceptEx"); + ok = WSAGetOverlappedResult(as->s, &o->overlapped, + &transfer, FALSE, &flags); + if (ok) { + /* well, that was confusing! */ + as->error = 1; + } else { + as->error = WSAGetLastError(); + } + event_deferred_cb_schedule( + event_base_get_deferred_cb_queue(as->lev->event_base), + &as->deferred); LeaveCriticalSection(&as->lock); - /* XXXX send error to user */ } + UNLOCK(&as->lev->base); } static int @@ -486,17 +632,17 @@ iocp_listener_enable(struct evconnlistener *lev) struct evconnlistener_iocp *lev_iocp = EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); - EnterCriticalSection(&lev_iocp->lock); + LOCK(lev); for (i = 0; i < lev_iocp->n_accepting; ++i) { struct accepting_socket *as = lev_iocp->accepting[i]; if (!as) continue; EnterCriticalSection(&as->lock); if (!as->free_on_cb && as->s == INVALID_SOCKET) - start_accepting(as); /* XXXX handle error */ + start_accepting(as); LeaveCriticalSection(&as->lock); } - LeaveCriticalSection(&lev_iocp->lock); + UNLOCK(lev); return 0; } @@ -507,7 +653,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown) struct evconnlistener_iocp *lev_iocp = EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); - EnterCriticalSection(&lev_iocp->lock); + LOCK(lev); for (i = 0; i < lev_iocp->n_accepting; ++i) { struct accepting_socket *as = lev_iocp->accepting[i]; if (!as) @@ -520,7 +666,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown) } LeaveCriticalSection(&as->lock); } - LeaveCriticalSection(&lev_iocp->lock); + UNLOCK(lev); return 0; } @@ -529,10 +675,18 @@ iocp_listener_disable(struct evconnlistener *lev) { return iocp_listener_disable_impl(lev,0); } + static void iocp_listener_destroy(struct evconnlistener *lev) { - iocp_listener_disable_impl(lev,1); + struct evconnlistener_iocp *lev_iocp = + EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); + + if (! lev_iocp->shutting_down) { + lev_iocp->shutting_down = 1; + iocp_listener_disable_impl(lev,1); + } + } static evutil_socket_t @@ -554,6 +708,7 @@ static const struct evconnlistener_ops evconnlistener_iocp_ops = { iocp_listener_enable, iocp_listener_disable, iocp_listener_destroy, + iocp_listener_destroy, /* shutdown */ iocp_listener_getfd, iocp_listener_getbase }; @@ -571,6 +726,8 @@ evconnlistener_new_async(struct event_base *base, struct evconnlistener_iocp *lev; int i; + flags |= LEV_OPT_THREADSAFE; + if (!base || !event_base_get_iocp(base)) goto err; @@ -595,6 +752,7 @@ evconnlistener_new_async(struct event_base *base, lev->base.cb = cb; lev->base.user_data = ptr; lev->base.flags = flags; + lev->base.refcnt = 1; lev->port = event_base_get_iocp(base); lev->fd = fd; @@ -603,7 +761,7 @@ evconnlistener_new_async(struct event_base *base, if (event_iocp_port_associate(lev->port, fd, 1) < 0) goto err_free_lev; - InitializeCriticalSectionAndSpinCount(&lev->lock, 1000); + EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE); lev->n_accepting = N_SOCKETS_PER_LISTENER; lev->accepting = mm_calloc(lev->n_accepting, @@ -624,6 +782,7 @@ evconnlistener_new_async(struct event_base *base, free_and_unlock_accepting_socket(lev->accepting[i]); goto err_free_accepting; } + ++lev->base.refcnt; } return &lev->base; @@ -632,7 +791,7 @@ err_free_accepting: mm_free(lev->accepting); /* XXXX free the other elements. */ err_delete_lock: - DeleteCriticalSection(&lev->lock); + EVTHREAD_FREE_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE); err_free_lev: mm_free(lev); err: |