summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c157
1 files changed, 99 insertions, 58 deletions
diff --git a/thread.c b/thread.c
index b288d96..d969a09 100644
--- a/thread.c
+++ b/thread.c
@@ -6,6 +6,9 @@
#ifdef EXTSTORE
#include "storage.h"
#endif
+#ifdef HAVE_EVENTFD
+#include <sys/eventfd.h>
+#endif
#include <assert.h>
#include <stdio.h>
#include <errno.h>
@@ -319,13 +322,20 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) {
}
static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
- char buf[1];
cq_push(t->ev_queue, item);
- buf[0] = 'c';
+#ifdef HAVE_EVENTFD
+ uint64_t u = 1;
+ if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ perror("failed writing to worker eventfd");
+ /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+ }
+#else
+ char buf[1] = "c";
if (write(t->notify_send_fd, buf, 1) != 1) {
perror("Failed writing to notify pipe");
/* TODO: This is a fatal problem. Can it ever happen temporarily? */
}
+#endif
}
// NOTE: An external func that takes a conn *c might be cleaner overall.
@@ -393,8 +403,13 @@ static void setup_thread(LIBEVENT_THREAD *me) {
}
/* Listen for notifications from other threads */
+#ifdef HAVE_EVENTFD
+ event_set(&me->notify_event, me->notify_event_fd,
+ EV_READ | EV_PERSIST, thread_libevent_process, me);
+#else
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
+#endif
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
@@ -482,82 +497,100 @@ static void *worker_libevent(void *arg) {
* Processes an incoming "connection event" item. This is called when
* input arrives on the libevent wakeup pipe.
*/
+// Syscalls can be expensive enough that handling a few of them once here can
+// save both throughput and overall latency.
+#define MAX_PIPE_EVENTS 32
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
- char buf[1];
conn *c;
-
- if (read(fd, buf, 1) != 1) {
+ uint64_t ev_count = 0; // max number of events to loop through this run.
+#ifdef HAVE_EVENTFD
+ // NOTE: unlike pipe we aren't limiting the number of events per read.
+ // However we do limit the number of queue pulls to what the count was at
+ // the time of this function firing.
+ if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
return;
}
+#else
+ char buf[MAX_PIPE_EVENTS];
- item = cq_pop(me->ev_queue);
- if (item == NULL) {
+ ev_count = read(fd, buf, MAX_PIPE_EVENTS);
+ if (ev_count == 0) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Can't read from libevent pipe\n");
return;
}
+#endif
- switch (item->mode) {
- case queue_new_conn:
- c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport,
- me->base, item->ssl);
- if (c == NULL) {
- if (IS_UDP(item->transport)) {
- fprintf(stderr, "Can't listen for events on UDP socket\n");
- exit(1);
- } else {
- if (settings.verbose > 0) {
- fprintf(stderr, "Can't listen for events on fd %d\n",
- item->sfd);
- }
+ for (int x = 0; x < ev_count; x++) {
+ item = cq_pop(me->ev_queue);
+ if (item == NULL) {
+ return;
+ }
+
+ switch (item->mode) {
+ case queue_new_conn:
+ c = conn_new(item->sfd, item->init_state, item->event_flags,
+ item->read_buffer_size, item->transport,
+ me->base, item->ssl);
+ if (c == NULL) {
+ if (IS_UDP(item->transport)) {
+ fprintf(stderr, "Can't listen for events on UDP socket\n");
+ exit(1);
+ } else {
+ if (settings.verbose > 0) {
+ fprintf(stderr, "Can't listen for events on fd %d\n",
+ item->sfd);
+ }
#ifdef TLS
- if (item->ssl) {
- SSL_shutdown(item->ssl);
- SSL_free(item->ssl);
- }
+ if (item->ssl) {
+ SSL_shutdown(item->ssl);
+ SSL_free(item->ssl);
+ }
#endif
- close(item->sfd);
- }
- } else {
- c->thread = me;
+ close(item->sfd);
+ }
+ } else {
+ c->thread = me;
#ifdef EXTSTORE
- if (c->thread->storage) {
- conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
- storage_submit_cb, storage_complete_cb, storage_finalize_cb);
- }
+ if (c->thread->storage) {
+ conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
+ storage_submit_cb, storage_complete_cb, storage_finalize_cb);
+ }
#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
+ conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
#ifdef TLS
- if (settings.ssl_enabled && c->ssl != NULL) {
- assert(c->thread && c->thread->ssl_wbuf);
- c->ssl_wbuf = c->thread->ssl_wbuf;
- }
+ if (settings.ssl_enabled && c->ssl != NULL) {
+ assert(c->thread && c->thread->ssl_wbuf);
+ c->ssl_wbuf = c->thread->ssl_wbuf;
+ }
#endif
- }
- break;
- case queue_pause:
- /* we were told to pause and report in */
- register_thread_initialized();
- break;
- case queue_timeout:
- /* a client socket timed out */
- conn_close_idle(conns[item->sfd]);
- break;
- case queue_redispatch:
- /* a side thread redispatched a client connection */
- conn_worker_readd(conns[item->sfd]);
- break;
- case queue_stop:
- /* asked to stop */
- event_base_loopexit(me->base, NULL);
- break;
- }
+ }
+ break;
+ case queue_pause:
+ /* we were told to pause and report in */
+ register_thread_initialized();
+ break;
+ case queue_timeout:
+ /* a client socket timed out */
+ conn_close_idle(conns[item->sfd]);
+ break;
+ case queue_redispatch:
+ /* a side thread redispatched a client connection */
+ conn_worker_readd(conns[item->sfd]);
+ break;
+ case queue_stop:
+ /* asked to stop */
+ event_base_loopexit(me->base, NULL);
+ break;
+ }
- cqi_free(me->ev_queue, item);
+ cqi_free(me->ev_queue, item);
+ }
}
/* Which thread we assigned a connection to most recently. */
@@ -955,6 +988,13 @@ void memcached_thread_init(int nthreads, void *arg) {
}
for (i = 0; i < nthreads; i++) {
+#ifdef HAVE_EVENTFD
+ threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (threads[i].notify_event_fd == -1) {
+ perror("failed creating eventfd for worker thread");
+ exit(1);
+ }
+#else
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
@@ -963,6 +1003,7 @@ void memcached_thread_init(int nthreads, void *arg) {
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
+#endif
#ifdef EXTSTORE
threads[i].storage = arg;
#endif