diff options
author | dormando <dormando@rydia.net> | 2021-07-29 16:29:22 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | d89ecd3456138c226934ddcdde749e3ba90a45a7 (patch) | |
tree | 166dc6db7d2af4838d03fda65671b1dc9ea3b97c | |
parent | 331dca5d644edefd99893c44827bdf2ca72f85be (diff) | |
download | memcached-d89ecd3456138c226934ddcdde749e3ba90a45a7.tar.gz |
thread: unify worker notify interface
worker notification was a mix of reading data from pipe or examining a
an object queue stack. now it's all one interface. this is necessary to
switch signalling to eventfd or similar, since we won't have that pipe
to work with.
-rw-r--r-- | memcached.c | 8 | ||||
-rw-r--r-- | memcached.h | 1 | ||||
-rw-r--r-- | thread.c | 203 |
3 files changed, 106 insertions, 106 deletions
diff --git a/memcached.c b/memcached.c index 60c027f..a029061 100644 --- a/memcached.c +++ b/memcached.c @@ -301,11 +301,9 @@ static pthread_cond_t conn_timeout_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t conn_timeout_lock = PTHREAD_MUTEX_INITIALIZER; #define CONNS_PER_SLICE 100 -#define TIMEOUT_MSG_SIZE (1 + sizeof(int)) static void *conn_timeout_thread(void *arg) { int i; conn *c; - char buf[TIMEOUT_MSG_SIZE]; rel_time_t oldest_last_cmd; int sleep_time; int sleep_slice = max_fds / CONNS_PER_SLICE; @@ -341,11 +339,7 @@ static void *conn_timeout_thread(void *arg) { continue; if ((current_time - c->last_cmd_time) > settings.idle_timeout) { - buf[0] = 't'; - memcpy(&buf[1], &i, sizeof(int)); - if (write(c->thread->notify_send_fd, buf, TIMEOUT_MSG_SIZE) - != TIMEOUT_MSG_SIZE) - perror("Failed to write timeout to notify pipe"); + timeout_conn(c); } else { if (c->last_cmd_time < oldest_last_cmd) oldest_last_cmd = c->last_cmd_time; diff --git a/memcached.h b/memcached.h index 4c91560..c475de0 100644 --- a/memcached.h +++ b/memcached.h @@ -883,6 +883,7 @@ extern int daemonize(int nochdir, int noclose); */ void memcached_thread_init(int nthreads, void *arg); void redispatch_conn(conn *c); +void timeout_conn(conn *c); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl); void sidethread_conn_close(conn *c); @@ -28,6 +28,10 @@ /* An item in the connection queue. */ enum conn_queue_item_modes { queue_new_conn, /* brand new connection. */ + queue_pause, /* pause thread */ + queue_timeout, /* socket sfd timed out */ + queue_redispatch, /* return conn from side thread */ + queue_stop, /* exit thread */ }; typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { @@ -86,6 +90,10 @@ static int init_count = 0; static pthread_mutex_t init_lock; static pthread_cond_t init_cond; +static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item); +static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode); +static CQ_ITEM *cqi_new(CQ *cq); +static void cq_push(CQ *cq, CQ_ITEM *item); static void thread_libevent_process(evutil_socket_t fd, short which, void *arg); @@ -135,10 +143,9 @@ static void register_thread_initialized(void) { /* Must not be called with any deeper locks held */ void pause_threads(enum pause_thread_types type) { - char buf[1]; int i; + bool pause_workers = false; - buf[0] = 0; switch (type) { case PAUSE_ALL_THREADS: slabs_rebalancer_pause(); @@ -149,7 +156,7 @@ void pause_threads(enum pause_thread_types type) { storage_write_pause(); #endif case PAUSE_WORKER_THREADS: - buf[0] = 'p'; + pause_workers = true; pthread_mutex_lock(&worker_hang_lock); break; case RESUME_ALL_THREADS: @@ -170,17 +177,14 @@ void pause_threads(enum pause_thread_types type) { } /* Only send a message if we have one. */ - if (buf[0] == 0) { + if (!pause_workers) { return; } pthread_mutex_lock(&init_lock); init_count = 0; for (i = 0; i < settings.num_threads; i++) { - if (write(threads[i].notify_send_fd, buf, 1) != 1) { - perror("Failed writing to notify pipe"); - /* TODO: This is a fatal problem. Can it ever happen temporarily? */ - } + notify_worker_fd(&threads[i], 0, queue_pause); } wait_for_thread_registration(settings.num_threads); pthread_mutex_unlock(&init_lock); @@ -191,7 +195,6 @@ void pause_threads(enum pause_thread_types type) { // Note: listener thread is the "main" event base, which has exited its // loop in order to call this function. void stop_threads(void) { - char buf[1]; int i; // assoc can call pause_threads(), so we have to stop it first. @@ -201,15 +204,12 @@ void stop_threads(void) { if (settings.verbose > 0) fprintf(stderr, "asking workers to stop\n"); - buf[0] = 's'; + pthread_mutex_lock(&worker_hang_lock); pthread_mutex_lock(&init_lock); init_count = 0; for (i = 0; i < settings.num_threads; i++) { - if (write(threads[i].notify_send_fd, buf, 1) != 1) { - perror("Failed writing to notify pipe"); - /* TODO: This is a fatal problem. Can it ever happen temporarily? */ - } + notify_worker_fd(&threads[i], 0, queue_stop); } wait_for_thread_registration(settings.num_threads); pthread_mutex_unlock(&init_lock); @@ -318,6 +318,35 @@ static void cqi_free(CQ *cq, CQ_ITEM *item) { cache_free(cq->cache, item); } +static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) { + char buf[1]; + cq_push(t->ev_queue, item); + buf[0] = 'c'; + if (write(t->notify_send_fd, buf, 1) != 1) { + perror("Failed writing to notify pipe"); + /* TODO: This is a fatal problem. Can it ever happen temporarily? */ + } +} + +// NOTE: An external func that takes a conn *c might be cleaner overall. +static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) { + CQ_ITEM *item; + while ( (item = cqi_new(t->ev_queue)) == NULL ) { + // NOTE: most callers of this function cannot fail, but mallocs in + // theory can fail. Small mallocs essentially never do without also + // killing the process. Syscalls can also fail but the original code + // never handled this either. + // As a compromise, I'm leaving this note and this loop: This alloc + // cannot fail, but pre-allocating the data is too much code in an + // area I want to keep more lean. If this CQ business becomes a more + // generic queue I'll reconsider. + } + + item->mode = mode; + item->sfd = sfd; + notify_worker(t, item); +} + /* * Creates a worker thread. */ @@ -450,7 +479,7 @@ static void *worker_libevent(void *arg) { /* - * Processes an incoming "handle a new connection" item. This is called when + * Processes an incoming "connection event" item. This is called when * input arrives on the libevent wakeup pipe. */ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) { @@ -458,7 +487,6 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) CQ_ITEM *item; char buf[1]; conn *c; - unsigned int fd_from_pipe; if (read(fd, buf, 1) != 1) { if (settings.verbose > 0) @@ -466,83 +494,70 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) return; } - switch (buf[0]) { - case 'c': - item = cq_pop(me->ev_queue); + item = cq_pop(me->ev_queue); + if (item == NULL) { + return; + } - if (NULL == item) { - break; - } - switch (item->mode) { - case queue_new_conn: - c = conn_new(item->sfd, item->init_state, item->event_flags, - item->read_buffer_size, item->transport, - me->base, item->ssl); - if (c == NULL) { - if (IS_UDP(item->transport)) { - fprintf(stderr, "Can't listen for events on UDP socket\n"); - exit(1); - } else { - if (settings.verbose > 0) { - fprintf(stderr, "Can't listen for events on fd %d\n", - item->sfd); - } + switch (item->mode) { + case queue_new_conn: + c = conn_new(item->sfd, item->init_state, item->event_flags, + item->read_buffer_size, item->transport, + me->base, item->ssl); + if (c == NULL) { + if (IS_UDP(item->transport)) { + fprintf(stderr, "Can't listen for events on UDP socket\n"); + exit(1); + } else { + if (settings.verbose > 0) { + fprintf(stderr, "Can't listen for events on fd %d\n", + item->sfd); + } #ifdef TLS - if (item->ssl) { - SSL_shutdown(item->ssl); - SSL_free(item->ssl); - } -#endif - close(item->sfd); + if (item->ssl) { + SSL_shutdown(item->ssl); + SSL_free(item->ssl); } - } else { - c->thread = me; +#endif + close(item->sfd); + } + } else { + c->thread = me; #ifdef EXTSTORE - if (c->thread->storage) { - conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, - storage_submit_cb, storage_complete_cb, storage_finalize_cb); - } + if (c->thread->storage) { + conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage, + storage_submit_cb, storage_complete_cb, storage_finalize_cb); + } #endif - conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); + conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL); #ifdef TLS - if (settings.ssl_enabled && c->ssl != NULL) { - assert(c->thread && c->thread->ssl_wbuf); - c->ssl_wbuf = c->thread->ssl_wbuf; - } -#endif + if (settings.ssl_enabled && c->ssl != NULL) { + assert(c->thread && c->thread->ssl_wbuf); + c->ssl_wbuf = c->thread->ssl_wbuf; } - break; - } - cqi_free(me->ev_queue, item); - break; - /* we were told to pause and report in */ - case 'p': - register_thread_initialized(); - break; - /* a client socket timed out */ - case 't': - if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) { - if (settings.verbose > 0) - fprintf(stderr, "Can't read timeout fd from libevent pipe\n"); - return; - } - conn_close_idle(conns[fd_from_pipe]); - break; - /* a side thread redispatched a client connection */ - case 'r': - if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) { - if (settings.verbose > 0) - fprintf(stderr, "Can't read redispatch fd from libevent pipe\n"); - return; - } - conn_worker_readd(conns[fd_from_pipe]); - break; - /* asked to stop */ - case 's': - event_base_loopexit(me->base, NULL); - break; +#endif + } + break; + case queue_pause: + /* we were told to pause and report in */ + register_thread_initialized(); + break; + case queue_timeout: + /* a client socket timed out */ + conn_close_idle(conns[item->sfd]); + break; + case queue_redispatch: + /* a side thread redispatched a client connection */ + conn_worker_readd(conns[item->sfd]); + break; + case queue_stop: + /* asked to stop */ + event_base_loopexit(me->base, NULL); + break; } + + cqi_free(me->ev_queue, item); } /* Which thread we assigned a connection to most recently. */ @@ -627,7 +642,6 @@ select: void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl) { CQ_ITEM *item = NULL; - char buf[1]; LIBEVENT_THREAD *thread; if (!settings.num_napi_ids) @@ -651,29 +665,20 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, item->mode = queue_new_conn; item->ssl = ssl; - cq_push(thread->ev_queue, item); - MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id); - buf[0] = 'c'; - if (write(thread->notify_send_fd, buf, 1) != 1) { - perror("Writing to thread notify pipe"); - } + notify_worker(thread, item); } /* * Re-dispatches a connection back to the original thread. Can be called from * any side thread borrowing a connection. */ -#define REDISPATCH_MSG_SIZE (1 + sizeof(int)) void redispatch_conn(conn *c) { - char buf[REDISPATCH_MSG_SIZE]; - LIBEVENT_THREAD *thread = c->thread; + notify_worker_fd(c->thread, c->sfd, queue_redispatch); +} - buf[0] = 'r'; - memcpy(&buf[1], &c->sfd, sizeof(int)); - if (write(thread->notify_send_fd, buf, REDISPATCH_MSG_SIZE) != REDISPATCH_MSG_SIZE) { - perror("Writing redispatch to thread notify pipe"); - } +void timeout_conn(conn *c) { + notify_worker_fd(c->thread, c->sfd, queue_timeout); } /* This misses the allow_new_conns flag :( */ |