From e794d716a3ed525ca427d35cb1c80fc6bb9f8543 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 2 Nov 2009 20:20:40 +0000 Subject: Clean up acceptex code some more: add locking, single-threading, enable/disable. svn:r1491 --- listener.c | 120 +++++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 97 insertions(+), 23 deletions(-) (limited to 'listener.c') diff --git a/listener.c b/listener.c index e1406254..553eb4f4 100644 --- a/listener.c +++ b/listener.c @@ -55,6 +55,7 @@ #include "log-internal.h" #ifdef WIN32 #include "iocp-internal.h" +#include "defer-internal.h" #endif struct evconnlistener_ops { @@ -83,6 +84,7 @@ struct evconnlistener_iocp { evutil_socket_t fd; struct event_base *event_base; struct event_iocp_port *port; + CRITICAL_SECTION lock; int n_accepting; struct accepting_socket **accepting; }; @@ -289,6 +291,7 @@ struct accepting_socket { CRITICAL_SECTION lock; struct event_overlapped overlapped; SOCKET s; + struct deferred_cb deferred; struct evconnlistener_iocp *lev; ev_uint8_t buflen; ev_uint8_t family; @@ -344,6 +347,7 @@ free_and_unlock_accepting_socket(struct accepting_socket *as) static int start_accepting(struct accepting_socket *as) { + /* requires lock */ int result = -1; const struct win32_extension_fns *ext = event_get_win32_extension_fns(); @@ -386,61 +390,128 @@ done: return result; } -#if 0 static void stop_accepting(struct accepting_socket *as) { - /* XXX */ + /* requires lock. */ + SOCKET s = as->s; + as->s = INVALID_SOCKET; + closesocket(s); } -#endif static void -accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok) +accepted_socket_invoke_user_cb(struct deferred_cb *cb, void *arg) { - /* Run this whole thing deferred unless some MT flag is set */ - /* XXX needs locking. */ - /* XXX use ok */ - - struct sockaddr *sa_local=NULL, *sa_remote=NULL; - int socklen_local=0, socklen_remote=0; - struct accepting_socket *as = - EVUTIL_UPCAST(o, struct accepting_socket, overlapped); - const struct win32_extension_fns *ext = - event_get_win32_extension_fns(); - EVUTIL_ASSERT(ext->GetAcceptExSockaddrs); + struct *as = arg; + evconnlistener_cb cb; + EnterCriticalSection(&as->lock); ext->GetAcceptExSockaddrs(as->addrbuf, 0, as->buflen/2, as->buflen/2, &sa_local, &socklen_local, &sa_remote, &socklen_remote); - as->lev->base.cb(&as->lev->base, as->s, sa_remote, socklen_remote, - as->lev->base.user_data); + /* XXXX should we/can we release the lock here? */ + as->lev->base.cb(&as->lev->base, as->s, sa_remote, + socklen_remote, as->lev->base.user_data); as->s = INVALID_SOCKET; - /* Avoid stack overflow XXXX */ - start_accepting(as); + if (as->free_on_cb) { + free_and_unlock_accepting_socket(as); + } else { + start_accepting(as);/*XXX handle error */ + LeaveCriticalSection(&as->lock); + } } +static void +accepted_socket_cb(struct event_overlapped *o, uintptr_t key, ev_ssize_t n, int ok) +{ + struct sockaddr *sa_local=NULL, *sa_remote=NULL; + int socklen_local=0, socklen_remote=0; + struct accepting_socket *as = + EVUTIL_UPCAST(o, struct accepting_socket, overlapped); + const struct win32_extension_fns *ext = + event_get_win32_extension_fns(); + EVUTIL_ASSERT(ext->GetAcceptExSockaddrs); + + EnterCriticalSection(&as->lock); + if (ok) { + /* XXXX Don't do this if some EV_MT flag is set. */ + event_deferred_cb_schedule( + event_base_get_deferred_cb_queue(as->lev->event_base), + &as->deferred); + LeaveCriticalSection(&as->lock); + } else if (free_on_cb) { + free_and_unlock_accepting_socket(as); + } else if (as->s == INVALID_SOCKET) { + /* This is okay; we were disabled by iocp_listener_disable. */ + LeaveCriticalSection(&as->lock); + } else { + /* Some error on accept that we couldn't actually handle. */ + event_sock_warn(as->fd, "Unexpected error on AcceptEx"); + LeaveCriticalSection(&as->lock); + /* XXXX recover better. */ + } +} static int iocp_listener_enable(struct evconnlistener *lev) { - /* XXXX */ + int i; + struct evconnlistener_iocp *lev_iocp = + EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); + + EnterCriticalSection(&lev->lock); + for (i = 0; i < n_accepting; ++i) { + struct accepting_socket *as = lev_iocp->accepting[i]; + if (!as) + continue; + EnterCriticalSection(&as->lock); + if (!as->free_on_cb && as->s == INVALID_SOCKET) + start_accepting(as); /* detect failure. */ + LeaveCriticalSection(&as->lock); + } + LeaveCriticalSection(&lev->lock); return 0; } + static int -iocp_listener_disable(struct evconnlistener *lev) +iocp_listener_disable_impl(struct evconnlistener *lev, int shutdown) { - /* XXXX */ + int i; + struct evconnlistener_iocp *lev_iocp = + EVUTIL_UPCAST(lev, struct evconnlistener_iocp, base); + + EnterCriticalSection(&lev->lock); + for (i = 0; i < n_accepting; ++i) { + struct accepting_socket *as = lev_iocp->accepting[i]; + if (!as) + continue; + EnterCriticalSection(&as->lock); + if (!as->free_on_cb && as->s != INVALID_SOCKET) { + if (shutdown) + as->free_on_cb = 1; + stop_accepting(as); + } + LeaveCriticalSection(&as->lock); + } + LeaveCriticalSection(&lev->lock); return 0; } + +static int +iocp_listener_disable_impl(struct evconnlistener *lev) +{ + return iocp_listener_disable_impl(lev,0); +} static void iocp_listener_destroy(struct evconnlistener *lev) { - /* XXXX */ + iocp_listener_disable_impl(lev,1); } + static evutil_socket_t iocp_listener_getfd(struct evconnlistener *lev) { @@ -521,6 +592,8 @@ evconnlistener_new_async(struct event_base *base, return NULL; } + InitializeCriticalSection(&lev->lock); + if (start_accepting(lev->accepting[0]) < 0) { event_warnx("Couldn't start accepting on socket"); EnterCriticalSection(&lev->accepting[0]->lock); @@ -528,6 +601,7 @@ evconnlistener_new_async(struct event_base *base, mm_free(lev->accepting); mm_free(lev); closesocket(fd); + DeleteCriticalSection(&lev->lock); return NULL; } -- cgit v1.2.1