diff options
Diffstat (limited to 'proxy_lua.c')
-rw-r--r-- | proxy_lua.c | 752 |
1 files changed, 752 insertions, 0 deletions
diff --git a/proxy_lua.c b/proxy_lua.c new file mode 100644 index 0000000..2431417 --- /dev/null +++ b/proxy_lua.c @@ -0,0 +1,752 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ + +#include "proxy.h" + +// func prototype example: +// static int fname (lua_State *L) +// normal library open: +// int luaopen_mcp(lua_State *L) { } + +// resp:ok() +static int mcplib_response_ok(lua_State *L) { + mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response"); + + if (r->status == MCMC_OK) { + lua_pushboolean(L, 1); + } else { + lua_pushboolean(L, 0); + } + + return 1; +} + +static int mcplib_response_hit(lua_State *L) { + mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response"); + + if (r->status == MCMC_OK && r->resp.code != MCMC_CODE_MISS) { + lua_pushboolean(L, 1); + } else { + lua_pushboolean(L, 0); + } + + return 1; +} + +static int mcplib_response_gc(lua_State *L) { + mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response"); + + // On error/similar we might be holding the read buffer. + // If the buf is handed off to mc_resp for return, this pointer is NULL + if (r->buf != NULL) { + free(r->buf); + } + + return 0; +} + +// NOTE: backends are global objects owned by pool objects. +// Each pool has a "proxy pool object" distributed to each worker VM. +// proxy pool objects are held at the same time as any request exists on a +// backend, in the coroutine stack during yield() +// To free a backend: All proxies for a pool are collected, then the central +// pool is collected, which releases backend references, which allows backend +// to be collected. +static int mcplib_backend_gc(lua_State *L) { + mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend"); + + assert(STAILQ_EMPTY(&be->io_head)); + + mcmc_disconnect(be->client); + free(be->client); + + // FIXME (v2): upvalue for global ctx. + proxy_ctx_t *ctx = settings.proxy_ctx; + STAT_DECR(ctx, backend_total, 1); + + return 0; +} + +static int mcplib_backend(lua_State *L) { + luaL_checkstring(L, -4); // label for indexing backends. + const char *ip = luaL_checkstring(L, -3); + const char *port = luaL_checkstring(L, -2); + double weight = luaL_checknumber(L, -1); + // FIXME (v2): upvalue for global ctx. + proxy_ctx_t *ctx = settings.proxy_ctx; + + // first check our reference table to compare. + lua_pushvalue(L, -4); + int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); + if (ret != LUA_TNIL) { + mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend"); + if (strncmp(be_orig->ip, ip, MAX_IPLEN) == 0 + && strncmp(be_orig->port, port, MAX_PORTLEN) == 0 + && be_orig->weight == weight) { + // backend is the same, return it. + return 1; + } else { + // backend not the same, pop from stack and make new one. + lua_pop(L, 1); + } + } else { + lua_pop(L, 1); + } + + // This might shift to internal objects? + mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0); + + // FIXME (v2): remove some of the excess zero'ing below? + memset(be, 0, sizeof(mcp_backend_t)); + strncpy(be->ip, ip, MAX_IPLEN); + strncpy(be->port, port, MAX_PORTLEN); + be->weight = weight; + be->depth = 0; + be->rbuf = NULL; + be->failed_count = 0; + STAILQ_INIT(&be->io_head); + be->state = mcp_backend_read; + be->connecting = false; + be->can_write = false; + be->stacked = false; + be->bad = false; + + // this leaves a permanent buffer on the backend, which is fine + // unless you have billions of backends. + // we can later optimize for pulling buffers from idle backends. + be->rbuf = malloc(READ_BUFFER_SIZE); + if (be->rbuf == NULL) { + proxy_lua_error(L, "out of memory allocating backend"); + return 0; + } + + // initialize libevent. + memset(&be->event, 0, sizeof(be->event)); + + // initialize the client + be->client = malloc(mcmc_size(MCMC_OPTION_BLANK)); + if (be->client == NULL) { + proxy_lua_error(L, "out of memory allocating backend"); + return 0; + } + // TODO (v2): connect elsewhere. When there're multiple backend owners, or + // sockets per backend, etc. We'll want to kick off connects as use time. + // TODO (v2): no way to change the TCP_KEEPALIVE state post-construction. + // This is a trivial fix if we ensure a backend's owning event thread is + // set before it can be used in the proxy, as it would have access to the + // tunables structure. _reset_bad_backend() may not have its event thread + // set 100% of the time and I don't want to introduce a crash right now, + // so I'm writing this overly long comment. :) + int flags = MCMC_OPTION_NONBLOCK; + STAT_L(ctx); + if (ctx->tunables.tcp_keepalive) { + flags |= MCMC_OPTION_TCP_KEEPALIVE; + } + STAT_UL(ctx); + be->connect_flags = flags; + int status = mcmc_connect(be->client, be->ip, be->port, flags); + if (status == MCMC_CONNECTED) { + // FIXME (v2): is this possible? do we ever want to allow blocking + // connections? + proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->ip, be->port); + return 0; + } else if (status == MCMC_CONNECTING) { + be->connecting = true; + be->can_write = false; + } else { + proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->ip, be->port); + return 0; + } + + luaL_getmetatable(L, "mcp.backend"); + lua_setmetatable(L, -2); // set metatable to userdata. + + lua_pushvalue(L, 1); // put the label at the top for settable later. + lua_pushvalue(L, -2); // copy the backend reference to the top. + // set our new backend object into the reference table. + lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); + // stack is back to having backend on the top. + + STAT_INCR(ctx, backend_total, 1); + + return 1; +} + +static int mcplib_pool_gc(lua_State *L) { + mcp_pool_t *p = luaL_checkudata(L, -1, "mcp.pool"); + assert(p->refcount == 0); + pthread_mutex_destroy(&p->lock); + + for (int x = 0; x < p->pool_size; x++) { + if (p->pool[x].ref) { + luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref); + } + } + + return 0; +} + +// Looks for a short string in a key to separate which part gets hashed vs +// sent to the backend node. +// ie: "foo:bar|#|restofkey" - only "foo:bar" gets hashed. +static const char *mcp_key_hash_filter_stop(const char *conf, const char *key, size_t klen, size_t *newlen) { + char temp[KEY_MAX_LENGTH+1]; + *newlen = klen; + if (klen > KEY_MAX_LENGTH) { + // Hedging against potential bugs. + return key; + } + + memcpy(temp, key, klen); + temp[klen+1] = '\0'; + + // TODO (v2): memmem would avoid the temp key and memcpy here, but it's + // not technically portable. An easy improvement would be to detect + // memmem() in `configure` and only use strstr/copy as a fallback. + // Since keys are short it's unlikely this would be a major performance + // win. + char *found = strstr(temp, conf); + + if (found) { + *newlen = found - temp; + } + + // hash stop can't change where keys start. + return key; +} + +// Takes a two character "tag", ie; "{}", or "$$", searches string for the +// first then second character. Only hashes the portion within these tags. +// *conf _must_ be two characters. +static const char *mcp_key_hash_filter_tag(const char *conf, const char *key, size_t klen, size_t *newlen) { + *newlen = klen; + + const char *t1 = memchr(key, conf[0], klen); + if (t1) { + size_t remain = klen - (t1 - key); + // must be at least one character inbetween the tags to hash. + if (remain > 1) { + const char *t2 = memchr(t1, conf[1], remain); + + if (t2) { + *newlen = t2 - t1 - 1; + return t1+1; + } + } + } + + return key; +} + +static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) { + luaL_checktype(L, -1, LUA_TTABLE); + if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) { + proxy_lua_error(L, "key distribution object missing 'new' function"); + return; + } + + // - now create the copy pool table + lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint. + for (int x = 1; x <= p->pool_size; x++) { + mcp_backend_t *be = p->pool[x-1].be; + lua_createtable(L, 0, 4); + // stack = [p, h, f, optN, newpool, backend] + // the key should be fine for id? maybe don't need to duplicate + // this? + lua_pushinteger(L, x); + lua_setfield(L, -2, "id"); + // we don't use the hostname for ketama hashing + // so passing ip for hostname is fine + lua_pushstring(L, be->ip); + // FIXME: hostname should probably work... + lua_setfield(L, -2, "hostname"); + lua_pushstring(L, be->ip); + lua_setfield(L, -2, "addr"); + lua_pushstring(L, be->port); + lua_setfield(L, -2, "port"); + // TODO (v2): weight/etc? + + // set the backend table into the new pool table. + lua_rawseti(L, -2, x); + } + + // we can either use lua_insert() or possibly _rotate to shift + // things into the right place, but simplest is to just copy the + // option arg to the end of the stack. + lua_pushvalue(L, 2); + // - stack should be: pool, opts, func, pooltable, opts + + // call the dist new function. + int res = lua_pcall(L, 2, 2, 0); + + if (res != LUA_OK) { + lua_error(L); // error should be on the stack already. + return; + } + + // -1 is lightuserdata ptr to the struct (which must be owned by the + // userdata), which is later used for internal calls. + struct proxy_hash_caller *phc; + + luaL_checktype(L, -1, LUA_TLIGHTUSERDATA); + luaL_checktype(L, -2, LUA_TUSERDATA); + phc = lua_touserdata(L, -1); + memcpy(&p->phc, phc, sizeof(*phc)); + lua_pop(L, 1); + // -2 was userdata we need to hold a reference to + p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX); + // UD now popped from stack. +} + +// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f }) +static int mcplib_pool(lua_State *L) { + int argc = lua_gettop(L); + luaL_checktype(L, 1, LUA_TTABLE); + int n = luaL_len(L, 1); // get length of array table + + size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n; + mcp_pool_t *p = lua_newuserdatauv(L, plen, 0); + // Zero the memory before use, so we can realibly use __gc to clean up + memset(p, 0, plen); + p->pool_size = n; + // TODO (v2): Nicer if this is fetched from mcp.default_key_hash + p->key_hasher = XXH3_64bits_withSeed; + pthread_mutex_init(&p->lock, NULL); + p->ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue. + + luaL_setmetatable(L, "mcp.pool"); + + lua_pushvalue(L, -1); // dupe self for reference. + p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); + + // remember lua arrays are 1 indexed. + for (int x = 1; x <= n; x++) { + mcp_pool_be_t *s = &p->pool[x-1]; + lua_geti(L, 1, x); // get next server into the stack. + // If we bail here, the pool _gc() should handle releasing any backend + // references we made so far. + s->be = luaL_checkudata(L, -1, "mcp.backend"); + s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object. + } + + if (argc == 1) { + lua_getglobal(L, "mcp"); + // TODO (v2): decide on a mcp.default_dist and use that instead + if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) { + _mcplib_pool_dist(L, p); + lua_pop(L, 1); // pop "dist_jump_hash" value. + } else { + lua_pop(L, 1); + } + lua_pop(L, 1); // pop "mcp" + return 1; + } + + // Supplied with an options table. We inspect this table to decorate the + // pool, then pass it along to the a constructor if necessary. + luaL_checktype(L, 2, LUA_TTABLE); + + // stack: backends, options, mcp.pool + if (lua_getfield(L, 2, "dist") != LUA_TNIL) { + // overriding the distribution function. + _mcplib_pool_dist(L, p); + lua_pop(L, 1); // remove the dist table from stack. + } else { + lua_pop(L, 1); // pop the nil. + } + + if (lua_getfield(L, 2, "filter") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TSTRING); + const char *f_type = lua_tostring(L, -1); + if (strcmp(f_type, "stop") == 0) { + p->key_filter = mcp_key_hash_filter_stop; + } else if (strcmp(f_type, "tags") == 0) { + p->key_filter = mcp_key_hash_filter_tag; + } else { + proxy_lua_ferror(L, "unknown hash filter specified: %s\n", f_type); + } + + lua_pop(L, 1); // pops "filter" value. + + if (lua_getfield(L, 2, "filter_conf") == LUA_TSTRING) { + size_t len = 0; + const char *conf = lua_tolstring(L, -1, &len); + if (len < 2 || len > KEY_HASH_FILTER_MAX) { + proxy_lua_ferror(L, "hash filter conf must be between 2 and %d characters", KEY_HASH_FILTER_MAX); + } + + memcpy(p->key_filter_conf, conf, len); + p->key_filter_conf[len+1] = '\0'; + } else { + proxy_lua_error(L, "hash filter requires 'filter_conf' string"); + } + lua_pop(L, 1); // pops "filter_conf" value. + } else { + lua_pop(L, 1); // pop the nil. + } + + if (lua_getfield(L, 2, "hash") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TLIGHTUSERDATA); + struct proxy_hash_func *phf = lua_touserdata(L, -1); + p->key_hasher = phf->func; + lua_pop(L, 1); + } else { + lua_pop(L, 1); // pop the nil. + } + + if (lua_getfield(L, 2, "seed") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TSTRING); + size_t seedlen; + const char *seedstr = lua_tolstring(L, -1, &seedlen); + // Note: the custom hasher for a dist may be "weird" in some cases, so + // we use a standard hash method for the seed here. + // I'm open to changing this (ie; mcp.pool_seed_hasher = etc) + p->hash_seed = XXH3_64bits(seedstr, seedlen); + + lua_pop(L, 1); + } else { + lua_pop(L, 1); // pop the nil. + } + + if (p->phc.selector_func == NULL) { + proxy_lua_error(L, "cannot create pool missing 'dist' argument"); + } + + return 1; +} + +static int mcplib_pool_proxy_gc(lua_State *L) { + mcp_pool_proxy_t *pp = luaL_checkudata(L, -1, "mcp.pool_proxy"); + mcp_pool_t *p = pp->main; + pthread_mutex_lock(&p->lock); + p->refcount--; + if (p->refcount == 0) { + proxy_ctx_t *ctx = p->ctx; + pthread_mutex_lock(&ctx->manager_lock); + STAILQ_INSERT_TAIL(&ctx->manager_head, p, next); + pthread_cond_signal(&ctx->manager_cond); + pthread_mutex_unlock(&ctx->manager_lock); + } + pthread_mutex_unlock(&p->lock); + + return 0; +} + +mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) { + if (p->key_filter) { + key = p->key_filter(p->key_filter_conf, key, len, &len); + P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key); + } + uint64_t hash = p->key_hasher(key, len, p->hash_seed); + uint32_t lookup = p->phc.selector_func(hash, p->phc.ctx); + + assert(p->phc.ctx != NULL); + // attach the backend to the request object. + // the lua modules should "think" in 1 based indexes, so we need to + // subtract one here. + if (lookup >= p->pool_size) { + proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); + } + + return p->pool[lookup].be; +} + +// hashfunc(request) -> backend(request) +// needs key from request object. +static int mcplib_pool_proxy_call(lua_State *L) { + // internal args are the hash selector (self) + mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy"); + mcp_pool_t *p = pp->main; + // then request object. + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + + // we have a fast path to the key/length. + if (!rq->pr.keytoken) { + proxy_lua_error(L, "cannot route commands without key"); + return 0; + } + const char *key = MCP_PARSER_KEY(rq->pr); + size_t len = rq->pr.klen; + rq->be = mcplib_pool_proxy_call_helper(L, p, key, len); + + // now yield request, pool up. + return lua_yield(L, 2); +} + +static int mcplib_tcp_keepalive(lua_State *L) { + luaL_checktype(L, -1, LUA_TBOOLEAN); + int state = lua_toboolean(L, -1); + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->tunables.tcp_keepalive = state; + STAT_UL(ctx); + + return 0; +} + +static int mcplib_backend_failure_limit(lua_State *L) { + int limit = luaL_checkinteger(L, -1); + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + + if (limit < 0) { + proxy_lua_error(L, "backend_failure_limit must be >= 0"); + return 0; + } + + STAT_L(ctx); + ctx->tunables.backend_failure_limit = limit; + STAT_UL(ctx); + + return 0; +} + +// sad, I had to look this up... +#define NANOSECONDS(x) ((x) * 1E9 + 0.5) +#define MICROSECONDS(x) ((x) * 1E6 + 0.5) + +static int mcplib_backend_connect_timeout(lua_State *L) { + lua_Number secondsf = luaL_checknumber(L, -1); + lua_Integer secondsi = (lua_Integer) secondsf; + lua_Number subseconds = secondsf - secondsi; + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->tunables.connect.tv_sec = secondsi; + ctx->tunables.connect.tv_usec = MICROSECONDS(subseconds); +#ifdef HAVE_LIBURING + ctx->tunables.connect_ur.tv_sec = secondsi; + ctx->tunables.connect_ur.tv_nsec = NANOSECONDS(subseconds); +#endif + STAT_UL(ctx); + + return 0; +} + +static int mcplib_backend_retry_timeout(lua_State *L) { + lua_Number secondsf = luaL_checknumber(L, -1); + lua_Integer secondsi = (lua_Integer) secondsf; + lua_Number subseconds = secondsf - secondsi; + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->tunables.retry.tv_sec = secondsi; + ctx->tunables.retry.tv_usec = MICROSECONDS(subseconds); +#ifdef HAVE_LIBURING + ctx->tunables.retry_ur.tv_sec = secondsi; + ctx->tunables.retry_ur.tv_nsec = NANOSECONDS(subseconds); +#endif + STAT_UL(ctx); + + return 0; +} + +static int mcplib_backend_read_timeout(lua_State *L) { + lua_Number secondsf = luaL_checknumber(L, -1); + lua_Integer secondsi = (lua_Integer) secondsf; + lua_Number subseconds = secondsf - secondsi; + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->tunables.read.tv_sec = secondsi; + ctx->tunables.read.tv_usec = MICROSECONDS(subseconds); +#ifdef HAVE_LIBURING + ctx->tunables.read_ur.tv_sec = secondsi; + ctx->tunables.read_ur.tv_nsec = NANOSECONDS(subseconds); +#endif + STAT_UL(ctx); + + return 0; +} + +// mcp.attach(mcp.HOOK_NAME, function) +// fill hook structure: if lua function, use luaL_ref() to store the func +static int mcplib_attach(lua_State *L) { + // Pull the original worker thread out of the shared mcplib upvalue. + LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE)); + + int hook = luaL_checkinteger(L, -2); + // pushvalue to dupe func and etc. + // can leave original func on stack afterward because it'll get cleared. + int loop_end = 0; + int loop_start = 1; + if (hook == CMD_ANY) { + // if CMD_ANY we need individually set loop 1 to CMD_SIZE. + loop_end = CMD_SIZE; + } else if (hook == CMD_ANY_STORAGE) { + // if CMD_ANY_STORAGE we only override get/set/etc. + loop_end = CMD_END_STORAGE; + } else { + loop_start = hook; + loop_end = hook + 1; + } + + if (lua_isfunction(L, -1)) { + struct proxy_hook *hooks = t->proxy_hooks; + + for (int x = loop_start; x < loop_end; x++) { + struct proxy_hook *h = &hooks[x]; + lua_pushvalue(L, -1); // duplicate the function for the ref. + if (h->lua_ref) { + // remove existing reference. + luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref); + } + + // pops the function from the stack and leaves us a ref. for later. + h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); + h->is_lua = true; + } + } else { + proxy_lua_error(L, "Must pass a function to mcp.attach"); + return 0; + } + + return 0; +} + +/*** START lua interface to logger ***/ + +static int mcplib_log(lua_State *L) { + LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE)); + const char *msg = luaL_checkstring(L, -1); + LOGGER_LOG(t->l, LOG_PROXYUSER, LOGGER_PROXY_USER, NULL, msg); + return 0; +} + +/*** END lua interface to logger ***/ + +static void proxy_register_defines(lua_State *L) { +#define X(x) \ + lua_pushinteger(L, x); \ + lua_setfield(L, -2, #x); + + X(P_OK); + X(CMD_ANY); + X(CMD_ANY_STORAGE); + X(AWAIT_GOOD); + X(AWAIT_ANY); + X(AWAIT_OK); + X(AWAIT_FIRST); + CMD_FIELDS +#undef X +} + +// Creates and returns the top level "mcp" module +int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { + lua_State *L = ctx; + + const struct luaL_Reg mcplib_backend_m[] = { + {"set", NULL}, + {"__gc", mcplib_backend_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_request_m[] = { + {"command", mcplib_request_command}, + {"key", mcplib_request_key}, + {"ltrimkey", mcplib_request_ltrimkey}, + {"rtrimkey", mcplib_request_rtrimkey}, + {"token", mcplib_request_token}, + {"ntokens", mcplib_request_ntokens}, + {"__tostring", NULL}, + {"__gc", mcplib_request_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_response_m[] = { + {"ok", mcplib_response_ok}, + {"hit", mcplib_response_hit}, + {"__gc", mcplib_response_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_pool_m[] = { + {"__gc", mcplib_pool_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_pool_proxy_m[] = { + {"__call", mcplib_pool_proxy_call}, + {"__gc", mcplib_pool_proxy_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_f [] = { + {"pool", mcplib_pool}, + {"backend", mcplib_backend}, + {"request", mcplib_request}, + {"attach", mcplib_attach}, + {"add_stat", mcplib_add_stat}, + {"stat", mcplib_stat}, + {"await", mcplib_await}, + {"log", mcplib_log}, + {"backend_connect_timeout", mcplib_backend_connect_timeout}, + {"backend_retry_timeout", mcplib_backend_retry_timeout}, + {"backend_read_timeout", mcplib_backend_read_timeout}, + {"backend_failure_limit", mcplib_backend_failure_limit}, + {"tcp_keepalive", mcplib_tcp_keepalive}, + {NULL, NULL} + }; + + // TODO (v2): function + loop. + luaL_newmetatable(L, "mcp.backend"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_backend_m, 0); // register methods + lua_pop(L, 1); + + luaL_newmetatable(L, "mcp.request"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_request_m, 0); // register methods + lua_pop(L, 1); + + luaL_newmetatable(L, "mcp.response"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_response_m, 0); // register methods + lua_pop(L, 1); + + luaL_newmetatable(L, "mcp.pool"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_pool_m, 0); // register methods + lua_pop(L, 1); // drop the hash selector metatable + + luaL_newmetatable(L, "mcp.pool_proxy"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_pool_proxy_m, 0); // register methods + lua_pop(L, 1); // drop the hash selector metatable + + // create main library table. + //luaL_newlib(L, mcplib_f); + // TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things + // here. + // can replace with createtable and add the num. of the constant + // definitions. + luaL_newlibtable(L, mcplib_f); + proxy_register_defines(L); + + mcplib_open_hash_xxhash(L); + lua_setfield(L, -2, "hash_xxhash"); + // hash function for selectors. + // have to wrap the function in a struct because function pointers aren't + // pointer pointers :) + mcplib_open_dist_jump_hash(L); + lua_setfield(L, -2, "dist_jump_hash"); + + lua_pushlightuserdata(L, (void *)t); // upvalue for original thread + lua_newtable(L); // upvalue for mcp.attach() table. + + // create weak table for storing backends by label. + lua_newtable(L); // {} + lua_newtable(L); // {}, {} for metatable + lua_pushstring(L, "v"); // {}, {}, "v" for weak values. + lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"} + lua_setmetatable(L, -2); // {__mt = {__mode = "v"} } + + luaL_setfuncs(L, mcplib_f, 3); // store upvalues. + + lua_setglobal(L, "mcp"); // set the lib table to mcp global. + return 1; +} |