diff options
author | dormando <dormando@rydia.net> | 2022-05-31 23:26:23 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-07-24 23:02:50 -0700 |
commit | 581e41623074cc989c11de03099994fe74db164a (patch) | |
tree | 809dbc35d6a94439700d05f63bf41ab064abe1ac /vendor | |
parent | 8715e9692b374e93d822c878db198ae7ae3d520f (diff) | |
download | memcached-581e41623074cc989c11de03099994fe74db164a.tar.gz |
mcmc: experimental api
Diffstat (limited to 'vendor')
-rw-r--r-- | vendor/mcmc/example.c | 84 | ||||
-rw-r--r-- | vendor/mcmc/mcmc.c | 139 | ||||
-rw-r--r-- | vendor/mcmc/mcmc.h | 7 |
3 files changed, 119 insertions, 111 deletions
diff --git a/vendor/mcmc/example.c b/vendor/mcmc/example.c index 958c351..4631053 100644 --- a/vendor/mcmc/example.c +++ b/vendor/mcmc/example.c @@ -6,9 +6,55 @@ #include <poll.h> #include <signal.h> #include <sys/uio.h> +#include <sys/types.h> +#include <sys/socket.h> #include "mcmc.h" +// in this form the socket syscalls are handled externally to the client, so +// we need to parse the protocol out of a buffer directly. +static void show_response_buffer(void *c, char *rbuf, size_t bufsize) { + int status; + mcmc_resp_t resp; + char *val = NULL; + + do { + int bread = recv(mcmc_fd(c), rbuf, bufsize, 0); + + // need to know how far to advance the buffer. + // resp->reslen + resp->vlen_read works, but feels awkward. + status = mcmc_parse_buf(c, rbuf, bread, &resp); + } while (status == MCMC_WANT_READ); + + if (status != MCMC_OK) { + printf("bad response\n"); + } + + // now we need to read the value back. + // resp.reslen + resp.vlen is the total length. + // resp.reslen + resp.vlen_read is how much of the buffer was used. + // resp.vlen_read vs resp.vlen is how much was read vs how much still + // needs to be read from the socket. + if (resp.vlen != resp.vlen_read) { + // malloc and recv the rest. + // can/should add convenience functions for this? + val = malloc(resp.vlen); + memcpy(val, resp.value, resp.vlen_read); + size_t toread = resp.vlen - resp.vlen_read; + char *buf = val + resp.vlen_read; + do { + // TODO: bug: check for read == 0 + int read = recv(mcmc_fd(c), buf, toread, 0); + toread -= read; + } while (toread > 0); + } else { + val = resp.value; + } + + // TODO: add the rest of the parser loop. + printf("read a response: %s\n", rbuf); +} +/* static void show_response(void *c, char *rbuf, size_t bufsize) { int status; // buffer shouldn't change until the read is completed. @@ -16,6 +62,7 @@ static void show_response(void *c, char *rbuf, size_t bufsize) { int go = 1; while (go) { go = 0; + // TODO: return flags? VALUE_PARTIAL flag? status = mcmc_read(c, rbuf, bufsize, &resp); if (status == MCMC_OK) { // OK means a response of some kind was read. @@ -29,7 +76,7 @@ static void show_response(void *c, char *rbuf, size_t bufsize) { val = malloc(resp.vlen); int read = 0; do { - status = mcmc_read_value(c, val, resp.vlen, &read); + status = mcmc_read_value(c, val, &resp, &read); } while (status == MCMC_WANT_READ); } if (resp.vlen > 0) { @@ -94,6 +141,33 @@ static void show_response(void *c, char *rbuf, size_t bufsize) { } } } +*/ +void buffer_mode(void) { + void *c = malloc(mcmc_size(MCMC_OPTION_BLANK)); + size_t bufsize = mcmc_min_buffer_size(MCMC_OPTION_BLANK) * 2; + char *rbuf = malloc(bufsize); + + int status = mcmc_connect(c, "127.0.0.1", "11211", MCMC_OPTION_BLANK); + + char *requests[5] = {"get foo\r\n", + "get foob\r\n", + "mg foo s t v\r\n", + "mg doof s t v Omoo k\r\n", + ""}; + + for (int x = 0; strlen(requests[x]) != 0; x++) { + status = mcmc_send_request(c, requests[x], strlen(requests[x]), 1); + + if (status != MCMC_OK) { + fprintf(stderr, "Failed to send request to memcached\n"); + return; + } + + // Regardless of what command we sent, this should print out the response. + show_response_buffer(c, rbuf, bufsize); + } + +} int main (int argc, char *agv[]) { // TODO: detect if C is pre-C11? @@ -103,7 +177,7 @@ int main (int argc, char *agv[]) { perror("signal"); exit(1); } - +/* void *c = malloc(mcmc_size(MCMC_OPTION_BLANK)); // we only "need" the minimum buf size. // buffers large enough to fit return values result in fewer syscalls. @@ -144,6 +218,7 @@ int main (int argc, char *agv[]) { } // Regardless of what command we sent, this should print out the response. + // TODO: mcmc_read() and friends need to be remade show_response(c, rbuf, bufsize); } @@ -151,9 +226,10 @@ int main (int argc, char *agv[]) { status = mcmc_disconnect(c); // The only free'ing needed. free(c); - +*/ // TODO: stats example. + /* // nonblocking example. c = malloc(mcmc_size(MCMC_OPTION_BLANK)); // reuse bufsize/rbuf. @@ -209,6 +285,8 @@ int main (int argc, char *agv[]) { show_response(c, rbuf, bufsize); } + */ + buffer_mode(); return 0; } diff --git a/vendor/mcmc/mcmc.c b/vendor/mcmc/mcmc.c index e62cdea..89a2753 100644 --- a/vendor/mcmc/mcmc.c +++ b/vendor/mcmc/mcmc.c @@ -38,7 +38,6 @@ typedef struct mcmc_ctx { int gai_status; // getaddrinfo() last status. int last_sys_error; // last syscall error (connect/etc?) int sent_bytes_partial; // note for partially sent buffers. - int request_queue; // supposed outstanding replies. int fail_code; // recent failure reason. int error; // latest error code. uint32_t status_flags; // internal only flags. @@ -48,15 +47,11 @@ typedef struct mcmc_ctx { size_t buffer_used; // amount of bytes read into the buffer so far. size_t buffer_request_len; // cached endpoint for current request char *buffer_head; // buffer pointer currently in use. - char *buffer_tail; // consumed tail of the buffer. - - // request response detail. - mcmc_resp_t *resp; } mcmc_ctx_t; // INTERNAL FUNCTIONS -static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) { +static int _mcmc_parse_value_line(mcmc_ctx_t *ctx, mcmc_resp_t *r) { char *buf = ctx->buffer_head; // we know that "VALUE " has matched, so skip that. char *p = buf+6; @@ -104,14 +99,10 @@ static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) { // If we made it this far, we've parsed everything, stuff the details into // the context for fetching later. - mcmc_resp_t *r = ctx->resp; - // FIXME: set to NULL if we don't have the value? - r->value = ctx->buffer_tail; r->vlen = bytes + 2; // add in the \r\n - int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); + int buffer_remain = ctx->buffer_used - (r->value - ctx->buffer_head); if (buffer_remain >= r->vlen) { r->vlen_read = r->vlen; - ctx->buffer_tail += r->vlen; } else { r->vlen_read = buffer_remain; } @@ -131,13 +122,12 @@ static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) { // FIXME: This is broken for ASCII multiget. // if we get VALUE back, we need to stay in ASCII GET read mode until an END // is seen. -static int _mcmc_parse_response(mcmc_ctx_t *ctx) { +static int _mcmc_parse_response(mcmc_ctx_t *ctx, mcmc_resp_t *r) { char *buf = ctx->buffer_head; char *cur = buf; size_t l = ctx->buffer_request_len; int rlen; // response code length. int more = 0; - mcmc_resp_t *r = ctx->resp; r->reslen = ctx->buffer_request_len; r->type = MCMC_RESP_GENERIC; @@ -230,13 +220,11 @@ static int _mcmc_parse_response(mcmc_ctx_t *ctx) { if ((errno == ERANGE) || (cur == n)) { rv = MCMC_ERR; } else { - r->value = ctx->buffer_tail; r->vlen = vsize + 2; // tag in the \r\n. // FIXME: macro. - int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); + int buffer_remain = ctx->buffer_used - (r->value - ctx->buffer_head); if (buffer_remain >= r->vlen) { r->vlen_read = r->vlen; - ctx->buffer_tail += r->vlen; } else { r->vlen_read = buffer_remain; } @@ -282,7 +270,7 @@ static int _mcmc_parse_response(mcmc_ctx_t *ctx) { if (memcmp(buf, "VALUE", 5) == 0) { if (more) { // <key> <flags> <bytes> [<cas unique>] - rv = _mcmc_parse_value_line(ctx); + rv = _mcmc_parse_value_line(ctx, r); } else { rv = MCMC_ERR; // FIXME: parse error. } @@ -350,13 +338,6 @@ size_t mcmc_min_buffer_size(int options) { return MIN_BUFFER_SIZE; } -char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) { - mcmc_ctx_t *ctx = c; - char *b = buf + ctx->buffer_used; - *bufremain = bufsize - ctx->buffer_used; - return b; -} - // Directly parse a buffer with read data of size len. // r->reslen + r->vlen_read is the bytes consumed from the buffer read. // Caller manages how to retry if MCMC_WANT_READ or an error happens. @@ -366,31 +347,25 @@ char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) { int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r) { mcmc_ctx_t *ctx = c; char *el; - ctx->buffer_used += read; - el = memchr(buf, '\n', ctx->buffer_used); + el = memchr(buf, '\n', read); if (el == NULL) { return MCMC_WANT_READ; } memset(r, 0, sizeof(*r)); - // Consume through the newline. - // buffer_tail now points to where value could start. - // FIXME: ctx->value ? - ctx->buffer_tail = el+1; + // Consume through the newline, note where the value would start if exists + r->value = el+1; + ctx->buffer_used = read; // FIXME: the server must be stricter in what it sends back. should always // have a \r. check for it and fail? - ctx->buffer_request_len = ctx->buffer_tail - buf; + ctx->buffer_request_len = r->value - buf; // leave the \r\n in the line end cache. ctx->buffer_head = buf; - // TODO: handling for nonblock case. - // We have a result line. Now pass it through the parser. - // Then we indicate to the user that a response is ready. - ctx->resp = r; - return _mcmc_parse_response(ctx); + return _mcmc_parse_response(ctx, r); } /*** Functions wrapping syscalls **/ @@ -531,7 +506,6 @@ int mcmc_send_request(void *c, const char *request, int len, int count) { ctx->sent_bytes_partial += sent; return MCMC_WANT_WRITE; } else { - ctx->request_queue += count; ctx->sent_bytes_partial = 0; } @@ -562,16 +536,19 @@ int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *s if (*sent < tosend) { // can happen anytime, but mostly in nonblocking mode. return MCMC_WANT_WRITE; - } else { - // FIXME: user has to keep submitting the same count value... - // should decide on whether or not to give up on this. - ctx->request_queue += count; } return MCMC_OK; } -// TODO: avoid recv if we have bytes in the buffer. +// TODO: return consumed bytes and end ptr? +// FIXME: mcmc no longer tracks the buffer inbetween commands, which was +// causing issues with the API and bugs. +// This function wraps the recv call, so it needs to understand the buffer a +// little bit. Since memcached doesn't currently use this function I'm +// commenting it out with this note so it can be rewritten in terms of an +// external buffer later. +/* int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) { mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; char *el; @@ -614,85 +591,62 @@ int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) { } parse: // Consume through the newline. - // buffer_tail now points to where a value could start. - ctx->buffer_tail = el+1; + r->value = el+1; // FIXME: the server must be stricter in what it sends back. should always // have a \r. check for it and fail? - ctx->buffer_request_len = ctx->buffer_tail - buf; + ctx->buffer_request_len = r->value - buf; // leave the \r\n in the line end cache. ctx->buffer_head = buf; // TODO: handling for nonblock case. // We have a result line. Now pass it through the parser. // Then we indicate to the user that a response is ready. - ctx->resp = r; - return _mcmc_parse_response(ctx); + return _mcmc_parse_response(ctx, r); } +*/ void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen) { code[0] = '\0'; msg[0] = '\0'; } -int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read) { - mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; - - // If the distance between tail/head is smaller than what we read into the - // main buffer, we have some value to copy out. - int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); - if (leftover > 0) { - int tocopy = leftover > vsize ? vsize : leftover; - memcpy(val + *read, ctx->buffer_tail, tocopy); - ctx->buffer_tail += tocopy; - *read += tocopy; - if (leftover > tocopy) { - // FIXME: think we need a specific code for "value didn't fit" - return MCMC_WANT_READ; - } - } - - return MCMC_OK; -} - // read into the buffer, up to a max size of vsize. // will read (vsize-read) into the buffer pointed to by (val+read). // you are able to stream the value into different buffers, or process the // value and reuse the same buffer, by adjusting vsize and *read between // calls. // vsize must not be larger than the remaining value size pending read. -int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) { +/* TODO: see notes on mcmc_read() +int mcmc_read_value(void *c, char *val, mcmc_resp_t *r, int *read) { mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; size_t l; // If the distance between tail/head is smaller than what we read into the // main buffer, we have some value to copy out. - int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); - if (leftover > 0) { - int tocopy = leftover > vsize ? vsize : leftover; - memcpy(val + *read, ctx->buffer_tail, tocopy); - ctx->buffer_tail += tocopy; - *read += tocopy; - if (leftover > tocopy) { - // FIXME: think we need a specific code for "value didn't fit" - return MCMC_WANT_READ; + size_t vsize = r->vlen; + if (*read < r->vlen_read) { + memcpy(val + *read, r->value, r->vlen_read); + *read += r->vlen_read; + if (r->vlen_read >= r->vlen) { + return MCMC_OK; } } char *v = val + *read; l = vsize - *read; - int r = recv(ctx->fd, v, l, 0); - if (r == 0) { + int rd = recv(ctx->fd, v, l, 0); + if (rd == 0) { // TODO: some internal disconnect work? return MCMC_NOT_CONNECTED; } // FIXME: EAGAIN || EWOULDBLOCK! - if (r == -1) { + if (rd == -1) { return MCMC_ERR; } - *read += r; + *read += rd; if (*read < vsize) { return MCMC_WANT_READ; @@ -700,28 +654,7 @@ int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) { return MCMC_OK; } } - -char *mcmc_buffer_consume(void *c, int *remain) { - mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; - ctx->buffer_used -= ctx->buffer_tail - ctx->buffer_head; - int used = ctx->buffer_used; - char *newbuf = ctx->buffer_tail; - - // FIXME: request_queue-- is in the wrong place. - // TODO: which of these _must_ be reset between requests? I think very - // little? - ctx->request_queue--; - ctx->status_flags = 0; - ctx->buffer_head = NULL; - ctx->buffer_tail = NULL; - - if (used) { - *remain = used; - return newbuf; - } else { - return NULL; - } -} +*/ int mcmc_disconnect(void *c) { mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; diff --git a/vendor/mcmc/mcmc.h b/vendor/mcmc/mcmc.h index dba159a..19e4ce4 100644 --- a/vendor/mcmc/mcmc.h +++ b/vendor/mcmc/mcmc.h @@ -76,16 +76,13 @@ typedef struct { int mcmc_fd(void *c); size_t mcmc_size(int options); size_t mcmc_min_buffer_size(int options); -char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain); int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r); int mcmc_connect(void *c, char *host, char *port, int options); int mcmc_check_nonblock_connect(void *c, int *err); int mcmc_send_request(void *c, const char *request, int len, int count); int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count); -int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r); -int mcmc_read_value(void *c, char *val, const size_t vsize, int *read); -int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read); -char *mcmc_buffer_consume(void *c, int *remain); +//int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r); +//int mcmc_read_value(void *c, char *val, mcmc_resp_t *r, int *read); int mcmc_disconnect(void *c); void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen); |