-rw-r--r--  memcached.c         11
-rw-r--r--  memcached.h          2
-rw-r--r--  proto_proxy.c      149
-rw-r--r--  proto_proxy.h        1
-rw-r--r--  proxy.h             12
-rw-r--r--  proxy_await.c        9
-rw-r--r--  proxy_config.c      48
-rw-r--r--  proxy_lua.c        147
-rw-r--r--  proxy_network.c    150
-rw-r--r--  t/proxyconfig.lua   13
-rw-r--r--  t/proxyconfig.t    101
-rw-r--r--  thread.c             1
12 files changed, 470 insertions(+), 174 deletions(-)
diff --git a/memcached.c b/memcached.c
index adaf162..70cc8e3 100644
--- a/memcached.c
+++ b/memcached.c
@@ -6053,9 +6053,6 @@ int main (int argc, char **argv) {
#ifdef PROXY
if (settings.proxy_enabled) {
settings.proxy_ctx = proxy_init(settings.proxy_uring);
- if (proxy_load_config(settings.proxy_ctx) != 0) {
- exit(EXIT_FAILURE);
- }
}
#endif
#ifdef EXTSTORE
@@ -6067,6 +6064,14 @@ int main (int argc, char **argv) {
init_lru_crawler(NULL);
#endif
+#ifdef PROXY
+ if (settings.proxy_enabled) {
+ if (proxy_first_confload(settings.proxy_ctx) != 0) {
+ exit(EXIT_FAILURE);
+ }
+ }
+#endif
+
if (start_assoc_maint && start_assoc_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
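The first config load now runs after memcached_thread_init() and the LRU crawler setup, so worker threads (and their piggybacked proxy event threads) exist before any pool construction happens. A condensed sketch of the resulting order in main(), with unrelated initialization elided:

    #ifdef PROXY
        if (settings.proxy_enabled) {
            // starts the config thread and the shared backend IO thread
            settings.proxy_ctx = proxy_init(settings.proxy_uring);
        }
    #endif
        memcached_thread_init(settings.num_threads, NULL); // workers spawn here
        init_lru_crawler(NULL);
    #ifdef PROXY
        if (settings.proxy_enabled) {
            // blocks until the first config load completes; fail hard otherwise
            if (proxy_first_confload(settings.proxy_ctx) != 0) {
                exit(EXIT_FAILURE);
            }
        }
    #endif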
diff --git a/memcached.h b/memcached.h
index 4790c75..79b9a81 100644
--- a/memcached.h
+++ b/memcached.h
@@ -703,6 +703,7 @@ typedef struct {
int notify_send_fd; /* sending end of notify pipe */
#endif
int cur_sfd; /* client fd for logging commands */
+ int thread_baseid; /* which "number" thread this is for data offsets */
struct thread_stats stats; /* Stats generated by this thread */
io_queue_cb_t io_queues[IO_QUEUE_COUNT];
struct conn_queue *ev_queue; /* Worker/conn event queue */
@@ -723,6 +724,7 @@ typedef struct {
void *proxy_hooks;
void *proxy_user_stats;
void *proxy_int_stats;
+ void *proxy_event_thread; // worker threads can also be proxy IO threads
uint32_t proxy_rng[4]; // fast per-thread rng for lua.
// TODO: add ctx object so we can attach to queue.
#endif
diff --git a/proto_proxy.c b/proto_proxy.c
index 535800d..1acc920 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -129,63 +129,22 @@ void *proxy_init(bool use_uring) {
// NOTE: might need to differentiate the libs yes?
proxy_register_libs(ctx, NULL, L);
- // Create/start the backend threads, which we need before servers
+ // Create/start the IO thread, which we need before servers
// start getting created.
- // Supporting N event threads should be possible, but it will be a
- // low number of N to avoid too many wakeup syscalls.
- // For now we hardcode to 1.
- proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
- ctx->proxy_threads = threads;
- for (int i = 0; i < 1; i++) {
- proxy_event_thread_t *t = &threads[i];
- t->ctx = ctx;
-#ifdef USE_EVENTFD
- t->event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
- t->be_event_fd = eventfd(0, EFD_NONBLOCK);
- if (t->be_event_fd == -1) {
- perror("failed to create backend notify eventfd");
- exit(1);
- }
-#else
- int fds[2];
- if (pipe(fds)) {
- perror("can't create proxy backend notify pipe");
- exit(1);
- }
-
- t->notify_receive_fd = fds[0];
- t->notify_send_fd = fds[1];
-
- if (pipe(fds)) {
- perror("can't create proxy backend connection notify pipe");
- exit(1);
- }
- t->be_notify_receive_fd = fds[0];
- t->be_notify_send_fd = fds[1];
-#endif
- proxy_init_evthread_events(t);
-
- // incoming request queue.
- STAILQ_INIT(&t->io_head_in);
- STAILQ_INIT(&t->beconn_head_in);
- pthread_mutex_init(&t->mutex, NULL);
- pthread_cond_init(&t->cond, NULL);
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ ctx->proxy_io_thread = t;
+ proxy_init_event_thread(t, ctx, NULL);
#ifdef HAVE_LIBURING
- if (t->use_uring) {
- pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
- } else {
- pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
- }
-#else
+ if (t->use_uring) {
+ pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
+ } else {
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
-#endif // HAVE_LIBURING
- thread_setname(t->thread_id, "mc-prx-io");
}
+#else
+ pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
+#endif // HAVE_LIBURING
+ thread_setname(t->thread_id, "mc-prx-io");
_start_proxy_config_threads(ctx);
return ctx;
@@ -218,18 +177,20 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) {
thr->proxy_rng[x] = rand();
}
- // kick off the configuration.
- if (proxy_thread_loadconf(ctx, thr) != 0) {
- exit(EXIT_FAILURE);
- }
+ // Create a proxy event thread structure to piggyback on the worker.
+ proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t));
+ thr->proxy_event_thread = t;
+ proxy_init_event_thread(t, ctx, thr->base);
}
// ctx_stack is a stack of io_pending_proxy_t's.
void proxy_submit_cb(io_queue_t *q) {
- proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads;
+ proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread;
io_pending_proxy_t *p = q->stack_ctx;
io_head_t head;
+ be_head_t w_head; // worker local stack.
STAILQ_INIT(&head);
+ STAILQ_INIT(&w_head);
// NOTE: responses get returned in the correct order no matter what, since
// mc_resp's are linked.
@@ -243,11 +204,21 @@ void proxy_submit_cb(io_queue_t *q) {
// So for now we build the secondary list with an STAILQ, which
// can be transplanted/etc.
while (p) {
- // insert into tail so head is oldest request.
- STAILQ_INSERT_TAIL(&head, p, io_next);
+ mcp_backend_t *be;
+ P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p);
if (p->is_await) {
// need to not count await objects multiple times.
- if (p->await_first) {
+ if (p->await_background) {
+ P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p);
+ // intercept await backgrounds
+ // this call cannot recurse if we're on the worker thread,
+ // since the worker thread has to finish executing this
+ // function in order to pick up the returned IO.
+ q->count++;
+ return_io_pending((io_pending_t *)p);
+ p = p->next;
+ continue;
+ } else if (p->await_first) {
q->count++;
}
// funny workaround: awaiting IOP's don't count toward
@@ -256,6 +227,24 @@ void proxy_submit_cb(io_queue_t *q) {
} else {
q->count++;
}
+ be = p->backend;
+
+ if (be->use_io_thread) {
+ // insert into tail so head is oldest request.
+ STAILQ_INSERT_TAIL(&head, p, io_next);
+ } else {
+ // emulate some of handler_dequeue()
+ STAILQ_INSERT_TAIL(&be->io_head, p, io_next);
+ if (be->io_next == NULL) {
+ be->io_next = p;
+ }
+ be->depth++;
+ if (!be->stacked) {
+ be->stacked = true;
+ be->be_next.stqe_next = NULL; // paranoia
+ STAILQ_INSERT_TAIL(&w_head, be, be_next);
+ }
+ }
p = p->next;
}
@@ -263,26 +252,34 @@ void proxy_submit_cb(io_queue_t *q) {
// clear out the submit queue so we can re-queue new IO's inline.
q->stack_ctx = NULL;
- // Transfer request stack to event thread.
- pthread_mutex_lock(&e->mutex);
- STAILQ_CONCAT(&e->io_head_in, &head);
- // No point in holding the lock since we're not doing a cond signal.
- pthread_mutex_unlock(&e->mutex);
+ if (!STAILQ_EMPTY(&head)) {
+ P_DEBUG("%s: submitting queue to IO thread\n", __func__);
+ // Transfer request stack to event thread.
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_CONCAT(&e->io_head_in, &head);
+ // No point in holding the lock since we're not doing a cond signal.
+ pthread_mutex_unlock(&e->mutex);
- // Signal to check queue.
+ // Signal to check queue.
#ifdef USE_EVENTFD
- uint64_t u = 1;
- // TODO (v2): check result? is it ever possible to get a short write/failure
- // for an eventfd?
- if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
- assert(1 == 0);
- }
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
#else
- if (write(e->notify_send_fd, "w", 1) <= 0) {
- assert(1 == 0);
- }
+ if (write(e->notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
#endif
+ }
+ if (!STAILQ_EMPTY(&w_head)) {
+ P_DEBUG("%s: running inline worker queue\n", __func__);
+ // emulating proxy_event_handler
+ proxy_run_backend_queue(&w_head);
+ }
return;
}
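Distilling the rewritten submit path: each pending IO now takes one of three routes. A sketch using names from the patch, with the await bookkeeping details elided:

    // per-IO dispatch in proxy_submit_cb(), distilled
    while (p) {
        if (p->is_await && p->await_background) {
            // background awaits resume immediately, no network round trip
            q->count++;
            return_io_pending((io_pending_t *)p);
        } else if (p->backend->use_io_thread) {
            // batch for the shared IO thread; one wakeup write per submit
            STAILQ_INSERT_TAIL(&head, p, io_next);
        } else {
            // worker-local backend: queue directly, flushed inline below
            // by proxy_run_backend_queue(&w_head) with no thread hand-off
            STAILQ_INSERT_TAIL(&p->backend->io_head, p, io_next);
        }
        p = p->next;
    }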
@@ -545,6 +542,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (cores == LUA_OK) {
WSTAT_DECR(c->thread, proxy_req_active, 1);
int type = lua_type(Lc, 1);
+ P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type);
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response");
_set_noreply_mode(resp, r);
@@ -581,6 +579,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else if (cores == LUA_YIELD) {
int coro_ref = 0;
int yield_type = lua_tointeger(Lc, -1);
+ P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type);
assert(yield_type != 0);
lua_pop(Lc, 1);
diff --git a/proto_proxy.h b/proto_proxy.h
index 0b3d240..c8608da 100644
--- a/proto_proxy.h
+++ b/proto_proxy.h
@@ -13,6 +13,7 @@ void *proxy_init(bool proxy_uring);
// TODO: need better names or a better interface for these. can be confusing
// to reason about the order.
void proxy_start_reload(void *arg);
+int proxy_first_confload(void *arg);
int proxy_load_config(void *arg);
void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr);
diff --git a/proxy.h b/proxy.h
index 23aabd8..bb9f0c3 100644
--- a/proxy.h
+++ b/proxy.h
@@ -196,7 +196,7 @@ typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
typedef struct {
lua_State *proxy_state;
void *proxy_code;
- proxy_event_thread_t *proxy_threads;
+ proxy_event_thread_t *proxy_io_thread;
pthread_mutex_t config_lock;
pthread_cond_t config_cond;
pthread_t config_tid;
@@ -209,6 +209,7 @@ typedef struct {
bool worker_done; // signal variable for the worker lock/cond system.
bool worker_failed; // covered by worker_lock as well.
bool use_uring; // use IO_URING for backend connections.
+ bool loading; // bool indicating an active config load.
struct proxy_global_stats global_stats;
struct proxy_user_stats user_stats;
struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
@@ -351,6 +352,7 @@ struct mcp_backend_s {
bool can_write; // recently got a WANT_WRITE or are connecting.
bool stacked; // if backend already queued for syscalls.
bool bad; // timed out, marked as bad.
+ bool use_io_thread; // note if this backend is worker-local or not.
struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
char name[MAX_NAMELEN+1];
char port[MAX_PORTLEN+1];
@@ -456,21 +458,25 @@ struct mcp_pool_s {
proxy_ctx_t *ctx; // main context.
STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
char key_filter_conf[KEY_HASH_FILTER_MAX+1];
+ char beprefix[MAX_LABELLEN+1]; // TODO: should probably be shorter.
uint64_t hash_seed; // calculated from a string.
int refcount;
int phc_ref;
int self_ref; // TODO (v2): double check that this is needed.
int pool_size;
+ bool use_iothread;
mcp_pool_be_t pool[];
};
typedef struct {
mcp_pool_t *main; // ptr to original
+ mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread.
} mcp_pool_proxy_t;
// networking interface
-void proxy_init_evthread_events(proxy_event_thread_t *t);
+void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
void *proxy_event_thread(void *arg);
+void proxy_run_backend_queue(be_head_t *head);
// await interface
enum mcp_await_e {
@@ -509,7 +515,7 @@ int mcplib_open_dist_jump_hash(lua_State *L);
int mcplib_open_dist_ring_hash(lua_State *L);
int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c);
-mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len);
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len);
void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len);
void proxy_lua_error(lua_State *L, const char *s);
diff --git a/proxy_await.c b/proxy_await.c
index b0a4dee..d5e40d3 100644
--- a/proxy_await.c
+++ b/proxy_await.c
@@ -259,7 +259,11 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
int n = 0;
- bool await_first = true;
+ // TODO (v3) await_first is used as a marker for upping the "wait for
+ // IO's" queue count, which means we need to force it off if we're in
+ // background mode, else we would accidentally wait for a response anyway.
+ // This note is for finding a less convoluted method for this.
+ bool await_first = (aw->type == AWAIT_BACKGROUND) ? false : true;
// loop arg table and run each hash selector
lua_pushnil(L); // -> 3
while (lua_next(L, 1) != 0) {
@@ -269,11 +273,10 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
if (pp == NULL) {
proxy_lua_error(L, "mcp.await must be supplied with a pool");
}
- mcp_pool_t *p = pp->main;
// NOTE: rq->be is only held to help pass the backend into the IOP in
// mcp_queue call. Could be a local variable and an argument too.
- rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len);
mcp_queue_await_io(c, L, rq, await_ref, await_first);
await_first = false;
diff --git a/proxy_config.c b/proxy_config.c
index 6bb54be..e45711e 100644
--- a/proxy_config.c
+++ b/proxy_config.c
@@ -41,11 +41,40 @@ static const char * _load_helper(lua_State *L, void *data, size_t *size) {
void proxy_start_reload(void *arg) {
proxy_ctx_t *ctx = arg;
if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
+ ctx->loading = true;
pthread_cond_signal(&ctx->config_cond);
pthread_mutex_unlock(&ctx->config_lock);
}
}
+int proxy_first_confload(void *arg) {
+ proxy_ctx_t *ctx = arg;
+ pthread_mutex_lock(&ctx->config_lock);
+ ctx->loading = true;
+ pthread_cond_signal(&ctx->config_cond);
+ pthread_mutex_unlock(&ctx->config_lock);
+
+ while (1) {
+ bool stop = false;
+ pthread_mutex_lock(&ctx->config_lock);
+ if (!ctx->loading) {
+ stop = true;
+ }
+ pthread_mutex_unlock(&ctx->config_lock);
+ if (stop)
+ break;
+ }
+ int fails = 0;
+ STAT_L(ctx);
+ fails = ctx->global_stats.config_reload_fails;
+ STAT_UL(ctx);
+ if (fails) {
+ return -1;
+ }
+
+ return 0;
+}
+
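proxy_first_confload() busy-polls config_lock until the config thread clears ctx->loading, which is acceptable for a one-time startup wait. If the spin ever matters, a condition variable signaled where loading is cleared would remove it; a hedged sketch — the load_cond field is hypothetical, not part of this patch:

    pthread_mutex_lock(&ctx->config_lock);
    ctx->loading = true;
    pthread_cond_signal(&ctx->config_cond);
    while (ctx->loading) {
        // hypothetical: the config thread would pthread_cond_signal(&ctx->load_cond)
        // immediately after setting ctx->loading = false.
        pthread_cond_wait(&ctx->load_cond, &ctx->config_lock);
    }
    pthread_mutex_unlock(&ctx->config_lock);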
// Manages a queue of inbound objects destined to be deallocated.
static void *_proxy_manager_thread(void *arg) {
proxy_ctx_t *ctx = arg;
@@ -108,6 +137,7 @@ static void *_proxy_config_thread(void *arg) {
logger_create();
pthread_mutex_lock(&ctx->config_lock);
while (1) {
+ ctx->loading = false;
pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
@@ -233,7 +263,7 @@ int proxy_load_config(void *arg) {
return 0;
}
-static int _copy_pool(lua_State *from, lua_State *to) {
+static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
// from, -3 should have the userdata.
mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
size_t size = sizeof(mcp_pool_proxy_t);
@@ -241,16 +271,22 @@ static int _copy_pool(lua_State *from, lua_State *to) {
luaL_setmetatable(to, "mcp.pool_proxy");
pp->main = p;
+ if (p->use_iothread) {
+ pp->pool = p->pool;
+ } else {
+ // allow 0 indexing for backends when unique to each worker thread
+ pp->pool = &p->pool[thr->thread_baseid * p->pool_size];
+ }
pthread_mutex_lock(&p->lock);
p->refcount++;
pthread_mutex_unlock(&p->lock);
return 0;
}
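The worker-local branch hands each worker a contiguous slice of the flat pool array. A worked example with hypothetical sizes:

    // pool_size = 3 backends, 4 worker threads:
    // p->pool[] holds 3 * 4 = 12 mcp_pool_be_t entries, laid out as
    //   [w0: 0..2][w1: 3..5][w2: 6..8][w3: 9..11]
    // the worker with thread_baseid == 2 gets its private slice:
    pp->pool = &p->pool[2 * 3]; // entries 6..8, indexed 0..pool_size-1
    // an iothread pool shares one set of backends and skips the offset:
    pp->pool = p->pool;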
-static void _copy_config_table(lua_State *from, lua_State *to);
+static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr);
// (from, -1) is the source value
// should end with (to, -1) being the new value.
-static void _copy_config_table(lua_State *from, lua_State *to) {
+static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
int type = lua_type(from, -1);
bool found = false;
luaL_checkstack(from, 4, "configuration error: table recursion too deep");
@@ -266,7 +302,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
if (lua_rawget(from, -2) != LUA_TNIL) {
const char *name = lua_tostring(from, -1);
if (strcmp(name, "mcp.pool") == 0) {
- _copy_pool(from, to);
+ _copy_pool(from, to, thr);
found = true;
}
}
@@ -323,7 +359,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) {
// lua_settable(to, n) - n being the table
// takes -2 key -1 value, pops both.
// use lua_absindex(L, -1) and so to convert easier?
- _copy_config_table(from, to); // push next value.
+ _copy_config_table(from, to, thr); // push next value.
lua_settable(to, nt);
lua_pop(from, 1); // drop value, keep key.
}
@@ -385,7 +421,7 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
// If the setjump/longjump combos are compatible a pcall for from and
// atpanic for to might work best, since the config VM is/should be long
// running and worker VM's should be rotated.
- _copy_config_table(ctx->proxy_state, L);
+ _copy_config_table(ctx->proxy_state, L, thr);
// copied value is in front of route function, now call it.
if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
diff --git a/proxy_lua.c b/proxy_lua.c
index e6b50ae..aeaf1e5 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -113,7 +113,7 @@ static int mcplib_backend_wrap_gc(lua_State *L) {
// Since we're running in the config thread it could just busy poll
// until the connection was picked up.
assert(be->transferred);
- proxy_event_thread_t *e = ctx->proxy_threads;
+ proxy_event_thread_t *e = be->event_thread;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -282,11 +282,11 @@ static int mcplib_backend(lua_State *L) {
return 1; // return be object.
}
+// Called with the cache label at top of the stack.
static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) {
// first check our reference table to compare.
// Note: The upvalue won't be found unless we're running from a function with it
// set as an upvalue.
- lua_pushlstring(L, bel->label, bel->llen);
int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
if (ret != LUA_TNIL) {
mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
@@ -306,7 +306,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_
return NULL;
}
-static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) {
+static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel,
+ proxy_event_thread_t *e) {
// FIXME: remove global.
proxy_ctx_t *ctx = settings.proxy_ctx;
@@ -361,7 +362,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
STAT_UL(ctx);
be->connect_flags = flags;
- proxy_event_thread_t *e = ctx->proxy_threads;
+ be->event_thread = e;
pthread_mutex_lock(&e->mutex);
STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
pthread_mutex_unlock(&e->mutex);
@@ -380,8 +381,8 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
}
#endif
+ lua_pushvalue(L, 4); // push the label string back to the top.
// Add this new backend connection to the object cache.
- lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable.
lua_pushvalue(L, -2); // copy the backend reference to the top.
// set our new backend wrapper object into the reference table.
lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
@@ -514,43 +515,43 @@ static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
// UD now popped from stack.
}
-// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
-static int mcplib_pool(lua_State *L) {
- int argc = lua_gettop(L);
- luaL_checktype(L, 1, LUA_TTABLE);
- int n = luaL_len(L, 1); // get length of array table
-
- size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n;
- mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
- // Zero the memory before use, so we can reliably use __gc to clean up
- memset(p, 0, plen);
- p->pool_size = n;
- // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
- p->key_hasher = XXH3_64bits_withSeed;
- pthread_mutex_init(&p->lock, NULL);
- p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
-
- luaL_setmetatable(L, "mcp.pool");
-
- lua_pushvalue(L, -1); // dupe self for reference.
- p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- // TODO (v2): move to after function check so we can find the right
- // backend label to look up.
+// In the pool proxy object we alias a pointer into the flat pool array,
+// offset by worker number (or zero when the pool lives on the IO thread).
+static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) {
// remember lua arrays are 1 indexed.
- for (int x = 1; x <= n; x++) {
- mcp_pool_be_t *s = &p->pool[x-1];
+ for (int x = 1; x <= p->pool_size; x++) {
+ mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)];
lua_geti(L, 1, x); // get next server into the stack.
// If we bail here, the pool _gc() should handle releasing any backend
// references we made so far.
mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend");
// check label for pre-existing backend conn/wrapper
+ // TODO (v2): there are native ways to build Lua strings from C
+ int toconcat = 1;
+ if (p->beprefix[0] != '\0') {
+ lua_pushstring(L, p->beprefix);
+ toconcat++;
+ }
+ if (p->use_iothread) {
+ lua_pushstring(L, ":io:");
+ toconcat++;
+ } else {
+ lua_pushstring(L, ":w");
+ lua_pushinteger(L, offset);
+ lua_pushstring(L, ":");
+ toconcat += 3;
+ }
+ lua_pushlstring(L, bel->label, bel->llen);
+ lua_concat(L, toconcat);
+
+ lua_pushvalue(L, -1); // copy the label string for the create method.
mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel);
if (bew == NULL) {
- bew = _mcplib_make_backendconn(L, bel);
+ bew = _mcplib_make_backendconn(L, bel, t);
}
s->be = bew->be; // unwrap the backend connection for direct ref.
+ bew->be->use_io_thread = p->use_iothread;
// If found from cache or made above, the backend wrapper is on the
// top of the stack, so we can now take its reference.
@@ -559,11 +560,51 @@ static int mcplib_pool(lua_State *L) {
s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
lua_pop(L, 1); // pop the mcp.backend label object.
+ lua_pop(L, 1); // drop extra label copy.
+ }
+}
+
+// call with table of backends in 1
+static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) {
+ if (p->use_iothread) {
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread);
+ } else {
+ // TODO (v3) globals.
+ for (int n = 0; n < settings.num_threads; n++) {
+ LIBEVENT_THREAD *t = get_worker_thread(n);
+ _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread);
+ }
}
+}
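The string concat above namespaces the backend cache per thread, so worker-local pools never share a connection object with iothread pools. Illustrative cache keys for a backend labeled "b1" (the beprefix value is hypothetical):

    // iothread pool:            ":io:b1"
    // worker pool, thread 0:    ":w0:b1"
    // worker pool, thread 1:    ":w1:b1"
    // with beprefix = "foo":    "foo:w0:b1"
    // identical keys hit _mcplib_backend_checkcache() and reuse the
    // existing connection; distinct keys create a new one.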
+
+// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
+static int mcplib_pool(lua_State *L) {
+ int argc = lua_gettop(L);
+ luaL_checktype(L, 1, LUA_TTABLE);
+ int n = luaL_len(L, 1); // get length of array table
+ int workers = settings.num_threads; // TODO (v3): globals usage.
+
+ size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers);
+ mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
+ // Zero the memory before use, so we can reliably use __gc to clean up
+ memset(p, 0, plen);
+ p->pool_size = n;
+ p->use_iothread = true;
+ // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
+ p->key_hasher = XXH3_64bits_withSeed;
+ pthread_mutex_init(&p->lock, NULL);
+ p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE));
+
+ luaL_setmetatable(L, "mcp.pool");
+
+ lua_pushvalue(L, -1); // dupe self for reference.
+ p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
// Allow passing an ignored nil as a second argument. Makes the lua easier
int type = lua_type(L, 2);
if (argc == 1 || type == LUA_TNIL) {
+ _mcplib_pool_make_be(L, p);
lua_getglobal(L, "mcp");
// TODO (v2): decide on a mcp.default_dist and use that instead
if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
@@ -580,6 +621,31 @@ static int mcplib_pool(lua_State *L) {
// pool, then pass it along to a constructor if necessary.
luaL_checktype(L, 2, LUA_TTABLE);
+ if (lua_getfield(L, 2, "iothread") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TBOOLEAN);
+ int use_iothread = lua_toboolean(L, -1);
+ if (use_iothread) {
+ p->use_iothread = true;
+ } else {
+ p->use_iothread = false;
+ }
+ lua_pop(L, 1); // remove value.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+
+ if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) {
+ luaL_checktype(L, -1, LUA_TSTRING);
+ size_t len = 0;
+ const char *bepfx = lua_tolstring(L, -1, &len);
+ if (len > MAX_LABELLEN) {
+ proxy_lua_error(L, "pool beprefix is too long");
+ }
+ memcpy(p->beprefix, bepfx, len);
+ p->beprefix[len] = '\0'; // terminate directly after the copied bytes
+ lua_pop(L, 1); // pop beprefix string.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+ _mcplib_pool_make_be(L, p);
+
// stack: backends, options, mcp.pool
if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
// overriding the distribution function.
@@ -587,6 +653,17 @@ static int mcplib_pool(lua_State *L) {
lua_pop(L, 1); // remove the dist table from stack.
} else {
lua_pop(L, 1); // pop the nil.
+
+ // use the default dist if not specified with an override table.
+ lua_getglobal(L, "mcp");
+ // TODO (v2): decide on a mcp.default_dist and use that instead
+ if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
+ _mcplib_pool_dist(L, p);
+ lua_pop(L, 1); // pop "dist_jump_hash" value.
+ } else {
+ lua_pop(L, 1);
+ }
+ lua_pop(L, 1); // pop "mcp"
}
if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
@@ -666,7 +743,8 @@ static int mcplib_pool_proxy_gc(lua_State *L) {
return 0;
}
-mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) {
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len) {
+ mcp_pool_t *p = pp->main;
if (p->key_filter) {
key = p->key_filter(p->key_filter_conf, key, len, &len);
P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
@@ -682,7 +760,7 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
proxy_lua_error(L, "key dist hasher tried to use out of bounds index");
}
- return p->pool[lookup].be;
+ return pp->pool[lookup].be;
}
// hashfunc(request) -> backend(request)
@@ -690,7 +768,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const
static int mcplib_pool_proxy_call(lua_State *L) {
// internal args are the hash selector (self)
mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy");
- mcp_pool_t *p = pp->main;
// then request object.
mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
@@ -701,7 +778,7 @@ static int mcplib_pool_proxy_call(lua_State *L) {
}
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
- rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);
+ rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len);
// now yield request, pool up.
lua_pushinteger(L, MCP_YIELD_POOL);
diff --git a/proxy_network.c b/proxy_network.c
index 239b0d4..0334971 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -133,17 +133,8 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
// paranoia about moving items between lists.
io->io_next.stqe_next = NULL;
- // Need to check on await's before looking at backends, in case it
- // doesn't have one.
- // Here we're letting an await resume without waiting on the network.
- if (io->await_background) {
- return_io_pending((io_pending_t *)io);
- continue;
- }
-
mcp_backend_t *be = io->backend;
// So the backend can retrieve its event base.
- be->event_thread = t;
if (be->bad) {
P_DEBUG("%s: fast failing request to bad backend\n", __func__);
io->client_resp->status = MCMC_ERR;
@@ -424,7 +415,6 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -802,14 +792,13 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
_cleanup_backend(be);
} else {
be->transferred = true;
- be->event_thread = t;
// assign the initial events to the backend, so we don't have to
// constantly check if they were initialized yet elsewhere.
// note these events will not fire until event_add() is called.
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
- event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
- event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->main_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
+ event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->timeout_event, be->event_thread->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
@@ -827,6 +816,44 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
}
}
+void proxy_run_backend_queue(be_head_t *head) {
+ mcp_backend_t *be;
+ STAILQ_FOREACH(be, head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->bad) {
+ // flush queue if backend is still bad.
+ // TODO: duplicated from _reset_bad_backend()
+ io_pending_proxy_t *io = NULL;
+ while (!STAILQ_EMPTY(&be->io_head)) {
+ io = STAILQ_FIRST(&be->io_head);
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ io->client_resp->status = MCMC_ERR;
+ be->depth--;
+ return_io_pending((io_pending_t *)io);
+ }
+ } else if (be->connecting || be->validating) {
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
+ } else {
+ flags = _flush_pending_write(be);
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ } else if (flags & EV_WRITE) {
+ // only get here because we need to kick off the write handler
+ _start_write_event(be);
+ }
+
+ if (be->pending_read) {
+ _start_timeout_event(be);
+ }
+
+ }
+ }
+}
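proxy_run_backend_queue() is now the single flush path for a stack of stacked backends; after this patch it has two callers feeding it the same STAILQ shape:

    // IO thread: proxy_event_handler() -> proxy_run_backend_queue(&t->be_head);
    // worker:    proxy_submit_cb()     -> proxy_run_backend_queue(&w_head);
    // the be->bad branch mirrors _reset_bad_backend() so worker-local queues
    // fail fast without bouncing requests through the IO thread.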
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -858,32 +885,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
}
// Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // FIXME (v2): _set_event() is buggy, see notes on function.
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
- int flags = 0;
-
- if (be->connecting || be->validating) {
- P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
- } else {
- flags = _flush_pending_write(be);
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed(be);
- } else if (flags & EV_WRITE) {
- // only get here because we need to kick off the write handler
- _start_write_event(be);
- }
-
- if (be->pending_read) {
- _start_timeout_event(be);
- }
- }
- }
-
+ proxy_run_backend_queue(&t->be_head);
}
void *proxy_event_thread(void *arg) {
@@ -1585,8 +1587,53 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
// event threads.
-void proxy_init_evthread_events(proxy_event_thread_t *t) {
+// TODO: this either needs a restructure or split into two funcs:
+// 1) for the IO thread which creates its own ring/event base
+// 2) for the worker thread which reuses the event base.
+// io_uring will probably only work for the IO thread which makes further
+// exceptions.
+void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) {
+ t->ctx = ctx;
+#ifdef USE_EVENTFD
+ t->event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+ t->be_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->be_event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
+#else
+ int fds[2];
+ if (pipe(fds)) {
+ perror("can't create proxy backend notify pipe");
+ exit(1);
+ }
+
+ t->notify_receive_fd = fds[0];
+ t->notify_send_fd = fds[1];
+
+ if (pipe(fds)) {
+ perror("can't create proxy backend connection notify pipe");
+ exit(1);
+ }
+ t->be_notify_receive_fd = fds[0];
+ t->be_notify_send_fd = fds[1];
+#endif
+
+ // incoming request queue.
+ STAILQ_INIT(&t->io_head_in);
+ STAILQ_INIT(&t->beconn_head_in);
+ pthread_mutex_init(&t->mutex, NULL);
+ pthread_cond_init(&t->cond, NULL);
+
+ // initialize the event system.
+
#ifdef HAVE_LIBURING
+ fprintf(stderr, "Sorry, io_uring not supported right now\n");
+ abort();
bool use_uring = t->ctx->use_uring;
struct io_uring_params p = {0};
assert(t->event_fd); // uring only exists where eventfd also does.
@@ -1646,14 +1693,19 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
}
#endif
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- t->base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- if (! t->base) {
- fprintf(stderr, "Can't allocate event base\n");
- exit(1);
+ if (base == NULL) {
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+ } else {
+ // reusing an event base from a worker thread.
+ t->base = base;
}
// listen for notifications.
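With the base argument, proxy_init_event_thread() now covers both thread shapes. The two call sites added by this patch:

    // dedicated IO thread (proto_proxy.c): builds its own event base and pthread
    proxy_init_event_thread(t, ctx, NULL);

    // worker piggyback (proxy_thread_init): reuses the worker's libevent base,
    // so worker-local backend events run on the worker thread itself
    proxy_init_event_thread(t, ctx, thr->base);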
diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua
index 5f9ad0e..dbe725d 100644
--- a/t/proxyconfig.lua
+++ b/t/proxyconfig.lua
@@ -3,6 +3,7 @@
local mode = dofile("/tmp/proxyconfigmode.lua")
mcp.backend_read_timeout(4)
+mcp.backend_connect_timeout(5)
function mcp_config_pools(old)
if mode == "none" then
@@ -29,6 +30,15 @@ function mcp_config_pools(old)
test = mcp.pool({b1, b2, b3})
}
return pools
+ elseif mode == "noiothread" then
+ local b1 = mcp.backend('b1', '127.0.0.1', 11514)
+ local b2 = mcp.backend('b2', '127.0.0.1', 11515)
+ local b3 = mcp.backend('b3', '127.0.0.1', 11516)
+
+ local pools = {
+ test = mcp.pool({b1, b2, b3}, { iothread = false })
+ }
+ return pools
end
end
@@ -42,5 +52,8 @@ function mcp_config_routes(zones)
elseif mode == "start" or mode == "betable" then
mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
+ elseif mode == "noiothread" then
+ mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
+ mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
end
end
diff --git a/t/proxyconfig.t b/t/proxyconfig.t
index 1b19b8e..ebf8684 100644
--- a/t/proxyconfig.t
+++ b/t/proxyconfig.t
@@ -99,6 +99,9 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
}
my @mbe = ();
+# A map of where keys route to for worker IO tests later
+my %keymap = ();
+my $keycount = 100;
{
# set up server backend sockets.
for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) {
@@ -125,6 +128,22 @@ my @mbe = ();
is(scalar <$be>, $cmd, "metaget passthrough");
print $be "EN\r\n";
is(scalar <$ps>, "EN\r\n", "miss received");
+
+ # Route a bunch of keys and map them to backends.
+ for my $key (0 .. $keycount) {
+ print $ps "mg /test/$key\r\n";
+ my @readable = $s->can_read(0.25);
+ is(scalar @readable, 1, "only one backend became readable");
+ my $be = shift @readable;
+ for (0 .. 2) {
+ if ($be == $mbe[$_]) {
+ $keymap{$key} = $_;
+ }
+ }
+ is(scalar <$be>, "mg /test/$key\r\n", "got mg passthrough");
+ print $be "EN\r\n";
+ is(scalar <$ps>, "EN\r\n", "miss received");
+ }
}
# Test backend table arguments and per-backend time overrides
@@ -165,6 +184,88 @@ my @holdbe = (); # avoid having the backends immediately disconnect and pollute
is(scalar @readable, 0, "no new sockets");
}
+# Disconnect the existing sockets
+@mbe = ();
+@holdbe = ();
+@mocksrvs = ();
+$watcher = $p_srv->new_sock;
+# Reset the watcher and let logs die off.
+sleep 1;
+print $watcher "watch proxyevents\n";
+is(<$watcher>, "OK\r\n", "watcher enabled");
+
+{
+ # re-create the mock servers so we get clean connects, the previous
+ # backends could be reconnecting still.
+ for my $port (11514, 11515, 11516) {
+ my $srv = mock_server($port);
+ ok(defined $srv, "mock server created");
+ push(@mocksrvs, $srv);
+ }
+
+ write_modefile('return "noiothread"');
+ $p_srv->reload();
+ wait_reload($watcher);
+
+ my $s = IO::Select->new();
+ for my $msrv (@mocksrvs) {
+ $s->add($msrv);
+ }
+ my @readable = $s->can_read(0.25);
+ # All three backends should become readable with new sockets.
+ is(scalar @readable, 3, "all listeners became readable");
+
+ my @bepile = ();
+ my $bes = IO::Select->new(); # selector just for the backend sockets.
+ # Each backend should create one socket per worker thread.
+ for my $msrv (@readable) {
+ my @temp = ();
+ for (0 .. 3) {
+ my $be = $msrv->accept();
+ ok(defined $be, "mock backend accepted");
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $bes->add($be);
+ push(@temp, $be);
+ }
+ for (0 .. 2) {
+ if ($mocksrvs[$_] == $msrv) {
+ $bepile[$_] = \@temp;
+ }
+ }
+ }
+
+ # clients round robin onto different worker threads, so we can test the
+ # key dist on different offsets.
+ my @cli = ();
+ for (0 .. 2) {
+ my $p = $p_srv->new_sock;
+
+ for my $key (0 .. $keycount) {
+ print $p "mg /test/$key\r\n";
+ @readable = $bes->can_read(0.25);
+ is(scalar @readable, 1, "only one backend became readable");
+ my $be = shift @readable;
+ # find which listener this be belongs to
+ for my $x (0 .. 2) {
+ for (@{$bepile[$x]}) {
+ if ($_ == $be) {
+ cmp_ok($x, '==', $keymap{$key}, "key routed to correct listener: " . $keymap{$key});
+ }
+ }
+ }
+
+ is(scalar <$be>, "mg /test/$key\r\n", "got mg passthrough");
+ print $be "EN\r\n";
+ is(scalar <$p>, "EN\r\n", "miss received");
+ }
+
+ # hold onto the sockets just in case.
+ push(@cli, $p);
+ }
+
+}
+
# TODO:
# remove backends
# do dead sockets close?
diff --git a/thread.c b/thread.c
index 04aa8f4..584ad8e 100644
--- a/thread.c
+++ b/thread.c
@@ -1091,6 +1091,7 @@ void memcached_thread_init(int nthreads, void *arg) {
#ifdef EXTSTORE
threads[i].storage = arg;
#endif
+ threads[i].thread_baseid = i;
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats_state.reserved_fds += 5;