summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--proxy.h5
-rw-r--r--proxy_network.c23
2 files changed, 22 insertions, 6 deletions
diff --git a/proxy.h b/proxy.h
index 66b4748..f12ec1e 100644
--- a/proxy.h
+++ b/proxy.h
@@ -292,8 +292,8 @@ typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
// don't have a good temporary space and don't want to malloc/free on every
// write. transmit() uses the stack but we can't do that for uring's use case.
-#if (IOV_MAX > 128)
-#define BE_IOV_MAX 128
+#if (IOV_MAX > 1024)
+#define BE_IOV_MAX 1024
#else
#define BE_IOV_MAX IOV_MAX
#endif
@@ -306,6 +306,7 @@ struct mcp_backend_s {
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
io_head_t io_head; // stack of requests.
+ io_pending_proxy_t *io_next; // next request to write.
char *rbuf; // statically allocated read buffer.
size_t rbufused; // currently active bytes in the buffer
struct event event; // libevent
diff --git a/proxy_network.c b/proxy_network.c
index 9d885e6..61fa69b 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -87,6 +87,11 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
continue;
}
STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
+        if (be->io_next == NULL) {
+            // separate pointer into the request queue tracking how
+            // far we have flushed writes so far.
+            be->io_next = io;
+        }
be->depth++;
io_count++;
if (!be->stacked) {
@@ -897,6 +902,7 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
}
STAILQ_INIT(&be->io_head);
+ be->io_next = NULL; // also reset the write offset.
// reset buffer to blank state.
be->rbufused = 0;
@@ -930,7 +936,10 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
struct iovec *iovs = be->write_iovs;
io_pending_proxy_t *io = NULL;
int iovused = 0;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
+ assert(be->io_next != NULL);
+ io = be->io_next;
+ for (; io; io = STAILQ_NEXT(io, io_next)) {
+ // TODO (v2): paranoia for now, but this check should never fire
if (io->flushed)
continue;
@@ -952,16 +961,21 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
static int _flush_pending_write(mcp_backend_t *be) {
int flags = 0;
unsigned int tosend = 0;
+ // Allow us to be called with an empty stack to prevent dev errors.
+ if (STAILQ_EMPTY(&be->io_head)) {
+ return 0;
+ }
+
int iovcnt = _prep_pending_write(be, &tosend);
ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
if (sent > 0) {
- io_pending_proxy_t *io = NULL;
+ io_pending_proxy_t *io = be->io_next;
if (sent < tosend) {
flags |= EV_WRITE;
}
- STAILQ_FOREACH(io, &be->io_head, io_next) {
+ for (; io; io = STAILQ_NEXT(io, io_next)) {
bool flushed = true;
if (io->flushed)
continue;
@@ -989,13 +1003,14 @@ static int _flush_pending_write(mcp_backend_t *be) {
if (flushed) {
flags |= EV_READ;
+ be->io_next = STAILQ_NEXT(io, io_next);
}
if (sent <= 0) {
// really shouldn't be negative, though.
assert(sent >= 0);
break;
}
- } // STAILQ_FOREACH
+ } // for
} else if (sent == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
be->can_write = false;