summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c149
1 files changed, 74 insertions, 75 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 535800d..1acc920 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -129,63 +129,22 @@ void *proxy_init(bool use_uring) {
// NOTE: might need to differentiate the libs yes?
proxy_register_libs(ctx, NULL, L);
- // Create/start the backend threads, which we need before servers
+ // Create/start the IO thread, which we need before servers
// start getting created.
- // Supporting N event threads should be possible, but it will be a
- // low number of N to avoid too many wakeup syscalls.
- // For now we hardcode to 1.
- proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
- ctx->proxy_threads = threads;
- for (int i = 0; i < 1; i++) {
- proxy_event_thread_t *t = &threads[i];
- t->ctx = ctx;
-#ifdef USE_EVENTFD
- t->event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
- t->be_event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->be_event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
-#else
- int fds[2];
- if (pipe(fds)) {
- perror("can't create proxy backend notify pipe");
- exit(1);
- }
-
- t->notify_receive_fd = fds[0];
- t->notify_send_fd = fds[1];
-
- if (pipe(fds)) {
- perror("can't create proxy backend connection notify pipe");
- exit(1);
- }
- t->be_notify_receive_fd = fds[0];
- t->be_notify_send_fd = fds[1];
-#endif
- proxy_init_evthread_events(t);
-
- // incoming request queue.
- STAILQ_INIT(&t->io_head_in);
- STAILQ_INIT(&t->beconn_head_in);
- pthread_mutex_init(&t->mutex, NULL);
- pthread_cond_init(&t->cond, NULL);
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ ctx->proxy_io_thread = t;
+ proxy_init_event_thread(t, ctx, NULL);
#ifdef HAVE_LIBURING
- if (t->use_uring) {
- pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
- } else {
- pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
- }
-#else
+ if (t->use_uring) {
+ pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
+ } else {
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
-#endif // HAVE_LIBURING
- thread_setname(t->thread_id, "mc-prx-io");
}
+#else
+ pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
+#endif // HAVE_LIBURING
+ thread_setname(t->thread_id, "mc-prx-io");
_start_proxy_config_threads(ctx);
return ctx;
@@ -218,18 +177,20 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
thr->proxy_rng[x] = rand();
}
- // kick off the configuration.
- if (proxy_thread_loadconf(ctx, thr) != 0) {
- exit(EXIT_FAILURE);
- }
+ // Create a proxy event thread structure to piggyback on the worker.
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ thr->proxy_event_thread = t;
+ proxy_init_event_thread(t, ctx, thr->base);
}
// ctx_stack is a stack of io_pending_proxy_t's.
void proxy_submit_cb(io_queue_t *q) {
- proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads;
+ proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
io_pending_proxy_t *p = q->stack_ctx;
io_head_t head;
+ be_head_t w_head; // worker local stack.
STAILQ_INIT(&head);
+ STAILQ_INIT(&w_head);
// NOTE: responses get returned in the correct order no matter what, since
// mc_resp's are linked.
@@ -243,11 +204,21 @@ void proxy_submit_cb(io_queue_t *q) {
// So for now we build the secondary list with an STAILQ, which
// can be transplanted/etc.
while (p) {
- // insert into tail so head is oldest request.
- STAILQ_INSERT_TAIL(&head, p, io_next);
+ mcp_backend_t *be;
+ P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
if (p->is_await) {
// need to not count await objects multiple times.
- if (p->await_first) {
+ if (p->await_background) {
+ P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p);
+ // intercept await backgrounds
+ // this call cannot recurse if we're on the worker thread,
+ // since the worker thread has to finish executing this
+ // function in order to pick up the returned IO.
+ q->count++;
+ return_io_pending((io_pending_t *)p);
+ p = p->next;
+ continue;
+ } else if (p->await_first) {
q->count++;
}
// funny workaround: awaiting IOP's don't count toward
@@ -256,6 +227,24 @@ void proxy_submit_cb(io_queue_t *q) {
} else {
q->count++;
}
+ be = p->backend;
+
+ if (be->use_io_thread) {
+ // insert into tail so head is oldest request.
+ STAILQ_INSERT_TAIL(&head, p, io_next);
+ } else {
+ // emulate some of handler_dequeue()
+ STAILQ_INSERT_TAIL(&be->io_head, p, io_next);
+ if (be->io_next == NULL) {
+ be->io_next = p;
+ }
+ be->depth++;
+ if (!be->stacked) {
+ be->stacked = true;
+ be->be_next.stqe_next = NULL; // paranoia
+ STAILQ_INSERT_TAIL(&w_head, be, be_next);
+ }
+ }
p = p->next;
}
@@ -263,26 +252,34 @@ void proxy_submit_cb(io_queue_t *q) {
// clear out the submit queue so we can re-queue new IO's inline.
q->stack_ctx = NULL;
- // Transfer request stack to event thread.
- pthread_mutex_lock(&e->mutex);
- STAILQ_CONCAT(&e->io_head_in, &head);
- // No point in holding the lock since we're not doing a cond signal.
- pthread_mutex_unlock(&e->mutex);
+ if (!STAILQ_EMPTY(&head)) {
+ P_DEBUG("%s: submitting queue to IO thread\n", __func__);
+ // Transfer request stack to event thread.
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_CONCAT(&e->io_head_in, &head);
+ // No point in holding the lock since we're not doing a cond signal.
+ pthread_mutex_unlock(&e->mutex);
- // Signal to check queue.
+ // Signal to check queue.
#ifdef USE_EVENTFD
- uint64_t u = 1;
- // TODO (v2): check result? is it ever possible to get a short write/failure
- // for an eventfd?
- if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
- assert(1 == 0);
- }
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
#else
- if (write(e->notify_send_fd, "w", 1) <= 0) {
- assert(1 == 0);
- }
+ if (write(e->notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
#endif
+ }
+ if (!STAILQ_EMPTY(&w_head)) {
+ P_DEBUG("%s: running inline worker queue\n", __func__);
+ // emulating proxy_event_handler
+ proxy_run_backend_queue(&w_head);
+ }
return;
}
@@ -545,6 +542,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (cores == LUA_OK) {
WSTAT_DECR(c->thread, proxy_req_active, 1);
int type = lua_type(Lc, 1);
+ P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type);
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response");
_set_noreply_mode(resp, r);
@@ -581,6 +579,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else if (cores == LUA_YIELD) {
int coro_ref = 0;
int yield_type = lua_tointeger(Lc, -1);
+ P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type);
assert(yield_type != 0);
lua_pop(Lc, 1);