summaryrefslogtreecommitdiff
path: root/listener.c
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2010-09-23 16:49:58 -0400
committerNick Mathewson <nickm@torproject.org>2010-10-07 18:11:19 -0400
commit127d4f2195638bbfa8b693558f3eb408abdcdf24 (patch)
tree889c6e77acc7561ee477e1347f3406e692dff6f6 /listener.c
parent5b7a37063647760646823215361df8605c36acd6 (diff)
downloadlibevent-127d4f2195638bbfa8b693558f3eb408abdcdf24.tar.gz
Add a LEV_OPT_THREADSAFE option for threadsafe evconnlisteners
Diffstat (limited to 'listener.c')
-rw-r--r--listener.c233
1 files changed, 196 insertions, 37 deletions
diff --git a/listener.c b/listener.c
index a1b77051..840b09fd 100644
--- a/listener.c
+++ b/listener.c
@@ -51,6 +51,7 @@
#include "mm-internal.h"
#include "util-internal.h"
#include "log-internal.h"
+#include "evthread-internal.h"
#ifdef WIN32
#include "iocp-internal.h"
#include "defer-internal.h"
@@ -60,16 +61,19 @@ struct evconnlistener_ops {
int (*enable)(struct evconnlistener *);
int (*disable)(struct evconnlistener *);
void (*destroy)(struct evconnlistener *);
+ void (*shutdown)(struct evconnlistener *);
evutil_socket_t (*getfd)(struct evconnlistener *);
struct event_base *(*getbase)(struct evconnlistener *);
};
struct evconnlistener {
const struct evconnlistener_ops *ops;
+ void *lock;
evconnlistener_cb cb;
evconnlistener_errorcb errorcb;
void *user_data;
unsigned flags;
+ int refcnt;
};
struct evconnlistener_event {
@@ -83,12 +87,15 @@ struct evconnlistener_iocp {
evutil_socket_t fd;
struct event_base *event_base;
struct event_iocp_port *port;
- CRITICAL_SECTION lock;
- int n_accepting;
+ short n_accepting;
+ short shutting_down;
struct accepting_socket **accepting;
};
#endif
+#define LOCK(listener) EVLOCK_LOCK((listener)->lock, 0)
+#define UNLOCK(listener) EVLOCK_UNLOCK((listener)->lock, 0)
+
struct evconnlistener *
evconnlistener_new_async(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
@@ -100,10 +107,36 @@ static void event_listener_destroy(struct evconnlistener *);
static evutil_socket_t event_listener_getfd(struct evconnlistener *);
static struct event_base *event_listener_getbase(struct evconnlistener *);
+#if 0
+static void
+listener_incref_and_lock(struct evconnlistener *listener)
+{
+ LOCK(listener);
+ ++listener->refcnt;
+}
+#endif
+
+static int
+listener_decref_and_unlock(struct evconnlistener *listener)
+{
+ int refcnt = --listener->refcnt;
+ if (refcnt == 0) {
+ listener->ops->destroy(listener);
+ UNLOCK(listener);
+ EVTHREAD_FREE_LOCK(listener->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+ mm_free(listener);
+ return 1;
+ } else {
+ UNLOCK(listener);
+ return 0;
+ }
+}
+
static const struct evconnlistener_ops evconnlistener_event_ops = {
event_listener_enable,
event_listener_disable,
event_listener_destroy,
+ NULL, /* shutdown */
event_listener_getfd,
event_listener_getbase
};
@@ -143,6 +176,11 @@ evconnlistener_new(struct event_base *base,
lev->base.cb = cb;
lev->base.user_data = ptr;
lev->base.flags = flags;
+ lev->base.refcnt = 1;
+
+ if (flags & LEV_OPT_THREADSAFE) {
+ EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+ }
event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
listener_read_cb, lev);
@@ -204,8 +242,12 @@ evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,
void
evconnlistener_free(struct evconnlistener *lev)
{
- lev->ops->destroy(lev);
- mm_free(lev);
+ LOCK(lev);
+ lev->cb = NULL;
+ lev->errorcb = NULL;
+ if (lev->ops->shutdown)
+ lev->ops->shutdown(lev);
+ listener_decref_and_unlock(lev);
}
static void
@@ -223,13 +265,21 @@ event_listener_destroy(struct evconnlistener *lev)
int
evconnlistener_enable(struct evconnlistener *lev)
{
- return lev->ops->enable(lev);
+ int r;
+ LOCK(lev);
+ r = lev->ops->enable(lev);
+ UNLOCK(lev);
+ return r;
}
int
evconnlistener_disable(struct evconnlistener *lev)
{
- return lev->ops->disable(lev);
+ int r;
+ LOCK(lev);
+ r = lev->ops->disable(lev);
+ UNLOCK(lev);
+ return r;
}
static int
@@ -251,7 +301,11 @@ event_listener_disable(struct evconnlistener *lev)
evutil_socket_t
evconnlistener_get_fd(struct evconnlistener *lev)
{
- return lev->ops->getfd(lev);
+ evutil_socket_t fd;
+ LOCK(lev);
+ fd = lev->ops->getfd(lev);
+ UNLOCK(lev);
+ return fd;
}
static evutil_socket_t
@@ -265,7 +319,11 @@ event_listener_getfd(struct evconnlistener *lev)
struct event_base *
evconnlistener_get_base(struct evconnlistener *lev)
{
- return lev->ops->getbase(lev);
+ struct event_base *base;
+ LOCK(lev);
+ base = lev->ops->getbase(lev);
+ UNLOCK(lev);
+ return base;
}
static struct event_base *
@@ -279,7 +337,9 @@ event_listener_getbase(struct evconnlistener *lev)
void evconnlistener_set_error_cb(struct evconnlistener *lev,
evconnlistener_errorcb errorcb)
{
+ LOCK(lev);
lev->errorcb = errorcb;
+ UNLOCK(lev);
}
static void
@@ -287,6 +347,10 @@ listener_read_cb(evutil_socket_t fd, short what, void *p)
{
struct evconnlistener *lev = p;
int err;
+ evconnlistener_cb cb;
+ evconnlistener_errorcb errorcb;
+ void *user_data;
+ LOCK(lev);
while (1) {
struct sockaddr_storage ss;
#ifdef WIN32
@@ -301,16 +365,40 @@ listener_read_cb(evutil_socket_t fd, short what, void *p)
if (!(lev->flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))
evutil_make_socket_nonblocking(new_fd);
- lev->cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
- lev->user_data);
+ if (lev->cb == NULL) {
+ UNLOCK(lev);
+ return;
+ }
+ ++lev->refcnt;
+ cb = lev->cb;
+ user_data = lev->user_data;
+ UNLOCK(lev);
+ cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen,
+ user_data);
+ LOCK(lev);
+ if (lev->refcnt == 1) {
+ int freed = listener_decref_and_unlock(lev);
+ EVUTIL_ASSERT(freed);
+ return;
+ }
+ --lev->refcnt;
}
err = evutil_socket_geterror(fd);
- if (EVUTIL_ERR_ACCEPT_RETRIABLE(err))
+ if (EVUTIL_ERR_ACCEPT_RETRIABLE(err)) {
+ UNLOCK(lev);
return;
- if (lev->errorcb != NULL)
- lev->errorcb(lev, lev->user_data);
- else
+ }
+ if (lev->errorcb != NULL) {
+ ++lev->refcnt;
+ errorcb = lev->errorcb;
+ user_data = lev->user_data;
+ UNLOCK(lev);
+ errorcb(lev, user_data);
+ LOCK(lev);
+ listener_decref_and_unlock(lev);
+ } else {
event_sock_warn(fd, "Error from accept() call");
+ }
}
#ifdef WIN32
@@ -318,6 +406,7 @@ struct accepting_socket {
CRITICAL_SECTION lock;
struct event_overlapped overlapped;
SOCKET s;
+ int error;
struct deferred_cb deferred;
struct evconnlistener_iocp *lev;
ev_uint8_t buflen;
@@ -382,8 +471,11 @@ start_accepting(struct accepting_socket *as)
const struct win32_extension_fns *ext = event_get_win32_extension_fns();
DWORD pending = 0;
SOCKET s = socket(as->family, SOCK_STREAM, 0);
- if (s == INVALID_SOCKET)
- return -1;
+ int error = 0;
+ if (s == INVALID_SOCKET) {
+ error = WSAGetLastError();
+ goto report_err;
+ }
setsockopt(s, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&as->lev->fd, sizeof(&as->lev->fd));
@@ -404,14 +496,20 @@ start_accepting(struct accepting_socket *as)
/* Immediate success! */
accepted_socket_cb(&as->overlapped, 1, 0, 1);
} else {
- int err = WSAGetLastError();
- if (err != ERROR_IO_PENDING) {
- event_warnx("AcceptEx: %s", evutil_socket_error_to_string(err));
- return -1;
+ error = WSAGetLastError();
+ if (error != ERROR_IO_PENDING) {
+ goto report_err;
}
}
return 0;
+
+report_err:
+ as->error = error;
+ event_deferred_cb_schedule(
+ event_base_get_deferred_cb_queue(as->lev->event_base),
+ &as->deferred);
+ return 0;
}
static void
@@ -424,32 +522,63 @@ stop_accepting(struct accepting_socket *as)
}
static void
-accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg)
+accepted_socket_invoke_user_cb(struct deferred_cb *dcb, void *arg)
{
struct accepting_socket *as = arg;
struct sockaddr *sa_local=NULL, *sa_remote=NULL;
int socklen_local=0, socklen_remote=0;
const struct win32_extension_fns *ext = event_get_win32_extension_fns();
+ struct evconnlistener *lev = &as->lev->base;
+ evutil_socket_t sock=-1;
+ void *data;
+ evconnlistener_cb cb=NULL;
+ evconnlistener_errorcb errorcb=NULL;
+ int error;
EVUTIL_ASSERT(ext->GetAcceptExSockaddrs);
+ LOCK(lev);
EnterCriticalSection(&as->lock);
if (as->free_on_cb) {
free_and_unlock_accepting_socket(as);
+ listener_decref_and_unlock(lev);
return;
}
- ext->GetAcceptExSockaddrs(
- as->addrbuf, 0, as->buflen/2, as->buflen/2,
- &sa_local, &socklen_local, &sa_remote, &socklen_remote);
+ ++lev->refcnt;
- as->lev->base.cb(&as->lev->base, as->s, sa_remote,
- socklen_remote, as->lev->base.user_data);
+ error = as->error;
+ if (error) {
+ as->error = 0;
+ errorcb = lev->errorcb;
+ } else {
+ ext->GetAcceptExSockaddrs(
+ as->addrbuf, 0, as->buflen/2, as->buflen/2,
+ &sa_local, &socklen_local, &sa_remote,
+ &socklen_remote);
+ sock = as->s;
+ cb = lev->cb;
+ as->s = INVALID_SOCKET;
+ }
+ data = lev->user_data;
- as->s = INVALID_SOCKET;
+ LeaveCriticalSection(&as->lock);
+ UNLOCK(lev);
+
+ if (errorcb) {
+ WSASetLastError(error);
+ errorcb(lev, data);
+ } else {
+ cb(lev, sock, sa_remote, socklen_remote, data);
+ }
+
+ LOCK(lev);
+ if (listener_decref_and_unlock(lev))
+ return;
- start_accepting(as); /* XXXX handle error */
+ EnterCriticalSection(&as->lock);
+ start_accepting(as);
LeaveCriticalSection(&as->lock);
}
@@ -459,6 +588,7 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
struct accepting_socket *as =
EVUTIL_UPCAST(o, struct accepting_socket, overlapped);
+ LOCK(&as->lev->base);
EnterCriticalSection(&as->lock);
if (ok) {
/* XXXX Don't do this if some EV_MT flag is set. */
@@ -467,16 +597,32 @@ accepted_socket_cb(struct event_overlapped *o, ev_uintptr_t key, ev_ssize_t n, i
&as->deferred);
LeaveCriticalSection(&as->lock);
} else if (as->free_on_cb) {
+ struct evconnlistener *lev = &as->lev->base;
free_and_unlock_accepting_socket(as);
+ listener_decref_and_unlock(lev);
+ return;
} else if (as->s == INVALID_SOCKET) {
/* This is okay; we were disabled by iocp_listener_disable. */
LeaveCriticalSection(&as->lock);
} else {
/* Some error on accept that we couldn't actually handle. */
+ BOOL ok;
+ DWORD transfer = 0, flags=0;
event_sock_warn(as->s, "Unexpected error on AcceptEx");
+ ok = WSAGetOverlappedResult(as->s, &o->overlapped,
+ &transfer, FALSE, &flags);
+ if (ok) {
+ /* well, that was confusing! */
+ as->error = 1;
+ } else {
+ as->error = WSAGetLastError();
+ }
+ event_deferred_cb_schedule(
+ event_base_get_deferred_cb_queue(as->lev->event_base),
+ &as->deferred);
LeaveCriticalSection(&as->lock);
- /* XXXX send error to user */
}
+ UNLOCK(&as->lev->base);
}
static int
@@ -486,17 +632,17 @@ iocp_listener_enable(struct evconnlistener *lev)
struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
- EnterCriticalSection(&lev_iocp->lock);
+ LOCK(lev);
for (i = 0; i < lev_iocp->n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i];
if (!as)
continue;
EnterCriticalSection(&as->lock);
if (!as->free_on_cb && as->s == INVALID_SOCKET)
- start_accepting(as); /* XXXX handle error */
+ start_accepting(as);
LeaveCriticalSection(&as->lock);
}
- LeaveCriticalSection(&lev_iocp->lock);
+ UNLOCK(lev);
return 0;
}
@@ -507,7 +653,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
struct evconnlistener_iocp *lev_iocp =
EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
- EnterCriticalSection(&lev_iocp->lock);
+ LOCK(lev);
for (i = 0; i < lev_iocp->n_accepting; ++i) {
struct accepting_socket *as = lev_iocp->accepting[i];
if (!as)
@@ -520,7 +666,7 @@ iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown)
}
LeaveCriticalSection(&as->lock);
}
- LeaveCriticalSection(&lev_iocp->lock);
+ UNLOCK(lev);
return 0;
}
@@ -529,10 +675,18 @@ iocp_listener_disable(struct evconnlistener *lev)
{
return iocp_listener_disable_impl(lev,0);
}
+
static void
iocp_listener_destroy(struct evconnlistener *lev)
{
- iocp_listener_disable_impl(lev,1);
+ struct evconnlistener_iocp *lev_iocp =
+ EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base);
+
+ if (! lev_iocp->shutting_down) {
+ lev_iocp->shutting_down = 1;
+ iocp_listener_disable_impl(lev,1);
+ }
+
}
static evutil_socket_t
@@ -554,6 +708,7 @@ static const struct evconnlistener_ops evconnlistener_iocp_ops = {
iocp_listener_enable,
iocp_listener_disable,
iocp_listener_destroy,
+ iocp_listener_destroy, /* shutdown */
iocp_listener_getfd,
iocp_listener_getbase
};
@@ -571,6 +726,8 @@ evconnlistener_new_async(struct event_base *base,
struct evconnlistener_iocp *lev;
int i;
+ flags |= LEV_OPT_THREADSAFE;
+
if (!base || !event_base_get_iocp(base))
goto err;
@@ -595,6 +752,7 @@ evconnlistener_new_async(struct event_base *base,
lev->base.cb = cb;
lev->base.user_data = ptr;
lev->base.flags = flags;
+ lev->base.refcnt = 1;
lev->port = event_base_get_iocp(base);
lev->fd = fd;
@@ -603,7 +761,7 @@ evconnlistener_new_async(struct event_base *base,
if (event_iocp_port_associate(lev->port, fd, 1) < 0)
goto err_free_lev;
- InitializeCriticalSectionAndSpinCount(&lev->lock, 1000);
+ EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
lev->n_accepting = N_SOCKETS_PER_LISTENER;
lev->accepting = mm_calloc(lev->n_accepting,
@@ -624,6 +782,7 @@ evconnlistener_new_async(struct event_base *base,
free_and_unlock_accepting_socket(lev->accepting[i]);
goto err_free_accepting;
}
+ ++lev->base.refcnt;
}
return &lev->base;
@@ -632,7 +791,7 @@ err_free_accepting:
mm_free(lev->accepting);
/* XXXX free the other elements. */
err_delete_lock:
- DeleteCriticalSection(&lev->lock);
+ EVTHREAD_FREE_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);
err_free_lev:
mm_free(lev);
err: