Diffstat (limited to 'proxy_lua.c')
-rw-r--r--  proxy_lua.c  171
1 file changed, 129 insertions(+), 42 deletions(-)
diff --git a/proxy_lua.c b/proxy_lua.c
index 2172733..7fa07bb 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -90,67 +90,130 @@ static int mcplib_response_gc(lua_State *L) {
// To free a backend: All proxies for a pool are collected, then the central
// pool is collected, which releases backend references, which allows backend
// to be collected.
-static int mcplib_backend_gc(lua_State *L) {
- mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
-
- assert(STAILQ_EMPTY(&be->io_head));
+static int mcplib_backend_wrap_gc(lua_State *L) {
+ mcp_backend_wrap_t *bew = luaL_checkudata(L, -1, "mcp.backendwrap");
+ // FIXME: remove global.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
- mcmc_disconnect(be->client);
+ if (bew->be != NULL) {
+ mcp_backend_t *be = bew->be;
+ // TODO (v3): technically a race where a backend could be created,
+ // queued, but not picked up before being gc'ed again. In practice
+ // this is impossible but at some point we should close the loop here.
+ // Since we're running in the config thread it could just busy poll
+ // until the connection was picked up.
+ assert(be->transferred);
+ proxy_event_thread_t *e = ctx->proxy_threads;
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
+ pthread_mutex_unlock(&e->mutex);
+
+ // Signal to check queue.
+#ifdef USE_EVENTFD
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
+#else
+ if (write(e->be_notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
+#endif
+ }
- // FIXME (v2): can we ensure a backend always has be->event_thread set, so
- // we can use be->event_thread->ctx?
- proxy_ctx_t *ctx = settings.proxy_ctx;
STAT_DECR(ctx, backend_total, 1);
- free(be->client);
return 0;
}
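
The __gc handler above never frees the backend directly: it queues the connection onto the event thread's beconn_head_in list and wakes that thread, which is then expected to do the teardown. Below is a minimal sketch of that consumer side, covering only the release path exercised here; names not visible in this diff (proxy_event_beconn_drain, be_notify_receive_fd) are hypothetical, and the cleanup mirrors what the removed gc code above used to do inline.

    // Hypothetical sketch of the event-thread side draining beconn_head_in.
    static void proxy_event_beconn_drain(proxy_event_thread_t *e) {
    #ifdef USE_EVENTFD
        uint64_t u = 0;
        if (read(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
            // Spurious wakeup; the queue check below is authoritative.
        }
    #else
        char buf[256];
        if (read(e->be_notify_receive_fd, buf, sizeof(buf)) <= 0) {
            // Same: the pipe is only a wakeup mechanism.
        }
    #endif
        pthread_mutex_lock(&e->mutex);
        while (!STAILQ_EMPTY(&e->beconn_head_in)) {
            mcp_backend_t *be = STAILQ_FIRST(&e->beconn_head_in);
            STAILQ_REMOVE_HEAD(&e->beconn_head_in, beconn_next);
            // Tear down on the thread that owns the connection.
            mcmc_disconnect(be->client);
            free(be->client);
            free(be->rbuf);
            free(be);
        }
        pthread_mutex_unlock(&e->mutex);
    }
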
+static int mcplib_backend_gc(lua_State *L) {
+ return 0; // no-op.
+}
+
+// backend label object; given to pools which then find or create backend
+// objects as necessary.
static int mcplib_backend(lua_State *L) {
- luaL_checkstring(L, -3); // label for indexing backends.
+ size_t llen = 0;
size_t nlen = 0;
- const char *name = luaL_checklstring(L, -2, &nlen);
- const char *port = luaL_checkstring(L, -1);
- proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+ size_t plen = 0;
+ const char *label = luaL_checklstring(L, 1, &llen);
+ const char *name = luaL_checklstring(L, 2, &nlen);
+ const char *port = luaL_checklstring(L, 3, &plen);
+
+ if (llen > MAX_LABELLEN-1) {
+ proxy_lua_error(L, "backend label too long");
+ return 0;
+ }
if (nlen > MAX_NAMELEN-1) {
proxy_lua_error(L, "backend name too long");
return 0;
}
+ if (plen > MAX_PORTLEN-1) {
+ proxy_lua_error(L, "backend port too long");
+ return 0;
+ }
+
+ mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
+ memset(be, 0, sizeof(*be));
+ memcpy(be->label, label, llen);
+ be->label[llen] = '\0';
+ memcpy(be->name, name, nlen);
+ be->name[nlen] = '\0';
+ memcpy(be->port, port, plen);
+ be->port[plen] = '\0';
+ be->llen = llen;
+ luaL_getmetatable(L, "mcp.backend");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ return 1; // return be object.
+}
+
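
mcp.backend() now only produces this small "label" userdata; the actual connection object is created later, when a pool resolves the label. The struct layouts below are a guess at what the fields used above imply; the real definitions live in the proxy headers and may differ.

    /* Hypothetical layouts implied by the code in this diff. */
    typedef struct {
        char label[MAX_LABELLEN]; // Lua-side identity, used as the cache key.
        char name[MAX_NAMELEN];   // host to connect to.
        char port[MAX_PORTLEN];
        size_t llen;              // label length, cached for lua_pushlstring().
    } mcp_backend_label_t;

    typedef struct {
        mcp_backend_t *be; // heap-allocated connection owned by an event thread.
    } mcp_backend_wrap_t;
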
+static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
- lua_pushvalue(L, 1);
+ // Note: The upvalue won't be found unless we're running from a function with it
+ // set as an upvalue.
+ lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
- mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend");
- if (strncmp(be_orig->name, name, MAX_NAMELEN) == 0
- && strncmp(be_orig->port, port, MAX_PORTLEN) == 0) {
+ mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
+ if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0
+ && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0) {
// backend is the same, return it.
- return 1;
+ return be_orig;
} else {
// backend not the same, pop from stack and make new one.
lua_pop(L, 1);
}
} else {
- lua_pop(L, 1);
+ lua_pop(L, 1); // pop the nil.
}
- // This might shift to internal objects?
- mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0);
+ return NULL;
+}
+
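
As the note above says, the cache lookup only works when the executing C function has the backend cache table bound at MCP_BACKEND_UPVALUE. The sketch below shows how such a closure is typically wired up; it assumes the mcp module table is on top of the stack and uses a single upvalue slot for brevity, so it is illustrative rather than a copy of what proxy_register_libs() does.

    lua_newtable(L);                     // shared backend/wrapper cache table
    lua_pushvalue(L, -1);                // copy consumed by the closure below
    lua_pushcclosure(L, mcplib_pool, 1); // cache table becomes upvalue 1
    lua_setfield(L, -3, "pool");         // mcp.pool = closure
    lua_pop(L, 1);                       // drop the remaining cache table ref
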
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+ // FIXME: remove global.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
- // FIXME (v2): remove some of the excess zero'ing below?
- memset(be, 0, sizeof(mcp_backend_t));
- strncpy(be->name, name, MAX_NAMELEN);
- strncpy(be->port, port, MAX_PORTLEN);
- be->depth = 0;
- be->rbufused = 0;
- be->failed_count = 0;
+ mcp_backend_wrap_t *bew = lua_newuserdatauv(L, sizeof(mcp_backend_wrap_t), 0);
+ luaL_getmetatable(L, "mcp.backendwrap");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ mcp_backend_t *be = calloc(1, sizeof(mcp_backend_t));
+ if (be == NULL) {
+ proxy_lua_error(L, "out of memory allocating backend connection");
+ return NULL;
+ }
+ bew->be = be;
+
+ strncpy(be->name, bel->name, MAX_NAMELEN+1);
+ strncpy(be->port, bel->port, MAX_PORTLEN+1);
STAILQ_INIT(&be->io_head);
be->state = mcp_backend_read;
- be->connecting = false;
- be->can_write = false;
- be->stacked = false;
- be->bad = false;
// this leaves a permanent buffer on the backend, which is fine
// unless you have billions of backends.
@@ -158,7 +221,7 @@ static int mcplib_backend(lua_State *L) {
be->rbuf = malloc(READ_BUFFER_SIZE);
if (be->rbuf == NULL) {
proxy_lua_error(L, "out of memory allocating backend");
- return 0;
+ return NULL;
}
// initialize libevent.
@@ -168,7 +231,7 @@ static int mcplib_backend(lua_State *L) {
be->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
if (be->client == NULL) {
proxy_lua_error(L, "out of memory allocating backend");
- return 0;
+ return NULL;
}
// TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
// This is a trivial fix if we ensure a backend's owning event thread is
@@ -203,18 +266,16 @@ static int mcplib_backend(lua_State *L) {
}
#endif
- luaL_getmetatable(L, "mcp.backend");
- lua_setmetatable(L, -2); // set metatable to userdata.
-
- lua_pushvalue(L, 1); // put the label at the top for settable later.
+ // Add this new backend connection to the object cache.
+ lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
- // set our new backend object into the reference table.
+ // set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
// stack is back to having backend on the top.
STAT_INCR(ctx, backend_total, 1);
- return 1;
+ return bew;
}
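
Note that the connection created here is only useful once an event thread has picked it up; the assert(be->transferred) in mcplib_backend_wrap_gc() depends on that hand-off having happened. A sketch of that adoption step is below; the function name is hypothetical and the real wiring (connect, event registration) is elided.

    /* Sketch (assumption): event thread adopting a freshly queued backend. */
    static void proxy_event_adopt_backend(proxy_event_thread_t *e, mcp_backend_t *be) {
        be->event_thread = e;   // the field the removed FIXME above refers to.
        be->transferred = true; // now safe for the config thread to queue it for release.
        // ... connect to be->name:be->port and register read/write events ...
    }
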
static int mcplib_pool_gc(lua_State *L) {
@@ -360,14 +421,30 @@ static int mcplib_pool(lua_State *L) {
lua_pushvalue(L, -1); // dupe self for reference.
p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ // TODO (v2): move to after function check so we can find the right
+ // backend label to look up.
// remember lua arrays are 1 indexed.
for (int x = 1; x <= n; x++) {
mcp_pool_be_t *s = &p->pool[x-1];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
- s->be = luaL_checkudata(L, -1, "mcp.backend");
+ mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
+
+ // check label for pre-existing backend conn/wrapper
+ mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
+ if (bew == NULL) {
+ bew = _mcplib_make_backendconn(L, bel);
+ }
+ s->be = bew->be; // unwrap the backend connection for direct ref.
+
+ // If found from cache or made above, the backend wrapper is on the
+ // top of the stack, so we can now take its reference.
+ // The wrapper abstraction allows the be memory to be owned by its
+ // destination thread (IO thread/etc).
+
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
+ lua_pop(L, 1); // pop the mcp.backend label object.
}
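
Each pool slot therefore carries both a raw pointer for the hot path and a registry reference that keeps the wrapper (and thus the connection) alive. The slot layout below is inferred from s->be and s->ref and is not taken from this diff.

    /* Hypothetical shape of a pool slot. */
    typedef struct {
        mcp_backend_t *be; // direct pointer, used when proxying requests.
        int ref;           // LUA_REGISTRYINDEX ref pinning the mcp.backendwrap.
    } mcp_pool_be_t;
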
// Allow passing an ignored nil as a second argument. Makes the lua easier
@@ -876,11 +953,15 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
lua_State *L = state;
const struct luaL_Reg mcplib_backend_m[] = {
- {"set", NULL},
{"__gc", mcplib_backend_gc},
{NULL, NULL}
};
+ const struct luaL_Reg mcplib_backend_wrap_m[] = {
+ {"__gc", mcplib_backend_wrap_gc},
+ {NULL, NULL}
+ };
+
const struct luaL_Reg mcplib_request_m[] = {
{"command", mcplib_request_command},
{"key", mcplib_request_key},
@@ -942,6 +1023,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
lua_pop(L, 1);
+ luaL_newmetatable(L, "mcp.backendwrap");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_backend_wrap_m, 0); // register methods
+ lua_pop(L, 1);
+
luaL_newmetatable(L, "mcp.request");
lua_pushvalue(L, -1); // duplicate metatable.
lua_setfield(L, -2, "__index"); // mt.__index = mt