diff options
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 157 |
1 files changed, 99 insertions, 58 deletions
@@ -6,6 +6,9 @@ #ifdef EXTSTORE #include "storage.h" #endif +#ifdef HAVE_EVENTFD +#include <sys/eventfd.h> +#endif #include <assert.h> #include <stdio.h> #include <errno.h> @@ -319,13 +322,20 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) { } static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) { - char buf[1]; cq_push(t->ev_queue, item); - buf[0] = 'c'; +#ifdef HAVE_EVENTFD + uint64_t u = 1; + if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + perror("failed writing to worker eventfd"); + /* TODO: This is a fatal problem. Can it ever happen temporarily? */ + } +#else + char buf[1] = "c"; if (write(t->notify_send_fd, buf, 1) != 1) { perror("Failed writing to notify pipe"); /* TODO: This is a fatal problem. Can it ever happen temporarily? */ } +#endif } // NOTE: An external func that takes a conn *c might be cleaner overall. @@ -393,8 +403,13 @@ static void setup_thread(LIBEVENT_THREAD *me) { } /* Listen for notifications from other threads */ +#ifdef HAVE_EVENTFD + event_set(&me->notify_event, me->notify_event_fd, + EV_READ | EV_PERSIST, thread_libevent_process, me); +#else event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); +#endif event_base_set(me->base, &me->notify_event); if (event_add(&me->notify_event, 0) == -1) { @@ -482,82 +497,100 @@ static void *worker_libevent(void *arg) { * Processes an incoming "connection event" item. This is called when * input arrives on the libevent wakeup pipe. */ +// Syscalls can be expensive enough that handling a few of them once here can +// save both throughput and overall latency. +#define MAX_PIPE_EVENTS 32 static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; - char buf[1]; conn *c; - - if (read(fd, buf, 1) != 1) { + uint64_t ev_count = 0; // max number of events to loop through this run. +#ifdef HAVE_EVENTFD + // NOTE: unlike pipe we aren't limiting the number of events per read. + // However we do limit the number of queue pulls to what the count was at + // the time of this function firing. + if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); return; } +#else + char buf[MAX_PIPE_EVENTS]; - item = cq_pop(me->ev_queue); - if (item == NULL) { + ev_count = read(fd, buf, MAX_PIPE_EVENTS); + if (ev_count == 0) { + if (settings.verbose > 0) + fprintf(stderr, "Can't read from libevent pipe\n"); return; } +#endif - switch (item->mode) { - case queue_new_conn: - c = conn_new(item->sfd, item->init_state, item->event_flags, - item->read_buffer_size, item->transport, - me->base, item->ssl); - if (c == NULL) { - if (IS_UDP(item->transport)) { - fprintf(stderr, "Can't listen for events on UDP socket\n"); - exit(1); - } else { - if (settings.verbose > 0) { - fprintf(stderr, "Can't listen for events on fd %d\n", - item->sfd); - } + for (int x = 0; x < ev_count; x++) { + item = cq_pop(me->ev_queue); + if (item == NULL) { + return; + } + + switch (item->mode) { + case queue_new_conn: + c = conn_new(item->sfd, item->init_state, item->event_flags, + item->read_buffer_size, item->transport, + me->base, item->ssl); + if (c == NULL) { + if (IS_UDP(item->transport)) { + fprintf(stderr, "Can't listen for events on UDP socket\n"); + exit(1); + } else { + if (settings.verbose > 0) { + fprintf(stderr, "Can't listen for events on fd %d\n", + item->sfd); + } #ifdef TLS - if (item->ssl) { - SSL_shutdown(item->ssl); - SSL_free(item->ssl); - } + if (item->ssl) { + SSL_shutdown(item->ssl); + SSL_free(item->ssl); + } #endif - close(item->sfd); - } - } else { - c->thread = me; + close(item->sfd); + } + } else { + c->thread = me; #ifdef EXTSTORE - if (c->thread->storage) { - conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, - storage_submit_cb, storage_complete_cb, storage_finalize_cb); - } + if (c->thread->storage) { + conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, + storage_submit_cb, storage_complete_cb, storage_finalize_cb); + } #endif - conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); + conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); #ifdef TLS - if (settings.ssl_enabled && c->ssl != NULL) { - assert(c->thread && c->thread->ssl_wbuf); - c->ssl_wbuf = c->thread->ssl_wbuf; - } + if (settings.ssl_enabled && c->ssl != NULL) { + assert(c->thread && c->thread->ssl_wbuf); + c->ssl_wbuf = c->thread->ssl_wbuf; + } #endif - } - break; - case queue_pause: - /* we were told to pause and report in */ - register_thread_initialized(); - break; - case queue_timeout: - /* a client socket timed out */ - conn_close_idle(conns[item->sfd]); - break; - case queue_redispatch: - /* a side thread redispatched a client connection */ - conn_worker_readd(conns[item->sfd]); - break; - case queue_stop: - /* asked to stop */ - event_base_loopexit(me->base, NULL); - break; - } + } + break; + case queue_pause: + /* we were told to pause and report in */ + register_thread_initialized(); + break; + case queue_timeout: + /* a client socket timed out */ + conn_close_idle(conns[item->sfd]); + break; + case queue_redispatch: + /* a side thread redispatched a client connection */ + conn_worker_readd(conns[item->sfd]); + break; + case queue_stop: + /* asked to stop */ + event_base_loopexit(me->base, NULL); + break; + } - cqi_free(me->ev_queue, item); + cqi_free(me->ev_queue, item); + } } /* Which thread we assigned a connection to most recently. */ @@ -955,6 +988,13 @@ void memcached_thread_init(int nthreads, void *arg) { } for (i = 0; i < nthreads; i++) { +#ifdef HAVE_EVENTFD + threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK); + if (threads[i].notify_event_fd == -1) { + perror("failed creating eventfd for worker thread"); + exit(1); + } +#else int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"); @@ -963,6 +1003,7 @@ void memcached_thread_init(int nthreads, void *arg) { threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; +#endif #ifdef EXTSTORE threads[i].storage = arg; #endif |