summaryrefslogtreecommitdiff
path: root/proto_proxy.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-02-18 15:19:09 -0800
committerdormando <dormando@rydia.net>2022-02-18 16:13:52 -0800
commit34e0359d4de223d8cde4166f7d10ae352d7ebfdf (patch)
tree041a57edfb4bb3b58aa23498681295cb71789ee5 /proto_proxy.c
parentd85c379d74d92f8e9bd7ccf1ca57520f485a24f0 (diff)
downloadmemcached-34e0359d4de223d8cde4166f7d10ae352d7ebfdf.tar.gz
proxy: pull chunks into individual c files
now's a good time to at least shove functional subsections of code into their own files. Some further work to clearly separate the API's will help but looks not too terrible. Big bonus is getting the backend handling code away from the frontend handling code, which should make it easier to follow.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--proto_proxy.c3932
1 files changed, 17 insertions, 3915 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 7afa587..8505868 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -8,462 +8,13 @@
* manually malloc'ed information gets freed properly.
*/
-#include "memcached.h"
-#include <string.h>
-#include <stdlib.h>
-#include <ctype.h>
+#include "proxy.h"
-#include <lua.h>
-#include <lualib.h>
-#include <lauxlib.h>
-
-#include "config.h"
-
-#if defined(__linux__)
-#define USE_EVENTFD 1
-#include <sys/eventfd.h>
-#endif
-
-#ifdef HAVE_LIBURING
-#include <liburing.h>
-#include <poll.h> // POLLOUT for liburing.
-#define PRING_QUEUE_SQ_ENTRIES 2048
-#define PRING_QUEUE_CQ_ENTRIES 16384
-#endif
-
-#include "proto_proxy.h"
-#include "proto_text.h"
-#include "queue.h"
-#define XXH_INLINE_ALL // modifier for xxh3's include below
-#include "xxhash.h"
-
-#ifdef PROXY_DEBUG
-#define P_DEBUG(...) \
- do { \
- fprintf(stderr, __VA_ARGS__); \
- } while (0)
-#else
-#define P_DEBUG(...)
-#endif
-
-#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex);
-#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex);
-#define WSTAT_INCR(c, stat, amount) { \
- pthread_mutex_lock(&c->thread->stats.mutex); \
- c->thread->stats.stat += amount; \
- pthread_mutex_unlock(&c->thread->stats.mutex); \
-}
-#define WSTAT_DECR(c, stat, amount) { \
- pthread_mutex_lock(&c->thread->stats.mutex); \
- c->thread->stats.stat -= amount; \
- pthread_mutex_unlock(&c->thread->stats.mutex); \
-}
-#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
-#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
-#define STAT_INCR(ctx, stat, amount) { \
- pthread_mutex_lock(&ctx->stats_lock); \
- ctx->global_stats.stat += amount; \
- pthread_mutex_unlock(&ctx->stats_lock); \
-}
-
-#define STAT_DECR(ctx, stat, amount) { \
- pthread_mutex_lock(&ctx->stats_lock); \
- ctx->global_stats.stat -= amount; \
- pthread_mutex_unlock(&ctx->stats_lock); \
-}
-
-// FIXME (v2): do include dir properly.
-#include "vendor/mcmc/mcmc.h"
-
-// Note: value created from thin air. Could be shorter.
-#define MCP_REQUEST_MAXLEN KEY_MAX_LENGTH * 2
-
-#define ENDSTR "END\r\n"
-#define ENDLEN sizeof(ENDSTR)-1
-
-#define MCP_THREAD_UPVALUE 1
-#define MCP_ATTACH_UPVALUE 2
-#define MCP_BACKEND_UPVALUE 3
-
-// all possible commands.
-#define CMD_FIELDS \
- X(CMD_MG) \
- X(CMD_MS) \
- X(CMD_MD) \
- X(CMD_MN) \
- X(CMD_MA) \
- X(CMD_ME) \
- X(CMD_GET) \
- X(CMD_GAT) \
- X(CMD_SET) \
- X(CMD_ADD) \
- X(CMD_CAS) \
- X(CMD_GETS) \
- X(CMD_GATS) \
- X(CMD_INCR) \
- X(CMD_DECR) \
- X(CMD_TOUCH) \
- X(CMD_APPEND) \
- X(CMD_DELETE) \
- X(CMD_REPLACE) \
- X(CMD_PREPEND) \
- X(CMD_END_STORAGE) \
- X(CMD_QUIT) \
- X(CMD_STATS) \
- X(CMD_SLABS) \
- X(CMD_WATCH) \
- X(CMD_LRU) \
- X(CMD_VERSION) \
- X(CMD_SHUTDOWN) \
- X(CMD_EXTSTORE) \
- X(CMD_FLUSH_ALL) \
- X(CMD_VERBOSITY) \
- X(CMD_LRU_CRAWLER) \
- X(CMD_REFRESH_CERTS) \
- X(CMD_CACHE_MEMLIMIT)
-
-#define X(name) name,
-enum proxy_defines {
- P_OK = 0,
- CMD_FIELDS
- CMD_SIZE, // used to define array size for command hooks.
- CMD_ANY, // override _all_ commands
- CMD_ANY_STORAGE, // override commands specific to key storage.
- CMD_FINAL, // end cap for convenience.
-};
-#undef X
-
-// certain classes of ascii commands have similar parsing (ie;
-// get/gets/gat/gats). Use types so we don't have to test a ton of them.
-enum proxy_cmd_types {
- CMD_TYPE_GENERIC = 0,
- CMD_TYPE_GET, // get/gets/gat/gats
- CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace
- CMD_TYPE_META, // m*'s.
-};
-
-enum proxy_be_failures {
- P_BE_FAIL_TIMEOUT = 0,
- P_BE_FAIL_DISCONNECTED,
- P_BE_FAIL_CONNECTING,
- P_BE_FAIL_WRITING,
- P_BE_FAIL_READING,
- P_BE_FAIL_PARSING,
-};
-
-const char *proxy_be_failure_text[] = {
- [P_BE_FAIL_TIMEOUT] = "timeout",
- [P_BE_FAIL_DISCONNECTED] = "disconnected",
- [P_BE_FAIL_CONNECTING] = "connecting",
- [P_BE_FAIL_WRITING] = "writing",
- [P_BE_FAIL_READING] = "reading",
- [P_BE_FAIL_PARSING] = "parsing",
- NULL
-};
-
-typedef struct _io_pending_proxy_t io_pending_proxy_t;
-typedef struct proxy_event_thread_s proxy_event_thread_t;
-
-#ifdef HAVE_LIBURING
-typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe);
-typedef struct {
- void *udata;
- proxy_event_cb cb;
- bool set; // NOTE: not sure if necessary if code structured properly
-} proxy_event_t;
-
-static struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
-static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
-static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
-static void *proxy_event_thread_ur(void *arg);
-static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
-#endif
-
-// Note: This ends up wasting a few counters, but simplifies the rest of the
-// process for handling internal worker stats.
-struct proxy_int_stats {
- uint64_t counters[CMD_FINAL];
-};
-
-struct proxy_user_stats {
- size_t num_stats; // number of stats, for sizing various arrays
- char **names; // not needed for worker threads
- uint64_t *counters; // array of counters.
-};
-
-struct proxy_global_stats {
- uint64_t config_reloads;
- uint64_t config_reload_fails;
- uint64_t backend_total;
- uint64_t backend_disconn; // backends with no connections
- uint64_t backend_requests; // reqs sent to backends
- uint64_t backend_responses; // responses received from backends
- uint64_t backend_errors; // errors from backends
- uint64_t backend_marked_bad; // backend set to autofail
- uint64_t backend_failed; // an error caused a backend reset
-};
-
-struct proxy_tunables {
- struct timeval connect;
- struct timeval retry; // wait time before retrying a dead backend
- struct timeval read;
-#ifdef HAVE_LIBURING
- struct __kernel_timespec connect_ur;
- struct __kernel_timespec retry_ur;
- struct __kernel_timespec read_ur;
-#endif // HAVE_LIBURING
- int backend_failure_limit;
- bool tcp_keepalive;
-};
-
-typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
-typedef struct {
- lua_State *proxy_state;
- void *proxy_code;
- proxy_event_thread_t *proxy_threads;
- pthread_mutex_t config_lock;
- pthread_cond_t config_cond;
- pthread_t config_tid;
- pthread_mutex_t worker_lock;
- pthread_cond_t worker_cond;
- pthread_t manager_tid; // deallocation management thread
- pthread_mutex_t manager_lock;
- pthread_cond_t manager_cond;
- pool_head_t manager_head; // stack for pool deallocation.
- bool worker_done; // signal variable for the worker lock/cond system.
- bool worker_failed; // covered by worker_lock as well.
- bool use_uring; // use IO_URING for backend connections.
- struct proxy_global_stats global_stats;
- struct proxy_user_stats user_stats;
- struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
- pthread_mutex_t stats_lock; // used for rare global counters
-} proxy_ctx_t;
-
-struct proxy_hook {
- int lua_ref;
- bool is_lua; // pull the lua reference and call it as a lua function.
-};
-
-// TODO (v2): some hash functions (crc?) might require initializers. If we run into
-// any the interface might need expanding.
-typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed);
-struct proxy_hash_func {
- key_hash_func func;
-};
-typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen);
-typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx);
-struct proxy_hash_caller {
- hash_selector_func selector_func;
- void *ctx;
-};
-
-enum mcp_backend_states {
- mcp_backend_read = 0, // waiting to read any response
- mcp_backend_parse, // have some buffered data to check
- mcp_backend_read_end, // looking for an "END" marker for GET
- mcp_backend_want_read, // read more data to complete command
- mcp_backend_next, // advance to the next IO
-};
-
-typedef struct mcp_backend_s mcp_backend_t;
-typedef struct mcp_request_s mcp_request_t;
-typedef struct mcp_parser_s mcp_parser_t;
-
-#define PARSER_MAX_TOKENS 24
-
-struct mcp_parser_meta_s {
- uint64_t flags;
-};
-
-// Note that we must use offsets into request for tokens,
-// as *request can change between parsing and later accessors.
-struct mcp_parser_s {
- const char *request;
- void *vbuf; // temporary buffer for holding value lengths.
- uint8_t command;
- uint8_t cmd_type; // command class.
- uint8_t ntokens;
- uint8_t keytoken; // because GAT. sigh. also cmds without a key.
- uint32_t parsed; // how far into the request we parsed already
- uint32_t reqlen; // full length of request buffer.
- int vlen;
- uint32_t klen; // length of key.
- uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
- bool has_space; // a space was found after the last byte parsed.
- union {
- struct mcp_parser_meta_s meta;
- } t;
-};
-
-#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]])
-
-#define MAX_REQ_TOKENS 2
-struct mcp_request_s {
- mcp_parser_t pr; // non-lua-specific parser handling.
- struct timeval start; // time this object was created.
- mcp_backend_t *be; // backend handling this request.
- bool ascii_multiget; // ascii multiget mode. (hide errors/END)
- bool was_modified; // need to rewrite the request
- int tokent_ref; // reference to token table if modified.
- char request[];
-};
-
-typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
-#define MAX_IPLEN 45
-#define MAX_PORTLEN 6
-// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
-// don't have a good temporary space and don't want to malloc/free on every
-// write. transmit() uses the stack but we can't do that for uring's use case.
-#if (IOV_MAX > 128)
-#define BE_IOV_MAX 128
-#else
-#define BE_IOV_MAX IOV_MAX
-#endif
-struct mcp_backend_s {
- char ip[MAX_IPLEN+1];
- char port[MAX_PORTLEN+1];
- double weight;
- int depth;
- int failed_count; // number of fails (timeouts) in a row
- pthread_mutex_t mutex; // covers stack.
- proxy_event_thread_t *event_thread; // event thread owning this backend.
- void *client; // mcmc client
- STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
- io_head_t io_head; // stack of requests.
- char *rbuf; // static allocated read buffer.
- struct event event; // libevent
-#ifdef HAVE_LIBURING
- proxy_event_t ur_rd_ev; // liburing.
- proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
- proxy_event_t ur_te_ev; // for timeout handling
-#endif
- enum mcp_backend_states state; // readback state machine
- int connect_flags; // flags to pass to mcmc_connect
- bool connecting; // in the process of an asynch connection.
- bool can_write; // recently got a WANT_WRITE or are connecting.
- bool stacked; // if backend already queued for syscalls.
- bool bad; // timed out, marked as bad.
- struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
-};
-typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
-
-struct proxy_event_thread_s {
- pthread_t thread_id;
- struct event_base *base;
- struct event notify_event; // listen event for the notify pipe/eventfd.
- struct event clock_event; // timer for updating event thread data.
-#ifdef HAVE_LIBURING
- struct io_uring ring;
- proxy_event_t ur_notify_event; // listen on eventfd.
- proxy_event_t ur_clock_event; // timer for updating event thread data.
- eventfd_t event_counter;
- bool use_uring;
-#endif
- pthread_mutex_t mutex; // covers stack.
- pthread_cond_t cond; // condition to wait on while stack drains.
- io_head_t io_head_in; // inbound requests to process.
- be_head_t be_head; // stack of backends for processing.
-#ifdef USE_EVENTFD
- int event_fd;
-#else
- int notify_receive_fd;
- int notify_send_fd;
-#endif
- proxy_ctx_t *ctx; // main context.
- struct proxy_tunables tunables; // periodically copied from main ctx
-};
-
-#define RESP_CMD_MAX 8
-typedef struct {
- mcmc_resp_t resp;
- struct timeval start; // start time inherited from paired request
- char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
- int status; // status code from mcmc_read()
- char *buf; // response line + potentially value.
- size_t blen; // total size of the value to read.
- int bread; // amount of bytes read into value so far.
-} mcp_resp_t;
-
-// re-cast an io_pending_t into this more descriptive structure.
-// the first few items _must_ match the original struct.
-struct _io_pending_proxy_t {
- int io_queue_type;
- LIBEVENT_THREAD *thread;
- conn *c;
- mc_resp *resp; // original struct ends here
-
- struct _io_pending_proxy_t *next; // stack for IO submission
- STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
- int coro_ref; // lua registry reference to the coroutine
- int mcpres_ref; // mcp.res reference used for await()
- lua_State *coro; // pointer directly to the coroutine
- mcp_backend_t *backend; // backend server to request from
- struct iovec iov[2]; // request string + tail buffer
- int iovcnt; // 1 or 2...
- unsigned int iovbytes; // total bytes in the iovec
- int await_ref; // lua reference if we were an await object
- mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
- bool flushed; // whether we've fully written this request to a backend.
- bool ascii_multiget; // passed on from mcp_r_t
- bool is_await; // are we an await object?
- bool await_first; // are we the main route for an await object?
-};
-
-// Note: does *be have to be a sub-struct? how stable are userdata pointers?
-// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
-// - says no.
-typedef struct {
- int ref; // luaL_ref reference.
- mcp_backend_t *be;
-} mcp_pool_be_t;
-
-#define KEY_HASH_FILTER_MAX 5
-typedef struct mcp_pool_s mcp_pool_t;
-struct mcp_pool_s {
- struct proxy_hash_caller phc;
- key_hash_filter_func key_filter;
- key_hash_func key_hasher;
- pthread_mutex_t lock; // protects refcount.
- proxy_ctx_t *ctx; // main context.
- STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
- char key_filter_conf[KEY_HASH_FILTER_MAX+1];
- uint64_t hash_seed; // calculated from a string.
- int refcount;
- int phc_ref;
- int self_ref; // TODO (v2): double check that this is needed.
- int pool_size;
- mcp_pool_be_t pool[];
-};
-
-typedef struct {
- mcp_pool_t *main; // ptr to original
-} mcp_pool_proxy_t;
-
-static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c);
#define PROCESS_MULTIGET true
#define PROCESS_NORMAL false
static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
-static size_t _process_request_next_key(mcp_parser_t *pr);
-static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen);
-static void dump_stack(lua_State *L);
static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
-static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen);
-static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
-static int mcplib_await_run(conn *c, lua_State *L, int coro_ref);
-static int mcplib_await_return(io_pending_proxy_t *p);
-static void proxy_backend_handler(const int fd, const short which, void *arg);
-static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
-static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
-static void *proxy_event_thread(void *arg);
static void proxy_out_errstring(mc_resp *resp, const char *str);
-static int _flush_pending_write(mcp_backend_t *be);
-static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
-static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
-static int proxy_thread_loadconf(LIBEVENT_THREAD *thr);
-static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread);
-
-static void proxy_lua_error(lua_State *L, const char *s);
-static void proxy_lua_ferror(lua_State *L, const char *fmt, ...);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
@@ -547,279 +98,6 @@ void process_proxy_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("cmd_replace", "%llu", (unsigned long long)istats.counters[CMD_REPLACE]);
}
-struct _dumpbuf {
- size_t size;
- size_t used;
- char *buf;
-};
-
-static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) {
- (void)L;
- struct _dumpbuf *db = ud;
- if (db->used + sz > db->size) {
- db->size *= 2;
- char *nb = realloc(db->buf, db->size);
- if (nb == NULL) {
- return -1;
- }
- db->buf = nb;
- }
- memcpy(db->buf + db->used, (const char *)p, sz);
- db->used += sz;
- return 0;
-}
-
-static const char * _load_helper(lua_State *L, void *data, size_t *size) {
- (void)L;
- struct _dumpbuf *db = data;
- if (db->used == 0) {
- *size = 0;
- return NULL;
- }
- *size = db->used;
- db->used = 0;
- return db->buf;
-}
-
-void proxy_start_reload(void *arg) {
- proxy_ctx_t *ctx = arg;
- if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
- pthread_cond_signal(&ctx->config_cond);
- pthread_mutex_unlock(&ctx->config_lock);
- }
-}
-
-// Manages a queue of inbound objects destined to be deallocated.
-static void *_proxy_manager_thread(void *arg) {
- proxy_ctx_t *ctx = arg;
- pool_head_t head;
-
- pthread_mutex_lock(&ctx->manager_lock);
- while (1) {
- STAILQ_INIT(&head);
- while (STAILQ_EMPTY(&ctx->manager_head)) {
- pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock);
- }
-
- // pull dealloc queue into local queue.
- STAILQ_CONCAT(&head, &ctx->manager_head);
- pthread_mutex_unlock(&ctx->manager_lock);
-
- // Config lock is required for using config VM.
- pthread_mutex_lock(&ctx->config_lock);
- lua_State *L = ctx->proxy_state;
- mcp_pool_t *p;
- STAILQ_FOREACH(p, &head, next) {
- // we let the pool object _gc() handle backend references.
-
- luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref);
- // need to... unref self.
- // NOTE: double check if we really need to self-reference.
- // this is a backup here to ensure the external refcounts hit zero
- // before lua garbage collects the object. other things hold a
- // reference to the object though.
- luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref);
- }
- pthread_mutex_unlock(&ctx->config_lock);
-
- // done.
- pthread_mutex_lock(&ctx->manager_lock);
- }
-
- return NULL;
-}
-
-// Thread handling the configuration reload sequence.
-// TODO (v2): get a logger instance.
-// TODO (v2): making this "safer" will require a few phases of work.
-// 1) JFDI
-// 2) "test VM" -> from config thread, test the worker reload portion.
-// 3) "unit testing" -> from same temporary worker VM, execute set of
-// integration tests that must pass.
-// 4) run update on each worker, collecting new mcp.attach() hooks.
-// Once every worker has successfully executed and set new hooks, roll
-// through a _second_ time to actually swap the hook structures and unref
-// the old structures where marked dirty.
-static void *_proxy_config_thread(void *arg) {
- proxy_ctx_t *ctx = arg;
-
- logger_create();
- pthread_mutex_lock(&ctx->config_lock);
- while (1) {
- pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
- LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
- STAT_INCR(ctx, config_reloads, 1);
- lua_State *L = ctx->proxy_state;
- lua_settop(L, 0); // clear off any crud that could have been left on the stack.
-
- // The main stages of config reload are:
- // - load and execute the config file
- // - run mcp_config_pools()
- // - for each worker:
- // - copy and execute new lua code
- // - copy selector table
- // - run mcp_config_routes()
-
- if (proxy_load_config(ctx) != 0) {
- // Failed to load. log and wait for a retry.
- STAT_INCR(ctx, config_reload_fails, 1);
- LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
- continue;
- }
-
- // TODO (v2): create a temporary VM to test-load the worker code into.
- // failing to load partway through the worker VM reloads can be
- // critically bad if we're not careful about references.
- // IE: the config VM _must_ hold references to selectors and backends
- // as long as they exist in any worker for any reason.
-
- for (int x = 0; x < settings.num_threads; x++) {
- LIBEVENT_THREAD *thr = get_worker_thread(x);
-
- pthread_mutex_lock(&ctx->worker_lock);
- ctx->worker_done = false;
- ctx->worker_failed = false;
- proxy_reload_notify(thr);
- while (!ctx->worker_done) {
- // in case of spurious wakeup.
- pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
- }
- pthread_mutex_unlock(&ctx->worker_lock);
-
- // Code load bailed.
- if (ctx->worker_failed) {
- STAT_INCR(ctx, config_reload_fails, 1);
- LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
- continue;
- }
- }
- LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
- }
-
- return NULL;
-}
-
-static int _start_proxy_config_threads(proxy_ctx_t *ctx) {
- int ret;
-
- pthread_mutex_lock(&ctx->config_lock);
- if ((ret = pthread_create(&ctx->config_tid, NULL,
- _proxy_config_thread, ctx)) != 0) {
- fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
- strerror(ret));
- pthread_mutex_unlock(&ctx->config_lock);
- return -1;
- }
- pthread_mutex_unlock(&ctx->config_lock);
-
- pthread_mutex_lock(&ctx->manager_lock);
- if ((ret = pthread_create(&ctx->manager_tid, NULL,
- _proxy_manager_thread, ctx)) != 0) {
- fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
- strerror(ret));
- pthread_mutex_unlock(&ctx->manager_lock);
- return -1;
- }
- pthread_mutex_unlock(&ctx->manager_lock);
-
- return 0;
-}
-
-// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
-// event threads.
-static void _proxy_init_evthread_events(proxy_event_thread_t *t) {
-#ifdef HAVE_LIBURING
- bool use_uring = t->ctx->use_uring;
- struct io_uring_params p = {0};
- assert(t->event_fd); // uring only exists where eventfd also does.
-
- // Setup the CQSIZE to be much larger than SQ size, since backpressure
- // issues can cause us to block on SQ submissions and as a network server,
- // stuff happens.
-
- if (use_uring) {
- p.flags = IORING_SETUP_CQSIZE;
- p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
- int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
- if (ret) {
- perror("io_uring_queue_init_params");
- exit(1);
- }
- if (!(p.features & IORING_FEAT_NODROP)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
- use_uring = false;
- }
- if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
- use_uring = false;
- }
- if (!(p.features & IORING_FEAT_FAST_POLL)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
- use_uring = false;
- }
-
- if (use_uring) {
- // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a
- // chicken and egg in here. need a better structure... in meantime
- // re-create the event_fd.
- close(t->event_fd);
- t->event_fd = eventfd(0, 0);
- // FIXME (v2): hack for event init.
- t->ur_notify_event.set = false;
- _proxy_evthr_evset_notifier(t);
-
- // periodic data updater for event thread
- t->ur_clock_event.cb = proxy_event_updater_ur;
- t->ur_clock_event.udata = t;
- t->ur_clock_event.set = false;
- _proxy_evthr_evset_clock(t);
-
- t->use_uring = true;
- return;
- } else {
- // Decided to not use io_uring, so don't waste memory.
- t->use_uring = false;
- io_uring_queue_exit(&t->ring);
- }
- } else {
- t->use_uring = false;
- }
-#endif
-
- struct event_config *ev_config;
- ev_config = event_config_new();
- event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
- t->base = event_base_new_with_config(ev_config);
- event_config_free(ev_config);
- if (! t->base) {
- fprintf(stderr, "Can't allocate event base\n");
- exit(1);
- }
-
- // listen for notifications.
- // NULL was thread_libevent_process
- // FIXME (v2): use modern format? (event_assign)
-#ifdef USE_EVENTFD
- event_set(&t->notify_event, t->event_fd,
- EV_READ | EV_PERSIST, proxy_event_handler, t);
-#else
- event_set(&t->notify_event, t->notify_receive_fd,
- EV_READ | EV_PERSIST, proxy_event_handler, t);
-#endif
-
- evtimer_set(&t->clock_event, proxy_event_updater, t);
- event_base_set(t->base, &t->clock_event);
- struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
- evtimer_add(&t->clock_event, &rate);
-
- event_base_set(t->base, &t->notify_event);
- if (event_add(&t->notify_event, 0) == -1) {
- fprintf(stderr, "Can't monitor libevent notify pipe\n");
- exit(1);
- }
-
-}
-
// start the centralized lua state and config thread.
// TODO (v2): return ctx ptr. avoid global vars.
void proxy_init(bool use_uring) {
@@ -880,7 +158,7 @@ void proxy_init(bool use_uring) {
t->notify_receive_fd = fds[0];
t->notify_send_fd = fds[1];
#endif
- _proxy_init_evthread_events(t);
+ proxy_init_evthread_events(t);
// incoming request queue.
STAILQ_INIT(&t->io_head_in);
@@ -903,242 +181,6 @@ void proxy_init(bool use_uring) {
_start_proxy_config_threads(ctx);
}
-int proxy_load_config(void *arg) {
- proxy_ctx_t *ctx = arg;
- lua_State *L = ctx->proxy_state;
- int res = luaL_loadfile(L, settings.proxy_startfile);
- if (res != LUA_OK) {
- fprintf(stderr, "ERROR: Failed to load proxy_startfile: %s\n", lua_tostring(L, -1));
- return -1;
- }
- // LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE
-
- // Now we need to dump the compiled code into bytecode.
- // This will then get loaded into worker threads.
- struct _dumpbuf *db = malloc(sizeof(struct _dumpbuf));
- db->size = 16384;
- db->used = 0;
- db->buf = malloc(db->size);
- lua_dump(L, _dump_helper, db, 0);
- // 0 means no error.
- ctx->proxy_code = db;
-
- // now we complete the data load by calling the function.
- res = lua_pcall(L, 0, LUA_MULTRET, 0);
- if (res != LUA_OK) {
- fprintf(stderr, "ERROR: Failed to load data into lua config state: %s\n", lua_tostring(L, -1));
- exit(EXIT_FAILURE);
- }
-
- // call the mcp_config_pools function to get the central backends.
- lua_getglobal(L, "mcp_config_pools");
-
- if (lua_isnil(L, -1)) {
- fprintf(stderr, "ERROR: Configuration file missing 'mcp_config_pools' function\n");
- exit(EXIT_FAILURE);
- }
- lua_pushnil(L); // no "old" config yet.
- if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
- fprintf(stderr, "ERROR: Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1));
- exit(EXIT_FAILURE);
- }
-
- // result is our main config.
- return 0;
-}
-
-static int _copy_pool(lua_State *from, lua_State *to) {
- // from, -3 should have he userdata.
- mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
- size_t size = sizeof(mcp_pool_proxy_t);
- mcp_pool_proxy_t *pp = lua_newuserdatauv(to, size, 0);
- luaL_setmetatable(to, "mcp.pool_proxy");
-
- pp->main = p;
- pthread_mutex_lock(&p->lock);
- p->refcount++;
- pthread_mutex_unlock(&p->lock);
- return 0;
-}
-
-static void _copy_config_table(lua_State *from, lua_State *to);
-// (from, -1) is the source value
-// should end with (to, -1) being the new value.
-static void _copy_config_table(lua_State *from, lua_State *to) {
- int type = lua_type(from, -1);
- bool found = false;
- luaL_checkstack(from, 4, "configuration error: table recursion too deep");
- luaL_checkstack(to, 4, "configuration error: table recursion too deep");
- switch (type) {
- case LUA_TNIL:
- lua_pushnil(to);
- break;
- case LUA_TUSERDATA:
- // see dump_stack() - check if it's something we handle.
- if (lua_getmetatable(from, -1) != 0) {
- lua_pushstring(from, "__name");
- if (lua_rawget(from, -2) != LUA_TNIL) {
- const char *name = lua_tostring(from, -1);
- if (strcmp(name, "mcp.pool") == 0) {
- _copy_pool(from, to);
- found = true;
- }
- }
- lua_pop(from, 2);
- }
- if (!found) {
- proxy_lua_ferror(from, "unhandled userdata type in configuration table\n");
- }
- break;
- case LUA_TNUMBER:
- if (lua_isinteger(from, -1)) {
- lua_pushinteger(to, lua_tointeger(from, -1));
- } else {
- lua_pushnumber(to, lua_tonumber(from, -1));
- }
- break;
- case LUA_TSTRING:
- lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1));
- break;
- case LUA_TTABLE:
- // TODO (v2): copy the metatable first?
- // TODO (v2): size narr/nrec from old table and use createtable to
- // pre-allocate.
- lua_newtable(to); // throw new table on worker
- int t = lua_absindex(from, -1); // static index of table to copy.
- int nt = lua_absindex(to, -1); // static index of new table.
- lua_pushnil(from); // start iterator for main
- while (lua_next(from, t) != 0) {
- // (key, -2), (val, -1)
- int keytype = lua_type(from, -2);
- // to intentionally limit complexity and allow for future
- // optimizations we restrict what types may be used as keys
- // for sub-tables.
- switch (keytype) {
- case LUA_TSTRING:
- // to[l]string converts the actual key in the table
- // into a string, so we must not do that unless it
- // already is one.
- lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2));
- break;
- case LUA_TNUMBER:
- if (lua_isinteger(from, -1)) {
- lua_pushinteger(to, lua_tointeger(from, -1));
- } else {
- lua_pushnumber(to, lua_tonumber(from, -1));
- }
- break;
- default:
- proxy_lua_error(from, "configuration table keys must be strings or numbers");
- }
- // lua_settable(to, n) - n being the table
- // takes -2 key -1 value, pops both.
- // use lua_absindex(L, -1) and so to convert easier?
- _copy_config_table(from, to); // push next value.
- lua_settable(to, nt);
- lua_pop(from, 1); // drop value, keep key.
- }
- // top of from is now the original table.
- // top of to should be the new table.
- break;
- default:
- proxy_lua_error(from, "unhandled data type in configuration table\n");
- }
-}
-
-// Run from proxy worker to coordinate code reload.
-// config_lock must be held first.
-void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
- proxy_ctx_t *ctx = arg;
- pthread_mutex_lock(&ctx->worker_lock);
- if (proxy_thread_loadconf(thr) != 0) {
- ctx->worker_failed = true;
- }
- ctx->worker_done = true;
- pthread_cond_signal(&ctx->worker_cond);
- pthread_mutex_unlock(&ctx->worker_lock);
-}
-
-// FIXME (v2): need to test how to recover from an actual error here. error message
-// needs to go somewhere useful, counters added, etc.
-static int proxy_thread_loadconf(LIBEVENT_THREAD *thr) {
- lua_State *L = thr->L;
- // load the precompiled config function.
- proxy_ctx_t *ctx = settings.proxy_ctx;
- struct _dumpbuf *db = ctx->proxy_code;
- struct _dumpbuf db2; // copy because the helper modifies it.
- memcpy(&db2, db, sizeof(struct _dumpbuf));
-
- lua_load(L, _load_helper, &db2, "config", NULL);
- // LUA_OK + all errs from loadfile except LUA_ERRFILE.
- //dump_stack(L);
- // - pcall the func (which should load it)
- int res = lua_pcall(L, 0, LUA_MULTRET, 0);
- if (res != LUA_OK) {
- // FIXME (v2): don't exit here!
- fprintf(stderr, "Failed to load data into worker thread\n");
- return -1;
- }
-
- lua_getglobal(L, "mcp_config_routes");
- // create deepcopy of argument to pass into mcp_config_routes.
- // FIXME (v2): to avoid lua SIGABRT'ing on errors we need to protect the call
- // normal pattern:
- // lua_pushcfunction(L, &_copy_config_table);
- // lua_pushlightuserdata(L, &L2);
-    //     res = lua_pcall(L, etc);
- // ... but since this is cross-VM we could get errors from not the
- // protected VM, breaking setjmp/etc.
- // for this part of the code we should override lua_atpanic(),
- // allowing us to specifically recover and bail.
- // However, again, this will require the next version of the config reload
- // code since we are re-using the VM's and a panic can leave us in a
- // broken state.
- // If the setjump/longjump combos are compatible a pcall for from and
- // atpanic for to might work best, since the config VM is/should be long
- // running and worker VM's should be rotated.
- _copy_config_table(ctx->proxy_state, L);
-
- // copied value is in front of route function, now call it.
- if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
- fprintf(stderr, "Failed to execute mcp_config_routes: %s\n", lua_tostring(L, -1));
- return -1;
- }
-
- // update user stats
- STAT_L(ctx);
- struct proxy_user_stats *us = &ctx->user_stats;
- struct proxy_user_stats *tus = NULL;
- if (us->num_stats != 0) {
- pthread_mutex_lock(&thr->stats.mutex);
- if (thr->proxy_user_stats == NULL) {
- tus = calloc(1, sizeof(struct proxy_user_stats));
- thr->proxy_user_stats = tus;
- } else {
- tus = thr->proxy_user_stats;
- }
-
- // originally this was a realloc routine but it felt fragile.
- // that might still be a better idea; still need to zero out the end.
- uint64_t *counters = calloc(us->num_stats, sizeof(uint64_t));
-
- // note that num_stats can _only_ grow in size.
- // we also only care about counters on the worker threads.
- if (tus->counters) {
- assert(tus->num_stats <= us->num_stats);
- memcpy(counters, tus->counters, tus->num_stats * sizeof(uint64_t));
- free(tus->counters);
- }
-
- tus->counters = counters;
- tus->num_stats = us->num_stats;
- pthread_mutex_unlock(&thr->stats.mutex);
- }
- STAT_UL(ctx);
-
- return 0;
-}
-
// Initialize the VM for an individual worker thread.
void proxy_thread_init(LIBEVENT_THREAD *thr) {
// Create the hook table.
@@ -1381,459 +423,22 @@ void complete_nread_proxy(conn *c) {
return;
}
-/******** NETWORKING AND INTERNAL FUNCTIONS ******/
-
-static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
- io_head_t head;
-
- STAILQ_INIT(&head);
- STAILQ_INIT(&t->be_head);
-
- // Pull the entire stack of inbound into local queue.
- pthread_mutex_lock(&t->mutex);
- STAILQ_CONCAT(&head, &t->io_head_in);
- pthread_mutex_unlock(&t->mutex);
-
- int io_count = 0;
- int be_count = 0;
- while (!STAILQ_EMPTY(&head)) {
- io_pending_proxy_t *io = STAILQ_FIRST(&head);
- io->flushed = false;
- mcp_backend_t *be = io->backend;
- // So the backend can retrieve its event base.
- be->event_thread = t;
-
- // _no_ mutex on backends. they are owned by the event thread.
- STAILQ_REMOVE_HEAD(&head, io_next);
- if (be->bad) {
- P_DEBUG("%s: fast failing request to bad backend\n", __func__);
- io->client_resp->status = MCMC_ERR;
- return_io_pending((io_pending_t *)io);
- continue;
- }
- STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
- be->depth++;
- io_count++;
- if (!be->stacked) {
- be->stacked = true;
- STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
- be_count++;
- }
- }
- //P_DEBUG("%s: io/be counts for syscalls [%d/%d]\n", __func__, io_count, be_count);
- return io_count;
-}
-
-#ifdef HAVE_LIBURING
-static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
-
-static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
- proxy_ctx_t *ctx = t->ctx;
-
- _proxy_evthr_evset_clock(t);
-
- // we reuse the "global stats" lock since it's hardly ever used.
- STAT_L(ctx);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
- STAT_UL(ctx);
-}
-
-// No-op at the moment. when the linked timeout fires uring returns the
-// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
-// error. So we don't need to explicitly handle timeouts.
-// I'm leaving the structure in to simplify the callback routine.
-// Since timeouts rarely get called the extra code here shouldn't matter.
-static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- return;
-}
-
-static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
-}
-
-static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
- struct io_uring_sqe *sqe;
- if (be->ur_te_ev.set)
- return;
-
- be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
- be->ur_te_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // TODO (v2): NULL?
-
- io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
- be->ur_te_ev.set = true;
-}
-
-static void _backend_failed_ur(mcp_backend_t *be) {
- if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
- P_DEBUG("%s: marking backend as bad\n", __func__);
- be->bad = true;
- _proxy_evthr_evset_be_retry(be);
- } else {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur);
- }
-}
-
-// read handler.
-static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- int bread = cqe->res;
- char *rbuf = NULL;
- size_t toread = 0;
- // Error or disconnection.
- if (bread <= 0) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- // NOTE: Not calling backed_failed here since if the backend is busted
- // it should be caught by the connect routine.
- // This is probably not _always_ true in practice. Leaving this note
- // so I can re-evaluate later.
- return;
- }
-
- int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
- P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);
-
- if (res > 0) {
- _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->tunables.read_ur);
- } else if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- return;
- }
-
- // TODO (v2): when exactly do we need to reset the backend handler?
- if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
- }
-}
-
-static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- int flags = 0;
-
- be->can_write = true;
- if (be->connecting) {
- int err = 0;
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- // kick the bad backend, clear the queue, retry later.
- // TODO (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
- return;
- }
- P_DEBUG("%s: backend connected\n", __func__);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
- }
- int res = _flush_pending_write(be);
- if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- return;
- }
-
- if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
- }
-
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
-}
-
-static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
-
- // liburing always uses eventfd for the notifier.
- // *cqe has our result.
- assert(cqe->res != -EINVAL);
- if (cqe->res != sizeof(eventfd_t)) {
- P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
- // FIXME (v2): figure out if this is impossible, and how to handle if not.
- assert(1 == 0);
- }
-
- // need to re-arm the listener every time.
- _proxy_evthr_evset_notifier(t);
-
- // TODO (v2): sqe queues for writing to backends
- // - _ur handler for backend write completion is to set a read event and
- // re-submit. ugh.
- // Should be possible to have standing reads, but flow is harder and lets
- // optimize that later. (ie; allow matching reads to a request but don't
- // actually dequeue anything until both read and write are confirmed)
- if (_proxy_event_handler_dequeue(t) == 0) {
- //P_DEBUG("%s: no IO's to complete\n", __func__);
- return;
- }
-
- // Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // TODO (v2): for each backend, queue writev's into sqe's
- // move the backend sqe bits into a write complete handler
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
- int flags = 0;
-
- if (be->connecting) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
- flags |= EV_WRITE;
- } else {
- flags = _flush_pending_write(be);
- }
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- } else {
- // FIXME (v2): needs a re-write to handle sqe starvation.
- // FIXME (v2): can't actually set the read here? need to confirm _some_
- // write first?
- if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur);
- }
- if (flags & EV_READ) {
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur);
- }
- }
- }
-}
-
-static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_wrhandler_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
- P_DEBUG("%s: setting: %lu\n", __func__, len);
- struct io_uring_sqe *sqe;
- if (be->ur_rd_ev.set) {
- P_DEBUG("%s: already set\n", __func__);
- return;
- }
-
- be->ur_rd_ev.cb = proxy_backend_handler_ur;
- be->ur_rd_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
- assert(be->rbuf != NULL);
- io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
- io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
- be->ur_rd_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- // TODO (v2): we can pre-set the event data and avoid always re-doing it here.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_timeout(sqe, &updater_ts, 0, 0);
- io_uring_sqe_set_data(sqe, &t->ur_clock_event);
- t->ur_clock_event.set = true;
-}
-
-static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
- if (t->ur_notify_event.set)
- return;
-
- t->ur_notify_event.cb = proxy_event_handler_ur;
- t->ur_notify_event.udata = t;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
- io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0);
- io_uring_sqe_set_data(sqe, &t->ur_notify_event);
-}
-
-// TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check
-// for space free in the sqe queue and submit in the middle of the cqe
-// foreach.
-// There might be better places to do this, but I think it's cleaner if
-// submission and cqe can stay in this function.
-// TODO (v2): The problem is io_submit() can deadlock if too many cqe's are
-// waiting.
-// Need to understand if this means "CQE's ready to be picked up" or "CQE's in
-// flight", because the former is much easier to work around (ie; only run the
-// backend handler after dequeuing everything else)
-// TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ
-// events slots. Therefore might starve SQE's if we were low beforehand.
-// - switch from for_each_cqe to doing one at a time (for now?)
-// - track # of sqe's allocated in the cqe loop.
-// - stop and submit if we've >= half the queue.
-// - ??? when can a CQE generate > 1 SQE?
-// - wrhandler_ur can set both wrpoll and read
-// - if CQE's can gen > 1 SQE at a time, we'll eventually starve.
-// - proper flow: CQE's can enqueue backends to be processed.
-// - after CQE's are processed, backends are processed (ouch?)
-// - if SQE's starve here, bail but keep the BE queue.
-// - then submit SQE's
-static void *proxy_event_thread_ur(void *arg) {
- proxy_event_thread_t *t = arg;
- struct io_uring_cqe *cqe;
-
- P_DEBUG("%s: starting\n", __func__);
-
- logger_create(); // TODO (v2): add logger to struct
- while (1) {
- P_DEBUG("%s: submit and wait\n", __func__);
- io_uring_submit_and_wait(&t->ring, 1);
- //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret);
-
- uint32_t head = 0;
- uint32_t count = 0;
-
- io_uring_for_each_cqe(&t->ring, head, cqe) {
- P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
-
- proxy_event_t *pe = io_uring_cqe_get_data(cqe);
- pe->set = false;
- pe->cb(pe->udata, cqe);
-
- count++;
- }
-
- P_DEBUG("%s: advancing [count:%d]\n", __func__, count);
- io_uring_cq_advance(&t->ring, count);
- }
-
- return NULL;
-}
-#endif // HAVE_LIBURING
-
-// We need to get timeout/retry/etc updates to the event thread(s)
-// occasionally. I'd like to have a better inteface around this where updates
-// are shipped directly; but this is good enough to start with.
-static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
- proxy_event_thread_t *t = arg;
- proxy_ctx_t *ctx = t->ctx;
-
- // TODO (v2): double check how much of this boilerplate is still necessary?
- // reschedule the clock event.
- evtimer_del(&t->clock_event);
-
- evtimer_set(&t->clock_event, proxy_event_updater, t);
- event_base_set(t->base, &t->clock_event);
- struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
- evtimer_add(&t->clock_event, &rate);
-
- // we reuse the "global stats" lock since it's hardly ever used.
- STAT_L(ctx);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
- STAT_UL(ctx);
-}
-
-// event handler for executing backend requests
-static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
- proxy_event_thread_t *t = arg;
-
-#ifdef USE_EVENTFD
- uint64_t u;
- if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
- // Temporary error or wasn't actually ready to read somehow.
- return;
- }
-#else
- char buf[1];
- // TODO (v2): This is a lot more fatal than it should be. can it fail? can
- // it blow up the server?
- // TODO (v2): a cross-platform method of speeding this up would be nice. With
- // event fds we can queue N events and wakeup once here.
- // If we're pulling one byte out of the pipe at a time here it'll just
- // wake us up too often.
- // If the pipe is O_NONBLOCK then maybe just a larger read would work?
- if (read(fd, buf, 1) != 1) {
- P_DEBUG("%s: pipe read failed\n", __func__);
- return;
- }
-#endif
-
- if (_proxy_event_handler_dequeue(t) == 0) {
- //P_DEBUG("%s: no IO's to complete\n", __func__);
- return;
- }
-
- // Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
- struct timeval tmp_time = t->tunables.read;
-
- // FIXME (v2): _set_event() is buggy, see notes on function.
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
- int flags = 0;
-
- if (be->connecting) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
- } else {
- flags = _flush_pending_write(be);
- }
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- } else {
- flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
- _set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
- }
- }
-
+// Simple error wrapper for common failures.
+// lua_error() is a jump so this function never returns
+// for clarity add a 'return' after calls to this.
+void proxy_lua_error(lua_State *L, const char *s) {
+ lua_pushstring(L, s);
+ lua_error(L);
}
-static void *proxy_event_thread(void *arg) {
- proxy_event_thread_t *t = arg;
-
- logger_create(); // TODO (v2): add logger ptr to structure
- event_base_loop(t->base, 0);
- event_base_free(t->base);
-
- // TODO (v2): join bt threads, free array.
-
- return NULL;
+void proxy_lua_ferror(lua_State *L, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ lua_pushfstring(L, fmt, ap);
+ va_end(ap);
+ lua_error(L);
}
-
// Need a custom function so we can prefix lua strings easily.
static void proxy_out_errstring(mc_resp *resp, const char *str) {
size_t len;
@@ -1865,46 +470,6 @@ static void proxy_out_errstring(mc_resp *resp, const char *str) {
return;
}
-// Simple error wrapper for common failures.
-// lua_error() is a jump so this function never returns
-// for clarity add a 'return' after calls to this.
-static void proxy_lua_error(lua_State *L, const char *s) {
- lua_pushstring(L, s);
- lua_error(L);
-}
-
-static void proxy_lua_ferror(lua_State *L, const char *fmt, ...) {
- va_list ap;
- va_start(ap, fmt);
- lua_pushfstring(L, fmt, ap);
- va_end(ap);
- lua_error(L);
-}
-
-// FIXME (v2): if we use the newer API the various pending checks can be adjusted.
-static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) {
- // FIXME (v2): chicken and egg.
- // can't check if pending if the structure is was calloc'ed (sigh)
- // don't want to double test here. should be able to event_assign but
- // not add anything during initialization, but need the owner thread's
- // event base.
- int pending = 0;
- if (event_initialized(&be->event)) {
- pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
- }
- if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
- event_del(&be->event); // replace existing event.
- }
-
- // if we can't write, we could be connecting.
- // TODO (v2): always check for READ in case some commands were sent
- // successfully? The flags could be tracked on *be and reset in the
- // handler, perhaps?
- event_assign(&be->event, base, mcmc_fd(be->client),
- flags, callback, be);
- event_add(&be->event, &t);
-}
-
// this resumes every yielded coroutine (and re-resumes if necessary).
// called from the worker thread after responses have been pulled from the
// network.
@@ -1916,7 +481,7 @@ static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, st
// again.
// - if LUA_OK finalize the response and return
// - else set error into mc_resp.
-static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c) {
+int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c) {
int nresults = 0;
int cores = lua_resume(Lc, NULL, 1, &nresults);
size_t rlen = 0;
@@ -2005,533 +570,6 @@ static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t
return 0;
}
-// NOTES:
-// - mcp_backend_read: grab req_stack_head, do things
-// read -> next, want_read -> next | read_end, etc.
-// issue: want read back to read_end as necessary. special state?
-// - it's fine: p->client_resp->type.
-// - mcp_backend_next: advance, consume, etc.
-// TODO (v2): second argument with enum for a specific error.
-// - probably just for logging. for app if any of these errors shouldn't
-// result in killing the request stack!
-static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) {
- bool stop = false;
- io_pending_proxy_t *p = NULL;
- mcmc_resp_t tmp_resp; // helper for testing for GET's END marker.
- int flags = 0;
-
- p = STAILQ_FIRST(&be->io_head);
- if (p == NULL) {
- // got a read event, but nothing was queued.
- // probably means a disconnect event.
- // TODO (v2): could probably confirm this by attempting to read the
- // socket, getsockopt, or something else simply for logging or
- // statistical purposes.
- // In this case we know it's going to be a close so error.
- flags = -1;
- P_DEBUG("%s: read event but nothing in IO queue\n", __func__);
- return flags;
- }
-
- while (!stop) {
- mcp_resp_t *r;
- int res = 1;
- int remain = 0;
- char *newbuf = NULL;
-
- switch(be->state) {
- case mcp_backend_read:
- assert(p != NULL);
- P_DEBUG("%s: [read] bread: %d\n", __func__, bread);
-
- if (bread == 0) {
- // haven't actually done a read yet; figure out where/what.
- *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
- return EV_READ;
- } else {
- be->state = mcp_backend_parse;
- }
- break;
- case mcp_backend_parse:
- r = p->client_resp;
- r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp);
- // FIXME (v2): Don't like how this mcmc API ended up.
- bread = 0; // only add the bread once per loop.
- if (r->status != MCMC_OK) {
- P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
- if (r->status == MCMC_WANT_READ) {
- flags |= EV_READ;
- be->state = mcp_backend_read;
- stop = true;
- break;
- } else {
- flags = -1;
- stop = true;
- break;
- }
- }
-
- // we actually don't care about anything but the value length
- // TODO (v2): if vlen != vlen_read, pull an item and copy the data.
- int extra_space = 0;
- switch (r->resp.type) {
- case MCMC_RESP_GET:
- // We're in GET mode. we only support one key per
- // GET in the proxy backends, so we need to later check
- // for an END.
- extra_space = ENDLEN;
- break;
- case MCMC_RESP_END:
- // this is a MISS from a GET request
- // or final handler from a STAT request.
- assert(r->resp.vlen == 0);
- break;
- case MCMC_RESP_META:
- // we can handle meta responses easily since they're self
- // contained.
- break;
- case MCMC_RESP_GENERIC:
- case MCMC_RESP_NUMERIC:
- break;
- // TODO (v2): No-op response?
- default:
- P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
- // unhandled :(
- flags = -1;
- stop = true;
- break;
- }
-
- if (res) {
- if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) {
- // Ascii multiget hack mode; consume END's
- be->state = mcp_backend_next;
- break;
- }
-
- // r->resp.reslen + r->resp.vlen is the total length of the response.
- // TODO (v2): need to associate a buffer with this response...
- // for now lets abuse write_and_free on mc_resp and simply malloc the
- // space we need, stuffing it into the resp object.
-
- r->blen = r->resp.reslen + r->resp.vlen;
- r->buf = malloc(r->blen + extra_space);
- if (r->buf == NULL) {
- flags = -1; // TODO (v2): specific error.
- stop = true;
- break;
- }
-
- P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
- if (r->resp.vlen != r->resp.vlen_read) {
- P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
- // copy the partial and advance mcmc's buffer digestion.
- // FIXME (v2): should call this for both cases?
- // special function for advancing mcmc's buffer for
- // reading a value? perhaps a flag to skip the data copy
- // when it's unnecessary?
- memcpy(r->buf, be->rbuf, r->resp.reslen);
- r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread);
- be->state = mcp_backend_want_read;
- break;
- } else {
- // mcmc's already counted the value as read if it fit in
- // the original buffer...
- memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read);
- }
- } else {
- // TODO (v2): no response read?
- // nothing currently sets res to 0. should remove if that
- // never comes up and handle the error entirely above.
- P_DEBUG("%s: no response read from backend\n", __func__);
- flags = -1;
- stop = true;
- break;
- }
-
- if (r->resp.type == MCMC_RESP_GET) {
- // advance the buffer
- newbuf = mcmc_buffer_consume(be->client, &remain);
- if (remain != 0) {
- // TODO (v2): don't need to shuffle buffer with better API
- memmove(be->rbuf, newbuf, remain);
- }
-
- be->state = mcp_backend_read_end;
- } else {
- be->state = mcp_backend_next;
- }
-
- break;
- case mcp_backend_read_end:
- r = p->client_resp;
- // we need to ensure the next data in the stream is "END\r\n"
- // if not, the stack is desynced and we lose it.
-
- r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp);
- P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type);
- if (r->status != MCMC_OK) {
- if (r->status == MCMC_WANT_READ) {
- *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
- return EV_READ;
- } else {
- flags = -1; // TODO (v2): specific error.
- stop = true;
- }
- break;
- } else if (tmp_resp.type != MCMC_RESP_END) {
- // TODO (v2): specific error about protocol desync
- flags = -1;
- stop = true;
- break;
- } else {
- // response is good.
- // FIXME (v2): copy what the server actually sent?
- if (!p->ascii_multiget) {
- // sigh... if part of a multiget we need to eat the END
- // markers down here.
- memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
- r->blen += 5;
- }
- }
-
- be->state = mcp_backend_next;
-
- break;
- case mcp_backend_want_read:
- // Continuing a read from earlier
- r = p->client_resp;
- // take bread input and see if we're done reading the value,
- // else advance, set buffers, return next.
- if (bread > 0) {
- r->bread += bread;
- bread = 0;
- }
- P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
-
- if (r->bread >= r->resp.vlen) {
- // all done copying data.
- if (r->resp.type == MCMC_RESP_GET) {
- newbuf = mcmc_buffer_consume(be->client, &remain);
- // Shouldn't be anything in the buffer if we had to run to
- // want_read to read the value.
- assert(remain == 0);
- be->state = mcp_backend_read_end;
- } else {
- be->state = mcp_backend_next;
- }
- } else {
- // signal to caller to issue a read.
- *rbuf = r->buf+r->resp.reslen+r->bread;
- *toread = r->resp.vlen - r->bread;
- // need to retry later.
- flags |= EV_READ;
- stop = true;
- }
-
- break;
- case mcp_backend_next:
- // set the head here. when we break the head will be correct.
- STAILQ_REMOVE_HEAD(&be->io_head, io_next);
- be->depth--;
- // have to do the q->count-- and == 0 and redispatch_conn()
- // stuff here. The moment we call return_io here we
- // don't own *p anymore.
- return_io_pending((io_pending_t *)p);
-
- if (STAILQ_EMPTY(&be->io_head)) {
- // TODO (v2): suspicious of this code. audit harder?
- stop = true;
- } else {
- p = STAILQ_FIRST(&be->io_head);
- }
-
- // mcmc_buffer_consume() - if leftover, keep processing
- // IO's.
- // if no more data in buffer, need to re-set stack head and re-set
- // event.
- remain = 0;
- // TODO (v2): do we need to yield every N reads?
- newbuf = mcmc_buffer_consume(be->client, &remain);
- P_DEBUG("%s: [next] remain: %d\n", __func__, remain);
- be->state = mcp_backend_read;
- if (remain != 0) {
- // data trailing in the buffer, for a different request.
- memmove(be->rbuf, newbuf, remain);
- be->state = mcp_backend_parse;
- P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain);
- } else {
- // need to read more data, buffer is empty.
- stop = true;
- }
-
- break;
- default:
- // TODO (v2): at some point (after v1?) this should attempt to recover,
- // though we should only get here from memory corruption and
- // bailing may be the right thing to do.
- fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state);
- assert(false);
- } // switch
- } // while
-
- return flags;
-}
-
-// All we need to do here is schedule the backend to attempt to connect again.
-static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
- mcp_backend_t *be = arg;
- assert(which & EV_TIMEOUT);
- struct timeval tmp_time = be->event_thread->tunables.retry;
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
-}
-
-// currently just for timeouts, but certain errors should consider a backend
-// to be "bad" as well.
-// must be called before _reset_bad_backend(), so the backend is currently
-// clear.
-// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
-// connect failures. We need a specific connect() handler that executes a
-// "version" call to at least check that the backend isn't speaking garbage.
-// In theory backends can fail such that responses are constantly garbage,
-// but it's more likely an app is doing something bad and culling the backend
-// may prevent any other clients from talking to that backend. In
-// that case we need to track if clients are causing errors consistently and
-// block them instead. That's more challenging so leaving a note instead
-// of doing this now :)
-static void _backend_failed(mcp_backend_t *be) {
- struct timeval tmp_time = be->event_thread->tunables.retry;
- if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
- P_DEBUG("%s: marking backend as bad\n", __func__);
- be->bad = true;
- _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
- STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
- } else {
- STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
- }
-}
-
-// TODO (v2): add a second argument for assigning a specific error to all pending
-// IO's (ie; timeout).
-// The backend has gotten into a bad state (timed out, protocol desync, or
-// some other supposedly unrecoverable error: purge the queue and
-// cycle the socket.
-// Note that some types of errors may not require flushing the queue and
-// should be fixed as they're figured out.
-// _must_ be called from within the event thread.
-static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
- io_pending_proxy_t *io = NULL;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- // TODO (v2): Unsure if this is the best way of surfacing errors to lua,
- // but will do for V1.
- io->client_resp->status = MCMC_ERR;
- return_io_pending((io_pending_t *)io);
- }
-
- STAILQ_INIT(&be->io_head);
-
- mcmc_disconnect(be->client);
- int status = mcmc_connect(be->client, be->ip, be->port, be->connect_flags);
- if (status == MCMC_CONNECTED) {
- // TODO (v2): unexpected but lets let it be here.
- be->connecting = false;
- be->can_write = true;
- } else if (status == MCMC_CONNECTING) {
- be->connecting = true;
- be->can_write = false;
- } else {
- // TODO (v2): failed to immediately re-establish the connection.
- // need to put the BE into a bad/retry state.
- // FIXME (v2): until we get an event to specifically handle connecting and
- // bad server handling, attempt to force a reconnect here the next
- // time a request comes through.
- // The event thread will attempt to write to the backend, fail, then
- // end up in this routine again.
- be->connecting = false;
- be->can_write = true;
- }
-
- LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, proxy_be_failure_text[err], be->ip, be->port);
-
- return 0;
-}
-
-static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
- struct iovec *iovs = be->write_iovs;
- io_pending_proxy_t *io = NULL;
- int iovused = 0;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- if (io->flushed)
- continue;
-
- if (io->iovcnt + iovused > BE_IOV_MAX) {
- // Signal to caller that we need to keep writing, if writeable.
- // FIXME (v2): can certainly refactor this to loop instead of waiting
- // for a writeable event.
- *tosend += 1;
- break;
- }
-
- memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
- iovused += io->iovcnt;
- *tosend += io->iovbytes;
- }
- return iovused;
-}
-
-static int _flush_pending_write(mcp_backend_t *be) {
- int flags = 0;
- unsigned int tosend = 0;
- int iovcnt = _prep_pending_write(be, &tosend);
-
- ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
- if (sent > 0) {
- io_pending_proxy_t *io = NULL;
- if (sent < tosend) {
- flags |= EV_WRITE;
- }
-
- STAILQ_FOREACH(io, &be->io_head, io_next) {
- bool flushed = true;
- if (io->flushed)
- continue;
-
- if (sent >= io->iovbytes) {
- // short circuit for common case.
- sent -= io->iovbytes;
- } else {
- for (int x = 0; x < io->iovcnt; x++) {
- struct iovec *iov = &io->iov[x];
- if (sent >= iov->iov_len) {
- sent -= iov->iov_len;
- iov->iov_len = 0;
- } else {
- iov->iov_len -= sent;
- sent = 0;
- flushed = false;
- break;
- }
- }
- }
- io->flushed = flushed;
-
- if (flushed) {
- flags |= EV_READ;
- }
- if (sent <= 0) {
- // really shouldn't be negative, though.
- assert(sent >= 0);
- break;
- }
- } // STAILQ_FOREACH
- } else if (sent == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- be->can_write = false;
- flags |= EV_WRITE;
- } else {
- flags = -1;
- }
- }
-
- return flags;
-}
-
-// The libevent backend callback handler.
-// If we end up resetting a backend, it will get put back into a connecting
-// state.
-static void proxy_backend_handler(const int fd, const short which, void *arg) {
- mcp_backend_t *be = arg;
- int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->event_thread->tunables.read;
-
- if (which & EV_TIMEOUT) {
- P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
- _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
- _backend_failed(be);
- return;
- }
-
- if (which & EV_WRITE) {
- be->can_write = true;
- // TODO (v2): move connect routine to its own function?
- // - hard to do right now because we can't (easily?) edit libevent
- // events.
- if (be->connecting) {
- int err = 0;
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- // kick the bad backend, clear the queue, retry later.
- // FIXME (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
- return;
- }
- P_DEBUG("%s: backend connected\n", __func__);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
- }
- int res = _flush_pending_write(be);
- if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- return;
- }
- }
-
- if (which & EV_READ) {
- // We do the syscall here before diving into the state machine to allow a
- // common code path for io_uring/epoll
- int res = 1;
- int read = 0;
- while (res > 0) {
- char *rbuf = NULL;
- size_t toread = 0;
- // need to input how much was read since last call
- // needs _output_ of the buffer to read into and how much.
- res = proxy_backend_drive_machine(be, read, &rbuf, &toread);
- P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread);
-
- if (res > 0) {
- read = recv(mcmc_fd(be->client), rbuf, toread, 0);
- P_DEBUG("%s: read: %d\n", __func__, read);
- if (read == 0) {
- // not connected or error.
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- return;
- } else if (read == -1) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- break; // sit on epoll again.
- } else {
- _reset_bad_backend(be, P_BE_FAIL_READING);
- return;
- }
- }
- } else if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_PARSING);
- return;
- } else {
- break;
- }
- }
-
-#ifdef PROXY_DEBUG
- if (!STAILQ_EMPTY(&be->io_head)) {
- P_DEBUG("backend has leftover IOs: %d\n", be->depth);
- }
-#endif
- }
-
- // Still pending requests to read or write.
- if (!STAILQ_EMPTY(&be->io_head)) {
- flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event.
- _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
- }
-}
-
static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget) {
assert(c != NULL);
LIBEVENT_THREAD *thr = c->thread;
@@ -2775,9 +813,8 @@ static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
return;
}
-/******** LUA INTERFACE FUNCTIONS ******/
-
-__attribute__((unused)) static void dump_stack(lua_State *L) {
+// Common lua debug command.
+__attribute__((unused)) void dump_stack(lua_State *L) {
int top = lua_gettop(L);
int i = 1;
fprintf(stderr, "--TOP OF STACK [%d]\n", top);
@@ -2802,1939 +839,4 @@ __attribute__((unused)) static void dump_stack(lua_State *L) {
fprintf(stderr, "-----------------\n");
}
-// func prototype example:
-// static int fname (lua_State *L)
-// normal library open:
-// int luaopen_mcp(lua_State *L) { }
-
-// resp:ok()
-static int mcplib_response_ok(lua_State *L) {
- mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
-
- if (r->status == MCMC_OK) {
- lua_pushboolean(L, 1);
- } else {
- lua_pushboolean(L, 0);
- }
-
- return 1;
-}
-
-static int mcplib_response_hit(lua_State *L) {
- mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
-
- if (r->status == MCMC_OK && r->resp.code != MCMC_CODE_MISS) {
- lua_pushboolean(L, 1);
- } else {
- lua_pushboolean(L, 0);
- }
-
- return 1;
-}
-
-static int mcplib_response_gc(lua_State *L) {
- mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
-
- // On error/similar we might be holding the read buffer.
- // If the buf is handed off to mc_resp for return, this pointer is NULL
- if (r->buf != NULL) {
- free(r->buf);
- }
-
- return 0;
-}
-
-// NOTE: backends are global objects owned by pool objects.
-// Each pool has a "proxy pool object" distributed to each worker VM.
-// proxy pool objects are held at the same time as any request exists on a
-// backend, in the coroutine stack during yield()
-// To free a backend: All proxies for a pool are collected, then the central
-// pool is collected, which releases backend references, which allows backend
-// to be collected.
-static int mcplib_backend_gc(lua_State *L) {
- mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
-
- assert(STAILQ_EMPTY(&be->io_head));
-
- mcmc_disconnect(be->client);
- free(be->client);
-
- // FIXME (v2): upvalue for global ctx.
- proxy_ctx_t *ctx = settings.proxy_ctx;
- STAT_DECR(ctx, backend_total, 1);
-
- return 0;
-}
-
-static int mcplib_backend(lua_State *L) {
- luaL_checkstring(L, -4); // label for indexing backends.
- const char *ip = luaL_checkstring(L, -3);
- const char *port = luaL_checkstring(L, -2);
- double weight = luaL_checknumber(L, -1);
- // FIXME (v2): upvalue for global ctx.
- proxy_ctx_t *ctx = settings.proxy_ctx;
-
- // first check our reference table to compare.
- lua_pushvalue(L, -4);
- int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
- if (ret != LUA_TNIL) {
- mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend");
- if (strncmp(be_orig->ip, ip, MAX_IPLEN) == 0
- && strncmp(be_orig->port, port, MAX_PORTLEN) == 0
- && be_orig->weight == weight) {
- // backend is the same, return it.
- return 1;
- } else {
- // backend not the same, pop from stack and make new one.
- lua_pop(L, 1);
- }
- } else {
- lua_pop(L, 1);
- }
-
- // This might shift to internal objects?
- mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0);
-
- // FIXME (v2): remove some of the excess zero'ing below?
- memset(be, 0, sizeof(mcp_backend_t));
- strncpy(be->ip, ip, MAX_IPLEN);
- strncpy(be->port, port, MAX_PORTLEN);
- be->weight = weight;
- be->depth = 0;
- be->rbuf = NULL;
- be->failed_count = 0;
- STAILQ_INIT(&be->io_head);
- be->state = mcp_backend_read;
- be->connecting = false;
- be->can_write = false;
- be->stacked = false;
- be->bad = false;
-
- // this leaves a permanent buffer on the backend, which is fine
- // unless you have billions of backends.
- // we can later optimize for pulling buffers from idle backends.
- be->rbuf = malloc(READ_BUFFER_SIZE);
- if (be->rbuf == NULL) {
- proxy_lua_error(L, "out of memory allocating backend");
- return 0;
- }
-
- // initialize libevent.
- memset(&be->event, 0, sizeof(be->event));
-
- // initialize the client
- be->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
- if (be->client == NULL) {
- proxy_lua_error(L, "out of memory allocating backend");
- return 0;
- }
- // TODO (v2): connect elsewhere. When there're multiple backend owners, or
- // sockets per backend, etc. We'll want to kick off connects as use time.
- // TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
- // This is a trivial fix if we ensure a backend's owning event thread is
- // set before it can be used in the proxy, as it would have access to the
- // tunables structure. _reset_bad_backend() may not have its event thread
- // set 100% of the time and I don't want to introduce a crash right now,
- // so I'm writing this overly long comment. :)
- int flags = MCMC_OPTION_NONBLOCK;
- STAT_L(ctx);
- if (ctx->tunables.tcp_keepalive) {
- flags |= MCMC_OPTION_TCP_KEEPALIVE;
- }
- STAT_UL(ctx);
- be->connect_flags = flags;
- int status = mcmc_connect(be->client, be->ip, be->port, flags);
- if (status == MCMC_CONNECTED) {
- // FIXME (v2): is this possible? do we ever want to allow blocking
- // connections?
- proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->ip, be->port);
- return 0;
- } else if (status == MCMC_CONNECTING) {
- be->connecting = true;
- be->can_write = false;
- } else {
- proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->ip, be->port);
- return 0;
- }
-
- luaL_getmetatable(L, "mcp.backend");
- lua_setmetatable(L, -2); // set metatable to userdata.
-
- lua_pushvalue(L, 1); // put the label at the top for settable later.
- lua_pushvalue(L, -2); // copy the backend reference to the top.
- // set our new backend object into the reference table.
- lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
- // stack is back to having backend on the top.
-
- STAT_INCR(ctx, backend_total, 1);
-
- return 1;
-}
-
-static int mcplib_pool_gc(lua_State *L) {
- mcp_pool_t *p = luaL_checkudata(L, -1, "mcp.pool");
- assert(p->refcount == 0);
- pthread_mutex_destroy(&p->lock);
-
- for (int x = 0; x < p->pool_size; x++) {
- if (p->pool[x].ref) {
- luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref);
- }
- }
-
- return 0;
-}
-
-// Looks for a short string in a key to separate which part gets hashed vs
-// sent to the backend node.
-// ie: "foo:bar|#|restofkey" - only "foo:bar" gets hashed.
-static const char *mcp_key_hash_filter_stop(const char *conf, const char *key, size_t klen, size_t *newlen) {
- char temp[KEY_MAX_LENGTH+1];
- *newlen = klen;
- if (klen > KEY_MAX_LENGTH) {
- // Hedging against potential bugs.
- return key;
- }
-
- memcpy(temp, key, klen);
- temp[klen+1] = '\0';
-
- // TODO (v2): memmem would avoid the temp key and memcpy here, but it's
- // not technically portable. An easy improvement would be to detect
- // memmem() in `configure` and only use strstr/copy as a fallback.
- // Since keys are short it's unlikely this would be a major performance
- // win.
- char *found = strstr(temp, conf);
-
- if (found) {
- *newlen = found - temp;
- }
-
- // hash stop can't change where keys start.
- return key;
-}
-
-// Takes a two character "tag", ie; "{}", or "$$", searches string for the
-// first then second character. Only hashes the portion within these tags.
-// *conf _must_ be two characters.
-static const char *mcp_key_hash_filter_tag(const char *conf, const char *key, size_t klen, size_t *newlen) {
- *newlen = klen;
-
- const char *t1 = memchr(key, conf[0], klen);
- if (t1) {
- size_t remain = klen - (t1 - key);
- // must be at least one character inbetween the tags to hash.
- if (remain > 1) {
- const char *t2 = memchr(t1, conf[1], remain);
-
- if (t2) {
- *newlen = t2 - t1 - 1;
- return t1+1;
- }
- }
- }
-
- return key;
-}
-
-static void _mcplib_pool_dist(lua_State *L, mcp_pool_t *p) {
- luaL_checktype(L, -1, LUA_TTABLE);
- if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) {
- proxy_lua_error(L, "key distribution object missing 'new' function");
- return;
- }
-
- // - now create the copy pool table
- lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint.
- for (int x = 1; x <= p->pool_size; x++) {
- mcp_backend_t *be = p->pool[x-1].be;
- lua_createtable(L, 0, 4);
- // stack = [p, h, f, optN, newpool, backend]
- // the key should be fine for id? maybe don't need to duplicate
- // this?
- lua_pushinteger(L, x);
- lua_setfield(L, -2, "id");
- // we don't use the hostname for ketama hashing
- // so passing ip for hostname is fine
- lua_pushstring(L, be->ip);
- // FIXME: hostname should probably work...
- lua_setfield(L, -2, "hostname");
- lua_pushstring(L, be->ip);
- lua_setfield(L, -2, "addr");
- lua_pushstring(L, be->port);
- lua_setfield(L, -2, "port");
- // TODO (v2): weight/etc?
-
- // set the backend table into the new pool table.
- lua_rawseti(L, -2, x);
- }
-
- // we can either use lua_insert() or possibly _rotate to shift
- // things into the right place, but simplest is to just copy the
- // option arg to the end of the stack.
- lua_pushvalue(L, 2);
- // - stack should be: pool, opts, func, pooltable, opts
-
- // call the dist new function.
- int res = lua_pcall(L, 2, 2, 0);
-
- if (res != LUA_OK) {
- lua_error(L); // error should be on the stack already.
- return;
- }
-
- // -1 is lightuserdata ptr to the struct (which must be owned by the
- // userdata), which is later used for internal calls.
- struct proxy_hash_caller *phc;
-
- luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
- luaL_checktype(L, -2, LUA_TUSERDATA);
- phc = lua_touserdata(L, -1);
- memcpy(&p->phc, phc, sizeof(*phc));
- lua_pop(L, 1);
- // -2 was userdata we need to hold a reference to
- p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- // UD now popped from stack.
-}
-
-// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
-static int mcplib_pool(lua_State *L) {
- int argc = lua_gettop(L);
- luaL_checktype(L, 1, LUA_TTABLE);
- int n = luaL_len(L, 1); // get length of array table
-
- size_t plen = sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n;
- mcp_pool_t *p = lua_newuserdatauv(L, plen, 0);
- // Zero the memory before use, so we can realibly use __gc to clean up
- memset(p, 0, plen);
- p->pool_size = n;
- // TODO (v2): Nicer if this is fetched from mcp.default_key_hash
- p->key_hasher = XXH3_64bits_withSeed;
- pthread_mutex_init(&p->lock, NULL);
- p->ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue.
-
- luaL_setmetatable(L, "mcp.pool");
-
- lua_pushvalue(L, -1); // dupe self for reference.
- p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- // remember lua arrays are 1 indexed.
- for (int x = 1; x <= n; x++) {
- mcp_pool_be_t *s = &p->pool[x-1];
- lua_geti(L, 1, x); // get next server into the stack.
- // If we bail here, the pool _gc() should handle releasing any backend
- // references we made so far.
- s->be = luaL_checkudata(L, -1, "mcp.backend");
- s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
- }
-
- if (argc == 1) {
- lua_getglobal(L, "mcp");
- // TODO (v2): decide on a mcp.default_dist and use that instead
- if (lua_getfield(L, -1, "dist_jump_hash") != LUA_TNIL) {
- _mcplib_pool_dist(L, p);
- lua_pop(L, 1); // pop "dist_jump_hash" value.
- } else {
- lua_pop(L, 1);
- }
- lua_pop(L, 1); // pop "mcp"
- return 1;
- }
-
- // Supplied with an options table. We inspect this table to decorate the
- // pool, then pass it along to the a constructor if necessary.
- luaL_checktype(L, 2, LUA_TTABLE);
-
- // stack: backends, options, mcp.pool
- if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
- // overriding the distribution function.
- _mcplib_pool_dist(L, p);
- lua_pop(L, 1); // remove the dist table from stack.
- } else {
- lua_pop(L, 1); // pop the nil.
- }
-
- if (lua_getfield(L, 2, "filter") != LUA_TNIL) {
- luaL_checktype(L, -1, LUA_TSTRING);
- const char *f_type = lua_tostring(L, -1);
- if (strcmp(f_type, "stop") == 0) {
- p->key_filter = mcp_key_hash_filter_stop;
- } else if (strcmp(f_type, "tags") == 0) {
- p->key_filter = mcp_key_hash_filter_tag;
- } else {
- proxy_lua_ferror(L, "unknown hash filter specified: %s\n", f_type);
- }
-
- lua_pop(L, 1); // pops "filter" value.
-
- if (lua_getfield(L, 2, "filter_conf") == LUA_TSTRING) {
- size_t len = 0;
- const char *conf = lua_tolstring(L, -1, &len);
- if (len < 2 || len > KEY_HASH_FILTER_MAX) {
- proxy_lua_ferror(L, "hash filter conf must be between 2 and %d characters", KEY_HASH_FILTER_MAX);
- }
-
- memcpy(p->key_filter_conf, conf, len);
- p->key_filter_conf[len+1] = '\0';
- } else {
- proxy_lua_error(L, "hash filter requires 'filter_conf' string");
- }
- lua_pop(L, 1); // pops "filter_conf" value.
- } else {
- lua_pop(L, 1); // pop the nil.
- }
-
- if (lua_getfield(L, 2, "hash") != LUA_TNIL) {
- luaL_checktype(L, -1, LUA_TLIGHTUSERDATA);
- struct proxy_hash_func *phf = lua_touserdata(L, -1);
- p->key_hasher = phf->func;
- lua_pop(L, 1);
- } else {
- lua_pop(L, 1); // pop the nil.
- }
-
- if (lua_getfield(L, 2, "seed") != LUA_TNIL) {
- luaL_checktype(L, -1, LUA_TSTRING);
- size_t seedlen;
- const char *seedstr = lua_tolstring(L, -1, &seedlen);
- // Note: the custom hasher for a dist may be "weird" in some cases, so
- // we use a standard hash method for the seed here.
- // I'm open to changing this (ie; mcp.pool_seed_hasher = etc)
- p->hash_seed = XXH3_64bits(seedstr, seedlen);
-
- lua_pop(L, 1);
- } else {
- lua_pop(L, 1); // pop the nil.
- }
-
- if (p->phc.selector_func == NULL) {
- proxy_lua_error(L, "cannot create pool missing 'dist' argument");
- }
-
- return 1;
-}
-
-static int mcplib_pool_proxy_gc(lua_State *L) {
- mcp_pool_proxy_t *pp = luaL_checkudata(L, -1, "mcp.pool_proxy");
- mcp_pool_t *p = pp->main;
- pthread_mutex_lock(&p->lock);
- p->refcount--;
- if (p->refcount == 0) {
- proxy_ctx_t *ctx = p->ctx;
- pthread_mutex_lock(&ctx->manager_lock);
- STAILQ_INSERT_TAIL(&ctx->manager_head, p, next);
- pthread_cond_signal(&ctx->manager_cond);
- pthread_mutex_unlock(&ctx->manager_lock);
- }
- pthread_mutex_unlock(&p->lock);
-
- return 0;
-}
-
-static mcp_backend_t *_mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len) {
- if (p->key_filter) {
- key = p->key_filter(p->key_filter_conf, key, len, &len);
- P_DEBUG("%s: filtered key for hashing (%.*s)\n", __func__, (int)len, key);
- }
- uint64_t hash = p->key_hasher(key, len, p->hash_seed);
- uint32_t lookup = p->phc.selector_func(hash, p->phc.ctx);
-
- assert(p->phc.ctx != NULL);
- // attach the backend to the request object.
- // the lua modules should "think" in 1 based indexes, so we need to
- // subtract one here.
- if (lookup >= p->pool_size) {
- proxy_lua_error(L, "key dist hasher tried to use out of bounds index");
- }
-
- return p->pool[lookup].be;
-}
-
-// hashfunc(request) -> backend(request)
-// needs key from request object.
-static int mcplib_pool_proxy_call(lua_State *L) {
- // internal args are the hash selector (self)
- mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy");
- mcp_pool_t *p = pp->main;
- // then request object.
- mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
-
- // we have a fast path to the key/length.
- if (!rq->pr.keytoken) {
- proxy_lua_error(L, "cannot route commands without key");
- return 0;
- }
- const char *key = MCP_PARSER_KEY(rq->pr);
- size_t len = rq->pr.klen;
- rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len);
-
- // now yield request, pool up.
- return lua_yield(L, 2);
-}
-
-static int mcplib_tcp_keepalive(lua_State *L) {
- luaL_checktype(L, -1, LUA_TBOOLEAN);
- int state = lua_toboolean(L, -1);
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
-
- STAT_L(ctx);
- ctx->tunables.tcp_keepalive = state;
- STAT_UL(ctx);
-
- return 0;
-}
-
-static int mcplib_backend_failure_limit(lua_State *L) {
- int limit = luaL_checkinteger(L, -1);
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
-
- if (limit < 0) {
- proxy_lua_error(L, "backend_failure_limit must be >= 0");
- return 0;
- }
-
- STAT_L(ctx);
- ctx->tunables.backend_failure_limit = limit;
- STAT_UL(ctx);
-
- return 0;
-}
-
-// sad, I had to look this up...
-#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
-#define MICROSECONDS(x) ((x) * 1E6 + 0.5)
-
-static int mcplib_backend_connect_timeout(lua_State *L) {
- lua_Number secondsf = luaL_checknumber(L, -1);
- lua_Integer secondsi = (lua_Integer) secondsf;
- lua_Number subseconds = secondsf - secondsi;
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
-
- STAT_L(ctx);
- ctx->tunables.connect.tv_sec = secondsi;
- ctx->tunables.connect.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- ctx->tunables.connect_ur.tv_sec = secondsi;
- ctx->tunables.connect_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
- STAT_UL(ctx);
-
- return 0;
-}
-
-static int mcplib_backend_retry_timeout(lua_State *L) {
- lua_Number secondsf = luaL_checknumber(L, -1);
- lua_Integer secondsi = (lua_Integer) secondsf;
- lua_Number subseconds = secondsf - secondsi;
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
- STAT_L(ctx);
- ctx->tunables.retry.tv_sec = secondsi;
- ctx->tunables.retry.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- ctx->tunables.retry_ur.tv_sec = secondsi;
- ctx->tunables.retry_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
- STAT_UL(ctx);
-
- return 0;
-}
-
-static int mcplib_backend_read_timeout(lua_State *L) {
- lua_Number secondsf = luaL_checknumber(L, -1);
- lua_Integer secondsi = (lua_Integer) secondsf;
- lua_Number subseconds = secondsf - secondsi;
- proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
-
- STAT_L(ctx);
- ctx->tunables.read.tv_sec = secondsi;
- ctx->tunables.read.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- ctx->tunables.read_ur.tv_sec = secondsi;
- ctx->tunables.read_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
- STAT_UL(ctx);
-
- return 0;
-}
-
-// mcp.attach(mcp.HOOK_NAME, function)
-// fill hook structure: if lua function, use luaL_ref() to store the func
-static int mcplib_attach(lua_State *L) {
- // Pull the original worker thread out of the shared mcplib upvalue.
- LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
-
- int hook = luaL_checkinteger(L, -2);
- // pushvalue to dupe func and etc.
- // can leave original func on stack afterward because it'll get cleared.
- int loop_end = 0;
- int loop_start = 1;
- if (hook == CMD_ANY) {
- // if CMD_ANY we need individually set loop 1 to CMD_SIZE.
- loop_end = CMD_SIZE;
- } else if (hook == CMD_ANY_STORAGE) {
- // if CMD_ANY_STORAGE we only override get/set/etc.
- loop_end = CMD_END_STORAGE;
- } else {
- loop_start = hook;
- loop_end = hook + 1;
- }
-
- if (lua_isfunction(L, -1)) {
- struct proxy_hook *hooks = t->proxy_hooks;
-
- for (int x = loop_start; x < loop_end; x++) {
- struct proxy_hook *h = &hooks[x];
- lua_pushvalue(L, -1); // duplicate the function for the ref.
- if (h->lua_ref) {
- // remove existing reference.
- luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref);
- }
-
- // pops the function from the stack and leaves us a ref. for later.
- h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- h->is_lua = true;
- }
- } else {
- proxy_lua_error(L, "Must pass a function to mcp.attach");
- return 0;
- }
-
- return 0;
-}
-
-/*** REQUEST PARSER AND OBJECT ***/
-
-#define PARSER_MAXLEN USHRT_MAX-1
-
-// Find the starting offsets of each token; ignoring length.
-// This creates a fast small (<= cacheline) index into the request,
-// where we later scan or directly feed data into API's.
-static int _process_tokenize(mcp_parser_t *pr, const size_t max) {
- const char *s = pr->request;
- int len = pr->reqlen - 2;
-
- // since multigets can be huge, we can't purely judge reqlen against this
- // limit, but we also can't index past it since the tokens are shorts.
- if (len > PARSER_MAXLEN) {
- len = PARSER_MAXLEN;
- }
- const char *end = s + len;
- int curtoken = 0;
-
- int state = 0;
- while (s != end) {
- switch (state) {
- case 0:
- if (*s != ' ') {
- pr->tokens[curtoken] = s - pr->request;
- if (++curtoken == max) {
- goto endloop;
- }
- state = 1;
- }
- s++;
- break;
- case 1:
- if (*s != ' ') {
- s++;
- } else {
- state = 0;
- }
- break;
- }
- }
-endloop:
-
- pr->ntokens = curtoken;
- P_DEBUG("%s: cur_tokens: %d\n", __func__, curtoken);
-
- return 0;
-}
-
-static int _process_token_len(mcp_parser_t *pr, size_t token) {
- const char *cur = pr->request + pr->tokens[token];
- int remain = pr->reqlen - pr->tokens[token] - 2; // CRLF
-
- const char *s = memchr(cur, ' ', remain);
- return (s != NULL) ? s - cur : remain;
-}
-
-static int _process_request_key(mcp_parser_t *pr) {
- pr->klen = _process_token_len(pr, pr->keytoken);
- // advance the parser in case of multikey.
- pr->parsed = pr->tokens[pr->keytoken] + pr->klen + 1;
-
- if (pr->request[pr->parsed-1] == ' ') {
- P_DEBUG("%s: request_key found extra space\n", __func__);
- pr->has_space = true;
- } else {
- pr->has_space = false;
- }
- return 0;
-}
-
-// Just for ascii multiget: search for next "key" beyond where we stopped
-// tokenizing before.
-// Returns the offset for the next key.
-static size_t _process_request_next_key(mcp_parser_t *pr) {
- const char *cur = pr->request + pr->parsed;
- int remain = pr->reqlen - pr->parsed - 2;
-
- // chew off any leading whitespace.
- while (remain) {
- if (*cur == ' ') {
- remain--;
- cur++;
- pr->parsed++;
- } else {
- break;
- }
- }
-
- const char *s = memchr(cur, ' ', remain);
- if (s != NULL) {
- pr->klen = s - cur;
- pr->parsed += s - cur;
- } else {
- pr->klen = remain;
- pr->parsed += remain;
- }
-
- return cur - pr->request;
-}
-
-// for fast testing of existence of meta flags.
-// meta has all flags as final tokens
-static int _process_request_metaflags(mcp_parser_t *pr, int token) {
- if (pr->ntokens <= token) {
- pr->t.meta.flags = 0; // no flags found.
- return 0;
- }
- const char *cur = pr->request + pr->tokens[token];
- const char *end = pr->request + pr->reqlen - 2;
-
- // We blindly convert flags into bits, since the range of possible
- // flags is deliberately < 64.
- int state = 0;
- while (cur != end) {
- switch (state) {
- case 0:
- if (*cur == ' ') {
- cur++;
- } else {
- if (*cur < 65 || *cur > 122) {
- return -1;
- }
- P_DEBUG("%s: setting meta flag: %d\n", __func__, *cur - 65);
- pr->t.meta.flags |= 1 << (*cur - 65);
- state = 1;
- }
- break;
- case 1:
- if (*cur != ' ') {
- cur++;
- } else {
- state = 0;
- }
- break;
- }
- }
-
- return 0;
-}
-
-// All meta commands are of form: "cm key f l a g S100"
-static int _process_request_meta(mcp_parser_t *pr) {
- _process_tokenize(pr, PARSER_MAX_TOKENS);
- if (pr->ntokens < 2) {
- P_DEBUG("%s: not enough tokens for meta command: %d\n", __func__, pr->ntokens);
- return -1;
- }
- pr->keytoken = 1;
- _process_request_key(pr);
-
- // pass the first flag token.
- return _process_request_metaflags(pr, 2);
-}
-
-// ms <key> <datalen> <flags>*\r\n
-static int _process_request_mset(mcp_parser_t *pr) {
- _process_tokenize(pr, PARSER_MAX_TOKENS);
- if (pr->ntokens < 3) {
- P_DEBUG("%s: not enough tokens for meta set command: %d\n", __func__, pr->ntokens);
- return -1;
- }
- pr->keytoken = 1;
- _process_request_key(pr);
-
- const char *cur = pr->request + pr->tokens[2];
-
- errno = 0;
- char *n = NULL;
- int vlen = strtol(cur, &n, 10);
- if ((errno == ERANGE) || (cur == n)) {
- return -1;
- }
-
- if (vlen < 0 || vlen > (INT_MAX - 2)) {
- return -1;
- }
- vlen += 2;
-
- pr->vlen = vlen;
-
- // pass the first flag token
- return _process_request_metaflags(pr, 3);
-}
-
-// gat[s] <exptime> <key>*\r\n
-static int _process_request_gat(mcp_parser_t *pr) {
- _process_tokenize(pr, 3);
- if (pr->ntokens < 3) {
- P_DEBUG("%s: not enough tokens for GAT: %d\n", __func__, pr->ntokens);
- return -1;
- }
-
- pr->keytoken = 2;
- _process_request_key(pr);
- return 0;
-}
-
-// we need t find the bytes supplied immediately so we can read the request
-// from the client properly.
-// set <key> <flags> <exptime> <bytes> [noreply]\r\n
-static int _process_request_storage(mcp_parser_t *pr, size_t max) {
- _process_tokenize(pr, max);
- if (pr->ntokens < 5) {
- P_DEBUG("%s: not enough tokens to storage command: %d\n", __func__, pr->ntokens);
- return -1;
- }
- pr->keytoken = 1;
- _process_request_key(pr);
-
- errno = 0;
- char *n = NULL;
- const char *cur = pr->request + pr->tokens[4];
-
- int vlen = strtol(cur, &n, 10);
- if ((errno == ERANGE) || (cur == n)) {
- return -1;
- }
-
- if (vlen < 0 || vlen > (INT_MAX - 2)) {
- return -1;
- }
- vlen += 2;
-
- pr->vlen = vlen;
-
- return 0;
-}
-
-// common request with key: <cmd> <key> <args>
-static int _process_request_simple(mcp_parser_t *pr, const size_t max) {
- _process_tokenize(pr, max);
- pr->keytoken = 1; // second token is usually the key... stupid GAT.
-
- _process_request_key(pr);
- return 0;
-}
-
-// TODO: return code ENUM with error types.
-// FIXME: the mcp_parser_t bits have ended up being more fragile than I hoped.
-// careful zero'ing is required. revisit?
-// I think this mostly refers to recursive work (maybe just multiget?)
-// Is a parser object run throgh process_request() twice, ever?
-static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) {
- // we want to "parse in place" as much as possible, which allows us to
- // forward an unmodified request without having to rebuild it.
-
- const char *cm = command;
- size_t cl = 0;
- // min command length is 2, plus the "\r\n"
- if (cmdlen < 4) {
- return -1;
- }
-
- const char *s = memchr(command, ' ', cmdlen-2);
- if (s != NULL) {
- cl = s - command;
- } else {
- cl = cmdlen - 2;
- }
- pr->keytoken = 0;
- pr->has_space = false;
- pr->parsed = cl + 1;
- pr->request = command;
- pr->reqlen = cmdlen;
- int token_max = PARSER_MAX_TOKENS;
-
- int cmd = -1;
- int type = CMD_TYPE_GENERIC;
- int ret = 0;
-
- switch (cl) {
- case 0:
- case 1:
- // falls through with cmd as -1. should error.
- break;
- case 2:
- if (cm[0] == 'm') {
- switch (cm[1]) {
- case 'g':
- cmd = CMD_MG;
- ret = _process_request_meta(pr);
- break;
- case 's':
- cmd = CMD_MS;
- ret = _process_request_mset(pr);
- break;
- case 'd':
- cmd = CMD_MD;
- ret = _process_request_meta(pr);
- break;
- case 'n':
- // TODO: do we route/handle NOP's at all?
- // they should simply reflect to the client.
- cmd = CMD_MN;
- break;
- case 'a':
- cmd = CMD_MA;
- ret = _process_request_meta(pr);
- break;
- case 'e':
- cmd = CMD_ME;
- // TODO: not much special processing here; binary keys
- ret = _process_request_meta(pr);
- break;
- }
- }
- break;
- case 3:
- if (cm[0] == 'g') {
- if (cm[1] == 'e' && cm[2] == 't') {
- cmd = CMD_GET;
- type = CMD_TYPE_GET;
- token_max = 2; // don't chew through multigets.
- ret = _process_request_simple(pr, 2);
- }
- if (cm[1] == 'a' && cm[2] == 't') {
- type = CMD_TYPE_GET;
- cmd = CMD_GAT;
- token_max = 2; // don't chew through multigets.
- ret = _process_request_gat(pr);
- }
- } else if (cm[0] == 's' && cm[1] == 'e' && cm[2] == 't') {
- cmd = CMD_SET;
- ret = _process_request_storage(pr, token_max);
- } else if (cm[0] == 'a' && cm[1] == 'd' && cm[2] == 'd') {
- cmd = CMD_ADD;
- ret = _process_request_storage(pr, token_max);
- } else if (cm[0] == 'c' && cm[1] == 'a' && cm[2] == 's') {
- cmd = CMD_CAS;
- ret = _process_request_storage(pr, token_max);
- }
- break;
- case 4:
- if (strncmp(cm, "gets", 4) == 0) {
- cmd = CMD_GETS;
- type = CMD_TYPE_GET;
- token_max = 2; // don't chew through multigets.
- ret = _process_request_simple(pr, 2);
- } else if (strncmp(cm, "incr", 4) == 0) {
- cmd = CMD_INCR;
- ret = _process_request_simple(pr, 4);
- } else if (strncmp(cm, "decr", 4) == 0) {
- cmd = CMD_DECR;
- ret = _process_request_simple(pr, 4);
- } else if (strncmp(cm, "gats", 4) == 0) {
- cmd = CMD_GATS;
- type = CMD_TYPE_GET;
- ret = _process_request_gat(pr);
- } else if (strncmp(cm, "quit", 4) == 0) {
- cmd = CMD_QUIT;
- }
- break;
- case 5:
- if (strncmp(cm, "touch", 5) == 0) {
- cmd = CMD_TOUCH;
- ret = _process_request_simple(pr, 4);
- } else if (strncmp(cm, "stats", 5) == 0) {
- cmd = CMD_STATS;
- // Don't process a key; fetch via arguments.
- _process_tokenize(pr, token_max);
- } else if (strncmp(cm, "watch", 5) == 0) {
- cmd = CMD_WATCH;
- _process_tokenize(pr, token_max);
- }
- break;
- case 6:
- if (strncmp(cm, "delete", 6) == 0) {
- cmd = CMD_DELETE;
- ret = _process_request_simple(pr, 4);
- } else if (strncmp(cm, "append", 6) == 0) {
- cmd = CMD_APPEND;
- ret = _process_request_storage(pr, token_max);
- }
- break;
- case 7:
- if (strncmp(cm, "replace", 7) == 0) {
- cmd = CMD_REPLACE;
- ret = _process_request_storage(pr, token_max);
- } else if (strncmp(cm, "prepend", 7) == 0) {
- cmd = CMD_PREPEND;
- ret = _process_request_storage(pr, token_max);
- } else if (strncmp(cm, "version", 7) == 0) {
- cmd = CMD_VERSION;
- _process_tokenize(pr, token_max);
- }
- break;
- }
-
- // TODO: log more specific error code.
- if (cmd == -1 || ret != 0) {
- return -1;
- }
-
- pr->command = cmd;
- pr->cmd_type = type;
-
- return 0;
-}
-
-// FIXME (v2): any reason to pass in command/cmdlen separately?
-static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen) {
- // reserving an upvalue for key.
- mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN * 2 + KEY_MAX_LENGTH, 1);
- // TODO (v2): memset only the non-data part? as the rest gets memcpy'd
- // over.
- memset(rq, 0, sizeof(mcp_request_t));
- memcpy(&rq->pr, pr, sizeof(*pr));
-
- memcpy(rq->request, command, cmdlen);
- rq->pr.request = rq->request;
- rq->pr.reqlen = cmdlen;
- gettimeofday(&rq->start, NULL);
-
- luaL_getmetatable(L, "mcp.request");
- lua_setmetatable(L, -2);
-
- // at this point we should know if we have to bounce through _nread to
- // get item data or not.
- return rq;
-}
-
// TODO (v2):
// if modified, this will re-serialize every time it's accessed.
// a simple opt could copy back over the original space
// a "better" one could A/B the request ptr and clear the modified state
// each time it gets serialized.
//
// Flattens the (possibly token-modified) request into the iovec's of the
// pending IO object "p" so the backend handler can write it to the wire.
// If the parser carries a value payload (vlen != 0) it is attached as a
// second iovec.
static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p) {
    mcp_parser_t *pr = &rq->pr;
    char *r = (char *) pr->request;
    size_t len = pr->reqlen;

    // one or more of the tokens were changed: rebuild the request line from
    // the override table held via rq->tokent_ref.
    if (rq->was_modified) {
        assert(rq->tokent_ref);
        // option table to top of stack.
        lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref);

        // space was reserved in case of modification.
        char *nr = rq->request + MCP_REQUEST_MAXLEN;
        r = nr;
        char *or = NULL;

        for (int x = 0; x < pr->ntokens; x++) {
            const char *newtok = NULL;
            size_t newlen = 0;
            // the command (token 0) and the key token are never replaced
            // from the override table.
            if (x != 0 && x != pr->keytoken) {
                int type = lua_rawgeti(L, -1, x+1);
                if (type != LUA_TNIL) {
                    newtok = lua_tolstring(L, -1, &newlen);
                    memcpy(nr, newtok, newlen);
                    nr += newlen;
                }
                lua_pop(L, 1);
            }

            // token not overridden: copy it from the original request text.
            if (newtok == NULL) {
                // TODO (v2): if we add an extra "end" token that's just reqlen we can
                // memcpy... however most args are short and that may not be worth
                // it.
                or = rq->request + pr->tokens[x];
                // will walk past the end without the \r test.
                // if we add the end token trick this can be changed.
                while (*or != ' ' && *or != '\r' && *or != '\n') {
                    *nr = *or;
                    nr++;
                    or++;
                }
            }
            *nr = ' ';
            nr++;
        }
        // tag the end bits: overwrite the final trailing space with \r\n\0.
        memcpy(nr-1, "\r\n\0", 3);
        nr++;

        len = nr - (rq->request + MCP_REQUEST_MAXLEN);
        lua_pop(L, 1); // pop the table
    }

    // The stringified request. This is also referencing into the coroutine
    // stack, which should be safe from gc.
    p->iov[0].iov_base = r;
    p->iov[0].iov_len = len;
    p->iovcnt = 1;
    p->iovbytes = len;
    if (pr->vlen != 0) {
        // set-style payloads carry their value buffer as a second iovec.
        p->iov[1].iov_base = pr->vbuf;
        p->iov[1].iov_len = pr->vlen;
        p->iovcnt = 2;
        p->iovbytes += pr->vlen;
    }

}
-
-// second argument is optional, for building set requests.
-// TODO: append the \r\n for the VAL?
-static int mcplib_request(lua_State *L) {
- size_t len = 0;
- size_t vlen = 0;
- mcp_parser_t pr = {0};
- const char *cmd = luaL_checklstring(L, 1, &len);
- const char *val = luaL_optlstring(L, 2, NULL, &vlen);
-
- // FIXME (v2): if we inline the userdata we can avoid memcpy'ing the parser
- // structure from the stack? but causes some code duplication.
- if (process_request(&pr, cmd, len) != 0) {
- proxy_lua_error(L, "failed to parse request");
- return 0;
- }
- mcp_request_t *rq = mcp_new_request(L, &pr, cmd, len);
-
- if (val != NULL) {
- rq->pr.vlen = vlen;
- rq->pr.vbuf = malloc(vlen);
- if (rq->pr.vbuf == NULL) {
- // Note: without *c we can't tick the appropriate counter.
- // However, in practice raw malloc's are nearly never going to
- // fail.
- // TODO(v2): we can stack values into the request objects or use
- // the slabber memory, so this isn't necessary anyway.
- proxy_lua_error(L, "failed to allocate value memory for request object");
- }
- memcpy(rq->pr.vbuf, val, vlen);
- }
- gettimeofday(&rq->start, NULL);
-
- // rq is now created, parsed, and on the stack.
- return 1;
-}
-
-static int mcplib_request_key(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
- lua_pushlstring(L, MCP_PARSER_KEY(rq->pr), rq->pr.klen);
- return 1;
-}
-
-// NOTE: I've mixed up const/non-const strings in the request. During parsing
-// we want it to be const, but after that's done the request is no longer
-// const. It might be better to just remove the const higher up the chain, but
-// I'd rather not. So for now these functions will be dumping the const to
-// modify the string.
-static int mcplib_request_ltrimkey(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request");
- int totrim = luaL_checkinteger(L, -1);
- char *key = (char *) MCP_PARSER_KEY(rq->pr);
-
- if (totrim > rq->pr.klen) {
- proxy_lua_error(L, "ltrimkey cannot zero out key");
- return 0;
- } else {
- memset(key, ' ', totrim);
- rq->pr.klen -= totrim;
- rq->pr.tokens[rq->pr.keytoken] += totrim;
- }
- return 1;
-}
-
-static int mcplib_request_rtrimkey(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request");
- int totrim = luaL_checkinteger(L, -1);
- char *key = (char *) MCP_PARSER_KEY(rq->pr);
-
- if (totrim > rq->pr.klen) {
- proxy_lua_error(L, "rtrimkey cannot zero out key");
- return 0;
- } else {
- memset(key + (rq->pr.klen - totrim), ' ', totrim);
- rq->pr.klen -= totrim;
- // don't need to change the key token.
- }
- return 1;
-}
-
-// Virtual table operations on the request.
-static int mcplib_request_token(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
- int argc = lua_gettop(L);
-
- if (argc == 1) {
- lua_pushnil(L);
- return 1;
- }
-
- int token = luaL_checkinteger(L, 2);
-
- if (token < 1 || token > rq->pr.ntokens) {
- // maybe an error?
- lua_pushnil(L);
- return 1;
- }
-
- // we hold overwritten or parsed tokens in a lua table.
- if (rq->tokent_ref == 0) {
- // create a presized table that can hold our tokens.
- lua_createtable(L, rq->pr.ntokens, 0);
- // duplicate value to set back
- lua_pushvalue(L, -1);
- rq->tokent_ref = luaL_ref(L, LUA_REGISTRYINDEX);
- } else {
- lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref);
- }
- // top of stack should be token table.
-
- size_t vlen = 0;
- if (argc > 2) {
- // overwriting a token.
- luaL_checklstring(L, 3, &vlen);
- lua_pushvalue(L, 3); // copy to top of stack
- lua_rawseti(L, -2, token);
- rq->was_modified = true;
- return 0;
- } else {
- // fetching a token.
- if (lua_rawgeti(L, -1, token) != LUA_TSTRING) {
- lua_pop(L, 1); // got a nil, drop it.
-
- // token not uploaded yet. find the len.
- char *s = (char *) &rq->pr.request[rq->pr.tokens[token-1]];
- char *e = s;
- while (*e != ' ') {
- e++;
- }
- vlen = e - s;
-
- P_DEBUG("%s: pushing token of len: %lu\n", __func__, vlen);
- lua_pushlstring(L, s, vlen);
- lua_pushvalue(L, -1); // copy
-
- lua_rawseti(L, -3, token); // pops copy.
- }
-
- // return fetched token or copy of new token.
- return 1;
- }
-
- return 0;
-}
-
-static int mcplib_request_ntokens(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
- lua_pushinteger(L, rq->pr.ntokens);
- return 1;
-}
-
-static int mcplib_request_command(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
- lua_pushinteger(L, rq->pr.command);
- return 1;
-}
-
-static int mcplib_request_gc(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
- // During nread c->item is the malloc'ed buffer. not yet put into
- // rq->buf - this gets freed because we've also set c->item_malloced if
- // the connection closes before finishing nread.
- if (rq->pr.vbuf != NULL) {
- free(rq->pr.vbuf);
- }
-
- if (rq->tokent_ref != 0) {
- luaL_unref(L, LUA_REGISTRYINDEX, rq->tokent_ref);
- }
- return 0;
-}
-
-// TODO (v2): check what lua does when it calls a function with a string argument
-// stored from a table/similar (ie; the prefix check code).
-// If it's not copying anything, we can add request-side functions to do most
-// forms of matching and avoid copying the key to lua space.
-
/*** END REQUEST PARSER AND OBJECT ***/
-
-/*** START jump consistent hash library ***/
-// TODO (v2): easy candidate for splitting to another .c, but I want this built in
-// instead of as a .so so make sure it's linked directly.
-
// Context for the jump consistent hash distributor. phc must stay the first
// member: a pointer to it is handed back to the proxy API (see
// mcplib_dist_jump_hash_new) and its ctx field points back at this struct.
typedef struct {
    struct proxy_hash_caller phc; // passed back to proxy API
    unsigned int buckets; // pool size the hash maps into
} mcplib_jump_hash_t;
-
-static uint32_t mcplib_dist_jump_hash_get_server(uint64_t hash, void *ctx) {
- mcplib_jump_hash_t *jh = ctx;
-
- int64_t b = -1, j = 0;
- while (j < jh->buckets) {
- b = j;
- hash = hash * 2862933555777941757ULL + 1;
- j = (b + 1) * ((double)(1LL << 31) / (double)((hash >> 33) + 1));
- }
- return b;
-}
-
-// stack = [pool, option]
-static int mcplib_dist_jump_hash_new(lua_State *L) {
- luaL_checktype(L, 1, LUA_TTABLE);
- lua_Unsigned buckets = lua_rawlen(L, 1);
-
- mcplib_jump_hash_t *jh = lua_newuserdatauv(L, sizeof(mcplib_jump_hash_t), 0);
-
- // don't need to loop through the table at all, just need its length.
- // could optimize startup time by adding hints to the module for how to
- // format pool (ie; just a total count or the full table)
- jh->buckets = buckets;
- jh->phc.ctx = jh;
- jh->phc.selector_func = mcplib_dist_jump_hash_get_server;
-
- lua_pushlightuserdata(L, &jh->phc);
-
- // - return [UD, lightuserdata]
- return 2;
-}
-
-static int mcplib_open_dist_jump_hash(lua_State *L) {
- const struct luaL_Reg jump_f[] = {
- {"new", mcplib_dist_jump_hash_new},
- {NULL, NULL},
- };
-
- luaL_newlib(L, jump_f);
-
- return 1;
-}
-
-/*** END jump consistent hash library ***/
-
-/*** START lua interface to logger ***/
-
-static int mcplib_log(lua_State *L) {
- LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
- const char *msg = luaL_checkstring(L, -1);
- LOGGER_LOG(t->l, LOG_PROXYUSER, LOGGER_PROXY_USER, NULL, msg);
- return 0;
-}
-
-/*** START lua interface to user stats ***/
-
-// mcp.add_stat(index, name)
-// creates a custom lua stats counter
-static int mcplib_add_stat(lua_State *L) {
- LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
- if (t != NULL) {
- proxy_lua_error(L, "add_stat must be called from config_pools");
- return 0;
- }
- int idx = luaL_checkinteger(L, -2);
- const char *name = luaL_checkstring(L, -1);
-
- if (idx < 1) {
- proxy_lua_error(L, "stat index must be 1 or higher");
- return 0;
- }
- // max user counters? 1024? some weird number.
- if (idx > 1024) {
- proxy_lua_error(L, "stat index must be 1024 or less");
- return 0;
- }
- // max name length? avoids errors if something huge gets thrown in.
- if (strlen(name) > STAT_KEY_LEN - 6) {
- // we prepend "user_" to the output. + null byte.
- proxy_lua_ferror(L, "stat name too long: %s\n", name);
- return 0;
- }
- // restrict characters, at least no spaces/newlines.
- for (int x = 0; x < strlen(name); x++) {
- if (isspace(name[x])) {
- proxy_lua_error(L, "stat cannot contain spaces or newlines");
- return 0;
- }
- }
-
- proxy_ctx_t *ctx = settings.proxy_ctx; // TODO (v2): store ctx in upvalue.
-
- STAT_L(ctx);
- struct proxy_user_stats *us = &ctx->user_stats;
-
- // if num_stats is 0 we need to init sizes.
- // TODO (v2): malloc fail checking. (should be rare/impossible)
- if (us->num_stats < idx) {
- // don't allocate counters memory for the global ctx.
- char **nnames = calloc(idx, sizeof(char *));
- if (us->names != NULL) {
- for (int x = 0; x < us->num_stats; x++) {
- nnames[x] = us->names[x];
- }
- free(us->names);
- }
- us->names = nnames;
- us->num_stats = idx;
- }
-
- idx--; // real slot start as 0.
- // if slot has string in it, free first
- if (us->names[idx] != NULL) {
- free(us->names[idx]);
- }
- // strdup name into string slot
- // TODO (v2): malloc failure.
- us->names[idx] = strdup(name);
- STAT_UL(ctx);
-
- return 0;
-}
-
-static int mcplib_stat(lua_State *L) {
- LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
- if (t == NULL) {
- proxy_lua_error(L, "stat must be called from router handlers");
- return 0;
- }
-
- struct proxy_user_stats *tus = t->proxy_user_stats;
- if (tus == NULL) {
- proxy_lua_error(L, "no stats counters initialized");
- return 0;
- }
-
- int idx = luaL_checkinteger(L, -2);
- int change = luaL_checkinteger(L, -1);
-
- if (idx < 1 || idx > tus->num_stats) {
- proxy_lua_error(L, "stat index out of range");
- return 0;
- }
-
- idx--; // actual array is 0 indexed.
- WSTAT_L(t);
- tus->counters[idx] += change;
- WSTAT_UL(t);
-
- return 0;
-}
-
-/*** END lua interface to user stats ***/
-
-/*** START lua await() object interface ***/
-
// Result-acceptance modes for mcp.await(); exported to Lua by
// proxy_register_defines.
enum mcp_await_e {
    AWAIT_GOOD = 0, // looks for OK + NOT MISS
    AWAIT_ANY, // any response, including errors,
    AWAIT_OK, // any non-error response
    AWAIT_FIRST, // return the result from the first pool
};

