Diffstat (limited to 'proxy_network.c')
 proxy_network.c | 150
 1 file changed, 101 insertions(+), 49 deletions(-)
diff --git a/proxy_network.c b/proxy_network.c
index 239b0d4..0334971 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -133,17 +133,8 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
// paranoia about moving items between lists.
io->io_next.stqe_next = NULL;
- // Need to check on await's before looking at backends, in case it
- // doesn't have one.
- // Here we're letting an await resume without waiting on the network.
- if (io->await_background) {
- return_io_pending((io_pending_t *)io);
- continue;
- }
-
mcp_backend_t *be = io->backend;
// So the backend can retrieve its event base.
- be->event_thread = t;
if (be->bad) {
P_DEBUG("%s: fast failing request to bad backend\n", __func__);
io->client_resp->status = MCMC_ERR;
@@ -424,7 +415,6 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -802,14 +792,13 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
// assign the initial events to the backend, so we don't have to
// constantly check if they were initialized yet elsewhere.
// note these events will not fire until event_add() is called.
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
- event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
- event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->main_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
+ event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->timeout_event, be->event_thread->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -827,6 +816,44 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
}
}
+void proxy_run_backend_queue(be_head_t *head) {
+ mcp_backend_t *be;
+ STAILQ_FOREACH(be, head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->bad) {
+ // flush queue if backend is still bad.
+ // TODO: duplicated from _reset_bad_backend()
+ io_pending_proxy_t *io = NULL;
+ while (!STAILQ_EMPTY(&be->io_head)) {
+ io = STAILQ_FIRST(&be->io_head);
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ io->client_resp->status = MCMC_ERR;
+ be->depth--;
+ return_io_pending((io_pending_t *)io);
+ }
+ } else if (be->connecting || be->validating) {
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
+ } else {
+ flags = _flush_pending_write(be);
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ } else if (flags & EV_WRITE) {
+ // only get here because we need to kick off the write handler
+ _start_write_event(be);
+ }
+
+ if (be->pending_read) {
+ _start_timeout_event(be);
+ }
+
+ }
+ }
+}
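The new helper clears be->stacked on every backend it visits; below is a minimal sketch of the producer side it pairs with, assuming the enqueue pattern used by the dequeue path earlier in this file (the enqueue lines are not part of this hunk):

/* sketch: a backend is queued at most once per wakeup, then the event
 * thread drains the whole list with proxy_run_backend_queue(). */
if (!be->stacked) {
    be->stacked = true;
    STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
}
/* later, on the event thread: */
proxy_run_backend_queue(&t->be_head);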
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -858,32 +885,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
}
// Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // FIXME (v2): _set_event() is buggy, see notes on function.
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
- int flags = 0;
-
- if (be->connecting || be->validating) {
- P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
- } else {
- flags = _flush_pending_write(be);
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed(be);
- } else if (flags & EV_WRITE) {
- // only get here because we need to kick off the write handler
- _start_write_event(be);
- }
-
- if (be->pending_read) {
- _start_timeout_event(be);
- }
- }
- }
-
+ proxy_run_backend_queue(&t->be_head);
}
void *proxy_event_thread(void *arg) {
@@ -1585,8 +1587,53 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
// event threads.
-void proxy_init_evthread_events(proxy_event_thread_t *t) {
+// TODO: this either needs a restructure or split into two funcs:
+// 1) for the IO thread which creates its own ring/event base
+// 2) for the worker thread which reuses the event base.
+// io_uring will probably only work for the IO thread which makes further
+// exceptions.
+void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) {
+ t->ctx = ctx;
+#ifdef USE_EVENTFD
+ t->event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+ t->be_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->be_event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+#else
+ int fds[2];
+ if (pipe(fds)) {
+ perror("can't create proxy backend notify pipe");
+ exit(1);
+ }
+
+ t->notify_receive_fd = fds[0];
+ t->notify_send_fd = fds[1];
+
+ if (pipe(fds)) {
+ perror("can't create proxy backend connection notify pipe");
+ exit(1);
+ }
+ t->be_notify_receive_fd = fds[0];
+ t->be_notify_send_fd = fds[1];
+#endif
+
+ // incoming request queue.
+ STAILQ_INIT(&t->io_head_in);
+ STAILQ_INIT(&t->beconn_head_in);
+ pthread_mutex_init(&t->mutex, NULL);
+ pthread_cond_init(&t->cond, NULL);
+
+ // initialize the event system.
+
#ifdef HAVE_LIBURING
+ fprintf(stderr, "Sorry, io_uring not supported right now\n");
+ abort();
bool use_uring = t->ctx->use_uring;
struct io_uring_params p = {0};
assert(t->event_fd); // uring only exists where eventfd also does.
@@ -1646,14 +1693,19 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
}
#endif
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- t->base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- if (! t->base) {
- fprintf(stderr, "Can't allocate event base\n");
- exit(1);
+ if (base == NULL) {
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+ } else {
+ // reusing an event base from a worker thread.
+ t->base = base;
}
// listen for notifications.
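Per the TODO above, the new base argument is what distinguishes the two intended callers; a minimal sketch of the call patterns, using hypothetical call-site names (io_thread, worker_thread, worker_base are illustrative, not from this diff):

/* sketch: the dedicated IO thread passes NULL so the function builds its
 * own event base; a worker thread passes the base it already runs. */
proxy_init_event_thread(io_thread, ctx, NULL);
proxy_init_event_thread(worker_thread, ctx, worker_base);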