diff options
author | dormando <dormando@rydia.net> | 2022-12-13 12:48:14 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-12-13 12:48:14 -0800 |
commit | 8b98647d1ad80dee3ddb49dfc51cd7bd07fe9297 (patch) | |
tree | 3a12dd2af7f5de130a1fd0034d80716416c7685b /proxy_network.c | |
parent | d401611ba88db17c38fedf97d336f8085ce24bab (diff) | |
download | memcached-8b98647d1ad80dee3ddb49dfc51cd7bd07fe9297.tar.gz |
proxy: make io_uring backend work again
Updates the io_uring code to match the changes made on the libevent side.
Needs more work before merge:
- audit error conditions
- try harder to deduplicate code
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 368 |
1 files changed, 311 insertions, 57 deletions
diff --git a/proxy_network.c b/proxy_network.c index 000666d..8ce4bab 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -42,11 +42,69 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); static int _flush_pending_write(mcp_backend_t *be); +static void _cleanup_backend(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); static void _backend_failed(mcp_backend_t *be); static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); static int proxy_backend_drive_machine(mcp_backend_t *be); +/* Helper routines common to io_uring and libevent modes */ + +// TODO (v3): doing an inline syscall here, not ideal for uring mode. +// leaving for now since this should be extremely uncommon. +static int _beconn_send_validate(mcp_backend_t *be) { + const char *str = "version\r\n"; + const ssize_t len = strlen(str); + + ssize_t res = write(mcmc_fd(be->client), str, len); + + if (res == -1) { + return -1; + } + + // I'm making an opinionated statement that we should be able to write + // "version\r\n" into a fresh socket without hitting EAGAIN. + if (res < len) { + return -1; + } + + return 1; +} + +// FIXME: make _backend_failed conditionally use _ur() so we can have one call +// in the code and reuse more code like this. +static int _proxy_beconn_checkconnect(mcp_backend_t *be) { + int err = 0; + // We were connecting, now ensure we're properly connected. + if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) { + P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); + // kick the bad backend, clear the queue, retry later. + // FIXME (v2): if a connect fails, anything currently in the queue + // should be safe to hold up until their timeout. 
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING); + _backend_failed(be); + return -1; + } + P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); + be->connecting = false; + be->state = mcp_backend_read; + be->bad = false; + be->failed_count = 0; + + be->validating = true; + // TODO: make validation optional. + + if (_beconn_send_validate(be) == -1) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed(be); + return -1; + } else { + // buffer should be empty during validation stage. + assert(be->rbufused == 0); + return 0; + } +} + static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { io_head_t head; @@ -108,7 +166,10 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts); static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts); static void _proxy_evthr_evset_be_retry(mcp_backend_t *be); +static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts); +static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts); static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t); +static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t); static void _proxy_evthr_evset_clock(proxy_event_thread_t *t); static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe); struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0}; @@ -136,7 +197,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur); } static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { @@ -160,8 +221,10 
@@ static void _backend_failed_ur(mcp_backend_t *be) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _proxy_evthr_evset_be_retry(be); + STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur); + _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur); + STAT_INCR(be->event_thread->ctx, backend_failed, 1); } } @@ -172,21 +235,16 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { // Error or disconnection. if (bread <= 0) { _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); - // NOTE: Not calling backed_failed here since if the backend is busted - // it should be caught by the connect routine. - // This is probably not _always_ true in practice. Leaving this note - // so I can re-evaluate later. + _backend_failed_ur(be); return; } be->rbufused += bread; int res = proxy_backend_drive_machine(be); - if (res > 0) { - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); - return; - } else if (res == -1) { - _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); + if (res != 0) { + _reset_bad_backend(be, res); + _backend_failed_ur(be); return; } @@ -232,6 +290,157 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); } +// a backend with an outstanding new connection has become writeable. +// check validity. +// TODO: this gets an error if cancelled right? check it? +static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) { + mcp_backend_t *be = udata; + int err = 0; + assert(be->connecting); +/* if (_proxy_beconn_checkconnect(be) == -1) { + return; + } */ + + // We were connecting, now ensure we're properly connected. 
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) { + P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); + // kick the bad backend, clear the queue, retry later. + // FIXME (v2): if a connect fails, anything currently in the queue + // should be safe to hold up until their timeout. + _reset_bad_backend(be, P_BE_FAIL_CONNECTING); + _backend_failed_ur(be); + return; + } + P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); + be->connecting = false; + be->state = mcp_backend_read; + be->bad = false; + be->failed_count = 0; + + be->validating = true; + // TODO: make validation optional. + + if (_beconn_send_validate(be) == -1) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed_ur(be); + return; + } else { + // buffer should be empty during validation stage. + assert(be->rbufused == 0); + } + + // TODO: make validation optional. + // set next handler on recv for validity check. + _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); +} + +// TODO: share more code with proxy_beconn_handler +static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *cqe) { + mcp_backend_t *be = udata; + mcmc_resp_t r; + assert(be->validating); + assert(cqe->res != -EINVAL); + P_DEBUG("%s: checking validation: %d\n", __func__, cqe->res); + + int bread = cqe->res; + // Error or disconnection. + if (bread <= 0) { + _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); + _backend_failed_ur(be); + return; + } + + be->rbufused += bread; + + int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r); + if (status == MCMC_ERR) { + // Needed more data for a version line, somehow. For the uring code + // we'll treat that as an error, for now. + // TODO: re-schedule self if r.code == MCMC_WANT_READ. 
+ + _reset_bad_backend(be, P_BE_FAIL_READVALIDATE); + _backend_failed_ur(be); + return; + } + + if (r.code != MCMC_CODE_VERSION) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed_ur(be); + return; + } + + be->validating = false; + be->rbufused = 0; + + // Passed validation, don't need to re-read, flush any pending writes. + int res = _flush_pending_write(be); + if (res == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed_ur(be); + return; + } + + if (!STAILQ_EMPTY(&be->io_head)) { + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + } +} + +// TODO (v3): much code shared with proxy_event_beconn, should be able to +// abstract out. +// TODO (v3): further optimization would move the mcmc_connect() socket +// creation to uring. +static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) { + proxy_event_thread_t *t = udata; + P_DEBUG("%s: got wakeup: %d\n", __func__, cqe->res); + + // liburing always uses eventfd for the notifier. + // *cqe has our result. + assert(cqe->res != -EINVAL); + if (cqe->res != sizeof(eventfd_t)) { + P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res); + // FIXME (v2): figure out if this is impossible, and how to handle if not. + assert(1 == 0); + } + + // need to re-arm the listener every time. + _proxy_evthr_evset_benotifier(t); + + beconn_head_t head; + + STAILQ_INIT(&head); + pthread_mutex_lock(&t->mutex); + STAILQ_CONCAT(&head, &t->beconn_head_in); + pthread_mutex_unlock(&t->mutex); + + mcp_backend_t *be = NULL; + // be can be freed by the loop, so can't use STAILQ_FOREACH. + while (!STAILQ_EMPTY(&head)) { + be = STAILQ_FIRST(&head); + STAILQ_REMOVE_HEAD(&head, beconn_next); + if (be->transferred) { + // If this object was already transferred here, we're being + // signalled to clean it up and free. 
+ _cleanup_backend(be); + } else { + be->transferred = true; + be->event_thread = t; + int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { + // if we're already connected for some reason, still push it + // through the connection handler to keep the code unified. It + // will auto-wake because the socket is writeable. + be->connecting = true; + be->can_write = false; + _proxy_evthr_evset_be_conn(be, &t->tunables.connect_ur); + } else { + _reset_bad_backend(be, P_BE_FAIL_CONNECTING); + _backend_failed_ur(be); + } + } + } + +} + static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) { proxy_event_thread_t *t = udata; @@ -290,6 +499,64 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) { } } +static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) { + P_DEBUG("%s: setting: %lu\n", __func__, len); + struct io_uring_sqe *sqe; + if (be->ur_rd_ev.set) { + P_DEBUG("%s: already set\n", __func__); + return; + } + + be->ur_rd_ev.cb = proxy_backend_beconn_validate_ur; + be->ur_rd_ev.udata = be; + + sqe = io_uring_get_sqe(&be->event_thread->ring); + // FIXME (v2): NULL? + assert(be->rbuf != NULL); + io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0); + io_uring_sqe_set_data(sqe, &be->ur_rd_ev); + be->ur_rd_ev.set = true; + + sqe->flags |= IOSQE_IO_LINK; + + // add a timeout. + be->ur_te_ev.cb = proxy_backend_timeout_handler_ur; + be->ur_te_ev.udata = be; + sqe = io_uring_get_sqe(&be->event_thread->ring); + + io_uring_prep_link_timeout(sqe, ts, 0); + io_uring_sqe_set_data(sqe, &be->ur_te_ev); +} + +// reuse the write handler event for pending connections. 
+static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts) { + struct io_uring_sqe *sqe; + P_DEBUG("%s: setting\n", __func__); + if (be->ur_wr_ev.set) + return; + + be->ur_wr_ev.cb = proxy_backend_beconn_ur; + be->ur_wr_ev.udata = be; + + sqe = io_uring_get_sqe(&be->event_thread->ring); + // FIXME (v2): NULL? + + io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT); + io_uring_sqe_set_data(sqe, &be->ur_wr_ev); + be->ur_wr_ev.set = true; + + sqe->flags |= IOSQE_IO_LINK; + + // add a timeout. + // FIXME: do I need to change this at all? + be->ur_te_ev.cb = proxy_backend_timeout_handler_ur; + be->ur_te_ev.udata = be; + sqe = io_uring_get_sqe(&be->event_thread->ring); + + io_uring_prep_link_timeout(sqe, ts, 0); + io_uring_sqe_set_data(sqe, &be->ur_te_ev); +} + static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) { struct io_uring_sqe *sqe; if (be->ur_wr_ev.set) @@ -357,6 +624,21 @@ static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) { t->ur_clock_event.set = true; } +static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) { + struct io_uring_sqe *sqe; + P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set); + if (t->ur_benotify_event.set) + return; + + t->ur_benotify_event.cb = proxy_beconn_handler_ur; + t->ur_benotify_event.udata = t; + + sqe = io_uring_get_sqe(&t->ring); + // FIXME (v2): NULL? + io_uring_prep_read(sqe, t->be_event_fd, &t->beevent_counter, sizeof(eventfd_t), 0); + io_uring_sqe_set_data(sqe, &t->ur_benotify_event); +} + static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) { struct io_uring_sqe *sqe; P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set); @@ -450,6 +732,11 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { } static void _cleanup_backend(mcp_backend_t *be) { +#ifdef HAVE_LIBURING + if (be->event_thread->use_uring) { + // TODO: cancel any live uring events. 
+ } else { +#endif // remove any pending events. int pending = 0; if (event_initialized(&be->event)) { @@ -458,6 +745,9 @@ static void _cleanup_backend(mcp_backend_t *be) { if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { event_del(&be->event); // an error to call event_del() without event. } +#ifdef HAVE_LIBURING + } +#endif // - assert on empty queue assert(STAILQ_EMPTY(&be->io_head)); @@ -1058,25 +1348,6 @@ static int _flush_pending_write(mcp_backend_t *be) { return flags; } -static int _beconn_send_validate(mcp_backend_t *be) { - const char *str = "version\r\n"; - const ssize_t len = strlen(str); - - ssize_t res = write(mcmc_fd(be->client), str, len); - - if (res == -1) { - return -1; - } - - // I'm making an opinionated statement that we should be able to write - // "version\r\n" into a fresh socket without hitting EAGAIN. - if (res < len) { - return -1; - } - - return 1; -} - // Libevent handler for backends in a connecting state. static void proxy_beconn_handler(const int fd, const short which, void *arg) { assert(arg != NULL); @@ -1095,35 +1366,10 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { be->can_write = true; if (be->connecting) { - int err = 0; - // We were connecting, now ensure we're properly connected. - if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) { - // kick the bad backend, clear the queue, retry later. - // FIXME (v2): if a connect fails, anything currently in the queue - // should be safe to hold up until their timeout. - _reset_bad_backend(be, P_BE_FAIL_CONNECTING); - _backend_failed(be); - P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); + if (_proxy_beconn_checkconnect(be) == -1) { return; } - P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); - be->connecting = false; - be->state = mcp_backend_read; - be->bad = false; - be->failed_count = 0; - - be->validating = true; - // TODO: make validation optional. 
- - if (_beconn_send_validate(be) == -1) { - _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); - _backend_failed(be); - return; - } else { - // buffer should be empty during validation stage. - assert(be->rbufused == 0); - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); - } + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); } // TODO: currently never taken, until validation is made optional. @@ -1305,12 +1551,20 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a // chicken and egg in here. need a better structure... in meantime // re-create the event_fd. + + // set the new request handler. close(t->event_fd); t->event_fd = eventfd(0, 0); // FIXME (v2): hack for event init. t->ur_notify_event.set = false; _proxy_evthr_evset_notifier(t); + // set the new backend connection handler. + close(t->be_event_fd); + t->be_event_fd = eventfd(0, 0); + t->ur_benotify_event.set = false; + _proxy_evthr_evset_benotifier(t); + // periodic data updater for event thread t->ur_clock_event.cb = proxy_event_updater_ur; t->ur_clock_event.udata = t; |