/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // Functions related to the backend handler thread. #include "proxy.h" static void proxy_backend_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); static int _flush_pending_write(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread); static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { io_head_t head; STAILQ_INIT(&head); STAILQ_INIT(&t->be_head); // Pull the entire stack of inbound into local queue. pthread_mutex_lock(&t->mutex); STAILQ_CONCAT(&head, &t->io_head_in); pthread_mutex_unlock(&t->mutex); int io_count = 0; int be_count = 0; while (!STAILQ_EMPTY(&head)) { io_pending_proxy_t *io = STAILQ_FIRST(&head); io->flushed = false; mcp_backend_t *be = io->backend; // So the backend can retrieve its event base. be->event_thread = t; // _no_ mutex on backends. they are owned by the event thread. STAILQ_REMOVE_HEAD(&head, io_next); if (be->bad) { P_DEBUG("%s: fast failing request to bad backend\n", __func__); io->client_resp->status = MCMC_ERR; return_io_pending((io_pending_t *)io); continue; } STAILQ_INSERT_TAIL(&be->io_head, io, io_next); be->depth++; io_count++; if (!be->stacked) { be->stacked = true; STAILQ_INSERT_TAIL(&t->be_head, be, be_next); be_count++; } } //P_DEBUG("%s: io/be counts for syscalls [%d/%d]\n", __func__, io_count, be_count); return io_count; } #ifdef HAVE_LIBURING static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts); static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts); static void _proxy_evthr_evset_be_retry(mcp_backend_t *be); static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) { proxy_event_thread_t *t = udata; proxy_ctx_t *ctx = t->ctx; _proxy_evthr_evset_clock(t); // we reuse the "global stats" lock since it's hardly ever used. STAT_L(ctx); memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); STAT_UL(ctx); } // No-op at the moment. when the linked timeout fires uring returns the // linked request (read/write/poll/etc) with an interrupted/timeout/cancelled // error. So we don't need to explicitly handle timeouts. // I'm leaving the structure in to simplify the callback routine. // Since timeouts rarely get called the extra code here shouldn't matter. static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) { return; } static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur); } static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { struct io_uring_sqe *sqe; if (be->ur_te_ev.set) return; be->ur_te_ev.cb = proxy_backend_retry_handler_ur; be->ur_te_ev.udata = be; sqe = io_uring_get_sqe(&be->event_thread->ring); // TODO (v2): NULL? 
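    // Arm a plain timeout SQE for the retry interval; when it completes,
    // proxy_backend_retry_handler_ur re-arms the write poll so this bad
    // backend gets another connection attempt.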
    io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
    io_uring_sqe_set_data(sqe, &be->ur_te_ev);
    be->ur_te_ev.set = true;
}

static void _backend_failed_ur(mcp_backend_t *be) {
    if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
        P_DEBUG("%s: marking backend as bad\n", __func__);
        be->bad = true;
        _proxy_evthr_evset_be_retry(be);
    } else {
        _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur);
    }
}

// read handler.
static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
    mcp_backend_t *be = udata;
    int bread = cqe->res;
    char *rbuf = NULL;
    size_t toread = 0;
    // Error or disconnection.
    if (bread <= 0) {
        _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
        // NOTE: Not calling _backend_failed_ur() here since if the backend is
        // busted it should be caught by the connect routine.
        // This is probably not _always_ true in practice. Leaving this note
        // so I can re-evaluate later.
        return;
    }

    int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
    P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);

    if (res > 0) {
        _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->tunables.read_ur);
    } else if (res == -1) {
        _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
        return;
    }

    // TODO (v2): when exactly do we need to reset the backend handler?
    if (!STAILQ_EMPTY(&be->io_head)) {
        _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
    }
}

static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
    mcp_backend_t *be = udata;

    be->can_write = true;
    if (be->connecting) {
        int err = 0;
        // We were connecting, now ensure we're properly connected.
        if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
            // kick the bad backend, clear the queue, retry later.
            // TODO (v2): if a connect fails, anything currently in the queue
            // should be safe to hold up until their timeout.
            _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
            _backend_failed_ur(be);
            P_DEBUG("%s: backend failed to connect\n", __func__);
            return;
        }
        P_DEBUG("%s: backend connected\n", __func__);
        be->connecting = false;
        be->state = mcp_backend_read;
        be->bad = false;
        be->failed_count = 0;
    }
    int res = _flush_pending_write(be);
    if (res == -1) {
        _reset_bad_backend(be, P_BE_FAIL_WRITING);
        return;
    }

    // Couldn't flush everything: re-arm the write poll so the remainder is
    // sent once the socket is writeable again.
    if (res & EV_WRITE) {
        _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
    }

    _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
}

static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
    proxy_event_thread_t *t = udata;

    // liburing always uses eventfd for the notifier.
    // *cqe has our result.
    assert(cqe->res != -EINVAL);
    if (cqe->res != sizeof(eventfd_t)) {
        P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
        // FIXME (v2): figure out if this is impossible, and how to handle if not.
        assert(1 == 0);
    }

    // need to re-arm the listener every time.
    _proxy_evthr_evset_notifier(t);

    // TODO (v2): sqe queues for writing to backends
    //  - _ur handler for backend write completion is to set a read event and
    //  re-submit. ugh.
    // Should be possible to have standing reads, but flow is harder and let's
    // optimize that later. (ie; allow matching reads to a request but don't
    // actually dequeue anything until both read and write are confirmed)
    if (_proxy_event_handler_dequeue(t) == 0) {
        //P_DEBUG("%s: no IO's to complete\n", __func__);
        return;
    }

    // Re-walk each backend and check set event as required.
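    // Each backend stacked by the dequeue above gets its pending writes
    // flushed here; backends still connecting are deferred behind a write
    // poll until the connect completes.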
mcp_backend_t *be = NULL; // TODO (v2): for each backend, queue writev's into sqe's // move the backend sqe bits into a write complete handler STAILQ_FOREACH(be, &t->be_head, be_next) { be->stacked = false; int flags = 0; if (be->connecting) { P_DEBUG("%s: deferring IO pending connecting\n", __func__); flags |= EV_WRITE; } else { flags = _flush_pending_write(be); } if (flags == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); } else { // FIXME (v2): needs a re-write to handle sqe starvation. // FIXME (v2): can't actually set the read here? need to confirm _some_ // write first? if (flags & EV_WRITE) { _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur); } if (flags & EV_READ) { _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur); } } } } static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) { struct io_uring_sqe *sqe; if (be->ur_wr_ev.set) return; be->ur_wr_ev.cb = proxy_backend_wrhandler_ur; be->ur_wr_ev.udata = be; sqe = io_uring_get_sqe(&be->event_thread->ring); // FIXME (v2): NULL? io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT); io_uring_sqe_set_data(sqe, &be->ur_wr_ev); be->ur_wr_ev.set = true; sqe->flags |= IOSQE_IO_LINK; // add a timeout. be->ur_te_ev.cb = proxy_backend_timeout_handler_ur; be->ur_te_ev.udata = be; sqe = io_uring_get_sqe(&be->event_thread->ring); io_uring_prep_link_timeout(sqe, ts, 0); io_uring_sqe_set_data(sqe, &be->ur_te_ev); } static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) { P_DEBUG("%s: setting: %lu\n", __func__, len); struct io_uring_sqe *sqe; if (be->ur_rd_ev.set) { P_DEBUG("%s: already set\n", __func__); return; } be->ur_rd_ev.cb = proxy_backend_handler_ur; be->ur_rd_ev.udata = be; sqe = io_uring_get_sqe(&be->event_thread->ring); // FIXME (v2): NULL? assert(be->rbuf != NULL); io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0); io_uring_sqe_set_data(sqe, &be->ur_rd_ev); be->ur_rd_ev.set = true; sqe->flags |= IOSQE_IO_LINK; // add a timeout. // TODO (v2): we can pre-set the event data and avoid always re-doing it here. be->ur_te_ev.cb = proxy_backend_timeout_handler_ur; be->ur_te_ev.udata = be; sqe = io_uring_get_sqe(&be->event_thread->ring); io_uring_prep_link_timeout(sqe, ts, 0); io_uring_sqe_set_data(sqe, &be->ur_te_ev); } static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) { struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(&t->ring); // FIXME (v2): NULL? io_uring_prep_timeout(sqe, &updater_ts, 0, 0); io_uring_sqe_set_data(sqe, &t->ur_clock_event); t->ur_clock_event.set = true; } static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) { struct io_uring_sqe *sqe; P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set); if (t->ur_notify_event.set) return; t->ur_notify_event.cb = proxy_event_handler_ur; t->ur_notify_event.udata = t; sqe = io_uring_get_sqe(&t->ring); // FIXME (v2): NULL? io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0); io_uring_sqe_set_data(sqe, &t->ur_notify_event); } // TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check // for space free in the sqe queue and submit in the middle of the cqe // foreach. // There might be better places to do this, but I think it's cleaner if // submission and cqe can stay in this function. // TODO (v2): The problem is io_submit() can deadlock if too many cqe's are // waiting. 
// Need to understand if this means "CQE's ready to be picked up" or "CQE's in // flight", because the former is much easier to work around (ie; only run the // backend handler after dequeuing everything else) // TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ // events slots. Therefore might starve SQE's if we were low beforehand. // - switch from for_each_cqe to doing one at a time (for now?) // - track # of sqe's allocated in the cqe loop. // - stop and submit if we've >= half the queue. // - ??? when can a CQE generate > 1 SQE? // - wrhandler_ur can set both wrpoll and read // - if CQE's can gen > 1 SQE at a time, we'll eventually starve. // - proper flow: CQE's can enqueue backends to be processed. // - after CQE's are processed, backends are processed (ouch?) // - if SQE's starve here, bail but keep the BE queue. // - then submit SQE's static void *proxy_event_thread_ur(void *arg) { proxy_event_thread_t *t = arg; struct io_uring_cqe *cqe; P_DEBUG("%s: starting\n", __func__); logger_create(); // TODO (v2): add logger to struct while (1) { P_DEBUG("%s: submit and wait\n", __func__); io_uring_submit_and_wait(&t->ring, 1); //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret); uint32_t head = 0; uint32_t count = 0; io_uring_for_each_cqe(&t->ring, head, cqe) { P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count); proxy_event_t *pe = io_uring_cqe_get_data(cqe); pe->set = false; pe->cb(pe->udata, cqe); count++; } P_DEBUG("%s: advancing [count:%d]\n", __func__, count); io_uring_cq_advance(&t->ring, count); } return NULL; } #endif // HAVE_LIBURING // We need to get timeout/retry/etc updates to the event thread(s) // occasionally. I'd like to have a better inteface around this where updates // are shipped directly; but this is good enough to start with. static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { proxy_event_thread_t *t = arg; proxy_ctx_t *ctx = t->ctx; // TODO (v2): double check how much of this boilerplate is still necessary? // reschedule the clock event. evtimer_del(&t->clock_event); evtimer_set(&t->clock_event, proxy_event_updater, t); event_base_set(t->base, &t->clock_event); struct timeval rate = {.tv_sec = 3, .tv_usec = 0}; evtimer_add(&t->clock_event, &rate); // we reuse the "global stats" lock since it's hardly ever used. STAT_L(ctx); memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); STAT_UL(ctx); } // event handler for executing backend requests static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { proxy_event_thread_t *t = arg; #ifdef USE_EVENTFD uint64_t u; if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { // Temporary error or wasn't actually ready to read somehow. return; } #else char buf[1]; // TODO (v2): This is a lot more fatal than it should be. can it fail? can // it blow up the server? // TODO (v2): a cross-platform method of speeding this up would be nice. With // event fds we can queue N events and wakeup once here. // If we're pulling one byte out of the pipe at a time here it'll just // wake us up too often. // If the pipe is O_NONBLOCK then maybe just a larger read would work? if (read(fd, buf, 1) != 1) { P_DEBUG("%s: pipe read failed\n", __func__); return; } #endif if (_proxy_event_handler_dequeue(t) == 0) { //P_DEBUG("%s: no IO's to complete\n", __func__); return; } // Re-walk each backend and check set event as required. mcp_backend_t *be = NULL; struct timeval tmp_time = t->tunables.read; // FIXME (v2): _set_event() is buggy, see notes on function. 
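    // Flush whatever we can to each stacked backend, then (re)arm its event
    // with the read timeout; EV_WRITE stays set when the socket wasn't
    // writeable so proxy_backend_handler can finish the flush later.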
STAILQ_FOREACH(be, &t->be_head, be_next) { be->stacked = false; int flags = 0; if (be->connecting) { P_DEBUG("%s: deferring IO pending connecting\n", __func__); } else { flags = _flush_pending_write(be); } if (flags == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); } else { flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT; _set_event(be, t->base, flags, tmp_time, proxy_backend_handler); } } } void *proxy_event_thread(void *arg) { proxy_event_thread_t *t = arg; logger_create(); // TODO (v2): add logger ptr to structure event_base_loop(t->base, 0); event_base_free(t->base); // TODO (v2): join bt threads, free array. return NULL; } // FIXME (v2): if we use the newer API the various pending checks can be adjusted. static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) { // FIXME (v2): chicken and egg. // can't check if pending if the structure is was calloc'ed (sigh) // don't want to double test here. should be able to event_assign but // not add anything during initialization, but need the owner thread's // event base. int pending = 0; if (event_initialized(&be->event)) { pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); } if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { event_del(&be->event); // replace existing event. } // if we can't write, we could be connecting. // TODO (v2): always check for READ in case some commands were sent // successfully? The flags could be tracked on *be and reset in the // handler, perhaps? event_assign(&be->event, base, mcmc_fd(be->client), flags, callback, be); event_add(&be->event, &t); } // NOTES: // - mcp_backend_read: grab req_stack_head, do things // read -> next, want_read -> next | read_end, etc. // issue: want read back to read_end as necessary. special state? // - it's fine: p->client_resp->type. // - mcp_backend_next: advance, consume, etc. // TODO (v2): second argument with enum for a specific error. // - probably just for logging. for app if any of these errors shouldn't // result in killing the request stack! static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) { bool stop = false; io_pending_proxy_t *p = NULL; mcmc_resp_t tmp_resp; // helper for testing for GET's END marker. int flags = 0; p = STAILQ_FIRST(&be->io_head); if (p == NULL) { // got a read event, but nothing was queued. // probably means a disconnect event. // TODO (v2): could probably confirm this by attempting to read the // socket, getsockopt, or something else simply for logging or // statistical purposes. // In this case we know it's going to be a close so error. flags = -1; P_DEBUG("%s: read event but nothing in IO queue\n", __func__); return flags; } while (!stop) { mcp_resp_t *r; int res = 1; int remain = 0; char *newbuf = NULL; switch(be->state) { case mcp_backend_read: assert(p != NULL); P_DEBUG("%s: [read] bread: %d\n", __func__, bread); if (bread == 0) { // haven't actually done a read yet; figure out where/what. *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread); return EV_READ; } else { be->state = mcp_backend_parse; } break; case mcp_backend_parse: r = p->client_resp; r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp); // FIXME (v2): Don't like how this mcmc API ended up. bread = 0; // only add the bread once per loop. 
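            // MCMC_WANT_READ means the response is only partially buffered:
            // hand the caller a buffer and length to read into and re-run the
            // parse later. Any other non-OK status is fatal for this backend.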
if (r->status != MCMC_OK) { P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status); if (r->status == MCMC_WANT_READ) { *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread); return EV_READ; } else { flags = -1; stop = true; break; } } // we actually don't care about anything but the value length // TODO (v2): if vlen != vlen_read, pull an item and copy the data. int extra_space = 0; switch (r->resp.type) { case MCMC_RESP_GET: // We're in GET mode. we only support one key per // GET in the proxy backends, so we need to later check // for an END. extra_space = ENDLEN; break; case MCMC_RESP_END: // this is a MISS from a GET request // or final handler from a STAT request. assert(r->resp.vlen == 0); break; case MCMC_RESP_META: // we can handle meta responses easily since they're self // contained. break; case MCMC_RESP_GENERIC: case MCMC_RESP_NUMERIC: break; // TODO (v2): No-op response? default: P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type); // unhandled :( flags = -1; stop = true; break; } if (res) { if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) { // Ascii multiget hack mode; consume END's be->state = mcp_backend_next; break; } // r->resp.reslen + r->resp.vlen is the total length of the response. // TODO (v2): need to associate a buffer with this response... // for now lets abuse write_and_free on mc_resp and simply malloc the // space we need, stuffing it into the resp object. r->blen = r->resp.reslen + r->resp.vlen; r->buf = malloc(r->blen + extra_space); if (r->buf == NULL) { flags = -1; // TODO (v2): specific error. stop = true; break; } P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen); if (r->resp.vlen != r->resp.vlen_read) { P_DEBUG("%s: got a short read, moving to want_read\n", __func__); // copy the partial and advance mcmc's buffer digestion. // FIXME (v2): should call this for both cases? // special function for advancing mcmc's buffer for // reading a value? perhaps a flag to skip the data copy // when it's unnecessary? memcpy(r->buf, be->rbuf, r->resp.reslen); r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread); be->state = mcp_backend_want_read; break; } else { // mcmc's already counted the value as read if it fit in // the original buffer... memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read); } } else { // TODO (v2): no response read? // nothing currently sets res to 0. should remove if that // never comes up and handle the error entirely above. P_DEBUG("%s: no response read from backend\n", __func__); flags = -1; stop = true; break; } if (r->resp.type == MCMC_RESP_GET) { // advance the buffer newbuf = mcmc_buffer_consume(be->client, &remain); if (remain != 0) { // TODO (v2): don't need to shuffle buffer with better API memmove(be->rbuf, newbuf, remain); } be->state = mcp_backend_read_end; } else { be->state = mcp_backend_next; } break; case mcp_backend_read_end: r = p->client_resp; // we need to ensure the next data in the stream is "END\r\n" // if not, the stack is desynced and we lose it. r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp); bread = 0; P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type); if (r->status != MCMC_OK) { if (r->status == MCMC_WANT_READ) { *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread); return EV_READ; } else { flags = -1; // TODO (v2): specific error. 
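                    // Couldn't parse the END terminator at all; stop and let
                    // the caller reset this backend.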
stop = true; } break; } else if (tmp_resp.type != MCMC_RESP_END) { // TODO (v2): specific error about protocol desync flags = -1; stop = true; break; } else { // response is good. // FIXME (v2): copy what the server actually sent? if (!p->ascii_multiget) { // sigh... if part of a multiget we need to eat the END // markers down here. memcpy(r->buf+r->blen, ENDSTR, ENDLEN); r->blen += 5; } } be->state = mcp_backend_next; break; case mcp_backend_want_read: // Continuing a read from earlier r = p->client_resp; // take bread input and see if we're done reading the value, // else advance, set buffers, return next. if (bread > 0) { r->bread += bread; bread = 0; } P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen); if (r->bread >= r->resp.vlen) { // all done copying data. if (r->resp.type == MCMC_RESP_GET) { newbuf = mcmc_buffer_consume(be->client, &remain); // Shouldn't be anything in the buffer if we had to run to // want_read to read the value. assert(remain == 0); be->state = mcp_backend_read_end; } else { be->state = mcp_backend_next; } } else { // signal to caller to issue a read. *rbuf = r->buf+r->resp.reslen+r->bread; *toread = r->resp.vlen - r->bread; // need to retry later. flags |= EV_READ; stop = true; } break; case mcp_backend_next: // set the head here. when we break the head will be correct. STAILQ_REMOVE_HEAD(&be->io_head, io_next); be->depth--; // have to do the q->count-- and == 0 and redispatch_conn() // stuff here. The moment we call return_io here we // don't own *p anymore. return_io_pending((io_pending_t *)p); if (STAILQ_EMPTY(&be->io_head)) { // TODO (v2): suspicious of this code. audit harder? stop = true; } else { p = STAILQ_FIRST(&be->io_head); } // mcmc_buffer_consume() - if leftover, keep processing // IO's. // if no more data in buffer, need to re-set stack head and re-set // event. remain = 0; // TODO (v2): do we need to yield every N reads? newbuf = mcmc_buffer_consume(be->client, &remain); P_DEBUG("%s: [next] remain: %d\n", __func__, remain); be->state = mcp_backend_read; if (remain != 0) { // data trailing in the buffer, for a different request. memmove(be->rbuf, newbuf, remain); be->state = mcp_backend_parse; P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain); } else { // need to read more data, buffer is empty. stop = true; } break; default: // TODO (v2): at some point (after v1?) this should attempt to recover, // though we should only get here from memory corruption and // bailing may be the right thing to do. fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state); assert(false); } // switch } // while return flags; } // All we need to do here is schedule the backend to attempt to connect again. static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); struct timeval tmp_time = be->event_thread->tunables.retry; _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); } // currently just for timeouts, but certain errors should consider a backend // to be "bad" as well. // must be called before _reset_bad_backend(), so the backend is currently // clear. // TODO (v2): currently only notes for "bad backends" in cases of timeouts or // connect failures. We need a specific connect() handler that executes a // "version" call to at least check that the backend isn't speaking garbage. 
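// (for example: after the connect completes, send "version\r\n" and only
// treat the backend as healthy once a "VERSION ..." line parses back)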
// In theory backends can fail such that responses are constantly garbage, // but it's more likely an app is doing something bad and culling the backend // may prevent any other clients from talking to that backend. In // that case we need to track if clients are causing errors consistently and // block them instead. That's more challenging so leaving a note instead // of doing this now :) static void _backend_failed(mcp_backend_t *be) { struct timeval tmp_time = be->event_thread->tunables.retry; if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { STAT_INCR(be->event_thread->ctx, backend_failed, 1); _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); } } const char *proxy_be_failure_text[] = { [P_BE_FAIL_TIMEOUT] = "timeout", [P_BE_FAIL_DISCONNECTED] = "disconnected", [P_BE_FAIL_CONNECTING] = "connecting", [P_BE_FAIL_WRITING] = "writing", [P_BE_FAIL_READING] = "reading", [P_BE_FAIL_PARSING] = "parsing", NULL }; // TODO (v2): add a second argument for assigning a specific error to all pending // IO's (ie; timeout). // The backend has gotten into a bad state (timed out, protocol desync, or // some other supposedly unrecoverable error: purge the queue and // cycle the socket. // Note that some types of errors may not require flushing the queue and // should be fixed as they're figured out. // _must_ be called from within the event thread. static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { io_pending_proxy_t *io = NULL; STAILQ_FOREACH(io, &be->io_head, io_next) { // TODO (v2): Unsure if this is the best way of surfacing errors to lua, // but will do for V1. io->client_resp->status = MCMC_ERR; return_io_pending((io_pending_t *)io); } STAILQ_INIT(&be->io_head); mcmc_disconnect(be->client); int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); if (status == MCMC_CONNECTED) { // TODO (v2): unexpected but lets let it be here. be->connecting = false; be->can_write = true; } else if (status == MCMC_CONNECTING) { be->connecting = true; be->can_write = false; } else { // TODO (v2): failed to immediately re-establish the connection. // need to put the BE into a bad/retry state. // FIXME (v2): until we get an event to specifically handle connecting and // bad server handling, attempt to force a reconnect here the next // time a request comes through. // The event thread will attempt to write to the backend, fail, then // end up in this routine again. be->connecting = false; be->can_write = true; } LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, proxy_be_failure_text[err], be->name, be->port); return 0; } static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) { struct iovec *iovs = be->write_iovs; io_pending_proxy_t *io = NULL; int iovused = 0; STAILQ_FOREACH(io, &be->io_head, io_next) { if (io->flushed) continue; if (io->iovcnt + iovused > BE_IOV_MAX) { // Signal to caller that we need to keep writing, if writeable. // FIXME (v2): can certainly refactor this to loop instead of waiting // for a writeable event. 
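            // Over-count tosend by one byte so the caller's writev() can never
            // cover it all; sent < tosend then forces EV_WRITE and a later
            // pass picks up the IOs left behind here.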
*tosend += 1; break; } memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt); iovused += io->iovcnt; *tosend += io->iovbytes; } return iovused; } static int _flush_pending_write(mcp_backend_t *be) { int flags = 0; unsigned int tosend = 0; int iovcnt = _prep_pending_write(be, &tosend); ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt); if (sent > 0) { io_pending_proxy_t *io = NULL; if (sent < tosend) { flags |= EV_WRITE; } STAILQ_FOREACH(io, &be->io_head, io_next) { bool flushed = true; if (io->flushed) continue; if (sent >= io->iovbytes) { // short circuit for common case. sent -= io->iovbytes; } else { for (int x = 0; x < io->iovcnt; x++) { struct iovec *iov = &io->iov[x]; if (sent >= iov->iov_len) { sent -= iov->iov_len; iov->iov_len = 0; } else { iov->iov_len -= sent; sent = 0; flushed = false; break; } } } io->flushed = flushed; if (flushed) { flags |= EV_READ; } if (sent <= 0) { // really shouldn't be negative, though. assert(sent >= 0); break; } } // STAILQ_FOREACH } else if (sent == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { be->can_write = false; flags |= EV_WRITE; } else { flags = -1; } } return flags; } // The libevent backend callback handler. // If we end up resetting a backend, it will get put back into a connecting // state. static void proxy_backend_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; int flags = EV_TIMEOUT; struct timeval tmp_time = be->event_thread->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: timeout received, killing backend queue\n", __func__); _reset_bad_backend(be, P_BE_FAIL_TIMEOUT); _backend_failed(be); return; } if (which & EV_WRITE) { be->can_write = true; // TODO (v2): move connect routine to its own function? // - hard to do right now because we can't (easily?) edit libevent // events. if (be->connecting) { int err = 0; // We were connecting, now ensure we're properly connected. if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) { // kick the bad backend, clear the queue, retry later. // FIXME (v2): if a connect fails, anything currently in the queue // should be safe to hold up until their timeout. _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); P_DEBUG("%s: backend failed to connect\n", __func__); return; } P_DEBUG("%s: backend connected\n", __func__); be->connecting = false; be->state = mcp_backend_read; be->bad = false; be->failed_count = 0; } int res = _flush_pending_write(be); if (res == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); return; } } if (which & EV_READ) { // We do the syscall here before diving into the state machine to allow a // common code path for io_uring/epoll int res = 1; int read = 0; while (res > 0) { char *rbuf = NULL; size_t toread = 0; // need to input how much was read since last call // needs _output_ of the buffer to read into and how much. res = proxy_backend_drive_machine(be, read, &rbuf, &toread); P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread); if (res > 0) { read = recv(mcmc_fd(be->client), rbuf, toread, 0); P_DEBUG("%s: read: %d\n", __func__, read); if (read == 0) { // not connected or error. _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); return; } else if (read == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { break; // sit on epoll again. 
} else { _reset_bad_backend(be, P_BE_FAIL_READING); return; } } } else if (res == -1) { _reset_bad_backend(be, P_BE_FAIL_PARSING); return; } else { break; } } #ifdef PROXY_DEBUG if (!STAILQ_EMPTY(&be->io_head)) { P_DEBUG("backend has leftover IOs: %d\n", be->depth); } #endif } // Still pending requests to read or write. if (!STAILQ_EMPTY(&be->io_head)) { flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event. _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); } } // TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple // event threads. void proxy_init_evthread_events(proxy_event_thread_t *t) { #ifdef HAVE_LIBURING bool use_uring = t->ctx->use_uring; struct io_uring_params p = {0}; assert(t->event_fd); // uring only exists where eventfd also does. // Setup the CQSIZE to be much larger than SQ size, since backpressure // issues can cause us to block on SQ submissions and as a network server, // stuff happens. if (use_uring) { p.flags = IORING_SETUP_CQSIZE; p.cq_entries = PRING_QUEUE_CQ_ENTRIES; int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p); if (ret) { perror("io_uring_queue_init_params"); exit(1); } if (!(p.features & IORING_FEAT_NODROP)) { fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n"); use_uring = false; } if (!(p.features & IORING_FEAT_SINGLE_MMAP)) { fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n"); use_uring = false; } if (!(p.features & IORING_FEAT_FAST_POLL)) { fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n"); use_uring = false; } if (use_uring) { // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a // chicken and egg in here. need a better structure... in meantime // re-create the event_fd. close(t->event_fd); t->event_fd = eventfd(0, 0); // FIXME (v2): hack for event init. t->ur_notify_event.set = false; _proxy_evthr_evset_notifier(t); // periodic data updater for event thread t->ur_clock_event.cb = proxy_event_updater_ur; t->ur_clock_event.udata = t; t->ur_clock_event.set = false; _proxy_evthr_evset_clock(t); t->use_uring = true; return; } else { // Decided to not use io_uring, so don't waste memory. t->use_uring = false; io_uring_queue_exit(&t->ring); } } else { t->use_uring = false; } #endif struct event_config *ev_config; ev_config = event_config_new(); event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); t->base = event_base_new_with_config(ev_config); event_config_free(ev_config); if (! t->base) { fprintf(stderr, "Can't allocate event base\n"); exit(1); } // listen for notifications. // NULL was thread_libevent_process // FIXME (v2): use modern format? (event_assign) #ifdef USE_EVENTFD event_set(&t->notify_event, t->event_fd, EV_READ | EV_PERSIST, proxy_event_handler, t); #else event_set(&t->notify_event, t->notify_receive_fd, EV_READ | EV_PERSIST, proxy_event_handler, t); #endif evtimer_set(&t->clock_event, proxy_event_updater, t); event_base_set(t->base, &t->clock_event); struct timeval rate = {.tv_sec = 3, .tv_usec = 0}; evtimer_add(&t->clock_event, &rate); event_base_set(t->base, &t->notify_event); if (event_add(&t->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } }