summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
author dormando <dormando@rydia.net> 2022-02-05 23:01:26 -0800
committer dormando <dormando@rydia.net> 2022-02-07 21:25:40 -0800
commit 4b781973608a8f31fff314340759af936c518cd2 (patch)
tree 137f5d82187a86e27e0b82d50478ab6c0062a521 /proto_proxy.c
parent b6fd865985dd8285bd963dfd429d7f475d54d77f (diff)
download memcached-4b781973608a8f31fff314340759af936c518cd2.tar.gz
proxy: add filter and hash options to mcp.pool()
two builtin filter options (hash stop and tag), because why not :) hash defaults caused some code reorganization. default hash dist is now jump, because I can't think of why you'd use modulus over that.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r-- proto_proxy.c 361
1 files changed, 234 insertions, 127 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 525fdb5..20f908f 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -33,7 +33,6 @@
#include "proto_proxy.h"
#include "proto_text.h"
-#include "murmur3_hash.h"
#include "queue.h"
#define XXH_INLINE_ALL // modifier for xxh3's include below
#include "xxhash.h"
@@ -213,18 +212,19 @@ struct proxy_hook {
bool is_lua; // pull the lua reference and call it as a lua function.
};
-typedef uint32_t (*hash_selector_func)(const void *key, size_t len, void *ctx);
+// TODO (v2): some hash functions (crc?) might require initializers. If we run into
+// any the interface might need expanding.
+typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed);
+struct proxy_hash_func {
+ key_hash_func func;
+};
+typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen);
+typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx);
struct proxy_hash_caller {
hash_selector_func selector_func;
void *ctx;
};
-// A default hash function for backends.
-static uint32_t mcplib_hashfunc_murmur3_func(const void *key, size_t len, void *ctx) {
- return MurmurHash3_x86_32(key, len);
-}
-static struct proxy_hash_caller mcplib_hashfunc_murmur3 = { mcplib_hashfunc_murmur3_func, NULL};
-
enum mcp_backend_states {
mcp_backend_read = 0, // waiting to read any response
mcp_backend_parse, // have some buffered data to check
@@ -383,12 +383,17 @@ typedef struct {
mcp_backend_t *be;
} mcp_pool_be_t;
+#define KEY_HASH_FILTER_MAX 5
typedef struct mcp_pool_s mcp_pool_t;
struct mcp_pool_s {
struct proxy_hash_caller phc;
+ key_hash_filter_func key_filter;
+ key_hash_func key_hasher;
pthread_mutex_t lock; // protects refcount.
proxy_ctx_t *ctx; // main context.
STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
+ char key_filter_conf[KEY_HASH_FILTER_MAX+1];
+ uint64_t hash_seed; // calculated from a string.
int refcount;
int phc_ref;
int self_ref; // TODO (v2): double check that this is needed.
@@ -2903,8 +2908,119 @@ static int mcplib_pool_gc(lua_State *L) {
return 0;
}
+// Looks for a short string in a key to separate which part gets hashed vs
+// sent to the backend node.
+// ie: "foo:bar|#|restofkey" - only "foo:bar" gets hashed.
+//
+// conf:     NUL terminated stop string to search for.
+// key/klen: key bytes and their length.
+// newlen:   set to the number of leading key bytes to hash. Stays at klen
+//           when the stop string isn't found, or when klen is larger than
+//           KEY_MAX_LENGTH.
+// Always returns key itself.
+static const char *mcp_key_hash_filter_stop(const char *conf, const char *key, size_t klen, size_t *newlen) {
+    char temp[KEY_MAX_LENGTH+1];
+    *newlen = klen;
+    if (klen > KEY_MAX_LENGTH) {
+        // Hedging against potential bugs.
+        return key;
+    }
+
+    // strstr() needs a terminated string, so search a terminated copy.
+    memcpy(temp, key, klen);
+    // The terminator goes directly after the copied bytes: index klen+1
+    // would leave temp[klen] uninitialized and land outside of the array
+    // when klen == KEY_MAX_LENGTH.
+    temp[klen] = '\0';
+
+    // TODO (v2): memmem would avoid the temp key and memcpy here, but it's
+    // not technically portable. An easy improvement would be to detect
+    // memmem() in `configure` and only use strstr/copy as a fallback.
+    // Since keys are short it's unlikely this would be a major performance
+    // win.
+    const char *found = strstr(temp, conf);
+
+    if (found) {
+        *newlen = found - temp;
+    }
+
+    // hash stop can't change where keys start.
+    return key;
+}
+
+// Takes a two character "tag", ie; "{}", or "$$", searches string for the
+// first then second character. Only hashes the portion within these tags.
+// *conf _must_ be two characters.
+//
+// key/klen: key bytes and their length.
+// newlen:   set to the length of the portion to hash; stays at klen when
+//           no complete tag pair is found.
+// Returns a pointer to the byte after the opening tag when both tags are
+// found, otherwise key.
+static const char *mcp_key_hash_filter_tag(const char *conf, const char *key, size_t klen, size_t *newlen) {
+    *newlen = klen;
+
+    const char *t1 = memchr(key, conf[0], klen);
+    if (t1) {
+        size_t remain = klen - (t1 - key);
+        // need at least one byte after the opening tag to search for the
+        // closing tag.
+        if (remain > 1) {
+            // Start the search after the opening tag. With identical tag
+            // characters ("$$") a search starting at t1 matches the opening
+            // tag itself, which makes the length below wrap around.
+            const char *t2 = memchr(t1 + 1, conf[1], remain - 1);
+
+            if (t2) {
+                *newlen = t2 - t1 - 1;
+                return t1+1;
+            }
+        }
+    }
+
+    return key;
+}
+
+// Sets up the key distribution for pool p from a distribution object.
+// Expects a table with a 'new' function on top of the lua stack. Calls
+// new(pooltable, <value at stack index 2>), where pooltable holds one
+// { id, hostname, addr, port } table per backend in p->pool.
+// 'new' has to return a userdata followed by a lightuserdata pointing to a
+// struct proxy_hash_caller. The struct is copied into p->phc and a registry
+// reference to the userdata is stored in p->phc_ref.
+// On success everything pushed here has been popped again, so the
+// distribution table is back on top of the stack for the caller to pop.
+static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
+ luaL_checktype(L, -1, LUA_TTABLE);
+ if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) {
+ proxy_lua_error(L, "key distribution object missing 'new' function");
+ return;
+ }
+
+ // - now create the copy pool table
+ lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint.
+ for (int x = 1; x <= p->pool_size; x++) {
+ mcp_backend_t *be = p->pool[x-1].be;
+ lua_createtable(L, 0, 4);
+ // stack = [p, h, f, optN, newpool, backend]
+ // the key should be fine for id? maybe don't need to duplicate
+ // this?
+ lua_pushinteger(L, x);
+ lua_setfield(L, -2, "id");
+ // we don't use the hostname for ketama hashing
+ // so passing ip for hostname is fine
+ lua_pushstring(L, be->ip);
+ // FIXME: hostname should probably work...
+ lua_setfield(L, -2, "hostname");
+ lua_pushstring(L, be->ip);
+ lua_setfield(L, -2, "addr");
+ lua_pushstring(L, be->port);
+ lua_setfield(L, -2, "port");
+ // TODO (v2): weight/etc?
+
+ // set the backend table into the new pool table.
+ lua_rawseti(L, -2, x);
+ }
+
+ // we can either use lua_insert() or possibly _rotate to shift
+ // things into the right place, but simplest is to just copy the
+ // option arg to the end of the stack.
+ // NOTE(review): this assumes stack index 2 holds the options table. When
+ // mcp.pool() is called with a single argument index 2 is presumably
+ // something else - confirm the dist 'new' functions cope with that.
+ lua_pushvalue(L, 2);
+ // - stack should be: pool, opts, func, pooltable, opts
+
+ // call the dist new function.
+ // two arguments (pooltable, opts), two results expected.
+ int res = lua_pcall(L, 2, 2, 0);
+
+ if (res != LUA_OK) {
+ lua_error(L); // error should be on the stack already.
+ return;
+ }
+
+ // -1 is lightuserdata ptr to the struct (which must be owned by the
+ // userdata), which is later used for internal calls.
+ struct proxy_hash_caller *phc;
+
+ luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
+ luaL_checktype(L, -2, LUA_TUSERDATA);
+ phc = lua_touserdata(L, -1);
+ // copy by value; the pool keeps its own selector function + ctx pair.
+ memcpy(&p->phc, phc, sizeof(*phc));
+ lua_pop(L, 1);
+ // -2 was userdata we need to hold a reference to
+ p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ // UD now popped from stack.
+}
+
// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
-// TODO: hash and hashfilter
static int mcplib_pool(lua_State *L) {
int argc = lua_gettop(L);
luaL_checktype(L, 1, LUA_TTABLE);
@@ -2915,7 +3031,8 @@ static int mcplib_pool(lua_State *L) {
// Zero the memory before use, so we can realibly use __gc to clean up
memset(p, 0, plen);
p->pool_size = n;
- p->refcount = 0;
+ // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
+ p->key_hasher = XXH3_64bits_withSeed;
pthread_mutex_init(&p->lock, NULL);
p->ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue.
@@ -2935,8 +3052,15 @@ static int mcplib_pool(lua_State *L) {
}
if (argc == 1) {
- // Use default hash selector if none given.
- p->phc = mcplib_hashfunc_murmur3;
+ lua_getglobal(L, "mcp");
+ // TODO (v2): decide on a mcp.default_dist and use that instead
+ if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
+ _mcplib_pool_dist(L, p);
+ lua_pop(L, 1); // pop "dist_jump_hash" value.
+ } else {
+ lua_pop(L, 1);
+ }
+ lua_pop(L, 1); // pop "mcp"
return 1;
}
@@ -2947,72 +3071,68 @@ static int mcplib_pool(lua_State *L) {
// stack: backends, options, mcp.pool
if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
// overriding the distribution function.
- luaL_checktype(L, -1, LUA_TTABLE);
- if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) {
- proxy_lua_error(L, "key distribution object missing 'new' function");
- return 0;
- }
+ _mcplib_pool_dist(L, p);
+ lua_pop(L, 1); // remove the dist table from stack.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
- // - now create the copy pool table
- lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint.
- for (int x = 1; x <= p->pool_size; x++) {
- mcp_backend_t *be = p->pool[x-1].be;
- lua_createtable(L, 0, 4);
- // stack = [p, h, f, optN, newpool, backend]
- // the key should be fine for id? maybe don't need to duplicate
- // this?
- lua_pushinteger(L, x);
- lua_setfield(L, -2, "id");
- // we don't use the hostname for ketama hashing
- // so passing ip for hostname is fine
- lua_pushstring(L, be->ip);
- lua_setfield(L, -2, "hostname");
- lua_pushstring(L, be->ip);
- lua_setfield(L, -2, "addr");
- lua_pushstring(L, be->port);
- lua_setfield(L, -2, "port");
- // TODO (v2): weight/etc?
-
- // set the backend table into the new pool table.
- lua_rawseti(L, -2, x);
+ if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TSTRING);
+ const char *f_type = lua_tostring(L, -1);
+ if (strcmp(f_type, "stop") == 0) {
+ p->key_filter = mcp_key_hash_filter_stop;
+ } else if (strcmp(f_type, "tags") == 0) {
+ p->key_filter = mcp_key_hash_filter_tag;
+ } else {
+ proxy_lua_ferror(L, "unknown hash filter specified: %s\n", f_type);
}
- // we can either use lua_insert() or possibly _rotate to shift
- // things into the right place, but simplest is to just copy the
- // option arg to the end of the stack.
- lua_pushvalue(L, 2);
- // - stack should be: pool, opts, func, pooltable, opts
+ lua_pop(L, 1); // pops "filter" value.
- // call the dist new function.
- int res = lua_pcall(L, 2, 2, 0);
+ if (lua_getfield(L, 2, "filter_conf") == LUA_TSTRING) {
+ size_t len = 0;
+ const char *conf = lua_tolstring(L, -1, &len);
+ if (len < 2 || len > KEY_HASH_FILTER_MAX) {
+ proxy_lua_ferror(L, "hash filter conf must be between 2 and %d characters", KEY_HASH_FILTER_MAX);
+ }
- if (res != LUA_OK) {
- lua_error(L); // error should be on the stack already.
- return 0;
+ memcpy(p->key_filter_conf, conf, len);
+ p->key_filter_conf[len+1] = '\0';
+ } else {
+ proxy_lua_error(L, "hash filter requires 'filter_conf' string");
}
+ lua_pop(L, 1); // pops "filter_conf" value.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
- // -1 is lightuserdata ptr to the struct (which must be owned by the
- // userdata), which is later used for internal calls.
- struct proxy_hash_caller *phc;
-
- luaL_checktype(L, -1, LUA_TUSERDATA);
- luaL_checktype(L, -2, LUA_TUSERDATA);
- phc = lua_touserdata(L, -1);
- memcpy(&p->phc, phc, sizeof(*phc));
+ if (lua_getfield(L, 2, "hash") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
+ struct proxy_hash_func *phf = lua_touserdata(L, -1);
+ p->key_hasher = phf->func;
lua_pop(L, 1);
- // -2 was userdata we need to hold a reference to
- p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- // UD now popped from stack.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
- lua_pop(L, 1); // remove the dist table from stack.
+ if (lua_getfield(L, 2, "seed") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TSTRING);
+ size_t seedlen;
+ const char *seedstr = lua_tolstring(L, -1, &seedlen);
+ // Note: the custom hasher for a dist may be "weird" in some cases, so
+ // we use a standard hash method for the seed here.
+ // I'm open to changing this (ie; mcp.pool_seed_hasher = etc)
+ p->hash_seed = XXH3_64bits(seedstr, seedlen);
+
+ lua_pop(L, 1);
} else {
lua_pop(L, 1); // pop the nil.
}
- // TODO: getfield("filter") and add the function
- // TODO: getfield("hash") and add the function
- // TODO: getfield("seed") and use the hash function to calculate a seed
- // value.
+ if (p->phc.selector_func == NULL) {
+ proxy_lua_error(L, "cannot create pool missing 'dist' argument");
+ }
return 1;
}
@@ -3034,6 +3154,25 @@ static int mcplib_pool_proxy_gc(lua_State *L) {
return 0;
}
+// Picks the backend a key maps to within pool p.
+// If the pool has a key filter, the key and length are narrowed by it first.
+// The (filtered) key is hashed with the pool's hasher and seed, and the
+// distribution selector turns that hash into an index into p->pool.
+// An index outside of the pool is reported through proxy_lua_error().
+static mcp_backend_t *_mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) {
+    // check the selector context before the selector gets to use it.
+    assert(p->phc.ctx != NULL);
+
+    if (p->key_filter) {
+        key = p->key_filter(p->key_filter_conf, key, len, &len);
+        P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
+    }
+    uint64_t hash = p->key_hasher(key, len, p->hash_seed);
+    uint32_t lookup = p->phc.selector_func(hash, p->phc.ctx);
+
+    // the selector result is used directly as an index into p->pool, so it
+    // has to be validated against the pool size.
+    if (lookup >= p->pool_size) {
+        proxy_lua_error(L, "key dist hasher tried to use out of bounds index");
+        // presumably not reached since the error unwinds, but never index
+        // the pool with a bad value.
+        return NULL;
+    }
+
+    // the caller attaches the backend to the request object.
+    return p->pool[lookup].be;
+}
+
// hashfunc(request) -> backend(request)
// needs key from request object.
static int mcplib_pool_proxy_call(lua_State *L) {
@@ -3050,24 +3189,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
}
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
- uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx);
-
- // attach the backend to the request object.
- // save CPU cycles over rolling it through lua.
- if (p->phc.ctx == NULL) {
- // TODO: if NULL, pass in pool_size as ctx?
- // works because the % bit will return an id we can index here.
- // FIXME: temporary? maybe?
- // if no context, what we got back was a hash which we need to modulus
- // against the pool, since the func has no info about the pool.
- rq->be = p->pool[lookup % p->pool_size].be;
- } else {
- // else we have a direct id into our pool.
- // the lua modules should "think" in 1 based indexes, so we need to
- // subtract one here.
- // TODO: bother validating the range?
- rq->be = p->pool[lookup-1].be;
- }
+ rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len);
// now yield request, pool up.
return lua_yield(L, 2);
@@ -3852,57 +3974,34 @@ static int mcplib_request_gc(lua_State *L) {
typedef struct {
struct proxy_hash_caller phc; // passed back to proxy API
- uint64_t seed;
unsigned int buckets;
} mcplib_jump_hash_t;
-static uint32_t mcplib_jump_hash_get_server(const void *key, size_t len, void *ctx) {
+static uint32_t mcplib_dist_jump_hash_get_server(uint64_t hash, void *ctx) {
mcplib_jump_hash_t *jh = ctx;
- uint64_t hash = XXH3_64bits_withSeed(key, len, jh->seed);
-
int64_t b = -1, j = 0;
while (j < jh->buckets) {
b = j;
hash = hash * 2862933555777941757ULL + 1;
j = (b + 1) * ((double)(1LL << 31) / (double)((hash >> 33) + 1));
}
- return b+1; // FIXME: do the -1 just for ketama and remove from internal code?
+ return b;
}
// stack = [pool, option]
-static int mcplib_jump_hash_new(lua_State *L) {
- uint64_t seed = 0;
- const char *seedstr = NULL;
- size_t seedlen = 0;
-
+static int mcplib_dist_jump_hash_new(lua_State *L) {
luaL_checktype(L, 1, LUA_TTABLE);
lua_Unsigned buckets = lua_rawlen(L, 1);
- int argc = lua_gettop(L);
- if (argc > 1) {
- // options supplied. to be specified as a table.
- // { seed = "foo" }
- luaL_checktype(L, 2, LUA_TTABLE);
- // FIXME: adjust so we ensure/error on this being a string?
- if (lua_getfield(L, 2, "seed") != LUA_TNIL) {
- seedstr = lua_tolstring(L, -1, &seedlen);
- seed = XXH3_64bits(seedstr, seedlen);
- } else {
- dump_stack(L);
- }
- lua_pop(L, 1);
- }
-
mcplib_jump_hash_t *jh = lua_newuserdatauv(L, sizeof(mcplib_jump_hash_t), 0);
// don't need to loop through the table at all, just need its length.
// could optimize startup time by adding hints to the module for how to
// format pool (ie; just a total count or the full table)
- jh->seed = seed;
jh->buckets = buckets;
jh->phc.ctx = jh;
- jh->phc.selector_func = mcplib_jump_hash_get_server;
+ jh->phc.selector_func = mcplib_dist_jump_hash_get_server;
lua_pushlightuserdata(L, &jh->phc);
@@ -3910,9 +4009,9 @@ static int mcplib_jump_hash_new(lua_State *L) {
return 2;
}
-static int mcplib_open_jump_hash(lua_State *L) {
+static int mcplib_open_dist_jump_hash(lua_State *L) {
const struct luaL_Reg jump_f[] = {
- {"new", mcplib_jump_hash_new},
+ {"new", mcplib_dist_jump_hash_new},
{NULL, NULL},
};
@@ -4047,6 +4146,9 @@ static int mcplib_await(lua_State *L) {
int n = luaL_len(L, 2); // length of hash selector table
int wait_for = 0; // 0 means wait for all responses
+ if (n <= 0) {
+ proxy_lua_error(L, "mcp.await arguments must have at least one pool");
+ }
if (lua_isnumber(L, 3)) {
wait_for = lua_tointeger(L, 3);
lua_pop(L, 1);
@@ -4054,9 +4156,8 @@ static int mcplib_await(lua_State *L) {
wait_for = n;
}
}
- // TODO: bail if pool table was 0 len? else bad things can happen.
- // TODO: quickly loop table once and ensure they're all pools?
+ // TODO (v2): quickly loop table once and ensure they're all pools?
int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.
@@ -4166,21 +4267,15 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
while (lua_next(L, 1) != 0) {
P_DEBUG("%s: top of loop\n", __func__);
// (key, -2), (val, -1)
- // FIXME: move to a func. mostly redundant with hsp_call()?
mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy");
if (pp == NULL) {
- // TODO: fatal! wasn't correct object type
+ proxy_lua_error(L, "mcp.await must be supplied with a pool");
}
mcp_pool_t *p = pp->main;
- uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx);
// NOTE: rq->be is only held to help pass the backend into the IOP in
// mcp_queue call. Could be a local variable and an argument too.
- if (p->phc.ctx == NULL) {
- rq->be = p->pool[lookup % p->pool_size].be;
- } else {
- rq->be = p->pool[lookup-1].be;
- }
+ rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len);
mcp_queue_await_io(c, L, rq, await_ref);
@@ -4307,6 +4402,19 @@ static int mcplib_await_return(io_pending_proxy_t *p) {
/*** END lua await() object interface ***/
+/*** START xxhash module ***/
+
+// Wraps the seeded 64 bit xxh3 hash so it can be handed to lua as a light
+// userdata pointer.
+static struct proxy_hash_func mcplib_hash_xxhash = {
+    .func = XXH3_64bits_withSeed,
+};
+
+// Pushes a pointer to the xxhash wrapper struct onto the lua stack.
+// Returns 1, the number of values pushed.
+static int mcplib_open_hash_xxhash(lua_State *L) {
+    void *hasher = &mcplib_hash_xxhash;
+    lua_pushlightuserdata(L, hasher);
+    return 1;
+}
+
+/*** END xxhash module ***/
+
// Creates and returns the top level "mcp" module
int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
lua_State *L = ctx;
@@ -4402,14 +4510,13 @@ int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
luaL_newlibtable(L, mcplib_f);
proxy_register_defines(L);
+ mcplib_open_hash_xxhash(L);
+ lua_setfield(L, -2, "hash_xxhash");
// hash function for selectors.
// have to wrap the function in a struct because function pointers aren't
// pointer pointers :)
- mcplib_open_jump_hash(L);
- lua_setfield(L, -2, "hash_jump");
- // FIXME: remove this once proper default is added.
- lua_pushlightuserdata(L, &mcplib_hashfunc_murmur3);
- lua_setfield(L, -2, "hash_murmur3");
+ mcplib_open_dist_jump_hash(L);
+ lua_setfield(L, -2, "dist_jump_hash");
lua_pushlightuserdata(L, (void *)t); // upvalue for original thread
lua_newtable(L); // upvalue for mcp.attach() table.