summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--memcached.c4
-rw-r--r--proto_proxy.c30
-rw-r--r--proto_proxy.h10
-rw-r--r--proto_text.c2
-rw-r--r--proxy.h3
-rw-r--r--proxy_config.c5
-rw-r--r--proxy_lua.c26
-rw-r--r--proxy_ustats.c2
-rw-r--r--thread.c2
9 files changed, 43 insertions, 41 deletions
diff --git a/memcached.c b/memcached.c
index 7871fe8..86ac064 100644
--- a/memcached.c
+++ b/memcached.c
@@ -1887,7 +1887,7 @@ void server_stats(ADD_STAT add_stats, conn *c) {
storage_stats(add_stats, c);
#endif
#ifdef PROXY
- proxy_stats(add_stats, c);
+ proxy_stats(settings.proxy_ctx, add_stats, c);
#endif
#ifdef TLS
if (settings.ssl_enabled) {
@@ -6093,7 +6093,7 @@ int main (int argc, char **argv) {
/* start up worker threads if MT mode */
#ifdef PROXY
if (settings.proxy_enabled) {
- proxy_init(settings.proxy_uring);
+ settings.proxy_ctx = proxy_init(settings.proxy_uring);
if (proxy_load_config(settings.proxy_ctx) != 0) {
exit(EXIT_FAILURE);
}
diff --git a/proto_proxy.c b/proto_proxy.c
index 985781b..d39b7b1 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -20,12 +20,11 @@ static void proxy_out_errstring(mc_resp *resp, const char *str);
// functions starting with _ are breakouts for the public functions.
// see also: process_extstore_stats()
-// FIXME (v2): get context off of conn? global variables
-void proxy_stats(ADD_STAT add_stats, conn *c) {
- if (!settings.proxy_enabled) {
+void proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
+ if (arg == NULL) {
return;
}
- proxy_ctx_t *ctx = settings.proxy_ctx;
+ proxy_ctx_t *ctx = arg;
STAT_L(ctx);
APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads);
@@ -36,14 +35,14 @@ void proxy_stats(ADD_STAT add_stats, conn *c) {
STAT_UL(ctx);
}
-void process_proxy_stats(ADD_STAT add_stats, conn *c) {
+void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
struct proxy_int_stats istats = {0};
- if (!settings.proxy_enabled) {
+ if (!arg) {
return;
}
- proxy_ctx_t *ctx = settings.proxy_ctx;
+ proxy_ctx_t *ctx = arg;
STAT_L(ctx);
// prepare aggregated counters.
@@ -51,6 +50,7 @@ void process_proxy_stats(ADD_STAT add_stats, conn *c) {
uint64_t counters[us->num_stats];
memset(counters, 0, sizeof(counters));
+ // TODO (v3): more globals to remove and/or change API method.
// aggregate worker thread counters.
for (int x = 0; x < settings.num_threads; x++) {
LIBEVENT_THREAD *t = get_worker_thread(x);
@@ -99,10 +99,8 @@ void process_proxy_stats(ADD_STAT add_stats, conn *c) {
}
// start the centralized lua state and config thread.
-// TODO (v2): return ctx ptr. avoid global vars.
-void proxy_init(bool use_uring) {
+void *proxy_init(bool use_uring) {
proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t));
- settings.proxy_ctx = ctx;
ctx->use_uring = use_uring;
pthread_mutex_init(&ctx->config_lock, NULL);
@@ -130,7 +128,7 @@ void proxy_init(bool use_uring) {
ctx->proxy_state = L;
luaL_openlibs(L);
// NOTE: might need to differentiate the libs yes?
- proxy_register_libs(NULL, L);
+ proxy_register_libs(ctx, NULL, L);
// Create/start the backend threads, which we need before servers
// start getting created.
@@ -179,10 +177,14 @@ void proxy_init(bool use_uring) {
}
_start_proxy_config_threads(ctx);
+ return ctx;
}
// Initialize the VM for an individual worker thread.
-void proxy_thread_init(LIBEVENT_THREAD *thr) {
+void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
+ assert(ctx != NULL);
+ assert(thr != NULL);
+
// Create the hook table.
thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook));
if (thr->proxy_hooks == NULL) {
@@ -199,14 +201,14 @@ void proxy_thread_init(LIBEVENT_THREAD *thr) {
lua_State *L = luaL_newstate();
thr->L = L;
luaL_openlibs(L);
- proxy_register_libs(thr, L);
+ proxy_register_libs(ctx, thr, L);
// TODO: srand on time? do we need to bother?
for (int x = 0; x < 3; x++) {
thr->proxy_rng[x] = rand();
}
// kick off the configuration.
- if (proxy_thread_loadconf(thr) != 0) {
+ if (proxy_thread_loadconf(ctx, thr) != 0) {
exit(EXIT_FAILURE);
}
}
diff --git a/proto_proxy.h b/proto_proxy.h
index 6d42e24..0b3d240 100644
--- a/proto_proxy.h
+++ b/proto_proxy.h
@@ -1,15 +1,15 @@
#ifndef PROTO_PROXY_H
#define PROTO_PROXY_H
-void proxy_stats(ADD_STAT add_stats, conn *c);
-void process_proxy_stats(ADD_STAT add_stats, conn *c);
+void proxy_stats(void *arg, ADD_STAT add_stats, conn *c);
+void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c);
/* proxy mode handlers */
int try_read_command_proxy(conn *c);
void complete_nread_proxy(conn *c);
void proxy_cleanup_conn(conn *c);
-void proxy_thread_init(LIBEVENT_THREAD *thr);
-void proxy_init(bool proxy_uring);
+void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr);
+void *proxy_init(bool proxy_uring);
// TODO: need better names or a better interface for these. can be confusing
// to reason about the order.
void proxy_start_reload(void *arg);
@@ -22,6 +22,6 @@ void proxy_return_cb(io_pending_t *pending);
void proxy_finalize_cb(io_pending_t *pending);
/* lua */
-int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx);
+int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state);
#endif
diff --git a/proto_text.c b/proto_text.c
index 6885786..dc6c624 100644
--- a/proto_text.c
+++ b/proto_text.c
@@ -795,7 +795,7 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
#endif
#ifdef PROXY
} else if (strcmp(subcommand, "proxy") == 0) {
- process_proxy_stats(&append_stats, c);
+ process_proxy_stats(settings.proxy_ctx, &append_stats, c);
#endif
} else {
/* getting here means that the subcommand is either engine specific or
diff --git a/proxy.h b/proxy.h
index 015c093..47d7dd8 100644
--- a/proxy.h
+++ b/proxy.h
@@ -78,6 +78,7 @@
#define MCP_THREAD_UPVALUE 1
#define MCP_ATTACH_UPVALUE 2
#define MCP_BACKEND_UPVALUE 3
+#define MCP_CONTEXT_UPVALUE 4
// all possible commands.
#define CMD_FIELDS \
@@ -476,7 +477,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
void proxy_lua_error(lua_State *L, const char *s);
void proxy_lua_ferror(lua_State *L, const char *fmt, ...);
int _start_proxy_config_threads(proxy_ctx_t *ctx);
-int proxy_thread_loadconf(LIBEVENT_THREAD *thr);
+int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr);
// TODO (v2): more .h files, perhaps?
int mcplib_open_hash_xxhash(lua_State *L);
diff --git a/proxy_config.c b/proxy_config.c
index 3de1aac..16988d1 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -333,7 +333,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
proxy_ctx_t *ctx = arg;
pthread_mutex_lock(&ctx->worker_lock);
- if (proxy_thread_loadconf(thr) != 0) {
+ if (proxy_thread_loadconf(ctx, thr) != 0) {
ctx->worker_failed = true;
}
ctx->worker_done = true;
@@ -343,10 +343,9 @@ void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
// FIXME (v2): need to test how to recover from an actual error here. error message
// needs to go somewhere useful, counters added, etc.
-int proxy_thread_loadconf(LIBEVENT_THREAD *thr) {
+int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
lua_State *L = thr->L;
// load the precompiled config function.
- proxy_ctx_t *ctx = settings.proxy_ctx;
struct _dumpbuf *db = ctx->proxy_code;
struct _dumpbuf db2; // copy because the helper modifies it.
memcpy(&db2, db, sizeof(struct _dumpbuf));
diff --git a/proxy_lua.c b/proxy_lua.c
index 75bf62e..40936ec 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -98,8 +98,7 @@ static int mcplib_backend_gc(lua_State *L) {
mcmc_disconnect(be->client);
free(be->client);
- // FIXME (v2): upvalue for global ctx.
- proxy_ctx_t *ctx = settings.proxy_ctx;
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
STAT_DECR(ctx, backend_total, 1);
return 0;
@@ -110,8 +109,7 @@ static int mcplib_backend(lua_State *L) {
size_t nlen = 0;
const char *name = luaL_checklstring(L, -2, &nlen);
const char *port = luaL_checkstring(L, -1);
- // FIXME (v2): upvalue for global ctx.
- proxy_ctx_t *ctx = settings.proxy_ctx;
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
if (nlen > MAX_NAMELEN-1) {
proxy_lua_error(L, "backend name too long");
@@ -349,7 +347,7 @@ static int mcplib_pool(lua_State *L) {
// TODO (v2): Nicer if this is fetched from mcp.default_key_hash
p->key_hasher = XXH3_64bits_withSeed;
pthread_mutex_init(&p->lock, NULL);
- p->ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue.
+ p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
luaL_setmetatable(L, "mcp.pool");
@@ -515,7 +513,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
static int mcplib_tcp_keepalive(lua_State *L) {
luaL_checktype(L, -1, LUA_TBOOLEAN);
int state = lua_toboolean(L, -1);
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
STAT_L(ctx);
ctx->tunables.tcp_keepalive = state;
@@ -526,7 +524,7 @@ static int mcplib_tcp_keepalive(lua_State *L) {
static int mcplib_backend_failure_limit(lua_State *L) {
int limit = luaL_checkinteger(L, -1);
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
if (limit < 0) {
proxy_lua_error(L, "backend_failure_limit must be >= 0");
@@ -548,7 +546,7 @@ static int mcplib_backend_connect_timeout(lua_State *L) {
lua_Number secondsf = luaL_checknumber(L, -1);
lua_Integer secondsi = (lua_Integer) secondsf;
lua_Number subseconds = secondsf - secondsi;
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
STAT_L(ctx);
ctx->tunables.connect.tv_sec = secondsi;
@@ -566,7 +564,7 @@ static int mcplib_backend_retry_timeout(lua_State *L) {
lua_Number secondsf = luaL_checknumber(L, -1);
lua_Integer secondsi = (lua_Integer) secondsf;
lua_Number subseconds = secondsf - secondsi;
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
STAT_L(ctx);
ctx->tunables.retry.tv_sec = secondsi;
@@ -584,7 +582,7 @@ static int mcplib_backend_read_timeout(lua_State *L) {
lua_Number secondsf = luaL_checknumber(L, -1);
lua_Integer secondsi = (lua_Integer) secondsf;
lua_Number subseconds = secondsf - secondsi;
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
STAT_L(ctx);
ctx->tunables.read.tv_sec = secondsi;
@@ -866,8 +864,8 @@ static void proxy_register_defines(lua_State *L) {
}
// Creates and returns the top level "mcp" module
-int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
- lua_State *L = ctx;
+int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
+ lua_State *L = state;
const struct luaL_Reg mcplib_backend_m[] = {
{"set", NULL},
@@ -989,7 +987,9 @@ int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"}
lua_setmetatable(L, -2); // {__mt = {__mode = "v"} }
- luaL_setfuncs(L, mcplib_f, 3); // store upvalues.
+ lua_pushlightuserdata(L, ctx); // upvalue for proxy context.
+
+ luaL_setfuncs(L, mcplib_f, 4); // store upvalues.
lua_setglobal(L, "mcp"); // set the lib table to mcp global.
return 1;
diff --git a/proxy_ustats.c b/proxy_ustats.c
index 339ce89..7a429bd 100644
--- a/proxy_ustats.c
+++ b/proxy_ustats.c
@@ -36,7 +36,7 @@ int mcplib_add_stat(lua_State *L) {
}
}
- proxy_ctx_t *ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue.
+ proxy_ctx_t *ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
STAT_L(ctx);
struct proxy_user_stats *us = &ctx->user_stats;
diff --git a/thread.c b/thread.c
index d5ed052..bd1dc9f 100644
--- a/thread.c
+++ b/thread.c
@@ -490,7 +490,7 @@ static void setup_thread(LIBEVENT_THREAD *me) {
// TODO: maybe register hooks to be called here from sub-packages? ie;
// extstore, TLS, proxy.
if (settings.proxy_enabled) {
- proxy_thread_init(me);
+ proxy_thread_init(settings.proxy_ctx, me);
}
#endif
thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);