summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-01-24 23:40:25 -0800
committerdormando <dormando@rydia.net>2022-01-24 23:40:25 -0800
commitf2a2f38717a01d05df013765ebde322fa7fcff0b (patch)
treeb1c15f9bdfbd486817908e2920fc863dd25b3dee /proto_proxy.c
parent894e4c16b5162e25ce649cf8a57dfad250965ab0 (diff)
downloadmemcached-f2a2f38717a01d05df013765ebde322fa7fcff0b.tar.gz
proxy: backend write combining
previously individual requests for the same backend would use multiple syscalls. This batches them. This is also prep work for write syscall offloading for uring mode.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c159
1 files changed, 84 insertions, 75 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 797a57d..83bc64b 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -3,6 +3,7 @@
* Functions for handling the proxy layer. wraps text protocols
*/
+#include "memcached.h"
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
@@ -25,7 +26,6 @@
#define PRING_QUEUE_CQ_ENTRIES 16384
#endif
-#include "memcached.h"
#include "proto_proxy.h"
#include "proto_text.h"
#include "murmur3_hash.h"
@@ -270,6 +270,14 @@ struct mcp_request_s {
typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
#define MAX_IPLEN 45
#define MAX_PORTLEN 6
+// TODO: IOV_MAX tends to be 1000+ which would allow for more batching but we
+// don't have a good temporary space and don't want to malloc/free on every
+// write. transmit() uses the stack but we can't do that for uring's use case.
+#if (IOV_MAX > 128)
+#define BE_IOV_MAX 128
+#else
+#define BE_IOV_MAX IOV_MAX
+#endif
struct mcp_backend_s {
char ip[MAX_IPLEN+1];
char port[MAX_PORTLEN+1];
@@ -293,6 +301,7 @@ struct mcp_backend_s {
bool can_write; // recently got a WANT_WRITE or are connecting.
bool stacked; // if backend already queued for syscalls.
bool bad; // timed out, marked as bad.
+ struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
@@ -349,6 +358,7 @@ struct _io_pending_proxy_t {
mcp_backend_t *backend; // backend server to request from
struct iovec iov[2]; // request string + tail buffer
int iovcnt; // 1 or 2...
+ unsigned int iovbytes; // total bytes in the iovec
int await_ref; // lua reference if we were an await object
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
bool flushed; // whether we've fully written this request to a backend.
@@ -398,7 +408,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static void *proxy_event_thread(void *arg);
static void proxy_out_errstring(mc_resp *resp, const char *str);
-static int _flush_pending_write(mcp_backend_t *be, io_pending_proxy_t *p);
+static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_thread_loadconf(LIBEVENT_THREAD *thr);
@@ -1414,18 +1424,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
be->failed_count = 0;
}
io_pending_proxy_t *io = NULL;
- int res = 0;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- res = _flush_pending_write(be, io);
- if (res != -1) {
- flags |= res;
- if (flags & EV_WRITE) {
- break;
- }
- } else {
- break;
- }
- }
+    int res = _flush_pending_write(be);
if (res == -1) {
_reset_bad_backend(be);
// FIXME: backend_failed?
@@ -1478,13 +1477,7 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
P_DEBUG("%s: deferring IO pending connecting\n", __func__);
flags |= EV_WRITE;
} else {
- io_pending_proxy_t *io = NULL;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- flags = _flush_pending_write(be, io);
- if (flags == -1 || flags & EV_WRITE) {
- break;
- }
- }
+        flags = _flush_pending_write(be);
}
if (flags == -1) {
@@ -1704,13 +1697,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
if (be->connecting) {
P_DEBUG("%s: deferring IO pending connecting\n", __func__);
} else {
- io_pending_proxy_t *io = NULL;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- flags = _flush_pending_write(be, io);
- if (flags == -1 || flags & EV_WRITE) {
- break;
- }
- }
+ flags = _flush_pending_write(be);
}
if (flags == -1) {
@@ -2260,50 +2247,82 @@ static int _reset_bad_backend(mcp_backend_t *be) {
return 0;
}
-static int _flush_pending_write(mcp_backend_t *be, io_pending_proxy_t *p) {
- int flags = 0;
+static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
+ struct iovec *iovs = be->write_iovs;
+ io_pending_proxy_t *io = NULL;
+ int iovused = 0;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ if (io->flushed)
+ continue;
- if (p->flushed) {
- return 0;
+ if (io->iovcnt + iovused > BE_IOV_MAX) {
+ // Signal to caller that we need to keep writing, if writeable.
+ // FIXME: can certainly refactor this to do more syscalls.
+ *tosend += 1;
+ break;
+ }
+
+ memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
+ iovused += io->iovcnt;
+ *tosend += io->iovbytes;
}
+ return iovused;
+}
+
+static int _flush_pending_write(mcp_backend_t *be) {
+ int flags = 0;
+ unsigned int tosend = 0;
+ int iovcnt = _prep_pending_write(be, &tosend);
- ssize_t sent = 0;
- // FIXME: original send function internally tracked how much was sent, but
- // doing this here would require copying all of the iovecs or modify what
- // we supply.
- // this is probably okay but I want to leave a note here in case I get a
- // better idea.
- int status = mcmc_request_writev(be->client, p->iov, p->iovcnt, &sent, 1);
+ ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
if (sent > 0) {
- // we need to save progress in case of WANT_WRITE.
- for (int x = 0; x < p->iovcnt; x++) {
- struct iovec *iov = &p->iov[x];
- if (sent >= iov->iov_len) {
- sent -= iov->iov_len;
- iov->iov_len = 0;
+ io_pending_proxy_t *io = NULL;
+ if (sent < tosend) {
+ flags |= EV_WRITE;
+ }
+
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ bool flushed = true;
+ if (io->flushed)
+ continue;
+
+ if (sent >= io->iovbytes) {
+ // short circuit for common case.
+ sent -= io->iovbytes;
} else {
- iov->iov_len -= sent;
- sent = 0;
+ for (int x = 0; x < io->iovcnt; x++) {
+ struct iovec *iov = &io->iov[x];
+ if (sent >= iov->iov_len) {
+ sent -= iov->iov_len;
+ iov->iov_len = 0;
+ } else {
+ iov->iov_len -= sent;
+ sent = 0;
+ flushed = false;
+ break;
+ }
+ }
+ }
+ io->flushed = flushed;
+ // FIXME: logic around flushed and EV_READ marking feels odd.
+ if (flushed) {
+ flags |= EV_READ;
+ }
+ if (sent <= 0) {
+ // really shouldn't be negative, though.
+ assert(sent >= 0);
break;
}
+ } // STAILQ_FOREACH
+ } else if (sent == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ be->can_write = false;
+ flags |= EV_WRITE;
+ } else {
+ flags = -1;
}
}
- // request_writev() returns WANT_WRITE if we haven't fully flushed.
- if (status == MCMC_WANT_WRITE) {
- // avoid syscalls for any other queued requests.
- be->can_write = false;
- flags = EV_WRITE;
- // can't continue for now.
- } else if (status != MCMC_OK) {
- flags = -1;
- // TODO: specific error code
- // s->error = code?
- } else {
- flags = EV_READ;
- p->flushed = true;
- }
-
return flags;
}
@@ -2345,19 +2364,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
be->bad = false;
be->failed_count = 0;
}
- io_pending_proxy_t *io = NULL;
- int res = 0;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- res = _flush_pending_write(be, io);
- if (res != -1) {
- flags |= res;
- if (flags & EV_WRITE) {
- break;
- }
- } else {
- break;
- }
- }
+ int res = _flush_pending_write(be);
if (res == -1) {
_reset_bad_backend(be);
return;
@@ -3568,10 +3575,12 @@ static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy
p->iov[0].iov_base = r;
p->iov[0].iov_len = len;
p->iovcnt = 1;
+ p->iovbytes = len;
if (pr->vlen != 0) {
p->iov[1].iov_base = pr->vbuf;
p->iov[1].iov_len = pr->vlen;
p->iovcnt = 2;
+ p->iovbytes += pr->vlen;
}
}