diff options
author | dormando <dormando@rydia.net> | 2022-08-08 16:27:22 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-08-24 22:32:40 -0700 |
commit | 6bcc5b8553d43f494e85ab3ec2ce789c38a3ca33 (patch) | |
tree | d49f62392b0b40419567014621877b5581885e72 | |
parent | 2bdf0ec3c8d89d7b75cb8ba7f1290cd937595669 (diff) | |
download | memcached-6bcc5b8553d43f494e85ab3ec2ce789c38a3ca33.tar.gz |
proxy: mcp.attach(CMD, r, "tag")
allows using tagged listeners (ex; `-l tag[test]:127.0.0.1:11212`) to
select a top level route for a function.
expects there to not be dozens of listeners, but for a handful will be
faster than a hash table lookup.
-rw-r--r-- | proto_proxy.c | 21 | ||||
-rw-r--r-- | proxy.h | 8 | ||||
-rw-r--r-- | proxy_lua.c | 86 | ||||
-rw-r--r-- | t/startfile.lua | 3 |
4 files changed, 105 insertions, 13 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 60c50d6..3ee8c07 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -644,8 +644,22 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu } struct proxy_hook *hook = &hooks[pr.command]; + int hook_ref = hook->lua_ref; + // if client came from a tagged listener, scan for a more specific hook. + // TODO: (v2) avoiding a hash table lookup here, but maybe some other + // datastructure would suffice. for 4-8 tags this is perfectly fast. + if (c->tag && hook->tagged) { + struct proxy_hook_tagged *pht = hook->tagged; + while (pht->lua_ref) { + if (c->tag == pht->tag) { + hook_ref = pht->lua_ref; + break; + } + pht++; + } + } - if (!hook->is_lua) { + if (!hook_ref) { // need to pass our command string into the internal handler. // to minimize the code change, this means allowing it to tokenize the // full command. The proxy's indirect parser should be built out to @@ -754,12 +768,15 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu lua_State *Lc = lua_tothread(L, -1); // leave the thread first on the stack, so we can reference it if needed. // pull the lua hook function onto the stack. - lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook->lua_ref); + lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook_ref); mcp_request_t *rq = mcp_new_request(Lc, &pr, command, cmdlen); if (multiget) { rq->ascii_multiget = true; } + // NOTE: option 1) copy c->tag into rq->tag here. + // add req:listen_tag() to retrieve in top level route. + // TODO (v2): lift this to a post-processor? if (rq->pr.vlen != 0) { // relying on temporary malloc's not succumbing as poorly to @@ -218,9 +218,15 @@ typedef struct { pthread_mutex_t stats_lock; // used for rare global counters } proxy_ctx_t; +struct proxy_hook_tagged { + uint64_t tag; + int lua_ref; +}; + struct proxy_hook { int lua_ref; - bool is_lua; // pull the lua reference and call it as a lua function. + int tagcount; + struct proxy_hook_tagged *tagged; // array of possible tagged hooks. }; // TODO (v2): some hash functions (crc?) might require initializers. If we run into diff --git a/proxy_lua.c b/proxy_lua.c index 66b0296..cc0cf75 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -604,7 +604,7 @@ static int mcplib_attach(lua_State *L) { // Pull the original worker thread out of the shared mcplib upvalue. LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE)); - int hook = luaL_checkinteger(L, -2); + int hook = luaL_checkinteger(L, 1); // pushvalue to dupe func and etc. // can leave original func on stack afterward because it'll get cleared. int loop_end = 0; @@ -620,20 +620,86 @@ static int mcplib_attach(lua_State *L) { loop_end = hook + 1; } - if (lua_isfunction(L, -1)) { + if (lua_isfunction(L, 2)) { struct proxy_hook *hooks = t->proxy_hooks; + uint64_t tag = 0; // listener socket tag + + if (lua_isstring(L, 3)) { + size_t len; + const char *stag = lua_tolstring(L, 3, &len); + if (len < 1 || len > 8) { + proxy_lua_error(L, "mcp.attach: tag must be 1 to 8 characters"); + return 0; + } + memcpy(&tag, stag, len); + } for (int x = loop_start; x < loop_end; x++) { struct proxy_hook *h = &hooks[x]; - lua_pushvalue(L, -1); // duplicate the function for the ref. - if (h->lua_ref) { - // remove existing reference. - luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref); + lua_pushvalue(L, 2); // duplicate the function for the ref. + + if (tag) { + // listener was tagged. use the extended hook structure. + struct proxy_hook_tagged *pht = h->tagged; + + if (h->tagcount == 0) { + pht = calloc(1, sizeof(struct proxy_hook_tagged)); + if (pht == NULL) { + proxy_lua_error(L, "mcp.attach: failure allocating tagged hooks"); + return 0; + } + h->tagcount = 1; + h->tagged = pht; + } + + bool found = false; + for (int x = 0; x < h->tagcount; x++) { + if (pht->tag == tag) { + if (pht->lua_ref) { + // Found existing tagged hook. + luaL_unref(L, LUA_REGISTRYINDEX, pht->lua_ref); + } + + pht->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); + assert(pht->lua_ref != 0); + found = true; + break; + } else if (pht->tag == 0) { + // no tag in this slot, so we use it. + pht->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); + pht->tag = tag; + assert(pht->lua_ref != 0); + found = true; + break; + } + pht++; + } + + // need to resize the array to fit the new tag. + if (!found) { + pht = realloc(h->tagged, sizeof(struct proxy_hook_tagged) * (h->tagcount+1)); + if (!pht) { + proxy_lua_error(L, "mcp.attach: failure to resize tagged hooks"); + return 0; + } + + pht[h->tagcount].lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); + pht[h->tagcount].tag = tag; + + h->tagcount++; + h->tagged = pht; + } + + } else { + if (h->lua_ref) { + // remove existing reference. + luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref); + } + + // pops the function from the stack and leaves us a ref. for later. + h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); + assert(h->lua_ref != 0); } - - // pops the function from the stack and leaves us a ref. for later. - h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); - h->is_lua = true; } } else { proxy_lua_error(L, "Must pass a function to mcp.attach"); diff --git a/t/startfile.lua b/t/startfile.lua index 29e3d9b..9f16434 100644 --- a/t/startfile.lua +++ b/t/startfile.lua @@ -333,4 +333,7 @@ function mcp_config_routes(main_zones) -- are attached to the internal parser. --mcp.attach(mcp.CMD_ANY, function (r) return routetop(r) end) mcp.attach(mcp.CMD_ANY_STORAGE, routetop) + -- tagged top level attachments. ex: memcached -l tag[tagtest]:127.0.0.1:11212 + -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR no route\r\n" end, "tagtest") + -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR my route\r\n" end, "newtag") end |