summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-07-29 16:29:22 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commitd89ecd3456138c226934ddcdde749e3ba90a45a7 (patch)
tree166dc6db7d2af4838d03fda65671b1dc9ea3b97c /thread.c
parent331dca5d644edefd99893c44827bdf2ca72f85be (diff)
downloadmemcached-d89ecd3456138c226934ddcdde749e3ba90a45a7.tar.gz
thread: unify worker notify interface
worker notification was a mix of reading data from pipe or examining a an object queue stack. now it's all one interface. this is necessary to switch signalling to eventfd or similar, since we won't have that pipe to work with.
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c203
1 files changed, 104 insertions, 99 deletions
diff --git a/thread.c b/thread.c
index 48c116a..b288d96 100644
--- a/thread.c
+++ b/thread.c
@@ -28,6 +28,10 @@
/* An item in the connection queue. */
enum conn_queue_item_modes {
queue_new_conn, /* brand new connection. */
+ queue_pause, /* pause thread */
+ queue_timeout, /* socket sfd timed out */
+ queue_redispatch, /* return conn from side thread */
+ queue_stop, /* exit thread */
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
@@ -86,6 +90,10 @@ static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;
+static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item);
+static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode);
+static CQ_ITEM *cqi_new(CQ *cq);
+static void cq_push(CQ *cq, CQ_ITEM *item);
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);
@@ -135,10 +143,9 @@ static void register_thread_initialized(void) {
/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
- char buf[1];
int i;
+ bool pause_workers = false;
- buf[0] = 0;
switch (type) {
case PAUSE_ALL_THREADS:
slabs_rebalancer_pause();
@@ -149,7 +156,7 @@ void pause_threads(enum pause_thread_types type) {
storage_write_pause();
#endif
case PAUSE_WORKER_THREADS:
- buf[0] = 'p';
+ pause_workers = true;
pthread_mutex_lock(&worker_hang_lock);
break;
case RESUME_ALL_THREADS:
@@ -170,17 +177,14 @@ void pause_threads(enum pause_thread_types type) {
}
/* Only send a message if we have one. */
- if (buf[0] == 0) {
+ if (!pause_workers) {
return;
}
pthread_mutex_lock(&init_lock);
init_count = 0;
for (i = 0; i < settings.num_threads; i++) {
- if (write(threads[i].notify_send_fd, buf, 1) != 1) {
- perror("Failed writing to notify pipe");
- /* TODO: This is a fatal problem. Can it ever happen temporarily? */
- }
+ notify_worker_fd(&threads[i], 0, queue_pause);
}
wait_for_thread_registration(settings.num_threads);
pthread_mutex_unlock(&init_lock);
@@ -191,7 +195,6 @@ void pause_threads(enum pause_thread_types type) {
// Note: listener thread is the "main" event base, which has exited its
// loop in order to call this function.
void stop_threads(void) {
- char buf[1];
int i;
// assoc can call pause_threads(), so we have to stop it first.
@@ -201,15 +204,12 @@ void stop_threads(void) {
if (settings.verbose > 0)
fprintf(stderr, "asking workers to stop\n");
- buf[0] = 's';
+
pthread_mutex_lock(&worker_hang_lock);
pthread_mutex_lock(&init_lock);
init_count = 0;
for (i = 0; i < settings.num_threads; i++) {
- if (write(threads[i].notify_send_fd, buf, 1) != 1) {
- perror("Failed writing to notify pipe");
- /* TODO: This is a fatal problem. Can it ever happen temporarily? */
- }
+ notify_worker_fd(&threads[i], 0, queue_stop);
}
wait_for_thread_registration(settings.num_threads);
pthread_mutex_unlock(&init_lock);
@@ -318,6 +318,35 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) {
cache_free(cq->cache, item);
}
+static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
+ char buf[1];
+ cq_push(t->ev_queue, item);
+ buf[0] = 'c';
+ if (write(t->notify_send_fd, buf, 1) != 1) {
+ perror("Failed writing to notify pipe");
+ /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+ }
+}
+
+// NOTE: An external func that takes a conn *c might be cleaner overall.
+static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) {
+ CQ_ITEM *item;
+ while ( (item = cqi_new(t->ev_queue)) == NULL ) {
+ // NOTE: most callers of this function cannot fail, but mallocs in
+ // theory can fail. Small mallocs essentially never do without also
+ // killing the process. Syscalls can also fail but the original code
+ // never handled this either.
+ // As a compromise, I'm leaving this note and this loop: This alloc
+ // cannot fail, but pre-allocating the data is too much code in an
+ // area I want to keep more lean. If this CQ business becomes a more
+ // generic queue I'll reconsider.
+ }
+
+ item->mode = mode;
+ item->sfd = sfd;
+ notify_worker(t, item);
+}
+
/*
* Creates a worker thread.
*/
@@ -450,7 +479,7 @@ static void *worker_libevent(void *arg) {
/*
- * Processes an incoming "handle a new connection" item. This is called when
+ * Processes an incoming "connection event" item. This is called when
* input arrives on the libevent wakeup pipe.
*/
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
@@ -458,7 +487,6 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
CQ_ITEM *item;
char buf[1];
conn *c;
- unsigned int fd_from_pipe;
if (read(fd, buf, 1) != 1) {
if (settings.verbose > 0)
@@ -466,83 +494,70 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
return;
}
- switch (buf[0]) {
- case 'c':
- item = cq_pop(me->ev_queue);
+ item = cq_pop(me->ev_queue);
+ if (item == NULL) {
+ return;
+ }
- if (NULL == item) {
- break;
- }
- switch (item->mode) {
- case queue_new_conn:
- c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport,
- me->base, item->ssl);
- if (c == NULL) {
- if (IS_UDP(item->transport)) {
- fprintf(stderr, "Can't listen for events on UDP socket\n");
- exit(1);
- } else {
- if (settings.verbose > 0) {
- fprintf(stderr, "Can't listen for events on fd %d\n",
- item->sfd);
- }
+ switch (item->mode) {
+ case queue_new_conn:
+ c = conn_new(item->sfd, item->init_state, item->event_flags,
+ item->read_buffer_size, item->transport,
+ me->base, item->ssl);
+ if (c == NULL) {
+ if (IS_UDP(item->transport)) {
+ fprintf(stderr, "Can't listen for events on UDP socket\n");
+ exit(1);
+ } else {
+ if (settings.verbose > 0) {
+ fprintf(stderr, "Can't listen for events on fd %d\n",
+ item->sfd);
+ }
#ifdef TLS
- if (item->ssl) {
- SSL_shutdown(item->ssl);
- SSL_free(item->ssl);
- }
-#endif
- close(item->sfd);
+ if (item->ssl) {
+ SSL_shutdown(item->ssl);
+ SSL_free(item->ssl);
}
- } else {
- c->thread = me;
+#endif
+ close(item->sfd);
+ }
+ } else {
+ c->thread = me;
#ifdef EXTSTORE
- if (c->thread->storage) {
- conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
- storage_submit_cb, storage_complete_cb, storage_finalize_cb);
- }
+ if (c->thread->storage) {
+ conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
+ storage_submit_cb, storage_complete_cb, storage_finalize_cb);
+ }
#endif
- conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
+ conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
#ifdef TLS
- if (settings.ssl_enabled && c->ssl != NULL) {
- assert(c->thread && c->thread->ssl_wbuf);
- c->ssl_wbuf = c->thread->ssl_wbuf;
- }
-#endif
+ if (settings.ssl_enabled && c->ssl != NULL) {
+ assert(c->thread && c->thread->ssl_wbuf);
+ c->ssl_wbuf = c->thread->ssl_wbuf;
}
- break;
- }
- cqi_free(me->ev_queue, item);
- break;
- /* we were told to pause and report in */
- case 'p':
- register_thread_initialized();
- break;
- /* a client socket timed out */
- case 't':
- if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) {
- if (settings.verbose > 0)
- fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
- return;
- }
- conn_close_idle(conns[fd_from_pipe]);
- break;
- /* a side thread redispatched a client connection */
- case 'r':
- if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) {
- if (settings.verbose > 0)
- fprintf(stderr, "Can't read redispatch fd from libevent pipe\n");
- return;
- }
- conn_worker_readd(conns[fd_from_pipe]);
- break;
- /* asked to stop */
- case 's':
- event_base_loopexit(me->base, NULL);
- break;
+#endif
+ }
+ break;
+ case queue_pause:
+ /* we were told to pause and report in */
+ register_thread_initialized();
+ break;
+ case queue_timeout:
+ /* a client socket timed out */
+ conn_close_idle(conns[item->sfd]);
+ break;
+ case queue_redispatch:
+ /* a side thread redispatched a client connection */
+ conn_worker_readd(conns[item->sfd]);
+ break;
+ case queue_stop:
+ /* asked to stop */
+ event_base_loopexit(me->base, NULL);
+ break;
}
+
+ cqi_free(me->ev_queue, item);
}
/* Which thread we assigned a connection to most recently. */
@@ -627,7 +642,6 @@ select:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport, void *ssl) {
CQ_ITEM *item = NULL;
- char buf[1];
LIBEVENT_THREAD *thread;
if (!settings.num_napi_ids)
@@ -651,29 +665,20 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->mode = queue_new_conn;
item->ssl = ssl;
- cq_push(thread->ev_queue, item);
-
MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
- buf[0] = 'c';
- if (write(thread->notify_send_fd, buf, 1) != 1) {
- perror("Writing to thread notify pipe");
- }
+ notify_worker(thread, item);
}
/*
* Re-dispatches a connection back to the original thread. Can be called from
* any side thread borrowing a connection.
*/
-#define REDISPATCH_MSG_SIZE (1 + sizeof(int))
void redispatch_conn(conn *c) {
- char buf[REDISPATCH_MSG_SIZE];
- LIBEVENT_THREAD *thread = c->thread;
+ notify_worker_fd(c->thread, c->sfd, queue_redispatch);
+}
- buf[0] = 'r';
- memcpy(&buf[1], &c->sfd, sizeof(int));
- if (write(thread->notify_send_fd, buf, REDISPATCH_MSG_SIZE) != REDISPATCH_MSG_SIZE) {
- perror("Writing redispatch to thread notify pipe");
- }
+void timeout_conn(conn *c) {
+ notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
/* This misses the allow_new_conns flag :( */