diff options
Diffstat (limited to 'proxy_lua.c')
-rw-r--r-- | proxy_lua.c | 147 |
1 files changed, 112 insertions, 35 deletions
diff --git a/proxy_lua.c b/proxy_lua.c index e6b50ae..aeaf1e5 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -113,7 +113,7 @@ static int mcplib_backend_wrap_gc(lua_State *L) { // Since we're running in the config thread it could just busy poll // until the connection was picked up. assert(be->transferred); - proxy_event_thread_t *e = ctx->proxy_threads; + proxy_event_thread_t *e = be->event_thread; pthread_mutex_lock(&e->mutex); STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next); pthread_mutex_unlock(&e->mutex); @@ -282,11 +282,11 @@ static int mcplib_backend(lua_State *L) { return 1; // return be object. } +// Called with the cache label at top of the stack. static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) { // first check our reference table to compare. // Note: The upvalue won't be found unless we're running from a function with it // set as an upvalue. - lua_pushlstring(L, bel->label, bel->llen); int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); if (ret != LUA_TNIL) { mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap"); @@ -306,7 +306,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_ return NULL; } -static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) { +static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel, + proxy_event_thread_t *e) { // FIXME: remove global. proxy_ctx_t *ctx = settings.proxy_ctx; @@ -361,7 +362,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la STAT_UL(ctx); be->connect_flags = flags; - proxy_event_thread_t *e = ctx->proxy_threads; + be->event_thread = e; pthread_mutex_lock(&e->mutex); STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next); pthread_mutex_unlock(&e->mutex); @@ -380,8 +381,8 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la } #endif + lua_pushvalue(L, 4); // push the label string back to the top. // Add this new backend connection to the object cache. - lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable. lua_pushvalue(L, -2); // copy the backend reference to the top. // set our new backend wrapper object into the reference table. lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); @@ -514,43 +515,43 @@ static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) { // UD now popped from stack. } -// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f }) -static int mcplib_pool(lua_State *L) { - int argc = lua_gettop(L); - luaL_checktype(L, 1, LUA_TTABLE); - int n = luaL_len(L, 1); // get length of array table - - size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n; - mcp_pool_t *p = lua_newuserdatauv(L, plen, 0); - // Zero the memory before use, so we can realibly use __gc to clean up - memset(p, 0, plen); - p->pool_size = n; - // TODO (v2): Nicer if this is fetched from mcp.default_key_hash - p->key_hasher = XXH3_64bits_withSeed; - pthread_mutex_init(&p->lock, NULL); - p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); - - luaL_setmetatable(L, "mcp.pool"); - - lua_pushvalue(L, -1); // dupe self for reference. - p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); - - // TODO (v2): move to after function check so we can find the right - // backend label to look up. +// in the proxy object, we can alias a ptr to the pool to where it needs to be +// based on worker number or io_thread right? +static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) { // remember lua arrays are 1 indexed. - for (int x = 1; x <= n; x++) { - mcp_pool_be_t *s = &p->pool[x-1]; + for (int x = 1; x <= p->pool_size; x++) { + mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)]; lua_geti(L, 1, x); // get next server into the stack. // If we bail here, the pool _gc() should handle releasing any backend // references we made so far. mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend"); // check label for pre-existing backend conn/wrapper + // TODO (v2): there're native ways of "from C make lua strings" + int toconcat = 1; + if (p->beprefix[0] != '\0') { + lua_pushstring(L, p->beprefix); + toconcat++; + } + if (p->use_iothread) { + lua_pushstring(L, ":io:"); + toconcat++; + } else { + lua_pushstring(L, ":w"); + lua_pushinteger(L, offset); + lua_pushstring(L, ":"); + toconcat += 3; + } + lua_pushlstring(L, bel->label, bel->llen); + lua_concat(L, toconcat); + + lua_pushvalue(L, -1); // copy the label string for the create method. mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel); if (bew == NULL) { - bew = _mcplib_make_backendconn(L, bel); + bew = _mcplib_make_backendconn(L, bel, t); } s->be = bew->be; // unwrap the backend connection for direct ref. + bew->be->use_io_thread = p->use_iothread; // If found from cache or made above, the backend wrapper is on the // top of the stack, so we can now take its reference. @@ -559,11 +560,51 @@ static int mcplib_pool(lua_State *L) { s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object. lua_pop(L, 1); // pop the mcp.backend label object. + lua_pop(L, 1); // drop extra label copy. + } +} + +// call with table of backends in 1 +static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) { + if (p->use_iothread) { + proxy_ctx_t *ctx = settings.proxy_ctx; + _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread); + } else { + // TODO (v3) globals. + for (int n = 0; n < settings.num_threads; n++) { + LIBEVENT_THREAD *t = get_worker_thread(n); + _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread); + } } +} + +// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f }) +static int mcplib_pool(lua_State *L) { + int argc = lua_gettop(L); + luaL_checktype(L, 1, LUA_TTABLE); + int n = luaL_len(L, 1); // get length of array table + int workers = settings.num_threads; // TODO (v3): globals usage. + + size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers); + mcp_pool_t *p = lua_newuserdatauv(L, plen, 0); + // Zero the memory before use, so we can realibly use __gc to clean up + memset(p, 0, plen); + p->pool_size = n; + p->use_iothread = true; + // TODO (v2): Nicer if this is fetched from mcp.default_key_hash + p->key_hasher = XXH3_64bits_withSeed; + pthread_mutex_init(&p->lock, NULL); + p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); + + luaL_setmetatable(L, "mcp.pool"); + + lua_pushvalue(L, -1); // dupe self for reference. + p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); // Allow passing an ignored nil as a second argument. Makes the lua easier int type = lua_type(L, 2); if (argc == 1 || type == LUA_TNIL) { + _mcplib_pool_make_be(L, p); lua_getglobal(L, "mcp"); // TODO (v2): decide on a mcp.default_dist and use that instead if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) { @@ -580,6 +621,31 @@ static int mcplib_pool(lua_State *L) { // pool, then pass it along to the a constructor if necessary. luaL_checktype(L, 2, LUA_TTABLE); + if (lua_getfield(L, 2, "iothread") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TBOOLEAN); + int use_iothread = lua_toboolean(L, -1); + if (use_iothread) { + p->use_iothread = true; + } else { + p->use_iothread = false; + } + lua_pop(L, 1); // remove value. + } else { + lua_pop(L, 1); // pop the nil. + } + + if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TSTRING); + size_t len = 0; + const char *bepfx = lua_tolstring(L, -1, &len); + memcpy(p->beprefix, bepfx, len); + p->beprefix[len+1] = '\0'; + lua_pop(L, 1); // pop beprefix string. + } else { + lua_pop(L, 1); // pop the nil. + } + _mcplib_pool_make_be(L, p); + // stack: backends, options, mcp.pool if (lua_getfield(L, 2, "dist") != LUA_TNIL) { // overriding the distribution function. @@ -587,6 +653,17 @@ static int mcplib_pool(lua_State *L) { lua_pop(L, 1); // remove the dist table from stack. } else { lua_pop(L, 1); // pop the nil. + + // use the default dist if not specified with an override table. + lua_getglobal(L, "mcp"); + // TODO (v2): decide on a mcp.default_dist and use that instead + if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) { + _mcplib_pool_dist(L, p); + lua_pop(L, 1); // pop "dist_jump_hash" value. + } else { + lua_pop(L, 1); + } + lua_pop(L, 1); // pop "mcp" } if (lua_getfield(L, 2, "filter") != LUA_TNIL) { @@ -666,7 +743,8 @@ static int mcplib_pool_proxy_gc(lua_State *L) { return 0; } -mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) { +mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len) { + mcp_pool_t *p = pp->main; if (p->key_filter) { key = p->key_filter(p->key_filter_conf, key, len, &len); P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key); @@ -682,7 +760,7 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); } - return p->pool[lookup].be; + return pp->pool[lookup].be; } // hashfunc(request) -> backend(request) @@ -690,7 +768,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const static int mcplib_pool_proxy_call(lua_State *L) { // internal args are the hash selector (self) mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy"); - mcp_pool_t *p = pp->main; // then request object. mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); @@ -701,7 +778,7 @@ static int mcplib_pool_proxy_call(lua_State *L) { } const char *key = MCP_PARSER_KEY(rq->pr); size_t len = rq->pr.klen; - rq->be = mcplib_pool_proxy_call_helper(L, p, key, len); + rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len); // now yield request, pool up. lua_pushinteger(L, MCP_YIELD_POOL); |