summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-08-08 16:27:22 -0700
committerdormando <dormando@rydia.net>2022-08-24 22:32:40 -0700
commit6bcc5b8553d43f494e85ab3ec2ce789c38a3ca33 (patch)
treed49f62392b0b40419567014621877b5581885e72
parent2bdf0ec3c8d89d7b75cb8ba7f1290cd937595669 (diff)
downloadmemcached-6bcc5b8553d43f494e85ab3ec2ce789c38a3ca33.tar.gz
proxy: mcp.attach(CMD, r, "tag")
allows using tagged listeners (ex; `-l tag[test]:127.0.0.1:11212`) to select a top level route for a function. expects there to not be dozens of listeners, but for a handful will be faster than a hash table lookup.
-rw-r--r--proto_proxy.c21
-rw-r--r--proxy.h8
-rw-r--r--proxy_lua.c86
-rw-r--r--t/startfile.lua3
4 files changed, 105 insertions, 13 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 60c50d6..3ee8c07 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -644,8 +644,22 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
}
struct proxy_hook *hook = &hooks[pr.command];
+ int hook_ref = hook->lua_ref;
+ // if client came from a tagged listener, scan for a more specific hook.
+ // TODO: (v2) avoiding a hash table lookup here, but maybe some other
+ // datastructure would suffice. for 4-8 tags this is perfectly fast.
+ if (c->tag && hook->tagged) {
+ struct proxy_hook_tagged *pht = hook->tagged;
+ while (pht->lua_ref) {
+ if (c->tag == pht->tag) {
+ hook_ref = pht->lua_ref;
+ break;
+ }
+ pht++;
+ }
+ }
- if (!hook->is_lua) {
+ if (!hook_ref) {
// need to pass our command string into the internal handler.
// to minimize the code change, this means allowing it to tokenize the
// full command. The proxy's indirect parser should be built out to
@@ -754,12 +768,15 @@ static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool mu
lua_State *Lc = lua_tothread(L, -1);
// leave the thread first on the stack, so we can reference it if needed.
// pull the lua hook function onto the stack.
- lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook->lua_ref);
+ lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook_ref);
mcp_request_t *rq = mcp_new_request(Lc, &pr, command, cmdlen);
if (multiget) {
rq->ascii_multiget = true;
}
+ // NOTE: option 1) copy c->tag into rq->tag here.
+ // add req:listen_tag() to retrieve in top level route.
+
// TODO (v2): lift this to a post-processor?
if (rq->pr.vlen != 0) {
// relying on temporary malloc's not succumbing as poorly to
diff --git a/proxy.h b/proxy.h
index 1c5da3d..015c093 100644
--- a/proxy.h
+++ b/proxy.h
@@ -218,9 +218,15 @@ typedef struct {
pthread_mutex_t stats_lock; // used for rare global counters
} proxy_ctx_t;
+struct proxy_hook_tagged {
+ uint64_t tag;
+ int lua_ref;
+};
+
struct proxy_hook {
int lua_ref;
- bool is_lua; // pull the lua reference and call it as a lua function.
+ int tagcount;
+ struct proxy_hook_tagged *tagged; // array of possible tagged hooks.
};
// TODO (v2): some hash functions (crc?) might require initializers. If we run into
diff --git a/proxy_lua.c b/proxy_lua.c
index 66b0296..cc0cf75 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -604,7 +604,7 @@ static int mcplib_attach(lua_State *L) {
// Pull the original worker thread out of the shared mcplib upvalue.
LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
- int hook = luaL_checkinteger(L, -2);
+ int hook = luaL_checkinteger(L, 1);
// pushvalue to dupe func and etc.
// can leave original func on stack afterward because it'll get cleared.
int loop_end = 0;
@@ -620,20 +620,86 @@ static int mcplib_attach(lua_State *L) {
loop_end = hook + 1;
}
- if (lua_isfunction(L, -1)) {
+ if (lua_isfunction(L, 2)) {
struct proxy_hook *hooks = t->proxy_hooks;
+ uint64_t tag = 0; // listener socket tag
+
+ if (lua_isstring(L, 3)) {
+ size_t len;
+ const char *stag = lua_tolstring(L, 3, &len);
+ if (len < 1 || len > 8) {
+ proxy_lua_error(L, "mcp.attach: tag must be 1 to 8 characters");
+ return 0;
+ }
+ memcpy(&tag, stag, len);
+ }
for (int x = loop_start; x < loop_end; x++) {
struct proxy_hook *h = &hooks[x];
- lua_pushvalue(L, -1); // duplicate the function for the ref.
- if (h->lua_ref) {
- // remove existing reference.
- luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref);
+ lua_pushvalue(L, 2); // duplicate the function for the ref.
+
+ if (tag) {
+ // listener was tagged. use the extended hook structure.
+ struct proxy_hook_tagged *pht = h->tagged;
+
+ if (h->tagcount == 0) {
+ pht = calloc(1, sizeof(struct proxy_hook_tagged));
+ if (pht == NULL) {
+ proxy_lua_error(L, "mcp.attach: failure allocating tagged hooks");
+ return 0;
+ }
+ h->tagcount = 1;
+ h->tagged = pht;
+ }
+
+ bool found = false;
+ for (int x = 0; x < h->tagcount; x++) {
+ if (pht->tag == tag) {
+ if (pht->lua_ref) {
+ // Found existing tagged hook.
+ luaL_unref(L, LUA_REGISTRYINDEX, pht->lua_ref);
+ }
+
+ pht->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ assert(pht->lua_ref != 0);
+ found = true;
+ break;
+ } else if (pht->tag == 0) {
+ // no tag in this slot, so we use it.
+ pht->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ pht->tag = tag;
+ assert(pht->lua_ref != 0);
+ found = true;
+ break;
+ }
+ pht++;
+ }
+
+ // need to resize the array to fit the new tag.
+ if (!found) {
+ pht = realloc(h->tagged, sizeof(struct proxy_hook_tagged) * (h->tagcount+1));
+ if (!pht) {
+ proxy_lua_error(L, "mcp.attach: failure to resize tagged hooks");
+ return 0;
+ }
+
+ pht[h->tagcount].lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ pht[h->tagcount].tag = tag;
+
+ h->tagcount++;
+ h->tagged = pht;
+ }
+
+ } else {
+ if (h->lua_ref) {
+ // remove existing reference.
+ luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref);
+ }
+
+ // pops the function from the stack and leaves us a ref. for later.
+ h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ assert(h->lua_ref != 0);
}
-
- // pops the function from the stack and leaves us a ref. for later.
- h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- h->is_lua = true;
}
} else {
proxy_lua_error(L, "Must pass a function to mcp.attach");
diff --git a/t/startfile.lua b/t/startfile.lua
index 29e3d9b..9f16434 100644
--- a/t/startfile.lua
+++ b/t/startfile.lua
@@ -333,4 +333,7 @@ function mcp_config_routes(main_zones)
-- are attached to the internal parser.
--mcp.attach(mcp.CMD_ANY, function (r) return routetop(r) end)
mcp.attach(mcp.CMD_ANY_STORAGE, routetop)
+ -- tagged top level attachments. ex: memcached -l tag[tagtest]:127.0.0.1:11212
+ -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR no route\r\n" end, "tagtest")
+ -- mcp.attach(mcp.CMD_ANY_STORAGE, function (r) return "SERVER_ERROR my route\r\n" end, "newtag")
end