diff options
Diffstat (limited to 'proto_proxy.c')
-rw-r--r-- | proto_proxy.c | 149 |
1 files changed, 74 insertions, 75 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 535800d..1acc920 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -129,63 +129,22 @@ void *proxy_init(bool use_uring) { // NOTE: might need to differentiate the libs yes? proxy_register_libs(ctx, NULL, L); - // Create/start the backend threads, which we need before servers + // Create/start the IO thread, which we need before servers // start getting created. - // Supporting N event threads should be possible, but it will be a - // low number of N to avoid too many wakeup syscalls. - // For now we hardcode to 1. - proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t)); - ctx->proxy_threads = threads; - for (int i = 0; i < 1; i++) { - proxy_event_thread_t *t = &threads[i]; - t->ctx = ctx; -#ifdef USE_EVENTFD - t->event_fd = eventfd(0, EFD_NONBLOCK); - if (t->event_fd == -1) { - perror("failed to create backend notify eventfd"); - exit(1); - } - t->be_event_fd = eventfd(0, EFD_NONBLOCK); - if (t->be_event_fd == -1) { - perror("failed to create backend notify eventfd"); - exit(1); - } -#else - int fds[2]; - if (pipe(fds)) { - perror("can't create proxy backend notify pipe"); - exit(1); - } - - t->notify_receive_fd = fds[0]; - t->notify_send_fd = fds[1]; - - if (pipe(fds)) { - perror("can't create proxy backend connection notify pipe"); - exit(1); - } - t->be_notify_receive_fd = fds[0]; - t->be_notify_send_fd = fds[1]; -#endif - proxy_init_evthread_events(t); - - // incoming request queue. - STAILQ_INIT(&t->io_head_in); - STAILQ_INIT(&t->beconn_head_in); - pthread_mutex_init(&t->mutex, NULL); - pthread_cond_init(&t->cond, NULL); + proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t)); + ctx->proxy_io_thread = t; + proxy_init_event_thread(t, ctx, NULL); #ifdef HAVE_LIBURING - if (t->use_uring) { - pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t); - } else { - pthread_create(&t->thread_id, NULL, proxy_event_thread, t); - } -#else + if (t->use_uring) { + pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t); + } else { pthread_create(&t->thread_id, NULL, proxy_event_thread, t); -#endif // HAVE_LIBURING - thread_setname(t->thread_id, "mc-prx-io"); } +#else + pthread_create(&t->thread_id, NULL, proxy_event_thread, t); +#endif // HAVE_LIBURING + thread_setname(t->thread_id, "mc-prx-io"); _start_proxy_config_threads(ctx); return ctx; @@ -218,18 +177,20 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) { thr->proxy_rng[x] = rand(); } - // kick off the configuration. - if (proxy_thread_loadconf(ctx, thr) != 0) { - exit(EXIT_FAILURE); - } + // Create a proxy event thread structure to piggyback on the worker. + proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t)); + thr->proxy_event_thread = t; + proxy_init_event_thread(t, ctx, thr->base); } // ctx_stack is a stack of io_pending_proxy_t's. void proxy_submit_cb(io_queue_t *q) { - proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads; + proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread; io_pending_proxy_t *p = q->stack_ctx; io_head_t head; + be_head_t w_head; // worker local stack. STAILQ_INIT(&head); + STAILQ_INIT(&w_head); // NOTE: responses get returned in the correct order no matter what, since // mc_resp's are linked. @@ -243,11 +204,21 @@ void proxy_submit_cb(io_queue_t *q) { // So for now we build the secondary list with an STAILQ, which // can be transplanted/etc. while (p) { - // insert into tail so head is oldest request. - STAILQ_INSERT_TAIL(&head, p, io_next); + mcp_backend_t *be; + P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p); if (p->is_await) { // need to not count await objects multiple times. - if (p->await_first) { + if (p->await_background) { + P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p); + // intercept await backgrounds + // this call cannot recurse if we're on the worker thread, + // since the worker thread has to finish executing this + // function in order to pick up the returned IO. + q->count++; + return_io_pending((io_pending_t *)p); + p = p->next; + continue; + } else if (p->await_first) { q->count++; } // funny workaround: awaiting IOP's don't count toward @@ -256,6 +227,24 @@ void proxy_submit_cb(io_queue_t *q) { } else { q->count++; } + be = p->backend; + + if (be->use_io_thread) { + // insert into tail so head is oldest request. + STAILQ_INSERT_TAIL(&head, p, io_next); + } else { + // emulate some of handler_dequeue() + STAILQ_INSERT_TAIL(&be->io_head, p, io_next); + if (be->io_next == NULL) { + be->io_next = p; + } + be->depth++; + if (!be->stacked) { + be->stacked = true; + be->be_next.stqe_next = NULL; // paranoia + STAILQ_INSERT_TAIL(&w_head, be, be_next); + } + } p = p->next; } @@ -263,26 +252,34 @@ void proxy_submit_cb(io_queue_t *q) { // clear out the submit queue so we can re-queue new IO's inline. q->stack_ctx = NULL; - // Transfer request stack to event thread. - pthread_mutex_lock(&e->mutex); - STAILQ_CONCAT(&e->io_head_in, &head); - // No point in holding the lock since we're not doing a cond signal. - pthread_mutex_unlock(&e->mutex); + if (!STAILQ_EMPTY(&head)) { + P_DEBUG("%s: submitting queue to IO thread\n", __func__); + // Transfer request stack to event thread. + pthread_mutex_lock(&e->mutex); + STAILQ_CONCAT(&e->io_head_in, &head); + // No point in holding the lock since we're not doing a cond signal. + pthread_mutex_unlock(&e->mutex); - // Signal to check queue. + // Signal to check queue. #ifdef USE_EVENTFD - uint64_t u = 1; - // TODO (v2): check result? is it ever possible to get a short write/failure - // for an eventfd? - if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { - assert(1 == 0); - } + uint64_t u = 1; + // TODO (v2): check result? is it ever possible to get a short write/failure + // for an eventfd? + if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + assert(1 == 0); + } #else - if (write(e->notify_send_fd, "w", 1) <= 0) { - assert(1 == 0); - } + if (write(e->notify_send_fd, "w", 1) <= 0) { + assert(1 == 0); + } #endif + } + if (!STAILQ_EMPTY(&w_head)) { + P_DEBUG("%s: running inline worker queue\n", __func__); + // emulating proxy_event_handler + proxy_run_backend_queue(&w_head); + } return; } @@ -545,6 +542,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con if (cores == LUA_OK) { WSTAT_DECR(c->thread, proxy_req_active, 1); int type = lua_type(Lc, 1); + P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type); if (type == LUA_TUSERDATA) { mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response"); _set_noreply_mode(resp, r); @@ -581,6 +579,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con } else if (cores == LUA_YIELD) { int coro_ref = 0; int yield_type = lua_tointeger(Lc, -1); + P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type); assert(yield_type != 0); lua_pop(Lc, 1); |