diff options
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 150 |
1 files changed, 101 insertions, 49 deletions
diff --git a/proxy_network.c b/proxy_network.c index 239b0d4..0334971 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -133,17 +133,8 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { // paranoia about moving items between lists. io->io_next.stqe_next = NULL; - // Need to check on await's before looking at backends, in case it - // doesn't have one. - // Here we're letting an await resume without waiting on the network. - if (io->await_background) { - return_io_pending((io_pending_t *)io); - continue; - } - mcp_backend_t *be = io->backend; // So the backend can retrieve its event base. - be->event_thread = t; if (be->bad) { P_DEBUG("%s: fast failing request to bad backend\n", __func__); io->client_resp->status = MCMC_ERR; @@ -424,7 +415,6 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) { _cleanup_backend(be); } else { be->transferred = true; - be->event_thread = t; int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { // if we're already connected for some reason, still push it @@ -802,14 +792,13 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { _cleanup_backend(be); } else { be->transferred = true; - be->event_thread = t; // assign the initial events to the backend, so we don't have to // constantly check if they were initialized yet elsewhere. // note these events will not fire until event_add() is called. int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); - event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be); - event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); - event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be); + event_assign(&be->main_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be); + event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); + event_assign(&be->timeout_event, be->event_thread->base, -1, EV_TIMEOUT, proxy_backend_handler, be); if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { // if we're already connected for some reason, still push it @@ -827,6 +816,44 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { } } +void proxy_run_backend_queue(be_head_t *head) { + mcp_backend_t *be; + STAILQ_FOREACH(be, head, be_next) { + be->stacked = false; + int flags = 0; + + if (be->bad) { + // flush queue if backend is still bad. + // TODO: duplicated from _reset_bad_backend() + io_pending_proxy_t *io = NULL; + while (!STAILQ_EMPTY(&be->io_head)) { + io = STAILQ_FIRST(&be->io_head); + STAILQ_REMOVE_HEAD(&be->io_head, io_next); + io->client_resp->status = MCMC_ERR; + be->depth--; + return_io_pending((io_pending_t *)io); + } + } else if (be->connecting || be->validating) { + P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port); + } else { + flags = _flush_pending_write(be); + + if (flags == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); + } else if (flags & EV_WRITE) { + // only get here because we need to kick off the write handler + _start_write_event(be); + } + + if (be->pending_read) { + _start_timeout_event(be); + } + + } + } +} + // event handler for executing backend requests static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { proxy_event_thread_t *t = arg; @@ -858,32 +885,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { } // Re-walk each backend and check set event as required. - mcp_backend_t *be = NULL; - - // FIXME (v2): _set_event() is buggy, see notes on function. - STAILQ_FOREACH(be, &t->be_head, be_next) { - be->stacked = false; - int flags = 0; - - if (be->connecting || be->validating) { - P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port); - } else { - flags = _flush_pending_write(be); - - if (flags == -1) { - _reset_bad_backend(be, P_BE_FAIL_WRITING); - _backend_failed(be); - } else if (flags & EV_WRITE) { - // only get here because we need to kick off the write handler - _start_write_event(be); - } - - if (be->pending_read) { - _start_timeout_event(be); - } - } - } - + proxy_run_backend_queue(&t->be_head); } void *proxy_event_thread(void *arg) { @@ -1585,8 +1587,53 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { // TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple // event threads. -void proxy_init_evthread_events(proxy_event_thread_t *t) { +// TODO: this either needs a restructure or split into two funcs: +// 1) for the IO thread which creates its own ring/event base +// 2) for the worker thread which reuses the event base. +// io_uring will probably only work for the IO thread which makes further +// exceptions. +void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) { + t->ctx = ctx; +#ifdef USE_EVENTFD + t->event_fd = eventfd(0, EFD_NONBLOCK); + if (t->event_fd == -1) { + perror("failed to create backend notify eventfd"); + exit(1); + } + t->be_event_fd = eventfd(0, EFD_NONBLOCK); + if (t->be_event_fd == -1) { + perror("failed to create backend notify eventfd"); + exit(1); + } +#else + int fds[2]; + if (pipe(fds)) { + perror("can't create proxy backend notify pipe"); + exit(1); + } + + t->notify_receive_fd = fds[0]; + t->notify_send_fd = fds[1]; + + if (pipe(fds)) { + perror("can't create proxy backend connection notify pipe"); + exit(1); + } + t->be_notify_receive_fd = fds[0]; + t->be_notify_send_fd = fds[1]; +#endif + + // incoming request queue. + STAILQ_INIT(&t->io_head_in); + STAILQ_INIT(&t->beconn_head_in); + pthread_mutex_init(&t->mutex, NULL); + pthread_cond_init(&t->cond, NULL); + + // initialize the event system. + #ifdef HAVE_LIBURING + fprintf(stderr, "Sorry, io_uring not supported right now\n"); + abort(); bool use_uring = t->ctx->use_uring; struct io_uring_params p = {0}; assert(t->event_fd); // uring only exists where eventfd also does. @@ -1646,14 +1693,19 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { } #endif - struct event_config *ev_config; - ev_config = event_config_new(); - event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); - t->base = event_base_new_with_config(ev_config); - event_config_free(ev_config); - if (! t->base) { - fprintf(stderr, "Can't allocate event base\n"); - exit(1); + if (base == NULL) { + struct event_config *ev_config; + ev_config = event_config_new(); + event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); + t->base = event_base_new_with_config(ev_config); + event_config_free(ev_config); + if (! t->base) { + fprintf(stderr, "Can't allocate event base\n"); + exit(1); + } + } else { + // reusing an event base from a worker thread. + t->base = base; } // listen for notifications. |