summaryrefslogtreecommitdiff
path: root/vendor
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-05-31 23:26:23 -0700
committerdormando <dormando@rydia.net>2022-07-24 23:02:50 -0700
commit581e41623074cc989c11de03099994fe74db164a (patch)
tree809dbc35d6a94439700d05f63bf41ab064abe1ac /vendor
parent8715e9692b374e93d822c878db198ae7ae3d520f (diff)
downloadmemcached-581e41623074cc989c11de03099994fe74db164a.tar.gz
mcmc: experimental api
Diffstat (limited to 'vendor')
-rw-r--r--vendor/mcmc/example.c84
-rw-r--r--vendor/mcmc/mcmc.c139
-rw-r--r--vendor/mcmc/mcmc.h7
3 files changed, 119 insertions, 111 deletions
diff --git a/vendor/mcmc/example.c b/vendor/mcmc/example.c
index 958c351..4631053 100644
--- a/vendor/mcmc/example.c
+++ b/vendor/mcmc/example.c
@@ -6,9 +6,55 @@
#include <poll.h>
#include <signal.h>
#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
#include "mcmc.h"
+// in this form the socket syscalls are handled externally to the client, so
+// we need to parse the protocol out of a buffer directly.
+static void show_response_buffer(void *c, char *rbuf, size_t bufsize) {
+ int status;
+ mcmc_resp_t resp;
+ char *val = NULL;
+
+ do {
+ int bread = recv(mcmc_fd(c), rbuf, bufsize, 0);
+
+ // need to know how far to advance the buffer.
+ // resp->reslen + resp->vlen_read works, but feels awkward.
+ status = mcmc_parse_buf(c, rbuf, bread, &resp);
+ } while (status == MCMC_WANT_READ);
+
+ if (status != MCMC_OK) {
+ printf("bad response\n");
+ }
+
+ // now we need to read the value back.
+ // resp.reslen + resp.vlen is the total length.
+ // resp.reslen + resp.vlen_read is how much of the buffer was used.
+ // resp.vlen_read vs resp.vlen is how much was read vs how much still
+ // needs to be read from the socket.
+ if (resp.vlen != resp.vlen_read) {
+ // malloc and recv the rest.
+ // can/should add convenience functions for this?
+ val = malloc(resp.vlen);
+ memcpy(val, resp.value, resp.vlen_read);
+ size_t toread = resp.vlen - resp.vlen_read;
+ char *buf = val + resp.vlen_read;
+ do {
+ // TODO: bug: check for read == 0
+ int read = recv(mcmc_fd(c), buf, toread, 0);
+ toread -= read;
+ } while (toread > 0);
+ } else {
+ val = resp.value;
+ }
+
+ // TODO: add the rest of the parser loop.
+ printf("read a response: %s\n", rbuf);
+}
+/*
static void show_response(void *c, char *rbuf, size_t bufsize) {
int status;
// buffer shouldn't change until the read is completed.
@@ -16,6 +62,7 @@ static void show_response(void *c, char *rbuf, size_t bufsize) {
int go = 1;
while (go) {
go = 0;
+ // TODO: return flags? VALUE_PARTIAL flag?
status = mcmc_read(c, rbuf, bufsize, &resp);
if (status == MCMC_OK) {
// OK means a response of some kind was read.
@@ -29,7 +76,7 @@ static void show_response(void *c, char *rbuf, size_t bufsize) {
val = malloc(resp.vlen);
int read = 0;
do {
- status = mcmc_read_value(c, val, resp.vlen, &read);
+ status = mcmc_read_value(c, val, &resp, &read);
} while (status == MCMC_WANT_READ);
}
if (resp.vlen > 0) {
@@ -94,6 +141,33 @@ static void show_response(void *c, char *rbuf, size_t bufsize) {
}
}
}
+*/
+void buffer_mode(void) {
+ void *c = malloc(mcmc_size(MCMC_OPTION_BLANK));
+ size_t bufsize = mcmc_min_buffer_size(MCMC_OPTION_BLANK) * 2;
+ char *rbuf = malloc(bufsize);
+
+ int status = mcmc_connect(c, "127.0.0.1", "11211", MCMC_OPTION_BLANK);
+
+ char *requests[5] = {"get foo\r\n",
+ "get foob\r\n",
+ "mg foo s t v\r\n",
+ "mg doof s t v Omoo k\r\n",
+ ""};
+
+ for (int x = 0; strlen(requests[x]) != 0; x++) {
+ status = mcmc_send_request(c, requests[x], strlen(requests[x]), 1);
+
+ if (status != MCMC_OK) {
+ fprintf(stderr, "Failed to send request to memcached\n");
+ return;
+ }
+
+ // Regardless of what command we sent, this should print out the response.
+ show_response_buffer(c, rbuf, bufsize);
+ }
+
+}
int main (int argc, char *agv[]) {
// TODO: detect if C is pre-C11?
@@ -103,7 +177,7 @@ int main (int argc, char *agv[]) {
perror("signal");
exit(1);
}
-
+/*
void *c = malloc(mcmc_size(MCMC_OPTION_BLANK));
// we only "need" the minimum buf size.
// buffers large enough to fit return values result in fewer syscalls.
@@ -144,6 +218,7 @@ int main (int argc, char *agv[]) {
}
// Regardless of what command we sent, this should print out the response.
+ // TODO: mcmc_read() and friends need to be remade
show_response(c, rbuf, bufsize);
}
@@ -151,9 +226,10 @@ int main (int argc, char *agv[]) {
status = mcmc_disconnect(c);
// The only free'ing needed.
free(c);
-
+*/
// TODO: stats example.
+ /*
// nonblocking example.
c = malloc(mcmc_size(MCMC_OPTION_BLANK));
// reuse bufsize/rbuf.
@@ -209,6 +285,8 @@ int main (int argc, char *agv[]) {
show_response(c, rbuf, bufsize);
}
+ */
+ buffer_mode();
return 0;
}
diff --git a/vendor/mcmc/mcmc.c b/vendor/mcmc/mcmc.c
index e62cdea..89a2753 100644
--- a/vendor/mcmc/mcmc.c
+++ b/vendor/mcmc/mcmc.c
@@ -38,7 +38,6 @@ typedef struct mcmc_ctx {
int gai_status; // getaddrinfo() last status.
int last_sys_error; // last syscall error (connect/etc?)
int sent_bytes_partial; // note for partially sent buffers.
- int request_queue; // supposed outstanding replies.
int fail_code; // recent failure reason.
int error; // latest error code.
uint32_t status_flags; // internal only flags.
@@ -48,15 +47,11 @@ typedef struct mcmc_ctx {
size_t buffer_used; // amount of bytes read into the buffer so far.
size_t buffer_request_len; // cached endpoint for current request
char *buffer_head; // buffer pointer currently in use.
- char *buffer_tail; // consumed tail of the buffer.
-
- // request response detail.
- mcmc_resp_t *resp;
} mcmc_ctx_t;
// INTERNAL FUNCTIONS
-static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) {
+static int _mcmc_parse_value_line(mcmc_ctx_t *ctx, mcmc_resp_t *r) {
char *buf = ctx->buffer_head;
// we know that "VALUE " has matched, so skip that.
char *p = buf+6;
@@ -104,14 +99,10 @@ static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) {
// If we made it this far, we've parsed everything, stuff the details into
// the context for fetching later.
- mcmc_resp_t *r = ctx->resp;
- // FIXME: set to NULL if we don't have the value?
- r->value = ctx->buffer_tail;
r->vlen = bytes + 2; // add in the \r\n
- int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
+ int buffer_remain = ctx->buffer_used - (r->value - ctx->buffer_head);
if (buffer_remain >= r->vlen) {
r->vlen_read = r->vlen;
- ctx->buffer_tail += r->vlen;
} else {
r->vlen_read = buffer_remain;
}
@@ -131,13 +122,12 @@ static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) {
// FIXME: This is broken for ASCII multiget.
// if we get VALUE back, we need to stay in ASCII GET read mode until an END
// is seen.
-static int _mcmc_parse_response(mcmc_ctx_t *ctx) {
+static int _mcmc_parse_response(mcmc_ctx_t *ctx, mcmc_resp_t *r) {
char *buf = ctx->buffer_head;
char *cur = buf;
size_t l = ctx->buffer_request_len;
int rlen; // response code length.
int more = 0;
- mcmc_resp_t *r = ctx->resp;
r->reslen = ctx->buffer_request_len;
r->type = MCMC_RESP_GENERIC;
@@ -230,13 +220,11 @@ static int _mcmc_parse_response(mcmc_ctx_t *ctx) {
if ((errno == ERANGE) || (cur == n)) {
rv = MCMC_ERR;
} else {
- r->value = ctx->buffer_tail;
r->vlen = vsize + 2; // tag in the \r\n.
// FIXME: macro.
- int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
+ int buffer_remain = ctx->buffer_used - (r->value - ctx->buffer_head);
if (buffer_remain >= r->vlen) {
r->vlen_read = r->vlen;
- ctx->buffer_tail += r->vlen;
} else {
r->vlen_read = buffer_remain;
}
@@ -282,7 +270,7 @@ static int _mcmc_parse_response(mcmc_ctx_t *ctx) {
if (memcmp(buf, "VALUE", 5) == 0) {
if (more) {
// <key> <flags> <bytes> [<cas unique>]
- rv = _mcmc_parse_value_line(ctx);
+ rv = _mcmc_parse_value_line(ctx, r);
} else {
rv = MCMC_ERR; // FIXME: parse error.
}
@@ -350,13 +338,6 @@ size_t mcmc_min_buffer_size(int options) {
return MIN_BUFFER_SIZE;
}
-char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) {
- mcmc_ctx_t *ctx = c;
- char *b = buf + ctx->buffer_used;
- *bufremain = bufsize - ctx->buffer_used;
- return b;
-}
-
// Directly parse a buffer with read data of size len.
// r->reslen + r->vlen_read is the bytes consumed from the buffer read.
// Caller manages how to retry if MCMC_WANT_READ or an error happens.
@@ -366,31 +347,25 @@ char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) {
int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r) {
mcmc_ctx_t *ctx = c;
char *el;
- ctx->buffer_used += read;
- el = memchr(buf, '\n', ctx->buffer_used);
+ el = memchr(buf, '\n', read);
if (el == NULL) {
return MCMC_WANT_READ;
}
memset(r, 0, sizeof(*r));
- // Consume through the newline.
- // buffer_tail now points to where value could start.
- // FIXME: ctx->value ?
- ctx->buffer_tail = el+1;
+ // Consume through the newline, note where the value would start if exists
+ r->value = el+1;
+ ctx->buffer_used = read;
// FIXME: the server must be stricter in what it sends back. should always
// have a \r. check for it and fail?
- ctx->buffer_request_len = ctx->buffer_tail - buf;
+ ctx->buffer_request_len = r->value - buf;
// leave the \r\n in the line end cache.
ctx->buffer_head = buf;
- // TODO: handling for nonblock case.
- // We have a result line. Now pass it through the parser.
- // Then we indicate to the user that a response is ready.
- ctx->resp = r;
- return _mcmc_parse_response(ctx);
+ return _mcmc_parse_response(ctx, r);
}
/*** Functions wrapping syscalls **/
@@ -531,7 +506,6 @@ int mcmc_send_request(void *c, const char *request, int len, int count) {
ctx->sent_bytes_partial += sent;
return MCMC_WANT_WRITE;
} else {
- ctx->request_queue += count;
ctx->sent_bytes_partial = 0;
}
@@ -562,16 +536,19 @@ int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *s
if (*sent < tosend) {
// can happen anytime, but mostly in nonblocking mode.
return MCMC_WANT_WRITE;
- } else {
- // FIXME: user has to keep submitting the same count value...
- // should decide on whether or not to give up on this.
- ctx->request_queue += count;
}
return MCMC_OK;
}
-// TODO: avoid recv if we have bytes in the buffer.
+// TODO: return consumed bytes and end ptr?
+// FIXME: mcmc no longer tracks the buffer inbetween commands, which was
+// causing issues with the API and bugs.
+// This function wraps the recv call, so it needs to understand the buffer a
+// little bit. Since memcached doesn't currently use this function I'm
+// commenting it out with this note so it can be rewritten in terms of an
+// external buffer later.
+/*
int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) {
mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
char *el;
@@ -614,85 +591,62 @@ int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) {
}
parse:
// Consume through the newline.
- // buffer_tail now points to where a value could start.
- ctx->buffer_tail = el+1;
+ r->value = el+1;
// FIXME: the server must be stricter in what it sends back. should always
// have a \r. check for it and fail?
- ctx->buffer_request_len = ctx->buffer_tail - buf;
+ ctx->buffer_request_len = r->value - buf;
// leave the \r\n in the line end cache.
ctx->buffer_head = buf;
// TODO: handling for nonblock case.
// We have a result line. Now pass it through the parser.
// Then we indicate to the user that a response is ready.
- ctx->resp = r;
- return _mcmc_parse_response(ctx);
+ return _mcmc_parse_response(ctx, r);
}
+*/
void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen) {
code[0] = '\0';
msg[0] = '\0';
}
-int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read) {
- mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
-
- // If the distance between tail/head is smaller than what we read into the
- // main buffer, we have some value to copy out.
- int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
- if (leftover > 0) {
- int tocopy = leftover > vsize ? vsize : leftover;
- memcpy(val + *read, ctx->buffer_tail, tocopy);
- ctx->buffer_tail += tocopy;
- *read += tocopy;
- if (leftover > tocopy) {
- // FIXME: think we need a specific code for "value didn't fit"
- return MCMC_WANT_READ;
- }
- }
-
- return MCMC_OK;
-}
-
// read into the buffer, up to a max size of vsize.
// will read (vsize-read) into the buffer pointed to by (val+read).
// you are able to stream the value into different buffers, or process the
// value and reuse the same buffer, by adjusting vsize and *read between
// calls.
// vsize must not be larger than the remaining value size pending read.
-int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) {
+/* TODO: see notes on mcmc_read()
+int mcmc_read_value(void *c, char *val, mcmc_resp_t *r, int *read) {
mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
size_t l;
// If the distance between tail/head is smaller than what we read into the
// main buffer, we have some value to copy out.
- int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
- if (leftover > 0) {
- int tocopy = leftover > vsize ? vsize : leftover;
- memcpy(val + *read, ctx->buffer_tail, tocopy);
- ctx->buffer_tail += tocopy;
- *read += tocopy;
- if (leftover > tocopy) {
- // FIXME: think we need a specific code for "value didn't fit"
- return MCMC_WANT_READ;
+ size_t vsize = r->vlen;
+ if (*read < r->vlen_read) {
+ memcpy(val + *read, r->value, r->vlen_read);
+ *read += r->vlen_read;
+ if (r->vlen_read >= r->vlen) {
+ return MCMC_OK;
}
}
char *v = val + *read;
l = vsize - *read;
- int r = recv(ctx->fd, v, l, 0);
- if (r == 0) {
+ int rd = recv(ctx->fd, v, l, 0);
+ if (rd == 0) {
// TODO: some internal disconnect work?
return MCMC_NOT_CONNECTED;
}
// FIXME: EAGAIN || EWOULDBLOCK!
- if (r == -1) {
+ if (rd == -1) {
return MCMC_ERR;
}
- *read += r;
+ *read += rd;
if (*read < vsize) {
return MCMC_WANT_READ;
@@ -700,28 +654,7 @@ int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) {
return MCMC_OK;
}
}
-
-char *mcmc_buffer_consume(void *c, int *remain) {
- mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
- ctx->buffer_used -= ctx->buffer_tail - ctx->buffer_head;
- int used = ctx->buffer_used;
- char *newbuf = ctx->buffer_tail;
-
- // FIXME: request_queue-- is in the wrong place.
- // TODO: which of these _must_ be reset between requests? I think very
- // little?
- ctx->request_queue--;
- ctx->status_flags = 0;
- ctx->buffer_head = NULL;
- ctx->buffer_tail = NULL;
-
- if (used) {
- *remain = used;
- return newbuf;
- } else {
- return NULL;
- }
-}
+*/
int mcmc_disconnect(void *c) {
mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
diff --git a/vendor/mcmc/mcmc.h b/vendor/mcmc/mcmc.h
index dba159a..19e4ce4 100644
--- a/vendor/mcmc/mcmc.h
+++ b/vendor/mcmc/mcmc.h
@@ -76,16 +76,13 @@ typedef struct {
int mcmc_fd(void *c);
size_t mcmc_size(int options);
size_t mcmc_min_buffer_size(int options);
-char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain);
int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r);
int mcmc_connect(void *c, char *host, char *port, int options);
int mcmc_check_nonblock_connect(void *c, int *err);
int mcmc_send_request(void *c, const char *request, int len, int count);
int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count);
-int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r);
-int mcmc_read_value(void *c, char *val, const size_t vsize, int *read);
-int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read);
-char *mcmc_buffer_consume(void *c, int *remain);
+//int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r);
+//int mcmc_read_value(void *c, char *val, mcmc_resp_t *r, int *read);
int mcmc_disconnect(void *c);
void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen);