From ac55ac888e6252836ca4a233daf79253934c5728 Mon Sep 17 00:00:00 2001 From: dormando Date: Sun, 22 Jan 2023 15:11:18 -0800 Subject: proxy: fix mismatched responses after bad write Regression from the IO thread performance fix (again...) back in early december. Was getting corrupt backends if IO's were flushed in a very specific way, which would give bad data to clients. Once traffic stops the backends would timeout (waiting for responses that were never coming) and reset themselves. The optimization added was to "fast skip" IO's that were already flushed to the network by tracking a pointer into the list of IO's. The bug requires a series of events: 1) the "prep write command" function notes a pointer into the top of the backend IO stack. 2) a write to the backend socket resulting in an EAGAIN (no bytes written, try again later). 3) reads then complete from the backend, changing the list of IO objects. 4) "prep write command" tries again from a now invalid backend object. The fix: 1) only set the offset pointer _post flush_ to the last specifically non-flushed IO object, so if the list changes it should always be behind the IO pointer. 2) the IO pointer is nulled out immediately if flushing is complete. Took staring at it for a long time to understand this. I've rewritten this change once. I will split the stacks for "to-write queue" and "to-read queue" soon. That should be safer. --- proxy_network.c | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/proxy_network.c b/proxy_network.c index 9f2c410..d93dd6f 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -147,9 +147,6 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { continue; } STAILQ_INSERT_TAIL(&be->io_head, io, io_next); - if (be->io_next == NULL) { - be->io_next = io; // set write flush starting point. - } be->depth++; io_count++; if (!be->stacked) { @@ -1297,9 +1294,10 @@ static int _prep_pending_write(mcp_backend_t *be) { int iovused = 0; if (be->io_next == NULL) { // separate pointer for how far into the list we've flushed. - be->io_next = STAILQ_FIRST(&be->io_head); + io = STAILQ_FIRST(&be->io_head); + } else { + io = be->io_next; } - io = be->io_next; assert(io != NULL); for (; io; io = STAILQ_NEXT(io, io_next)) { // TODO (v2): paranoia for now, but this check should never fire @@ -1320,13 +1318,20 @@ static int _prep_pending_write(mcp_backend_t *be) { // returns true if any pending writes were fully flushed. static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { io_pending_proxy_t *io = be->io_next; - assert(io != NULL); + if (io == NULL) { + io = STAILQ_FIRST(&be->io_head); + } bool did_flush = false; for (; io; io = STAILQ_NEXT(io, io_next)) { bool flushed = true; if (io->flushed) continue; + if (sent <= 0) { + // really shouldn't be negative, though. + assert(sent >= 0); + break; + } if (sent >= io->iovbytes) { // short circuit for common case. @@ -1351,15 +1356,16 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { if (flushed) { did_flush = flushed; - be->io_next = STAILQ_NEXT(io, io_next); - } - if (sent <= 0) { - // really shouldn't be negative, though. - assert(sent >= 0); - break; } } // for + // resume the flush from this point. + if (io != NULL) { + be->io_next = io; + } else { + be->io_next = NULL; + } + return did_flush; } -- cgit v1.2.1