path: root/proto_proxy.c
author     dormando <dormando@rydia.net>  2023-01-13 15:22:26 -0800
committer  dormando <dormando@rydia.net>  2023-02-24 17:43:54 -0800
commit     6442017c545a2a5ad076697b8695cd64bd32b542 (patch)
tree       a95c5443a72f5cfbe5b1c32fe5cf552da3201b0c /proto_proxy.c
parent     833a7234bbaed264a9973141850a23df4eb1b939 (diff)
download   memcached-6442017c545a2a5ad076697b8695cd64bd32b542.tar.gz
proxy: allow workers to run IO optionally
`mcp.pool(p, { dist = etc, iothread = true })`

By default the IO thread is not used; instead a backend connection is created for each worker thread. This can be overridden by setting `iothread = true` when creating a pool.

`mcp.pool(p, { dist = etc, beprefix = "etc" })`

If a `beprefix` is added to the pool arguments, it will create unique backend connections for this pool. This lets you open multiple sockets per backend by creating multiple pools with unique prefixes.

There are legitimate use cases for sharing backend connections across different pools, which is why sharing remains the default behavior.
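A minimal configuration sketch showing the two new pool arguments (the backend names, addresses, and prefix string below are placeholders for illustration, not taken from this commit):

    local b1 = mcp.backend('b1', '127.0.0.1', 11212)
    local b2 = mcp.backend('b2', '127.0.0.1', 11213)

    -- default: every worker thread opens its own connection to b1/b2
    local direct = mcp.pool({ b1, b2 })

    -- route this pool's requests through the shared proxy IO thread instead
    local shared = mcp.pool({ b1, b2 }, { iothread = true })

    -- give this pool private backend sockets rather than sharing the
    -- worker-level connections, by tagging them with a unique prefix
    local private = mcp.pool({ b1, b2 }, { beprefix = "p1" })

Pools created without a `beprefix` continue to share backend connections, which is the default described above.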
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--  proto_proxy.c  |  149
1 file changed, 74 insertions(+), 75 deletions(-)
diff --git a/proto_proxy.c b/proto_proxy.c
index 535800d..1acc920 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -129,63 +129,22 @@ void *proxy_init(bool use_uring) {
// NOTE: might need to differentiate the libs yes?
proxy_register_libs(ctx, NULL, L);
- // Create/start the backend threads, which we need before servers
+ // Create/start the IO thread, which we need before servers
// start getting created.
- // Supporting N event threads should be possible, but it will be a
- // low number of N to avoid too many wakeup syscalls.
- // For now we hardcode to 1.
- proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
- ctx->proxy_threads = threads;
- for (int i = 0; i < 1; i++) {
- proxy_event_thread_t *t = &threads[i];
- t->ctx = ctx;
-#ifdef USE_EVENTFD
- t->event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
- t->be_event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->be_event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
-#else
- int fds[2];
- if (pipe(fds)) {
- perror("can't create proxy backend notify pipe");
- exit(1);
- }
-
- t->notify_receive_fd = fds[0];
- t->notify_send_fd = fds[1];
-
- if (pipe(fds)) {
- perror("can't create proxy backend connection notify pipe");
- exit(1);
- }
- t->be_notify_receive_fd = fds[0];
- t->be_notify_send_fd = fds[1];
-#endif
- proxy_init_evthread_events(t);
-
- // incoming request queue.
- STAILQ_INIT(&t->io_head_in);
- STAILQ_INIT(&t->beconn_head_in);
- pthread_mutex_init(&t->mutex, NULL);
- pthread_cond_init(&t->cond, NULL);
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ ctx->proxy_io_thread = t;
+ proxy_init_event_thread(t, ctx, NULL);
#ifdef HAVE_LIBURING
- if (t->use_uring) {
- pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
- } else {
- pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
- }
-#else
+ if (t->use_uring) {
+ pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
+ } else {
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
-#endif // HAVE_LIBURING
- thread_setname(t->thread_id, "mc-prx-io");
}
+#else
+ pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
+#endif // HAVE_LIBURING
+ thread_setname(t->thread_id, "mc-prx-io");
_start_proxy_config_threads(ctx);
return ctx;
@@ -218,18 +177,20 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
thr->proxy_rng[x] = rand();
}
- // kick off the configuration.
- if (proxy_thread_loadconf(ctx, thr) != 0) {
- exit(EXIT_FAILURE);
- }
+ // Create a proxy event thread structure to piggyback on the worker.
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ thr->proxy_event_thread = t;
+ proxy_init_event_thread(t, ctx, thr->base);
}
// ctx_stack is a stack of io_pending_proxy_t's.
void proxy_submit_cb(io_queue_t *q) {
- proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads;
+ proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
io_pending_proxy_t *p = q->stack_ctx;
io_head_t head;
+ be_head_t w_head; // worker local stack.
STAILQ_INIT(&head);
+ STAILQ_INIT(&w_head);
// NOTE: responses get returned in the correct order no matter what, since
// mc_resp's are linked.
@@ -243,11 +204,21 @@ void proxy_submit_cb(io_queue_t *q) {
// So for now we build the secondary list with an STAILQ, which
// can be transplanted/etc.
while (p) {
- // insert into tail so head is oldest request.
- STAILQ_INSERT_TAIL(&head, p, io_next);
+ mcp_backend_t *be;
+ P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
if (p->is_await) {
// need to not count await objects multiple times.
- if (p->await_first) {
+ if (p->await_background) {
+ P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p);
+ // intercept await backgrounds
+ // this call cannot recurse if we're on the worker thread,
+ // since the worker thread has to finish executing this
+ // function in order to pick up the returned IO.
+ q->count++;
+ return_io_pending((io_pending_t *)p);
+ p = p->next;
+ continue;
+ } else if (p->await_first) {
q->count++;
}
// funny workaround: awaiting IOP's don't count toward
@@ -256,6 +227,24 @@ void proxy_submit_cb(io_queue_t *q) {
} else {
q->count++;
}
+ be = p->backend;
+
+ if (be->use_io_thread) {
+ // insert into tail so head is oldest request.
+ STAILQ_INSERT_TAIL(&head, p, io_next);
+ } else {
+ // emulate some of handler_dequeue()
+ STAILQ_INSERT_TAIL(&be->io_head, p, io_next);
+ if (be->io_next == NULL) {
+ be->io_next = p;
+ }
+ be->depth++;
+ if (!be->stacked) {
+ be->stacked = true;
+ be->be_next.stqe_next = NULL; // paranoia
+ STAILQ_INSERT_TAIL(&w_head, be, be_next);
+ }
+ }
p = p->next;
}
@@ -263,26 +252,34 @@ void proxy_submit_cb(io_queue_t *q) {
// clear out the submit queue so we can re-queue new IO's inline.
q->stack_ctx = NULL;
- // Transfer request stack to event thread.
- pthread_mutex_lock(&e->mutex);
- STAILQ_CONCAT(&e->io_head_in, &head);
- // No point in holding the lock since we're not doing a cond signal.
- pthread_mutex_unlock(&e->mutex);
+ if (!STAILQ_EMPTY(&head)) {
+ P_DEBUG("%s: submitting queue to IO thread\n", __func__);
+ // Transfer request stack to event thread.
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_CONCAT(&e->io_head_in, &head);
+ // No point in holding the lock since we're not doing a cond signal.
+ pthread_mutex_unlock(&e->mutex);
- // Signal to check queue.
+ // Signal to check queue.
#ifdef USE_EVENTFD
- uint64_t u = 1;
- // TODO (v2): check result? is it ever possible to get a short write/failure
- // for an eventfd?
- if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
- assert(1 == 0);
- }
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
#else
- if (write(e->notify_send_fd, "w", 1) <= 0) {
- assert(1 == 0);
- }
+ if (write(e->notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
#endif
+ }
+ if (!STAILQ_EMPTY(&w_head)) {
+ P_DEBUG("%s: running inline worker queue\n", __func__);
+ // emulating proxy_event_handler
+ proxy_run_backend_queue(&w_head);
+ }
return;
}
@@ -545,6 +542,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (cores == LUA_OK) {
WSTAT_DECR(c->thread, proxy_req_active, 1);
int type = lua_type(Lc, 1);
+ P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type);
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response");
_set_noreply_mode(resp, r);
@@ -581,6 +579,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else if (cores == LUA_YIELD) {
int coro_ref = 0;
int yield_type = lua_tointeger(Lc, -1);
+ P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type);
assert(yield_type != 0);
lua_pop(Lc, 1);