summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-07-29 16:29:22 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commitd89ecd3456138c226934ddcdde749e3ba90a45a7 (patch)
tree166dc6db7d2af4838d03fda65671b1dc9ea3b97c
parent331dca5d644edefd99893c44827bdf2ca72f85be (diff)
downloadmemcached-d89ecd3456138c226934ddcdde749e3ba90a45a7.tar.gz
thread: unify worker notify interface
worker notification was a mix of reading data from pipe or examining a an object queue stack. now it's all one interface. this is necessary to switch signalling to eventfd or similar, since we won't have that pipe to work with.
-rw-r--r--memcached.c8
-rw-r--r--memcached.h1
-rw-r--r--thread.c203
3 files changed, 106 insertions, 106 deletions
diff --git a/memcached.c b/memcached.c
index 60c027f..a029061 100644
--- a/memcached.c
+++ b/memcached.c
@@ -301,11 +301,9 @@ static pthread_cond_t conn_timeout_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t conn_timeout_lock = PTHREAD_MUTEX_INITIALIZER;
#define CONNS_PER_SLICE 100
-#define TIMEOUT_MSG_SIZE (1 + sizeof(int))
static void *conn_timeout_thread(void *arg) {
int i;
conn *c;
- char buf[TIMEOUT_MSG_SIZE];
rel_time_t oldest_last_cmd;
int sleep_time;
int sleep_slice = max_fds / CONNS_PER_SLICE;
@@ -341,11 +339,7 @@ static void *conn_timeout_thread(void *arg) {
continue;
if ((current_time - c->last_cmd_time) > settings.idle_timeout) {
- buf[0] = 't';
- memcpy(&buf[1], &i, sizeof(int));
- if (write(c->thread->notify_send_fd, buf, TIMEOUT_MSG_SIZE)
- != TIMEOUT_MSG_SIZE)
- perror("Failed to write timeout to notify pipe");
+ timeout_conn(c);
} else {
if (c->last_cmd_time < oldest_last_cmd)
oldest_last_cmd = c->last_cmd_time;
diff --git a/memcached.h b/memcached.h
index 4c91560..c475de0 100644
--- a/memcached.h
+++ b/memcached.h
@@ -883,6 +883,7 @@ extern int daemonize(int nochdir, int noclose);
*/
void memcached_thread_init(int nthreads, void *arg);
void redispatch_conn(conn *c);
+void timeout_conn(conn *c);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl);
void sidethread_conn_close(conn *c);
diff --git a/thread.c b/thread.c
index 48c116a..b288d96 100644
--- a/thread.c
+++ b/thread.c
@@ -28,6 +28,10 @@
/* An item in the connection queue. */
enum conn_queue_item_modes {
queue_new_conn, /* brand new connection. */
+ queue_pause, /* pause thread */
+ queue_timeout, /* socket sfd timed out */
+ queue_redispatch, /* return conn from side thread */
+ queue_stop, /* exit thread */
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
@@ -86,6 +90,10 @@ static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;
+static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item);
+static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode);
+static CQ_ITEM *cqi_new(CQ *cq);
+static void cq_push(CQ *cq, CQ_ITEM *item);
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);
@@ -135,10 +143,9 @@ static void register_thread_initialized(void) {
/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
- char buf[1];
int i;
+ bool pause_workers = false;
- buf[0] = 0;
switch (type) {
case PAUSE_ALL_THREADS:
slabs_rebalancer_pause();
@@ -149,7 +156,7 @@ void pause_threads(enum pause_thread_types type) {
storage_write_pause();
#endif
case PAUSE_WORKER_THREADS:
- buf[0] = 'p';
+ pause_workers = true;
pthread_mutex_lock(&worker_hang_lock);
break;
case RESUME_ALL_THREADS:
@@ -170,17 +177,14 @@ void pause_threads(enum pause_thread_types type) {
}
/* Only send a message if we have one. */
- if (buf[0] == 0) {
+ if (!pause_workers) {
return;
}
pthread_mutex_lock(&init_lock);
init_count = 0;
for (i = 0; i < settings.num_threads; i++) {
- if (write(threads[i].notify_send_fd, buf, 1) != 1) {
- perror("Failed writing to notify pipe");
- /* TODO: This is a fatal problem. Can it ever happen temporarily? */
- }
+ notify_worker_fd(&threads[i], 0, queue_pause);
}
wait_for_thread_registration(settings.num_threads);
pthread_mutex_unlock(&init_lock);
@@ -191,7 +195,6 @@ void pause_threads(enum pause_thread_types type) {
// Note: listener thread is the "main" event base, which has exited its
// loop in order to call this function.
void stop_threads(void) {
- char buf[1];
int i;
// assoc can call pause_threads(), so we have to stop it first.
@@ -201,15 +204,12 @@ void stop_threads(void) {
if (settings.verbose > 0)
fprintf(stderr, "asking workers to stop\n");
- buf[0] = 's';
+
pthread_mutex_lock(&worker_hang_lock);
pthread_mutex_lock(&init_lock);
init_count = 0;
for (i = 0; i < settings.num_threads; i++) {
- if (write(threads[i].notify_send_fd, buf, 1) != 1) {
- perror("Failed writing to notify pipe");
- /* TODO: This is a fatal problem. Can it ever happen temporarily? */
- }
+ notify_worker_fd(&threads[i], 0, queue_stop);
}
wait_for_thread_registration(settings.num_threads);
pthread_mutex_unlock(&init_lock);
@@ -318,6 +318,35 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) {
cache_free(cq->cache, item);
}
+static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
+ char buf[1];
+ cq_push(t->ev_queue, item);
+ buf[0] = 'c';
+ if (write(t->notify_send_fd, buf, 1) != 1) {
+ perror("Failed writing to notify pipe");
+ /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+ }
+}
+
+// NOTE: An external func that takes a conn *c might be cleaner overall.
+static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) {
+ CQ_ITEM *item;
+ while ( (item = cqi_new(t->ev_queue)) == NULL ) {
+ // NOTE: most callers of this function cannot fail, but mallocs in
+ // theory can fail. Small mallocs essentially never do without also
+ // killing the process. Syscalls can also fail but the original code
+ // never handled this either.
+ // As a compromise, I'm leaving this note and this loop: This alloc
+ // cannot fail, but pre-allocating the data is too much code in an
+ // area I want to keep more lean. If this CQ business becomes a more
+ // generic queue I'll reconsider.
+ }
+
+ item->mode = mode;
+ item->sfd = sfd;
+ notify_worker(t, item);
+}
+
/*
* Creates a worker thread.
*/
@@ -450,7 +479,7 @@ static void *worker_libevent(void *arg) {
/*
- * Processes an incoming "handle a new connection" item. This is called when
+ * Processes an incoming "connection event" item. This is called when
* input arrives on the libevent wakeup pipe.
*/
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
@@ -458,7 +487,6 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
CQ_ITEM *item;
char buf[1];
conn *c;
- unsigned int fd_from_pipe;
if (read(fd, buf, 1) != 1) {
if (settings.verbose > 0)
@@ -466,83 +494,70 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
return;
}
- switch (buf[0]) {
- case 'c':
- item = cq_pop(me->ev_queue);
+ item = cq_pop(me->ev_queue);
+ if (item == NULL) {
+ return;
+ }
- if (NULL == item) {
- break;
- }
- switch (item->mode) {
- case queue_new_conn:
- c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport,
- me->base, item->ssl);
- if (c == NULL) {
- if (IS_UDP(item->transport)) {
- fprintf(stderr, "Can't listen for events on UDP socket\n");
- exit(1);
- } else {
- if (settings.verbose > 0) {
- fprintf(stderr, "Can't listen for events on fd %d\n",
- item->sfd);
- }
+ switch (item->mode) {
+ case queue_new_conn:
+ c = conn_new(item->sfd, item->init_state, item->event_flags,
+ item->read_buffer_size, item->transport,
+ me->base, item->ssl);
+ if (c == NULL) {
+ if (IS_UDP(item->transport)) {
+ fprintf(stderr, "Can't listen for events on UDP socket\n");
+ exit(1);
+ } else {
+ if (settings.verbose > 0) {
+ fprintf(stderr, "Can't listen for events on fd %d\n",
+ item->sfd);
+ }
#ifdef TLS
- if (item->ssl) {
- SSL_shutdown(item->ssl);
- SSL_free(item->ssl);
- }
-#endif
- close(item->sfd);
+ if (item->ssl) {
+ SSL_shutdown(item->ssl);
+ SSL_free(item->ssl);
}
- } else {
- c->thread = me;
+#endif
+ close(item->sfd);
+ }
+ } else {
+ c->thread = me;
#ifdef EXTSTORE
- if (c->thread->storage) {
- conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
- storage_submit_cb, storage_complete_cb, storage_finalize_cb);
- }
+ if (c->thread->storage) {
+ conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
+ storage_submit_cb, storage_complete_cb, storage_finalize_cb);
+ }
#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
+ conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
#ifdef TLS
- if (settings.ssl_enabled && c->ssl != NULL) {
- assert(c->thread && c->thread->ssl_wbuf);
- c->ssl_wbuf = c->thread->ssl_wbuf;
- }
-#endif
+ if (settings.ssl_enabled && c->ssl != NULL) {
+ assert(c->thread && c->thread->ssl_wbuf);
+ c->ssl_wbuf = c->thread->ssl_wbuf;
}
- break;
- }
- cqi_free(me->ev_queue, item);
- break;
- /* we were told to pause and report in */
- case 'p':
- register_thread_initialized();
- break;
- /* a client socket timed out */
- case 't':
- if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) {
- if (settings.verbose > 0)
- fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
- return;
- }
- conn_close_idle(conns[fd_from_pipe]);
- break;
- /* a side thread redispatched a client connection */
- case 'r':
- if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) {
- if (settings.verbose > 0)
- fprintf(stderr, "Can't read redispatch fd from libevent pipe\n");
- return;
- }
- conn_worker_readd(conns[fd_from_pipe]);
- break;
- /* asked to stop */
- case 's':
- event_base_loopexit(me->base, NULL);
- break;
+#endif
+ }
+ break;
+ case queue_pause:
+ /* we were told to pause and report in */
+ register_thread_initialized();
+ break;
+ case queue_timeout:
+ /* a client socket timed out */
+ conn_close_idle(conns[item->sfd]);
+ break;
+ case queue_redispatch:
+ /* a side thread redispatched a client connection */
+ conn_worker_readd(conns[item->sfd]);
+ break;
+ case queue_stop:
+ /* asked to stop */
+ event_base_loopexit(me->base, NULL);
+ break;
}
+
+ cqi_free(me->ev_queue, item);
}
/* Which thread we assigned a connection to most recently. */
@@ -627,7 +642,6 @@ select:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport, void *ssl) {
CQ_ITEM *item = NULL;
- char buf[1];
LIBEVENT_THREAD *thread;
if (!settings.num_napi_ids)
@@ -651,29 +665,20 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->mode = queue_new_conn;
item->ssl = ssl;
- cq_push(thread->ev_queue, item);
-
MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
- buf[0] = 'c';
- if (write(thread->notify_send_fd, buf, 1) != 1) {
- perror("Writing to thread notify pipe");
- }
+ notify_worker(thread, item);
}
/*
* Re-dispatches a connection back to the original thread. Can be called from
* any side thread borrowing a connection.
*/
-#define REDISPATCH_MSG_SIZE (1 + sizeof(int))
void redispatch_conn(conn *c) {
- char buf[REDISPATCH_MSG_SIZE];
- LIBEVENT_THREAD *thread = c->thread;
+ notify_worker_fd(c->thread, c->sfd, queue_redispatch);
+}
- buf[0] = 'r';
- memcpy(&buf[1], &c->sfd, sizeof(int));
- if (write(thread->notify_send_fd, buf, REDISPATCH_MSG_SIZE) != REDISPATCH_MSG_SIZE) {
- perror("Writing redispatch to thread notify pipe");
- }
+void timeout_conn(conn *c) {
+ notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
/* This misses the allow_new_conns flag :( */