// State for one mcp.await() fan-out: created in mcplib_await(), wired up in
// mcplib_await_run(), and driven to completion/cleanup one sub-IO at a time
// in mcplib_await_return().
typedef struct mcp_await_s {
    int pending; // sub-IO's still outstanding
    int wait_for; // responses still needed before completing (0 = all)
    int req_ref; // registry ref to the shared mcp.request
    int argtable_ref; // need to hold refs to any potential hash selectors
    int restable_ref; // table of result objects
    int coro_ref; // reference to parent coroutine
    enum mcp_await_e type;
    bool completed; // have we completed the parent coroutine or not
    mcp_request_t *rq;
    mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
} mcp_await_t;
-
-// TODO (v2): mcplib_await_gc()
-// - needs to handle cases where an await is created, but a rare error happens
-// before it completes and the coroutine is killed. must check and free its
-// references.
-
-// local restable = mcp.await(request, pools, num_wait)
-// NOTE: need to hold onto the pool objects since those hold backend
-// references. Here we just keep a reference to the argument table.
-static int mcplib_await(lua_State *L) {
- mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
- luaL_checktype(L, 2, LUA_TTABLE);
- int n = luaL_len(L, 2); // length of hash selector table
- int wait_for = 0; // 0 means wait for all responses
- enum mcp_await_e type = AWAIT_GOOD;
-
- if (n <= 0) {
- proxy_lua_error(L, "mcp.await arguments must have at least one pool");
- }
- if (lua_isnumber(L, 3)) {
- wait_for = lua_tointeger(L, 3);
- lua_pop(L, 1);
- if (wait_for > n) {
- wait_for = n;
- }
- }
-
- if (lua_isnumber(L, 4)) {
- type = lua_tointeger(L, 4);
- switch (type) {
- case AWAIT_GOOD:
- case AWAIT_ANY:
- case AWAIT_OK:
- case AWAIT_FIRST:
- break;
- default:
- proxy_lua_error(L, "invalid type argument tp mcp.await");
- }
- }
-
- // FIRST is only looking for one valid request.
- if (type == AWAIT_FIRST) {
- wait_for = 1;
- }
-
- // TODO (v2): quickly loop table once and ensure they're all pools?
- // TODO (v2) in case of newuserdatauv throwing an error, we need to grab
- // these references after allocating *aw else can leak memory.
- int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
- int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.
-
- // stack will be only the await object now
- // allocate before grabbing references so an error won't cause leaks.
- mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0);
- memset(aw, 0, sizeof(mcp_await_t));
-
- aw->wait_for = wait_for;
- aw->pending = n;
- aw->argtable_ref = argtable_ref;
- aw->rq = rq;
- aw->req_ref = req_ref;
- aw->type = type;
- P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
-
- return lua_yield(L, 1);
-}
-
// Queues one backend sub-IO for an await() fan-out: builds the mcp.response
// userdata, allocates an io_pending_proxy_t marked is_await, serializes the
// request into it, and links it onto the connection's proxy IO batch.
// Raises a Lua error (in Lc) on IO-cache exhaustion.
static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);

    mcp_backend_t *be = rq->be;

    // Then we push a response object, which we'll re-use later.
    // reserve one uservalue for a lua-supplied response.
    mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
    memset(r, 0, sizeof(mcp_resp_t));
    r->start = rq->start;

    // copy the leading command text (up to the first space, capped at
    // RESP_CMD_MAX) into the response for later inspection.
    int x;
    int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
    for (x = 0; x < end; x++) {
        if (rq->pr.request[x] == ' ') {
            break;
        }
        r->cmd[x] = rq->pr.request[x];
    }
    r->cmd[x] = '\0';

    luaL_getmetatable(Lc, "mcp.response");
    lua_setmetatable(Lc, -2);

    io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
    if (p == NULL) {
        WSTAT_INCR(c, proxy_conn_oom, 1);
        proxy_lua_error(Lc, "out of memory allocating from IO cache");
        return;
    }

    // this is a re-cast structure, so assert that we never outsize it.
    assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
    memset(p, 0, sizeof(io_pending_proxy_t));
    // set up back references.
    p->io_queue_type = IO_QUEUE_PROXY;
    p->thread = c->thread;
    p->c = c;
    p->resp = NULL;
    p->client_resp = r;
    p->flushed = false;
    p->ascii_multiget = rq->ascii_multiget;

    // io_p needs to hold onto its own response reference, because we may or
    // may not include it in the final await() result.
    p->mcpres_ref = luaL_ref(Lc, LUA_REGISTRYINDEX); // pops mcp.response

    // avoiding coroutine reference for sub-IO
    p->coro_ref = 0;
    p->coro = NULL;

    // await specific
    p->is_await = true;
    p->await_ref = await_ref;
    p->await_first = await_first;

    // The direct backend object. await object is holding reference
    p->backend = be;

    // serialize the request line (and any value payload) into p's iovecs.
    mcp_request_attach(Lc, rq, p);

    // link into the batch chain.
    p->next = q->stack_ctx;
    q->stack_ctx = p;
    P_DEBUG("%s: queued\n", __func__);

    return;
}
-
// TODO (v2): need to get this code running under pcall().
// It looks like a bulk of this code can move into mcplib_await(),
// and then here post-yield we can add the conn and coro_ref to the right
// places. Else these errors currently crash the daemon.
//
// Post-yield continuation of mcp.await(): walks the saved pool table,
// queues one sub-IO per pool via mcp_queue_await_io(), and counts the await
// itself (not the sub-IO's) toward the connection's pending responses.
static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
    P_DEBUG("%s: start\n", __func__);
    mcp_await_t *aw = lua_touserdata(L, -1);
    int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped.
    assert(aw != NULL);
    lua_rawgeti(L, LUA_REGISTRYINDEX, aw->argtable_ref); // -> 1
    //dump_stack(L);
    P_DEBUG("%s: argtable len: %d\n", __func__, (int)lua_rawlen(L, -1));
    mcp_request_t *rq = aw->rq;
    aw->coro_ref = coro_ref;

    // create result table
    lua_newtable(L); // -> 2
    aw->restable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the result table

    // prepare the request key
    const char *key = MCP_PARSER_KEY(rq->pr);
    size_t len = rq->pr.klen;
    bool await_first = true;
    // loop arg table and run each hash selector
    lua_pushnil(L); // -> 3
    while (lua_next(L, 1) != 0) {
        P_DEBUG("%s: top of loop\n", __func__);
        // (key, -2), (val, -1)
        mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy");
        if (pp == NULL) {
            proxy_lua_error(L, "mcp.await must be supplied with a pool");
        }
        mcp_pool_t *p = pp->main;

        // NOTE: rq->be is only held to help pass the backend into the IOP in
        // mcp_queue call. Could be a local variable and an argument too.
        rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len);

        mcp_queue_await_io(c, L, rq, await_ref, await_first);
        await_first = false;

        // pop value, keep key.
        lua_pop(L, 1);
    }

    lua_pop(L, 1); // remove table key.
    aw->resp = c->resp; // cuddle the current mc_resp to fill later

    // we count the await as the "response pending" since it covers a single
    // response object. the sub-IO's don't count toward the redispatch of *c
    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
    q->count++;

    P_DEBUG("%s\n", __func__);

    return 0;
}
-
// NOTE: This is unprotected lua/C code. There are no lua-style errors thrown
// purposefully as of this writing, but it's still not safe. Either the code
// can be restructured to use less lua (which I think is better long term
// anyway) or it can be pushed behind a cfunc pcall so we don't crash the
// daemon if something bad happens.
//
// Called once per completed sub-IO of an await(). Collects the sub-IO's
// response into the result table (unless already completed), resumes the
// parent coroutine when the await is satisfied, and tears down all
// references once the last sub-IO arrives. Runs on the worker thread's
// main VM.
static int mcplib_await_return(io_pending_proxy_t *p) {
    mcp_await_t *aw;
    lua_State *L = p->thread->L; // use the main VM coroutine for work
    bool cleanup = false;
    bool valid = false; // is response valid to add to the result table.
    bool completing = false;

    // TODO (v2): just push the await ptr into *p?
    lua_rawgeti(L, LUA_REGISTRYINDEX, p->await_ref);
    aw = lua_touserdata(L, -1);
    lua_pop(L, 1); // remove AW object from stack
    assert(aw != NULL);
    P_DEBUG("%s: start [pending: %d]\n", __func__, aw->pending);
    //dump_stack(L);

    aw->pending--;
    // Await not yet satisfied.
    // If wait_for != 0 check for response success
    // if success and wait_for is *now* 0, we complete.
    // add successful response to response table
    // Also, if no wait_for, add response to response table
    // TODO (v2): for GOOD or OK cases, it might be better to return the
    // last object as valid if there are otherwise zero valids?
    // Think we just have to count valids...
    if (!aw->completed) {
        valid = true; // always collect results unless we are completed.
        if (aw->wait_for > 0) {
            bool is_good = false;
            switch (aw->type) {
                case AWAIT_GOOD:
                    if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
                        is_good = true;
                    }
                    break;
                case AWAIT_ANY:
                    is_good = true;
                    break;
                case AWAIT_OK:
                    if (p->client_resp->status == MCMC_OK) {
                        is_good = true;
                    }
                    break;
                case AWAIT_FIRST:
                    if (p->await_first) {
                        is_good = true;
                    } else {
                        // user only wants the first pool's result.
                        valid = false;
                    }
                    break;
            }

            if (is_good) {
                aw->wait_for--;
            }

            if (aw->wait_for == 0) {
                completing = true;
            }
        }
    }

    // note that post-completion, we stop gathering responses into the
    // response table... because it's already been returned.
    // So "valid" can only be true if also !completed
    if (aw->pending == 0) {
        if (!aw->completed) {
            // we're waiting for all responses.
            completing = true;
        }
        cleanup = true;
        P_DEBUG("%s: pending == 0\n", __func__);
    }

    // a valid response to add to the result table.
    if (valid) {
        P_DEBUG("%s: valid\n", __func__);
        lua_rawgeti(L, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1
        lua_rawgeti(L, LUA_REGISTRYINDEX, p->mcpres_ref); // -> 2
        // couldn't find a table.insert() equivalent; so this is
        // inserting into the length + 1 position manually.
        //dump_stack(L);
        lua_rawseti(L, 1, lua_rawlen(L, 1) + 1); // pops mcpres
        lua_pop(L, 1); // pops restable
    }

    // lose our internal mcpres reference regardless.
    luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref);
    // our await_ref is shared, so we don't need to release it.

    if (completing) {
        P_DEBUG("%s: completing\n", __func__);
        aw->completed = true;
        // if we haven't completed yet, the connection reference is still
        // valid. So now we pull it, reduce count, and readd if necessary.
        // here is also the point where we resume the coroutine.
        lua_rawgeti(L, LUA_REGISTRYINDEX, aw->coro_ref);
        lua_State *Lc = lua_tothread(L, -1);
        lua_rawgeti(Lc, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1
        proxy_run_coroutine(Lc, aw->resp, NULL, p->c);
        luaL_unref(L, LUA_REGISTRYINDEX, aw->coro_ref);
        luaL_unref(L, LUA_REGISTRYINDEX, aw->restable_ref);

        io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
        q->count--;
        if (q->count == 0) {
            // call re-add directly since we're already in the worker thread.
            conn_worker_readd(p->c);
        }

    }

    if (cleanup) {
        P_DEBUG("%s: cleanup [completed: %d]\n", __func__, aw->completed);
        luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref);
        luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref);
        luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref);
    }

    // Just remove anything we could have left on the primary VM stack
    lua_settop(L, 0);

    // always return free this sub-IO object.
    do_cache_free(p->thread->io_cache, p);

    return 0;
}
-
-/*** END lua await() object interface ***/
-
-/*** START xxhash module ***/
-
// Wrapper struct for the xxh3 seeded hash: the proxy hash API takes a
// struct of function pointers rather than a bare function pointer.
static struct proxy_hash_func mcplib_hash_xxhash = {
    XXH3_64bits_withSeed,
};

