diff options
Diffstat (limited to 'proxy_request.c')
-rw-r--r-- | proxy_request.c | 672 |
1 files changed, 672 insertions, 0 deletions
diff --git a/proxy_request.c b/proxy_request.c new file mode 100644 index 0000000..c52f9d9 --- /dev/null +++ b/proxy_request.c @@ -0,0 +1,672 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ + +#include "proxy.h" + +#define PARSER_MAXLEN USHRT_MAX-1 + +// Find the starting offsets of each token; ignoring length. +// This creates a fast small (<= cacheline) index into the request, +// where we later scan or directly feed data into API's. +static int _process_tokenize(mcp_parser_t *pr, const size_t max) { + const char *s = pr->request; + int len = pr->reqlen - 2; + + // since multigets can be huge, we can't purely judge reqlen against this + // limit, but we also can't index past it since the tokens are shorts. + if (len > PARSER_MAXLEN) { + len = PARSER_MAXLEN; + } + const char *end = s + len; + int curtoken = 0; + + int state = 0; + while (s != end) { + switch (state) { + case 0: + if (*s != ' ') { + pr->tokens[curtoken] = s - pr->request; + if (++curtoken == max) { + goto endloop; + } + state = 1; + } + s++; + break; + case 1: + if (*s != ' ') { + s++; + } else { + state = 0; + } + break; + } + } +endloop: + + pr->ntokens = curtoken; + P_DEBUG("%s: cur_tokens: %d\n", __func__, curtoken); + + return 0; +} + +static int _process_token_len(mcp_parser_t *pr, size_t token) { + const char *cur = pr->request + pr->tokens[token]; + int remain = pr->reqlen - pr->tokens[token] - 2; // CRLF + + const char *s = memchr(cur, ' ', remain); + return (s != NULL) ? s - cur : remain; +} + +static int _process_request_key(mcp_parser_t *pr) { + pr->klen = _process_token_len(pr, pr->keytoken); + // advance the parser in case of multikey. + pr->parsed = pr->tokens[pr->keytoken] + pr->klen + 1; + + if (pr->request[pr->parsed-1] == ' ') { + P_DEBUG("%s: request_key found extra space\n", __func__); + pr->has_space = true; + } else { + pr->has_space = false; + } + return 0; +} + +// Just for ascii multiget: search for next "key" beyond where we stopped +// tokenizing before. +// Returns the offset for the next key. +size_t _process_request_next_key(mcp_parser_t *pr) { + const char *cur = pr->request + pr->parsed; + int remain = pr->reqlen - pr->parsed - 2; + + // chew off any leading whitespace. + while (remain) { + if (*cur == ' ') { + remain--; + cur++; + pr->parsed++; + } else { + break; + } + } + + const char *s = memchr(cur, ' ', remain); + if (s != NULL) { + pr->klen = s - cur; + pr->parsed += s - cur; + } else { + pr->klen = remain; + pr->parsed += remain; + } + + return cur - pr->request; +} + +// for fast testing of existence of meta flags. +// meta has all flags as final tokens +static int _process_request_metaflags(mcp_parser_t *pr, int token) { + if (pr->ntokens <= token) { + pr->t.meta.flags = 0; // no flags found. + return 0; + } + const char *cur = pr->request + pr->tokens[token]; + const char *end = pr->request + pr->reqlen - 2; + + // We blindly convert flags into bits, since the range of possible + // flags is deliberately < 64. + int state = 0; + while (cur != end) { + switch (state) { + case 0: + if (*cur == ' ') { + cur++; + } else { + if (*cur < 65 || *cur > 122) { + return -1; + } + P_DEBUG("%s: setting meta flag: %d\n", __func__, *cur - 65); + pr->t.meta.flags |= 1 << (*cur - 65); + state = 1; + } + break; + case 1: + if (*cur != ' ') { + cur++; + } else { + state = 0; + } + break; + } + } + + return 0; +} + +// All meta commands are of form: "cm key f l a g S100" +static int _process_request_meta(mcp_parser_t *pr) { + _process_tokenize(pr, PARSER_MAX_TOKENS); + if (pr->ntokens < 2) { + P_DEBUG("%s: not enough tokens for meta command: %d\n", __func__, pr->ntokens); + return -1; + } + pr->keytoken = 1; + _process_request_key(pr); + + // pass the first flag token. + return _process_request_metaflags(pr, 2); +} + +// ms <key> <datalen> <flags>*\r\n +static int _process_request_mset(mcp_parser_t *pr) { + _process_tokenize(pr, PARSER_MAX_TOKENS); + if (pr->ntokens < 3) { + P_DEBUG("%s: not enough tokens for meta set command: %d\n", __func__, pr->ntokens); + return -1; + } + pr->keytoken = 1; + _process_request_key(pr); + + const char *cur = pr->request + pr->tokens[2]; + + errno = 0; + char *n = NULL; + int vlen = strtol(cur, &n, 10); + if ((errno == ERANGE) || (cur == n)) { + return -1; + } + + if (vlen < 0 || vlen > (INT_MAX - 2)) { + return -1; + } + vlen += 2; + + pr->vlen = vlen; + + // pass the first flag token + return _process_request_metaflags(pr, 3); +} + +// gat[s] <exptime> <key>*\r\n +static int _process_request_gat(mcp_parser_t *pr) { + _process_tokenize(pr, 3); + if (pr->ntokens < 3) { + P_DEBUG("%s: not enough tokens for GAT: %d\n", __func__, pr->ntokens); + return -1; + } + + pr->keytoken = 2; + _process_request_key(pr); + return 0; +} + +// we need t find the bytes supplied immediately so we can read the request +// from the client properly. +// set <key> <flags> <exptime> <bytes> [noreply]\r\n +static int _process_request_storage(mcp_parser_t *pr, size_t max) { + _process_tokenize(pr, max); + if (pr->ntokens < 5) { + P_DEBUG("%s: not enough tokens to storage command: %d\n", __func__, pr->ntokens); + return -1; + } + pr->keytoken = 1; + _process_request_key(pr); + + errno = 0; + char *n = NULL; + const char *cur = pr->request + pr->tokens[4]; + + int vlen = strtol(cur, &n, 10); + if ((errno == ERANGE) || (cur == n)) { + return -1; + } + + if (vlen < 0 || vlen > (INT_MAX - 2)) { + return -1; + } + vlen += 2; + + pr->vlen = vlen; + + return 0; +} + +// common request with key: <cmd> <key> <args> +static int _process_request_simple(mcp_parser_t *pr, const size_t max) { + _process_tokenize(pr, max); + pr->keytoken = 1; // second token is usually the key... stupid GAT. + + _process_request_key(pr); + return 0; +} + +// TODO: return code ENUM with error types. +// FIXME: the mcp_parser_t bits have ended up being more fragile than I hoped. +// careful zero'ing is required. revisit? +// I think this mostly refers to recursive work (maybe just multiget?) +// Is a parser object run throgh process_request() twice, ever? +int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) { + // we want to "parse in place" as much as possible, which allows us to + // forward an unmodified request without having to rebuild it. + + const char *cm = command; + size_t cl = 0; + // min command length is 2, plus the "\r\n" + if (cmdlen < 4) { + return -1; + } + + const char *s = memchr(command, ' ', cmdlen-2); + if (s != NULL) { + cl = s - command; + } else { + cl = cmdlen - 2; + } + pr->keytoken = 0; + pr->has_space = false; + pr->parsed = cl + 1; + pr->request = command; + pr->reqlen = cmdlen; + int token_max = PARSER_MAX_TOKENS; + + int cmd = -1; + int type = CMD_TYPE_GENERIC; + int ret = 0; + + switch (cl) { + case 0: + case 1: + // falls through with cmd as -1. should error. + break; + case 2: + if (cm[0] == 'm') { + switch (cm[1]) { + case 'g': + cmd = CMD_MG; + ret = _process_request_meta(pr); + break; + case 's': + cmd = CMD_MS; + ret = _process_request_mset(pr); + break; + case 'd': + cmd = CMD_MD; + ret = _process_request_meta(pr); + break; + case 'n': + // TODO: do we route/handle NOP's at all? + // they should simply reflect to the client. + cmd = CMD_MN; + break; + case 'a': + cmd = CMD_MA; + ret = _process_request_meta(pr); + break; + case 'e': + cmd = CMD_ME; + // TODO: not much special processing here; binary keys + ret = _process_request_meta(pr); + break; + } + } + break; + case 3: + if (cm[0] == 'g') { + if (cm[1] == 'e' && cm[2] == 't') { + cmd = CMD_GET; + type = CMD_TYPE_GET; + token_max = 2; // don't chew through multigets. + ret = _process_request_simple(pr, 2); + } + if (cm[1] == 'a' && cm[2] == 't') { + type = CMD_TYPE_GET; + cmd = CMD_GAT; + token_max = 2; // don't chew through multigets. + ret = _process_request_gat(pr); + } + } else if (cm[0] == 's' && cm[1] == 'e' && cm[2] == 't') { + cmd = CMD_SET; + ret = _process_request_storage(pr, token_max); + } else if (cm[0] == 'a' && cm[1] == 'd' && cm[2] == 'd') { + cmd = CMD_ADD; + ret = _process_request_storage(pr, token_max); + } else if (cm[0] == 'c' && cm[1] == 'a' && cm[2] == 's') { + cmd = CMD_CAS; + ret = _process_request_storage(pr, token_max); + } + break; + case 4: + if (strncmp(cm, "gets", 4) == 0) { + cmd = CMD_GETS; + type = CMD_TYPE_GET; + token_max = 2; // don't chew through multigets. + ret = _process_request_simple(pr, 2); + } else if (strncmp(cm, "incr", 4) == 0) { + cmd = CMD_INCR; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "decr", 4) == 0) { + cmd = CMD_DECR; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "gats", 4) == 0) { + cmd = CMD_GATS; + type = CMD_TYPE_GET; + ret = _process_request_gat(pr); + } else if (strncmp(cm, "quit", 4) == 0) { + cmd = CMD_QUIT; + } + break; + case 5: + if (strncmp(cm, "touch", 5) == 0) { + cmd = CMD_TOUCH; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "stats", 5) == 0) { + cmd = CMD_STATS; + // Don't process a key; fetch via arguments. + _process_tokenize(pr, token_max); + } else if (strncmp(cm, "watch", 5) == 0) { + cmd = CMD_WATCH; + _process_tokenize(pr, token_max); + } + break; + case 6: + if (strncmp(cm, "delete", 6) == 0) { + cmd = CMD_DELETE; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "append", 6) == 0) { + cmd = CMD_APPEND; + ret = _process_request_storage(pr, token_max); + } + break; + case 7: + if (strncmp(cm, "replace", 7) == 0) { + cmd = CMD_REPLACE; + ret = _process_request_storage(pr, token_max); + } else if (strncmp(cm, "prepend", 7) == 0) { + cmd = CMD_PREPEND; + ret = _process_request_storage(pr, token_max); + } else if (strncmp(cm, "version", 7) == 0) { + cmd = CMD_VERSION; + _process_tokenize(pr, token_max); + } + break; + } + + // TODO: log more specific error code. + if (cmd == -1 || ret != 0) { + return -1; + } + + pr->command = cmd; + pr->cmd_type = type; + + return 0; +} + +// FIXME (v2): any reason to pass in command/cmdlen separately? +mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen) { + // reserving an upvalue for key. + mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN * 2 + KEY_MAX_LENGTH, 1); + // TODO (v2): memset only the non-data part? as the rest gets memcpy'd + // over. + memset(rq, 0, sizeof(mcp_request_t)); + memcpy(&rq->pr, pr, sizeof(*pr)); + + memcpy(rq->request, command, cmdlen); + rq->pr.request = rq->request; + rq->pr.reqlen = cmdlen; + gettimeofday(&rq->start, NULL); + + luaL_getmetatable(L, "mcp.request"); + lua_setmetatable(L, -2); + + // at this point we should know if we have to bounce through _nread to + // get item data or not. + return rq; +} + +// TODO (v2): +// if modified, this will re-serialize every time it's accessed. +// a simple opt could copy back over the original space +// a "better" one could A/B the request ptr and clear the modified state +// each time it gets serialized. +void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p) { + mcp_parser_t *pr = &rq->pr; + char *r = (char *) pr->request; + size_t len = pr->reqlen; + + // one or more of the tokens were changed + if (rq->was_modified) { + assert(rq->tokent_ref); + // option table to top of stack. + lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref); + + // space was reserved in case of modification. + char *nr = rq->request + MCP_REQUEST_MAXLEN; + r = nr; + char *or = NULL; + + for (int x = 0; x < pr->ntokens; x++) { + const char *newtok = NULL; + size_t newlen = 0; + if (x != 0 && x != pr->keytoken) { + int type = lua_rawgeti(L, -1, x+1); + if (type != LUA_TNIL) { + newtok = lua_tolstring(L, -1, &newlen); + memcpy(nr, newtok, newlen); + nr += newlen; + } + lua_pop(L, 1); + } + + if (newtok == NULL) { + // TODO (v2): if we add an extra "end" token that's just reqlen we can + // memcpy... however most args are short and that may not be worth + // it. + or = rq->request + pr->tokens[x]; + // will walk past the end without the \r test. + // if we add the end token trick this can be changed. + while (*or != ' ' && *or != '\r' && *or != '\n') { + *nr = *or; + nr++; + or++; + } + } + *nr = ' '; + nr++; + } + // tag the end bits. + memcpy(nr-1, "\r\n\0", 3); + nr++; + + len = nr - (rq->request + MCP_REQUEST_MAXLEN); + lua_pop(L, 1); // pop the table + } + + // The stringified request. This is also referencing into the coroutine + // stack, which should be safe from gc. + p->iov[0].iov_base = r; + p->iov[0].iov_len = len; + p->iovcnt = 1; + p->iovbytes = len; + if (pr->vlen != 0) { + p->iov[1].iov_base = pr->vbuf; + p->iov[1].iov_len = pr->vlen; + p->iovcnt = 2; + p->iovbytes += pr->vlen; + } + +} + +// second argument is optional, for building set requests. +// TODO: append the \r\n for the VAL? +int mcplib_request(lua_State *L) { + size_t len = 0; + size_t vlen = 0; + mcp_parser_t pr = {0}; + const char *cmd = luaL_checklstring(L, 1, &len); + const char *val = luaL_optlstring(L, 2, NULL, &vlen); + + // FIXME (v2): if we inline the userdata we can avoid memcpy'ing the parser + // structure from the stack? but causes some code duplication. + if (process_request(&pr, cmd, len) != 0) { + proxy_lua_error(L, "failed to parse request"); + return 0; + } + mcp_request_t *rq = mcp_new_request(L, &pr, cmd, len); + + if (val != NULL) { + rq->pr.vlen = vlen; + rq->pr.vbuf = malloc(vlen); + if (rq->pr.vbuf == NULL) { + // Note: without *c we can't tick the appropriate counter. + // However, in practice raw malloc's are nearly never going to + // fail. + // TODO(v2): we can stack values into the request objects or use + // the slabber memory, so this isn't necessary anyway. + proxy_lua_error(L, "failed to allocate value memory for request object"); + } + memcpy(rq->pr.vbuf, val, vlen); + } + gettimeofday(&rq->start, NULL); + + // rq is now created, parsed, and on the stack. + return 1; +} + +int mcplib_request_key(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + lua_pushlstring(L, MCP_PARSER_KEY(rq->pr), rq->pr.klen); + return 1; +} + +// NOTE: I've mixed up const/non-const strings in the request. During parsing +// we want it to be const, but after that's done the request is no longer +// const. It might be better to just remove the const higher up the chain, but +// I'd rather not. So for now these functions will be dumping the const to +// modify the string. +int mcplib_request_ltrimkey(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request"); + int totrim = luaL_checkinteger(L, -1); + char *key = (char *) MCP_PARSER_KEY(rq->pr); + + if (totrim > rq->pr.klen) { + proxy_lua_error(L, "ltrimkey cannot zero out key"); + return 0; + } else { + memset(key, ' ', totrim); + rq->pr.klen -= totrim; + rq->pr.tokens[rq->pr.keytoken] += totrim; + } + return 1; +} + +int mcplib_request_rtrimkey(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request"); + int totrim = luaL_checkinteger(L, -1); + char *key = (char *) MCP_PARSER_KEY(rq->pr); + + if (totrim > rq->pr.klen) { + proxy_lua_error(L, "rtrimkey cannot zero out key"); + return 0; + } else { + memset(key + (rq->pr.klen - totrim), ' ', totrim); + rq->pr.klen -= totrim; + // don't need to change the key token. + } + return 1; +} + +// Virtual table operations on the request. +int mcplib_request_token(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + int argc = lua_gettop(L); + + if (argc == 1) { + lua_pushnil(L); + return 1; + } + + int token = luaL_checkinteger(L, 2); + + if (token < 1 || token > rq->pr.ntokens) { + // maybe an error? + lua_pushnil(L); + return 1; + } + + // we hold overwritten or parsed tokens in a lua table. + if (rq->tokent_ref == 0) { + // create a presized table that can hold our tokens. + lua_createtable(L, rq->pr.ntokens, 0); + // duplicate value to set back + lua_pushvalue(L, -1); + rq->tokent_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } else { + lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref); + } + // top of stack should be token table. + + size_t vlen = 0; + if (argc > 2) { + // overwriting a token. + luaL_checklstring(L, 3, &vlen); + lua_pushvalue(L, 3); // copy to top of stack + lua_rawseti(L, -2, token); + rq->was_modified = true; + return 0; + } else { + // fetching a token. + if (lua_rawgeti(L, -1, token) != LUA_TSTRING) { + lua_pop(L, 1); // got a nil, drop it. + + // token not uploaded yet. find the len. + char *s = (char *) &rq->pr.request[rq->pr.tokens[token-1]]; + char *e = s; + while (*e != ' ') { + e++; + } + vlen = e - s; + + P_DEBUG("%s: pushing token of len: %lu\n", __func__, vlen); + lua_pushlstring(L, s, vlen); + lua_pushvalue(L, -1); // copy + + lua_rawseti(L, -3, token); // pops copy. + } + + // return fetched token or copy of new token. + return 1; + } + + return 0; +} + +int mcplib_request_ntokens(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + lua_pushinteger(L, rq->pr.ntokens); + return 1; +} + +int mcplib_request_command(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + lua_pushinteger(L, rq->pr.command); + return 1; +} + +int mcplib_request_gc(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + // During nread c->item is the malloc'ed buffer. not yet put into + // rq->buf - this gets freed because we've also set c->item_malloced if + // the connection closes before finishing nread. + if (rq->pr.vbuf != NULL) { + free(rq->pr.vbuf); + } + + if (rq->tokent_ref != 0) { + luaL_unref(L, LUA_REGISTRYINDEX, rq->tokent_ref); + } + return 0; +} + +// TODO (v2): check what lua does when it calls a function with a string argument +// stored from a table/similar (ie; the prefix check code). +// If it's not copying anything, we can add request-side functions to do most +// forms of matching and avoid copying the key to lua space. |