diff options
-rw-r--r-- | memcached.c | 4 | ||||
-rw-r--r-- | proto_proxy.c | 30 | ||||
-rw-r--r-- | proto_proxy.h | 10 | ||||
-rw-r--r-- | proto_text.c | 2 | ||||
-rw-r--r-- | proxy.h | 3 | ||||
-rw-r--r-- | proxy_config.c | 5 | ||||
-rw-r--r-- | proxy_lua.c | 26 | ||||
-rw-r--r-- | proxy_ustats.c | 2 | ||||
-rw-r--r-- | thread.c | 2 |
9 files changed, 43 insertions, 41 deletions
diff --git a/memcached.c b/memcached.c index 7871fe8..86ac064 100644 --- a/memcached.c +++ b/memcached.c @@ -1887,7 +1887,7 @@ void server_stats(ADD_STAT add_stats, conn *c) { storage_stats(add_stats, c); #endif #ifdef PROXY - proxy_stats(add_stats, c); + proxy_stats(settings.proxy_ctx, add_stats, c); #endif #ifdef TLS if (settings.ssl_enabled) { @@ -6093,7 +6093,7 @@ int main (int argc, char **argv) { /* start up worker threads if MT mode */ #ifdef PROXY if (settings.proxy_enabled) { - proxy_init(settings.proxy_uring); + settings.proxy_ctx = proxy_init(settings.proxy_uring); if (proxy_load_config(settings.proxy_ctx) != 0) { exit(EXIT_FAILURE); } diff --git a/proto_proxy.c b/proto_proxy.c index 985781b..d39b7b1 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -20,12 +20,11 @@ static void proxy_out_errstring(mc_resp *resp, const char *str); // functions starting with _ are breakouts for the public functions. // see also: process_extstore_stats() -// FIXME (v2): get context off of conn? global variables -void proxy_stats(ADD_STAT add_stats, conn *c) { - if (!settings.proxy_enabled) { +void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { + if (arg == NULL) { return; } - proxy_ctx_t *ctx = settings.proxy_ctx; + proxy_ctx_t *ctx = arg; STAT_L(ctx); APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads); @@ -36,14 +35,14 @@ void proxy_stats(ADD_STAT add_stats, conn *c) { STAT_UL(ctx); } -void process_proxy_stats(ADD_STAT add_stats, conn *c) { +void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) { char key_str[STAT_KEY_LEN]; struct proxy_int_stats istats = {0}; - if (!settings.proxy_enabled) { + if (!arg) { return; } - proxy_ctx_t *ctx = settings.proxy_ctx; + proxy_ctx_t *ctx = arg; STAT_L(ctx); // prepare aggregated counters. @@ -51,6 +50,7 @@ void process_proxy_stats(ADD_STAT add_stats, conn *c) { uint64_t counters[us->num_stats]; memset(counters, 0, sizeof(counters)); + // TODO (v3): more globals to remove and/or change API method. // aggregate worker thread counters. for (int x = 0; x < settings.num_threads; x++) { LIBEVENT_THREAD *t = get_worker_thread(x); @@ -99,10 +99,8 @@ void process_proxy_stats(ADD_STAT add_stats, conn *c) { } // start the centralized lua state and config thread. -// TODO (v2): return ctx ptr. avoid global vars. -void proxy_init(bool use_uring) { +void *proxy_init(bool use_uring) { proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t)); - settings.proxy_ctx = ctx; ctx->use_uring = use_uring; pthread_mutex_init(&ctx->config_lock, NULL); @@ -130,7 +128,7 @@ void proxy_init(bool use_uring) { ctx->proxy_state = L; luaL_openlibs(L); // NOTE: might need to differentiate the libs yes? - proxy_register_libs(NULL, L); + proxy_register_libs(ctx, NULL, L); // Create/start the backend threads, which we need before servers // start getting created. @@ -179,10 +177,14 @@ void proxy_init(bool use_uring) { } _start_proxy_config_threads(ctx); + return ctx; } // Initialize the VM for an individual worker thread. -void proxy_thread_init(LIBEVENT_THREAD *thr) { +void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) { + assert(ctx != NULL); + assert(thr != NULL); + // Create the hook table. thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook)); if (thr->proxy_hooks == NULL) { @@ -199,14 +201,14 @@ void proxy_thread_init(LIBEVENT_THREAD *thr) { lua_State *L = luaL_newstate(); thr->L = L; luaL_openlibs(L); - proxy_register_libs(thr, L); + proxy_register_libs(ctx, thr, L); // TODO: srand on time? do we need to bother? for (int x = 0; x < 3; x++) { thr->proxy_rng[x] = rand(); } // kick off the configuration. - if (proxy_thread_loadconf(thr) != 0) { + if (proxy_thread_loadconf(ctx, thr) != 0) { exit(EXIT_FAILURE); } } diff --git a/proto_proxy.h b/proto_proxy.h index 6d42e24..0b3d240 100644 --- a/proto_proxy.h +++ b/proto_proxy.h @@ -1,15 +1,15 @@ #ifndef PROTO_PROXY_H #define PROTO_PROXY_H -void proxy_stats(ADD_STAT add_stats, conn *c); -void process_proxy_stats(ADD_STAT add_stats, conn *c); +void proxy_stats(void *arg, ADD_STAT add_stats, conn *c); +void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c); /* proxy mode handlers */ int try_read_command_proxy(conn *c); void complete_nread_proxy(conn *c); void proxy_cleanup_conn(conn *c); -void proxy_thread_init(LIBEVENT_THREAD *thr); -void proxy_init(bool proxy_uring); +void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr); +void *proxy_init(bool proxy_uring); // TODO: need better names or a better interface for these. can be confusing // to reason about the order. void proxy_start_reload(void *arg); @@ -22,6 +22,6 @@ void proxy_return_cb(io_pending_t *pending); void proxy_finalize_cb(io_pending_t *pending); /* lua */ -int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx); +int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state); #endif diff --git a/proto_text.c b/proto_text.c index 6885786..dc6c624 100644 --- a/proto_text.c +++ b/proto_text.c @@ -795,7 +795,7 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { #endif #ifdef PROXY } else if (strcmp(subcommand, "proxy") == 0) { - process_proxy_stats(&append_stats, c); + process_proxy_stats(settings.proxy_ctx, &append_stats, c); #endif } else { /* getting here means that the subcommand is either engine specific or @@ -78,6 +78,7 @@ #define MCP_THREAD_UPVALUE 1 #define MCP_ATTACH_UPVALUE 2 #define MCP_BACKEND_UPVALUE 3 +#define MCP_CONTEXT_UPVALUE 4 // all possible commands. #define CMD_FIELDS \ @@ -476,7 +477,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p); void proxy_lua_error(lua_State *L, const char *s); void proxy_lua_ferror(lua_State *L, const char *fmt, ...); int _start_proxy_config_threads(proxy_ctx_t *ctx); -int proxy_thread_loadconf(LIBEVENT_THREAD *thr); +int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr); // TODO (v2): more .h files, perhaps? int mcplib_open_hash_xxhash(lua_State *L); diff --git a/proxy_config.c b/proxy_config.c index 3de1aac..16988d1 100644 --- a/proxy_config.c +++ b/proxy_config.c @@ -333,7 +333,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) { void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) { proxy_ctx_t *ctx = arg; pthread_mutex_lock(&ctx->worker_lock); - if (proxy_thread_loadconf(thr) != 0) { + if (proxy_thread_loadconf(ctx, thr) != 0) { ctx->worker_failed = true; } ctx->worker_done = true; @@ -343,10 +343,9 @@ void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) { // FIXME (v2): need to test how to recover from an actual error here. error message // needs to go somewhere useful, counters added, etc. -int proxy_thread_loadconf(LIBEVENT_THREAD *thr) { +int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) { lua_State *L = thr->L; // load the precompiled config function. - proxy_ctx_t *ctx = settings.proxy_ctx; struct _dumpbuf *db = ctx->proxy_code; struct _dumpbuf db2; // copy because the helper modifies it. memcpy(&db2, db, sizeof(struct _dumpbuf)); diff --git a/proxy_lua.c b/proxy_lua.c index 75bf62e..40936ec 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -98,8 +98,7 @@ static int mcplib_backend_gc(lua_State *L) { mcmc_disconnect(be->client); free(be->client); - // FIXME (v2): upvalue for global ctx. - proxy_ctx_t *ctx = settings.proxy_ctx; + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); STAT_DECR(ctx, backend_total, 1); return 0; @@ -110,8 +109,7 @@ static int mcplib_backend(lua_State *L) { size_t nlen = 0; const char *name = luaL_checklstring(L, -2, &nlen); const char *port = luaL_checkstring(L, -1); - // FIXME (v2): upvalue for global ctx. - proxy_ctx_t *ctx = settings.proxy_ctx; + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); if (nlen > MAX_NAMELEN-1) { proxy_lua_error(L, "backend name too long"); @@ -349,7 +347,7 @@ static int mcplib_pool(lua_State *L) { // TODO (v2): Nicer if this is fetched from mcp.default_key_hash p->key_hasher = XXH3_64bits_withSeed; pthread_mutex_init(&p->lock, NULL); - p->ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue. + p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); luaL_setmetatable(L, "mcp.pool"); @@ -515,7 +513,7 @@ static int mcplib_pool_proxy_call(lua_State *L) { static int mcplib_tcp_keepalive(lua_State *L) { luaL_checktype(L, -1, LUA_TBOOLEAN); int state = lua_toboolean(L, -1); - proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); STAT_L(ctx); ctx->tunables.tcp_keepalive = state; @@ -526,7 +524,7 @@ static int mcplib_tcp_keepalive(lua_State *L) { static int mcplib_backend_failure_limit(lua_State *L) { int limit = luaL_checkinteger(L, -1); - proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); if (limit < 0) { proxy_lua_error(L, "backend_failure_limit must be >= 0"); @@ -548,7 +546,7 @@ static int mcplib_backend_connect_timeout(lua_State *L) { lua_Number secondsf = luaL_checknumber(L, -1); lua_Integer secondsi = (lua_Integer) secondsf; lua_Number subseconds = secondsf - secondsi; - proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); STAT_L(ctx); ctx->tunables.connect.tv_sec = secondsi; @@ -566,7 +564,7 @@ static int mcplib_backend_retry_timeout(lua_State *L) { lua_Number secondsf = luaL_checknumber(L, -1); lua_Integer secondsi = (lua_Integer) secondsf; lua_Number subseconds = secondsf - secondsi; - proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); STAT_L(ctx); ctx->tunables.retry.tv_sec = secondsi; @@ -584,7 +582,7 @@ static int mcplib_backend_read_timeout(lua_State *L) { lua_Number secondsf = luaL_checknumber(L, -1); lua_Integer secondsi = (lua_Integer) secondsf; lua_Number subseconds = secondsf - secondsi; - proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); STAT_L(ctx); ctx->tunables.read.tv_sec = secondsi; @@ -866,8 +864,8 @@ static void proxy_register_defines(lua_State *L) { } // Creates and returns the top level "mcp" module -int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { - lua_State *L = ctx; +int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { + lua_State *L = state; const struct luaL_Reg mcplib_backend_m[] = { {"set", NULL}, @@ -989,7 +987,9 @@ int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"} lua_setmetatable(L, -2); // {__mt = {__mode = "v"} } - luaL_setfuncs(L, mcplib_f, 3); // store upvalues. + lua_pushlightuserdata(L, ctx); // upvalue for proxy context. + + luaL_setfuncs(L, mcplib_f, 4); // store upvalues. lua_setglobal(L, "mcp"); // set the lib table to mcp global. return 1; diff --git a/proxy_ustats.c b/proxy_ustats.c index 339ce89..7a429bd 100644 --- a/proxy_ustats.c +++ b/proxy_ustats.c @@ -36,7 +36,7 @@ int mcplib_add_stat(lua_State *L) { } } - proxy_ctx_t *ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue. + proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); STAT_L(ctx); struct proxy_user_stats *us = &ctx->user_stats; @@ -490,7 +490,7 @@ static void setup_thread(LIBEVENT_THREAD *me) { // TODO: maybe register hooks to be called here from sub-packages? ie; // extstore, TLS, proxy. if (settings.proxy_enabled) { - proxy_thread_init(me); + proxy_thread_init(settings.proxy_ctx, me); } #endif thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL); |