summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrond Norbye <trond.norbye@gmail.com>2011-04-02 11:30:16 +0200
committerTrond Norbye <trond.norbye@gmail.com>2011-04-02 11:30:16 +0200
commit2647026782b7971e6d03b46dcc61827b63a31b5d (patch)
treebf37035a28fb55f945d060fa768b886531573d1b
parentbe962a0f934391c95660b63af356f266af11fb24 (diff)
downloadmemcached-2647026782b7971e6d03b46dcc61827b63a31b5d.tar.gz
Fixed problems when accept returns EMFILE
-rw-r--r--daemon/memcached.c80
-rw-r--r--daemon/memcached.h14
-rw-r--r--daemon/thread.c81
-rwxr-xr-xt/binary.t2
-rwxr-xr-xt/stats.t2
5 files changed, 144 insertions, 35 deletions
diff --git a/daemon/memcached.c b/daemon/memcached.c
index 55a49c5..6f22066 100644
--- a/daemon/memcached.c
+++ b/daemon/memcached.c
@@ -336,6 +336,47 @@ static const char *prot_text(enum protocol prot) {
return rv;
}
+struct {
+ pthread_mutex_t mutex;
+ bool disabled;
+ ssize_t count;
+ uint64_t num_disable;
+} listen_state;
+
+static bool is_listen_disabled(void) {
+ bool ret;
+ pthread_mutex_lock(&listen_state.mutex);
+ ret = listen_state.disabled;
+ pthread_mutex_unlock(&listen_state.mutex);
+ return ret;
+}
+
+static uint64_t get_listen_disabled_num(void) {
+ uint64_t ret;
+ pthread_mutex_lock(&listen_state.mutex);
+ ret = listen_state.num_disable;
+ pthread_mutex_unlock(&listen_state.mutex);
+ return ret;
+}
+
+static void disable_listen(void) {
+ pthread_mutex_lock(&listen_state.mutex);
+ listen_state.disabled = true;
+ listen_state.count = 10;
+ ++listen_state.num_disable;
+ pthread_mutex_unlock(&listen_state.mutex);
+
+ conn *next;
+ for (next = listen_conn; next; next = next->next) {
+ update_event(next, 0);
+ if (listen(next->sfd, 1) != 0) {
+ settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
+ "listen() failed",
+ strerror(errno));
+ }
+ }
+}
+
void safe_close(SOCKET sfd) {
if (sfd != INVALID_SOCKET) {
int rval;
@@ -352,6 +393,10 @@ void safe_close(SOCKET sfd) {
STATS_LOCK();
stats.curr_conns--;
STATS_UNLOCK();
+
+ if (is_listen_disabled()) {
+ notify_dispatcher();
+ }
}
}
}
@@ -3665,6 +3710,8 @@ static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate) {
APPEND_STAT("bytes_read", "%"PRIu64, thread_stats.bytes_read);
APPEND_STAT("bytes_written", "%"PRIu64, thread_stats.bytes_written);
APPEND_STAT("limit_maxbytes", "%"PRIu64, settings.maxbytes);
+ APPEND_STAT("accepting_conns", "%u", is_listen_disabled() ? 0 : 1);
+ APPEND_STAT("listen_disabled_num", "%"PRIu64, get_listen_disabled_num());
APPEND_STAT("rejected_conns", "%" PRIu64, (unsigned long long)stats.rejected_conns);
APPEND_STAT("threads", "%d", settings.num_threads);
APPEND_STAT("conn_yields", "%" PRIu64, (unsigned long long)thread_stats.conn_yields);
@@ -4998,11 +5045,11 @@ bool conn_listening(conn *c)
settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
"Too many open connections\n");
}
+ disable_listen();
} else if (errno != EAGAIN && errno != EWOULDBLOCK) {
settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
"Failed to accept new client: %s\n",
strerror(errno));
-
}
return false;
@@ -5563,6 +5610,35 @@ void event_handler(const int fd, const short which, void *arg) {
}
}
+static void dispatch_event_handler(int fd, short which, void *arg) {
+ char buffer[80];
+ ssize_t nr = recv(fd, buffer, sizeof(buffer), 0);
+
+ if (nr != -1 && is_listen_disabled()) {
+ bool enable = false;
+ pthread_mutex_lock(&listen_state.mutex);
+ listen_state.count -= nr;
+ if (listen_state.count <= 0) {
+ enable = true;
+ listen_state.disabled = false;
+ }
+ pthread_mutex_unlock(&listen_state.mutex);
+ if (enable) {
+ conn *next;
+ for (next = listen_conn; next; next = next->next) {
+ update_event(next, EV_READ | EV_PERSIST);
+ if (listen(next->sfd, settings.backlog) != 0) {
+ settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
+ "listen() failed",
+ strerror(errno));
+ }
+ }
+ }
+ }
+}
+
+
+
static SOCKET new_socket(struct addrinfo *ai) {
SOCKET sfd;
@@ -7310,7 +7386,7 @@ int main (int argc, char **argv) {
#endif
/* start up worker threads if MT mode */
- thread_init(settings.num_threads, main_base);
+ thread_init(settings.num_threads, main_base, dispatch_event_handler);
/* initialise clock event */
clock_handler(0, 0, 0);
diff --git a/daemon/memcached.h b/daemon/memcached.h
index 13555da..efa6dc3 100644
--- a/daemon/memcached.h
+++ b/daemon/memcached.h
@@ -223,7 +223,8 @@ extern struct settings settings;
enum thread_type {
GENERAL = 11,
- TAP = 13
+ TAP = 13,
+ DISPATCHER = 15
};
typedef struct {
@@ -258,15 +259,11 @@ typedef struct {
}
extern void notify_thread(LIBEVENT_THREAD *thread);
+extern void notify_dispatcher(void);
+extern bool create_notification_pipe(LIBEVENT_THREAD *me);
extern LIBEVENT_THREAD* tap_thread;
-typedef struct {
- pthread_t thread_id; /* unique ID of this thread */
- struct event_base *base; /* libevent handle this thread uses */
-} LIBEVENT_DISPATCHER_THREAD;
-
-
typedef struct conn conn;
typedef bool (*STATE_FUNC)(conn *);
@@ -417,7 +414,8 @@ bool update_event(conn *c, const int new_flags);
* also #define-d to directly call the underlying code in singlethreaded mode.
*/
-void thread_init(int nthreads, struct event_base *main_base);
+void thread_init(int nthreads, struct event_base *main_base,
+ void (*dispatcher_callback)(int, short, void *));
void threads_shutdown(void);
int dispatch_event_add(int thread, conn *c);
diff --git a/daemon/thread.c b/daemon/thread.c
index 9ac5d34..1ca65a1 100644
--- a/daemon/thread.c
+++ b/daemon/thread.c
@@ -50,7 +50,7 @@ static pthread_mutex_t stats_lock;
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
-static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
+static LIBEVENT_THREAD dispatcher_thread;
/*
* Each libevent instance has a wakeup pipe, which other threads
@@ -186,6 +186,56 @@ static void create_worker(void *(*func)(void *), void *arg, pthread_t *id) {
/****************************** LIBEVENT THREADS *****************************/
+bool create_notification_pipe(LIBEVENT_THREAD *me)
+{
+ if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
+ (void*)me->notify) == SOCKET_ERROR) {
+ settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
+ "Can't create notify pipe: %s",
+ strerror(errno));
+ return false;
+ }
+
+ for (int j = 0; j < 2; ++j) {
+ int flags = 1;
+ setsockopt(me->notify[j], IPPROTO_TCP,
+ TCP_NODELAY, (void *)&flags, sizeof(flags));
+ setsockopt(me->notify[j], SOL_SOCKET,
+ SO_REUSEADDR, (void *)&flags, sizeof(flags));
+
+
+ if (evutil_make_socket_nonblocking(me->notify[j]) == -1) {
+ settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
+ "Failed to enable non-blocking: %s",
+ strerror(errno));
+ return false;
+ }
+ }
+ return true;
+}
+
+static void setup_dispatcher(struct event_base *main_base,
+ void (*dispatcher_callback)(int, short, void *))
+{
+ memset(&dispatcher_thread, 0, sizeof(dispatcher_thread));
+ dispatcher_thread.type = DISPATCHER;
+ dispatcher_thread.base = main_base;
+ dispatcher_thread.thread_id = pthread_self();
+ if (!create_notification_pipe(&dispatcher_thread)) {
+ exit(1);
+ }
+ /* Listen for notifications from other threads */
+ event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0],
+ EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback);
+ event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event);
+
+ if (event_add(&dispatcher_thread.notify_event, 0) == -1) {
+ settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
+ "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
+}
+
/*
* Set up a thread's information.
*/
@@ -651,6 +701,9 @@ int is_listen_thread() {
#endif
}
+void notify_dispatcher(void) {
+ notify_thread(&dispatcher_thread);
+}
/******************************* GLOBAL STATS ******************************/
@@ -752,7 +805,8 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
* nthreads Number of worker event handler threads to spawn
* main_base Event base for main thread
*/
-void thread_init(int nthr, struct event_base *main_base) {
+void thread_init(int nthr, struct event_base *main_base,
+ void (*dispatcher_callback)(int, short, void *)) {
int i;
nthreads = nthr + 1;
@@ -776,31 +830,12 @@ void thread_init(int nthr, struct event_base *main_base) {
exit(1);
}
- dispatcher_thread.base = main_base;
- dispatcher_thread.thread_id = pthread_self();
+ setup_dispatcher(main_base, dispatcher_callback);
for (i = 0; i < nthreads; i++) {
- if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
- (void*)threads[i].notify) == SOCKET_ERROR) {
- settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
- "Can't create notify pipe: %s",
- strerror(errno));
+ if (!create_notification_pipe(&threads[i])) {
exit(1);
}
-
- for (int j = 0; j < 2; ++j) {
- int flags = 1;
- setsockopt(threads[i].notify[j], IPPROTO_TCP,
- TCP_NODELAY, (void *)&flags, sizeof(flags));
- setsockopt(threads[i].notify[j], SOL_SOCKET,
- SO_REUSEADDR, (void *)&flags, sizeof(flags));
-
-
- if (evutil_make_socket_nonblocking(threads[i].notify[j]) == -1) {
- exit(1);
- }
- }
-
threads[i].index = i;
setup_thread(&threads[i], i == (nthreads - 1));
diff --git a/t/binary.t b/t/binary.t
index 59d981d..64c7a29 100755
--- a/t/binary.t
+++ b/t/binary.t
@@ -2,7 +2,7 @@
use strict;
use warnings;
-use Test::More tests => 3406;
+use Test::More tests => 3430;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
diff --git a/t/stats.t b/t/stats.t
index aef2d4c..1faf063 100755
--- a/t/stats.t
+++ b/t/stats.t
@@ -62,7 +62,7 @@ if ($stats->{'auth_sasl_enabled'} == 'yes') {
$sasl_enabled = 1;
}
-is(scalar(keys(%$stats)), 40, "40 stats values");
+is(scalar(keys(%$stats)), 42, "42 stats values");
# Test initial state
foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses