summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-03-01 12:51:47 -0800
committerdormando <dormando@rydia.net>2022-03-01 12:51:47 -0800
commitfa745db8fffe7d13438fe2437bdf8fcb1372bc96 (patch)
tree15bd7946d2d9ac6631c635f5c7ac12228e078f0b
parent2a903f04d1b395cd60c1b7357a9b733e19eb7973 (diff)
downloadmemcached-fa745db8fffe7d13438fe2437bdf8fcb1372bc96.tar.gz
proxy: hacky method of supporting noreply/quiet
avoids sending the response to the client, in most cases. works by stripping the noreply status from the request before sending it along, so the proxy itself knows when to move the request forward. has sharp edges: - only looking at the request object that's actually sent to the backend, instead of the request object that created the coroutine. - overriding tokens in lua to re-set the noreply mode would break the protocol. So this change helps us validate the feature but solidifying it requires moving it to the "edges" of processing; before the coroutine and after any command assembly (or within the command assembly).
-rw-r--r--proto_proxy.c53
-rw-r--r--proxy.h13
-rw-r--r--proxy_await.c21
-rw-r--r--proxy_request.c29
4 files changed, 110 insertions, 6 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 8505868..f276fe1 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -470,6 +470,36 @@ static void proxy_out_errstring(mc_resp *resp, const char *str) {
return;
}
+// NOTE: See notes in mcp_queue_io; the secondary problem with setting the
+// noreply mode from the response object is that the proxy can return strings
+// manually, so we have no way to obey what the original request wanted in
+// that case.
+static void _set_noreply_mode(mc_resp *resp, mcp_resp_t *r) {
+ switch (r->mode) {
+ case RESP_MODE_NORMAL:
+ break;
+ case RESP_MODE_NOREPLY:
+ // ascii noreply only threw egregious errors to client
+ if (r->status == MCMC_OK) {
+ resp->skip = true;
+ }
+ break;
+ case RESP_MODE_METAQUIET:
+ if (r->resp.code == MCMC_CODE_MISS) {
+ resp->skip = true;
+ } else if (r->cmd[1] != 'g' && r->resp.code == MCMC_CODE_OK) {
+ // FIXME (v2): mcmc's parser needs to help us out a bit more
+ // here.
+ // This is a broken case in the protocol though; quiet mode
+ // ignores HD for mutations but not get.
+ resp->skip = true;
+ }
+ break;
+ default:
+ assert(1 == 0);
+ }
+}
+
// this resumes every yielded coroutine (and re-resumes if necessary).
// called from the worker thread after responses have been pulled from the
// network.
@@ -492,6 +522,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response");
LOGGER_LOG(NULL, LOG_PROXYCMDS, LOGGER_PROXY_RAW, NULL, r->start, r->cmd, r->resp.type, r->resp.code);
+ _set_noreply_mode(resp, r);
if (r->buf) {
// response set from C.
// FIXME (v2): write_and_free() ? it's a bit wrong for here.
@@ -760,6 +791,28 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
r->buf = NULL;
r->blen = 0;
r->start = rq->start; // need to inherit the original start time.
+ // Set noreply mode.
+ // TODO (v2): the response "inherits" the request's noreply mode, which isn't
+ // strictly correct; we should inherit based on the request that spawned
+ // the coroutine but the structure doesn't allow that yet.
+ // Should also be able to settle this exact mode from the parser so we
+ // don't have to re-branch here.
+ if (rq->pr.noreply) {
+ if (rq->pr.cmd_type == CMD_TYPE_META) {
+ r->mode = RESP_MODE_METAQUIET;
+ for (int x = 2; x < rq->pr.ntokens; x++) {
+ if (rq->request[rq->pr.tokens[x]] == 'q') {
+ rq->request[rq->pr.tokens[x]] = ' ';
+ }
+ }
+ } else {
+ r->mode = RESP_MODE_NOREPLY;
+ rq->request[rq->pr.reqlen - 3] = 'Y';
+ }
+ } else {
+ r->mode = RESP_MODE_NORMAL;
+ }
+
int x;
int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
for (x = 0; x < end; x++) {
diff --git a/proxy.h b/proxy.h
index 535ae8b..fea6879 100644
--- a/proxy.h
+++ b/proxy.h
@@ -131,7 +131,6 @@ enum proxy_defines {
enum proxy_cmd_types {
CMD_TYPE_GENERIC = 0,
CMD_TYPE_GET, // get/gets/gat/gats
- CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace
CMD_TYPE_META, // m*'s.
};
@@ -273,6 +272,7 @@ struct mcp_parser_s {
uint32_t klen; // length of key.
uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
bool has_space; // a space was found after the last byte parsed.
+ bool noreply; // if quiet/noreply mode is set.
union {
struct mcp_parser_meta_s meta;
} t;
@@ -355,15 +355,22 @@ struct proxy_event_thread_s {
struct proxy_tunables tunables; // periodically copied from main ctx
};
+enum mcp_resp_mode {
+ RESP_MODE_NORMAL = 0,
+ RESP_MODE_NOREPLY,
+ RESP_MODE_METAQUIET
+};
+
#define RESP_CMD_MAX 8
typedef struct {
mcmc_resp_t resp;
struct timeval start; // start time inherited from paired request
- char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
- int status; // status code from mcmc_read()
char *buf; // response line + potentially value.
size_t blen; // total size of the value to read.
+ int status; // status code from mcmc_read()
int bread; // amount of bytes read into value so far.
+ char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
+ enum mcp_resp_mode mode; // reply mode (for noreply fixing)
} mcp_resp_t;
// re-cast an io_pending_t into this more descriptive structure.
diff --git a/proxy_await.c b/proxy_await.c
index f69e7df..c504b9a 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -91,6 +91,27 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
memset(r, 0, sizeof(mcp_resp_t));
r->start = rq->start;
+ // Set noreply mode.
+ // TODO (v2): the response "inherits" the request's noreply mode, which isn't
+ // strictly correct; we should inherit based on the request that spawned
+ // the coroutine but the structure doesn't allow that yet.
+ // Should also be able to settle this exact mode from the parser so we
+ // don't have to re-branch here.
+ if (rq->pr.noreply) {
+ if (rq->pr.cmd_type == CMD_TYPE_META) {
+ r->mode = RESP_MODE_METAQUIET;
+ for (int x = 2; x < rq->pr.ntokens; x++) {
+ if (rq->request[rq->pr.tokens[x]] == 'q') {
+ rq->request[rq->pr.tokens[x]] = ' ';
+ }
+ }
+ } else {
+ r->mode = RESP_MODE_NOREPLY;
+ rq->request[rq->pr.reqlen - 3] = 'Y';
+ }
+ } else {
+ r->mode = RESP_MODE_NORMAL;
+ }
int x;
int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
diff --git a/proxy_request.c b/proxy_request.c
index c52f9d9..9aecf5b 100644
--- a/proxy_request.c
+++ b/proxy_request.c
@@ -43,6 +43,9 @@ static int _process_tokenize(mcp_parser_t *pr, const size_t max) {
}
endloop:
+ // endcap token so we can quickly find the length of any token by looking
+ // at the next one.
+ pr->tokens[curtoken] = len;
pr->ntokens = curtoken;
P_DEBUG("%s: cur_tokens: %d\n", __func__, curtoken);
@@ -124,7 +127,7 @@ static int _process_request_metaflags(mcp_parser_t *pr, int token) {
return -1;
}
P_DEBUG("%s: setting meta flag: %d\n", __func__, *cur - 65);
- pr->t.meta.flags |= 1 << (*cur - 65);
+ pr->t.meta.flags |= (uint64_t)1 << (*cur - 65);
state = 1;
}
break;
@@ -138,6 +141,13 @@ static int _process_request_metaflags(mcp_parser_t *pr, int token) {
}
}
+ // not too great hack for noreply detection: this can be flattened out
+ // once a few other contexts are fixed and we detect the noreply from the
+ // coroutine start instead.
+ if (pr->t.meta.flags & ((uint64_t)1 << 48)) {
+ pr->noreply = true;
+ }
+
return 0;
}
@@ -198,6 +208,18 @@ static int _process_request_gat(mcp_parser_t *pr) {
return 0;
}
+#define NOREPLYSTR "noreply"
+#define NOREPLYLEN sizeof(NOREPLYSTR)-1
+// given a tokenized parser for a normal ASCII command, checks for noreply
+// mode.
+static int _process_request_noreply(mcp_parser_t *pr) {
+ if (pr->tokens[pr->ntokens] - pr->tokens[pr->ntokens-1] >= NOREPLYLEN
+ && strncmp(NOREPLYSTR, pr->request + pr->tokens[pr->ntokens-1], NOREPLYLEN) == 0) {
+ pr->noreply = true;
+ }
+ return 0;
+}
+
// we need t find the bytes supplied immediately so we can read the request
// from the client properly.
// set <key> <flags> <exptime> <bytes> [noreply]\r\n
@@ -226,7 +248,7 @@ static int _process_request_storage(mcp_parser_t *pr, size_t max) {
pr->vlen = vlen;
- return 0;
+ return _process_request_noreply(pr);
}
// common request with key: <cmd> <key> <args>
@@ -235,7 +257,7 @@ static int _process_request_simple(mcp_parser_t *pr, const size_t max) {
pr->keytoken = 1; // second token is usually the key... stupid GAT.
_process_request_key(pr);
- return 0;
+ return _process_request_noreply(pr);
}
// TODO: return code ENUM with error types.
@@ -278,6 +300,7 @@ int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) {
break;
case 2:
if (cm[0] == 'm') {
+ type = CMD_TYPE_META;
switch (cm[1]) {
case 'g':
cmd = CMD_MG;