diff options
-rw-r--r-- | memcached.c | 11 | ||||
-rw-r--r-- | memcached.h | 2 | ||||
-rw-r--r-- | proto_proxy.c | 149 | ||||
-rw-r--r-- | proto_proxy.h | 1 | ||||
-rw-r--r-- | proxy.h | 12 | ||||
-rw-r--r-- | proxy_await.c | 9 | ||||
-rw-r--r-- | proxy_config.c | 48 | ||||
-rw-r--r-- | proxy_lua.c | 147 | ||||
-rw-r--r-- | proxy_network.c | 150 | ||||
-rw-r--r-- | t/proxyconfig.lua | 13 | ||||
-rw-r--r-- | t/proxyconfig.t | 101 | ||||
-rw-r--r-- | thread.c | 1 |
12 files changed, 470 insertions, 174 deletions
diff --git a/memcached.c b/memcached.c index adaf162..70cc8e3 100644 --- a/memcached.c +++ b/memcached.c @@ -6053,9 +6053,6 @@ int main (int argc, char **argv) { #ifdef PROXY if (settings.proxy_enabled) { settings.proxy_ctx = proxy_init(settings.proxy_uring); - if (proxy_load_config(settings.proxy_ctx) != 0) { - exit(EXIT_FAILURE); - } } #endif #ifdef EXTSTORE @@ -6067,6 +6064,14 @@ int main (int argc, char **argv) { init_lru_crawler(NULL); #endif +#ifdef PROXY + if (settings.proxy_enabled) { + if (proxy_first_confload(settings.proxy_ctx) != 0) { + exit(EXIT_FAILURE); + } + } +#endif + if (start_assoc_maint && start_assoc_maintenance_thread() == -1) { exit(EXIT_FAILURE); } diff --git a/memcached.h b/memcached.h index 4790c75..79b9a81 100644 --- a/memcached.h +++ b/memcached.h @@ -703,6 +703,7 @@ typedef struct { int notify_send_fd; /* sending end of notify pipe */ #endif int cur_sfd; /* client fd for logging commands */ + int thread_baseid; /* which "number" thread this is for data offsets */ struct thread_stats stats; /* Stats generated by this thread */ io_queue_cb_t io_queues[IO_QUEUE_COUNT]; struct conn_queue *ev_queue; /* Worker/conn event queue */ @@ -723,6 +724,7 @@ typedef struct { void *proxy_hooks; void *proxy_user_stats; void *proxy_int_stats; + void *proxy_event_thread; // worker threads can also be proxy IO threads uint32_t proxy_rng[4]; // fast per-thread rng for lua. // TODO: add ctx object so we can attach to queue. #endif diff --git a/proto_proxy.c b/proto_proxy.c index 535800d..1acc920 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -129,63 +129,22 @@ void *proxy_init(bool use_uring) { // NOTE: might need to differentiate the libs yes? proxy_register_libs(ctx, NULL, L); - // Create/start the backend threads, which we need before servers + // Create/start the IO thread, which we need before servers // start getting created. - // Supporting N event threads should be possible, but it will be a - // low number of N to avoid too many wakeup syscalls. - // For now we hardcode to 1. - proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t)); - ctx->proxy_threads = threads; - for (int i = 0; i < 1; i++) { - proxy_event_thread_t *t = &threads[i]; - t->ctx = ctx; -#ifdef USE_EVENTFD - t->event_fd = eventfd(0, EFD_NONBLOCK); - if (t->event_fd == -1) { - perror("failed to create backend notify eventfd"); - exit(1); - } - t->be_event_fd = eventfd(0, EFD_NONBLOCK); - if (t->be_event_fd == -1) { - perror("failed to create backend notify eventfd"); - exit(1); - } -#else - int fds[2]; - if (pipe(fds)) { - perror("can't create proxy backend notify pipe"); - exit(1); - } - - t->notify_receive_fd = fds[0]; - t->notify_send_fd = fds[1]; - - if (pipe(fds)) { - perror("can't create proxy backend connection notify pipe"); - exit(1); - } - t->be_notify_receive_fd = fds[0]; - t->be_notify_send_fd = fds[1]; -#endif - proxy_init_evthread_events(t); - - // incoming request queue. - STAILQ_INIT(&t->io_head_in); - STAILQ_INIT(&t->beconn_head_in); - pthread_mutex_init(&t->mutex, NULL); - pthread_cond_init(&t->cond, NULL); + proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t)); + ctx->proxy_io_thread = t; + proxy_init_event_thread(t, ctx, NULL); #ifdef HAVE_LIBURING - if (t->use_uring) { - pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t); - } else { - pthread_create(&t->thread_id, NULL, proxy_event_thread, t); - } -#else + if (t->use_uring) { + pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t); + } else { pthread_create(&t->thread_id, NULL, proxy_event_thread, t); -#endif // HAVE_LIBURING - thread_setname(t->thread_id, "mc-prx-io"); } +#else + pthread_create(&t->thread_id, NULL, proxy_event_thread, t); +#endif // HAVE_LIBURING + thread_setname(t->thread_id, "mc-prx-io"); _start_proxy_config_threads(ctx); return ctx; @@ -218,18 +177,20 @@ void proxy_thread_init(void *ctx, LIBEVENT_THREAD *thr) { thr->proxy_rng[x] = rand(); } - // kick off the configuration. - if (proxy_thread_loadconf(ctx, thr) != 0) { - exit(EXIT_FAILURE); - } + // Create a proxy event thread structure to piggyback on the worker. + proxy_event_thread_t *t = calloc(1, sizeof(proxy_event_thread_t)); + thr->proxy_event_thread = t; + proxy_init_event_thread(t, ctx, thr->base); } // ctx_stack is a stack of io_pending_proxy_t's. void proxy_submit_cb(io_queue_t *q) { - proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads; + proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_io_thread; io_pending_proxy_t *p = q->stack_ctx; io_head_t head; + be_head_t w_head; // worker local stack. STAILQ_INIT(&head); + STAILQ_INIT(&w_head); // NOTE: responses get returned in the correct order no matter what, since // mc_resp's are linked. @@ -243,11 +204,21 @@ void proxy_submit_cb(io_queue_t *q) { // So for now we build the secondary list with an STAILQ, which // can be transplanted/etc. while (p) { - // insert into tail so head is oldest request. - STAILQ_INSERT_TAIL(&head, p, io_next); + mcp_backend_t *be; + P_DEBUG("%s: queueing req for backend: %p\n", __func__, (void *)p); if (p->is_await) { // need to not count await objects multiple times. - if (p->await_first) { + if (p->await_background) { + P_DEBUG("%s: fast-returning await_background object: %p\n", __func__, (void *)p); + // intercept await backgrounds + // this call cannot recurse if we're on the worker thread, + // since the worker thread has to finish executing this + // function in order to pick up the returned IO. + q->count++; + return_io_pending((io_pending_t *)p); + p = p->next; + continue; + } else if (p->await_first) { q->count++; } // funny workaround: awaiting IOP's don't count toward @@ -256,6 +227,24 @@ void proxy_submit_cb(io_queue_t *q) { } else { q->count++; } + be = p->backend; + + if (be->use_io_thread) { + // insert into tail so head is oldest request. + STAILQ_INSERT_TAIL(&head, p, io_next); + } else { + // emulate some of handler_dequeue() + STAILQ_INSERT_TAIL(&be->io_head, p, io_next); + if (be->io_next == NULL) { + be->io_next = p; + } + be->depth++; + if (!be->stacked) { + be->stacked = true; + be->be_next.stqe_next = NULL; // paranoia + STAILQ_INSERT_TAIL(&w_head, be, be_next); + } + } p = p->next; } @@ -263,26 +252,34 @@ void proxy_submit_cb(io_queue_t *q) { // clear out the submit queue so we can re-queue new IO's inline. q->stack_ctx = NULL; - // Transfer request stack to event thread. - pthread_mutex_lock(&e->mutex); - STAILQ_CONCAT(&e->io_head_in, &head); - // No point in holding the lock since we're not doing a cond signal. - pthread_mutex_unlock(&e->mutex); + if (!STAILQ_EMPTY(&head)) { + P_DEBUG("%s: submitting queue to IO thread\n", __func__); + // Transfer request stack to event thread. + pthread_mutex_lock(&e->mutex); + STAILQ_CONCAT(&e->io_head_in, &head); + // No point in holding the lock since we're not doing a cond signal. + pthread_mutex_unlock(&e->mutex); - // Signal to check queue. + // Signal to check queue. #ifdef USE_EVENTFD - uint64_t u = 1; - // TODO (v2): check result? is it ever possible to get a short write/failure - // for an eventfd? - if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { - assert(1 == 0); - } + uint64_t u = 1; + // TODO (v2): check result? is it ever possible to get a short write/failure + // for an eventfd? + if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + assert(1 == 0); + } #else - if (write(e->notify_send_fd, "w", 1) <= 0) { - assert(1 == 0); - } + if (write(e->notify_send_fd, "w", 1) <= 0) { + assert(1 == 0); + } #endif + } + if (!STAILQ_EMPTY(&w_head)) { + P_DEBUG("%s: running inline worker queue\n", __func__); + // emulating proxy_event_handler + proxy_run_backend_queue(&w_head); + } return; } @@ -545,6 +542,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con if (cores == LUA_OK) { WSTAT_DECR(c->thread, proxy_req_active, 1); int type = lua_type(Lc, 1); + P_DEBUG("%s: coroutine completed. return type: %d\n", __func__, type); if (type == LUA_TUSERDATA) { mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response"); _set_noreply_mode(resp, r); @@ -581,6 +579,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con } else if (cores == LUA_YIELD) { int coro_ref = 0; int yield_type = lua_tointeger(Lc, -1); + P_DEBUG("%s: coroutine yielded. return type: %d\n", __func__, yield_type); assert(yield_type != 0); lua_pop(Lc, 1); diff --git a/proto_proxy.h b/proto_proxy.h index 0b3d240..c8608da 100644 --- a/proto_proxy.h +++ b/proto_proxy.h @@ -13,6 +13,7 @@ void *proxy_init(bool proxy_uring); // TODO: need better names or a better interface for these. can be confusing // to reason about the order. void proxy_start_reload(void *arg); +int proxy_first_confload(void *arg); int proxy_load_config(void *arg); void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr); @@ -196,7 +196,7 @@ typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t; typedef struct { lua_State *proxy_state; void *proxy_code; - proxy_event_thread_t *proxy_threads; + proxy_event_thread_t *proxy_io_thread; pthread_mutex_t config_lock; pthread_cond_t config_cond; pthread_t config_tid; @@ -209,6 +209,7 @@ typedef struct { bool worker_done; // signal variable for the worker lock/cond system. bool worker_failed; // covered by worker_lock as well. bool use_uring; // use IO_URING for backend connections. + bool loading; // bool indicating an active config load. struct proxy_global_stats global_stats; struct proxy_user_stats user_stats; struct proxy_tunables tunables; // NOTE: updates covered by stats_lock @@ -351,6 +352,7 @@ struct mcp_backend_s { bool can_write; // recently got a WANT_WRITE or are connecting. bool stacked; // if backend already queued for syscalls. bool bad; // timed out, marked as bad. + bool use_io_thread; // note if this backend is worker-local or not. struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes char name[MAX_NAMELEN+1]; char port[MAX_PORTLEN+1]; @@ -456,21 +458,25 @@ struct mcp_pool_s { proxy_ctx_t *ctx; // main context. STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator. char key_filter_conf[KEY_HASH_FILTER_MAX+1]; + char beprefix[MAX_LABELLEN+1]; // TODO: should probably be shorter. uint64_t hash_seed; // calculated from a string. int refcount; int phc_ref; int self_ref; // TODO (v2): double check that this is needed. int pool_size; + bool use_iothread; mcp_pool_be_t pool[]; }; typedef struct { mcp_pool_t *main; // ptr to original + mcp_pool_be_t *pool; // ptr to main->pool starting offset for owner thread. } mcp_pool_proxy_t; // networking interface -void proxy_init_evthread_events(proxy_event_thread_t *t); +void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base); void *proxy_event_thread(void *arg); +void proxy_run_backend_queue(be_head_t *head); // await interface enum mcp_await_e { @@ -509,7 +515,7 @@ int mcplib_open_dist_jump_hash(lua_State *L); int mcplib_open_dist_ring_hash(lua_State *L); int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c); -mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len); +mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len); void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p); int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len); void proxy_lua_error(lua_State *L, const char *s); diff --git a/proxy_await.c b/proxy_await.c index b0a4dee..d5e40d3 100644 --- a/proxy_await.c +++ b/proxy_await.c @@ -259,7 +259,11 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) { const char *key = MCP_PARSER_KEY(rq->pr); size_t len = rq->pr.klen; int n = 0; - bool await_first = true; + // TODO (v3) await_first is used as a marker for upping the "wait for + // IO's" queue count, which means we need to force it off if we're in + // background mode, else we would accidentally wait for a response anyway. + // This note is for finding a less convoluted method for this. + bool await_first = (aw->type == AWAIT_BACKGROUND) ? false : true; // loop arg table and run each hash selector lua_pushnil(L); // -> 3 while (lua_next(L, 1) != 0) { @@ -269,11 +273,10 @@ int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) { if (pp == NULL) { proxy_lua_error(L, "mcp.await must be supplied with a pool"); } - mcp_pool_t *p = pp->main; // NOTE: rq->be is only held to help pass the backend into the IOP in // mcp_queue call. Could be a local variable and an argument too. - rq->be = mcplib_pool_proxy_call_helper(L, p, key, len); + rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len); mcp_queue_await_io(c, L, rq, await_ref, await_first); await_first = false; diff --git a/proxy_config.c b/proxy_config.c index 6bb54be..e45711e 100644 --- a/proxy_config.c +++ b/proxy_config.c @@ -41,11 +41,40 @@ static const char * _load_helper(lua_State *L, void *data, size_t *size) { void proxy_start_reload(void *arg) { proxy_ctx_t *ctx = arg; if (pthread_mutex_trylock(&ctx->config_lock) == 0) { + ctx->loading = true; pthread_cond_signal(&ctx->config_cond); pthread_mutex_unlock(&ctx->config_lock); } } +int proxy_first_confload(void *arg) { + proxy_ctx_t *ctx = arg; + pthread_mutex_lock(&ctx->config_lock); + ctx->loading = true; + pthread_cond_signal(&ctx->config_cond); + pthread_mutex_unlock(&ctx->config_lock); + + while (1) { + bool stop = false; + pthread_mutex_lock(&ctx->config_lock); + if (!ctx->loading) { + stop = true; + } + pthread_mutex_unlock(&ctx->config_lock); + if (stop) + break; + } + int fails = 0; + STAT_L(ctx); + fails = ctx->global_stats.config_reload_fails; + STAT_UL(ctx); + if (fails) { + return -1; + } + + return 0; +} + // Manages a queue of inbound objects destined to be deallocated. static void *_proxy_manager_thread(void *arg) { proxy_ctx_t *ctx = arg; @@ -108,6 +137,7 @@ static void *_proxy_config_thread(void *arg) { logger_create(); pthread_mutex_lock(&ctx->config_lock); while (1) { + ctx->loading = false; pthread_cond_wait(&ctx->config_cond, &ctx->config_lock); LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start"); STAT_INCR(ctx, config_reloads, 1); @@ -233,7 +263,7 @@ int proxy_load_config(void *arg) { return 0; } -static int _copy_pool(lua_State *from, lua_State *to) { +static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) { // from, -3 should have he userdata. mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool"); size_t size = sizeof(mcp_pool_proxy_t); @@ -241,16 +271,22 @@ static int _copy_pool(lua_State *from, lua_State *to) { luaL_setmetatable(to, "mcp.pool_proxy"); pp->main = p; + if (p->use_iothread) { + pp->pool = p->pool; + } else { + // allow 0 indexing for backends when unique to each worker thread + pp->pool = &p->pool[thr->thread_baseid * p->pool_size]; + } pthread_mutex_lock(&p->lock); p->refcount++; pthread_mutex_unlock(&p->lock); return 0; } -static void _copy_config_table(lua_State *from, lua_State *to); +static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr); // (from, -1) is the source value // should end with (to, -1) being the new value. -static void _copy_config_table(lua_State *from, lua_State *to) { +static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) { int type = lua_type(from, -1); bool found = false; luaL_checkstack(from, 4, "configuration error: table recursion too deep"); @@ -266,7 +302,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) { if (lua_rawget(from, -2) != LUA_TNIL) { const char *name = lua_tostring(from, -1); if (strcmp(name, "mcp.pool") == 0) { - _copy_pool(from, to); + _copy_pool(from, to, thr); found = true; } } @@ -323,7 +359,7 @@ static void _copy_config_table(lua_State *from, lua_State *to) { // lua_settable(to, n) - n being the table // takes -2 key -1 value, pops both. // use lua_absindex(L, -1) and so to convert easier? - _copy_config_table(from, to); // push next value. + _copy_config_table(from, to, thr); // push next value. lua_settable(to, nt); lua_pop(from, 1); // drop value, keep key. } @@ -385,7 +421,7 @@ int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) { // If the setjump/longjump combos are compatible a pcall for from and // atpanic for to might work best, since the config VM is/should be long // running and worker VM's should be rotated. - _copy_config_table(ctx->proxy_state, L); + _copy_config_table(ctx->proxy_state, L, thr); // copied value is in front of route function, now call it. if (lua_pcall(L, 1, 1, 0) != LUA_OK) { diff --git a/proxy_lua.c b/proxy_lua.c index e6b50ae..aeaf1e5 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -113,7 +113,7 @@ static int mcplib_backend_wrap_gc(lua_State *L) { // Since we're running in the config thread it could just busy poll // until the connection was picked up. assert(be->transferred); - proxy_event_thread_t *e = ctx->proxy_threads; + proxy_event_thread_t *e = be->event_thread; pthread_mutex_lock(&e->mutex); STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next); pthread_mutex_unlock(&e->mutex); @@ -282,11 +282,11 @@ static int mcplib_backend(lua_State *L) { return 1; // return be object. } +// Called with the cache label at top of the stack. static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_label_t *bel) { // first check our reference table to compare. // Note: The upvalue won't be found unless we're running from a function with it // set as an upvalue. - lua_pushlstring(L, bel->label, bel->llen); int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); if (ret != LUA_TNIL) { mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap"); @@ -306,7 +306,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_ return NULL; } -static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel) { +static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_label_t *bel, + proxy_event_thread_t *e) { // FIXME: remove global. proxy_ctx_t *ctx = settings.proxy_ctx; @@ -361,7 +362,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la STAT_UL(ctx); be->connect_flags = flags; - proxy_event_thread_t *e = ctx->proxy_threads; + be->event_thread = e; pthread_mutex_lock(&e->mutex); STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next); pthread_mutex_unlock(&e->mutex); @@ -380,8 +381,8 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la } #endif + lua_pushvalue(L, 4); // push the label string back to the top. // Add this new backend connection to the object cache. - lua_pushlstring(L, bel->label, bel->llen); // put the label at the top for settable. lua_pushvalue(L, -2); // copy the backend reference to the top. // set our new backend wrapper object into the reference table. lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE)); @@ -514,43 +515,43 @@ static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) { // UD now popped from stack. } -// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f }) -static int mcplib_pool(lua_State *L) { - int argc = lua_gettop(L); - luaL_checktype(L, 1, LUA_TTABLE); - int n = luaL_len(L, 1); // get length of array table - - size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n; - mcp_pool_t *p = lua_newuserdatauv(L, plen, 0); - // Zero the memory before use, so we can realibly use __gc to clean up - memset(p, 0, plen); - p->pool_size = n; - // TODO (v2): Nicer if this is fetched from mcp.default_key_hash - p->key_hasher = XXH3_64bits_withSeed; - pthread_mutex_init(&p->lock, NULL); - p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); - - luaL_setmetatable(L, "mcp.pool"); - - lua_pushvalue(L, -1); // dupe self for reference. - p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); - - // TODO (v2): move to after function check so we can find the right - // backend label to look up. +// in the proxy object, we can alias a ptr to the pool to where it needs to be +// based on worker number or io_thread right? +static void _mcplib_pool_make_be_loop(lua_State *L, mcp_pool_t *p, int offset, proxy_event_thread_t *t) { // remember lua arrays are 1 indexed. - for (int x = 1; x <= n; x++) { - mcp_pool_be_t *s = &p->pool[x-1]; + for (int x = 1; x <= p->pool_size; x++) { + mcp_pool_be_t *s = &p->pool[x-1 + (offset * p->pool_size)]; lua_geti(L, 1, x); // get next server into the stack. // If we bail here, the pool _gc() should handle releasing any backend // references we made so far. mcp_backend_label_t *bel = luaL_checkudata(L, -1, "mcp.backend"); // check label for pre-existing backend conn/wrapper + // TODO (v2): there're native ways of "from C make lua strings" + int toconcat = 1; + if (p->beprefix[0] != '\0') { + lua_pushstring(L, p->beprefix); + toconcat++; + } + if (p->use_iothread) { + lua_pushstring(L, ":io:"); + toconcat++; + } else { + lua_pushstring(L, ":w"); + lua_pushinteger(L, offset); + lua_pushstring(L, ":"); + toconcat += 3; + } + lua_pushlstring(L, bel->label, bel->llen); + lua_concat(L, toconcat); + + lua_pushvalue(L, -1); // copy the label string for the create method. mcp_backend_wrap_t *bew = _mcplib_backend_checkcache(L, bel); if (bew == NULL) { - bew = _mcplib_make_backendconn(L, bel); + bew = _mcplib_make_backendconn(L, bel, t); } s->be = bew->be; // unwrap the backend connection for direct ref. + bew->be->use_io_thread = p->use_iothread; // If found from cache or made above, the backend wrapper is on the // top of the stack, so we can now take its reference. @@ -559,11 +560,51 @@ static int mcplib_pool(lua_State *L) { s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object. lua_pop(L, 1); // pop the mcp.backend label object. + lua_pop(L, 1); // drop extra label copy. + } +} + +// call with table of backends in 1 +static void _mcplib_pool_make_be(lua_State *L, mcp_pool_t *p) { + if (p->use_iothread) { + proxy_ctx_t *ctx = settings.proxy_ctx; + _mcplib_pool_make_be_loop(L, p, 0, ctx->proxy_io_thread); + } else { + // TODO (v3) globals. + for (int n = 0; n < settings.num_threads; n++) { + LIBEVENT_THREAD *t = get_worker_thread(n); + _mcplib_pool_make_be_loop(L, p, t->thread_baseid, t->proxy_event_thread); + } } +} + +// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f }) +static int mcplib_pool(lua_State *L) { + int argc = lua_gettop(L); + luaL_checktype(L, 1, LUA_TTABLE); + int n = luaL_len(L, 1); // get length of array table + int workers = settings.num_threads; // TODO (v3): globals usage. + + size_t plen = sizeof(mcp_pool_t) + (sizeof(mcp_pool_be_t) * n * workers); + mcp_pool_t *p = lua_newuserdatauv(L, plen, 0); + // Zero the memory before use, so we can realibly use __gc to clean up + memset(p, 0, plen); + p->pool_size = n; + p->use_iothread = true; + // TODO (v2): Nicer if this is fetched from mcp.default_key_hash + p->key_hasher = XXH3_64bits_withSeed; + pthread_mutex_init(&p->lock, NULL); + p->ctx = lua_touserdata(L, lua_upvalueindex(MCP_CONTEXT_UPVALUE)); + + luaL_setmetatable(L, "mcp.pool"); + + lua_pushvalue(L, -1); // dupe self for reference. + p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); // Allow passing an ignored nil as a second argument. Makes the lua easier int type = lua_type(L, 2); if (argc == 1 || type == LUA_TNIL) { + _mcplib_pool_make_be(L, p); lua_getglobal(L, "mcp"); // TODO (v2): decide on a mcp.default_dist and use that instead if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) { @@ -580,6 +621,31 @@ static int mcplib_pool(lua_State *L) { // pool, then pass it along to the a constructor if necessary. luaL_checktype(L, 2, LUA_TTABLE); + if (lua_getfield(L, 2, "iothread") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TBOOLEAN); + int use_iothread = lua_toboolean(L, -1); + if (use_iothread) { + p->use_iothread = true; + } else { + p->use_iothread = false; + } + lua_pop(L, 1); // remove value. + } else { + lua_pop(L, 1); // pop the nil. + } + + if (lua_getfield(L, 2, "beprefix") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TSTRING); + size_t len = 0; + const char *bepfx = lua_tolstring(L, -1, &len); + memcpy(p->beprefix, bepfx, len); + p->beprefix[len+1] = '\0'; + lua_pop(L, 1); // pop beprefix string. + } else { + lua_pop(L, 1); // pop the nil. + } + _mcplib_pool_make_be(L, p); + // stack: backends, options, mcp.pool if (lua_getfield(L, 2, "dist") != LUA_TNIL) { // overriding the distribution function. @@ -587,6 +653,17 @@ static int mcplib_pool(lua_State *L) { lua_pop(L, 1); // remove the dist table from stack. } else { lua_pop(L, 1); // pop the nil. + + // use the default dist if not specified with an override table. + lua_getglobal(L, "mcp"); + // TODO (v2): decide on a mcp.default_dist and use that instead + if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) { + _mcplib_pool_dist(L, p); + lua_pop(L, 1); // pop "dist_jump_hash" value. + } else { + lua_pop(L, 1); + } + lua_pop(L, 1); // pop "mcp" } if (lua_getfield(L, 2, "filter") != LUA_TNIL) { @@ -666,7 +743,8 @@ static int mcplib_pool_proxy_gc(lua_State *L) { return 0; } -mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) { +mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_proxy_t *pp, const char *key, size_t len) { + mcp_pool_t *p = pp->main; if (p->key_filter) { key = p->key_filter(p->key_filter_conf, key, len, &len); P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key); @@ -682,7 +760,7 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const proxy_lua_error(L, "key dist hasher tried to use out of bounds index"); } - return p->pool[lookup].be; + return pp->pool[lookup].be; } // hashfunc(request) -> backend(request) @@ -690,7 +768,6 @@ mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const static int mcplib_pool_proxy_call(lua_State *L) { // internal args are the hash selector (self) mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy"); - mcp_pool_t *p = pp->main; // then request object. mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); @@ -701,7 +778,7 @@ static int mcplib_pool_proxy_call(lua_State *L) { } const char *key = MCP_PARSER_KEY(rq->pr); size_t len = rq->pr.klen; - rq->be = mcplib_pool_proxy_call_helper(L, p, key, len); + rq->be = mcplib_pool_proxy_call_helper(L, pp, key, len); // now yield request, pool up. lua_pushinteger(L, MCP_YIELD_POOL); diff --git a/proxy_network.c b/proxy_network.c index 239b0d4..0334971 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -133,17 +133,8 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { // paranoia about moving items between lists. io->io_next.stqe_next = NULL; - // Need to check on await's before looking at backends, in case it - // doesn't have one. - // Here we're letting an await resume without waiting on the network. - if (io->await_background) { - return_io_pending((io_pending_t *)io); - continue; - } - mcp_backend_t *be = io->backend; // So the backend can retrieve its event base. - be->event_thread = t; if (be->bad) { P_DEBUG("%s: fast failing request to bad backend\n", __func__); io->client_resp->status = MCMC_ERR; @@ -424,7 +415,6 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) { _cleanup_backend(be); } else { be->transferred = true; - be->event_thread = t; int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { // if we're already connected for some reason, still push it @@ -802,14 +792,13 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { _cleanup_backend(be); } else { be->transferred = true; - be->event_thread = t; // assign the initial events to the backend, so we don't have to // constantly check if they were initialized yet elsewhere. // note these events will not fire until event_add() is called. int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); - event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be); - event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); - event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be); + event_assign(&be->main_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be); + event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); + event_assign(&be->timeout_event, be->event_thread->base, -1, EV_TIMEOUT, proxy_backend_handler, be); if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { // if we're already connected for some reason, still push it @@ -827,6 +816,44 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { } } +void proxy_run_backend_queue(be_head_t *head) { + mcp_backend_t *be; + STAILQ_FOREACH(be, head, be_next) { + be->stacked = false; + int flags = 0; + + if (be->bad) { + // flush queue if backend is still bad. + // TODO: duplicated from _reset_bad_backend() + io_pending_proxy_t *io = NULL; + while (!STAILQ_EMPTY(&be->io_head)) { + io = STAILQ_FIRST(&be->io_head); + STAILQ_REMOVE_HEAD(&be->io_head, io_next); + io->client_resp->status = MCMC_ERR; + be->depth--; + return_io_pending((io_pending_t *)io); + } + } else if (be->connecting || be->validating) { + P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port); + } else { + flags = _flush_pending_write(be); + + if (flags == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); + } else if (flags & EV_WRITE) { + // only get here because we need to kick off the write handler + _start_write_event(be); + } + + if (be->pending_read) { + _start_timeout_event(be); + } + + } + } +} + // event handler for executing backend requests static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { proxy_event_thread_t *t = arg; @@ -858,32 +885,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { } // Re-walk each backend and check set event as required. - mcp_backend_t *be = NULL; - - // FIXME (v2): _set_event() is buggy, see notes on function. - STAILQ_FOREACH(be, &t->be_head, be_next) { - be->stacked = false; - int flags = 0; - - if (be->connecting || be->validating) { - P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port); - } else { - flags = _flush_pending_write(be); - - if (flags == -1) { - _reset_bad_backend(be, P_BE_FAIL_WRITING); - _backend_failed(be); - } else if (flags & EV_WRITE) { - // only get here because we need to kick off the write handler - _start_write_event(be); - } - - if (be->pending_read) { - _start_timeout_event(be); - } - } - } - + proxy_run_backend_queue(&t->be_head); } void *proxy_event_thread(void *arg) { @@ -1585,8 +1587,53 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { // TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple // event threads. -void proxy_init_evthread_events(proxy_event_thread_t *t) { +// TODO: this either needs a restructure or split into two funcs: +// 1) for the IO thread which creates its own ring/event base +// 2) for the worker thread which reuses the event base. +// io_uring will probably only work for the IO thread which makes further +// exceptions. +void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base) { + t->ctx = ctx; +#ifdef USE_EVENTFD + t->event_fd = eventfd(0, EFD_NONBLOCK); + if (t->event_fd == -1) { + perror("failed to create backend notify eventfd"); + exit(1); + } + t->be_event_fd = eventfd(0, EFD_NONBLOCK); + if (t->be_event_fd == -1) { + perror("failed to create backend notify eventfd"); + exit(1); + } +#else + int fds[2]; + if (pipe(fds)) { + perror("can't create proxy backend notify pipe"); + exit(1); + } + + t->notify_receive_fd = fds[0]; + t->notify_send_fd = fds[1]; + + if (pipe(fds)) { + perror("can't create proxy backend connection notify pipe"); + exit(1); + } + t->be_notify_receive_fd = fds[0]; + t->be_notify_send_fd = fds[1]; +#endif + + // incoming request queue. + STAILQ_INIT(&t->io_head_in); + STAILQ_INIT(&t->beconn_head_in); + pthread_mutex_init(&t->mutex, NULL); + pthread_cond_init(&t->cond, NULL); + + // initialize the event system. + #ifdef HAVE_LIBURING + fprintf(stderr, "Sorry, io_uring not supported right now\n"); + abort(); bool use_uring = t->ctx->use_uring; struct io_uring_params p = {0}; assert(t->event_fd); // uring only exists where eventfd also does. @@ -1646,14 +1693,19 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { } #endif - struct event_config *ev_config; - ev_config = event_config_new(); - event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); - t->base = event_base_new_with_config(ev_config); - event_config_free(ev_config); - if (! t->base) { - fprintf(stderr, "Can't allocate event base\n"); - exit(1); + if (base == NULL) { + struct event_config *ev_config; + ev_config = event_config_new(); + event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); + t->base = event_base_new_with_config(ev_config); + event_config_free(ev_config); + if (! t->base) { + fprintf(stderr, "Can't allocate event base\n"); + exit(1); + } + } else { + // reusing an event base from a worker thread. + t->base = base; } // listen for notifications. diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua index 5f9ad0e..dbe725d 100644 --- a/t/proxyconfig.lua +++ b/t/proxyconfig.lua @@ -3,6 +3,7 @@ local mode = dofile("/tmp/proxyconfigmode.lua") mcp.backend_read_timeout(4) +mcp.backend_connect_timeout(5) function mcp_config_pools(old) if mode == "none" then @@ -29,6 +30,15 @@ function mcp_config_pools(old) test = mcp.pool({b1, b2, b3}) } return pools + elseif mode == "noiothread" then + local b1 = mcp.backend('b1', '127.0.0.1', 11514) + local b2 = mcp.backend('b2', '127.0.0.1', 11515) + local b3 = mcp.backend('b3', '127.0.0.1', 11516) + + local pools = { + test = mcp.pool({b1, b2, b3}, { iothread = false }) + } + return pools end end @@ -42,5 +52,8 @@ function mcp_config_routes(zones) elseif mode == "start" or mode == "betable" then mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) + elseif mode == "noiothread" then + mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) + mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) end end diff --git a/t/proxyconfig.t b/t/proxyconfig.t index 1b19b8e..ebf8684 100644 --- a/t/proxyconfig.t +++ b/t/proxyconfig.t @@ -99,6 +99,9 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); } my @mbe = (); +# A map of where keys route to for worker IO tests later +my %keymap = (); +my $keycount = 100; { # set up server backend sockets. for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) { @@ -125,6 +128,22 @@ my @mbe = (); is(scalar <$be>, $cmd, "metaget passthrough"); print $be "EN\r\n"; is(scalar <$ps>, "EN\r\n", "miss received"); + + # Route a bunch of keys and map them to backends. + for my $key (0 .. $keycount) { + print $ps "mg /test/$key\r\n"; + my @readable = $s->can_read(0.25); + is(scalar @readable, 1, "only one backend became readable"); + my $be = shift @readable; + for (0 .. 2) { + if ($be == $mbe[$_]) { + $keymap{$key} = $_; + } + } + is(scalar <$be>, "mg /test/$key\r\n", "got mg passthrough"); + print $be "EN\r\n"; + is(scalar <$ps>, "EN\r\n", "miss received"); + } } # Test backend table arguments and per-backend time overrides @@ -165,6 +184,88 @@ my @holdbe = (); # avoid having the backends immediately disconnect and pollute is(scalar @readable, 0, "no new sockets"); } +# Disconnect the existing sockets +@mbe = (); +@holdbe = (); +@mocksrvs = (); +$watcher = $p_srv->new_sock; +# Reset the watcher and let logs die off. +sleep 1; +print $watcher "watch proxyevents\n"; +is(<$watcher>, "OK\r\n", "watcher enabled"); + +{ + # re-create the mock servers so we get clean connects, the previous + # backends could be reconnecting still. + for my $port (11514, 11515, 11516) { + my $srv = mock_server($port); + ok(defined $srv, "mock server created"); + push(@mocksrvs, $srv); + } + + write_modefile('return "noiothread"'); + $p_srv->reload(); + wait_reload($watcher); + + my $s = IO::Select->new(); + for my $msrv (@mocksrvs) { + $s->add($msrv); + } + my @readable = $s->can_read(0.25); + # All three backends should become readable with new sockets. + is(scalar @readable, 3, "all listeners became readable"); + + my @bepile = (); + my $bes = IO::Select->new(); # selector just for the backend sockets. + # Each backend should create one socket per worker thread. + for my $msrv (@readable) { + my @temp = (); + for (0 .. 3) { + my $be = $msrv->accept(); + ok(defined $be, "mock backend accepted"); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $bes->add($be); + push(@temp, $be); + } + for (0 .. 2) { + if ($mocksrvs[$_] == $msrv) { + $bepile[$_] = \@temp; + } + } + } + + # clients round robin onto different worker threads, so we can test the + # key dist on different offsets. + my @cli = (); + for (0 .. 2) { + my $p = $p_srv->new_sock; + + for my $key (0 .. $keycount) { + print $p "mg /test/$key\r\n"; + @readable = $bes->can_read(0.25); + is(scalar @readable, 1, "only one backend became readable"); + my $be = shift @readable; + # find which listener this be belongs to + for my $x (0 .. 2) { + for (@{$bepile[$x]}) { + if ($_ == $be) { + cmp_ok($x, '==', $keymap{$key}, "key routed to correct listener: " . $keymap{$key}); + } + } + } + + is(scalar <$be>, "mg /test/$key\r\n", "got mg passthrough"); + print $be "EN\r\n"; + is(scalar <$p>, "EN\r\n", "miss received"); + } + + # hold onto the sockets just in case. + push(@cli, $p); + } + +} + # TODO: # remove backends # do dead sockets close? @@ -1091,6 +1091,7 @@ void memcached_thread_init(int nthreads, void *arg) { #ifdef EXTSTORE threads[i].storage = arg; #endif + threads[i].thread_baseid = i; setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats_state.reserved_fds += 5; |