summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-05-31 23:27:22 -0700
committerdormando <dormando@rydia.net>2022-07-24 23:02:50 -0700
commitd273bd2063ac3e3171e8c74d0a8557102429a3f2 (patch)
treee3e859c84b071779ab99c3c817d77331caa77b9a
parent581e41623074cc989c11de03099994fe74db164a (diff)
downloadmemcached-d273bd2063ac3e3171e8c74d0a8557102429a3f2.tar.gz
proxy: rework backend buffer handling
experimental change. *io_uring mode is presently broken* there are some potential protocol desync bugs due to mcmc handling its own buffers and the newer event handler handling its own syscalls. this change should have better separation of code for the buffer tracking. if this change works I will add some optimizations to reduce memmove's.
-rw-r--r--proxy.h3
-rw-r--r--proxy_lua.c2
-rw-r--r--proxy_network.c190
3 files changed, 89 insertions, 106 deletions
diff --git a/proxy.h b/proxy.h
index 1c44ef3..4d88867 100644
--- a/proxy.h
+++ b/proxy.h
@@ -310,7 +310,8 @@ struct mcp_backend_s {
void *client; // mcmc client
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
io_head_t io_head; // stack of requests.
- char *rbuf; // static allocated read buffer.
+ char *rbuf; // statically allocated read buffer.
+ size_t rbufused; // currently active bytes in the buffer
struct event event; // libevent
#ifdef HAVE_LIBURING
proxy_event_t ur_rd_ev; // liburing.
diff --git a/proxy_lua.c b/proxy_lua.c
index aa0ea4e..5f5707c 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -104,7 +104,7 @@ static int mcplib_backend(lua_State *L) {
strncpy(be->name, name, MAX_NAMELEN);
strncpy(be->port, port, MAX_PORTLEN);
be->depth = 0;
- be->rbuf = NULL;
+ be->rbufused = 0;
be->failed_count = 0;
STAILQ_INIT(&be->io_head);
be->state = mcp_backend_read;
diff --git a/proxy_network.c b/proxy_network.c
index 3454164..7c7dbbb 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -9,7 +9,7 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
-static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread);
+static int proxy_backend_drive_machine(mcp_backend_t *be);
static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
io_head_t head;
@@ -125,7 +125,9 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
return;
}
- int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
+ // FIXME: update this io_uring code...
+ //int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
+ int res = -1;
P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);
if (res > 0) {
@@ -494,10 +496,9 @@ static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, st
// TODO (v2): second argument with enum for a specific error.
// - probably just for logging. for app if any of these errors shouldn't
// result in killing the request stack!
-static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) {
+static int proxy_backend_drive_machine(mcp_backend_t *be) {
bool stop = false;
io_pending_proxy_t *p = NULL;
- mcmc_resp_t tmp_resp; // helper for testing for GET's END marker.
int flags = 0;
p = STAILQ_FIRST(&be->io_head);
@@ -516,31 +517,19 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
while (!stop) {
mcp_resp_t *r;
int res = 1;
- int remain = 0;
- char *newbuf = NULL;
switch(be->state) {
case mcp_backend_read:
assert(p != NULL);
- P_DEBUG("%s: [read] bread: %d\n", __func__, bread);
-
- if (bread == 0) {
- // haven't actually done a read yet; figure out where/what.
- *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
- return EV_READ;
- } else {
- be->state = mcp_backend_parse;
- }
+ // FIXME: remove the _read state?
+ be->state = mcp_backend_parse;
break;
case mcp_backend_parse:
r = p->client_resp;
- r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp);
- // FIXME (v2): Don't like how this mcmc API ended up.
- bread = 0; // only add the bread once per loop.
+ r->status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r->resp);
if (r->status != MCMC_OK) {
P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
if (r->status == MCMC_WANT_READ) {
- *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
return EV_READ;
} else {
flags = -1;
@@ -602,15 +591,17 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
if (r->resp.vlen != r->resp.vlen_read) {
+ // shouldn't be possible to have excess in buffer
+ // if we're dealing with a partial value.
+ assert(be->rbufused == r->resp.reslen+r->resp.vlen_read);
P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
// copy the partial and advance mcmc's buffer digestion.
- // FIXME (v2): should call this for both cases?
- // special function for advancing mcmc's buffer for
- // reading a value? perhaps a flag to skip the data copy
- // when it's unnecessary?
- memcpy(r->buf, be->rbuf, r->resp.reslen);
- r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread);
+ memcpy(r->buf, be->rbuf, r->resp.reslen + r->resp.vlen_read);
+ r->bread = r->resp.reslen + r->resp.vlen_read;
+ be->rbufused = 0;
be->state = mcp_backend_want_read;
+ flags |= EV_READ;
+ stop = true;
break;
} else {
// mcmc's already counted the value as read if it fit in
@@ -627,14 +618,13 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
break;
}
- if (r->resp.type == MCMC_RESP_GET) {
- // advance the buffer
- newbuf = mcmc_buffer_consume(be->client, &remain);
- if (remain != 0) {
- // TODO (v2): don't need to shuffle buffer with better API
- memmove(be->rbuf, newbuf, remain);
- }
+ // had a response, advance the buffer.
+ be->rbufused -= r->resp.reslen + r->resp.vlen_read;
+ if (be->rbufused > 0) {
+ memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused);
+ }
+ if (r->resp.type == MCMC_RESP_GET) {
be->state = mcp_backend_read_end;
} else {
be->state = mcp_backend_next;
@@ -646,32 +636,32 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
// we need to ensure the next data in the stream is "END\r\n"
// if not, the stack is desynced and we lose it.
- r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp);
- bread = 0;
- P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type);
- if (r->status != MCMC_OK) {
- if (r->status == MCMC_WANT_READ) {
- *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
- return EV_READ;
- } else {
- flags = -1; // TODO (v2): specific error.
+ if (be->rbufused >= ENDLEN) {
+ if (memcmp(be->rbuf, ENDSTR, ENDLEN) != 0) {
+ // TODO (v2): specific error.
+ flags = -1;
stop = true;
+ break;
+ } else {
+ // response is good.
+ // FIXME (v2): copy what the server actually sent?
+ if (!p->ascii_multiget) {
+ // sigh... if part of a multiget we need to eat the END
+ // markers down here.
+ memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
+ r->blen += 5;
+ }
+
+ // advance buffer
+ be->rbufused -= ENDLEN;
+ if (be->rbufused > 0) {
+ memmove(be->rbuf, be->rbuf+ENDLEN, be->rbufused);
+ }
}
- break;
- } else if (tmp_resp.type != MCMC_RESP_END) {
- // TODO (v2): specific error about protocol desync
- flags = -1;
+ } else {
+ flags |= EV_READ;
stop = true;
break;
- } else {
- // response is good.
- // FIXME (v2): copy what the server actually sent?
- if (!p->ascii_multiget) {
- // sigh... if part of a multiget we need to eat the END
- // markers down here.
- memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
- r->blen += 5;
- }
}
be->state = mcp_backend_next;
@@ -682,28 +672,30 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
r = p->client_resp;
// take bread input and see if we're done reading the value,
// else advance, set buffers, return next.
- if (bread > 0) {
- r->bread += bread;
- bread = 0;
- }
P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
+ assert(be->rbufused != 0);
+ size_t tocopy = be->rbufused < r->blen - r->bread ?
+ be->rbufused : r->blen - r->bread;
+ memcpy(r->buf+r->bread, be->rbuf, tocopy);
+ r->bread += tocopy;
if (r->bread >= r->resp.vlen) {
// all done copying data.
if (r->resp.type == MCMC_RESP_GET) {
- newbuf = mcmc_buffer_consume(be->client, &remain);
- // Shouldn't be anything in the buffer if we had to run to
- // want_read to read the value.
- assert(remain == 0);
be->state = mcp_backend_read_end;
} else {
be->state = mcp_backend_next;
}
+
+ // shuffle remaining buffer.
+ be->rbufused -= tocopy;
+ if (be->rbufused > 0) {
+ memmove(be->rbuf, be->rbuf+tocopy, be->rbufused);
+ }
} else {
+ assert(tocopy == be->rbufused);
// signal to caller to issue a read.
- *rbuf = r->buf+r->resp.reslen+r->bread;
- *toread = r->resp.vlen - r->bread;
- // need to retry later.
+ be->rbufused = 0;
flags |= EV_READ;
stop = true;
}
@@ -717,28 +709,28 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
// stuff here. The moment we call return_io here we
// don't own *p anymore.
return_io_pending((io_pending_t *)p);
+ be->state = mcp_backend_read;
if (STAILQ_EMPTY(&be->io_head)) {
- // TODO (v2): suspicious of this code. audit harder?
stop = true;
+ // TODO: if there're no pending requests, the read buffer
+ // should also be empty.
+ // Get a specific return code for errors to surface this.
+ if (be->rbufused > 0) {
+ flags = -1;
+ }
+ break;
} else {
p = STAILQ_FIRST(&be->io_head);
}
- // mcmc_buffer_consume() - if leftover, keep processing
- // IO's.
+ // if leftover, keep processing IO's.
// if no more data in buffer, need to re-set stack head and re-set
// event.
- remain = 0;
- // TODO (v2): do we need to yield every N reads?
- newbuf = mcmc_buffer_consume(be->client, &remain);
- P_DEBUG("%s: [next] remain: %d\n", __func__, remain);
- be->state = mcp_backend_read;
- if (remain != 0) {
+ P_DEBUG("%s: [next] remain: %d\n", __func__, be->rbufused);
+ if (be->rbufused != 0) {
// data trailing in the buffer, for a different request.
- memmove(be->rbuf, newbuf, remain);
be->state = mcp_backend_parse;
- P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain);
} else {
// need to read more data, buffer is empty.
stop = true;
@@ -820,6 +812,8 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
STAILQ_INIT(&be->io_head);
+ // reset buffer to blank state.
+ be->rbufused = 0;
mcmc_disconnect(be->client);
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
if (status == MCMC_CONNECTED) {
@@ -974,36 +968,24 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
if (which & EV_READ) {
// We do the syscall here before diving into the state machine to allow a
// common code path for io_uring/epoll
- int res = 1;
- int read = 0;
- while (res > 0) {
- char *rbuf = NULL;
- size_t toread = 0;
- // need to input how much was read since last call
- // needs _output_ of the buffer to read into and how much.
- res = proxy_backend_drive_machine(be, read, &rbuf, &toread);
- P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread);
-
- if (res > 0) {
- read = recv(mcmc_fd(be->client), rbuf, toread, 0);
- P_DEBUG("%s: read: %d\n", __func__, read);
- if (read == 0) {
- // not connected or error.
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- return;
- } else if (read == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- break; // sit on epoll again.
- } else {
- _reset_bad_backend(be, P_BE_FAIL_READING);
- return;
- }
- }
- } else if (res == -1) {
+ int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused,
+ READ_BUFFER_SIZE - be->rbufused, 0);
+ if (read > 0) {
+ be->rbufused += read;
+ int res = proxy_backend_drive_machine(be);
+ if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_PARSING);
return;
- } else {
- break;
+ }
+ } else if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ return;
+ } else if (read == -1) {
+ // sit on epoll again.
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ return;
}
}