From 2647026782b7971e6d03b46dcc61827b63a31b5d Mon Sep 17 00:00:00 2001 From: Trond Norbye Date: Sat, 2 Apr 2011 11:30:16 +0200 Subject: Fixed problems when accept returns EMFILE --- daemon/memcached.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++-- daemon/memcached.h | 14 ++++------ daemon/thread.c | 81 ++++++++++++++++++++++++++++++++++++++---------------- t/binary.t | 2 +- t/stats.t | 2 +- 5 files changed, 144 insertions(+), 35 deletions(-) diff --git a/daemon/memcached.c b/daemon/memcached.c index 55a49c5..6f22066 100644 --- a/daemon/memcached.c +++ b/daemon/memcached.c @@ -336,6 +336,47 @@ static const char *prot_text(enum protocol prot) { return rv; } +struct { + pthread_mutex_t mutex; + bool disabled; + ssize_t count; + uint64_t num_disable; +} listen_state; + +static bool is_listen_disabled(void) { + bool ret; + pthread_mutex_lock(&listen_state.mutex); + ret = listen_state.disabled; + pthread_mutex_unlock(&listen_state.mutex); + return ret; +} + +static uint64_t get_listen_disabled_num(void) { + uint64_t ret; + pthread_mutex_lock(&listen_state.mutex); + ret = listen_state.num_disable; + pthread_mutex_unlock(&listen_state.mutex); + return ret; +} + +static void disable_listen(void) { + pthread_mutex_lock(&listen_state.mutex); + listen_state.disabled = true; + listen_state.count = 10; + ++listen_state.num_disable; + pthread_mutex_unlock(&listen_state.mutex); + + conn *next; + for (next = listen_conn; next; next = next->next) { + update_event(next, 0); + if (listen(next->sfd, 1) != 0) { + settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, + "listen() failed", + strerror(errno)); + } + } +} + void safe_close(SOCKET sfd) { if (sfd != INVALID_SOCKET) { int rval; @@ -352,6 +393,10 @@ void safe_close(SOCKET sfd) { STATS_LOCK(); stats.curr_conns--; STATS_UNLOCK(); + + if (is_listen_disabled()) { + notify_dispatcher(); + } } } } @@ -3665,6 +3710,8 @@ static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate) { APPEND_STAT("bytes_read", "%"PRIu64, thread_stats.bytes_read); APPEND_STAT("bytes_written", "%"PRIu64, thread_stats.bytes_written); APPEND_STAT("limit_maxbytes", "%"PRIu64, settings.maxbytes); + APPEND_STAT("accepting_conns", "%u", is_listen_disabled() ? 0 : 1); + APPEND_STAT("listen_disabled_num", "%"PRIu64, get_listen_disabled_num()); APPEND_STAT("rejected_conns", "%" PRIu64, (unsigned long long)stats.rejected_conns); APPEND_STAT("threads", "%d", settings.num_threads); APPEND_STAT("conn_yields", "%" PRIu64, (unsigned long long)thread_stats.conn_yields); @@ -4998,11 +5045,11 @@ bool conn_listening(conn *c) settings.extensions.logger->log(EXTENSION_LOG_INFO, c, "Too many open connections\n"); } + disable_listen(); } else if (errno != EAGAIN && errno != EWOULDBLOCK) { settings.extensions.logger->log(EXTENSION_LOG_WARNING, c, "Failed to accept new client: %s\n", strerror(errno)); - } return false; @@ -5563,6 +5610,35 @@ void event_handler(const int fd, const short which, void *arg) { } } +static void dispatch_event_handler(int fd, short which, void *arg) { + char buffer[80]; + ssize_t nr = recv(fd, buffer, sizeof(buffer), 0); + + if (nr != -1 && is_listen_disabled()) { + bool enable = false; + pthread_mutex_lock(&listen_state.mutex); + listen_state.count -= nr; + if (listen_state.count <= 0) { + enable = true; + listen_state.disabled = false; + } + pthread_mutex_unlock(&listen_state.mutex); + if (enable) { + conn *next; + for (next = listen_conn; next; next = next->next) { + update_event(next, EV_READ | EV_PERSIST); + if (listen(next->sfd, settings.backlog) != 0) { + settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, + "listen() failed", + strerror(errno)); + } + } + } + } +} + + + static SOCKET new_socket(struct addrinfo *ai) { SOCKET sfd; @@ -7310,7 +7386,7 @@ int main (int argc, char **argv) { #endif /* start up worker threads if MT mode */ - thread_init(settings.num_threads, main_base); + thread_init(settings.num_threads, main_base, dispatch_event_handler); /* initialise clock event */ clock_handler(0, 0, 0); diff --git a/daemon/memcached.h b/daemon/memcached.h index 13555da..efa6dc3 100644 --- a/daemon/memcached.h +++ b/daemon/memcached.h @@ -223,7 +223,8 @@ extern struct settings settings; enum thread_type { GENERAL = 11, - TAP = 13 + TAP = 13, + DISPATCHER = 15 }; typedef struct { @@ -258,15 +259,11 @@ typedef struct { } extern void notify_thread(LIBEVENT_THREAD *thread); +extern void notify_dispatcher(void); +extern bool create_notification_pipe(LIBEVENT_THREAD *me); extern LIBEVENT_THREAD* tap_thread; -typedef struct { - pthread_t thread_id; /* unique ID of this thread */ - struct event_base *base; /* libevent handle this thread uses */ -} LIBEVENT_DISPATCHER_THREAD; - - typedef struct conn conn; typedef bool (*STATE_FUNC)(conn *); @@ -417,7 +414,8 @@ bool update_event(conn *c, const int new_flags); * also #define-d to directly call the underlying code in singlethreaded mode. */ -void thread_init(int nthreads, struct event_base *main_base); +void thread_init(int nthreads, struct event_base *main_base, + void (*dispatcher_callback)(int, short, void *)); void threads_shutdown(void); int dispatch_event_add(int thread, conn *c); diff --git a/daemon/thread.c b/daemon/thread.c index 9ac5d34..1ca65a1 100644 --- a/daemon/thread.c +++ b/daemon/thread.c @@ -50,7 +50,7 @@ static pthread_mutex_t stats_lock; static CQ_ITEM *cqi_freelist; static pthread_mutex_t cqi_freelist_lock; -static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; +static LIBEVENT_THREAD dispatcher_thread; /* * Each libevent instance has a wakeup pipe, which other threads @@ -186,6 +186,56 @@ static void create_worker(void *(*func)(void *), void *arg, pthread_t *id) { /****************************** LIBEVENT THREADS *****************************/ +bool create_notification_pipe(LIBEVENT_THREAD *me) +{ + if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0, + (void*)me->notify) == SOCKET_ERROR) { + settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, + "Can't create notify pipe: %s", + strerror(errno)); + return false; + } + + for (int j = 0; j < 2; ++j) { + int flags = 1; + setsockopt(me->notify[j], IPPROTO_TCP, + TCP_NODELAY, (void *)&flags, sizeof(flags)); + setsockopt(me->notify[j], SOL_SOCKET, + SO_REUSEADDR, (void *)&flags, sizeof(flags)); + + + if (evutil_make_socket_nonblocking(me->notify[j]) == -1) { + settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, + "Failed to enable non-blocking: %s", + strerror(errno)); + return false; + } + } + return true; +} + +static void setup_dispatcher(struct event_base *main_base, + void (*dispatcher_callback)(int, short, void *)) +{ + memset(&dispatcher_thread, 0, sizeof(dispatcher_thread)); + dispatcher_thread.type = DISPATCHER; + dispatcher_thread.base = main_base; + dispatcher_thread.thread_id = pthread_self(); + if (!create_notification_pipe(&dispatcher_thread)) { + exit(1); + } + /* Listen for notifications from other threads */ + event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0], + EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback); + event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event); + + if (event_add(&dispatcher_thread.notify_event, 0) == -1) { + settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, + "Can't monitor libevent notify pipe\n"); + exit(1); + } +} + /* * Set up a thread's information. */ @@ -651,6 +701,9 @@ int is_listen_thread() { #endif } +void notify_dispatcher(void) { + notify_thread(&dispatcher_thread); +} /******************************* GLOBAL STATS ******************************/ @@ -752,7 +805,8 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) { * nthreads Number of worker event handler threads to spawn * main_base Event base for main thread */ -void thread_init(int nthr, struct event_base *main_base) { +void thread_init(int nthr, struct event_base *main_base, + void (*dispatcher_callback)(int, short, void *)) { int i; nthreads = nthr + 1; @@ -776,31 +830,12 @@ void thread_init(int nthr, struct event_base *main_base) { exit(1); } - dispatcher_thread.base = main_base; - dispatcher_thread.thread_id = pthread_self(); + setup_dispatcher(main_base, dispatcher_callback); for (i = 0; i < nthreads; i++) { - if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0, - (void*)threads[i].notify) == SOCKET_ERROR) { - settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL, - "Can't create notify pipe: %s", - strerror(errno)); + if (!create_notification_pipe(&threads[i])) { exit(1); } - - for (int j = 0; j < 2; ++j) { - int flags = 1; - setsockopt(threads[i].notify[j], IPPROTO_TCP, - TCP_NODELAY, (void *)&flags, sizeof(flags)); - setsockopt(threads[i].notify[j], SOL_SOCKET, - SO_REUSEADDR, (void *)&flags, sizeof(flags)); - - - if (evutil_make_socket_nonblocking(threads[i].notify[j]) == -1) { - exit(1); - } - } - threads[i].index = i; setup_thread(&threads[i], i == (nthreads - 1)); diff --git a/t/binary.t b/t/binary.t index 59d981d..64c7a29 100755 --- a/t/binary.t +++ b/t/binary.t @@ -2,7 +2,7 @@ use strict; use warnings; -use Test::More tests => 3406; +use Test::More tests => 3430; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; diff --git a/t/stats.t b/t/stats.t index aef2d4c..1faf063 100755 --- a/t/stats.t +++ b/t/stats.t @@ -62,7 +62,7 @@ if ($stats->{'auth_sasl_enabled'} == 'yes') { $sasl_enabled = 1; } -is(scalar(keys(%$stats)), 40, "40 stats values"); +is(scalar(keys(%$stats)), 42, "42 stats values"); # Test initial state foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses -- cgit v1.2.1