diff options
author | dormando <dormando@rydia.net> | 2022-02-05 23:01:26 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-02-07 21:25:40 -0800 |
commit | 4b781973608a8f31fff314340759af936c518cd2 (patch) | |
tree | 137f5d82187a86e27e0b82d50478ab6c0062a521 | |
parent | b6fd865985dd8285bd963dfd429d7f475d54d77f (diff) | |
download | memcached-4b781973608a8f31fff314340759af936c518cd2.tar.gz |
proxy: add filter and hash options to mcp.pool()
two builtin filter options (hash stop and tag), because why not :)
hash defaults caused some code reorganization. default hash dist is now
jump, because I can't think of why you'd use modulus over that.
-rw-r--r-- | proto_proxy.c | 361 | ||||
-rw-r--r-- | t/startfile.lua | 6 |
2 files changed, 238 insertions, 129 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 525fdb5..20f908f 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -33,7 +33,6 @@ #include "proto_proxy.h" #include "proto_text.h" -#include "murmur3_hash.h" #include "queue.h" #define XXH_INLINE_ALL // modifier for xxh3's include below #include "xxhash.h" @@ -213,18 +212,19 @@ struct proxy_hook { bool is_lua; // pull the lua reference and call it as a lua function. }; -typedef uint32_t (*hash_selector_func)(const void *key, size_t len, void *ctx); +// TODO (v2): some hash functions (crc?) might require initializers. If we run into +// any the interface might need expanding. +typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed); +struct proxy_hash_func { + key_hash_func func; +}; +typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen); +typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx); struct proxy_hash_caller { hash_selector_func selector_func; void *ctx; }; -// A default hash function for backends. -static uint32_t mcplib_hashfunc_murmur3_func(const void *key, size_t len, void *ctx) { - return MurmurHash3_x86_32(key, len); -} -static struct proxy_hash_caller mcplib_hashfunc_murmur3 = { mcplib_hashfunc_murmur3_func, NULL}; - enum mcp_backend_states { mcp_backend_read = 0, // waiting to read any response mcp_backend_parse, // have some buffered data to check @@ -383,12 +383,17 @@ typedef struct { mcp_backend_t *be; } mcp_pool_be_t; +#define KEY_HASH_FILTER_MAX 5 typedef struct mcp_pool_s mcp_pool_t; struct mcp_pool_s { struct proxy_hash_caller phc; + key_hash_filter_func key_filter; + key_hash_func key_hasher; pthread_mutex_t lock; // protects refcount. proxy_ctx_t *ctx; // main context. STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator. + char key_filter_conf[KEY_HASH_FILTER_MAX+1]; + uint64_t hash_seed; // calculated from a string. int refcount; int phc_ref; int self_ref; // TODO (v2): double check that this is needed. @@ -2903,8 +2908,119 @@ static int mcplib_pool_gc(lua_State *L) { return 0; } +// Looks for a short string in a key to separate which part gets hashed vs +// sent to the backend node. +// ie: "foo:bar|#|restofkey" - only "foo:bar" gets hashed. +static const char *mcp_key_hash_filter_stop(const char *conf, const char *key, size_t klen, size_t *newlen) { + char temp[KEY_MAX_LENGTH+1]; + *newlen = klen; + if (klen > KEY_MAX_LENGTH) { + // Hedging against potential bugs. + return key; + } + + memcpy(temp, key, klen); + temp[klen+1] = '\0'; + + // TODO (v2): memmem would avoid the temp key and memcpy here, but it's + // not technically portable. An easy improvement would be to detect + // memmem() in `configure` and only use strstr/copy as a fallback. + // Since keys are short it's unlikely this would be a major performance + // win. + char *found = strstr(temp, conf); + + if (found) { + *newlen = found - temp; + } + + // hash stop can't change where keys start. + return key; +} + +// Takes a two character "tag", ie; "{}", or "$$", searches string for the +// first then second character. Only hashes the portion within these tags. +// *conf _must_ be two characters. +static const char *mcp_key_hash_filter_tag(const char *conf, const char *key, size_t klen, size_t *newlen) { + *newlen = klen; + + const char *t1 = memchr(key, conf[0], klen); + if (t1) { + size_t remain = klen - (t1 - key); + // must be at least one character inbetween the tags to hash. + if (remain > 1) { + const char *t2 = memchr(t1, conf[1], remain); + + if (t2) { + *newlen = t2 - t1 - 1; + return t1+1; + } + } + } + + return key; +} + +static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) { + luaL_checktype(L, -1, LUA_TTABLE); + if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) { + proxy_lua_error(L, "key distribution object missing 'new' function"); + return; + } + + // - now create the copy pool table + lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint. + for (int x = 1; x <= p->pool_size; x++) { + mcp_backend_t *be = p->pool[x-1].be; + lua_createtable(L, 0, 4); + // stack = [p, h, f, optN, newpool, backend] + // the key should be fine for id? maybe don't need to duplicate + // this? + lua_pushinteger(L, x); + lua_setfield(L, -2, "id"); + // we don't use the hostname for ketama hashing + // so passing ip for hostname is fine + lua_pushstring(L, be->ip); + // FIXME: hostname should probably work... + lua_setfield(L, -2, "hostname"); + lua_pushstring(L, be->ip); + lua_setfield(L, -2, "addr"); + lua_pushstring(L, be->port); + lua_setfield(L, -2, "port"); + // TODO (v2): weight/etc? + + // set the backend table into the new pool table. + lua_rawseti(L, -2, x); + } + + // we can either use lua_insert() or possibly _rotate to shift + // things into the right place, but simplest is to just copy the + // option arg to the end of the stack. + lua_pushvalue(L, 2); + // - stack should be: pool, opts, func, pooltable, opts + + // call the dist new function. + int res = lua_pcall(L, 2, 2, 0); + + if (res != LUA_OK) { + lua_error(L); // error should be on the stack already. + return; + } + + // -1 is lightuserdata ptr to the struct (which must be owned by the + // userdata), which is later used for internal calls. + struct proxy_hash_caller *phc; + + luaL_checktype(L, -1, LUA_TLIGHTUSERDATA); + luaL_checktype(L, -2, LUA_TUSERDATA); + phc = lua_touserdata(L, -1); + memcpy(&p->phc, phc, sizeof(*phc)); + lua_pop(L, 1); + // -2 was userdata we need to hold a reference to + p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX); + // UD now popped from stack. +} + // p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f }) -// TODO: hash and hashfilter static int mcplib_pool(lua_State *L) { int argc = lua_gettop(L); luaL_checktype(L, 1, LUA_TTABLE); @@ -2915,7 +3031,8 @@ static int mcplib_pool(lua_State *L) { // Zero the memory before use, so we can realibly use __gc to clean up memset(p, 0, plen); p->pool_size = n; - p->refcount = 0; + // TODO (v2): Nicer if this is fetched from mcp.default_key_hash + p->key_hasher = XXH3_64bits_withSeed; pthread_mutex_init(&p->lock, NULL); p->ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue. @@ -2935,8 +3052,15 @@ static int mcplib_pool(lua_State *L) { } if (argc == 1) { - // Use default hash selector if none given. - p->phc = mcplib_hashfunc_murmur3; + lua_getglobal(L, "mcp"); + // TODO (v2): decide on a mcp.default_dist and use that instead + if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) { + _mcplib_pool_dist(L, p); + lua_pop(L, 1); // pop "dist_jump_hash" value. + } else { + lua_pop(L, 1); + } + lua_pop(L, 1); // pop "mcp" return 1; } @@ -2947,72 +3071,68 @@ static int mcplib_pool(lua_State *L) { // stack: backends, options, mcp.pool if (lua_getfield(L, 2, "dist") != LUA_TNIL) { // overriding the distribution function. - luaL_checktype(L, -1, LUA_TTABLE); - if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) { - proxy_lua_error(L, "key distribution object missing 'new' function"); - return 0; - } + _mcplib_pool_dist(L, p); + lua_pop(L, 1); // remove the dist table from stack. + } else { + lua_pop(L, 1); // pop the nil. + } - // - now create the copy pool table - lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint. - for (int x = 1; x <= p->pool_size; x++) { - mcp_backend_t *be = p->pool[x-1].be; - lua_createtable(L, 0, 4); - // stack = [p, h, f, optN, newpool, backend] - // the key should be fine for id? maybe don't need to duplicate - // this? - lua_pushinteger(L, x); - lua_setfield(L, -2, "id"); - // we don't use the hostname for ketama hashing - // so passing ip for hostname is fine - lua_pushstring(L, be->ip); - lua_setfield(L, -2, "hostname"); - lua_pushstring(L, be->ip); - lua_setfield(L, -2, "addr"); - lua_pushstring(L, be->port); - lua_setfield(L, -2, "port"); - // TODO (v2): weight/etc? - - // set the backend table into the new pool table. - lua_rawseti(L, -2, x); + if (lua_getfield(L, 2, "filter") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TSTRING); + const char *f_type = lua_tostring(L, -1); + if (strcmp(f_type, "stop") == 0) { + p->key_filter = mcp_key_hash_filter_stop; + } else if (strcmp(f_type, "tags") == 0) { + p->key_filter = mcp_key_hash_filter_tag; + } else { + proxy_lua_ferror(L, "unknown hash filter specified: %s\n", f_type); } - // we can either use lua_insert() or possibly _rotate to shift - // things into the right place, but simplest is to just copy the - // option arg to the end of the stack. - lua_pushvalue(L, 2); - // - stack should be: pool, opts, func, pooltable, opts + lua_pop(L, 1); // pops "filter" value. - // call the dist new function. - int res = lua_pcall(L, 2, 2, 0); + if (lua_getfield(L, 2, "filter_conf") == LUA_TSTRING) { + size_t len = 0; + const char *conf = lua_tolstring(L, -1, &len); + if (len < 2 || len > KEY_HASH_FILTER_MAX) { + proxy_lua_ferror(L, "hash filter conf must be between 2 and %d characters", KEY_HASH_FILTER_MAX); + } - if (res != LUA_OK) { - lua_error(L); // error should be on the stack already. - return 0; + memcpy(p->key_filter_conf, conf, len); + p->key_filter_conf[len+1] = '\0'; + } else { + proxy_lua_error(L, "hash filter requires 'filter_conf' string"); } + lua_pop(L, 1); // pops "filter_conf" value. + } else { + lua_pop(L, 1); // pop the nil. + } - // -1 is lightuserdata ptr to the struct (which must be owned by the - // userdata), which is later used for internal calls. - struct proxy_hash_caller *phc; - - luaL_checktype(L, -1, LUA_TUSERDATA); - luaL_checktype(L, -2, LUA_TUSERDATA); - phc = lua_touserdata(L, -1); - memcpy(&p->phc, phc, sizeof(*phc)); + if (lua_getfield(L, 2, "hash") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TLIGHTUSERDATA); + struct proxy_hash_func *phf = lua_touserdata(L, -1); + p->key_hasher = phf->func; lua_pop(L, 1); - // -2 was userdata we need to hold a reference to - p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX); - // UD now popped from stack. + } else { + lua_pop(L, 1); // pop the nil. + } - lua_pop(L, 1); // remove the dist table from stack. + if (lua_getfield(L, 2, "seed") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TSTRING); + size_t seedlen; + const char *seedstr = lua_tolstring(L, -1, &seedlen); + // Note: the custom hasher for a dist may be "weird" in some cases, so + // we use a standard hash method for the seed here. + // I'm open to changing this (ie; mcp.pool_seed_hasher = etc) + p->hash_seed = XXH3_64bits(seedstr, seedlen); + + lua_pop(L, 1); } else { lua_pop(L, 1); // pop the nil. } - // TODO: getfield("filter") and add the function - // TODO: getfield("hash") and add the function - // TODO: getfield("seed") and use the hash function to calculate a seed - // value. + if (p->phc.selector_func == NULL) { + proxy_lua_error(L, "cannot create pool missing 'dist' argument"); + } return 1; } @@ -3034,6 +3154,25 @@ static int mcplib_pool_proxy_gc(lua_State *L) { return 0; } +static mcp_backend_t *_mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) { + if (p->key_filter) { + key = p->key_filter(p->key_filter_conf, key, len, &len); + P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key); + } + uint64_t hash = p->key_hasher(key, len, p->hash_seed); + uint32_t lookup = p->phc.selector_func(hash, p->phc.ctx); + + assert(p->phc.ctx != NULL); + // attach the backend to the request object. + // the lua modules should "think" in 1 based indexes, so we need to + // subtract one here. + if (lookup >= p->pool_size) { + proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); + } + + return p->pool[lookup].be; +} + // hashfunc(request) -> backend(request) // needs key from request object. static int mcplib_pool_proxy_call(lua_State *L) { @@ -3050,24 +3189,7 @@ static int mcplib_pool_proxy_call(lua_State *L) { } const char *key = MCP_PARSER_KEY(rq->pr); size_t len = rq->pr.klen; - uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx); - - // attach the backend to the request object. - // save CPU cycles over rolling it through lua. - if (p->phc.ctx == NULL) { - // TODO: if NULL, pass in pool_size as ctx? - // works because the % bit will return an id we can index here. - // FIXME: temporary? maybe? - // if no context, what we got back was a hash which we need to modulus - // against the pool, since the func has no info about the pool. - rq->be = p->pool[lookup % p->pool_size].be; - } else { - // else we have a direct id into our pool. - // the lua modules should "think" in 1 based indexes, so we need to - // subtract one here. - // TODO: bother validating the range? - rq->be = p->pool[lookup-1].be; - } + rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len); // now yield request, pool up. return lua_yield(L, 2); @@ -3852,57 +3974,34 @@ static int mcplib_request_gc(lua_State *L) { typedef struct { struct proxy_hash_caller phc; // passed back to proxy API - uint64_t seed; unsigned int buckets; } mcplib_jump_hash_t; -static uint32_t mcplib_jump_hash_get_server(const void *key, size_t len, void *ctx) { +static uint32_t mcplib_dist_jump_hash_get_server(uint64_t hash, void *ctx) { mcplib_jump_hash_t *jh = ctx; - uint64_t hash = XXH3_64bits_withSeed(key, len, jh->seed); - int64_t b = -1, j = 0; while (j < jh->buckets) { b = j; hash = hash * 2862933555777941757ULL + 1; j = (b + 1) * ((double)(1LL << 31) / (double)((hash >> 33) + 1)); } - return b+1; // FIXME: do the -1 just for ketama and remove from internal code? + return b; } // stack = [pool, option] -static int mcplib_jump_hash_new(lua_State *L) { - uint64_t seed = 0; - const char *seedstr = NULL; - size_t seedlen = 0; - +static int mcplib_dist_jump_hash_new(lua_State *L) { luaL_checktype(L, 1, LUA_TTABLE); lua_Unsigned buckets = lua_rawlen(L, 1); - int argc = lua_gettop(L); - if (argc > 1) { - // options supplied. to be specified as a table. - // { seed = "foo" } - luaL_checktype(L, 2, LUA_TTABLE); - // FIXME: adjust so we ensure/error on this being a string? - if (lua_getfield(L, 2, "seed") != LUA_TNIL) { - seedstr = lua_tolstring(L, -1, &seedlen); - seed = XXH3_64bits(seedstr, seedlen); - } else { - dump_stack(L); - } - lua_pop(L, 1); - } - mcplib_jump_hash_t *jh = lua_newuserdatauv(L, sizeof(mcplib_jump_hash_t), 0); // don't need to loop through the table at all, just need its length. // could optimize startup time by adding hints to the module for how to // format pool (ie; just a total count or the full table) - jh->seed = seed; jh->buckets = buckets; jh->phc.ctx = jh; - jh->phc.selector_func = mcplib_jump_hash_get_server; + jh->phc.selector_func = mcplib_dist_jump_hash_get_server; lua_pushlightuserdata(L, &jh->phc); @@ -3910,9 +4009,9 @@ static int mcplib_jump_hash_new(lua_State *L) { return 2; } -static int mcplib_open_jump_hash(lua_State *L) { +static int mcplib_open_dist_jump_hash(lua_State *L) { const struct luaL_Reg jump_f[] = { - {"new", mcplib_jump_hash_new}, + {"new", mcplib_dist_jump_hash_new}, {NULL, NULL}, }; @@ -4047,6 +4146,9 @@ static int mcplib_await(lua_State *L) { int n = luaL_len(L, 2); // length of hash selector table int wait_for = 0; // 0 means wait for all responses + if (n <= 0) { + proxy_lua_error(L, "mcp.await arguments must have at least one pool"); + } if (lua_isnumber(L, 3)) { wait_for = lua_tointeger(L, 3); lua_pop(L, 1); @@ -4054,9 +4156,8 @@ static int mcplib_await(lua_State *L) { wait_for = n; } } - // TODO: bail if pool table was 0 len? else bad things can happen. - // TODO: quickly loop table once and ensure they're all pools? + // TODO (v2): quickly loop table once and ensure they're all pools? int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object. @@ -4166,21 +4267,15 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) { while (lua_next(L, 1) != 0) { P_DEBUG("%s: top of loop\n", __func__); // (key, -2), (val, -1) - // FIXME: move to a func. mostly redundant with hsp_call()? mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy"); if (pp == NULL) { - // TODO: fatal! wasn't correct object type + proxy_lua_error(L, "mcp.await must be supplied with a pool"); } mcp_pool_t *p = pp->main; - uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx); // NOTE: rq->be is only held to help pass the backend into the IOP in // mcp_queue call. Could be a local variable and an argument too. - if (p->phc.ctx == NULL) { - rq->be = p->pool[lookup % p->pool_size].be; - } else { - rq->be = p->pool[lookup-1].be; - } + rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len); mcp_queue_await_io(c, L, rq, await_ref); @@ -4307,6 +4402,19 @@ static int mcplib_await_return(io_pending_proxy_t *p) { /*** END lua await() object interface ***/ +/*** START xxhash module ***/ + +static struct proxy_hash_func mcplib_hash_xxhash = { + XXH3_64bits_withSeed, +}; + +static int mcplib_open_hash_xxhash(lua_State *L) { + lua_pushlightuserdata(L, &mcplib_hash_xxhash); + return 1; +} + +/*** END xxhash module ***/ + // Creates and returns the top level "mcp" module int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { lua_State *L = ctx; @@ -4402,14 +4510,13 @@ int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { luaL_newlibtable(L, mcplib_f); proxy_register_defines(L); + mcplib_open_hash_xxhash(L); + lua_setfield(L, -2, "hash_xxhash"); // hash function for selectors. // have to wrap the function in a struct because function pointers aren't // pointer pointers :) - mcplib_open_jump_hash(L); - lua_setfield(L, -2, "hash_jump"); - // FIXME: remove this once proper default is added. - lua_pushlightuserdata(L, &mcplib_hashfunc_murmur3); - lua_setfield(L, -2, "hash_murmur3"); + mcplib_open_dist_jump_hash(L); + lua_setfield(L, -2, "dist_jump_hash"); lua_pushlightuserdata(L, (void *)t); // upvalue for original thread lua_newtable(L); // upvalue for mcp.attach() table. diff --git a/t/startfile.lua b/t/startfile.lua index 1d4d9af..b1d2787 100644 --- a/t/startfile.lua +++ b/t/startfile.lua @@ -92,7 +92,7 @@ function mcp_config_pools(oldss) for _, subs in pairs(main_zones) do for k, v in pairs(subs) do -- use next line instead for a third party ketama hash - -- subs[k] = mcp.pool(v, { dist = ketama }) + -- subs[k] = mcp.pool(v, { dist = ketama, hash = ketama.hash }) -- this line overrides the default bucket size for ketama -- subs[k] = mcp.pool(v, { dist = ketama, obucket = 80 }) -- this line uses the default murmur3 straight hash. @@ -104,7 +104,9 @@ function mcp_config_pools(oldss) -- for each zone. -- NOTE: 'k' may not be the right seed here: -- instead stitch main_zone's key + the sub key? - -- subs[k] = mcp.pool(v, { dist = mcp.hash_jump, seed = k }) + -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k }) + -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k, filter = "stop", filter_conf = "|#|" }) + -- subs[k] = mcp.pool(v, { dist = mcp.dist_jump_hash, seed = k, filter = "tags", filter_conf = "{}" }) end end |