summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-07-29 17:31:08 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commitc25d0bd68ab0f6226c2d979bf0951923624926dd (patch)
treeee742afe391e8e47f43163bca32f91a284fdc4ef /thread.c
parentd89ecd3456138c226934ddcdde749e3ba90a45a7 (diff)
downloadmemcached-c25d0bd68ab0f6226c2d979bf0951923624926dd.tar.gz
thread: use eventfd for worker notify if available
now that all of the read/writes to the notify pipe are in one place, we can easily use linux eventfd if available. This also allows batching events so we're not firing the same notifier constantly.
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c157
1 files changed, 99 insertions, 58 deletions
diff --git a/thread.c b/thread.c
index b288d96..d969a09 100644
--- a/thread.c
+++ b/thread.c
@@ -6,6 +6,9 @@
#ifdef EXTSTORE
#include "storage.h"
#endif
+#ifdef HAVE_EVENTFD
+#include <sys/eventfd.h>
+#endif
#include <assert.h>
#include <stdio.h>
#include <errno.h>
@@ -319,13 +322,20 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) {
}
static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
- char buf[1];
cq_push(t->ev_queue, item);
- buf[0] = 'c';
+#ifdef HAVE_EVENTFD
+ uint64_t u = 1;
+ if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ perror("failed writing to worker eventfd");
+ /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+ }
+#else
+ char buf[1] = "c";
if (write(t->notify_send_fd, buf, 1) != 1) {
perror("Failed writing to notify pipe");
/* TODO: This is a fatal problem. Can it ever happen temporarily? */
}
+#endif
}
// NOTE: An external func that takes a conn *c might be cleaner overall.
@@ -393,8 +403,13 @@ static void setup_thread(LIBEVENT_THREAD *me) {
}
/* Listen for notifications from other threads */
+#ifdef HAVE_EVENTFD
+ event_set(&me->notify_event, me->notify_event_fd,
+ EV_READ | EV_PERSIST, thread_libevent_process, me);
+#else
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
+#endif
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
@@ -482,82 +497,100 @@ static void *worker_libevent(void *arg) {
* Processes an incoming "connection event" item. This is called when
* input arrives on the libevent wakeup pipe.
*/
+// Syscalls can be expensive enough that handling a few of them once here can
+// save both throughput and overall latency.
+#define MAX_PIPE_EVENTS 32
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
- char buf[1];
conn *c;
-
- if (read(fd, buf, 1) != 1) {
+ uint64_t ev_count = 0; // max number of events to loop through this run.
+#ifdef HAVE_EVENTFD
+ // NOTE: unlike pipe we aren't limiting the number of events per read.
+ // However we do limit the number of queue pulls to what the count was at
+ // the time of this function firing.
+ if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
return;
}
+#else
+ char buf[MAX_PIPE_EVENTS];
- item = cq_pop(me->ev_queue);
- if (item == NULL) {
+ ev_count = read(fd, buf, MAX_PIPE_EVENTS);
+ if (ev_count == 0) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Can't read from libevent pipe\n");
return;
}
+#endif
- switch (item->mode) {
- case queue_new_conn:
- c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport,
- me->base, item->ssl);
- if (c == NULL) {
- if (IS_UDP(item->transport)) {
- fprintf(stderr, "Can't listen for events on UDP socket\n");
- exit(1);
- } else {
- if (settings.verbose > 0) {
- fprintf(stderr, "Can't listen for events on fd %d\n",
- item->sfd);
- }
+ for (int x = 0; x < ev_count; x++) {
+ item = cq_pop(me->ev_queue);
+ if (item == NULL) {
+ return;
+ }
+
+ switch (item->mode) {
+ case queue_new_conn:
+ c = conn_new(item->sfd, item->init_state, item->event_flags,
+ item->read_buffer_size, item->transport,
+ me->base, item->ssl);
+ if (c == NULL) {
+ if (IS_UDP(item->transport)) {
+ fprintf(stderr, "Can't listen for events on UDP socket\n");
+ exit(1);
+ } else {
+ if (settings.verbose > 0) {
+ fprintf(stderr, "Can't listen for events on fd %d\n",
+ item->sfd);
+ }
#ifdef TLS
- if (item->ssl) {
- SSL_shutdown(item->ssl);
- SSL_free(item->ssl);
- }
+ if (item->ssl) {
+ SSL_shutdown(item->ssl);
+ SSL_free(item->ssl);
+ }
#endif
- close(item->sfd);
- }
- } else {
- c->thread = me;
+ close(item->sfd);
+ }
+ } else {
+ c->thread = me;
#ifdef EXTSTORE
- if (c->thread->storage) {
- conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
- storage_submit_cb, storage_complete_cb, storage_finalize_cb);
- }
+ if (c->thread->storage) {
+ conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
+ storage_submit_cb, storage_complete_cb, storage_finalize_cb);
+ }
#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
+ conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
#ifdef TLS
- if (settings.ssl_enabled && c->ssl != NULL) {
- assert(c->thread && c->thread->ssl_wbuf);
- c->ssl_wbuf = c->thread->ssl_wbuf;
- }
+ if (settings.ssl_enabled && c->ssl != NULL) {
+ assert(c->thread && c->thread->ssl_wbuf);
+ c->ssl_wbuf = c->thread->ssl_wbuf;
+ }
#endif
- }
- break;
- case queue_pause:
- /* we were told to pause and report in */
- register_thread_initialized();
- break;
- case queue_timeout:
- /* a client socket timed out */
- conn_close_idle(conns[item->sfd]);
- break;
- case queue_redispatch:
- /* a side thread redispatched a client connection */
- conn_worker_readd(conns[item->sfd]);
- break;
- case queue_stop:
- /* asked to stop */
- event_base_loopexit(me->base, NULL);
- break;
- }
+ }
+ break;
+ case queue_pause:
+ /* we were told to pause and report in */
+ register_thread_initialized();
+ break;
+ case queue_timeout:
+ /* a client socket timed out */
+ conn_close_idle(conns[item->sfd]);
+ break;
+ case queue_redispatch:
+ /* a side thread redispatched a client connection */
+ conn_worker_readd(conns[item->sfd]);
+ break;
+ case queue_stop:
+ /* asked to stop */
+ event_base_loopexit(me->base, NULL);
+ break;
+ }
- cqi_free(me->ev_queue, item);
+ cqi_free(me->ev_queue, item);
+ }
}
/* Which thread we assigned a connection to most recently. */
@@ -955,6 +988,13 @@ void memcached_thread_init(int nthreads, void *arg) {
}
for (i = 0; i < nthreads; i++) {
+#ifdef HAVE_EVENTFD
+ threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (threads[i].notify_event_fd == -1) {
+ perror("failed creating eventfd for worker thread");
+ exit(1);
+ }
+#else
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
@@ -963,6 +1003,7 @@ void memcached_thread_init(int nthreads, void *arg) {
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
+#endif
#ifdef EXTSTORE
threads[i].storage = arg;
#endif