diff options
author | dormando <dormando@rydia.net> | 2021-07-29 17:31:08 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | c25d0bd68ab0f6226c2d979bf0951923624926dd (patch) | |
tree | ee742afe391e8e47f43163bca32f91a284fdc4ef | |
parent | d89ecd3456138c226934ddcdde749e3ba90a45a7 (diff) | |
download | memcached-c25d0bd68ab0f6226c2d979bf0951923624926dd.tar.gz |
thread: use eventfd for worker notify if available
Now that all of the reads/writes to the notify pipe are in one place,
we can easily use Linux eventfd if available. This also allows batching
events so we're not firing the same notifier constantly.
-rw-r--r-- | configure.ac | 1 | ||||
-rw-r--r-- | memcached.h | 4 | ||||
-rw-r--r-- | thread.c | 157 |
3 files changed, 104 insertions, 58 deletions
diff --git a/configure.ac b/configure.ac index be2b460..0985f07 100644 --- a/configure.ac +++ b/configure.ac @@ -644,6 +644,7 @@ AC_CHECK_FUNCS(memcntl) AC_CHECK_FUNCS(clock_gettime) AC_CHECK_FUNCS(preadv) AC_CHECK_FUNCS(pread) +AC_CHECK_FUNCS(eventfd) AC_CHECK_FUNCS([accept4], [AC_DEFINE(HAVE_ACCEPT4, 1, [Define to 1 if support accept4])]) AC_CHECK_FUNCS([getopt_long], [AC_DEFINE(HAVE_GETOPT_LONG, 1, [Define to 1 if support getopt_long])]) diff --git a/memcached.h b/memcached.h index c475de0..aab21cd 100644 --- a/memcached.h +++ b/memcached.h @@ -626,8 +626,12 @@ typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ struct event notify_event; /* listen event for notify pipe */ +#ifdef HAVE_EVENTFD + int notify_event_fd; /* notify counter */ +#else int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ +#endif struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *ev_queue; /* Worker/conn event queue */ cache_t *rbuf_cache; /* static-sized read buffers */ @@ -6,6 +6,9 @@ #ifdef EXTSTORE #include "storage.h" #endif +#ifdef HAVE_EVENTFD +#include <sys/eventfd.h> +#endif #include <assert.h> #include <stdio.h> #include <errno.h> @@ -319,13 +322,20 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) { } static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) { - char buf[1]; cq_push(t->ev_queue, item); - buf[0] = 'c'; +#ifdef HAVE_EVENTFD + uint64_t u = 1; + if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + perror("failed writing to worker eventfd"); + /* TODO: This is a fatal problem. Can it ever happen temporarily? */ + } +#else + char buf[1] = "c"; if (write(t->notify_send_fd, buf, 1) != 1) { perror("Failed writing to notify pipe"); /* TODO: This is a fatal problem. Can it ever happen temporarily? 
*/ } +#endif } // NOTE: An external func that takes a conn *c might be cleaner overall. @@ -393,8 +403,13 @@ static void setup_thread(LIBEVENT_THREAD *me) { } /* Listen for notifications from other threads */ +#ifdef HAVE_EVENTFD + event_set(&me->notify_event, me->notify_event_fd, + EV_READ | EV_PERSIST, thread_libevent_process, me); +#else event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); +#endif event_base_set(me->base, &me->notify_event); if (event_add(&me->notify_event, 0) == -1) { @@ -482,82 +497,100 @@ static void *worker_libevent(void *arg) { * Processes an incoming "connection event" item. This is called when * input arrives on the libevent wakeup pipe. */ +// Syscalls can be expensive enough that handling a few of them once here can +// save both throughput and overall latency. +#define MAX_PIPE_EVENTS 32 static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; - char buf[1]; conn *c; - - if (read(fd, buf, 1) != 1) { + uint64_t ev_count = 0; // max number of events to loop through this run. +#ifdef HAVE_EVENTFD + // NOTE: unlike pipe we aren't limiting the number of events per read. + // However we do limit the number of queue pulls to what the count was at + // the time of this function firing. 
+ if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } +#else + char buf[MAX_PIPE_EVENTS]; - item = cq_pop(me->ev_queue); - if (item == NULL) { + ev_count = read(fd, buf, MAX_PIPE_EVENTS); + if (ev_count == 0) { + if (settings.verbose > 0) + fprintf(stderr, "Can't read from libevent pipe\n"); return; } +#endif - switch (item->mode) { - case queue_new_conn: - c = conn_new(item->sfd, item->init_state, item->event_flags, - item->read_buffer_size, item->transport, - me->base, item->ssl); - if (c == NULL) { - if (IS_UDP(item->transport)) { - fprintf(stderr, "Can't listen for events on UDP socket\n"); - exit(1); - } else { - if (settings.verbose > 0) { - fprintf(stderr, "Can't listen for events on fd %d\n", - item->sfd); - } + for (int x = 0; x < ev_count; x++) { + item = cq_pop(me->ev_queue); + if (item == NULL) { + return; + } + + switch (item->mode) { + case queue_new_conn: + c = conn_new(item->sfd, item->init_state, item->event_flags, + item->read_buffer_size, item->transport, + me->base, item->ssl); + if (c == NULL) { + if (IS_UDP(item->transport)) { + fprintf(stderr, "Can't listen for events on UDP socket\n"); + exit(1); + } else { + if (settings.verbose > 0) { + fprintf(stderr, "Can't listen for events on fd %d\n", + item->sfd); + } #ifdef TLS - if (item->ssl) { - SSL_shutdown(item->ssl); - SSL_free(item->ssl); - } + if (item->ssl) { + SSL_shutdown(item->ssl); + SSL_free(item->ssl); + } #endif - close(item->sfd); - } - } else { - c->thread = me; + close(item->sfd); + } + } else { + c->thread = me; #ifdef EXTSTORE - if (c->thread->storage) { - conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, - storage_submit_cb, storage_complete_cb, storage_finalize_cb); - } + if (c->thread->storage) { + conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, + storage_submit_cb, storage_complete_cb, storage_finalize_cb); + } #endif - conn_io_queue_add(c, 
IO_QUEUE_NONE, NULL, NULL, NULL, NULL); + conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); #ifdef TLS - if (settings.ssl_enabled && c->ssl != NULL) { - assert(c->thread && c->thread->ssl_wbuf); - c->ssl_wbuf = c->thread->ssl_wbuf; - } + if (settings.ssl_enabled && c->ssl != NULL) { + assert(c->thread && c->thread->ssl_wbuf); + c->ssl_wbuf = c->thread->ssl_wbuf; + } #endif - } - break; - case queue_pause: - /* we were told to pause and report in */ - register_thread_initialized(); - break; - case queue_timeout: - /* a client socket timed out */ - conn_close_idle(conns[item->sfd]); - break; - case queue_redispatch: - /* a side thread redispatched a client connection */ - conn_worker_readd(conns[item->sfd]); - break; - case queue_stop: - /* asked to stop */ - event_base_loopexit(me->base, NULL); - break; - } + } + break; + case queue_pause: + /* we were told to pause and report in */ + register_thread_initialized(); + break; + case queue_timeout: + /* a client socket timed out */ + conn_close_idle(conns[item->sfd]); + break; + case queue_redispatch: + /* a side thread redispatched a client connection */ + conn_worker_readd(conns[item->sfd]); + break; + case queue_stop: + /* asked to stop */ + event_base_loopexit(me->base, NULL); + break; + } - cqi_free(me->ev_queue, item); + cqi_free(me->ev_queue, item); + } } /* Which thread we assigned a connection to most recently. */ @@ -955,6 +988,13 @@ void memcached_thread_init(int nthreads, void *arg) { } for (i = 0; i < nthreads; i++) { +#ifdef HAVE_EVENTFD + threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK); + if (threads[i].notify_event_fd == -1) { + perror("failed creating eventfd for worker thread"); + exit(1); + } +#else int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"); @@ -963,6 +1003,7 @@ void memcached_thread_init(int nthreads, void *arg) { threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; +#endif #ifdef EXTSTORE threads[i].storage = arg; #endif |