author    dormando <dormando@rydia.net>    2023-01-13 15:22:26 -0800
committer dormando <dormando@rydia.net>    2023-02-24 17:43:54 -0800
commit    6442017c545a2a5ad076697b8695cd64bd32b542 (patch)
tree      a95c5443a72f5cfbe5b1c32fe5cf552da3201b0c /proxy_network.c
parent    833a7234bbaed264a9973141850a23df4eb1b939 (diff)
download  memcached-6442017c545a2a5ad076697b8695cd64bd32b542.tar.gz
proxy: allow workers to run IO optionally
`mcp.pool(p, { dist = etc, iothread = true })`

By default the IO thread is not used; instead a backend connection is created for each worker thread. This can be overridden by setting `iothread = true` when creating a pool.

`mcp.pool(p, { dist = etc, beprefix = "etc" })`

If a `beprefix` is added to pool arguments, it will create unique backend connections for this pool. This allows you to create multiple sockets per backend by making multiple pools with unique prefixes.

There are legitimate use cases for sharing backend connections across different pools, which is why that is the default behavior.
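As a minimal sketch of how the two options might be combined in a proxy config (the backend labels and addresses below are hypothetical; only the `iothread` and `beprefix` options come from this change):

    -- hypothetical backends; mcp.backend(name, host, port)
    local b1 = mcp.backend('b1', '127.0.0.1', 11212)
    local b2 = mcp.backend('b2', '127.0.0.1', 11213)

    -- opt this pool back into the dedicated IO thread
    -- (per-worker connections are now the default).
    local iopool = mcp.pool({b1, b2}, { iothread = true })

    -- give this pool private backend sockets; a second pool with a
    -- different beprefix gets its own connections to the same servers.
    local privpool = mcp.pool({b1, b2}, { beprefix = "priv" })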
Diffstat (limited to 'proxy_network.c')
-rw-r--r--  proxy_network.c | 150
1 file changed, 101 insertions(+), 49 deletions(-)
diff --git a/proxy_network.c b/proxy_network.c
index 239b0d4..0334971 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -133,17 +133,8 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
// paranoia about moving items between lists.
io->io_next.stqe_next = NULL;
- // Need to check on await's before looking at backends, in case it
- // doesn't have one.
- // Here we're letting an await resume without waiting on the network.
- if (io->await_background) {
- return_io_pending((io_pending_t *)io);
- continue;
- }
-
mcp_backend_t *be = io->backend;
// So the backend can retrieve its event base.
- be->event_thread = t;
if (be->bad) {
P_DEBUG("%s: fast failing request to bad backend\n", __func__);
io->client_resp->status = MCMC_ERR;
@@ -424,7 +415,6 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -802,14 +792,13 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
// assign the initial events to the backend, so we don't have to
// constantly check if they were initialized yet elsewhere.
// note these events will not fire until event_add() is called.
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
- event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
- event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->main_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
+ event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->timeout_event, be->event_thread->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -827,6 +816,44 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
}
}
+void proxy_run_backend_queue(be_head_t *head) {
+ mcp_backend_t *be;
+ STAILQ_FOREACH(be, head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->bad) {
+ // flush queue if backend is still bad.
+ // TODO: duplicated from _reset_bad_backend()
+ io_pending_proxy_t *io = NULL;
+ while (!STAILQ_EMPTY(&be->io_head)) {
+ io = STAILQ_FIRST(&be->io_head);
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ io->client_resp->status = MCMC_ERR;
+ be->depth--;
+ return_io_pending((io_pending_t *)io);
+ }
+ } else if (be->connecting || be->validating) {
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
+ } else {
+ flags = _flush_pending_write(be);
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ } else if (flags & EV_WRITE) {
+ // only get here because we need to kick off the write handler
+ _start_write_event(be);
+ }
+
+ if (be->pending_read) {
+ _start_timeout_event(be);
+ }
+
+ }
+ }
+}
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -858,32 +885,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
}
// Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // FIXME (v2): _set_event() is buggy, see notes on function.
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
- int flags = 0;
-
- if (be->connecting || be->validating) {
- P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
- } else {
- flags = _flush_pending_write(be);
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed(be);
- } else if (flags & EV_WRITE) {
- // only get here because we need to kick off the write handler
- _start_write_event(be);
- }
-
- if (be->pending_read) {
- _start_timeout_event(be);
- }
- }
- }
-
+ proxy_run_backend_queue(&t->be_head);
}
void *proxy_event_thread(void *arg) {
@@ -1585,8 +1587,53 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
// event threads.
-void proxy_init_evthread_events(proxy_event_thread_t *t) {
+// TODO: this either needs a restructure or split into two funcs:
+// 1) for the IO thread which creates its own ring/event base
+// 2) for the worker thread which reuses the event base.
+// io_uring will probably only work for the IO thread which makes further
+// exceptions.
+void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) {
+ t->ctx = ctx;
+#ifdef USE_EVENTFD
+ t->event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+ t->be_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->be_event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+#else
+ int fds[2];
+ if (pipe(fds)) {
+ perror("can't create proxy backend notify pipe");
+ exit(1);
+ }
+
+ t->notify_receive_fd = fds[0];
+ t->notify_send_fd = fds[1];
+
+ if (pipe(fds)) {
+ perror("can't create proxy backend connection notify pipe");
+ exit(1);
+ }
+ t->be_notify_receive_fd = fds[0];
+ t->be_notify_send_fd = fds[1];
+#endif
+
+ // incoming request queue.
+ STAILQ_INIT(&t->io_head_in);
+ STAILQ_INIT(&t->beconn_head_in);
+ pthread_mutex_init(&t->mutex, NULL);
+ pthread_cond_init(&t->cond, NULL);
+
+ // initialize the event system.
+
#ifdef HAVE_LIBURING
+ fprintf(stderr, "Sorry, io_uring not supported right now\n");
+ abort();
bool use_uring = t->ctx->use_uring;
struct io_uring_params p = {0};
assert(t->event_fd); // uring only exists where eventfd also does.
@@ -1646,14 +1693,19 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
}
#endif
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- t->base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- if (! t->base) {
- fprintf(stderr, "Can't allocate event base\n");
- exit(1);
+ if (base == NULL) {
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+ } else {
+ // reusing an event base from a worker thread.
+ t->base = base;
}
// listen for notifications.