summaryrefslogtreecommitdiff
path: root/proxy_lua.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy_lua.c')
-rw-r--r--proxy_lua.c147
1 files changed, 112 insertions, 35 deletions
diff --git a/proxy_lua.c b/proxy_lua.c
index e6b50ae..aeaf1e5 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -113,7 +113,7 @@ static int mcplib_backend_wrap_gc(lua_State *L) {
// Since we're running in the config thread it could just busy poll
// until the connection was picked up.
assert(be->transferred);
- proxy_event_thread_t *e = ctx->proxy_threads;
+ proxy_event_thread_t *e = be->event_thread;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -282,11 +282,11 @@ static int mcplib_backend(lua_State *L) {
return 1; // return be object.
}
+// Called with the cache label at top of the stack.
static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
// Note: The upvalue won't be found unless we're running from a function with it
// set as an upvalue.
- lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
@@ -306,7 +306,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_
return NULL;
}
-static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel,
+ proxy_event_thread_t *e) {
// FIXME: remove global.
proxy_ctx_t *ctx = settings.proxy_ctx;
@@ -361,7 +362,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
STAT_UL(ctx);
be->connect_flags = flags;
- proxy_event_thread_t *e = ctx->proxy_threads;
+ be->event_thread = e;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -380,8 +381,8 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
}
#endif
+ lua_pushvalue(L, 4); // push the label string back to the top.
// Add this new backend connection to the object cache.
- lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
// set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
@@ -514,43 +515,43 @@ static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
// UD now popped from stack.
}
-// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
-static int mcplib_pool(lua_State *L) {
- int argc = lua_gettop(L);
- luaL_checktype(L, 1, LUA_TTABLE);
- int n = luaL_len(L, 1); // get length of array table
-
- size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n;
- mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
- // Zero the memory before use, so we can realibly use __gc to clean up
- memset(p, 0, plen);
- p->pool_size = n;
- // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
- p->key_hasher = XXH3_64bits_withSeed;
- pthread_mutex_init(&p->lock, NULL);
- p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
-
- luaL_setmetatable(L, "mcp.pool");
-
- lua_pushvalue(L, -1); // dupe self for reference.
- p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- // TODO (v2): move to after function check so we can find the right
- // backend label to look up.
+// in the proxy object, we can alias a ptr to the pool to where it needs to be
+// based on worker number or io_thread right?
+static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) {
// remember lua arrays are 1 indexed.
- for (int x = 1; x <= n; x++) {
- mcp_pool_be_t *s = &p->pool[x-1];
+ for (int x = 1; x <= p->pool_size; x++) {
+ mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
// check label for pre-existing backend conn/wrapper
+ // TODO (v2): there're native ways of "from C make lua strings"
+ int toconcat = 1;
+ if (p->beprefix[0] != '\0') {
+ lua_pushstring(L, p->beprefix);
+ toconcat++;
+ }
+ if (p->use_iothread) {
+ lua_pushstring(L, ":io:");
+ toconcat++;
+ } else {
+ lua_pushstring(L, ":w");
+ lua_pushinteger(L, offset);
+ lua_pushstring(L, ":");
+ toconcat += 3;
+ }
+ lua_pushlstring(L, bel->label, bel->llen);
+ lua_concat(L, toconcat);
+
+ lua_pushvalue(L, -1); // copy the label string for the create method.
mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
if (bew == NULL) {
- bew = _mcplib_make_backendconn(L, bel);
+ bew = _mcplib_make_backendconn(L, bel, t);
}
s->be = bew->be; // unwrap the backend connection for direct ref.
+ bew->be->use_io_thread = p->use_iothread;
// If found from cache or made above, the backend wrapper is on the
// top of the stack, so we can now take its reference.
@@ -559,11 +560,51 @@ static int mcplib_pool(lua_State *L) {
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
lua_pop(L, 1); // pop the mcp.backend label object.
+ lua_pop(L, 1); // drop extra label copy.
+ }
+}
+
+// call with table of backends in 1
+static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) {
+ if (p->use_iothread) {
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread);
+ } else {
+ // TODO (v3) globals.
+ for (int n = 0; n < settings.num_threads; n++) {
+ LIBEVENT_THREAD *t = get_worker_thread(n);
+ _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread);
+ }
}
+}
+
+// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
+static int mcplib_pool(lua_State *L) {
+ int argc = lua_gettop(L);
+ luaL_checktype(L, 1, LUA_TTABLE);
+ int n = luaL_len(L, 1); // get length of array table
+ int workers = settings.num_threads; // TODO (v3): globals usage.
+
+ size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers);
+ mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
+ // Zero the memory before use, so we can realibly use __gc to clean up
+ memset(p, 0, plen);
+ p->pool_size = n;
+ p->use_iothread = true;
+ // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
+ p->key_hasher = XXH3_64bits_withSeed;
+ pthread_mutex_init(&p->lock, NULL);
+ p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+
+ luaL_setmetatable(L, "mcp.pool");
+
+ lua_pushvalue(L, -1); // dupe self for reference.
+ p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// Allow passing an ignored nil as a second argument. Makes the lua easier
int type = lua_type(L, 2);
if (argc == 1 || type == LUA_TNIL) {
+ _mcplib_pool_make_be(L, p);
lua_getglobal(L, "mcp");
// TODO (v2): decide on a mcp.default_dist and use that instead
if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
@@ -580,6 +621,31 @@ static int mcplib_pool(lua_State *L) {
// pool, then pass it along to the a constructor if necessary.
luaL_checktype(L, 2, LUA_TTABLE);
+ if (lua_getfield(L, 2, "iothread") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TBOOLEAN);
+ int use_iothread = lua_toboolean(L, -1);
+ if (use_iothread) {
+ p->use_iothread = true;
+ } else {
+ p->use_iothread = false;
+ }
+ lua_pop(L, 1); // remove value.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+
+ if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TSTRING);
+ size_t len = 0;
+ const char *bepfx = lua_tolstring(L, -1, &len);
+ memcpy(p->beprefix, bepfx, len);
+ p->beprefix[len+1] = '\0';
+ lua_pop(L, 1); // pop beprefix string.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+ _mcplib_pool_make_be(L, p);
+
// stack: backends, options, mcp.pool
if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
// overriding the distribution function.
@@ -587,6 +653,17 @@ static int mcplib_pool(lua_State *L) {
lua_pop(L, 1); // remove the dist table from stack.
} else {
lua_pop(L, 1); // pop the nil.
+
+ // use the default dist if not specified with an override table.
+ lua_getglobal(L, "mcp");
+ // TODO (v2): decide on a mcp.default_dist and use that instead
+ if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
+ _mcplib_pool_dist(L, p);
+ lua_pop(L, 1); // pop "dist_jump_hash" value.
+ } else {
+ lua_pop(L, 1);
+ }
+ lua_pop(L, 1); // pop "mcp"
}
if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
@@ -666,7 +743,8 @@ static int mcplib_pool_proxy_gc(lua_State *L) {
return 0;
}
-mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) {
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len) {
+ mcp_pool_t *p = pp->main;
if (p->key_filter) {
key = p->key_filter(p->key_filter_conf, key, len, &len);
P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
@@ -682,7 +760,7 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
proxy_lua_error(L, "key dist hasher tried to use out of bounds index");
}
- return p->pool[lookup].be;
+ return pp->pool[lookup].be;
}
// hashfunc(request) -> backend(request)
@@ -690,7 +768,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
static int mcplib_pool_proxy_call(lua_State *L) {
// internal args are the hash selector (self)
mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy");
- mcp_pool_t *p = pp->main;
// then request object.
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
@@ -701,7 +778,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
}
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
- rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len);
// now yield request, pool up.
lua_pushinteger(L, MCP_YIELD_POOL);