// Module opener: pushes the xxhash wrapper as a lightuserdata, installed
// as mcp.hash_xxhash by proxy_register_libs.
static int mcplib_open_hash_xxhash(lua_State *L) {
    lua_pushlightuserdata(L, &mcplib_hash_xxhash);
    return 1;
}
-
-/*** END xxhash module ***/
-
// Copies the C-side integer constants (status code, command IDs via
// CMD_FIELDS, and the AWAIT_* modes) into the table currently on top of the
// Lua stack, keyed by their C names.
static void proxy_register_defines(lua_State *L) {
#define X(x) \
    lua_pushinteger(L, x); \
    lua_setfield(L, -2, #x);

    X(P_OK);
    X(CMD_ANY);
    X(CMD_ANY_STORAGE);
    X(AWAIT_GOOD);
    X(AWAIT_ANY);
    X(AWAIT_OK);
    X(AWAIT_FIRST);
    CMD_FIELDS
#undef X
}
-
-// Creates and returns the top level "mcp" module
-int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
- lua_State *L = ctx;
-
- const struct luaL_Reg mcplib_backend_m[] = {
- {"set", NULL},
- {"__gc", mcplib_backend_gc},
- {NULL, NULL}
- };
-
- const struct luaL_Reg mcplib_request_m[] = {
- {"command", mcplib_request_command},
- {"key", mcplib_request_key},
- {"ltrimkey", mcplib_request_ltrimkey},
- {"rtrimkey", mcplib_request_rtrimkey},
- {"token", mcplib_request_token},
- {"ntokens", mcplib_request_ntokens},
- {"__tostring", NULL},
- {"__gc", mcplib_request_gc},
- {NULL, NULL}
- };
-
- const struct luaL_Reg mcplib_response_m[] = {
- {"ok", mcplib_response_ok},
- {"hit", mcplib_response_hit},
- {"__gc", mcplib_response_gc},
- {NULL, NULL}
- };
-
- const struct luaL_Reg mcplib_pool_m[] = {
- {"__gc", mcplib_pool_gc},
- {NULL, NULL}
- };
-
- const struct luaL_Reg mcplib_pool_proxy_m[] = {
- {"__call", mcplib_pool_proxy_call},
- {"__gc", mcplib_pool_proxy_gc},
- {NULL, NULL}
- };
-
- const struct luaL_Reg mcplib_f [] = {
- {"pool", mcplib_pool},
- {"backend", mcplib_backend},
- {"request", mcplib_request},
- {"attach", mcplib_attach},
- {"add_stat", mcplib_add_stat},
- {"stat", mcplib_stat},
- {"await", mcplib_await},
- {"log", mcplib_log},
- {"backend_connect_timeout", mcplib_backend_connect_timeout},
- {"backend_retry_timeout", mcplib_backend_retry_timeout},
- {"backend_read_timeout", mcplib_backend_read_timeout},
- {"backend_failure_limit", mcplib_backend_failure_limit},
- {"tcp_keepalive", mcplib_tcp_keepalive},
- {NULL, NULL}
- };
-
- // TODO (v2): function + loop.
- luaL_newmetatable(L, "mcp.backend");
- lua_pushvalue(L, -1); // duplicate metatable.
- lua_setfield(L, -2, "__index"); // mt.__index = mt
- luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
- lua_pop(L, 1);
-
- luaL_newmetatable(L, "mcp.request");
- lua_pushvalue(L, -1); // duplicate metatable.
- lua_setfield(L, -2, "__index"); // mt.__index = mt
- luaL_setfuncs(L, mcplib_request_m, 0); // register methods
- lua_pop(L, 1);
-
- luaL_newmetatable(L, "mcp.response");
- lua_pushvalue(L, -1); // duplicate metatable.
- lua_setfield(L, -2, "__index"); // mt.__index = mt
- luaL_setfuncs(L, mcplib_response_m, 0); // register methods
- lua_pop(L, 1);
-
- luaL_newmetatable(L, "mcp.pool");
- lua_pushvalue(L, -1); // duplicate metatable.
- lua_setfield(L, -2, "__index"); // mt.__index = mt
- luaL_setfuncs(L, mcplib_pool_m, 0); // register methods
- lua_pop(L, 1); // drop the hash selector metatable
-
- luaL_newmetatable(L, "mcp.pool_proxy");
- lua_pushvalue(L, -1); // duplicate metatable.
- lua_setfield(L, -2, "__index"); // mt.__index = mt
- luaL_setfuncs(L, mcplib_pool_proxy_m, 0); // register methods
- lua_pop(L, 1); // drop the hash selector metatable
-
- // create main library table.
- //luaL_newlib(L, mcplib_f);
- // TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things
- // here.
- // can replace with createtable and add the num. of the constant
- // definitions.
- luaL_newlibtable(L, mcplib_f);
- proxy_register_defines(L);
-
- mcplib_open_hash_xxhash(L);
- lua_setfield(L, -2, "hash_xxhash");
- // hash function for selectors.
- // have to wrap the function in a struct because function pointers aren't
- // pointer pointers :)
- mcplib_open_dist_jump_hash(L);
- lua_setfield(L, -2, "dist_jump_hash");
-
- lua_pushlightuserdata(L, (void *)t); // upvalue for original thread
- lua_newtable(L); // upvalue for mcp.attach() table.
-
- // create weak table for storing backends by label.
- lua_newtable(L); // {}
- lua_newtable(L); // {}, {} for metatable
- lua_pushstring(L, "v"); // {}, {}, "v" for weak values.
- lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"}
- lua_setmetatable(L, -2); // {__mt = {__mode = "v"} }
-
- luaL_setfuncs(L, mcplib_f, 3); // store upvalues.
-
- lua_setglobal(L, "mcp"); // set the lib table to mcp global.
- return 1;
-}