diff options
Diffstat (limited to 'proxy_lua.c')
-rw-r--r-- | proxy_lua.c | 171 |
1 files changed, 129 insertions, 42 deletions
diff --git a/proxy_lua.c b/proxy_lua.c index 2172733..7fa07bb 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -90,67 +90,130 @@ static int mcplib_response_gc(lua_State *L) { // To free a backend: All proxies for a pool are collected, then the central // pool is collected, which releases backend references, which allows backend // to be collected. -static int mcplib_backend_gc(lua_State *L) { - mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend"); - - assert(STAILQ_EMPTY(&be->io_head)); +static int mcplib_backend_wrap_gc(lua_State *L) { + mcp_backend_wrap_t *bew = luaL_checkudata(L, -1, "mcp.backendwrap"); + // FIXME: remove global. + proxy_ctx_t *ctx = settings.proxy_ctx; - mcmc_disconnect(be->client); + if (bew->be != NULL) { + mcp_backend_t *be = bew->be; + // TODO (v3): technically a race where a backend could be created, + // queued, but not picked up before being gc'ed again. In practice + // this is impossible but at some point we should close the loop here. + // Since we're running in the config thread it could just busy poll + // until the connection was picked up. + assert(be->transferred); + proxy_event_thread_t *e = ctx->proxy_threads; + pthread_mutex_lock(&e->mutex); + STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next); + pthread_mutex_unlock(&e->mutex); + + // Signal to check queue. +#ifdef USE_EVENTFD + uint64_t u = 1; + // TODO (v2): check result? is it ever possible to get a short write/failure + // for an eventfd? + if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + assert(1 == 0); + } +#else + if (write(e->be_notify_send_fd, "w", 1) <= 0) { + assert(1 == 0); + } +#endif + } - // FIXME (v2): can we ensure a backend always has be->event_thread set, so - // we can use be->event_thread->ctx? - proxy_ctx_t *ctx = settings.proxy_ctx; STAT_DECR(ctx, backend_total, 1); - free(be->client); return 0; } +static int mcplib_backend_gc(lua_State *L) { + return 0; // no-op. +} + +// backend label object; given to pools which then find or create backend +// objects as necessary. static int mcplib_backend(lua_State *L) { - luaL_checkstring(L, -3); // label for indexing backends. + size_t llen = 0; size_t nlen = 0; - const char *name = luaL_checklstring(L, -2, &nlen); - const char *port = luaL_checkstring(L, -1); - proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); + size_t plen = 0; + const char *label = luaL_checklstring(L, 1, &llen); + const char *name = luaL_checklstring(L, 2, &nlen); + const char *port = luaL_checklstring(L, 3, &plen); + + if (llen > MAX_LABELLEN-1) { + proxy_lua_error(L, "backend label too long"); + return 0; + } if (nlen > MAX_NAMELEN-1) { proxy_lua_error(L, "backend name too long"); return 0; } + if (plen > MAX_PORTLEN-1) { + proxy_lua_error(L, "backend port too long"); + return 0; + } + + mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0); + memset(be, 0, sizeof(*be)); + memcpy(be->label, label, llen); + be->label[llen] = '\0'; + memcpy(be->name, name, nlen); + be->name[nlen] = '\0'; + memcpy(be->port, port, plen); + be->port[plen] = '\0'; + be->llen = llen; + luaL_getmetatable(L, "mcp.backend"); + lua_setmetatable(L, -2); // set metatable to userdata. + + return 1; // return be object. +} + +static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) { // first check our reference table to compare. - lua_pushvalue(L, 1); + // Note: The upvalue won't be found unless we're running from a function with it + // set as an upvalue. + lua_pushlstring(L, bel->label, bel->llen); int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); if (ret != LUA_TNIL) { - mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend"); - if (strncmp(be_orig->name, name, MAX_NAMELEN) == 0 - && strncmp(be_orig->port, port, MAX_PORTLEN) == 0) { + mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap"); + if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0 + && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0) { // backend is the same, return it. - return 1; + return be_orig; } else { // backend not the same, pop from stack and make new one. lua_pop(L, 1); } } else { - lua_pop(L, 1); + lua_pop(L, 1); // pop the nil. } - // This might shift to internal objects? - mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0); + return NULL; +} + +static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) { + // FIXME: remove global. + proxy_ctx_t *ctx = settings.proxy_ctx; - // FIXME (v2): remove some of the excess zero'ing below? - memset(be, 0, sizeof(mcp_backend_t)); - strncpy(be->name, name, MAX_NAMELEN); - strncpy(be->port, port, MAX_PORTLEN); - be->depth = 0; - be->rbufused = 0; - be->failed_count = 0; + mcp_backend_wrap_t *bew = lua_newuserdatauv(L, sizeof(mcp_backend_wrap_t), 0); + luaL_getmetatable(L, "mcp.backendwrap"); + lua_setmetatable(L, -2); // set metatable to userdata. + + mcp_backend_t *be = calloc(1, sizeof(mcp_backend_t)); + if (be == NULL) { + proxy_lua_error(L, "out of memory allocating backend connection"); + return NULL; + } + bew->be = be; + + strncpy(be->name, bel->name, MAX_NAMELEN+1); + strncpy(be->port, bel->port, MAX_PORTLEN+1); STAILQ_INIT(&be->io_head); be->state = mcp_backend_read; - be->connecting = false; - be->can_write = false; - be->stacked = false; - be->bad = false; // this leaves a permanent buffer on the backend, which is fine // unless you have billions of backends. @@ -158,7 +221,7 @@ static int mcplib_backend(lua_State *L) { be->rbuf = malloc(READ_BUFFER_SIZE); if (be->rbuf == NULL) { proxy_lua_error(L, "out of memory allocating backend"); - return 0; + return NULL; } // initialize libevent. @@ -168,7 +231,7 @@ static int mcplib_backend(lua_State *L) { be->client = malloc(mcmc_size(MCMC_OPTION_BLANK)); if (be->client == NULL) { proxy_lua_error(L, "out of memory allocating backend"); - return 0; + return NULL; } // TODO (v2): no way to change the TCP_KEEPALIVE state post-construction. // This is a trivial fix if we ensure a backend's owning event thread is @@ -203,18 +266,16 @@ static int mcplib_backend(lua_State *L) { } #endif - luaL_getmetatable(L, "mcp.backend"); - lua_setmetatable(L, -2); // set metatable to userdata. - - lua_pushvalue(L, 1); // put the label at the top for settable later. + // Add this new backend connection to the object cache. + lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable. lua_pushvalue(L, -2); // copy the backend reference to the top. - // set our new backend object into the reference table. + // set our new backend wrapper object into the reference table. lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); // stack is back to having backend on the top. STAT_INCR(ctx, backend_total, 1); - return 1; + return bew; } static int mcplib_pool_gc(lua_State *L) { @@ -360,14 +421,30 @@ static int mcplib_pool(lua_State *L) { lua_pushvalue(L, -1); // dupe self for reference. p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); + // TODO (v2): move to after function check so we can find the right + // backend label to look up. // remember lua arrays are 1 indexed. for (int x = 1; x <= n; x++) { mcp_pool_be_t *s = &p->pool[x-1]; lua_geti(L, 1, x); // get next server into the stack. // If we bail here, the pool _gc() should handle releasing any backend // references we made so far. - s->be = luaL_checkudata(L, -1, "mcp.backend"); + mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend"); + + // check label for pre-existing backend conn/wrapper + mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel); + if (bew == NULL) { + bew = _mcplib_make_backendconn(L, bel); + } + s->be = bew->be; // unwrap the backend connection for direct ref. + + // If found from cache or made above, the backend wrapper is on the + // top of the stack, so we can now take its reference. + // The wrapper abstraction allows the be memory to be owned by its + // destination thread (IO thread/etc). + s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object. + lua_pop(L, 1); // pop the mcp.backend label object. } // Allow passing an ignored nil as a second argument. Makes the lua easier @@ -876,11 +953,15 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { lua_State *L = state; const struct luaL_Reg mcplib_backend_m[] = { - {"set", NULL}, {"__gc", mcplib_backend_gc}, {NULL, NULL} }; + const struct luaL_Reg mcplib_backend_wrap_m[] = { + {"__gc", mcplib_backend_wrap_gc}, + {NULL, NULL} + }; + const struct luaL_Reg mcplib_request_m[] = { {"command", mcplib_request_command}, {"key", mcplib_request_key}, @@ -942,6 +1023,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { luaL_setfuncs(L, mcplib_backend_m, 0); // register methods lua_pop(L, 1); + luaL_newmetatable(L, "mcp.backendwrap"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_backend_wrap_m, 0); // register methods + lua_pop(L, 1); + luaL_newmetatable(L, "mcp.request"); lua_pushvalue(L, -1); // duplicate metatable. lua_setfield(L, -2, "__index"); // mt.__index = mt |