summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-13 15:22:26 -0800
committerdormando <dormando@rydia.net>2023-02-24 17:43:54 -0800
commit6442017c545a2a5ad076697b8695cd64bd32b542 (patch)
treea95c5443a72f5cfbe5b1c32fe5cf552da3201b0c
parent833a7234bbaed264a9973141850a23df4eb1b939 (diff)
downloadmemcached-6442017c545a2a5ad076697b8695cd64bd32b542.tar.gz
proxy: allow workers to run IO optionally
`mcp.pool(p, { dist = etc, iothread = true })` By default the IO thread is not used; instead a backend connection is created for each worker thread. This can be overridden by setting `iothread = true` when creating a pool. `mcp.pool(p, { dist = etc, beprefix = "etc" })` If a `beprefix` is added to pool arguments, it will create unique backend connections for this pool. This allows you to create multiple sockets per backend by making multiple pools with unique prefixes. There are legitimate use cases for sharing backend connections across different pools, which is why that is the default behavior.
-rw-r--r--memcached.c11
-rw-r--r--memcached.h2
-rw-r--r--proto_proxy.c149
-rw-r--r--proto_proxy.h1
-rw-r--r--proxy.h12
-rw-r--r--proxy_await.c9
-rw-r--r--proxy_config.c48
-rw-r--r--proxy_lua.c147
-rw-r--r--proxy_network.c150
-rw-r--r--t/proxyconfig.lua13
-rw-r--r--t/proxyconfig.t101
-rw-r--r--thread.c1
12 files changed, 470 insertions, 174 deletions
diff --git a/memcached.c b/memcached.c
index adaf162..70cc8e3 100644
--- a/memcached.c
+++ b/memcached.c
@@ -6053,9 +6053,6 @@ int main (int argc, char **argv) {
#ifdef PROXY
if (settings.proxy_enabled) {
settings.proxy_ctx = proxy_init(settings.proxy_uring);
- if (proxy_load_config(settings.proxy_ctx) != 0) {
- exit(EXIT_FAILURE);
- }
}
#endif
#ifdef EXTSTORE
@@ -6067,6 +6064,14 @@ int main (int argc, char **argv) {
init_lru_crawler(NULL);
#endif
+#ifdef PROXY
+ if (settings.proxy_enabled) {
+ if (proxy_first_confload(settings.proxy_ctx) != 0) {
+ exit(EXIT_FAILURE);
+ }
+ }
+#endif
+
if (start_assoc_maint && start_assoc_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
diff --git a/memcached.h b/memcached.h
index 4790c75..79b9a81 100644
--- a/memcached.h
+++ b/memcached.h
@@ -703,6 +703,7 @@ typedef struct {
int notify_send_fd; /* sending end of notify pipe */
#endif
int cur_sfd; /* client fd for logging commands */
+ int thread_baseid; /* which "number" thread this is for data offsets */
struct thread_stats stats; /* Stats generated by this thread */
io_queue_cb_t io_queues[IO_QUEUE_COUNT];
struct conn_queue *ev_queue; /* Worker/conn event queue */
@@ -723,6 +724,7 @@ typedef struct {
void *proxy_hooks;
void *proxy_user_stats;
void *proxy_int_stats;
+ void *proxy_event_thread; // worker threads can also be proxy IO threads
uint32_t proxy_rng[4]; // fast per-thread rng for lua.
// TODO: add ctx object so we can attach to queue.
#endif
diff --git a/proto_proxy.c b/proto_proxy.c
index 535800d..1acc920 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -129,63 +129,22 @@ void *proxy_init(bool use_uring) {
// NOTE: might need to differentiate the libs yes?
proxy_register_libs(ctx, NULL, L);
- // Create/start the backend threads, which we need before servers
+ // Create/start the IO thread, which we need before servers
// start getting created.
- // Supporting N event threads should be possible, but it will be a
- // low number of N to avoid too many wakeup syscalls.
- // For now we hardcode to 1.
- proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
- ctx->proxy_threads = threads;
- for (int i = 0; i < 1; i++) {
- proxy_event_thread_t *t = &threads[i];
- t->ctx = ctx;
-#ifdef USE_EVENTFD
- t->event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
- t->be_event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->be_event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
-#else
- int fds[2];
- if (pipe(fds)) {
- perror("can't create proxy backend notify pipe");
- exit(1);
- }
-
- t->notify_receive_fd = fds[0];
- t->notify_send_fd = fds[1];
-
- if (pipe(fds)) {
- perror("can't create proxy backend connection notify pipe");
- exit(1);
- }
- t->be_notify_receive_fd = fds[0];
- t->be_notify_send_fd = fds[1];
-#endif
- proxy_init_evthread_events(t);
-
- // incoming request queue.
- STAILQ_INIT(&t->io_head_in);
- STAILQ_INIT(&t->beconn_head_in);
- pthread_mutex_init(&t->mutex, NULL);
- pthread_cond_init(&t->cond, NULL);
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ ctx->proxy_io_thread = t;
+ proxy_init_event_thread(t, ctx, NULL);
#ifdef HAVE_LIBURING
- if (t->use_uring) {
- pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
- } else {
- pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
- }
-#else
+ if (t->use_uring) {
+ pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
+ } else {
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
-#endif // HAVE_LIBURING
- thread_setname(t->thread_id, "mc-prx-io");
}
+#else
+ pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
+#endif // HAVE_LIBURING
+ thread_setname(t->thread_id, "mc-prx-io");
_start_proxy_config_threads(ctx);
return ctx;
@@ -218,18 +177,20 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
thr->proxy_rng[x] = rand();
}
- // kick off the configuration.
- if (proxy_thread_loadconf(ctx, thr) != 0) {
- exit(EXIT_FAILURE);
- }
+ // Create a proxy event thread structure to piggyback on the worker.
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ thr->proxy_event_thread = t;
+ proxy_init_event_thread(t, ctx, thr->base);
}
// ctx_stack is a stack of io_pending_proxy_t's.
void proxy_submit_cb(io_queue_t *q) {
- proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads;
+ proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
io_pending_proxy_t *p = q->stack_ctx;
io_head_t head;
+ be_head_t w_head; // worker local stack.
STAILQ_INIT(&head);
+ STAILQ_INIT(&w_head);
// NOTE: responses get returned in the correct order no matter what, since
// mc_resp's are linked.
@@ -243,11 +204,21 @@ void proxy_submit_cb(io_queue_t *q) {
// So for now we build the secondary list with an STAILQ, which
// can be transplanted/etc.
while (p) {
- // insert into tail so head is oldest request.
- STAILQ_INSERT_TAIL(&head, p, io_next);
+ mcp_backend_t *be;
+ P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
if (p->is_await) {
// need to not count await objects multiple times.
- if (p->await_first) {
+ if (p->await_background) {
+ P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p);
+ // intercept await backgrounds
+ // this call cannot recurse if we're on the worker thread,
+ // since the worker thread has to finish executing this
+ // function in order to pick up the returned IO.
+ q->count++;
+ return_io_pending((io_pending_t *)p);
+ p = p->next;
+ continue;
+ } else if (p->await_first) {
q->count++;
}
// funny workaround: awaiting IOP's don't count toward
@@ -256,6 +227,24 @@ void proxy_submit_cb(io_queue_t *q) {
} else {
q->count++;
}
+ be = p->backend;
+
+ if (be->use_io_thread) {
+ // insert into tail so head is oldest request.
+ STAILQ_INSERT_TAIL(&head, p, io_next);
+ } else {
+ // emulate some of handler_dequeue()
+ STAILQ_INSERT_TAIL(&be->io_head, p, io_next);
+ if (be->io_next == NULL) {
+ be->io_next = p;
+ }
+ be->depth++;
+ if (!be->stacked) {
+ be->stacked = true;
+ be->be_next.stqe_next = NULL; // paranoia
+ STAILQ_INSERT_TAIL(&w_head, be, be_next);
+ }
+ }
p = p->next;
}
@@ -263,26 +252,34 @@ void proxy_submit_cb(io_queue_t *q) {
// clear out the submit queue so we can re-queue new IO's inline.
q->stack_ctx = NULL;
- // Transfer request stack to event thread.
- pthread_mutex_lock(&e->mutex);
- STAILQ_CONCAT(&e->io_head_in, &head);
- // No point in holding the lock since we're not doing a cond signal.
- pthread_mutex_unlock(&e->mutex);
+ if (!STAILQ_EMPTY(&head)) {
+ P_DEBUG("%s: submitting queue to IO thread\n", __func__);
+ // Transfer request stack to event thread.
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_CONCAT(&e->io_head_in, &head);
+ // No point in holding the lock since we're not doing a cond signal.
+ pthread_mutex_unlock(&e->mutex);
- // Signal to check queue.
+ // Signal to check queue.
#ifdef USE_EVENTFD
- uint64_t u = 1;
- // TODO (v2): check result? is it ever possible to get a short write/failure
- // for an eventfd?
- if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
- assert(1 == 0);
- }
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
#else
- if (write(e->notify_send_fd, "w", 1) <= 0) {
- assert(1 == 0);
- }
+ if (write(e->notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
#endif
+ }
+ if (!STAILQ_EMPTY(&w_head)) {
+ P_DEBUG("%s: running inline worker queue\n", __func__);
+ // emulating proxy_event_handler
+ proxy_run_backend_queue(&w_head);
+ }
return;
}
@@ -545,6 +542,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (cores == LUA_OK) {
WSTAT_DECR(c->thread, proxy_req_active, 1);
int type = lua_type(Lc, 1);
+ P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type);
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response");
_set_noreply_mode(resp, r);
@@ -581,6 +579,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else if (cores == LUA_YIELD) {
int coro_ref = 0;
int yield_type = lua_tointeger(Lc, -1);
+ P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type);
assert(yield_type != 0);
lua_pop(Lc, 1);
diff --git a/proto_proxy.h b/proto_proxy.h
index 0b3d240..c8608da 100644
--- a/proto_proxy.h
+++ b/proto_proxy.h
@@ -13,6 +13,7 @@ void *proxy_init(bool proxy_uring);
// TODO: need better names or a better interface for these. can be confusing
// to reason about the order.
void proxy_start_reload(void *arg);
+int proxy_first_confload(void *arg);
int proxy_load_config(void *arg);
void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr);
diff --git a/proxy.h b/proxy.h
index 23aabd8..bb9f0c3 100644
--- a/proxy.h
+++ b/proxy.h
@@ -196,7 +196,7 @@ typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
typedef struct {
lua_State *proxy_state;
void *proxy_code;
- proxy_event_thread_t *proxy_threads;
+ proxy_event_thread_t *proxy_io_thread;
pthread_mutex_t config_lock;
pthread_cond_t config_cond;
pthread_t config_tid;
@@ -209,6 +209,7 @@ typedef struct {
bool worker_done; // signal variable for the worker lock/cond system.
bool worker_failed; // covered by worker_lock as well.
bool use_uring; // use IO_URING for backend connections.
+ bool loading; // bool indicating an active config load.
struct proxy_global_stats global_stats;
struct proxy_user_stats user_stats;
struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
@@ -351,6 +352,7 @@ struct mcp_backend_s {
bool can_write; // recently got a WANT_WRITE or are connecting.
bool stacked; // if backend already queued for syscalls.
bool bad; // timed out, marked as bad.
+ bool use_io_thread; // note if this backend is worker-local or not.
struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
char name[MAX_NAMELEN+1];
char port[MAX_PORTLEN+1];
@@ -456,21 +458,25 @@ struct mcp_pool_s {
proxy_ctx_t *ctx; // main context.
STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
char key_filter_conf[KEY_HASH_FILTER_MAX+1];
+ char beprefix[MAX_LABELLEN+1]; // TODO: should probably be shorter.
uint64_t hash_seed; // calculated from a string.
int refcount;
int phc_ref;
int self_ref; // TODO (v2): double check that this is needed.
int pool_size;
+ bool use_iothread;
mcp_pool_be_t pool[];
};
typedef struct {
mcp_pool_t *main; // ptr to original
+ mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread.
} mcp_pool_proxy_t;
// networking interface
-void proxy_init_evthread_events(proxy_event_thread_t *t);
+void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
void *proxy_event_thread(void *arg);
+void proxy_run_backend_queue(be_head_t *head);
// await interface
enum mcp_await_e {
@@ -509,7 +515,7 @@ int mcplib_open_dist_jump_hash(lua_State *L);
int mcplib_open_dist_ring_hash(lua_State *L);
int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c);
-mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len);
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len);
void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len);
void proxy_lua_error(lua_State *L, const char *s);
diff --git a/proxy_await.c b/proxy_await.c
index b0a4dee..d5e40d3 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -259,7 +259,11 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
int n = 0;
- bool await_first = true;
+ // TODO (v3) await_first is used as a marker for upping the "wait for
+ // IO's" queue count, which means we need to force it off if we're in
+ // background mode, else we would accidentally wait for a response anyway.
+ // This note is for finding a less convoluted method for this.
+ bool await_first = (aw->type == AWAIT_BACKGROUND) ? false : true;
// loop arg table and run each hash selector
lua_pushnil(L); // -> 3
while (lua_next(L, 1) != 0) {
@@ -269,11 +273,10 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
if (pp == NULL) {
proxy_lua_error(L, "mcp.await must be supplied with a pool");
}
- mcp_pool_t *p = pp->main;
// NOTE: rq->be is only held to help pass the backend into the IOP in
// mcp_queue call. Could be a local variable and an argument too.
- rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len);
mcp_queue_await_io(c, L, rq, await_ref, await_first);
await_first = false;
diff --git a/proxy_config.c b/proxy_config.c
index 6bb54be..e45711e 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -41,11 +41,40 @@ static const char * _load_helper(lua_State *L, void *data, size_t *size) {
void proxy_start_reload(void *arg) {
proxy_ctx_t *ctx = arg;
if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
+ ctx->loading = true;
pthread_cond_signal(&ctx->config_cond);
pthread_mutex_unlock(&ctx->config_lock);
}
}
+int proxy_first_confload(void *arg) {
+ proxy_ctx_t *ctx = arg;
+ pthread_mutex_lock(&ctx->config_lock);
+ ctx->loading = true;
+ pthread_cond_signal(&ctx->config_cond);
+ pthread_mutex_unlock(&ctx->config_lock);
+
+ while (1) {
+ bool stop = false;
+ pthread_mutex_lock(&ctx->config_lock);
+ if (!ctx->loading) {
+ stop = true;
+ }
+ pthread_mutex_unlock(&ctx->config_lock);
+ if (stop)
+ break;
+ }
+ int fails = 0;
+ STAT_L(ctx);
+ fails = ctx->global_stats.config_reload_fails;
+ STAT_UL(ctx);
+ if (fails) {
+ return -1;
+ }
+
+ return 0;
+}
+
// Manages a queue of inbound objects destined to be deallocated.
static void *_proxy_manager_thread(void *arg) {
proxy_ctx_t *ctx = arg;
@@ -108,6 +137,7 @@ static void *_proxy_config_thread(void *arg) {
logger_create();
pthread_mutex_lock(&ctx->config_lock);
while (1) {
+ ctx->loading = false;
pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
@@ -233,7 +263,7 @@ int proxy_load_config(void *arg) {
return 0;
}
-static int _copy_pool(lua_State *from, lua_State *to) {
+static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
// from, -3 should have he userdata.
mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
size_t size = sizeof(mcp_pool_proxy_t);
@@ -241,16 +271,22 @@ static int _copy_pool(lua_State *from, lua_State *to) {
luaL_setmetatable(to, "mcp.pool_proxy");
pp->main = p;
+ if (p->use_iothread) {
+ pp->pool = p->pool;
+ } else {
+ // allow 0 indexing for backends when unique to each worker thread
+ pp->pool = &p->pool[thr->thread_baseid * p->pool_size];
+ }
pthread_mutex_lock(&p->lock);
p->refcount++;
pthread_mutex_unlock(&p->lock);
return 0;
}
-static void _copy_config_table(lua_State *from, lua_State *to);
+static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr);
// (from, -1) is the source value
// should end with (to, -1) being the new value.
-static void _copy_config_table(lua_State *from, lua_State *to) {
+static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
int type = lua_type(from, -1);
bool found = false;
luaL_checkstack(from, 4, "configuration error: table recursion too deep");
@@ -266,7 +302,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
if (lua_rawget(from, -2) != LUA_TNIL) {
const char *name = lua_tostring(from, -1);
if (strcmp(name, "mcp.pool") == 0) {
- _copy_pool(from, to);
+ _copy_pool(from, to, thr);
found = true;
}
}
@@ -323,7 +359,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
// lua_settable(to, n) - n being the table
// takes -2 key -1 value, pops both.
// use lua_absindex(L, -1) and so to convert easier?
- _copy_config_table(from, to); // push next value.
+ _copy_config_table(from, to, thr); // push next value.
lua_settable(to, nt);
lua_pop(from, 1); // drop value, keep key.
}
@@ -385,7 +421,7 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
// If the setjump/longjump combos are compatible a pcall for from and
// atpanic for to might work best, since the config VM is/should be long
// running and worker VM's should be rotated.
- _copy_config_table(ctx->proxy_state, L);
+ _copy_config_table(ctx->proxy_state, L, thr);
// copied value is in front of route function, now call it.
if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
diff --git a/proxy_lua.c b/proxy_lua.c
index e6b50ae..aeaf1e5 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -113,7 +113,7 @@ static int mcplib_backend_wrap_gc(lua_State *L) {
// Since we're running in the config thread it could just busy poll
// until the connection was picked up.
assert(be->transferred);
- proxy_event_thread_t *e = ctx->proxy_threads;
+ proxy_event_thread_t *e = be->event_thread;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -282,11 +282,11 @@ static int mcplib_backend(lua_State *L) {
return 1; // return be object.
}
+// Called with the cache label at top of the stack.
static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
// Note: The upvalue won't be found unless we're running from a function with it
// set as an upvalue.
- lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
@@ -306,7 +306,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_
return NULL;
}
-static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel,
+ proxy_event_thread_t *e) {
// FIXME: remove global.
proxy_ctx_t *ctx = settings.proxy_ctx;
@@ -361,7 +362,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
STAT_UL(ctx);
be->connect_flags = flags;
- proxy_event_thread_t *e = ctx->proxy_threads;
+ be->event_thread = e;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -380,8 +381,8 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
}
#endif
+ lua_pushvalue(L, 4); // push the label string back to the top.
// Add this new backend connection to the object cache.
- lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
// set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
@@ -514,43 +515,43 @@ static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
// UD now popped from stack.
}
-// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
-static int mcplib_pool(lua_State *L) {
- int argc = lua_gettop(L);
- luaL_checktype(L, 1, LUA_TTABLE);
- int n = luaL_len(L, 1); // get length of array table
-
- size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n;
- mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
- // Zero the memory before use, so we can realibly use __gc to clean up
- memset(p, 0, plen);
- p->pool_size = n;
- // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
- p->key_hasher = XXH3_64bits_withSeed;
- pthread_mutex_init(&p->lock, NULL);
- p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
-
- luaL_setmetatable(L, "mcp.pool");
-
- lua_pushvalue(L, -1); // dupe self for reference.
- p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- // TODO (v2): move to after function check so we can find the right
- // backend label to look up.
+// in the proxy object, we can alias a ptr to the pool to where it needs to be
+// based on worker number or io_thread right?
+static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) {
// remember lua arrays are 1 indexed.
- for (int x = 1; x <= n; x++) {
- mcp_pool_be_t *s = &p->pool[x-1];
+ for (int x = 1; x <= p->pool_size; x++) {
+ mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
// check label for pre-existing backend conn/wrapper
+ // TODO (v2): there're native ways of "from C make lua strings"
+ int toconcat = 1;
+ if (p->beprefix[0] != '\0') {
+ lua_pushstring(L, p->beprefix);
+ toconcat++;
+ }
+ if (p->use_iothread) {
+ lua_pushstring(L, ":io:");
+ toconcat++;
+ } else {
+ lua_pushstring(L, ":w");
+ lua_pushinteger(L, offset);
+ lua_pushstring(L, ":");
+ toconcat += 3;
+ }
+ lua_pushlstring(L, bel->label, bel->llen);
+ lua_concat(L, toconcat);
+
+ lua_pushvalue(L, -1); // copy the label string for the create method.
mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
if (bew == NULL) {
- bew = _mcplib_make_backendconn(L, bel);
+ bew = _mcplib_make_backendconn(L, bel, t);
}
s->be = bew->be; // unwrap the backend connection for direct ref.
+ bew->be->use_io_thread = p->use_iothread;
// If found from cache or made above, the backend wrapper is on the
// top of the stack, so we can now take its reference.
@@ -559,11 +560,51 @@ static int mcplib_pool(lua_State *L) {
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
lua_pop(L, 1); // pop the mcp.backend label object.
+ lua_pop(L, 1); // drop extra label copy.
+ }
+}
+
+// call with table of backends in 1
+static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) {
+ if (p->use_iothread) {
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread);
+ } else {
+ // TODO (v3) globals.
+ for (int n = 0; n < settings.num_threads; n++) {
+ LIBEVENT_THREAD *t = get_worker_thread(n);
+ _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread);
+ }
}
+}
+
+// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
+static int mcplib_pool(lua_State *L) {
+ int argc = lua_gettop(L);
+ luaL_checktype(L, 1, LUA_TTABLE);
+ int n = luaL_len(L, 1); // get length of array table
+ int workers = settings.num_threads; // TODO (v3): globals usage.
+
+ size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers);
+ mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
+ // Zero the memory before use, so we can realibly use __gc to clean up
+ memset(p, 0, plen);
+ p->pool_size = n;
+ p->use_iothread = true;
+ // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
+ p->key_hasher = XXH3_64bits_withSeed;
+ pthread_mutex_init(&p->lock, NULL);
+ p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+
+ luaL_setmetatable(L, "mcp.pool");
+
+ lua_pushvalue(L, -1); // dupe self for reference.
+ p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// Allow passing an ignored nil as a second argument. Makes the lua easier
int type = lua_type(L, 2);
if (argc == 1 || type == LUA_TNIL) {
+ _mcplib_pool_make_be(L, p);
lua_getglobal(L, "mcp");
// TODO (v2): decide on a mcp.default_dist and use that instead
if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
@@ -580,6 +621,31 @@ static int mcplib_pool(lua_State *L) {
// pool, then pass it along to the a constructor if necessary.
luaL_checktype(L, 2, LUA_TTABLE);
+ if (lua_getfield(L, 2, "iothread") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TBOOLEAN);
+ int use_iothread = lua_toboolean(L, -1);
+ if (use_iothread) {
+ p->use_iothread = true;
+ } else {
+ p->use_iothread = false;
+ }
+ lua_pop(L, 1); // remove value.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+
+ if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TSTRING);
+ size_t len = 0;
+ const char *bepfx = lua_tolstring(L, -1, &len);
+ memcpy(p->beprefix, bepfx, len);
+ p->beprefix[len+1] = '\0';
+ lua_pop(L, 1); // pop beprefix string.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+ _mcplib_pool_make_be(L, p);
+
// stack: backends, options, mcp.pool
if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
// overriding the distribution function.
@@ -587,6 +653,17 @@ static int mcplib_pool(lua_State *L) {
lua_pop(L, 1); // remove the dist table from stack.
} else {
lua_pop(L, 1); // pop the nil.
+
+ // use the default dist if not specified with an override table.
+ lua_getglobal(L, "mcp");
+ // TODO (v2): decide on a mcp.default_dist and use that instead
+ if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
+ _mcplib_pool_dist(L, p);
+ lua_pop(L, 1); // pop "dist_jump_hash" value.
+ } else {
+ lua_pop(L, 1);
+ }
+ lua_pop(L, 1); // pop "mcp"
}
if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
@@ -666,7 +743,8 @@ static int mcplib_pool_proxy_gc(lua_State *L) {
return 0;
}
-mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) {
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len) {
+ mcp_pool_t *p = pp->main;
if (p->key_filter) {
key = p->key_filter(p->key_filter_conf, key, len, &len);
P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
@@ -682,7 +760,7 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
proxy_lua_error(L, "key dist hasher tried to use out of bounds index");
}
- return p->pool[lookup].be;
+ return pp->pool[lookup].be;
}
// hashfunc(request) -> backend(request)
@@ -690,7 +768,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
static int mcplib_pool_proxy_call(lua_State *L) {
// internal args are the hash selector (self)
mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy");
- mcp_pool_t *p = pp->main;
// then request object.
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
@@ -701,7 +778,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
}
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
- rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len);
// now yield request, pool up.
lua_pushinteger(L, MCP_YIELD_POOL);
diff --git a/proxy_network.c b/proxy_network.c
index 239b0d4..0334971 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -133,17 +133,8 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
// paranoia about moving items between lists.
io->io_next.stqe_next = NULL;
- // Need to check on await's before looking at backends, in case it
- // doesn't have one.
- // Here we're letting an await resume without waiting on the network.
- if (io->await_background) {
- return_io_pending((io_pending_t *)io);
- continue;
- }
-
mcp_backend_t *be = io->backend;
// So the backend can retrieve its event base.
- be->event_thread = t;
if (be->bad) {
P_DEBUG("%s: fast failing request to bad backend\n", __func__);
io->client_resp->status = MCMC_ERR;
@@ -424,7 +415,6 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -802,14 +792,13 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
// assign the initial events to the backend, so we don't have to
// constantly check if they were initialized yet elsewhere.
// note these events will not fire until event_add() is called.
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
- event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
- event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->main_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
+ event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->timeout_event, be->event_thread->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -827,6 +816,44 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
}
}
+void proxy_run_backend_queue(be_head_t *head) {
+ mcp_backend_t *be;
+ STAILQ_FOREACH(be, head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->bad) {
+ // flush queue if backend is still bad.
+ // TODO: duplicated from _reset_bad_backend()
+ io_pending_proxy_t *io = NULL;
+ while (!STAILQ_EMPTY(&be->io_head)) {
+ io = STAILQ_FIRST(&be->io_head);
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ io->client_resp->status = MCMC_ERR;
+ be->depth--;
+ return_io_pending((io_pending_t *)io);
+ }
+ } else if (be->connecting || be->validating) {
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
+ } else {
+ flags = _flush_pending_write(be);
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ } else if (flags & EV_WRITE) {
+ // only get here because we need to kick off the write handler
+ _start_write_event(be);
+ }
+
+ if (be->pending_read) {
+ _start_timeout_event(be);
+ }
+
+ }
+ }
+}
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -858,32 +885,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
}
// Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // FIXME (v2): _set_event() is buggy, see notes on function.
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
- int flags = 0;
-
- if (be->connecting || be->validating) {
- P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
- } else {
- flags = _flush_pending_write(be);
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed(be);
- } else if (flags & EV_WRITE) {
- // only get here because we need to kick off the write handler
- _start_write_event(be);
- }
-
- if (be->pending_read) {
- _start_timeout_event(be);
- }
- }
- }
-
+ proxy_run_backend_queue(&t->be_head);
}
void *proxy_event_thread(void *arg) {
@@ -1585,8 +1587,53 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
// event threads.
-void proxy_init_evthread_events(proxy_event_thread_t *t) {
+// TODO: this either needs a restructure or split into two funcs:
+// 1) for the IO thread which creates its own ring/event base
+// 2) for the worker thread which reuses the event base.
+// io_uring will probably only work for the IO thread which makes further
+// exceptions.
+void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) {
+ t->ctx = ctx;
+#ifdef USE_EVENTFD
+ t->event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+ t->be_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->be_event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+#else
+ int fds[2];
+ if (pipe(fds)) {
+ perror("can't create proxy backend notify pipe");
+ exit(1);
+ }
+
+ t->notify_receive_fd = fds[0];
+ t->notify_send_fd = fds[1];
+
+ if (pipe(fds)) {
+ perror("can't create proxy backend connection notify pipe");
+ exit(1);
+ }
+ t->be_notify_receive_fd = fds[0];
+ t->be_notify_send_fd = fds[1];
+#endif
+
+ // incoming request queue.
+ STAILQ_INIT(&t->io_head_in);
+ STAILQ_INIT(&t->beconn_head_in);
+ pthread_mutex_init(&t->mutex, NULL);
+ pthread_cond_init(&t->cond, NULL);
+
+ // initialize the event system.
+
#ifdef HAVE_LIBURING
+ fprintf(stderr, "Sorry, io_uring not supported right now\n");
+ abort();
bool use_uring = t->ctx->use_uring;
struct io_uring_params p = {0};
assert(t->event_fd); // uring only exists where eventfd also does.
@@ -1646,14 +1693,19 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
}
#endif
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- t->base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- if (! t->base) {
- fprintf(stderr, "Can't allocate event base\n");
- exit(1);
+ if (base == NULL) {
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+ } else {
+ // reusing an event base from a worker thread.
+ t->base = base;
}
// listen for notifications.
diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua
index 5f9ad0e..dbe725d 100644
--- a/t/proxyconfig.lua
+++ b/t/proxyconfig.lua
@@ -3,6 +3,7 @@
local mode = dofile("/tmp/proxyconfigmode.lua")
mcp.backend_read_timeout(4)
+mcp.backend_connect_timeout(5)
function mcp_config_pools(old)
if mode == "none" then
@@ -29,6 +30,15 @@ function mcp_config_pools(old)
test = mcp.pool({b1, b2, b3})
}
return pools
+ elseif mode == "noiothread" then
+ local b1 = mcp.backend('b1', '127.0.0.1', 11514)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11515)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11516)
+
+ local pools = {
+ test = mcp.pool({b1, b2, b3}, { iothread = false })
+ }
+ return pools
end
end
@@ -42,5 +52,8 @@ function mcp_config_routes(zones)
elseif mode == "start" or mode == "betable" then
mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
+ elseif mode == "noiothread" then
+ mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
+ mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
end
end
diff --git a/t/proxyconfig.t b/t/proxyconfig.t
index 1b19b8e..ebf8684 100644
--- a/t/proxyconfig.t
+++ b/t/proxyconfig.t
@@ -99,6 +99,9 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
}
my @mbe = ();
+# A map of where keys route to for worker IO tests later
+my %keymap = ();
+my $keycount = 100;
{
# set up server backend sockets.
for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) {
@@ -125,6 +128,22 @@ my @mbe = ();
is(scalar <$be>, $cmd, "metaget passthrough");
print $be "EN\r\n";
is(scalar <$ps>, "EN\r\n", "miss received");
+
+ # Route a bunch of keys and map them to backends.
+ for my $key (0 .. $keycount) {
+ print $ps "mg /test/$key\r\n";
+ my @readable = $s->can_read(0.25);
+ is(scalar @readable, 1, "only one backend became readable");
+ my $be = shift @readable;
+ for (0 .. 2) {
+ if ($be == $mbe[$_]) {
+ $keymap{$key} = $_;
+ }
+ }
+ is(scalar <$be>, "mg /test/$key\r\n", "got mg passthrough");
+ print $be "EN\r\n";
+ is(scalar <$ps>, "EN\r\n", "miss received");
+ }
}
# Test backend table arguments and per-backend time overrides
@@ -165,6 +184,88 @@ my @holdbe = (); # avoid having the backends immediately disconnect and pollute
is(scalar @readable, 0, "no new sockets");
}
+# Disconnect the existing sockets
+@mbe = ();
+@holdbe = ();
+@mocksrvs = ();
+$watcher = $p_srv->new_sock;
+# Reset the watcher and let logs die off.
+sleep 1;
+print $watcher "watch proxyevents\n";
+is(<$watcher>, "OK\r\n", "watcher enabled");
+
+{
+ # re-create the mock servers so we get clean connects, the previous
+ # backends could be reconnecting still.
+ for my $port (11514, 11515, 11516) {
+ my $srv = mock_server($port);
+ ok(defined $srv, "mock server created");
+ push(@mocksrvs, $srv);
+ }
+ # Select the "noiothread" config mode, reload the proxy, then call wait_reload() on the watcher.
+ write_modefile('return "noiothread"');
+ $p_srv->reload();
+ wait_reload($watcher);
+ # Select on the listening sockets themselves, not on accepted connections.
+ my $s = IO::Select->new();
+ for my $msrv (@mocksrvs) {
+ $s->add($msrv);
+ }
+ my @readable = $s->can_read(0.25);
+ # All three backends should become readable with new sockets.
+ is(scalar @readable, 3, "all listeners became readable");
+ # @bepile is indexed like @mocksrvs; each slot holds the sockets accepted from that listener.
+ my @bepile = ();
+ my $bes = IO::Select->new(); # selector just for the backend sockets.
+ # Each backend should create one socket per worker thread.
+ for my $msrv (@readable) {
+ my @temp = ();
+ for (0 .. 3) { # four accepts per listener; presumably the test proxy runs 4 worker threads (confirm)
+ my $be = $msrv->accept();
+ ok(defined $be, "mock backend accepted");
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $bes->add($be);
+ push(@temp, $be);
+ }
+ for (0 .. 2) {
+ if ($mocksrvs[$_] == $msrv) {
+ $bepile[$_] = \@temp;
+ }
+ }
+ }
+
+ # clients round robin onto different worker threads, so we can test the
+ # key dist on different offsets.
+ my @cli = ();
+ for (0 .. 2) {
+ my $p = $p_srv->new_sock;
+ # replay the same key range that filled %keymap and compare against it.
+ for my $key (0 .. $keycount) {
+ print $p "mg /test/$key\r\n";
+ @readable = $bes->can_read(0.25);
+ is(scalar @readable, 1, "only one backend became readable");
+ my $be = shift @readable;
+ # find which listener this be belongs to
+ for my $x (0 .. 2) {
+ for (@{$bepile[$x]}) {
+ if ($_ == $be) {
+ cmp_ok($x, '==', $keymap{$key}, "key routed to correct listener: " . $keymap{$key});
+ }
+ }
+ }
+
+ is(scalar <$be>, "mg /test/$key\r\n", "got mg passthrough");
+ print $be "EN\r\n";
+ is(scalar <$p>, "EN\r\n", "miss received");
+ }
+
+ # hold onto the sockets just in case.
+ push(@cli, $p);
+ }
+
+}
+
# TODO:
# remove backends
# do dead sockets close?
diff --git a/thread.c b/thread.c
index 04aa8f4..584ad8e 100644
--- a/thread.c
+++ b/thread.c
@@ -1091,6 +1091,7 @@ void memcached_thread_init(int nthreads, void *arg) {
#ifdef EXTSTORE
threads[i].storage = arg;
#endif
+ threads[i].thread_baseid = i;
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats_state.reserved_fds += 5;