diff options
-rw-r--r-- | BUILD | 13 | ||||
-rw-r--r-- | Makefile.am | 19 | ||||
-rw-r--r-- | configure.ac | 20 | ||||
-rw-r--r-- | logger.c | 37 | ||||
-rw-r--r-- | logger.h | 14 | ||||
-rw-r--r-- | memcached.c | 68 | ||||
-rw-r--r-- | memcached.h | 31 | ||||
-rw-r--r-- | proto_proxy.c | 4191 | ||||
-rw-r--r-- | proto_proxy.h | 26 | ||||
-rw-r--r-- | proto_text.c | 14 | ||||
-rw-r--r-- | proto_text.h | 1 | ||||
-rw-r--r-- | t/lib/MemcachedTest.pm | 8 | ||||
-rw-r--r-- | t/proxy.t | 263 | ||||
-rw-r--r-- | t/startfile.lua | 280 | ||||
-rw-r--r-- | thread.c | 38 | ||||
-rw-r--r-- | vendor/.gitignore | 4 | ||||
-rw-r--r-- | vendor/Makefile | 10 | ||||
-rwxr-xr-x | vendor/fetch.sh | 5 | ||||
-rw-r--r-- | vendor/lua/.gitignore | 2 | ||||
-rw-r--r-- | vendor/mcmc/LICENSE | 30 | ||||
-rw-r--r-- | vendor/mcmc/Makefile | 13 | ||||
-rw-r--r-- | vendor/mcmc/README.md | 52 | ||||
-rw-r--r-- | vendor/mcmc/example.c | 214 | ||||
-rw-r--r-- | vendor/mcmc/mcmc.c | 728 | ||||
-rw-r--r-- | vendor/mcmc/mcmc.h | 91 |
25 files changed, 6162 insertions, 10 deletions
@@ -1,3 +1,5 @@ +See below if building the proxy + To build memcached in your machine from local repo you will have to install autotools, automake and libevent. In a debian based system that will look like this @@ -22,3 +24,14 @@ You can telnet into that memcached to ensure it is up and running telnet 127.0.0.1 11211 stats + +IF BUILDING PROXY, AN EXTRA STEP IS NECESSARY: + +cd memcached +cd vendor +./fetch.sh +cd .. +./autogen.sh +./configure --enable-proxy +make +make test diff --git a/Makefile.am b/Makefile.am index 005df7f..e2c1bd8 100644 --- a/Makefile.am +++ b/Makefile.am @@ -51,6 +51,10 @@ if ENABLE_SASL memcached_SOURCES += sasl_defs.c endif +if ENABLE_PROXY +memcached_SOURCES += proto_proxy.c proto_proxy.h vendor/mcmc/mcmc.h +endif + if ENABLE_EXTSTORE memcached_SOURCES += extstore.c extstore.h \ crc32c.c crc32c.h \ @@ -90,6 +94,16 @@ memcached_debug_DEPENDENCIES += memcached_debug_dtrace.o CLEANFILES += memcached_dtrace.o memcached_debug_dtrace.o endif +if ENABLE_PROXY +memcached_LDADD += vendor/lua/src/liblua.a vendor/mcmc/mcmc.o +memcached_debug_LDADD += vendor/lua/src/liblua.a vendor/mcmc/mcmc.o +endif + +if ENABLE_PROXY_URING +memcached_LDADD += vendor/liburing/src/liburing.a +memcached_debug_LDADD += vendor/liburing/src/liburing.a +endif + memcached_debug_CFLAGS += -DMEMCACHED_DEBUG memcached_dtrace.h: memcached_dtrace.d @@ -108,6 +122,11 @@ memcached_debug_dtrace.o: $(memcached_debug_OBJECTS) SUBDIRS = doc DIST_DIRS = scripts EXTRA_DIST = doc scripts t memcached.spec memcached_dtrace.d version.m4 README.md LICENSE.bipbuffer +EXTRA_DIST += vendor/Makefile vendor/lua vendor/mcmc + +if ENABLE_PROXY +SUBDIRS += vendor +endif MOSTLYCLEANFILES = *.gcov *.gcno *.gcda *.tcov diff --git a/configure.ac b/configure.ac index 0985f07..a0851f2 100644 --- a/configure.ac +++ b/configure.ac @@ -114,6 +114,12 @@ AC_ARG_ENABLE(static, AC_ARG_ENABLE(unix_socket, [AS_HELP_STRING([--disable-unix-socket], [Disable unix domain socket])]) +AC_ARG_ENABLE(proxy, + 
[AS_HELP_STRING([--enable-proxy], [Enable proxy code EXPERIMENTAL])]) + +AC_ARG_ENABLE(proxy-uring, + [AS_HELP_STRING([--enable-proxy-uring], [Enable proxy io_uring code EXPERIMENTAL])]) + dnl ********************************************************************** dnl DETECT_SASL_CB_GETCONF dnl @@ -217,6 +223,18 @@ if test "x$enable_unix_socket" = "xno"; then AC_DEFINE([DISABLE_UNIX_SOCKET],1,[Set to nonzero if you want to disable unix domain socket]) fi +if test "x$enable_proxy" = "xyes"; then + AC_DEFINE([PROXY],1,[Set to nonzero if you want to enable proxy code]) + CPPFLAGS="-Ivendor/lua/src -Ivendor/liburing/src/include $CPPFLAGS" + dnl lua needs math lib. + LIBS="$LIBS -lm -ldl" +fi + +if test "x$enable_proxy_uring" = "xyes"; then + AC_DEFINE([HAVE_LIBURING],1,[Set to nonzero if you want to enable proxy uring handling]) + CPPFLAGS="-Ivendor/liburing/src/include $CPPFLAGS" +fi + AM_CONDITIONAL([BUILD_DTRACE],[test "$build_dtrace" = "yes"]) AM_CONDITIONAL([DTRACE_INSTRUMENT_OBJ],[test "$dtrace_instrument_obj" = "yes"]) AM_CONDITIONAL([ENABLE_SASL],[test "$enable_sasl" = "yes"]) @@ -226,6 +244,8 @@ AM_CONDITIONAL([ENABLE_TLS],[test "$enable_tls" = "yes"]) AM_CONDITIONAL([ENABLE_ASAN],[test "$enable_asan" = "yes"]) AM_CONDITIONAL([ENABLE_STATIC],[test "$enable_static" = "yes"]) AM_CONDITIONAL([DISABLE_UNIX_SOCKET],[test "$enable_unix_socket" = "no"]) +AM_CONDITIONAL([ENABLE_PROXY],[test "$enable_proxy" = "yes"]) +AM_CONDITIONAL([ENABLE_PROXY_URING],[test "$enable_proxy_uring" = "yes"]) AC_SUBST(DTRACE) @@ -303,6 +303,34 @@ static int _logger_parse_cce(logentry *e, char *scratch) { return total; } +#ifdef PROXY +static void _logger_log_proxy_raw(logentry *e, const entry_details *d, const void *entry, va_list ap) { + struct timeval start = va_arg(ap, struct timeval); + char *cmd = va_arg(ap, char *); + unsigned short type = va_arg(ap, int); + unsigned short code = va_arg(ap, int); + + struct logentry_proxy_raw *le = (void *)e->data; + struct timeval end; + 
gettimeofday(&end, NULL); + le->type = type; + le->code = code; + le->elapsed = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); + memcpy(le->cmd, cmd, 9); +} + +static int _logger_parse_prx_raw(logentry *e, char *scratch) { + int total; + struct logentry_proxy_raw *le = (void *)e->data; + + total = snprintf(scratch, LOGGER_PARSE_SCRATCH, + "ts=%d.%d gid=%llu type=proxy_raw elapsed=%lu cmd=%s type=%d code=%d\n", + (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid, + le->elapsed, le->cmd, le->type, le->code); + return total; +} +#endif + /* Should this go somewhere else? */ static const entry_details default_entries[] = { [LOGGER_ASCII_CMD] = {512, LOG_RAWCMDS, _logger_log_text, _logger_parse_text, "<%d %s"}, @@ -338,6 +366,15 @@ static const entry_details default_entries[] = { "type=compact_fraginfo ratio=%.2f bytes=%lu" }, #endif +#ifdef PROXY + [LOGGER_PROXY_CONFIG] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text, + "type=proxy_conf status=%s" + }, + [LOGGER_PROXY_RAW] = {512, LOG_RAWCMDS, _logger_log_proxy_raw, _logger_parse_prx_raw, NULL}, + [LOGGER_PROXY_ERROR] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text, + "type=proxy_error msg=%s" + }, +#endif }; /************************* @@ -31,6 +31,11 @@ enum log_entry_type { LOGGER_COMPACT_END, LOGGER_COMPACT_FRAGINFO, #endif +#ifdef PROXY + LOGGER_PROXY_CONFIG, + LOGGER_PROXY_RAW, + LOGGER_PROXY_ERROR, +#endif }; enum logger_ret_type { @@ -106,7 +111,14 @@ struct logentry_conn_event { int sfd; struct sockaddr_in6 addr; }; - +#ifdef PROXY +struct logentry_proxy_raw { + unsigned short type; + unsigned short code; + long elapsed; // elapsed time in usec + char cmd[8]; +}; +#endif /* end intermediary structures */ /* WARNING: cuddled items aren't compatible with warm restart. 
more code diff --git a/memcached.c b/memcached.c index 8bbdccd..bf6617d 100644 --- a/memcached.c +++ b/memcached.c @@ -56,6 +56,7 @@ #include "proto_text.h" #include "proto_bin.h" +#include "proto_proxy.h" #if defined(__FreeBSD__) #include <sys/sysctl.h> @@ -85,7 +86,6 @@ static enum try_read_result try_read_udp(conn *c); static int start_conn_timeout_thread(); - /* stats */ static void stats_init(void); static void conn_to_str(const conn *c, char *addr, char *svr_addr); @@ -500,6 +500,11 @@ static const char *prot_text(enum protocol prot) { case negotiating_prot: rv = "auto-negotiate"; break; +#ifdef PROXY + case proxy_prot: + rv = "proxy"; + break; +#endif } return rv; } @@ -797,6 +802,11 @@ conn *conn_new(const int sfd, enum conn_states init_state, case negotiating_prot: c->try_read_command = try_read_command_negotiate; break; +#ifdef PROXY + case proxy_prot: + c->try_read_command = try_read_command_proxy; + break; +#endif } } @@ -1417,13 +1427,22 @@ static void reset_cmd_handler(conn *c) { static void complete_nread(conn *c) { assert(c != NULL); +#ifdef PROXY + assert(c->protocol == ascii_prot + || c->protocol == binary_prot + || c->protocol == proxy_prot); +#else assert(c->protocol == ascii_prot || c->protocol == binary_prot); - +#endif if (c->protocol == ascii_prot) { complete_nread_ascii(c); } else if (c->protocol == binary_prot) { complete_nread_binary(c); +#ifdef PROXY + } else if (c->protocol == proxy_prot) { + complete_nread_proxy(c); +#endif } } @@ -1788,6 +1807,12 @@ void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("badcrc_from_extstore", "%llu", (unsigned long long)thread_stats.badcrc_from_extstore); } #endif +#ifdef PROXY + if (settings.proxy_enabled) { + APPEND_STAT("proxy_conn_requests", "%llu", (unsigned long long)thread_stats.proxy_conn_requests); + APPEND_STAT("proxy_conn_errors", "%llu", (unsigned long long)thread_stats.proxy_conn_errors); + } +#endif APPEND_STAT("delete_misses", "%llu", (unsigned long 
long)thread_stats.delete_misses); APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits); APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses); @@ -1843,6 +1868,9 @@ void server_stats(ADD_STAT add_stats, conn *c) { #ifdef EXTSTORE storage_stats(add_stats, c); #endif +#ifdef PROXY + proxy_stats(add_stats, c); +#endif #ifdef TLS if (settings.ssl_enabled) { if (settings.ssl_session_cache) { @@ -3801,6 +3829,9 @@ static void clock_handler(const evutil_socket_t fd, const short which, void *arg settings.sig_hup = false; authfile_load(settings.auth_file); +#ifdef PROXY + proxy_start_reload(settings.proxy_ctx); +#endif } evtimer_set(&clockevent, clock_handler, 0); @@ -3999,6 +4030,9 @@ static void usage(void) { settings.ext_max_frag, settings.slab_automove_freeratio); verify_default("ext_item_age", settings.ext_item_age == UINT_MAX); #endif +#ifdef PROXY + printf(" - proxy_config: path to lua config file.\n"); +#endif #ifdef TLS printf(" - ssl_chain_cert: certificate chain file in PEM format\n" " - ssl_key: private key, if not part of the -ssl_chain_cert\n" @@ -4663,6 +4697,9 @@ int main (int argc, char **argv) { SSL_SESSION_CACHE, SSL_MIN_VERSION, #endif +#ifdef PROXY + PROXY_CONFIG, +#endif #ifdef MEMCACHED_DEBUG RELAXED_PRIVILEGES, #endif @@ -4718,6 +4755,9 @@ int main (int argc, char **argv) { [SSL_SESSION_CACHE] = "ssl_session_cache", [SSL_MIN_VERSION] = "ssl_min_version", #endif +#ifdef PROXY + [PROXY_CONFIG] = "proxy_config", +#endif #ifdef MEMCACHED_DEBUG [RELAXED_PRIVILEGES] = "relaxed_privileges", #endif @@ -5461,6 +5501,22 @@ int main (int argc, char **argv) { } settings.read_buf_mem_limit *= 1024 * 1024; /* megabytes */ break; +#ifdef PROXY + case PROXY_CONFIG: + if (subopts_value == NULL) { + fprintf(stderr, "Missing proxy_config file argument\n"); + return 1; + } + if (protocol_specified) { + fprintf(stderr, "Cannot specify a protocol with proxy mode enabled\n"); + return 1; + } + settings.proxy_startfile = 
strdup(subopts_value); + settings.proxy_enabled = true; + settings.binding_protocol = proxy_prot; + protocol_specified = true; + break; +#endif #ifdef MEMCACHED_DEBUG case RELAXED_PRIVILEGES: settings.relaxed_privileges = true; @@ -5868,6 +5924,14 @@ int main (int argc, char **argv) { exit(EX_OSERR); } /* start up worker threads if MT mode */ +#ifdef PROXY + if (settings.proxy_enabled) { + proxy_init(); + if (proxy_load_config(settings.proxy_ctx) != 0) { + exit(EXIT_FAILURE); + } + } +#endif #ifdef EXTSTORE slabs_set_storage(storage); memcached_thread_init(settings.num_threads, storage); diff --git a/memcached.h b/memcached.h index 12385db..b7d4ad5 100644 --- a/memcached.h +++ b/memcached.h @@ -223,7 +223,10 @@ enum bin_substates { enum protocol { ascii_prot = 3, /* arbitrary value. */ binary_prot, - negotiating_prot /* Discovering the protocol */ + negotiating_prot, /* Discovering the protocol */ +#ifdef PROXY + proxy_prot, +#endif }; enum network_transport { @@ -329,6 +332,12 @@ struct slab_stats { X(badcrc_from_extstore) #endif +#ifdef PROXY +#define PROXY_THREAD_STATS_FIELDS \ + X(proxy_conn_requests) \ + X(proxy_conn_errors) +#endif + /** * Stats stored per-thread. 
*/ @@ -339,6 +348,9 @@ struct thread_stats { #ifdef EXTSTORE EXTSTORE_THREAD_STATS_FIELDS #endif +#ifdef PROXY + PROXY_THREAD_STATS_FIELDS +#endif #undef X struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES]; uint64_t lru_hits[POWER_LARGEST]; @@ -501,6 +513,11 @@ struct settings { #endif int num_napi_ids; /* maximum number of NAPI IDs */ char *memory_file; /* warm restart memory file path */ +#ifdef PROXY + bool proxy_enabled; + char *proxy_startfile; /* lua file to run when workers start */ + void *proxy_ctx; /* proxy's state context */ +#endif }; extern struct stats stats; @@ -628,6 +645,7 @@ typedef struct { #define IO_QUEUE_NONE 0 #define IO_QUEUE_EXTSTORE 1 +#define IO_QUEUE_PROXY 2 typedef struct _io_pending_t io_pending_t; typedef struct io_queue_s io_queue_t; @@ -690,7 +708,12 @@ typedef struct { char *ssl_wbuf; #endif int napi_id; /* napi id associated with this thread */ - +#ifdef PROXY + void *L; + void *proxy_hooks; + void *proxy_stats; + // TODO: add ctx object so we can attach to queue. 
+#endif } LIBEVENT_THREAD; /** @@ -905,6 +928,9 @@ extern int daemonize(int nochdir, int noclose); void memcached_thread_init(int nthreads, void *arg); void redispatch_conn(conn *c); void timeout_conn(conn *c); +#ifdef PROXY +void proxy_reload_notify(LIBEVENT_THREAD *t); +#endif void return_io_pending(io_pending_t *io); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl); @@ -945,6 +971,7 @@ void STATS_UNLOCK(void); void threadlocal_stats_reset(void); void threadlocal_stats_aggregate(struct thread_stats *stats); void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out); +LIBEVENT_THREAD *get_worker_thread(int id); /* Stat processing functions */ void append_stat(const char *name, ADD_STAT add_stats, conn *c, diff --git a/proto_proxy.c b/proto_proxy.c new file mode 100644 index 0000000..428e01d --- /dev/null +++ b/proto_proxy.c @@ -0,0 +1,4191 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Functions for handling the proxy layer. wraps text protocols + */ + +#include <string.h> +#include <stdlib.h> +#include <ctype.h> + +#include <lua.h> +#include <lualib.h> +#include <lauxlib.h> + +#include "config.h" + +#if defined(__linux__) +#define USE_EVENTFD 1 +#include <sys/eventfd.h> +#endif + +#ifdef HAVE_LIBURING +#include <liburing.h> +#include <poll.h> // POLLOUT for liburing. +#define PRING_QUEUE_SQ_ENTRIES 2048 +#define PRING_QUEUE_CQ_ENTRIES 16384 +#endif + +#include "memcached.h" +#include "proto_proxy.h" +#include "proto_text.h" +#include "murmur3_hash.h" +#include "queue.h" +#define XXH_INLINE_ALL // modifier for xxh3's include below +#include "xxhash.h" + +#ifdef PROXY_DEBUG +#define P_DEBUG(...) \ + do { \ + fprintf(stderr, __VA_ARGS__); \ + } while (0) +#else +#define P_DEBUG(...) 
+#endif + +#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex); +#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex); +#define WSTAT_INCR(c, stat, amount) { \ + pthread_mutex_lock(&c->thread->stats.mutex); \ + c->thread->stats.stat += amount; \ + pthread_mutex_unlock(&c->thread->stats.mutex); \ +} +#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock); +#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock); +#define STAT_INCR(ctx, stat, amount) { \ + pthread_mutex_lock(&ctx->stats_lock); \ + ctx->global_stats.stat += amount; \ + pthread_mutex_unlock(&ctx->stats_lock); \ +} + +#define STAT_DECR(ctx, stat, amount) { \ + pthread_mutex_lock(&ctx->stats_lock); \ + ctx->global_stats.stat -= amount; \ + pthread_mutex_unlock(&ctx->stats_lock); \ +} + +// FIXME: do include dir properly. +#include "vendor/mcmc/mcmc.h" + +// Note: value created from thin air. Could be shorter. +#define MCP_REQUEST_MAXLEN KEY_MAX_LENGTH * 2 + +#define ENDSTR "END\r\n" +#define ENDLEN sizeof(ENDSTR)-1 + +#define MCP_THREAD_UPVALUE 1 +#define MCP_ATTACH_UPVALUE 2 +#define MCP_BACKEND_UPVALUE 3 + +// all possible commands. +#define CMD_FIELDS \ + X(CMD_MG) \ + X(CMD_MS) \ + X(CMD_MD) \ + X(CMD_MN) \ + X(CMD_MA) \ + X(CMD_ME) \ + X(CMD_GET) \ + X(CMD_GAT) \ + X(CMD_SET) \ + X(CMD_ADD) \ + X(CMD_CAS) \ + X(CMD_GETS) \ + X(CMD_GATS) \ + X(CMD_INCR) \ + X(CMD_DECR) \ + X(CMD_TOUCH) \ + X(CMD_APPEND) \ + X(CMD_DELETE) \ + X(CMD_REPLACE) \ + X(CMD_PREPEND) \ + X(CMD_END_STORAGE) \ + X(CMD_QUIT) \ + X(CMD_STATS) \ + X(CMD_SLABS) \ + X(CMD_WATCH) \ + X(CMD_LRU) \ + X(CMD_VERSION) \ + X(CMD_SHUTDOWN) \ + X(CMD_EXTSTORE) \ + X(CMD_FLUSH_ALL) \ + X(CMD_VERBOSITY) \ + X(CMD_LRU_CRAWLER) \ + X(CMD_REFRESH_CERTS) \ + X(CMD_CACHE_MEMLIMIT) + +#define X(name) name, +enum proxy_defines { + P_OK = 0, + CMD_FIELDS + CMD_SIZE, // used to define array size for command hooks. + CMD_ANY, // override _all_ commands + CMD_ANY_STORAGE, // override commands specific to key storage. 
+}; +#undef X + +// certain classes of ascii commands have similar parsing (ie; +// get/gets/gat/gats). Use types so we don't have to test a ton of them. +enum proxy_cmd_types { + CMD_TYPE_GENERIC = 0, + CMD_TYPE_GET, // get/gets/gat/gats + CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace + CMD_TYPE_META, // m*'s. +}; + +typedef struct _io_pending_proxy_t io_pending_proxy_t; +typedef struct proxy_event_thread_s proxy_event_thread_t; + +#ifdef HAVE_LIBURING +typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe); +typedef struct { + void *udata; + proxy_event_cb cb; + bool set; // NOTE: not sure if necessary if code structured properly +} proxy_event_t; + +static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t); +static void *proxy_event_thread_ur(void *arg); +#endif + +struct proxy_user_stats { + size_t num_stats; // number of stats, for sizing various arrays + char **names; // not needed for worker threads + uint64_t *counters; // array of counters. +}; + +struct proxy_global_stats { + uint64_t config_reloads; + uint64_t config_reload_fails; + uint64_t backend_total; + uint64_t backend_disconn; // backends with no connections + uint64_t backend_requests; // reqs sent to backends + uint64_t backend_responses; // responses received from backends + uint64_t backend_errors; // errors from backends +}; + +struct proxy_timeouts { + struct timeval connect; + struct timeval retry; // wait time before retrying a dead backend + struct timeval read; +}; + +typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t; +typedef struct { + lua_State *proxy_state; + void *proxy_code; + proxy_event_thread_t *proxy_threads; + pthread_mutex_t config_lock; + pthread_cond_t config_cond; + pthread_t config_tid; + pthread_mutex_t worker_lock; + pthread_cond_t worker_cond; + pthread_t manager_tid; // deallocation management thread + pthread_mutex_t manager_lock; + pthread_cond_t manager_cond; + pool_head_t manager_head; // stack for pool deallocation. 
+ bool worker_done; // signal variable for the worker lock/cond system. + bool worker_failed; // covered by worker_lock as well. + struct proxy_global_stats global_stats; + struct proxy_user_stats user_stats; + struct proxy_timeouts timeouts; // NOTE: updates covered by stats_lock + pthread_mutex_t stats_lock; // used for rare global counters +} proxy_ctx_t; + +struct proxy_hook { + int lua_ref; + bool is_lua; // pull the lua reference and call it as a lua function. +}; + +typedef uint32_t (*hash_selector_func)(const void *key, size_t len, void *ctx); +struct proxy_hash_caller { + hash_selector_func selector_func; + void *ctx; +}; + +// A default hash function for backends. +static uint32_t mcplib_hashfunc_murmur3_func(const void *key, size_t len, void *ctx) { + return MurmurHash3_x86_32(key, len); +} +static struct proxy_hash_caller mcplib_hashfunc_murmur3 = { mcplib_hashfunc_murmur3_func, NULL}; + +enum mcp_backend_states { + mcp_backend_read = 0, // waiting to read any response + mcp_backend_parse, // have some buffered data to check + mcp_backend_read_end, // looking for an "END" marker for GET + mcp_backend_want_read, // read more data to complete command + mcp_backend_next, // advance to the next IO +}; + +typedef struct mcp_backend_s mcp_backend_t; +typedef struct mcp_request_s mcp_request_t; +typedef struct mcp_parser_s mcp_parser_t; + +#define PARSER_MAX_TOKENS 24 + +struct mcp_parser_meta_s { + uint64_t flags; +}; + +// Note that we must use offsets into request for tokens, +// as *request can change between parsing and later accessors. +struct mcp_parser_s { + const char *request; + void *vbuf; // temporary buffer for holding value lengths. + uint8_t command; + uint8_t cmd_type; // command class. + uint8_t ntokens; + uint8_t keytoken; // because GAT. sigh. also cmds without a key. + uint32_t parsed; // how far into the request we parsed already + uint32_t reqlen; // full length of request buffer. + int vlen; + uint32_t klen; // length of key. 
+ uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token + bool has_space; // a space was found after the last byte parsed. + union { + struct mcp_parser_meta_s meta; + } t; +}; + +#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]]) + +#define MAX_REQ_TOKENS 2 +struct mcp_request_s { + mcp_parser_t pr; // non-lua-specific parser handling. + struct timeval start; // time this object was created. + mcp_backend_t *be; // backend handling this request. + bool ascii_multiget; // ascii multiget mode. (hide errors/END) + bool was_modified; // need to rewrite the request + int tokent_ref; // reference to token table if modified. + char request[]; +}; + +typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t; +#define MAX_IPLEN 45 +#define MAX_PORTLEN 6 +struct mcp_backend_s { + char ip[MAX_IPLEN+1]; + char port[MAX_PORTLEN+1]; + double weight; + int depth; + int failed_count; // number of fails (timeouts) in a row + pthread_mutex_t mutex; // covers stack. + proxy_event_thread_t *event_thread; // event thread owning this backend. + void *client; // mcmc client + STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends + io_head_t io_head; // stack of requests. + char *rbuf; // static allocated read buffer. + struct event event; // libevent +#ifdef HAVE_LIBURING + proxy_event_t ur_rd_ev; // liburing. + proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling +#endif + enum mcp_backend_states state; // readback state machine + bool connecting; // in the process of an asynch connection. + bool can_write; // recently got a WANT_WRITE or are connecting. + bool stacked; // if backend already queued for syscalls. + bool bad; // timed out, marked as bad. +}; +typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t; + +struct proxy_event_thread_s { + pthread_t thread_id; + struct event_base *base; + struct event notify_event; // listen event for the notify pipe/eventfd. 
+ struct event clock_event; // timer for updating event thread data. +#ifdef HAVE_LIBURING + struct io_uring ring; + proxy_event_t ur_notify_event; // listen on eventfd. + eventfd_t event_counter; + bool use_uring; +#endif + pthread_mutex_t mutex; // covers stack. + pthread_cond_t cond; // condition to wait on while stack drains. + io_head_t io_head_in; // inbound requests to process. + be_head_t be_head; // stack of backends for processing. +#ifdef USE_EVENTFD + int event_fd; +#else + int notify_receive_fd; + int notify_send_fd; +#endif + proxy_ctx_t *ctx; // main context. + struct proxy_timeouts timeouts; // periodically copied from main ctx +}; + +#define RESP_CMD_MAX 8 +typedef struct { + mcmc_resp_t resp; + struct timeval start; // start time inherited from paired request + char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly. + int status; // status code from mcmc_read() + char *buf; // response line + potentially value. + size_t blen; // total size of the value to read. + int bread; // amount of bytes read into value so far. +} mcp_resp_t; + +// re-cast an io_pending_t into this more descriptive structure. +// the first few items _must_ match the original struct. +struct _io_pending_proxy_t { + int io_queue_type; + LIBEVENT_THREAD *thread; + conn *c; + mc_resp *resp; // original struct ends here + + struct _io_pending_proxy_t *next; // stack for IO submission + STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends + int coro_ref; // lua registry reference to the coroutine + int mcpres_ref; // mcp.res reference used for await() + lua_State *coro; // pointer directly to the coroutine + mcp_backend_t *backend; // backend server to request from + struct iovec iov[2]; // request string + tail buffer + int iovcnt; // 1 or 2... 
+ int await_ref; // lua reference if we were an await object + mcp_resp_t *client_resp; // reference (currently pointing to a lua object) + bool flushed; // whether we've fully written this request to a backend. + bool ascii_multiget; // passed on from mcp_r_t + bool is_await; // are we an await object? +}; + +// Note: does *be have to be a sub-struct? how stable are userdata pointers? +// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers +// - says no. +typedef struct { + int ref; // luaL_ref reference. + mcp_backend_t *be; +} mcp_pool_be_t; + +typedef struct mcp_pool_s mcp_pool_t; +struct mcp_pool_s { + struct proxy_hash_caller phc; + pthread_mutex_t lock; // protects refcount. + proxy_ctx_t *ctx; // main context. + STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator. + int refcount; + int phc_ref; + int self_ref; // TODO: double check that this is needed. + int pool_size; + mcp_pool_be_t pool[]; +}; + +typedef struct { + mcp_pool_t *main; // ptr to original +} mcp_pool_proxy_t; + +static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c); +#define PROCESS_MULTIGET true +#define PROCESS_NORMAL false +static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget); +static size_t _process_request_next_key(mcp_parser_t *pr); +static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen); +static void dump_stack(lua_State *L); +static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc); +static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen); +static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p); +static int mcplib_await_run(conn *c, lua_State *L, int coro_ref); +static int mcplib_await_return(io_pending_proxy_t *p); +static void proxy_backend_handler(const int fd, const short which, void *arg); +static void proxy_event_handler(evutil_socket_t fd, short 
which, void *arg); +static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); +static void *proxy_event_thread(void *arg); +static void proxy_out_errstring(mc_resp *resp, const char *str); +static int _flush_pending_write(mcp_backend_t *be, io_pending_proxy_t *p); +static int _reset_bad_backend(mcp_backend_t *be); +static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); +static int proxy_thread_loadconf(LIBEVENT_THREAD *thr); +static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread); + +/******** EXTERNAL FUNCTIONS ******/ +// functions starting with _ are breakouts for the public functions. + +// see also: process_extstore_stats() +// FIXME: get context off of conn? global variables +// FIXME: stat coverage +void proxy_stats(ADD_STAT add_stats, conn *c) { + if (!settings.proxy_enabled) { + return; + } + proxy_ctx_t *ctx = settings.proxy_ctx; + STAT_L(ctx); + + APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads); + APPEND_STAT("proxy_config_reload_fails", "%llu", (unsigned long long)ctx->global_stats.config_reload_fails); + APPEND_STAT("proxy_backend_total", "%llu", (unsigned long long)ctx->global_stats.backend_total); + STAT_UL(ctx); +} + +void process_proxy_stats(ADD_STAT add_stats, conn *c) { + char key_str[STAT_KEY_LEN]; + + if (!settings.proxy_enabled) { + return; + } + proxy_ctx_t *ctx = settings.proxy_ctx; + STAT_L(ctx); + + // prepare aggregated counters. + struct proxy_user_stats *us = &ctx->user_stats; + uint64_t counters[us->num_stats]; + memset(counters, 0, sizeof(counters)); + + // aggregate worker thread counters. 
+ for (int x = 0; x < settings.num_threads; x++) { + LIBEVENT_THREAD *t = get_worker_thread(x); + struct proxy_user_stats *tus = t->proxy_stats; + WSTAT_L(t); + for (int i = 0; i < tus->num_stats; i++) { + counters[i] += tus->counters[i]; + } + WSTAT_UL(t); + } + + // return all of the stats + for (int x = 0; x < us->num_stats; x++) { + snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]); + APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]); + } + STAT_UL(ctx); +} + +struct _dumpbuf { + size_t size; + size_t used; + char *buf; +}; + +static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) { + (void)L; + struct _dumpbuf *db = ud; + if (db->used + sz > db->size) { + db->size *= 2; + char *nb = realloc(db->buf, db->size); + if (nb == NULL) { + return -1; + } + db->buf = nb; + } + memcpy(db->buf + db->used, (const char *)p, sz); + db->used += sz; + return 0; +} + +static const char * _load_helper(lua_State *L, void *data, size_t *size) { + (void)L; + struct _dumpbuf *db = data; + if (db->used == 0) { + *size = 0; + return NULL; + } + *size = db->used; + db->used = 0; + return db->buf; +} + +void proxy_start_reload(void *arg) { + proxy_ctx_t *ctx = arg; + if (pthread_mutex_trylock(&ctx->config_lock) == 0) { + pthread_cond_signal(&ctx->config_cond); + pthread_mutex_unlock(&ctx->config_lock); + } +} + +// Manages a queue of inbound objects destined to be deallocated. +static void *_proxy_manager_thread(void *arg) { + proxy_ctx_t *ctx = arg; + pool_head_t head; + + pthread_mutex_lock(&ctx->manager_lock); + while (1) { + STAILQ_INIT(&head); + while (STAILQ_EMPTY(&ctx->manager_head)) { + pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock); + } + + // pull dealloc queue into local queue. + STAILQ_CONCAT(&head, &ctx->manager_head); + pthread_mutex_unlock(&ctx->manager_lock); + + // Config lock is required for using config VM. 
+ pthread_mutex_lock(&ctx->config_lock); + lua_State *L = ctx->proxy_state; + mcp_pool_t *p; + STAILQ_FOREACH(p, &head, next) { + // walk the hash selector backends and unref. + for (int x = 0; x < p->pool_size; x++) { + luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref); + } + // unref the phc ref. + luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref); + // need to... unref self. + // NOTE: double check if we really need to self-reference. + // this is a backup here to ensure the external refcounts hit zero + // before lua garbage collects the object. other things hold a + // reference to the object though. + luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref); + // that's it? let it float? + } + pthread_mutex_unlock(&ctx->config_lock); + + // done. + pthread_mutex_lock(&ctx->manager_lock); + } + + return NULL; +} + +// Thread handling the configuration reload sequence. +// TODO: get a logger instance. +// TODO: making this "safer" will require a few phases of work. +// 1) JFDI +// 2) "test VM" -> from config thread, test the worker reload portion. +// 3) "unit testing" -> from same temporary worker VM, execute set of +// integration tests that must pass. +// 4) run update on each worker, collecting new mcp.attach() hooks. +// Once every worker has successfully executed and set new hooks, roll +// through a _second_ time to actually swap the hook structures and unref +// the old structures where marked dirty. +static void *_proxy_config_thread(void *arg) { + proxy_ctx_t *ctx = arg; + + logger_create(); + pthread_mutex_lock(&ctx->config_lock); + while (1) { + pthread_cond_wait(&ctx->config_cond, &ctx->config_lock); + LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "start"); + STAT_INCR(ctx, config_reloads, 1); + lua_State *L = ctx->proxy_state; + lua_settop(L, 0); // clear off any crud that could have been left on the stack. 
+ + // The main stages of config reload are: + // - load and execute the config file + // - run mcp_config_pools() + // - for each worker: + // - copy and execute new lua code + // - copy selector table + // - run mcp_config_routes() + + if (proxy_load_config(ctx) != 0) { + // Failed to load. log and wait for a retry. + STAT_INCR(ctx, config_reload_fails, 1); + LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed"); + continue; + } + + // TODO: create a temporary VM to test-load the worker code into. + // failing to load partway through the worker VM reloads can be + // critically bad if we're not careful about references. + // IE: the config VM _must_ hold references to selectors and backends + // as long as they exist in any worker for any reason. + + for (int x = 0; x < settings.num_threads; x++) { + LIBEVENT_THREAD *thr = get_worker_thread(x); + + pthread_mutex_lock(&ctx->worker_lock); + ctx->worker_done = false; + ctx->worker_failed = false; + proxy_reload_notify(thr); + while (!ctx->worker_done) { + // in case of spurious wakeup. + pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock); + } + pthread_mutex_unlock(&ctx->worker_lock); + + // Code load bailed. 
+            if (ctx->worker_failed) {
+                STAT_INCR(ctx, config_reload_fails, 1);
+                LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
+                continue;
+            }
+        }
+        LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
+    }
+
+    return NULL;
+}
+
+static int _start_proxy_config_threads(proxy_ctx_t *ctx) {
+    int ret;
+
+    pthread_mutex_lock(&ctx->config_lock);
+    if ((ret = pthread_create(&ctx->config_tid, NULL,
+                    _proxy_config_thread, ctx)) != 0) {
+        fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
+                strerror(ret));
+        pthread_mutex_unlock(&ctx->config_lock);
+        return -1;
+    }
+    pthread_mutex_unlock(&ctx->config_lock);
+
+    pthread_mutex_lock(&ctx->manager_lock);
+    if ((ret = pthread_create(&ctx->manager_tid, NULL,
+                    _proxy_manager_thread, ctx)) != 0) {
+        fprintf(stderr, "Failed to start proxy manager thread: %s\n",
+                strerror(ret));
+        pthread_mutex_unlock(&ctx->manager_lock);
+        return -1;
+    }
+    pthread_mutex_unlock(&ctx->manager_lock);
+
+    return 0;
+}
+
+// TODO: IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
+// event threads.
+static void _proxy_init_evthread_events(proxy_event_thread_t *t) {
+#ifdef HAVE_LIBURING
+    bool use_uring = true;
+    struct io_uring_params p = {0};
+    assert(t->event_fd); // uring only exists where eventfd also does.
+
+    // Setup the CQSIZE to be much larger than SQ size, since backpressure
+    // issues can cause us to block on SQ submissions and as a network server,
+    // stuff happens. 
+ p.flags = IORING_SETUP_CQSIZE; + p.cq_entries = PRING_QUEUE_CQ_ENTRIES; + int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p); + if (ret) { + perror("io_uring_queue_init_params"); + exit(1); + } + if (!(p.features & IORING_FEAT_NODROP)) { + fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n"); + use_uring = false; + } + if (!(p.features & IORING_FEAT_SINGLE_MMAP)) { + fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n"); + use_uring = false; + } + if (!(p.features & IORING_FEAT_FAST_POLL)) { + fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n"); + use_uring = false; + } + + if (use_uring) { + // FIXME: Sigh. we need a blocking event_fd for io_uring but we've a + // chicken and egg in here. need a better structure... in meantime + // re-create the event_fd. + close(t->event_fd); + t->event_fd = eventfd(0, 0); + // FIXME: hack for event init. + t->ur_notify_event.set = false; + _proxy_evthr_evset_notifier(t); + t->use_uring = true; + return; + } else { + // Decided to not use io_uring, so don't waste memory. + io_uring_queue_exit(&t->ring); + } +#endif + + struct event_config *ev_config; + ev_config = event_config_new(); + event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); + t->base = event_base_new_with_config(ev_config); + event_config_free(ev_config); + if (! t->base) { + fprintf(stderr, "Can't allocate event base\n"); + exit(1); + } + + // listen for notifications. + // NULL was thread_libevent_process + // FIXME: use modern format? 
(event_assign) +#ifdef USE_EVENTFD + event_set(&t->notify_event, t->event_fd, + EV_READ | EV_PERSIST, proxy_event_handler, t); +#else + event_set(&t->notify_event, t->notify_receive_fd, + EV_READ | EV_PERSIST, proxy_event_handler, t); +#endif + + evtimer_set(&t->clock_event, proxy_event_updater, t); + event_base_set(t->base, &t->clock_event); + struct timeval rate = {.tv_sec = 3, .tv_usec = 0}; + evtimer_add(&t->clock_event, &rate); + + event_base_set(t->base, &t->notify_event); + if (event_add(&t->notify_event, 0) == -1) { + fprintf(stderr, "Can't monitor libevent notify pipe\n"); + exit(1); + } + +} + +// start the centralized lua state and config thread. +// TODO: return ctx/state. avoid global vars. +void proxy_init(void) { + proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t)); + settings.proxy_ctx = ctx; // FIXME: return and deal with outside? + + pthread_mutex_init(&ctx->config_lock, NULL); + pthread_cond_init(&ctx->config_cond, NULL); + pthread_mutex_init(&ctx->worker_lock, NULL); + pthread_cond_init(&ctx->worker_cond, NULL); + pthread_mutex_init(&ctx->manager_lock, NULL); + pthread_cond_init(&ctx->manager_cond, NULL); + pthread_mutex_init(&ctx->stats_lock, NULL); + + // FIXME: default defines. + ctx->timeouts.connect.tv_sec = 5; + ctx->timeouts.retry.tv_sec = 3; + ctx->timeouts.read.tv_sec = 3; + + STAILQ_INIT(&ctx->manager_head); + lua_State *L = luaL_newstate(); + ctx->proxy_state = L; + luaL_openlibs(L); + // NOTE: might need to differentiate the libs yes? + proxy_register_libs(NULL, L); + + // Create/start the backend threads, which we need before servers + // start getting created. + // Supporting N event threads should be possible, but it will be a + // low number of N to avoid too many wakeup syscalls. + // For now we hardcode to 1. 
+ proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t)); + ctx->proxy_threads = threads; + for (int i = 0; i < 1; i++) { + proxy_event_thread_t *t = &threads[i]; +#ifdef USE_EVENTFD + t->event_fd = eventfd(0, EFD_NONBLOCK); + // FIXME: eventfd can fail? +#else + int fds[2]; + if (pipe(fds)) { + perror("can't create proxy backend notify pipe"); + exit(1); + } + + t->notify_receive_fd = fds[0]; + t->notify_send_fd = fds[1]; +#endif + _proxy_init_evthread_events(t); + + // incoming request queue. + STAILQ_INIT(&t->io_head_in); + pthread_mutex_init(&t->mutex, NULL); + pthread_cond_init(&t->cond, NULL); + + t->ctx = ctx; + memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts)); + +#ifdef HAVE_LIBURING + if (t->use_uring) { + pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t); + } else { + pthread_create(&t->thread_id, NULL, proxy_event_thread, t); + } +#else + pthread_create(&t->thread_id, NULL, proxy_event_thread, t); +#endif // HAVE_LIBURING + } + + _start_proxy_config_threads(ctx); +} + +int proxy_load_config(void *arg) { + proxy_ctx_t *ctx = arg; + lua_State *L = ctx->proxy_state; + int res = luaL_loadfile(L, settings.proxy_startfile); + if (res != LUA_OK) { + fprintf(stderr, "Failed to load proxy_startfile: %s\n", lua_tostring(L, -1)); + return -1; + } + // LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE + + // Now we need to dump the compiled code into bytecode. + // This will then get loaded into worker threads. + struct _dumpbuf *db = malloc(sizeof(struct _dumpbuf)); + db->size = 16384; + db->used = 0; + db->buf = malloc(db->size); + lua_dump(L, _dump_helper, db, 0); + // 0 means no error. + ctx->proxy_code = db; + + // now we complete the data load by calling the function. + res = lua_pcall(L, 0, LUA_MULTRET, 0); + if (res != LUA_OK) { + fprintf(stderr, "Failed to load data into lua config state: %s\n", lua_tostring(L, -1)); + exit(EXIT_FAILURE); + } + + // call the mcp_config_pools function to get the central backends. 
+ lua_getglobal(L, "mcp_config_pools"); + + // TODO: handle explicitly if function is missing. + lua_pushnil(L); // no "old" config yet. + if (lua_pcall(L, 1, 1, 0) != LUA_OK) { + fprintf(stderr, "Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1)); + exit(EXIT_FAILURE); + } + + // result is our main config. + return 0; +} + +// TODO: this will be done differently while implementing config reloading. +static int _copy_pool(lua_State *from, lua_State *to) { + // from, -3 should have he userdata. + mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool"); + size_t size = sizeof(mcp_pool_proxy_t); + mcp_pool_proxy_t *pp = lua_newuserdatauv(to, size, 0); + luaL_setmetatable(to, "mcp.pool_proxy"); + // TODO: check pp. + + pp->main = p; + pthread_mutex_lock(&p->lock); + p->refcount++; + pthread_mutex_unlock(&p->lock); + return 0; +} + +static void _copy_config_table(lua_State *from, lua_State *to); +// (from, -1) is the source value +// should end with (to, -1) being the new value. +// TODO: { foo = "bar", { thing = "foo" } } fails for lua_next() post final +// table. +static void _copy_config_table(lua_State *from, lua_State *to) { + int type = lua_type(from, -1); + bool found = false; + switch (type) { + case LUA_TNIL: + lua_pushnil(to); + break; + case LUA_TUSERDATA: + // see dump_stack() - check if it's something we handle. + if (lua_getmetatable(from, -1) != 0) { + lua_pushstring(from, "__name"); + if (lua_rawget(from, -2) != LUA_TNIL) { + const char *name = lua_tostring(from, -1); + if (strcmp(name, "mcp.pool") == 0) { + // FIXME: check result + _copy_pool(from, to); + found = true; + } + } + lua_pop(from, 2); + } + if (!found) { + fprintf(stderr, "unhandled userdata type\n"); + exit(1); + } + break; + case LUA_TNUMBER: + // FIXME: since 5.3 there's some sub-thing you need to do to push + // float vs int. + lua_pushnumber(to, lua_tonumber(from, -1)); + break; + case LUA_TSTRING: + // FIXME: temp var + tolstring worth doing? 
+ lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1)); + break; + case LUA_TTABLE: + // TODO: huge table could cause stack exhaustion? have to do + // checkstack perhaps? + // TODO: copy the metatable first? + // TODO: size narr/nrec from old table and use createtable to + // pre-allocate. + lua_newtable(to); // throw new table on worker + int t = lua_absindex(from, -1); // static index of table to copy. + int nt = lua_absindex(to, -1); // static index of new table. + lua_pushnil(from); // start iterator for main + while (lua_next(from, t) != 0) { + // (key, -2), (val, -1) + // TODO: check what key is (it can be anything :|) + // to allow an optimization later lets restrict it to strings + // and numbers. + // don't coerce it to a string unless it already is one. + lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2)); + // lua_settable(to, n) - n being the table + // takes -2 key -1 value, pops both. + // use lua_absindex(L, -1) and so to convert easier? + _copy_config_table(from, to); // push next value. + lua_settable(to, nt); + lua_pop(from, 1); // drop value, keep key. + } + // top of from is now the original table. + // top of to should be the new table. + break; + default: + // FIXME: error. + fprintf(stderr, "unhandled type\n"); + exit(1); + } +} + +// Run from proxy worker to coordinate code reload. +// config_lock must be held first. +void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) { + proxy_ctx_t *ctx = arg; + pthread_mutex_lock(&ctx->worker_lock); + if (proxy_thread_loadconf(thr) != 0) { + ctx->worker_failed = true; + } + ctx->worker_done = true; + pthread_cond_signal(&ctx->worker_cond); + pthread_mutex_unlock(&ctx->worker_lock); +} + +// FIXME: need to test how to recover from an actual error here. stuff gets +// left on the stack? +static int proxy_thread_loadconf(LIBEVENT_THREAD *thr) { + lua_State *L = thr->L; + // load the precompiled config function. 
+ proxy_ctx_t *ctx = settings.proxy_ctx; + struct _dumpbuf *db = ctx->proxy_code; + struct _dumpbuf db2; // copy because the helper modifies it. + memcpy(&db2, db, sizeof(struct _dumpbuf)); + + lua_load(L, _load_helper, &db2, "config", NULL); + // LUA_OK + all errs from loadfile except LUA_ERRFILE. + //dump_stack(L); + // - pcall the func (which should load it) + int res = lua_pcall(L, 0, LUA_MULTRET, 0); + if (res != LUA_OK) { + // FIXME: no crazy failure here! + fprintf(stderr, "Failed to load data into worker thread\n"); + return -1; + } + + lua_getglobal(L, "mcp_config_routes"); + // create deepcopy of argument to pass into mcp_config_routes. + _copy_config_table(ctx->proxy_state, L); + + // copied value is in front of route function, now call it. + if (lua_pcall(L, 1, 1, 0) != LUA_OK) { + fprintf(stderr, "Failed to execute mcp_config_routes: %s\n", lua_tostring(L, -1)); + return -1; + } + + // update user stats + STAT_L(ctx); + struct proxy_user_stats *us = &ctx->user_stats; + struct proxy_user_stats *tus = NULL; + if (us->num_stats != 0) { + pthread_mutex_lock(&thr->stats.mutex); + if (thr->proxy_stats == NULL) { + tus = calloc(1, sizeof(struct proxy_user_stats)); + thr->proxy_stats = tus; + } else { + tus = thr->proxy_stats; + } + + // originally this was a realloc routine but it felt fragile. + // that might still be a better idea; still need to zero out the end. + uint64_t *counters = calloc(us->num_stats, sizeof(uint64_t)); + + // note that num_stats can _only_ grow in size. + // we also only care about counters on the worker threads. + if (tus->counters) { + assert(tus->num_stats <= us->num_stats); + memcpy(counters, tus->counters, tus->num_stats * sizeof(uint64_t)); + free(tus->counters); + } + + tus->counters = counters; + tus->num_stats = us->num_stats; + pthread_mutex_unlock(&thr->stats.mutex); + } + STAT_UL(ctx); + + return 0; +} + +// Initialize the VM for an individual worker thread. 
+void proxy_thread_init(LIBEVENT_THREAD *thr) { + // Create the hook table. + thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook)); + if (thr->proxy_hooks == NULL) { + fprintf(stderr, "Failed to allocate proxy hooks\n"); + exit(EXIT_FAILURE); + } + + // Initialize the lua state. + lua_State *L = luaL_newstate(); + thr->L = L; + luaL_openlibs(L); + proxy_register_libs(thr, L); + + // kick off the configuration. + if (proxy_thread_loadconf(thr) != 0) { + exit(EXIT_FAILURE); + } +} + +// ctx_stack is a stack of io_pending_proxy_t's. +void proxy_submit_cb(io_queue_t *q) { + proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads; + io_pending_proxy_t *p = q->stack_ctx; + io_head_t head; + STAILQ_INIT(&head); + + // NOTE: responses get returned in the correct order no matter what, since + // mc_resp's are linked. + // we just need to ensure stuff is parsed off the backend in the correct + // order. + // So we can do with a single list here, but we need to repair the list as + // responses are parsed. (in the req_remaining-- section) + // TODO: + // - except we can't do that because the deferred IO stack isn't + // compatible with queue.h. + // So for now we build the secondary list with an STAILQ, which + // can be transplanted/etc. + while (p) { + // insert into tail so head is oldest request. + STAILQ_INSERT_TAIL(&head, p, io_next); + if (!p->is_await) { + // funny workaround: awaiting IOP's don't count toward + // resuming a connection, only the completion of the await + // condition. + q->count++; + } + + p = p->next; + } + + // clear out the submit queue so we can re-queue new IO's inline. + q->stack_ctx = NULL; + + // Transfer request stack to event thread. + pthread_mutex_lock(&e->mutex); + STAILQ_CONCAT(&e->io_head_in, &head); + // No point in holding the lock since we're not doing a cond signal. + pthread_mutex_unlock(&e->mutex); + + // Signal to check queue. + // TODO: error handling. 
+#ifdef USE_EVENTFD + uint64_t u = 1; + // FIXME: check result? is it ever possible to get a short write/failure + // for an eventfd? + if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + assert(1 == 0); + } +#else + if (write(e->notify_send_fd, "w", 1) <= 0) { + assert(1 == 0); + } +#endif + + return; +} + +void proxy_complete_cb(io_queue_t *q) { + /* + io_pending_proxy_t *p = q->stack_ctx; + q->stack_ctx = NULL; + + while (p) { + io_pending_proxy_t *next = p->next; + mc_resp *resp = p->resp; + lua_State *Lc = p->coro; + + // in order to resume we need to remove the objects that were + // originally returned + // what's currently on the top of the stack is what we want to keep. + lua_rotate(Lc, 1, 1); + // We kept the original results from the yield so lua would not + // collect them in the meantime. We can drop those now. + lua_settop(Lc, 1); + + proxy_run_coroutine(Lc, resp, p, NULL); + + // don't need to flatten main thread here, since the coro is gone. + + p = next; + } + return; + */ +} + +// called from worker thread after an individual IO has been returned back to +// the worker thread. Do post-IO run and cleanup work. +void proxy_return_cb(io_pending_t *pending) { + io_pending_proxy_t *p = (io_pending_proxy_t *)pending; + if (p->is_await) { + mcplib_await_return(p); + } else { + lua_State *Lc = p->coro; + + // in order to resume we need to remove the objects that were + // originally returned + // what's currently on the top of the stack is what we want to keep. + lua_rotate(Lc, 1, 1); + // We kept the original results from the yield so lua would not + // collect them in the meantime. We can drop those now. + lua_settop(Lc, 1); + + // p can be freed/changed from the call below, so fetch the queue now. + io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); + conn *c = p->c; + proxy_run_coroutine(Lc, p->resp, p, NULL); + + q->count--; + if (q->count == 0) { + // call re-add directly since we're already in the worker thread. 
+ conn_worker_readd(c); + } + } +} + +// called from the worker thread as an mc_resp is being freed. +// must let go of the coroutine reference if there is one. +// caller frees the pending IO. +void proxy_finalize_cb(io_pending_t *pending) { + io_pending_proxy_t *p = (io_pending_proxy_t *)pending; + + // release our coroutine reference. + // TODO: coroutines are reusable in lua 5.4. we can stack this onto a freelist + // after a lua_resetthread(Lc) call. + if (p->coro_ref) { + // Note: lua registry is the same for main thread or a coroutine. + luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref); + } + return; +} + +int try_read_command_proxy(conn *c) { + char *el, *cont; + + if (c->rbytes == 0) + return 0; + + el = memchr(c->rcurr, '\n', c->rbytes); + if (!el) { + if (c->rbytes > 1024) { + /* + * We didn't have a '\n' in the first k. This _has_ to be a + * large multiget, if not we should just nuke the connection. + */ + char *ptr = c->rcurr; + while (*ptr == ' ') { /* ignore leading whitespaces */ + ++ptr; + } + + if (ptr - c->rcurr > 100 || + (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) { + + conn_set_state(c, conn_closing); + return 1; + } + + // ASCII multigets are unbound, so our fixed size rbuf may not + // work for this particular workload... For backcompat we'll use a + // malloc/realloc/free routine just for this. + if (!c->rbuf_malloced) { + if (!rbuf_switch_to_malloc(c)) { + conn_set_state(c, conn_closing); + return 1; + } + } + } + + return 0; + } + cont = el + 1; + // TODO: we don't want to cut the \r\n here. lets see how lua handles + // non-terminated strings? 
+ /*if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { + el--; + } + *el = '\0';*/ + + assert(cont <= (c->rcurr + c->rbytes)); + + c->last_cmd_time = current_time; + proxy_process_command(c, c->rcurr, cont - c->rcurr, PROCESS_NORMAL); + + c->rbytes -= (cont - c->rcurr); + c->rcurr = cont; + + assert(c->rcurr <= (c->rbuf + c->rsize)); + + return 1; + +} + +// we buffered a SET of some kind. +void complete_nread_proxy(conn *c) { + assert(c != NULL); + + conn_set_state(c, conn_new_cmd); + + LIBEVENT_THREAD *thr = c->thread; + lua_State *L = thr->L; + lua_State *Lc = lua_tothread(L, -1); + // FIXME: could use a quicker method to retrieve the request. + mcp_request_t *rq = luaL_checkudata(Lc, -1, "mcp.request"); + + // validate the data chunk. + if (strncmp((char *)c->item + rq->pr.vlen - 2, "\r\n", 2) != 0) { + // TODO: error handling. + lua_settop(L, 0); // clear anything remaining on the main thread. + return; + } + rq->pr.vbuf = c->item; + c->item = NULL; + c->item_malloced = false; + + proxy_run_coroutine(Lc, c->resp, NULL, c); + + lua_settop(L, 0); // clear anything remaining on the main thread. + + return; +} + +/******** NETWORKING AND INTERNAL FUNCTIONS ******/ + +static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { + io_head_t head; + + STAILQ_INIT(&head); + STAILQ_INIT(&t->be_head); + + // Pull the entire stack of inbound into local queue. + pthread_mutex_lock(&t->mutex); + STAILQ_CONCAT(&head, &t->io_head_in); + pthread_mutex_unlock(&t->mutex); + + int io_count = 0; + int be_count = 0; + while (!STAILQ_EMPTY(&head)) { + io_pending_proxy_t *io = STAILQ_FIRST(&head); + io->flushed = false; + mcp_backend_t *be = io->backend; + // So the backend can retrieve its event base. + be->event_thread = t; + + // _no_ mutex on backends. they are owned by the event thread. 
+ STAILQ_REMOVE_HEAD(&head, io_next); + if (be->bad) { + P_DEBUG("%s: fast failing request to bad backend\n", __func__); + io->client_resp->status = MCMC_ERR; + return_io_pending((io_pending_t *)io); + continue; + } + STAILQ_INSERT_TAIL(&be->io_head, io, io_next); + be->depth++; + io_count++; + if (!be->stacked) { + be->stacked = true; + STAILQ_INSERT_TAIL(&t->be_head, be, be_next); + be_count++; + } + } + //P_DEBUG("%s: io/be counts for syscalls [%d/%d]\n", __func__, io_count, be_count); + return io_count; +} + +#ifdef HAVE_LIBURING +static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len); +static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be); + +// read handler. +static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { + mcp_backend_t *be = udata; + int bread = cqe->res; + char *rbuf = NULL; + size_t toread = 0; + // TODO: check bread for disconnect/etc. + + int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread); + P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread); + + if (res > 0) { + _proxy_evthr_evset_be_read(be, rbuf, toread); + } else if (res == -1) { + _reset_bad_backend(be); + return; + } + + // FIXME: when exactly do we need to reset the backend handler? + if (!STAILQ_EMPTY(&be->io_head)) { + _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE); + } +} + +static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { + mcp_backend_t *be = udata; + int flags = 0; + + be->can_write = true; + if (be->connecting) { + int err = 0; + // We were connecting, now ensure we're properly connected. + if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) { + // kick the bad backend, clear the queue, retry later. + // FIXME: if a connect fails, anything currently in the queue + // should be safe to hold up until their timeout. + _reset_bad_backend(be); + //_backend_failed(be); // FIXME: need a uring version of this. 
+ P_DEBUG("%s: backend failed to connect\n", __func__); + return; + } + P_DEBUG("%s: backend connected\n", __func__); + be->connecting = false; + be->state = mcp_backend_read; + be->bad = false; + be->failed_count = 0; + } + io_pending_proxy_t *io = NULL; + int res = 0; + STAILQ_FOREACH(io, &be->io_head, io_next) { + res = _flush_pending_write(be, io); + if (res != -1) { + flags |= res; + if (flags & EV_WRITE) { + break; + } + } else { + break; + } + } + if (res == -1) { + _reset_bad_backend(be); + return; + } + + if (flags & EV_WRITE) { + _proxy_evthr_evset_be_wrpoll(be); + } + + _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE); +} + +static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) { + proxy_event_thread_t *t = udata; + + // liburing always uses eventfd for the notifier. + // *cqe has our result. + assert(cqe->res != -EINVAL); + if (cqe->res != sizeof(eventfd_t)) { + P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res); + // FIXME: figure out if this is impossible, and how to handle if not. + assert(1 == 0); + } + + // need to re-arm the listener every time. + _proxy_evthr_evset_notifier(t); + + // TODO: sqe queues for writing to backends + // - _ur handler for backend write completion is to set a read event and + // re-submit. ugh. + // Should be possible to have standing reads, but flow is harder and lets + // optimize that later. (ie; allow matching reads to a request but don't + // actually dequeue anything until both read and write are confirmed) + if (_proxy_event_handler_dequeue(t) == 0) { + //P_DEBUG("%s: no IO's to complete\n", __func__); + return; + } + + // Re-walk each backend and check set event as required. + mcp_backend_t *be = NULL; + //struct timeval tmp_time = {5,0}; // FIXME: temporary hard coded timeout. 
+ + // TODO: for each backend, queue writev's into sqe's + // move the backend sqe bits into a write complete handler + STAILQ_FOREACH(be, &t->be_head, be_next) { + be->stacked = false; + int flags = 0; + + if (be->connecting) { + P_DEBUG("%s: deferring IO pending connecting\n", __func__); + flags |= EV_WRITE; + } else { + io_pending_proxy_t *io = NULL; + STAILQ_FOREACH(io, &be->io_head, io_next) { + flags = _flush_pending_write(be, io); + if (flags == -1 || flags & EV_WRITE) { + break; + } + } + } + + if (flags == -1) { + _reset_bad_backend(be); + } else { + // FIXME: needs a re-write to handle sqe starvation. + // FIXME: can't actually set the read here? need to confirm _some_ + // write first? + if (flags & EV_WRITE) { + _proxy_evthr_evset_be_wrpoll(be); + } + if (flags & EV_READ) { + _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE); + } + } + } +} + +static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be) { + struct io_uring_sqe *sqe; + if (be->ur_wr_ev.set) + return; + + be->ur_wr_ev.cb = proxy_backend_wrhandler_ur; + be->ur_wr_ev.udata = be; + + sqe = io_uring_get_sqe(&be->event_thread->ring); + // FIXME: NULL? + + io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT); + io_uring_sqe_set_data(sqe, &be->ur_wr_ev); + be->ur_wr_ev.set = true; +} + +static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len) { + P_DEBUG("%s: setting: %lu\n", __func__, len); + struct io_uring_sqe *sqe; + if (be->ur_rd_ev.set) { + P_DEBUG("%s: already set\n", __func__); + return; + } + + be->ur_rd_ev.cb = proxy_backend_handler_ur; + be->ur_rd_ev.udata = be; + + sqe = io_uring_get_sqe(&be->event_thread->ring); + // FIXME: NULL? 
+ assert(be->rbuf != NULL); + io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0); + io_uring_sqe_set_data(sqe, &be->ur_rd_ev); + be->ur_rd_ev.set = true; +} + +static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) { + struct io_uring_sqe *sqe; + P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set); + if (t->ur_notify_event.set) + return; + + t->ur_notify_event.cb = proxy_event_handler_ur; + t->ur_notify_event.udata = t; + + sqe = io_uring_get_sqe(&t->ring); + // FIXME: NULL? + io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0); + io_uring_sqe_set_data(sqe, &t->ur_notify_event); +} + +// TODO: CQE's can generate many SQE's, so we might need to occasionally check +// for space free in the sqe queue and submit in the middle of the cqe +// foreach. +// There might be better places to do this, but I think it's cleaner if +// submission and cqe can stay in this function. +// TODO: The problem is io_submit() can deadlock if too many cqe's are +// waiting. +// Need to understand if this means "CQE's ready to be picked up" or "CQE's in +// flight", because the former is much easier to work around (ie; only run the +// backend handler after dequeuing everything else) +static void *proxy_event_thread_ur(void *arg) { + proxy_event_thread_t *t = arg; + struct io_uring_cqe *cqe; + + P_DEBUG("%s: starting\n", __func__); + + while (1) { + io_uring_submit_and_wait(&t->ring, 1); + //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret); + + uint32_t head = 0; + uint32_t count = 0; + + io_uring_for_each_cqe(&t->ring, head, cqe) { + P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count); + + proxy_event_t *pe = io_uring_cqe_get_data(cqe); + pe->set = false; + pe->cb(pe->udata, cqe); + + count++; + } + + P_DEBUG("%s: advancing [count:%d]\n", __func__, count); + io_uring_cq_advance(&t->ring, count); + } + + return NULL; +} +#endif // HAVE_LIBURING + +// We need to get timeout/retry/etc updates to the event thread(s) +// occasionally. 
I'd like to have a better interface around this where updates
+// are shipped directly; but this is good enough to start with.
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
+    proxy_event_thread_t *t = arg;
+    proxy_ctx_t *ctx = t->ctx;
+
+    // TODO: double check how much of this boilerplate is still necessary?
+    // reschedule the clock event.
+    evtimer_del(&t->clock_event);
+
+    evtimer_set(&t->clock_event, proxy_event_updater, t);
+    event_base_set(t->base, &t->clock_event);
+    struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+    evtimer_add(&t->clock_event, &rate);
+
+    // we reuse the "global stats" lock since it's hardly ever used.
+    STAT_L(ctx);
+    memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts));
+    STAT_UL(ctx);
+}
+
+// event handler for executing backend requests
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
+    proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+    uint64_t u;
+    if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+        // FIXME: figure out if this is impossible, and how to handle if not.
+        assert(1 == 0);
+    }
+#else
+    char buf[1];
+    // TODO: This is a lot more fatal than it should be. can it fail? can
+    // it blow up the server?
+    // FIXME: a cross-platform method of speeding this up would be nice. With
+    // event fds we can queue N events and wakeup once here.
+    // If we're pulling one byte out of the pipe at a time here it'll just
+    // wake us up too often.
+    // If the pipe is O_NONBLOCK then maybe just a larger read would work?
+    if (read(fd, buf, 1) != 1) {
+        P_DEBUG("%s: pipe read failed\n", __func__);
+        return;
+    }
+#endif
+
+    if (_proxy_event_handler_dequeue(t) == 0) {
+        //P_DEBUG("%s: no IO's to complete\n", __func__);
+        return;
+    }
+
+    // Re-walk each backend and check set event as required.
+    mcp_backend_t *be = NULL;
+    struct timeval tmp_time = t->timeouts.connect;
+
+    // FIXME: _set_event() is buggy, see notes on function. 
+ STAILQ_FOREACH(be, &t->be_head, be_next) { + be->stacked = false; + int flags = 0; + + if (be->connecting) { + P_DEBUG("%s: deferring IO pending connecting\n", __func__); + } else { + io_pending_proxy_t *io = NULL; + STAILQ_FOREACH(io, &be->io_head, io_next) { + flags = _flush_pending_write(be, io); + if (flags == -1 || flags & EV_WRITE) { + break; + } + } + } + + if (flags == -1) { + _reset_bad_backend(be); + } else { + flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT; + _set_event(be, t->base, flags, tmp_time, proxy_backend_handler); + } + } + +} + +static void *proxy_event_thread(void *arg) { + proxy_event_thread_t *t = arg; + + event_base_loop(t->base, 0); + event_base_free(t->base); + + // TODO: join bt threads, free array. + + return NULL; +} + + +// Need a custom function so we can prefix lua strings easily. +static void proxy_out_errstring(mc_resp *resp, const char *str) { + size_t len; + const static char error_prefix[] = "SERVER_ERROR "; + const static int error_prefix_len = sizeof(error_prefix) - 1; + + assert(resp != NULL); + + resp_reset(resp); + // avoid noreply since we're throwing important errors. + + // Fill response object with static string. + len = strlen(str); + if ((len + error_prefix_len + 2) > WRITE_BUFFER_SIZE) { + /* ought to be always enough. just fail for simplicity */ + str = "SERVER_ERROR output line too long"; + len = strlen(str); + } + + char *w = resp->wbuf; + memcpy(w, error_prefix, error_prefix_len); + w += error_prefix_len; + + memcpy(w, str, len); + w += len; + + memcpy(w, "\r\n", 2); + resp_add_iov(resp, resp->wbuf, len + error_prefix_len + 2); + return; +} + +// Simple error wrapper for common failures. +// lua_error() is a jump so this function never returns +// for clarity add a 'return' after calls to this. +static void proxy_lua_error(lua_State *L, const char *s) { + lua_pushstring(L, s); + lua_error(L); +} + +static void proxy_lua_ferror(lua_State *L, const char *fmt, ...) 
{ + va_list ap; + va_start(ap, fmt); + lua_pushfstring(L, fmt, ap); + va_end(ap); + lua_error(L); +} + +// FIXME: if we use the newer API the various pending checks can be adjusted. +static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) { + // FIXME: chicken and egg. + // can't check if pending if the structure is was calloc'ed (sigh) + // don't want to double test here. should be able to event_assign but + // not add anything during initialization, but need the owner thread's + // event base. + int pending = 0; + if (event_initialized(&be->event)) { + pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); + } + if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { + event_del(&be->event); // replace existing event. + } + + // if we can't write, we could be connecting. + // TODO: always check for READ in case some commands were sent + // successfully? The flags could be tracked on *be and reset in the + // handler, perhaps? + event_assign(&be->event, base, mcmc_fd(be->client), + flags, callback, be); + event_add(&be->event, &t); +} + +// this resumes every yielded coroutine (and re-resumes if necessary). +// called from the worker thread after responses have been pulled from the +// network. +// Flow: +// - the response object should already be on the coroutine stack. +// - fix up the stack. +// - run coroutine. +// - if LUA_YIELD, we need to swap out the pending IO from its mc_resp then call for a queue +// again. +// - if LUA_OK finalize the response and return +// - else set error into mc_resp. 
+// Resume (or start) a request coroutine and translate the result into the
+// client's mc_resp.
+// Lc: the coroutine's lua thread, with its argument already on the stack.
+// resp: client response object to fill on completion.
+// p: the io_pending object when resuming after a backend IO, else NULL.
+// c: client connection; may be NULL when resuming via p (p carries it).
+// Always returns 0.
+static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c) {
+    int nresults = 0;
+    int cores = lua_resume(Lc, NULL, 1, &nresults);
+    size_t rlen = 0;
+
+    if (cores == LUA_OK) {
+        // Coroutine finished: its return value sits on top of Lc's stack.
+        int type = lua_type(Lc, -1);
+        if (type == LUA_TUSERDATA) {
+            // An mcp.response object: output either the C-side buffer or a
+            // lua-supplied string stashed in uservalue slot 1.
+            mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response");
+            LOGGER_LOG(NULL, LOG_RAWCMDS, LOGGER_PROXY_RAW, NULL, r->start, r->cmd, r->resp.type, r->resp.code);
+            if (r->buf) {
+                // response set from C.
+                // FIXME: write_and_free() ? it's a bit wrong for here.
+                resp->write_and_free = r->buf;
+                resp_add_iov(resp, r->buf, r->blen);
+                r->buf = NULL; // ownership transferred to resp.
+            } else if (lua_getiuservalue(Lc, -1, 1) != LUA_TNIL) {
+                // uservalue slot 1 is pre-created, so we get TNIL instead of
+                // TNONE when nothing was set into it.
+                // NOTE(review): output is silently truncated to
+                // WRITE_BUFFER_SIZE here — TODO confirm that's intended.
+                const char *s = lua_tolstring(Lc, -1, &rlen);
+                size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
+                memcpy(resp->wbuf, s, l);
+                resp_add_iov(resp, resp->wbuf, l);
+                lua_pop(Lc, 1);
+            } else if (r->status != MCMC_OK) {
+                proxy_out_errstring(resp, "backend failure");
+            } else {
+                // TODO: double check how this can get here?
+                // MCMC_OK but no buffer and no internal value set? still an
+                // error?
+                P_DEBUG("%s: unhandled response\n", __func__);
+            }
+        } else if (type == LUA_TSTRING) {
+            // response is a raw string from lua.
+            const char *s = lua_tolstring(Lc, -1, &rlen);
+            size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
+            memcpy(resp->wbuf, s, l);
+            resp_add_iov(resp, resp->wbuf, l);
+            lua_pop(Lc, 1);
+        } else {
+            proxy_out_errstring(resp, "bad response");
+        }
+    } else if (cores == LUA_YIELD) {
+        if (nresults == 1) {
+            // TODO: try harder to validate; but we have so few yield cases
+            // that I'm going to shortcut this here. A single yielded result
+            // means it's probably an await(), so attempt to process this.
+            // FIXME: if p, do we need to free it up from the resp?
+            // resp should not have an IOP I think...
+            assert(p == NULL);
+            // coroutine object sitting on the _main_ VM right now, so we grab
+            // the reference from there, which also pops it.
+            int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
+            mcplib_await_run(c, Lc, coro_ref);
+        } else {
+            // need to remove and free the io_pending, since c->resp owns it.
+            // so we call mcp_queue_io() again and let it override the
+            // mc_resp's io_pending object.
+
+            int coro_ref = 0;
+            mc_resp *resp;
+            if (p != NULL) {
+                // Re-yield after a backend IO: recycle the old io_pending's
+                // references before queueing a fresh one.
+                coro_ref = p->coro_ref;
+                resp = p->resp;
+                c = p->c;
+                do_cache_free(p->c->thread->io_cache, p);
+                // *p is now dead.
+            } else {
+                // yielding from a top level call to the coroutine,
+                // so we need to grab a reference to the coroutine thread.
+                // TODO: make this more explicit?
+                // we only need to get the reference here, and error conditions
+                // should instead drop it, but now it's not obvious to users that
+                // we're reaching back into the main thread's stack.
+                assert(c != NULL);
+                coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
+                resp = c->resp;
+            }
+            // TODO: c only used for cache alloc? push the above into the func?
+            mcp_queue_io(c, resp, coro_ref, Lc);
+        }
+    } else {
+        // Coroutine raised an error; surface it to the client and log it.
+        P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
+        LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1));
+        proxy_out_errstring(resp, "lua failure");
+    }
+
+    return 0;
+}
+
+// NOTES:
+// - mcp_backend_read: grab req_stack_head, do things
+// read -> next, want_read -> next | read_end, etc.
+// issue: want read back to read_end as necessary. special state?
+// - it's fine: p->client_resp->type.
+// - mcp_backend_next: advance, consume, etc.
+// TODO: second argument with enum for a specific error.
+// - probably just for logging. for app if any of these errors shouldn't
+// result in killing the request stack!
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) { + bool stop = false; + io_pending_proxy_t *p = NULL; + mcmc_resp_t tmp_resp; // helper for testing for GET's END marker. + int flags = 0; + + p = STAILQ_FIRST(&be->io_head); + if (p == NULL) { + // got a read event, but nothing was queued. + // probably means a disconnect event. + // TODO: could probably confirm this by attempting to read the + // socket, getsockopt, or something else simply for logging or + // statistical purposes. + // In this case we know it's going to be a close so error. + flags = -1; + P_DEBUG("%s: read event but nothing in IO queue\n", __func__); + return flags; + } + + while (!stop) { + mcp_resp_t *r; + int res = 1; + int remain = 0; + char *newbuf = NULL; + + switch(be->state) { + case mcp_backend_read: + assert(p != NULL); + P_DEBUG("%s: [read] bread: %d\n", __func__, bread); + + if (bread == 0) { + // haven't actually done a read yet; figure out where/what. + *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread); + return EV_READ; + } else { + be->state = mcp_backend_parse; + } + break; + case mcp_backend_parse: + r = p->client_resp; + r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp); + // FIXME: Don't like this interface. + bread = 0; // only add the bread once per loop. + if (r->status != MCMC_OK) { + P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status); + if (r->status == MCMC_WANT_READ) { + flags |= EV_READ; + be->state = mcp_backend_read; + stop = true; + break; + } else { + flags = -1; + stop = true; + break; + } + } + + // we actually don't care about anything but the value length + // TODO: if vlen != vlen_read, pull an item and copy the data. + int extra_space = 0; + switch (r->resp.type) { + case MCMC_RESP_GET: + // We're in GET mode. we only support one key per + // GET in the proxy backends, so we need to later check + // for an END. 
+ extra_space = ENDLEN; + break; + case MCMC_RESP_END: + // this is a MISS from a GET request + // or final handler from a STAT request. + assert(r->resp.vlen == 0); + break; + case MCMC_RESP_META: + // we can handle meta responses easily since they're self + // contained. + break; + case MCMC_RESP_GENERIC: + case MCMC_RESP_NUMERIC: + break; + // TODO: No-op response? + default: + P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type); + // unhandled :( + flags = -1; + stop = true; + break; + } + + if (res) { + if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) { + // Ascii multiget hack mode; consume END's + be->state = mcp_backend_next; + break; + } + + // r->resp.reslen + r->resp.vlen is the total length of the response. + // TODO: need to associate a buffer with this response... + // for now lets abuse write_and_free on mc_resp and simply malloc the + // space we need, stuffing it into the resp object. + + r->blen = r->resp.reslen + r->resp.vlen; + r->buf = malloc(r->blen + extra_space); + if (r->buf == NULL) { + flags = -1; // TODO: specific error. + stop = true; + break; + } + + P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen); + if (r->resp.vlen != r->resp.vlen_read) { + P_DEBUG("%s: got a short read, moving to want_read\n", __func__); + // copy the partial and advance mcmc's buffer digestion. + // FIXME: should call this for both cases? + // special function for advancing mcmc's buffer for + // reading a value? perhaps a flag to skip the data copy + // when it's unnecessary? + memcpy(r->buf, be->rbuf, r->resp.reslen); + r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread); + be->state = mcp_backend_want_read; + break; + } else { + // mcmc's already counted the value as read if it fit in + // the original buffer... + memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read); + } + } else { + // TODO: no response read? 
+ // nothing currently sets res to 0. should remove if that + // never comes up and handle the error entirely above. + P_DEBUG("%s: no response read from backend\n", __func__); + flags = -1; + stop = true; + break; + } + + if (r->resp.type == MCMC_RESP_GET) { + // advance the buffer + newbuf = mcmc_buffer_consume(be->client, &remain); + if (remain != 0) { + // TODO: don't need to shuffle buffer with better API + memmove(be->rbuf, newbuf, remain); + } + + be->state = mcp_backend_read_end; + } else { + be->state = mcp_backend_next; + } + + break; + case mcp_backend_read_end: + r = p->client_resp; + // we need to ensure the next data in the stream is "END\r\n" + // if not, the stack is desynced and we lose it. + + r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp); + P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type); + if (r->status != MCMC_OK) { + if (r->status == MCMC_WANT_READ) { + *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread); + return EV_READ; + } else { + flags = -1; // TODO: specific error. + stop = true; + } + break; + } else if (tmp_resp.type != MCMC_RESP_END) { + // TODO: specific error about protocol desync + flags = -1; + stop = true; + break; + } else { + // response is good. + // FIXME: copy what the server actually sent? + if (!p->ascii_multiget) { + // sigh... if part of a multiget we need to eat the END + // markers down here. + memcpy(r->buf+r->blen, ENDSTR, ENDLEN); + r->blen += 5; + } + } + + be->state = mcp_backend_next; + + break; + case mcp_backend_want_read: + // Continuing a read from earlier + r = p->client_resp; + // take bread input and see if we're done reading the value, + // else advance, set buffers, return next. + if (bread > 0) { + r->bread += bread; + bread = 0; + } + P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen); + + if (r->bread >= r->resp.vlen) { + // all done copying data. 
+ if (r->resp.type == MCMC_RESP_GET) { + newbuf = mcmc_buffer_consume(be->client, &remain); + // Shouldn't be anything in the buffer if we had to run to + // want_read to read the value. + assert(remain == 0); + be->state = mcp_backend_read_end; + } else { + be->state = mcp_backend_next; + } + } else { + // signal to caller to issue a read. + *rbuf = r->buf+r->resp.reslen+r->bread; + *toread = r->resp.vlen - r->bread; + // need to retry later. + flags |= EV_READ; + stop = true; + } + + break; + case mcp_backend_next: + // set the head here. when we break the head will be correct. + STAILQ_REMOVE_HEAD(&be->io_head, io_next); + be->depth--; + // have to do the q->count-- and == 0 and redispatch_conn() + // stuff here. The moment we call return_io here we + // don't own *p anymore. + return_io_pending((io_pending_t *)p); + + if (STAILQ_EMPTY(&be->io_head)) { + // TODO: suspicious of this code. audit harder? + stop = true; + } else { + p = STAILQ_FIRST(&be->io_head); + } + + // mcmc_buffer_consume() - if leftover, keep processing + // IO's. + // if no more data in buffer, need to re-set stack head and re-set + // event. + remain = 0; + // TODO: do we need to yield every N reads? + newbuf = mcmc_buffer_consume(be->client, &remain); + P_DEBUG("%s: [next] remain: %d\n", __func__, remain); + be->state = mcp_backend_read; + if (remain != 0) { + // data trailing in the buffer, for a different request. + memmove(be->rbuf, newbuf, remain); + be->state = mcp_backend_parse; + P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain); + } else { + // need to read more data, buffer is empty. + stop = true; + } + + break; + default: + // TODO: at some point (after v1?) this should attempt to recover, + // though we should only get here from memory corruption and + // bailing may be the right thing to do. 
+ fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state); + assert(false); + } // switch + } // while + + return flags; +} + +// TODO: surface to option +#define BACKEND_FAILURE_LIMIT 3 + +// All we need to do here is schedule the backend to attempt to connect again. +static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { + mcp_backend_t *be = arg; + assert(which & EV_TIMEOUT); + struct timeval tmp_time = be->event_thread->timeouts.retry; + _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); +} + +// currently just for timeouts, but certain errors should consider a backend +// to be "bad" as well. +// must be called before _reset_bad_backend(), so the backend is currently +// clear. +// TODO: currently only notes for "bad backends" in cases of timeouts or +// connect failures. We need a specific connect() handler that executes a +// "version" call to at least check that the backend isn't speaking garbage. +// In theory backends can fail such that responses are constantly garbage, +// but it's more likely an app is doing something bad and culling the backend +// may prevent any other clients from talking to that backend. In +// that case we need to track if clients are causing errors consistently and +// block them instead. That's more challenging so leaving a note instead +// of doing this now :) +static void _backend_failed(mcp_backend_t *be) { + struct timeval tmp_time = be->event_thread->timeouts.retry; + if (++be->failed_count > BACKEND_FAILURE_LIMIT) { + P_DEBUG("%s: marking backend as bad\n", __func__); + be->bad = true; + _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); + } else { + _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); + } +} + +// TODO: add a second argument for assigning a specific error to all pending +// IO's (ie; timeout). 
+// The backend has gotten into a bad state (timed out, protocol desync, or
+// some other supposedly unrecoverable error: purge the queue and
+// cycle the socket.
+// Note that some types of errors may not require flushing the queue and
+// should be fixed as they're figured out.
+// _must_ be called from within the event thread.
+static int _reset_bad_backend(mcp_backend_t *be) {
+    io_pending_proxy_t *io = NULL;
+    // NOTE(review): entries are handed back to their worker threads via
+    // return_io_pending() while still being iterated with STAILQ_FOREACH;
+    // this assumes the io objects aren't freed/reused until after this loop
+    // — TODO confirm.
+    STAILQ_FOREACH(io, &be->io_head, io_next) {
+        // TODO: Unsure if this is the best way of surfacing errors to lua,
+        // but will do for V1.
+        io->client_resp->status = MCMC_ERR;
+        return_io_pending((io_pending_t *)io);
+    }
+
+    STAILQ_INIT(&be->io_head);
+
+    // Cycle the socket and kick off a fresh non-blocking connect.
+    mcmc_disconnect(be->client);
+    int status = mcmc_connect(be->client, be->ip, be->port, MCMC_OPTION_NONBLOCK);
+    if (status == MCMC_CONNECTED) {
+        // TODO: unexpected but lets let it be here.
+        be->connecting = false;
+        be->can_write = true;
+    } else if (status == MCMC_CONNECTING) {
+        be->connecting = true;
+        be->can_write = false;
+    } else {
+        // TODO: failed to immediately re-establish the connection.
+        // need to put the BE into a bad/retry state.
+        // FIXME: until we get an event to specifically handle connecting and
+        // bad server handling, attempt to force a reconnect here the next
+        // time a request comes through.
+        // The event thread will attempt to write to the backend, fail, then
+        // end up in this routine again.
+        be->connecting = false;
+        be->can_write = true;
+    }
+
+    // TODO: configure the event as necessary internally.
+
+    return 0;
+}
+
+// Attempt to write one pending request's iovecs to the backend socket.
+// Returns 0 if already flushed, EV_READ on full flush, EV_WRITE if the
+// socket would block (partial progress saved in the iovecs), -1 on error.
+static int _flush_pending_write(mcp_backend_t *be, io_pending_proxy_t *p) {
+    int flags = 0;
+
+    if (p->flushed) {
+        return 0;
+    }
+
+    ssize_t sent = 0;
+    // FIXME: original send function internally tracked how much was sent, but
+    // doing this here would require copying all of the iovecs or modify what
+    // we supply.
+    // this is probably okay but I want to leave a note here in case I get a
+    // better idea.
+    int status = mcmc_request_writev(be->client, p->iov, p->iovcnt, &sent, 1);
+    if (sent > 0) {
+        // we need to save progress in case of WANT_WRITE.
+        // Consume fully-sent iovecs, then trim the first partial one.
+        for (int x = 0; x < p->iovcnt; x++) {
+            struct iovec *iov = &p->iov[x];
+            if (sent >= iov->iov_len) {
+                sent -= iov->iov_len;
+                iov->iov_len = 0;
+            } else {
+                iov->iov_len -= sent;
+                sent = 0;
+                break;
+            }
+        }
+    }
+
+    // request_writev() returns WANT_WRITE if we haven't fully flushed.
+    if (status == MCMC_WANT_WRITE) {
+        // avoid syscalls for any other queued requests.
+        be->can_write = false;
+        flags = EV_WRITE;
+        // can't continue for now.
+    } else if (status != MCMC_OK) {
+        flags = -1;
+        // TODO: specific error code
+        // s->error = code?
+    } else {
+        flags = EV_READ;
+        p->flushed = true;
+    }
+
+    return flags;
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+// Handles: connect completion (EV_WRITE while connecting), flushing queued
+// writes, draining responses via the drive machine, and timeouts.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+    mcp_backend_t *be = arg;
+    int flags = EV_TIMEOUT;
+    struct timeval tmp_time = be->event_thread->timeouts.read;
+
+    if (which & EV_TIMEOUT) {
+        P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+        _reset_bad_backend(be);
+        _backend_failed(be);
+        return;
+    }
+
+    if (which & EV_WRITE) {
+        be->can_write = true;
+        // TODO: move connect routine to its own function?
+        // - hard to do right now because we can't (easily?) edit libevent
+        // events.
+        if (be->connecting) {
+            int err = 0;
+            // We were connecting, now ensure we're properly connected.
+            if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+                // kick the bad backend, clear the queue, retry later.
+                // FIXME: if a connect fails, anything currently in the queue
+                // should be safe to hold up until their timeout.
+                _reset_bad_backend(be);
+                _backend_failed(be);
+                P_DEBUG("%s: backend failed to connect\n", __func__);
+                return;
+            }
+            P_DEBUG("%s: backend connected\n", __func__);
+            be->connecting = false;
+            be->state = mcp_backend_read;
+            be->bad = false;
+            be->failed_count = 0;
+        }
+        // Socket is writable again: flush queued requests until blocked.
+        io_pending_proxy_t *io = NULL;
+        int res = 0;
+        STAILQ_FOREACH(io, &be->io_head, io_next) {
+            res = _flush_pending_write(be, io);
+            if (res != -1) {
+                flags |= res;
+                if (flags & EV_WRITE) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        if (res == -1) {
+            _reset_bad_backend(be);
+            return;
+        }
+    }
+
+    if (which & EV_READ) {
+        // We do the syscall here before diving into the state machine to allow a
+        // common code path for io_uring/epoll
+        int res = 1;
+        int read = 0;
+        // Alternate between the drive machine (which tells us where/how much
+        // to read) and recv() until the socket would block or we're done.
+        while (res > 0) {
+            char *rbuf = NULL;
+            size_t toread = 0;
+            // need to input how much was read since last call
+            // needs _output_ of the buffer to read into and how much.
+            res = proxy_backend_drive_machine(be, read, &rbuf, &toread);
+            P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread);
+
+            if (res > 0) {
+                read = recv(mcmc_fd(be->client), rbuf, toread, 0);
+                P_DEBUG("%s: read: %d\n", __func__, read);
+                if (read == 0) {
+                    // not connected or error.
+                    _reset_bad_backend(be);
+                    return;
+                } else if (read == -1) {
+                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                        break; // sit on epoll again.
+                    } else {
+                        _reset_bad_backend(be);
+                        return;
+                    }
+                }
+            } else if (res == -1) {
+                _reset_bad_backend(be);
+                return;
+            } else {
+                break;
+            }
+        }
+
+#ifdef PROXY_DEBUG
+        if (!STAILQ_EMPTY(&be->io_head)) {
+            P_DEBUG("backend has leftover IOs: %d\n", be->depth);
+        }
+#endif
+    }
+
+    // Still pending requests to read or write.
+    // TODO: need to handle errors from above so we don't go to sleep here.
+    if (!STAILQ_EMPTY(&be->io_head)) {
+        flags |= EV_READ; // FIXME: might not be necessary here, but ensures we get a disconnect event.
+        _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+    }
+}
+
+// Entry point for a client request line into the proxy: parse it, route it
+// either to the internal text handler or to the lua hook's coroutine.
+// multiget marks a recursive sub-request of an ascii multiget.
+static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget) {
+    assert(c != NULL);
+    LIBEVENT_THREAD *thr = c->thread;
+    struct proxy_hook *hooks = thr->proxy_hooks;
+    lua_State *L = thr->L;
+    mcp_parser_t pr = {0};
+
+    // Avoid doing resp_start() here, instead do it a bit later or as-needed.
+    // This allows us to hop over to the internal text protocol parser, which
+    // also calls resp_start().
+    // Tighter integration later should obviate the need for this, it is not a
+    // permanent solution.
+    int ret = process_request(&pr, command, cmdlen);
+    if (ret != 0) {
+        WSTAT_INCR(c, proxy_conn_errors, 1);
+        if (!resp_start(c)) {
+            conn_set_state(c, conn_closing);
+            return;
+        }
+        proxy_out_errstring(c->resp, "parsing request");
+        if (ret == -2) {
+            // Kill connection on more critical parse failure.
+            conn_set_state(c, conn_closing);
+        }
+        return;
+    }
+
+    struct proxy_hook *hook = &hooks[pr.command];
+
+    if (!hook->is_lua) {
+        // need to pass our command string into the internal handler.
+        // to minimize the code change, this means allowing it to tokenize the
+        // full command. The proxy's indirect parser should be built out to
+        // become common code for both proxy and ascii handlers.
+        // For now this means we have to null-terminate the command string,
+        // then call into text protocol handler.
+        // FIXME: use a ptr or something; don't like this code.
+        if (cmdlen > 1 && command[cmdlen-2] == '\r') {
+            command[cmdlen-2] = '\0';
+        } else {
+            command[cmdlen-1] = '\0';
+        }
+        process_command_ascii(c, command);
+        return;
+    }
+
+    // Count requests handled by proxy vs local.
+    WSTAT_INCR(c, proxy_conn_requests, 1);
+
+    // If ascii multiget, we turn this into a self-calling loop :(
+    // create new request with next key, call this func again, then advance
+    // original string.
+    // might be better to split this function; the below bits turn into a
+    // function call, then we don't re-process the above bits in the same way?
+    // The way this is detected/passed on is very fragile.
+    if (!multiget && pr.cmd_type == CMD_TYPE_GET && pr.has_space) {
+        // TODO: need some way to abort this.
+        // FIXME: before the while loop, ensure pr.keytoken isn't too bug for
+        // the local temp buffer here.
+        uint32_t keyoff = pr.tokens[pr.keytoken];
+        // Re-dispatch each key of the multiget as its own single-key
+        // sub-request, re-entering this function with multiget=true.
+        while (pr.klen != 0) {
+            char temp[KEY_MAX_LENGTH + 30];
+            char *cur = temp;
+            // copy original request up until the original key token.
+            memcpy(cur, pr.request, pr.tokens[pr.keytoken]);
+            cur += pr.tokens[pr.keytoken];
+
+            // now copy in our "current" key.
+            memcpy(cur, &pr.request[keyoff], pr.klen);
+            cur += pr.klen;
+
+            memcpy(cur, "\r\n", 2);
+            cur += 2;
+
+            *cur = '\0';
+            P_DEBUG("%s: new multiget sub request: %s [%u/%u]\n", __func__, temp, keyoff, pr.klen);
+            proxy_process_command(c, temp, cur - temp, PROCESS_MULTIGET);
+
+            // now advance to the next key.
+            keyoff = _process_request_next_key(&pr);
+        }
+
+        if (!resp_start(c)) {
+            conn_set_state(c, conn_closing);
+            return;
+        }
+
+        // The above recursions should have created c->resp's in dispatch
+        // order.
+        // So now we add another one at the end to create the capping END
+        // string.
+        memcpy(c->resp->wbuf, ENDSTR, ENDLEN);
+        resp_add_iov(c->resp, c->resp->wbuf, ENDLEN);
+
+        return;
+    }
+
+    // We test the command length all the way down here because multigets can
+    // be very long, and they're chopped up by now.
+    if (cmdlen >= MCP_REQUEST_MAXLEN) {
+        WSTAT_INCR(c, proxy_conn_errors, 1);
+        if (!resp_start(c)) {
+            conn_set_state(c, conn_closing);
+            return;
+        }
+        proxy_out_errstring(c->resp, "request too long");
+        conn_set_state(c, conn_closing);
+        return;
+    }
+
+    if (!resp_start(c)) {
+        conn_set_state(c, conn_closing);
+        return;
+    }
+
+    // start a coroutine.
+    // TODO: This can pull from a cache.
+    lua_newthread(L);
+    lua_State *Lc = lua_tothread(L, -1);
+    // leave the thread first on the stack, so we can reference it if needed.
+    // pull the lua hook function onto the stack.
+    lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook->lua_ref);
+
+    mcp_request_t *rq = mcp_new_request(Lc, &pr, command, cmdlen);
+    if (multiget) {
+        rq->ascii_multiget = true;
+    }
+    // TODO: a better indicator of needing nread? pr->has_value?
+    // TODO: lift this to a post-processor?
+    if (rq->pr.vlen != 0) {
+        // Command carries a data payload (e.g. set): buffer it via the
+        // normal nread path before running the coroutine.
+        // relying on temporary malloc's not succumbing as poorly to
+        // fragmentation.
+        c->item = malloc(rq->pr.vlen);
+        if (c->item == NULL) {
+            lua_settop(L, 0);
+            proxy_out_errstring(c->resp, "out of memory");
+            return;
+        }
+        c->item_malloced = true;
+        c->ritem = c->item;
+        c->rlbytes = rq->pr.vlen;
+
+        conn_set_state(c, conn_nread);
+
+        // thread coroutine is still on (L, -1)
+        // FIXME: could get speedup from stashing Lc ptr.
+        return;
+    }
+
+    proxy_run_coroutine(Lc, c->resp, NULL, c);
+
+    lua_settop(L, 0); // clear anything remaining on the main thread.
+}
+
+// analogue for storage_get_item(); add a deferred IO object to the current
+// connection's response object. stack enough information to write to the
+// server on the submit callback, and enough to resume the lua state on the
+// completion callback.
+static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
+    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+
+    // stack: request, hash selector. latter just to hold a reference.
+
+    mcp_request_t *rq = luaL_checkudata(Lc, -1, "mcp.request");
+    mcp_backend_t *be = rq->be;
+
+    // Then we push a response object, which we'll re-use later.
+    // reserve one uservalue for a lua-supplied response.
+    mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
+    if (r == NULL) {
+        proxy_lua_error(Lc, "out of memory allocating response");
+        return;
+    }
+    // FIXME: debugging?
+    memset(r, 0, sizeof(mcp_resp_t));
+    // TODO: check *r
+    r->buf = NULL;
+    r->blen = 0;
+    r->start = rq->start; // need to inherit the original start time.
+    // Copy the command token (up to the first space) for logging.
+    int x;
+    int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
+    for (x = 0; x < end; x++) {
+        if (rq->pr.request[x] == ' ') {
+            break;
+        }
+        r->cmd[x] = rq->pr.request[x];
+    }
+    r->cmd[x] = '\0';
+
+    luaL_getmetatable(Lc, "mcp.response");
+    lua_setmetatable(Lc, -2);
+
+    io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
+    // FIXME: can this fail?
+    // NOTE(review): p is dereferenced below without a NULL check — confirm
+    // do_cache_alloc() cannot return NULL here.
+
+    // this is a re-cast structure, so assert that we never outsize it.
+    assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
+    memset(p, 0, sizeof(io_pending_proxy_t));
+    // set up back references.
+    p->io_queue_type = IO_QUEUE_PROXY;
+    p->thread = c->thread;
+    p->c = c;
+    p->resp = resp;
+    p->client_resp = r;
+    p->flushed = false;
+    p->ascii_multiget = rq->ascii_multiget;
+    resp->io_pending = (io_pending_t *)p;
+
+    // top of the main thread should be our coroutine.
+    // lets grab a reference to it and pop so it doesn't get gc'ed.
+    p->coro_ref = coro_ref;
+
+    // we'll drop the pointer to the coro on here to save some CPU
+    // on re-fetching it later. The pointer shouldn't change.
+    p->coro = Lc;
+
+    // The direct backend object. Lc is holding the reference in the stack
+    p->backend = be;
+
+    mcp_request_attach(Lc, rq, p);
+
+    // link into the batch chain.
+    p->next = q->stack_ctx;
+    q->stack_ctx = p;
+
+    return;
+}
+
+/******** LUA INTERFACE FUNCTIONS ******/
+
+// Debug helper: print the contents of a lua stack to stderr.
+__attribute__((unused)) static void dump_stack(lua_State *L) {
+    int top = lua_gettop(L);
+    int i = 1;
+    fprintf(stderr, "--TOP OF STACK [%d]\n", top);
+    for (; i < top + 1; i++) {
+        int type = lua_type(L, i);
+        // lets find the metatable of this userdata to identify it.
+        if (lua_getmetatable(L, i) != 0) {
+            lua_pushstring(L, "__name");
+            if (lua_rawget(L, -2) != LUA_TNIL) {
+                fprintf(stderr, "--|%d| [%s] (%s)\n", i, lua_typename(L, type), lua_tostring(L, -1));
+                lua_pop(L, 2);
+                continue;
+            }
+            lua_pop(L, 2);
+        }
+        if (type == LUA_TSTRING) {
+            fprintf(stderr, "--|%d| [%s] | %s\n", i, lua_typename(L, type), lua_tostring(L, i));
+        } else {
+            fprintf(stderr, "--|%d| [%s]\n", i, lua_typename(L, type));
+        }
+    }
+    fprintf(stderr, "-----------------\n");
+}
+
+// func prototype example:
+// static int fname (lua_State *L)
+// normal library open:
+// int luaopen_mcp(lua_State *L) { }
+
+// resp:ok()
+// Returns true if the backend reported MCMC_OK for this response.
+static int mcplib_response_ok(lua_State *L) {
+    mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
+
+    if (r->status == MCMC_OK) {
+        lua_pushboolean(L, 1);
+    } else {
+        lua_pushboolean(L, 0);
+    }
+
+    return 1;
+}
+
+// resp:hit(): true when the response succeeded and was not a miss.
+static int mcplib_response_hit(lua_State *L) {
+    mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
+
+    if (r->status == MCMC_OK && r->resp.code != MCMC_CODE_MISS) {
+        lua_pushboolean(L, 1);
+    } else {
+        lua_pushboolean(L, 0);
+    }
+
+    return 1;
+}
+
+// __gc for mcp.response: release the C-side buffer if still owned.
+static int mcplib_response_gc(lua_State *L) {
+    mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
+
+    // On error/similar we might be holding the read buffer.
+    // If the buf is handed off to mc_resp for return, this pointer is NULL
+    if (r->buf != NULL) {
+        free(r->buf);
+    }
+
+    return 0;
+}
+
+// __gc for mcp.backend: disconnect and free the mcmc client.
+static int mcplib_backend_gc(lua_State *L) {
+    mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
+
+    // TODO: need to validate it's impossible to cause a backend to be garbage
+    // collected while outstanding requests exist.
+    // might need some kind of failsafe here to leak memory and warn instead
+    // of killing the object and crashing? or is that too late since we're in
+    // __gc?
+    assert(STAILQ_EMPTY(&be->io_head));
+
+    mcmc_disconnect(be->client);
+    free(be->client);
+
+    return 0;
+}
+
+// mcp.backend(label, ip, port, weight): create (or return a cached) backend
+// object and start a non-blocking connect to it.
+static int mcplib_backend(lua_State *L) {
+    luaL_checkstring(L, -4); // label for indexing backends.
+    const char *ip = luaL_checkstring(L, -3); // FIXME: checklstring?
+    const char *port = luaL_checkstring(L, -2);
+    double weight = luaL_checknumber(L, -1);
+
+    // first check our reference table to compare.
+    // Reuse an existing backend with the same label only when ip/port/weight
+    // all match.
+    lua_pushvalue(L, -4);
+    int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
+    if (ret != LUA_TNIL) {
+        mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend");
+        if (strncmp(be_orig->ip, ip, MAX_IPLEN) == 0
+                && strncmp(be_orig->port, port, MAX_PORTLEN) == 0
+                && be_orig->weight == weight) {
+            // backend is the same, return it.
+            return 1;
+        } else {
+            // backend not the same, pop from stack and make new one.
+            lua_pop(L, 1);
+        }
+    } else {
+        lua_pop(L, 1);
+    }
+
+    // This might shift to internal objects?
+    mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0);
+    if (be == NULL) {
+        proxy_lua_error(L, "out of memory allocating backend");
+        return 0;
+    }
+
+    // FIXME: remove some of the excess zero'ing below?
+    memset(be, 0, sizeof(mcp_backend_t));
+    strncpy(be->ip, ip, MAX_IPLEN);
+    strncpy(be->port, port, MAX_PORTLEN);
+    be->weight = weight;
+    be->depth = 0;
+    be->rbuf = NULL;
+    be->failed_count = 0;
+    STAILQ_INIT(&be->io_head);
+    be->state = mcp_backend_read;
+    be->connecting = false;
+    be->can_write = false;
+    be->stacked = false;
+    be->bad = false;
+
+    // this leaves a permanent buffer on the backend, which is fine
+    // unless you have billions of backends.
+    // we can later optimize for pulling buffers from idle backends.
+    be->rbuf = malloc(READ_BUFFER_SIZE);
+    if (be->rbuf == NULL) {
+        proxy_lua_error(L, "out of memory allocating backend");
+        return 0;
+    }
+
+    // initialize libevent.
+    memset(&be->event, 0, sizeof(be->event));
+
+    // initialize the client
+    be->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
+    if (be->client == NULL) {
+        proxy_lua_error(L, "out of memory allocating backend");
+        return 0;
+    }
+    // TODO: connect elsewhere. When there're multiple backend owners, or
+    // sockets per backend, etc. We'll want to kick off connects as use time.
+    int status = mcmc_connect(be->client, be->ip, be->port, MCMC_OPTION_NONBLOCK);
+    if (status == MCMC_CONNECTED) {
+        // FIXME: is this possible? do we ever want to allow blocking
+        // connections?
+        proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->ip, be->port);
+        return 0;
+    } else if (status == MCMC_CONNECTING) {
+        be->connecting = true;
+        be->can_write = false;
+    } else {
+        proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->ip, be->port);
+        return 0;
+    }
+
+    luaL_getmetatable(L, "mcp.backend");
+    lua_setmetatable(L, -2); // set metatable to userdata.
+
+    lua_pushvalue(L, 1); // put the label at the top for settable later.
+    lua_pushvalue(L, -2); // copy the backend reference to the top.
+    // set our new backend object into the reference table.
+    lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
+    // stack is back to having backend on the top.
+
+    return 1;
+}
+
+// __gc for mcp.pool: destroy the lock; refcount must already be zero.
+static int mcplib_pool_gc(lua_State *L) {
+    mcp_pool_t *p = luaL_checkudata(L, -1, "mcp.pool");
+    assert(p->refcount == 0);
+    pthread_mutex_destroy(&p->lock);
+
+    return 0;
+}
+
+// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
+// TODO: hash and hashfilter
+static int mcplib_pool(lua_State *L) {
+    int argc = lua_gettop(L);
+    luaL_checktype(L, 1, LUA_TTABLE);
+    int n = luaL_len(L, 1); // get length of array table
+
+    mcp_pool_t *p = lua_newuserdatauv(L, sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n, 0);
+    // TODO: check null.
+    // FIXME: zero the memory? then __gc will fix up server references on
+    // errors.
+ p->pool_size = n; + p->refcount = 0; + pthread_mutex_init(&p->lock, NULL); + p->ctx = settings.proxy_ctx; // TODO: store ctx in upvalue. + + luaL_setmetatable(L, "mcp.pool"); + + lua_pushvalue(L, -1); // dupe self for reference. + p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); + + // TODO: ensure to increment refcounts for servers. + // remember lua arrays are 1 indexed. + for (int x = 1; x <= n; x++) { + mcp_pool_be_t *s = &p->pool[x-1]; + lua_geti(L, 1, x); // get next server into the stack. + // TODO: do we leak memory if we bail here? + // the stack should clear, then release the userdata + etc? + // - yes it should leak memory for the registry indexed items. + s->be = luaL_checkudata(L, -1, "mcp.backend"); + s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object. + } + + if (argc == 1) { + // Use default hash selector if none given. + p->phc = mcplib_hashfunc_murmur3; + return 1; + } + + // Supplied with an options table. We inspect this table to decorate the + // pool, then pass it along to the a constructor if necessary. + luaL_checktype(L, 2, LUA_TTABLE); + + // stack: backends, options, mcp.pool + if (lua_getfield(L, 2, "dist") != LUA_TNIL) { + // overriding the distribution function. + luaL_checktype(L, -1, LUA_TTABLE); + if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) { + proxy_lua_error(L, "key distribution object missing 'new' function"); + return 0; + } + + // - now create the copy pool table + lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint. + for (int x = 1; x <= p->pool_size; x++) { + mcp_backend_t *be = p->pool[x-1].be; + lua_createtable(L, 0, 4); + // stack = [p, h, f, optN, newpool, backend] + // the key should be fine for id? maybe don't need to duplicate + // this? 
+ lua_pushinteger(L, x); + lua_setfield(L, -2, "id"); + // we don't use the hostname for ketama hashing + // so passing ip for hostname is fine + lua_pushstring(L, be->ip); + lua_setfield(L, -2, "hostname"); + lua_pushstring(L, be->ip); + lua_setfield(L, -2, "addr"); + lua_pushstring(L, be->port); + lua_setfield(L, -2, "port"); + // TODO: weight/etc? + + // set the backend table into the new pool table. + lua_rawseti(L, -2, x); + } + + // we can either use lua_insert() or possibly _rotate to shift + // things into the right place, but simplest is to just copy the + // option arg to the end of the stack. + lua_pushvalue(L, 2); + // - stack should be: pool, opts, func, pooltable, opts + + // call the dist new function. + int res = lua_pcall(L, 2, 2, 0); + + if (res != LUA_OK) { + lua_error(L); // error should be on the stack already. + return 0; + } + + // TODO: validate response arguments. + // -1 is lightuserdata ptr to the struct (which must be owned by the + // userdata), which is later used for internal calls. + struct proxy_hash_caller *phc; + phc = lua_touserdata(L, -1); + memcpy(&p->phc, phc, sizeof(*phc)); + lua_pop(L, 1); + // -2 was userdata we need to hold a reference to + p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX); + // UD now popped from stack. + + lua_pop(L, 1); // remove the dist table from stack. + } else { + lua_pop(L, 1); // pop the nil. + } + + // TODO: getfield("filter") and add the function + // TODO: getfield("hash") and add the function + // TODO: getfield("seed") and use the hash function to calculate a seed + // value. 
+ + return 1; +} + +static int mcplib_pool_proxy_gc(lua_State *L) { + mcp_pool_proxy_t *pp = luaL_checkudata(L, -1, "mcp.pool_proxy"); + mcp_pool_t *p = pp->main; + pthread_mutex_lock(&p->lock); + p->refcount--; + if (p->refcount == 0) { + proxy_ctx_t *ctx = p->ctx; + pthread_mutex_lock(&ctx->manager_lock); + STAILQ_INSERT_TAIL(&ctx->manager_head, p, next); + pthread_cond_signal(&ctx->manager_cond); + pthread_mutex_unlock(&ctx->manager_lock); + } + pthread_mutex_unlock(&p->lock); + + return 0; +} + +// hashfunc(request) -> backend(request) +// needs key from request object. +static int mcplib_pool_proxy_call(lua_State *L) { + // internal args are the hash selector (self) + mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy"); + mcp_pool_t *p = pp->main; + // then request object. + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + + // we have a fast path to the key/length. + // FIXME: indicator for if request actually has a key token or not. + const char *key = MCP_PARSER_KEY(rq->pr); + size_t len = rq->pr.klen; + uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx); + + // attach the backend to the request object. + // save CPU cycles over rolling it through lua. + if (p->phc.ctx == NULL) { + // TODO: if NULL, pass in pool_size as ctx? + // works because the % bit will return an id we can index here. + // FIXME: temporary? maybe? + // if no context, what we got back was a hash which we need to modulus + // against the pool, since the func has no info about the pool. + rq->be = p->pool[lookup % p->pool_size].be; + } else { + // else we have a direct id into our pool. + // the lua modules should "think" in 1 based indexes, so we need to + // subtract one here. + // TODO: bother validating the range? + rq->be = p->pool[lookup-1].be; + } + + // now yield request, hash selector up. + return lua_yield(L, 2); +} + +// TODO: take fractional time and convert. 
+static int mcplib_backend_connect_timeout(lua_State *L) { + int seconds = luaL_checkinteger(L, -1); + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME: get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->timeouts.connect.tv_sec = seconds; + STAT_UL(ctx); + + return 0; +} + +static int mcplib_backend_retry_timeout(lua_State *L) { + int seconds = luaL_checkinteger(L, -1); + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME: get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->timeouts.retry.tv_sec = seconds; + STAT_UL(ctx); + + return 0; +} + +static int mcplib_backend_read_timeout(lua_State *L) { + int seconds = luaL_checkinteger(L, -1); + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME: get global ctx reference in thread/upvalue. + + STAT_L(ctx); + ctx->timeouts.read.tv_sec = seconds; + STAT_UL(ctx); + + return 0; +} + +// mcp.attach(mcp.HOOK_NAME, function) +// fill hook structure: if lua function, use luaL_ref() to store the func +static int mcplib_attach(lua_State *L) { + // Pull the original worker thread out of the shared mcplib upvalue. + LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE)); + + int hook = luaL_checkinteger(L, -2); + // pushvalue to dupe func and etc. + // can leave original func on stack afterward because it'll get cleared. + int loop_end = 0; + int loop_start = 1; + if (hook == CMD_ANY) { + // if CMD_ANY we need individually set loop 1 to CMD_SIZE. + loop_end = CMD_SIZE; + } else if (hook == CMD_ANY_STORAGE) { + // if CMD_ANY_STORAGE we only override get/set/etc. + loop_end = CMD_END_STORAGE; + } else { + loop_start = hook; + loop_end = hook + 1; + } + + if (lua_isfunction(L, -1)) { + struct proxy_hook *hooks = t->proxy_hooks; + + for (int x = loop_start; x < loop_end; x++) { + struct proxy_hook *h = &hooks[x]; + lua_pushvalue(L, -1); // duplicate the function for the ref. + if (h->lua_ref) { + // remove existing reference. 
+ luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref); + } + + // pops the function from the stack and leaves us a ref. for later. + h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX); + h->is_lua = true; + } + } else { + proxy_lua_error(L, "Must pass a function to mcp.attach"); + return 0; + } + + return 0; +} + +static void proxy_register_defines(lua_State *L) { +#define X(x) \ + lua_pushinteger(L, x); \ + lua_setfield(L, -2, #x); + + X(P_OK); + X(CMD_ANY); + X(CMD_ANY_STORAGE); + CMD_FIELDS +#undef X +} + +/*** REQUEST PARSER AND OBJECT ***/ + +// Find the starting offsets of each token; ignoring length. +// This creates a fast small (<= cacheline) index into the request, +// where we later scan or directly feed data into API's. +static int _process_tokenize(mcp_parser_t *pr, const size_t max) { + const char *s = pr->request; + int len = pr->reqlen - 2; + // FIXME: die if reqlen too long. + // reqlen could be huge if multiget so... need some special casing? + const char *end = s + len; + int curtoken = 0; + + int state = 0; + while (s != end) { + switch (state) { + case 0: + if (*s != ' ') { + pr->tokens[curtoken] = s - pr->request; + if (++curtoken == max) { + goto endloop; + } + state = 1; + } + s++; + break; + case 1: + if (*s != ' ') { + s++; + } else { + state = 0; + } + break; + } + } +endloop: + + pr->ntokens = curtoken; + P_DEBUG("%s: cur_tokens: %d\n", __func__, curtoken); + + return 0; +} + +static int _process_token_len(mcp_parser_t *pr, size_t token) { + const char *cur = pr->request + pr->tokens[token]; + int remain = pr->reqlen - pr->tokens[token] - 2; // CRLF + + const char *s = memchr(cur, ' ', remain); + return (s != NULL) ? s - cur : remain; +} + +static int _process_request_key(mcp_parser_t *pr) { + pr->klen = _process_token_len(pr, pr->keytoken); + // advance the parser in case of multikey. 
+ pr->parsed = pr->tokens[pr->keytoken] + pr->klen + 1; + + if (pr->request[pr->parsed-1] == ' ') { + P_DEBUG("%s: request_key found extra space\n", __func__); + pr->has_space = true; + } else { + pr->has_space = false; + } + return 0; +} + +// Just for ascii multiget: search for next "key" beyond where we stopped +// tokenizing before. +// Returns the offset for the next key. +static size_t _process_request_next_key(mcp_parser_t *pr) { + const char *cur = pr->request + pr->parsed; + int remain = pr->reqlen - pr->parsed - 2; + + // chew off any leading whitespace. + while (remain) { + if (*cur == ' ') { + remain--; + cur++; + pr->parsed++; + } else { + break; + } + } + + const char *s = memchr(cur, ' ', remain); + if (s != NULL) { + pr->klen = s - cur; + pr->parsed += s - cur; + } else { + pr->klen = remain; + pr->parsed += remain; + } + + return cur - pr->request; +} + +// for fast testing of existence of meta flags. +// meta has all flags as final tokens +static int _process_request_metaflags(mcp_parser_t *pr, int token) { + if (pr->ntokens <= token) { + pr->t.meta.flags = 0; // no flags found. + return 0; + } + const char *cur = pr->request + pr->tokens[token]; + const char *end = pr->request + pr->reqlen - 2; + + // We blindly convert flags into // bits, since the range of possible + // flags is deliberately < 64. 
+ int state = 0; + while (cur != end) { + switch (state) { + case 0: + if (*cur == ' ') { + cur++; + } else { + if (*cur < 65 || *cur > 122) { + return -1; + } + P_DEBUG("%s: setting meta flag: %d\n", __func__, *cur - 65); + pr->t.meta.flags |= 1 << (*cur - 65); + state = 1; + } + break; + case 1: + if (*cur != ' ') { + cur++; + } else { + state = 0; + } + break; + } + } + + return 0; +} + +// All meta commands are of form: "cm key f l a g S100" +static int _process_request_meta(mcp_parser_t *pr) { + _process_tokenize(pr, PARSER_MAX_TOKENS); + if (pr->ntokens < 2) { + P_DEBUG("%s: not enough tokens for meta command: %d\n", __func__, pr->ntokens); + return -1; + } + pr->keytoken = 1; + _process_request_key(pr); + + // pass the first flag token. + return _process_request_metaflags(pr, 2); +} + +// ms <key> <datalen> <flags>*\r\n +static int _process_request_mset(mcp_parser_t *pr) { + _process_tokenize(pr, PARSER_MAX_TOKENS); + if (pr->ntokens < 3) { + P_DEBUG("%s: not enough tokens for meta set command: %d\n", __func__, pr->ntokens); + return -1; + } + pr->keytoken = 1; + _process_request_key(pr); + + const char *cur = pr->request + pr->tokens[2]; + + errno = 0; + char *n = NULL; + int vlen = strtol(cur, &n, 10); + if ((errno == ERANGE) || (cur == n)) { + return -1; + } + + if (vlen < 0 || vlen > (INT_MAX - 2)) { + return -1; + } + vlen += 2; + + pr->vlen = vlen; + + // pass the first flag token + return _process_request_metaflags(pr, 3); +} + +// gat[s] <exptime> <key>*\r\n +static int _process_request_gat(mcp_parser_t *pr) { + _process_tokenize(pr, 3); + if (pr->ntokens < 3) { + P_DEBUG("%s: not enough tokens for GAT: %d\n", __func__, pr->ntokens); + return -1; + } + + pr->keytoken = 2; + _process_request_key(pr); + return 0; +} + +// we need t find the bytes supplied immediately so we can read the request +// from the client properly. 
+// set <key> <flags> <exptime> <bytes> [noreply]\r\n +static int _process_request_storage(mcp_parser_t *pr, size_t max) { + _process_tokenize(pr, max); + if (pr->ntokens < 5) { + P_DEBUG("%s: not enough tokens to storage command: %d\n", __func__, pr->ntokens); + return -1; + } + pr->keytoken = 1; + _process_request_key(pr); + + errno = 0; + char *n = NULL; + const char *cur = pr->request + pr->tokens[4]; + + int vlen = strtol(cur, &n, 10); + if ((errno == ERANGE) || (cur == n)) { + return -1; + } + + if (vlen < 0 || vlen > (INT_MAX - 2)) { + return -1; + } + vlen += 2; + + pr->vlen = vlen; + + return 0; +} + +// common request with key: <cmd> <key> <args> +static int _process_request_simple(mcp_parser_t *pr, const size_t max) { + _process_tokenize(pr, max); + pr->keytoken = 1; // second token is usually the key... stupid GAT. + + _process_request_key(pr); + return 0; +} + +// TODO: return code ENUM with error types. +// FIXME: the mcp_parser_t bits have ended up being more fragile than I hoped. +// careful zero'ing is required. revisit? +static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) { + // we want to "parse in place" as much as possible, which allows us to + // forward an unmodified request without having to rebuild it. + + const char *cm = command; + size_t cl = 0; + + const char *s = memchr(command, ' ', cmdlen-2); + // TODO: has_space -> has_tokens + // has_space resered for ascii multiget? + if (s != NULL) { + cl = s - command; + } else { + cl = cmdlen - 2; // FIXME: ensure cmdlen can never be < 2? + } + pr->has_space = false; + pr->parsed = cl + 1; + pr->request = command; + pr->reqlen = cmdlen; + int token_max = PARSER_MAX_TOKENS; + + //pr->vlen = 0; // FIXME: remove this once set indicator is decided + int cmd = -1; + int type = CMD_TYPE_GENERIC; + int ret = 0; + + switch (cl) { + case 0: + case 1: + // falls through with cmd as -1. should error. 
+ break; + case 2: + if (cm[0] == 'm') { + switch (cm[1]) { + case 'g': + cmd = CMD_MG; + ret = _process_request_meta(pr); + break; + case 's': + cmd = CMD_MS; + ret = _process_request_mset(pr); + break; + case 'd': + cmd = CMD_MD; + ret = _process_request_meta(pr); + break; + case 'n': + // TODO: do we route/handle NOP's at all? + // they should simply reflect to the client. + cmd = CMD_MN; + break; + case 'a': + cmd = CMD_MA; + ret = _process_request_meta(pr); + break; + case 'e': + cmd = CMD_ME; + // TODO: not much special processing here; binary keys + ret = _process_request_meta(pr); + break; + } + } + break; + case 3: + if (cm[0] == 'g') { + if (cm[1] == 'e' && cm[2] == 't') { + cmd = CMD_GET; + type = CMD_TYPE_GET; + token_max = 2; // don't chew through multigets. + ret = _process_request_simple(pr, 2); + } + if (cm[1] == 'a' && cm[2] == 't') { + type = CMD_TYPE_GET; + cmd = CMD_GAT; + token_max = 2; // don't chew through multigets. + ret = _process_request_gat(pr); + } + } else if (cm[0] == 's' && cm[1] == 'e' && cm[2] == 't') { + cmd = CMD_SET; + ret = _process_request_storage(pr, token_max); + } else if (cm[0] == 'a' && cm[1] == 'd' && cm[2] == 'd') { + cmd = CMD_ADD; + ret = _process_request_storage(pr, token_max); + } else if (cm[0] == 'c' && cm[1] == 'a' && cm[2] == 's') { + cmd = CMD_CAS; + ret = _process_request_storage(pr, token_max); + } + break; + case 4: + if (strncmp(cm, "gets", 4) == 0) { + cmd = CMD_GETS; + type = CMD_TYPE_GET; + token_max = 2; // don't chew through multigets. 
+ ret = _process_request_simple(pr, 2); + } else if (strncmp(cm, "incr", 4) == 0) { + cmd = CMD_INCR; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "decr", 4) == 0) { + cmd = CMD_DECR; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "gats", 4) == 0) { + cmd = CMD_GATS; + type = CMD_TYPE_GET; + ret = _process_request_gat(pr); + } else if (strncmp(cm, "quit", 4) == 0) { + cmd = CMD_QUIT; + } + break; + case 5: + if (strncmp(cm, "touch", 5) == 0) { + cmd = CMD_TOUCH; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "stats", 5) == 0) { + cmd = CMD_STATS; + // Don't process a key; fetch via arguments. + _process_tokenize(pr, token_max); + } else if (strncmp(cm, "watch", 5) == 0) { + cmd = CMD_WATCH; + _process_tokenize(pr, token_max); + } + break; + case 6: + if (strncmp(cm, "delete", 6) == 0) { + cmd = CMD_DELETE; + ret = _process_request_simple(pr, 4); + } else if (strncmp(cm, "append", 6) == 0) { + cmd = CMD_APPEND; + ret = _process_request_storage(pr, token_max); + } + break; + case 7: + if (strncmp(cm, "replace", 7) == 0) { + cmd = CMD_REPLACE; + ret = _process_request_storage(pr, token_max); + } else if (strncmp(cm, "prepend", 7) == 0) { + cmd = CMD_PREPEND; + ret = _process_request_storage(pr, token_max); + } else if (strncmp(cm, "version", 7) == 0) { + cmd = CMD_VERSION; + _process_tokenize(pr, token_max); + } + break; + } + + // TODO: log more specific error code. + if (cmd == -1 || ret != 0) { + return -1; + } + + pr->command = cmd; + pr->cmd_type = type; + + return 0; +} + +// FIXME: any reason to pass in command/cmdlen separately? +static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen) { + // reserving an upvalue for key. 
+ mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN * 2 + KEY_MAX_LENGTH, 1); + memset(rq, 0, sizeof(mcp_request_t)); + memcpy(&rq->pr, pr, sizeof(*pr)); + + memcpy(rq->request, command, cmdlen); + rq->pr.request = rq->request; + rq->pr.reqlen = cmdlen; + gettimeofday(&rq->start, NULL); + + luaL_getmetatable(L, "mcp.request"); + lua_setmetatable(L, -2); + + // at this point we should know if we have to bounce through _nread to + // get item data or not. + return rq; +} + +// TODO: +// if modified, this will re-serialize every time it's accessed. +// a simple opt could copy back over the original space +// a "better" one could A/B the request ptr and clear the modified state +// each time it gets serialized. +static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p) { + mcp_parser_t *pr = &rq->pr; + char *r = (char *) pr->request; + size_t len = pr->reqlen; + + // one or more of the tokens were changed + if (rq->was_modified) { + assert(rq->tokent_ref); + // option table to top of stack. + lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref); + + // space was reserved in case of modification. + char *nr = rq->request + MCP_REQUEST_MAXLEN; + r = nr; + char *or = NULL; + + for (int x = 0; x < pr->ntokens; x++) { + const char *newtok = NULL; + size_t newlen = 0; + if (x != 0 && x != pr->keytoken) { + int type = lua_rawgeti(L, -1, x+1); + if (type != LUA_TNIL) { + newtok = lua_tolstring(L, -1, &newlen); + memcpy(nr, newtok, newlen); + nr += newlen; + } + lua_pop(L, 1); + } + + if (newtok == NULL) { + // TODO: if we add an extra "end" token that's just reqlen we can + // memcpy... however most args are short and that may not be worth + // it. + or = rq->request + pr->tokens[x]; + // will walk past the end without the \r test. + // if we add the end token trick this can be changed. + while (*or != ' ' && *or != '\r') { + *nr = *or; + nr++; + or++; + } + } + *nr = ' '; + nr++; + } + // tag the end bits. 
+ memcpy(nr-1, "\r\n\0", 3); + nr++; + + len = nr - (rq->request + MCP_REQUEST_MAXLEN); + lua_pop(L, 1); // pop the table + } + + // The stringified request. This is also referencing into the coroutine + // stack, which should be safe from gc. + p->iov[0].iov_base = r; + p->iov[0].iov_len = len; + p->iovcnt = 1; + if (pr->vlen != 0) { + p->iov[1].iov_base = pr->vbuf; + p->iov[1].iov_len = pr->vlen; + p->iovcnt = 2; + } + +} + +// second argument is optional, for building set requests. +// TODO: append the \r\n for the VAL? +static int mcplib_request(lua_State *L) { + size_t len = 0; + size_t vlen = 0; + mcp_parser_t pr = {0}; + const char *cmd = luaL_checklstring(L, 1, &len); + const char *val = luaL_optlstring(L, 2, NULL, &vlen); + + // FIXME: if we inline the userdata we can avoid memcpy'ing the parser + // structure from the stack? but causes some code duplication. + if (process_request(&pr, cmd, len) != 0) { + proxy_lua_error(L, "failed to parse request"); + return 0; + } + mcp_request_t *rq = mcp_new_request(L, &pr, cmd, len); + + if (val != NULL) { + rq->pr.vlen = vlen; + rq->pr.vbuf = malloc(vlen); + // TODO: check malloc failure. + memcpy(rq->pr.vbuf, val, vlen); + } + gettimeofday(&rq->start, NULL); + + // rq is now created, parsed, and on the stack. + if (rq == NULL) { + // TODO: lua error. + } + return 1; +} + +static int mcplib_request_key(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + lua_pushlstring(L, MCP_PARSER_KEY(rq->pr), rq->pr.klen); + return 1; +} + +// NOTE: I've mixed up const/non-const strings in the request. During parsing +// we want it to be const, but after that's done the request is no longer +// const. It might be better to just remove the const higher up the chain, but +// I'd rather not. So for now these functions will be dumping the const to +// modify the string. 
+static int mcplib_request_ltrimkey(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request"); + int totrim = luaL_checkinteger(L, -1); + char *key = (char *) MCP_PARSER_KEY(rq->pr); + + if (totrim > rq->pr.klen) { + proxy_lua_error(L, "ltrimkey cannot zero out key"); + return 0; + } else { + memset(key, ' ', totrim); + rq->pr.klen -= totrim; + rq->pr.tokens[rq->pr.keytoken] += totrim; + } + return 1; +} + +static int mcplib_request_rtrimkey(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request"); + int totrim = luaL_checkinteger(L, -1); + char *key = (char *) MCP_PARSER_KEY(rq->pr); + + if (totrim > rq->pr.klen) { + proxy_lua_error(L, "rtrimkey cannot zero out key"); + return 0; + } else { + memset(key + (rq->pr.klen - totrim), ' ', totrim); + rq->pr.klen -= totrim; + // don't need to change the key token. + } + return 1; +} + +// Virtual table operations on the request. +static int mcplib_request_token(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + int argc = lua_gettop(L); + + if (argc == 1) { + lua_pushnil(L); + return 1; + } + + int token = luaL_checkinteger(L, 2); + + if (token < 1 || token > rq->pr.ntokens) { + // maybe an error? + lua_pushnil(L); + return 1; + } + + // we hold overwritten or parsed tokens in a lua table. + if (rq->tokent_ref == 0) { + // create a presized table that can hold our tokens. + lua_createtable(L, rq->pr.ntokens, 0); + // duplicate value to set back + lua_pushvalue(L, -1); + rq->tokent_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } else { + lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref); + } + // top of stack should be token table. + + size_t vlen = 0; + if (argc > 2) { + // overwriting a token. + luaL_checklstring(L, 3, &vlen); + lua_pushvalue(L, 3); // copy to top of stack + lua_rawseti(L, -2, token); + rq->was_modified = true; + return 0; + } else { + // fetching a token. + if (lua_rawgeti(L, -1, token) != LUA_TSTRING) { + lua_pop(L, 1); // got a nil, drop it. 
+ + // token not uploaded yet. find the len. + char *s = (char *) &rq->pr.request[rq->pr.tokens[token-1]]; + char *e = s; + while (*e != ' ') { + e++; + } + vlen = e - s; + + P_DEBUG("%s: pushing token of len: %lu\n", __func__, vlen); + lua_pushlstring(L, s, vlen); + lua_pushvalue(L, -1); // copy + + lua_rawseti(L, -3, token); // pops copy. + } + + // return fetched token or copy of new token. + return 1; + } + + return 0; +} + +static int mcplib_request_ntokens(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + lua_pushinteger(L, rq->pr.ntokens); + return 1; +} + +static int mcplib_request_command(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + lua_pushinteger(L, rq->pr.command); + return 1; +} + +static int mcplib_request_gc(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request"); + // FIXME: during nread c->item is the malloc'ed buffer. not yet put into + // rq->buf - is this properly freed if the connection dies before + // complete_nread? + if (rq->pr.vbuf != NULL) { + free(rq->pr.vbuf); + } + + if (rq->tokent_ref != 0) { + luaL_unref(L, LUA_REGISTRYINDEX, rq->tokent_ref); + } + return 0; +} + +// TODO: check what lua does when it calls a function with a string argument +// stored from a table/similar (ie; the prefix check code). +// If it's not copying anything, we can add request-side functions to do most +// forms of matching and avoid copying the key to lua space. + +/*** END REQUET PARSER AND OBJECT ***/ + +/*** START jump consistent hash library ***/ +// TODO: easy candidate for splitting to another .c, but I want this built in +// instead of as a .so so make sure it's linked directly. 
+ +typedef struct { + struct proxy_hash_caller phc; // passed back to proxy API + uint64_t seed; + unsigned int buckets; +} mcplib_jump_hash_t; + +static uint32_t mcplib_jump_hash_get_server(const void *key, size_t len, void *ctx) { + mcplib_jump_hash_t *jh = ctx; + + uint64_t hash = XXH3_64bits_withSeed(key, len, jh->seed); + + int64_t b = -1, j = 0; + while (j < jh->buckets) { + b = j; + hash = hash * 2862933555777941757ULL + 1; + j = (b + 1) * ((double)(1LL << 31) / (double)((hash >> 33) + 1)); + } + return b+1; // FIXME: do the -1 just for ketama and remove from internal code? +} + +// stack = [pool, option] +static int mcplib_jump_hash_new(lua_State *L) { + uint64_t seed = 0; + const char *seedstr = NULL; + size_t seedlen = 0; + + luaL_checktype(L, 1, LUA_TTABLE); + lua_Unsigned buckets = lua_rawlen(L, 1); + + int argc = lua_gettop(L); + if (argc > 1) { + // options supplied. to be specified as a table. + // { seed = "foo" } + luaL_checktype(L, 2, LUA_TTABLE); + // FIXME: adjust so we ensure/error on this being a string? + if (lua_getfield(L, 2, "seed") != LUA_TNIL) { + seedstr = lua_tolstring(L, -1, &seedlen); + seed = XXH3_64bits(seedstr, seedlen); + } else { + dump_stack(L); + } + lua_pop(L, 1); + } + + mcplib_jump_hash_t *jh = lua_newuserdatauv(L, sizeof(mcplib_jump_hash_t), 0); + // TODO: check jh. + + // don't need to loop through the table at all, just need its length. 
+ // could optimize startup time by adding hints to the module for how to + // format pool (ie; just a total count or the full table) + jh->seed = seed; + jh->buckets = buckets; + jh->phc.ctx = jh; + jh->phc.selector_func = mcplib_jump_hash_get_server; + + lua_pushlightuserdata(L, &jh->phc); + + // - return [UD, lightuserdata] + return 2; +} + +static int mcplib_open_jump_hash(lua_State *L) { + const struct luaL_Reg jump_f[] = { + {"new", mcplib_jump_hash_new}, + {NULL, NULL}, + }; + + luaL_newlib(L, jump_f); + + return 1; +} + +/*** END jump consistent hash library ***/ + +/*** START lua interface to user stats ***/ + +// mcp.add_stat(index, name) +// creates a custom lua stats counter +static int mcplib_add_stat(lua_State *L) { + LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE)); + if (t != NULL) { + proxy_lua_error(L, "add_stat must be called from config_pools"); + return 0; + } + int idx = luaL_checkinteger(L, -2); + const char *name = luaL_checkstring(L, -1); + + if (idx < 1) { + proxy_lua_error(L, "stat index must be 1 or higher"); + return 0; + } + // max user counters? 1024? some weird number. + if (idx > 1024) { + proxy_lua_error(L, "stat index must be 1024 or less"); + return 0; + } + // max name length? avoids errors if something huge gets thrown in. + if (strlen(name) > STAT_KEY_LEN - 6) { + // we prepend "user_" to the output. + null byte. + proxy_lua_ferror(L, "stat name too long: %s\n", name); + return 0; + } + // restrict characters, at least no spaces/newlines. + for (int x = 0; x < strlen(name); x++) { + if (isspace(name[x])) { + proxy_lua_error(L, "stat cannot contain spaces or newlines"); + return 0; + } + } + + proxy_ctx_t *ctx = settings.proxy_ctx; // TODO: store ctx in upvalue. + + // just to save some typing. + STAT_L(ctx); + struct proxy_user_stats *us = &ctx->user_stats; + + // if num_stats is 0 we need to init sizes. + // TODO: malloc fail checking. 
+ if (us->num_stats < idx) { + // don't allocate counters memory for the global ctx. + char **nnames = calloc(idx, sizeof(char *)); + if (us->names != NULL) { + for (int x = 0; x < us->num_stats; x++) { + nnames[x] = us->names[x]; + } + free(us->names); + } + us->names = nnames; + us->num_stats = idx; + } + + idx--; // real slot start as 0. + // if slot has string in it, free first + if (us->names[idx] != NULL) { + free(us->names[idx]); + } + // strdup name into string slot + // TODO: malloc failure. + us->names[idx] = strdup(name); + STAT_UL(ctx); + + return 0; +} + +static int mcplib_stat(lua_State *L) { + LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE)); + if (t == NULL) { + proxy_lua_error(L, "stat must be called from router handlers"); + return 0; + } + + struct proxy_user_stats *tus = t->proxy_stats; + if (tus == NULL) { + proxy_lua_error(L, "no stats counters initialized"); + return 0; + } + + int idx = luaL_checkinteger(L, -2); + int change = luaL_checkinteger(L, -1); + + if (idx < 1 || idx > tus->num_stats) { + proxy_lua_error(L, "stat index out of range"); + return 0; + } + + idx--; // actual array is 0 indexed. + WSTAT_L(t); + tus->counters[idx] += change; + WSTAT_UL(t); + + return 0; +} + +/*** END lua interface to user stats ***/ + +/*** START lua await() object interface ***/ + +typedef struct mcp_await_s { + int pending; + int wait_for; + int req_ref; + int argtable_ref; // need to hold refs to any potential hash selectors + int restable_ref; // table of result objects + int coro_ref; // reference to parent coroutine + bool completed; // have we completed the parent coroutine or not + mcp_request_t *rq; + mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop) +} mcp_await_t; + +// local restable = mcp.await(request, hashselectors, num_wait) +// NOTE: need to hold onto the hash selector objects since those hold backend +// references. Here we just keep a reference to the argument table. 
+static int mcplib_await(lua_State *L) { + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + luaL_checktype(L, 2, LUA_TTABLE); + int n = luaL_len(L, 2); // length of hash selector table + int wait_for = 0; // 0 means wait for all responses + + if (lua_isnumber(L, 3)) { + wait_for = lua_tointeger(L, 3); + lua_pop(L, 1); + if (wait_for > n) { + wait_for = n; + } + } + // TODO: bail if selector table was 0 len? else bad things can happen. + + // TODO: quickly loop table once and ensure they're all hash selectors? + int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table + int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object. + + // stack will be only the await object now + mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0); + memset(aw, 0, sizeof(mcp_await_t)); + aw->wait_for = wait_for; + aw->pending = n; + aw->argtable_ref = argtable_ref; + aw->rq = rq; + aw->req_ref = req_ref; + P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n); + //dump_stack(L); + + return lua_yield(L, 1); +} + +static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref) { + io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY); + + mcp_backend_t *be = rq->be; + + // Then we push a response object, which we'll re-use later. + // reserve one uservalue for a lua-supplied response. + mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1); + if (r == NULL) { + proxy_lua_error(Lc, "out of memory allocating response"); + return; + } + memset(r, 0, sizeof(mcp_resp_t)); + r->buf = NULL; + r->blen = 0; + r->start = rq->start; + int x; + int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2; + for (x = 0; x < end; x++) { + if (rq->pr.request[x] == ' ') { + break; + } + r->cmd[x] = rq->pr.request[x]; + } + r->cmd[x] = '\0'; + + luaL_getmetatable(Lc, "mcp.response"); + lua_setmetatable(Lc, -2); + + io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache); + // FIXME: can this fail? 
+ + // this is a re-cast structure, so assert that we never outsize it. + assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t)); + memset(p, 0, sizeof(io_pending_proxy_t)); + // set up back references. + p->io_queue_type = IO_QUEUE_PROXY; + p->thread = c->thread; + p->c = c; + p->resp = NULL; + p->client_resp = r; + p->flushed = false; + p->ascii_multiget = rq->ascii_multiget; + + // io_p needs to hold onto its own response reference, because we may or + // may not include it in the final await() result. + p->mcpres_ref = luaL_ref(Lc, LUA_REGISTRYINDEX); // pops mcp.response + + // avoiding coroutine reference for sub-IO + p->coro_ref = 0; + p->coro = NULL; + + // await specific + p->is_await = true; + p->await_ref = await_ref; + + // The direct backend object. await object is holding reference + p->backend = be; + + mcp_request_attach(Lc, rq, p); + + // link into the batch chain. + p->next = q->stack_ctx; + q->stack_ctx = p; + P_DEBUG("%s: queued\n", __func__); + + return; +} + +static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) { + P_DEBUG("%s: start\n", __func__); + mcp_await_t *aw = lua_touserdata(L, -1); + int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped. + assert(aw != NULL); + lua_rawgeti(L, LUA_REGISTRYINDEX, aw->argtable_ref); // -> 1 + //dump_stack(L); + P_DEBUG("%s: argtable len: %d\n", __func__, (int)lua_rawlen(L, -1)); + mcp_request_t *rq = aw->rq; + aw->coro_ref = coro_ref; + + // create result table + lua_newtable(L); // -> 2 + aw->restable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the result table + + // prepare the request key + const char *key = MCP_PARSER_KEY(rq->pr); + size_t len = rq->pr.klen; + // loop arg table and run each hash selector + lua_pushnil(L); // -> 3 + while (lua_next(L, 1) != 0) { + P_DEBUG("%s: top of loop\n", __func__); + // (key, -2), (val, -1) + // FIXME: move to a func. mostly redundant with hsp_call()? 
+ mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy"); + if (pp == NULL) { + // TODO: fatal! wasn't correct object type + } + mcp_pool_t *p = pp->main; + + uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx); + // NOTE: rq->be is only held to help pass the backend into the IOP in + // mcp_queue call. Could be a local variable and an argument too. + if (p->phc.ctx == NULL) { + rq->be = p->pool[lookup % p->pool_size].be; + } else { + rq->be = p->pool[lookup-1].be; + } + + mcp_queue_await_io(c, L, rq, await_ref); + + // pop value, keep key. + lua_pop(L, 1); + } + + lua_pop(L, 1); // remove table key. + aw->resp = c->resp; // cuddle the current mc_resp to fill later + + // we count the await as the "response pending" since it covers a single + // response object. the sub-IO's don't count toward the redispatch of *c + io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY); + q->count++; + + P_DEBUG("%s\n", __func__); + //dump_stack(L); // should be empty + + return 0; +} + +//lua_rawseti(L, -2, x++); +static int mcplib_await_return(io_pending_proxy_t *p) { + mcp_await_t *aw; + lua_State *L = p->thread->L; // use the main VM coroutine for work + bool cleanup = false; + bool valid = false; + bool completing = false; + + // TODO: just push the await ptr into *p? + lua_rawgeti(L, LUA_REGISTRYINDEX, p->await_ref); + aw = lua_touserdata(L, -1); + lua_pop(L, 1); // remove AW object from stack + assert(aw != NULL); + P_DEBUG("%s: start [pending: %d]\n", __func__, aw->pending); + //dump_stack(L); + + aw->pending--; + // Await not yet satisfied. + // If wait_for != 0 check for response success + // if success and wait_for is *now* 0, we complete. 
+ // add successful response to response table + // Also, if no wait_for, add response to response table + if (!aw->completed) { + if (aw->wait_for > 0) { + if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) { + valid = true; + } + aw->wait_for--; + + if (aw->wait_for == 0) { + completing = true; + } + } else { + valid = true; + } + } + + // note that post-completion, we stop gathering responses into the + // response table... because it's already been returned. + // So "valid" can only be true if also !completed + if (aw->pending == 0) { + if (!aw->completed) { + // we were waiting for all responses. + completing = true; + } + cleanup = true; + P_DEBUG("%s: pending == 0\n", __func__); + } + + // a valid response to add to the result table. + if (valid) { + P_DEBUG("%s: valid\n", __func__); + lua_rawgeti(L, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1 + lua_rawgeti(L, LUA_REGISTRYINDEX, p->mcpres_ref); // -> 2 + // couldn't find a table.insert() equivalent; so this is + // inserting into the length + 1 position manually. + //dump_stack(L); + lua_rawseti(L, 1, lua_rawlen(L, 1) + 1); // pops mcpres + lua_pop(L, 1); // pops restable + } + + // lose our internal mcpres reference regardless. + luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref); + // our await_ref is shared, so we don't need to release it. + + if (completing) { + P_DEBUG("%s: completing\n", __func__); + aw->completed = true; + // if we haven't completed yet, the connection reference is still + // valid. So now we pull it, reduce count, and readd if necessary. + // here is also the point where we resume the coroutine. 
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->coro_ref); + lua_State *Lc = lua_tothread(L, -1); + lua_rawgeti(Lc, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1 + proxy_run_coroutine(Lc, aw->resp, NULL, p->c); + luaL_unref(L, LUA_REGISTRYINDEX, aw->coro_ref); + luaL_unref(L, LUA_REGISTRYINDEX, aw->restable_ref); + + io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type); + q->count--; + if (q->count == 0) { + // call re-add directly since we're already in the worker thread. + conn_worker_readd(p->c); + } + + } + + if (cleanup) { + P_DEBUG("%s: cleanup [completed: %d]\n", __func__, aw->completed); + luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref); + luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref); + luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref); + } + + // Just remove anything we could have left on the primary VM stack + lua_settop(L, 0); + + // always free this sub-IO object before returning. + do_cache_free(p->thread->io_cache, p); + + return 0; +} + +/*** END lua await() object interface ***/ + +// Creates and returns the top level "mcp" module +int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { + lua_State *L = ctx; + + const struct luaL_Reg mcplib_backend_m[] = { + {"set", NULL}, + {"__gc", mcplib_backend_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_request_m[] = { + {"command", mcplib_request_command}, + {"key", mcplib_request_key}, + {"ltrimkey", mcplib_request_ltrimkey}, + {"rtrimkey", mcplib_request_rtrimkey}, + {"token", mcplib_request_token}, + {"ntokens", mcplib_request_ntokens}, + {"__tostring", NULL}, + {"__gc", mcplib_request_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_response_m[] = { + {"ok", mcplib_response_ok}, + {"hit", mcplib_response_hit}, + {"__gc", mcplib_response_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_pool_m[] = { + {"__gc", mcplib_pool_gc}, + {NULL, NULL} + }; + + const struct luaL_Reg mcplib_pool_proxy_m[] = { + {"__call", mcplib_pool_proxy_call}, + {"__gc", mcplib_pool_proxy_gc}, + {NULL, NULL} + }; 
+ + const struct luaL_Reg mcplib_f [] = { + {"pool", mcplib_pool}, + {"backend", mcplib_backend}, + {"request", mcplib_request}, + {"attach", mcplib_attach}, + {"add_stat", mcplib_add_stat}, + {"stat", mcplib_stat}, + {"await", mcplib_await}, + {"backend_connect_timeout", mcplib_backend_connect_timeout}, + {"backend_retry_timeout", mcplib_backend_retry_timeout}, + {"backend_read_timeout", mcplib_backend_read_timeout}, + {NULL, NULL} + }; + + // TODO: function + loop. + luaL_newmetatable(L, "mcp.backend"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_backend_m, 0); // register methods + lua_pop(L, 1); + + luaL_newmetatable(L, "mcp.request"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_request_m, 0); // register methods + lua_pop(L, 1); + + luaL_newmetatable(L, "mcp.response"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_response_m, 0); // register methods + lua_pop(L, 1); + + luaL_newmetatable(L, "mcp.pool"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_pool_m, 0); // register methods + lua_pop(L, 1); // drop the hash selector metatable + + luaL_newmetatable(L, "mcp.pool_proxy"); + lua_pushvalue(L, -1); // duplicate metatable. + lua_setfield(L, -2, "__index"); // mt.__index = mt + luaL_setfuncs(L, mcplib_pool_proxy_m, 0); // register methods + lua_pop(L, 1); // drop the hash selector metatable + + // create main library table. + //luaL_newlib(L, mcplib_f); + // TODO: luaL_newlibtable() just pre-allocs the exact number of things + // here. + // can replace with createtable and add the num. of the constant + // definitions. + luaL_newlibtable(L, mcplib_f); + proxy_register_defines(L); + + // hash function for selectors. 
+ // have to wrap the function in a struct because function pointers aren't + // pointer pointers :) + mcplib_open_jump_hash(L); + lua_setfield(L, -2, "hash_jump"); + // FIXME: remove this once multi-probe is in, use that as default instead. + lua_pushlightuserdata(L, &mcplib_hashfunc_murmur3); + lua_setfield(L, -2, "hash_murmur3"); + + lua_pushlightuserdata(L, (void *)t); // upvalue for original thread + lua_newtable(L); // upvalue for mcp.attach() table. + + // create weak table for storing backends by label. + lua_newtable(L); // {} + lua_newtable(L); // {}, {} for metatable + lua_pushstring(L, "v"); // {}, {}, "v" for weak values. + lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"} + lua_setmetatable(L, -2); // {__mt = {__mode = "v"} } + + luaL_setfuncs(L, mcplib_f, 3); // store upvalues. + + lua_setglobal(L, "mcp"); // set the lib table to mcp global. + return 1; +} diff --git a/proto_proxy.h b/proto_proxy.h new file mode 100644 index 0000000..aa58323 --- /dev/null +++ b/proto_proxy.h @@ -0,0 +1,26 @@ +#ifndef PROTO_PROXY_H +#define PROTO_PROXY_H + +void proxy_stats(ADD_STAT add_stats, conn *c); +void process_proxy_stats(ADD_STAT add_stats, conn *c); + +/* proxy mode handlers */ +int try_read_command_proxy(conn *c); +void complete_nread_proxy(conn *c); +void proxy_thread_init(LIBEVENT_THREAD *thr); +void proxy_init(void); +// TODO: need better names or a better interface for these. can be confusing +// to reason about the order. 
+void proxy_start_reload(void *arg); +int proxy_load_config(void *arg); +void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr); + +void proxy_submit_cb(io_queue_t *q); +void proxy_complete_cb(io_queue_t *q); +void proxy_return_cb(io_pending_t *pending); +void proxy_finalize_cb(io_pending_t *pending); + +/* lua */ +int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx); + +#endif diff --git a/proto_text.c b/proto_text.c index 000ddd1..cfe5d97 100644 --- a/proto_text.c +++ b/proto_text.c @@ -5,6 +5,10 @@ #include "memcached.h" #include "proto_text.h" +// FIXME: only for process_proxy_stats() +// - some better/different structure for stats subcommands +// would remove this abstraction leak. +#include "proto_proxy.h" #include "authfile.h" #include "storage.h" #include "base64.h" @@ -42,8 +46,6 @@ } \ } -static void process_command(conn *c, char *command); - typedef struct token_s { char *value; size_t length; @@ -481,7 +483,7 @@ int try_read_command_ascii(conn *c) { assert(cont <= (c->rcurr + c->rbytes)); c->last_cmd_time = current_time; - process_command(c, c->rcurr); + process_command_ascii(c, c->rcurr); c->rbytes -= (cont - c->rcurr); c->rcurr = cont; @@ -792,6 +794,10 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { } else if (strcmp(subcommand, "extstore") == 0) { process_extstore_stats(&append_stats, c); #endif +#ifdef PROXY + } else if (strcmp(subcommand, "proxy") == 0) { + process_proxy_stats(&append_stats, c); +#endif } else { /* getting here means that the subcommand is either engine specific or is invalid. query the engine and see. */ @@ -2684,7 +2690,7 @@ static void process_refresh_certs_command(conn *c, token_t *tokens, const size_t // we can't drop out and back in again. // Leaving this note here to spend more time on a fix when necessary, or if an // opportunity becomes obvious. 
-static void process_command(conn *c, char *command) { +void process_command_ascii(conn *c, char *command) { token_t tokens[MAX_TOKENS]; size_t ntokens; diff --git a/proto_text.h b/proto_text.h index b626f97..aae0ed6 100644 --- a/proto_text.h +++ b/proto_text.h @@ -5,5 +5,6 @@ void complete_nread_ascii(conn *c); int try_read_command_asciiauth(conn *c); int try_read_command_ascii(conn *c); +void process_command_ascii(conn *c, char *command); #endif diff --git a/t/lib/MemcachedTest.pm b/t/lib/MemcachedTest.pm index 2a0fcb7..c5cb6fb 100644 --- a/t/lib/MemcachedTest.pm +++ b/t/lib/MemcachedTest.pm @@ -18,7 +18,7 @@ my @unixsockets = (); mem_get_is mem_gets mem_gets_is mem_stats mem_move_time supports_sasl free_port supports_drop_priv supports_extstore wait_ext_flush supports_tls enabled_tls_testing run_help - supports_unix_socket); + supports_unix_socket get_memcached_exe supports_proxy); use constant MAX_READ_WRITE_SIZE => 16384; use constant SRV_CRT => "server_crt.pem"; @@ -215,6 +215,12 @@ sub supports_extstore { return 0; } +sub supports_proxy { + my $output = print_help(); + return 1 if $output =~ /proxy_config/i; + return 0; +} + sub supports_tls { my $output = print_help(); return 1 if $output =~ /enable-ssl/i; diff --git a/t/proxy.t b/t/proxy.t new file mode 100644 index 0000000..c85796d --- /dev/null +++ b/t/proxy.t @@ -0,0 +1,263 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use Test::More; +use FindBin qw($Bin); +use lib "$Bin/lib"; +use Carp qw(croak); +use MemcachedTest; + +# TODO: to module? +# or "gettimedrun" etc +use Cwd; +my $builddir = getcwd; + +if (!supports_proxy()) { + plan skip_all => 'proxy not enabled'; + exit 0; +} + +# TODO: the lua file has hardcoded ports. any way to make this dynamic? +# TODO: once basic tests are done, actually split out the instances rather +# than the shared backend; validate keys go where they should be going. + +# FIXME: this listend on unix socket still. either need a manual runner or a +# fix upstream. 
+my @srv = (); +for (2 .. 6) { + my $srv = run_server("-p 1121$_", 11210 + $_); + push(@srv, $srv); +} +#my $sock = $srv->sock; + +my $p_srv = new_memcached('-o proxy_config=./t/startfile.lua -l 127.0.0.1', 11211); +my $p_sock = $p_srv->sock; + +# hack to help me use T_MEMD_USE_DAEMON for proxy. +#print STDERR "Sleeping\n"; +#sleep 900; + +# cmds to test: +# - noreply for main text commands? +# meta: +# me +# mn +# mg +# ms +# md +# ma +# - noreply? +# stats +# pass-thru? + +# incr/decr +{ + print $p_sock "set /foo/num 0 0 1\r\n1\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored num"); + mem_get_is($p_sock, "/foo/num", 1, "stored 1"); + + print $p_sock "incr /foo/num 1\r\n"; + is(scalar <$p_sock>, "2\r\n", "+ 1 = 2"); + mem_get_is($p_sock, "/foo/num", 2); + + print $p_sock "incr /foo/num 8\r\n"; + is(scalar <$p_sock>, "10\r\n", "+ 8 = 10"); + mem_get_is($p_sock, "/foo/num", 10); + + print $p_sock "decr /foo/num 1\r\n"; + is(scalar <$p_sock>, "9\r\n", "- 1 = 9"); + + print $p_sock "decr /foo/num 9\r\n"; + is(scalar <$p_sock>, "0\r\n", "- 9 = 0"); + + print $p_sock "decr /foo/num 5\r\n"; + is(scalar <$p_sock>, "0\r\n", "- 5 = 0"); +} + +# gat +{ + # cache miss + print $p_sock "gat 10 /foo/foo1\r\n"; + is(scalar <$p_sock>, "END\r\n", "cache miss"); + + # set /foo/foo1 and /foo/foo2 (and should get it) + print $p_sock "set /foo/foo1 0 2 7\r\nfooval1\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored foo"); + + print $p_sock "set /foo/foo2 0 2 7\r\nfooval2\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored /foo/foo2"); + + # get and touch it with cas + print $p_sock "gats 10 /foo/foo1 /foo/foo2\r\n"; + like(scalar <$p_sock>, qr/VALUE \/foo\/foo1 0 7 (\d+)\r\n/, "get and touch foo1 with cas regexp success"); + is(scalar <$p_sock>, "fooval1\r\n","value"); + like(scalar <$p_sock>, qr/VALUE \/foo\/foo2 0 7 (\d+)\r\n/, "get and touch foo2 with cas regexp success"); + is(scalar <$p_sock>, "fooval2\r\n","value"); + is(scalar <$p_sock>, "END\r\n", "end"); + + # get and touch 
it without cas + print $p_sock "gat 10 /foo/foo1 /foo/foo2\r\n"; + like(scalar <$p_sock>, qr/VALUE \/foo\/foo1 0 7\r\n/, "get and touch foo1 without cas regexp success"); + is(scalar <$p_sock>, "fooval1\r\n","value"); + like(scalar <$p_sock>, qr/VALUE \/foo\/foo2 0 7\r\n/, "get and touch foo2 without cas regexp success"); + is(scalar <$p_sock>, "fooval2\r\n","value"); + is(scalar <$p_sock>, "END\r\n", "end"); +} + +# gets/cas +{ + print $p_sock "add /foo/moo 0 0 6\r\nmooval\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored mooval"); + mem_get_is($p_sock, "/foo/moo", "mooval"); + + # check-and-set (cas) failure case, try to set value with incorrect cas unique val + print $p_sock "cas /foo/moo 0 0 6 0\r\nMOOVAL\r\n"; + is(scalar <$p_sock>, "EXISTS\r\n", "check and set with invalid id"); + + # test "gets", grab unique ID + print $p_sock "gets /foo/moo\r\n"; + # VALUE moo 0 6 3084947704 + # + my @retvals = split(/ /, scalar <$p_sock>); + my $data = scalar <$p_sock>; # grab data + my $dot = scalar <$p_sock>; # grab dot on line by itself + is($retvals[0], "VALUE", "get value using 'gets'"); + my $unique_id = $retvals[4]; + # clean off \r\n + $unique_id =~ s/\r\n$//; + ok($unique_id =~ /^\d+$/, "unique ID '$unique_id' is an integer"); + # now test that we can store moo with the correct unique id + print $p_sock "cas /foo/moo 0 0 6 $unique_id\r\nMOOVAL\r\n"; + is(scalar <$p_sock>, "STORED\r\n"); + mem_get_is($p_sock, "/foo/moo", "MOOVAL"); +} + +# touch +{ + print $p_sock "set /foo/t 0 2 6\r\nfooval\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored foo"); + mem_get_is($p_sock, "/foo/t", "fooval"); + + # touch it + print $p_sock "touch /foo/t 10\r\n"; + is(scalar <$p_sock>, "TOUCHED\r\n", "touched foo"); + + # don't need to sleep/validate the touch worked. We're testing the + # protocol, not the functionality. +} + +# command endings +# NOTE: memcached always allowed [\r]\n for single command lines, but payloads +# (set/etc) require exactly \r\n as termination. 
+# doc/protocol.txt has always specified \r\n for command/response. +# Proxy is more strict than normal server in this case. +{ + my $s = $srv[0]->sock; + print $s "version\n"; + like(<$s>, qr/VERSION/, "direct server version cmd with just newline"); + print $p_sock "version\n"; + like(<$p_sock>, qr/SERVER_ERROR/, "proxy version cmd with just newline"); + print $p_sock "version\r\n"; + like(<$p_sock>, qr/VERSION/, "proxy version cmd with full CRLF"); +} + +# set through proxy. +{ + print $p_sock "set /foo/z 0 0 5\r\nhello\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy"); + # ensure it's fetchable. + mem_get_is($p_sock, "/foo/z", "hello"); + # delete it. + print $p_sock "delete /foo/z\r\n"; + is(scalar <$p_sock>, "DELETED\r\n", "removed test value"); + # ensure it's deleted. + mem_get_is($p_sock, "/foo/z", undef); +} + +# test add. +{ + print $p_sock "add /foo/a 0 0 3\r\nmoo\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "add test value through proxy"); + # ensure it's fetchable + mem_get_is($p_sock, "/foo/a", "moo"); + # check re-adding fails. + print $p_sock "add /foo/a 0 0 3\r\ngoo\r\n"; + is(scalar <$p_sock>, "NOT_STORED\r\n", "re-add fails"); + # ensure we still have the old value + mem_get_is($p_sock, "/foo/a", "moo"); +} + +# pipelined set. +{ + my $str = "set /foo/k 0 0 5\r\nhello\r\n"; + print $p_sock "$str$str$str$str$str"; + is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy"); + is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy"); + is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy"); + is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy"); + is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy"); +} + +# Load some keys through proxy. 
+my $bdata = 'x' x 256000; +{ + for (1..20) { + print $p_sock "set /foo/a$_ 0 0 2\r\nhi\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored test value"); + print $p_sock "set /bar/b$_ 0 0 2\r\nhi\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored test value"); + } + + # load a couple larger values + for (1..4) { + print $p_sock "set /foo/big$_ 0 0 256000\r\n$bdata\r\n"; + is(scalar <$p_sock>, "STORED\r\n", "stored big value"); + } + diag "set large values"; +} + +# fetch through proxy. +{ + for (1..20) { + mem_get_is($p_sock, "/foo/a$_", "hi"); + } + diag "fetched small values"; + mem_get_is($p_sock, "/foo/big1", $bdata); + diag "fetched big value"; +} + +sub run_server { + my ($args, $port) = @_; + + my $exe = get_memcached_exe(); + + my $childpid = fork(); + + my $root = ''; + $root = "-u root" if ($< == 0); + + # test build requires more privileges + $args .= " -o relaxed_privileges"; + + my $cmd = "$builddir/timedrun 120 $exe $root $args"; + + unless($childpid) { + exec $cmd; + exit; # NOTREACHED + } + + for (1..20) { + my $conn = IO::Socket::INET->new(PeerAddr => "127.0.0.1:$port"); + if ($conn) { + return Memcached::Handle->new(pid => $childpid, + conn => $conn, + host => "127.0.0.1", + port => $port); + } + select undef, undef, undef, 0.10; + } + croak "Failed to start server."; +} + +done_testing(); diff --git a/t/startfile.lua b/t/startfile.lua new file mode 100644 index 0000000..e267655 --- /dev/null +++ b/t/startfile.lua @@ -0,0 +1,280 @@ +-- xTODO: sets of zones behind a prefix. +-- xTODO: zones with local vs other failover. +-- xTODO: failover on get +-- xTODO: all zone sync on set +-- TODO: fallback cache for broken/overloaded zones. + +-- local zone could/should be fetched from environment or local file. +-- doing so allows all configuration files to be identical, simplifying consistency checks. 
+local my_zone = 'z1' + +local STAT_EXAMPLE <const> = 1 +local STAT_ANOTHER <const> = 2 + +function mcp_config_pools(oldss) + mcp.add_stat(STAT_EXAMPLE, "example") + mcp.add_stat(STAT_ANOTHER, "another") + mcp.backend_connect_timeout(5) -- 5 second timeout. + -- alias mcp.backend for convenience. + -- important to alias global variables in routes where speed is concerned. + local srv = mcp.backend + -- local zones = { 'z1', 'z2', 'z3' } + + -- IPs are "127" . "zone" . "pool" . "srv" + local pfx = 'fooz1' + local fooz1 = { + srv(pfx .. 'srv1', '127.1.1.1', 11212, 1), + srv(pfx .. 'srv2', '127.1.1.2', 11212, 1), + srv(pfx .. 'srv3', '127.1.1.3', 11212, 1), + } + pfx = 'fooz2' + local fooz2 = { + srv(pfx .. 'srv1', '127.2.1.1', 11213, 1), + srv(pfx .. 'srv2', '127.2.1.2', 11213, 1), + srv(pfx .. 'srv3', '127.2.1.3', 11213, 1), + } + pfx = 'fooz3' + local fooz3 = { + srv(pfx .. 'srv1', '127.3.1.1', 11214, 1), + srv(pfx .. 'srv2', '127.3.1.2', 11214, 1), + srv(pfx .. 'srv3', '127.3.1.3', 11214, 1), + } + + pfx = 'barz1' + -- zone "/bar/"-s primary zone should fail; all down. + local barz1 = { + srv(pfx .. 'srv1', '127.1.2.1', 11210, 1), + srv(pfx .. 'srv2', '127.1.2.1', 11210, 1), + srv(pfx .. 'srv3', '127.1.2.1', 11210, 1), + } + pfx = 'barz2' + local barz2 = { + srv(pfx .. 'srv1', '127.2.2.2', 11215, 1), + srv(pfx .. 'srv2', '127.2.2.2', 11215, 1), + srv(pfx .. 'srv3', '127.2.2.2', 11215, 1), + } + pfx = 'barz3' + local barz3 = { + srv(pfx .. 'srv1', '127.3.2.3', 11216, 1), + srv(pfx .. 'srv2', '127.3.2.3', 11216, 1), + srv(pfx .. 'srv3', '127.3.2.3', 11216, 1), + } + + -- fallback cache for any zone + -- NOT USED YET + pfx = 'fallz1' + local fallz1 = { + srv(pfx .. 'srv1', '127.0.2.1', 11212, 1), + } + pfx = 'fallz2' + local fallz2 = { + srv(pfx .. 'srv1', '127.0.2.2', 11212, 1), + } + pfx = 'fallz3' + local fallz3 = { + srv(pfx .. 
'srv1', '127.0.2.3', 11212, 1), + } + + local main_zones = { + foo = { z1 = fooz1, z2 = fooz2, z3 = fooz3 }, + bar = { z1 = barz1, z2 = barz2, z3 = barz3 }, + -- fall = { z1 = fallz1, z2 = fallz2, z3 = fallz3 }, + } + + -- FIXME: should we copy the table to keep the pool tables around? + -- does the hash selector hold a reference to the pool (but only available in main config?) + + -- uncomment to use the ketama loadable module. + -- FIXME: passing an argument to the ketama module doesn't work yet. + -- local ketama = require("ketama") + + -- convert the pools into hash selectors. + -- TODO: is this a good place to add prefixing/hash editing? + for _, subs in pairs(main_zones) do + for k, v in pairs(subs) do + -- use next line instead for a third party ketama hash + -- subs[k] = mcp.pool(v, { dist = ketama }) + -- this line overrides the default bucket size for ketama + -- subs[k] = mcp.pool(v, { dist = ketama, obucket = 80 }) + -- this line uses the default murmur3 straight hash. + subs[k] = mcp.pool(v) + + -- use this next line instead for jump hash. + -- the order of servers in the pool argument _must_ not change! + -- adding the seed string will give a different key distribution + -- for each zone. + -- NOTE: 'k' may not be the right seed here: + -- instead stitch main_zone's key + the sub key? + -- subs[k] = mcp.pool(v, { dist = mcp.hash_jump, seed = k }) + end + end + + return main_zones +end + +-- WORKER CODE: + +-- need to redefine main_zones using fetched selectors? + +-- TODO: Fallback zone here? 
+function failover_factory(zones, local_zone) + local near_zone = zones[local_zone] + local far_zones = {} + -- NOTE: could shuffle/sort to re-order zone retry order + -- or use 'next(far_zones, idx)' via a stored upvalue here + for k, v in pairs(zones) do + if k ~= local_zone then + far_zones[k] = v + end + end + return function(r) + local res = near_zone(r) + if res:hit() == false then + for _, zone in pairs(far_zones) do + res = zone(r) + if res:hit() then + break + end + end + end + return res -- send result back to client + end +end + +-- SET's to main zone, issues deletes to far zones. +function setinvalidate_factory(zones, local_zone) + local near_zone = zones[local_zone] + local far_zones = {} + -- NOTE: could shuffle/sort to re-order zone retry order + -- or use 'next(far_zones, idx)' via a stored upvalue here + for k, v in pairs(zones) do + if k ~= local_zone then + far_zones[k] = v + end + end + local new_req = mcp.request + return function(r) + local res = near_zone(r) + if res:ok() == true then + -- create a new delete request + local dr = new_req("delete /testing/" .. r:key() .. "\r\n") + for _, zone in pairs(far_zones) do + -- NOTE: can check/do things on the specific response here. + zone(dr) + end + end + -- use original response for client, not DELETE's response. + -- else client won't understand. + return res -- send result back to client + end +end + +-- NOTE: this function is culling key prefixes. it is an error to use it +-- without a left anchored (^) pattern. +function prefixtrim_factory(pattern, list, default) + local p = pattern + local l = list + local d = default + local s = mcp.stat + return function(r) + local i, j, match = string.find(r:key(), p) + local route + if match ~= nil then + -- remove the key prefix so we don't waste storage. + r:ltrimkey(j) + route = l[match] + if route == nil then + -- example counter: tick when default route hit. 
+ s(STAT_EXAMPLE, 1) + return d(r) + end + end + return route(r) + end +end + +function prefix_factory(pattern, list, default) + local p = pattern + local l = list + local d = default + local s = mcp.stat + return function(r) + local route = l[string.match(r:key(), p)] + if route == nil then + -- example counter: tick when default route hit. + s(STAT_EXAMPLE, 1) + return d(r) + end + return route(r) + end +end + +-- TODO: Check tail call requirements? +function command_factory(map, default) + local m = map + local d = default + return function(r) + local f = map[r:command()] + if f == nil then + -- print("default command") + return d(r) + end + -- testing options replacement... + if r:command() == mcp.CMD_SET then + r:token(4, "100") -- set exptime. + end + -- print("override command") + return f(r) + end +end + +-- TODO: is the return value the average? anything special? +-- walks a list of selectors and repeats the request. +function walkall_factory(pool) + local p = {} + -- TODO: a shuffle could be useful here. + for _, v in pairs(pool) do + table.insert(p, v) + end + local x = #p + return function(r) + local restable = mcp.await(r, p) + -- walk results and return "best" result + -- print("length of await result table", #restable) + for _, res in pairs(restable) do + if res:ok() then + return res + end + end + -- else we return the first result. + return restable[1] + end +end + +function mcp_config_routes(main_zones) + -- generate the prefix routes from zones. + local prefixes = {} + for pfx, z in pairs(main_zones) do + local failover = failover_factory(z, my_zone) + local all = walkall_factory(main_zones[pfx]) + local setdel = setinvalidate_factory(z, my_zone) + local map = {} + map[mcp.CMD_SET] = all + -- NOTE: in t/proxy.t all the backends point to the same place + -- which makes replicating delete return NOT_FOUND + map[mcp.CMD_DELETE] = all + -- similar with ADD. will get an NOT_STORED back. 
+ -- need better routes designed for the test suite (edit the key + -- prefix or something) + map[mcp.CMD_ADD] = failover_factory(z, my_zone) + prefixes[pfx] = command_factory(map, failover) + end + + local routetop = prefix_factory("^/(%a+)/", prefixes, function(r) return "SERVER_ERROR no route\r\n" end) + + -- internally run parser at top of tree + -- also wrap the request string with a convenience object until the C bits + -- are attached to the internal parser. + --mcp.attach(mcp.CMD_ANY, function (r) return routetop(r) end) + mcp.attach(mcp.CMD_ANY_STORAGE, routetop) +end @@ -9,6 +9,9 @@ #ifdef HAVE_EVENTFD #include <sys/eventfd.h> #endif +#ifdef PROXY +#include "proto_proxy.h" +#endif #include <assert.h> #include <stdio.h> #include <errno.h> @@ -36,6 +39,9 @@ enum conn_queue_item_modes { queue_redispatch, /* return conn from side thread */ queue_stop, /* exit thread */ queue_return_io, /* returning a pending IO object immediately */ +#ifdef PROXY + queue_proxy_reload, /* signal proxy to reload worker VM */ +#endif }; typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { @@ -475,6 +481,16 @@ static void setup_thread(LIBEVENT_THREAD *me) { storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb); } #endif +#ifdef PROXY + thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb, + proxy_complete_cb, proxy_return_cb, proxy_finalize_cb); + + // TODO: maybe register hooks to be called here from sub-packages? ie; + // extstore, TLS, proxy. 
+ if (settings.proxy_enabled) { + proxy_thread_init(me); + } +#endif thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL); } @@ -600,12 +616,23 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) /* getting an individual IO object back */ conn_io_queue_return(item->io); break; +#ifdef PROXY + case queue_proxy_reload: + proxy_worker_reload(settings.proxy_ctx, me); + break; +#endif } cqi_free(me->ev_queue, item); } } +// NOTE: need better encapsulation. +// used by the proxy module to iterate the worker threads. +LIBEVENT_THREAD *get_worker_thread(int id) { + return &threads[id]; +} + /* Which thread we assigned a connection to most recently. */ static int last_thread = -1; @@ -726,6 +753,11 @@ void redispatch_conn(conn *c) { void timeout_conn(conn *c) { notify_worker_fd(c->thread, c->sfd, queue_timeout); } +#ifdef PROXY +void proxy_reload_notify(LIBEVENT_THREAD *t) { + notify_worker_fd(t, 0, queue_proxy_reload); +} +#endif void return_io_pending(io_pending_t *io) { CQ_ITEM *item = cqi_new(io->thread->ev_queue); @@ -898,6 +930,9 @@ void threadlocal_stats_reset(void) { #ifdef EXTSTORE EXTSTORE_THREAD_STATS_FIELDS #endif +#ifdef PROXY + PROXY_THREAD_STATS_FIELDS +#endif #undef X memset(&threads[ii].stats.slab_stats, 0, @@ -923,6 +958,9 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) { #ifdef EXTSTORE EXTSTORE_THREAD_STATS_FIELDS #endif +#ifdef PROXY + PROXY_THREAD_STATS_FIELDS +#endif #undef X for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { diff --git a/vendor/.gitignore b/vendor/.gitignore new file mode 100644 index 0000000..6ed69cb --- /dev/null +++ b/vendor/.gitignore @@ -0,0 +1,4 @@ +README.md +/mcmc/example +!/mcmc/Makefile +!/Makefile diff --git a/vendor/Makefile b/vendor/Makefile new file mode 100644 index 0000000..9d51aee --- /dev/null +++ b/vendor/Makefile @@ -0,0 +1,10 @@ +all: + cd lua && $(MAKE) all && cd .. + cd mcmc && $(MAKE) all && cd .. + +clean: + cd lua && $(MAKE) clean && cd .. 
+ cd mcmc && $(MAKE) clean && cd .. + +dist: clean +distdir: clean diff --git a/vendor/fetch.sh b/vendor/fetch.sh new file mode 100755 index 0000000..2d54142 --- /dev/null +++ b/vendor/fetch.sh @@ -0,0 +1,5 @@ +#!/bin/sh +HASH="44a55dee1d41c3ae92524df9f0dd8a747db79f04" +wget https://github.com/memcached/memcached-vendor/archive/${HASH}.tar.gz +tar -zxf ./${HASH}.tar.gz --strip-components=1 +rm ${HASH}.tar.gz diff --git a/vendor/lua/.gitignore b/vendor/lua/.gitignore new file mode 100644 index 0000000..d6b7ef3 --- /dev/null +++ b/vendor/lua/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/vendor/mcmc/LICENSE b/vendor/mcmc/LICENSE new file mode 100644 index 0000000..656c8f7 --- /dev/null +++ b/vendor/mcmc/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2021 Cache Forge LLC. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + + * Neither the name of the Danga Interactive nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/mcmc/Makefile b/vendor/mcmc/Makefile new file mode 100644 index 0000000..8ac6290 --- /dev/null +++ b/vendor/mcmc/Makefile @@ -0,0 +1,13 @@ +# gcc -g -Wall -Werror -pedantic -o example example.c mcmc.c +PREFIX=/usr/local + +all: + gcc -g -O2 -Wall -Werror -pedantic -o example example.c mcmc.c + gcc -g -O2 -Wall -Werror -pedantic -c mcmc.c + +clean: + rm -f example mcmc.o + +dist: clean + +distdir: diff --git a/vendor/mcmc/README.md b/vendor/mcmc/README.md new file mode 100644 index 0000000..10d4880 --- /dev/null +++ b/vendor/mcmc/README.md @@ -0,0 +1,52 @@ +# Minimal (C) Client for MemCached + +WARNING: WORK IN PROGRESS. Missing features or testing! + +MCMC is a minimalistic allocation-free modern client for memcached. It uses a +generic response parser, allowing a single code path regardless of the command +sent to memcached. It has no 3rd party dependencies and is designed to +integrate as a building block into full clients. + +MCMC does not (yet) include a typical memcached "selector". Meaning the +ability to add many servers to a hash table of some kind and routing keys to +specific servers. The MCMC base client is designed to be an object that +selector objects hold and then issue commands against. + +Allocation-free (aside from a call to `getaddrinfo()`) means it does not +_internally_ do any allocations, relying only on the stack. 
It requires you
+malloc a small structure and some buffers, but you are then free to manage
+them yourselves. Clients do not hold onto buffers when idle, cutting their
+memory overhead to a handful of bytes plus the TCP socket.
+
+MCMC is designed to be a building block for users designing full clients.
+For example:
+
+* A client author wants to implement the "get" command
+* They write a function in their native language's wrapper which accepts the
+  key to fetch and embeds that into a text buffer to look like `get [key]\r\n`
+* They then call mcmc's functions to send and read the response, parsing and
+  returning it to the client.
+
+This should be the same, if not less, code than wrapping a full C client with
+every possible command broken out. It also means 3rd party clients can (and
+should!) embed mcmc.c/mcmc.h (and any selector code they want) rather than be
+dependent on system distribution of a more complex client.
+
+The allocation-free nature also makes unit testing the client code easier,
+hopefully leading to higher quality.
+
+Caveats:
+
+* Care should be taken when handling the buffers mcmc requires to operate.
+  Since there are few operators you should only have to pay attention once :)
+* It does not support the various maintenance/settings commands (ie; `lru_crawler`).
+  It may gain some generic support for this, but those commands were not
+designed with consistent response codes and are hard to implement.
+* Does not support the binary protocol, which has been deprecated as of 1.6.0.
+
+As of this writing the code is being released _early_ (perhaps too early?). It
+may not have proper makefiles, tests, or a fully implemented API. The code has
+been posted so client authors and users can give early feedback on the API in
+hopes of providing something high quality and stable.
+
+Again, looking for feedback! Open an issue or let me know what you think.
diff --git a/vendor/mcmc/example.c b/vendor/mcmc/example.c new file mode 100644 index 0000000..958c351 --- /dev/null +++ b/vendor/mcmc/example.c @@ -0,0 +1,214 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <stdint.h> +#include <poll.h> +#include <signal.h> +#include <sys/uio.h> + +#include "mcmc.h" + +static void show_response(void *c, char *rbuf, size_t bufsize) { + int status; + // buffer shouldn't change until the read is completed. + mcmc_resp_t resp; + int go = 1; + while (go) { + go = 0; + status = mcmc_read(c, rbuf, bufsize, &resp); + if (status == MCMC_OK) { + // OK means a response of some kind was read. + char *val; + // NOTE: is "it's not a miss, and vlen is 0" enough to indicate that + // a 0 byte value was returned? + if (resp.vlen != 0) { + if (resp.vlen == resp.vlen_read) { + val = resp.value; + } else { + val = malloc(resp.vlen); + int read = 0; + do { + status = mcmc_read_value(c, val, resp.vlen, &read); + } while (status == MCMC_WANT_READ); + } + if (resp.vlen > 0) { + val[resp.vlen-1] = '\0'; + printf("Response value: %s\n", val); + } + } + switch (resp.type) { + case MCMC_RESP_GET: + // GET's need to continue until END is seen. + printf("in GET mode\n"); + go = 1; + break; + case MCMC_RESP_END: // ascii done-with-get's + printf("END seen\n"); + break; + case MCMC_RESP_META: // any meta command. they all return the same. + printf("META response seen\n"); + if (resp.rlen > 0) { + resp.rline[resp.rlen-1] = '\0'; + printf("META response line: %s\n", resp.rline); + } + break; + case MCMC_RESP_STAT: + // STAT responses. need to call mcmc_read() in loop until + // we get an end signal. + go = 1; + break; + default: + // TODO: type -> str func. + fprintf(stderr, "unknown response type: %d\n", resp.type); + break; + } + } else { + // some kind of command specific error code (management commands) + // or protocol error status. 
+ char code[MCMC_ERROR_CODE_MAX]; + char msg[MCMC_ERROR_MSG_MAX]; + mcmc_get_error(c, code, MCMC_ERROR_CODE_MAX, msg, MCMC_ERROR_MSG_MAX); + fprintf(stderr, "Got error from mc: status [%d] code [%s] msg: [%s]\n", status, code, msg); + // some errors don't have a msg. in this case msg[0] will be \0 + } + + int remain = 0; + // advance us to the next command in the buffer, or ready for the next + // mc_read(). + char *newbuf = mcmc_buffer_consume(c, &remain); + printf("remains in buffer: %d\n", remain); + if (remain == 0) { + assert(newbuf == NULL); + // we're done. + } else { + // there're still some bytes unconsumed by the client. + // ensure the next time we call the client, the buffer has those + // bytes at the front still. + // NOTE: this _could_ be an entirely different buffer if we copied + // the data off. The client is just tracking the # of bytes it + // didn't gobble. + // In this case we shuffle the bytes back to the front of our read + // buffer. + memmove(rbuf, newbuf, remain); + } + } +} + +int main (int argc, char *agv[]) { + // TODO: detect if C is pre-C11? + printf("C version: %ld\n", __STDC_VERSION__); + + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) { + perror("signal"); + exit(1); + } + + void *c = malloc(mcmc_size(MCMC_OPTION_BLANK)); + // we only "need" the minimum buf size. + // buffers large enough to fit return values result in fewer syscalls. + size_t bufsize = mcmc_min_buffer_size(MCMC_OPTION_BLANK) * 2; + // buffers are also generally agnostic to clients. The buffer must be + // held and re-used when required by the API. When the buffer is empty, + // it may be released to a pool or reused with other connections. + char *rbuf = malloc(bufsize); + + int status; + + // API is blocking by default. 
+ status = mcmc_connect(c, "127.0.0.1", "11211", MCMC_OPTION_BLANK); + + if (status != MCMC_CONNECTED) { + // TODO: mc_strerr(c); + fprintf(stderr, "Failed to connect to memcached\n"); + return -1; + } + + char *requests[5] = {"get foo\r\n", + "get foob\r\n", + "mg foo s t v\r\n", + "mg doof s t v Omoo k\r\n", + ""}; + + for (int x = 0; strlen(requests[x]) != 0; x++) { + // provide a buffer, the buffer length, and the number of responses + // expected. ie; if pipelining many requests, or using noreply semantics. + // FIXME: not confident "number of expected responses" is worth tracking + // internally. + status = mcmc_send_request(c, requests[x], strlen(requests[x]), 1); + //printf("sent request: %s\n", requests[x]); + + if (status != MCMC_OK) { + fprintf(stderr, "Failed to send request to memcached\n"); + return -1; + } + + // Regardless of what command we sent, this should print out the response. + show_response(c, rbuf, bufsize); + + } + + status = mcmc_disconnect(c); + // The only free'ing needed. + free(c); + + // TODO: stats example. + + // nonblocking example. + c = malloc(mcmc_size(MCMC_OPTION_BLANK)); + // reuse bufsize/rbuf. + status = mcmc_connect(c, "127.0.0.1", "11211", MCMC_OPTION_NONBLOCK); + printf("nonblock connecting...\n"); + struct pollfd pfds[1]; + if (status == MCMC_CONNECTING) { + // need to wait for socket to become writeable. + pfds[0].fd = mcmc_fd(c); + pfds[0].events = POLLOUT; + if (poll(pfds, 1, 1000) != 1) { + fprintf(stderr, "poll on connect timed out or failed\n"); + return -1; + } + int err = 0; + if (pfds[0].revents & POLLOUT && mcmc_check_nonblock_connect(c, &err) == MCMC_OK) { + printf("asynchronous connection completed: %d\n", err); + } else { + printf("failed to connect: %s\n", strerror(err)); + return -1; + } + } else { + perror("connect"); + fprintf(stderr, "bad response to nonblock connection: %d\n", status); + return -1; + } + + // TODO: check socket for errors. 
+ + // TODO: send request + status = mcmc_send_request(c, requests[0], strlen(requests[0]), 1); + //printf("sent request: %s\n", requests[x]); + + if (status != MCMC_OK) { + fprintf(stderr, "Failed to send request to memcached\n"); + return -1; + } + + mcmc_resp_t resp; + status = mcmc_read(c, rbuf, bufsize, &resp); + // this could race and fail, depending on the system. + if (status == MCMC_WANT_READ) { + printf("got MCMC_WANT_READ from a too-fast read as expected\n"); + pfds[0].fd = mcmc_fd(c); + pfds[0].events = POLLIN; + if (poll(pfds, 1, 1000) != 1) { + fprintf(stderr, "poll on connect timed out or failed\n"); + return -1; + } + if (pfds[0].revents & POLLIN) { + printf("asynchronous read ready\n"); + } + + show_response(c, rbuf, bufsize); + } + + return 0; +} diff --git a/vendor/mcmc/mcmc.c b/vendor/mcmc/mcmc.c new file mode 100644 index 0000000..65b7e68 --- /dev/null +++ b/vendor/mcmc/mcmc.c @@ -0,0 +1,728 @@ +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <netdb.h> +#include <unistd.h> +#include <fcntl.h> +#include <string.h> +#include <stdint.h> +#include <errno.h> +#include <stdio.h> + +#include "mcmc.h" + +// TODO: if there's a parse error or unknown status code, we likely have a +// protocol desync and need to disconnect. + +// NOTE: this _will_ change a bit for adding TLS support. + +// A "reasonable" minimum buffer size to work with. +// Callers are allowed to create a buffer of any size larger than this. +// TODO: Put the math/documentation in here. +// This is essentially the largest return value status line possible. +// at least doubled for wiggle room. 
+#define MIN_BUFFER_SIZE 2048 + +#define FLAG_BUF_IS_ERROR 0x1 +#define FLAG_BUF_IS_NUMERIC 0x2 +#define FLAG_BUF_WANTED_READ 0x4 + +#define STATE_DEFAULT 0 // looking for any kind of response +#define STATE_GET_RESP 1 // processing VALUE's until END +#define STATE_STAT_RESP 2 // processing STAT's until END +#define STATE_STAT_RESP_DONE 3 + +typedef struct mcmc_ctx { + int fd; + int gai_status; // getaddrinfo() last status. + int last_sys_error; // last syscall error (connect/etc?) + int sent_bytes_partial; // note for partially sent buffers. + int request_queue; // supposed outstanding replies. + int fail_code; // recent failure reason. + int error; // latest error code. + uint32_t status_flags; // internal only flags. + int state; + + // FIXME: s/buffer_used/buffer_filled/ ? + size_t buffer_used; // amount of bytes read into the buffer so far. + size_t buffer_request_len; // cached endpoint for current request + char *buffer_head; // buffer pointer currently in use. + char *buffer_tail; // consumed tail of the buffer. + + // request response detail. + mcmc_resp_t *resp; +} mcmc_ctx_t; + +// INTERNAL FUNCTIONS + +static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) { + char *buf = ctx->buffer_head; + // we know that "VALUE " has matched, so skip that. + char *p = buf+6; + size_t l = ctx->buffer_request_len; + + // <key> <flags> <bytes> [<cas unique>] + char *key = p; + int keylen; + p = memchr(p, ' ', l - 6); + if (p == NULL) { + // FIXME: these should return MCMC_ERR and set the internal parse + // error code. + return MCMC_PARSE_ERROR; + } + + keylen = p - key; + + // convert flags into something useful. + // FIXME: do we need to prevent overruns in strtoul? + // we know for sure the line will eventually end in a \n. 
+ char *n = NULL; + errno = 0; + uint32_t flags = strtoul(p, &n, 10); + if ((errno == ERANGE) || (p == n) || (*n != ' ')) { + return MCMC_PARSE_ERROR; + } + p = n; + + errno = 0; + uint32_t bytes = strtoul(p, &n, 10); + if ((errno == ERANGE) || (p == n)) { + return MCMC_PARSE_ERROR; + } + p = n; + + // If next byte is a space, we read the optional CAS value. + uint64_t cas = 0; + if (*n == ' ') { + errno = 0; + cas = strtoull(p, &n, 10); + if ((errno == ERANGE) || (p == n)) { + return MCMC_PARSE_ERROR; + } + } + + // If we made it this far, we've parsed everything, stuff the details into + // the context for fetching later. + mcmc_resp_t *r = ctx->resp; + // FIXME: set to NULL if we don't have the value? + r->value = ctx->buffer_tail; + r->vlen = bytes + 2; // add in the \r\n + int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); + if (buffer_remain >= r->vlen) { + r->vlen_read = r->vlen; + ctx->buffer_tail += r->vlen; + } else { + r->vlen_read = buffer_remain; + } + r->key = key; + r->klen = keylen; + r->flags = flags; + r->cas = cas; + r->type = MCMC_RESP_GET; + ctx->state = STATE_GET_RESP; + + // NOTE: if value_offset < buffer_used, has part of the value in the + // buffer already. + + return MCMC_OK; +} + +// FIXME: This is broken for ASCII multiget. +// if we get VALUE back, we need to stay in ASCII GET read mode until an END +// is seen. +static int _mcmc_parse_response(mcmc_ctx_t *ctx) { + char *buf = ctx->buffer_head; + char *cur = buf; + size_t l = ctx->buffer_request_len; + int rlen; // response code length. + int more = 0; + mcmc_resp_t *r = ctx->resp; + r->reslen = ctx->buffer_request_len; + r->type = MCMC_RESP_GENERIC; + + // walk until the \r\n + while (l-- > 2) { + if (*cur == ' ') { + more = 1; + break; + } + cur++; + } + rlen = cur - buf; + + // incr/decr returns a number with no code :( + // not checking length first since buf must have at least one char to + // enter this function. 
+ if (buf[0] >= '0' && buf[0] <= '9') { + // TODO: parse it as a number on request. + // TODO: validate whole thing as digits here? + ctx->status_flags |= FLAG_BUF_IS_NUMERIC; + r->type = MCMC_RESP_NUMERIC; + return MCMC_OK; + } + + if (rlen < 2) { + ctx->error = MCMC_PARSE_ERROR_SHORT; + return MCMC_ERR; + } + + int rv = MCMC_OK; + int code = MCMC_CODE_OK; + switch (rlen) { + case 2: + // meta, "OK" + // FIXME: adding new return codes would make the client completely + // fail. The rest of the client is agnostic to requests/flags for + // meta. + // can we make it agnostic for return codes outside of "read this + // data" types? + // As-is it should fail down to the "send the return code to the + // user". not sure that's right. + r->type = MCMC_RESP_META; + switch (buf[0]) { + case 'E': + if (buf[1] == 'N') { + code = MCMC_CODE_MISS; + // TODO: RESP type + } else if (buf[1] == 'X') { + code = MCMC_CODE_EXISTS; + } + break; + case 'H': + if (buf[1] == 'D') { + // typical meta response. + code = MCMC_CODE_OK; + } + break; + case 'M': + if (buf[1] == 'N') { + // specific return code so user can see pipeline end. + code = MCMC_CODE_NOP; + } else if (buf[1] == 'E') { + // ME is the debug output line. + // TODO: this just gets returned as an rline? + // specific code? specific type? + // ME <key> <key=value debug line> + rv = MCMC_OK; + } + break; + case 'N': + if (buf[1] == 'F') { + code = MCMC_CODE_NOT_FOUND; + } else if (buf[1] == 'S') { + code = MCMC_CODE_NOT_STORED; + } + break; + case 'O': + if (buf[1] == 'K') { + // Used by many random management commands + r->type = MCMC_RESP_GENERIC; + } + break; + case 'V': + if (buf[1] == 'A') { + // VA <size> <flags>*\r\n + if (more) { + errno = 0; + char *n = NULL; + uint32_t vsize = strtoul(cur, &n, 10); + if ((errno == ERANGE) || (cur == n)) { + rv = MCMC_ERR; + } else { + r->value = ctx->buffer_tail; + r->vlen = vsize + 2; // tag in the \r\n. + // FIXME: macro. 
+ int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); + if (buffer_remain >= r->vlen) { + r->vlen_read = r->vlen; + ctx->buffer_tail += r->vlen; + } else { + r->vlen_read = buffer_remain; + } + cur = n; + if (*cur != ' ') { + more = 0; + } + } + } else { + rv = MCMC_ERR; + } + } + break; + } + // maybe: if !rv and !fail, do something special? + // if (more), there are flags. shove them in the right place. + if (more) { + r->rline = cur+1; // eat the space. + r->rlen = l-1; + } else { + r->rline = NULL; + r->rlen = 0; + } + break; + case 3: + if (memcmp(buf, "END", 3) == 0) { + // Either end of STAT results, or end of ascii GET key list. + ctx->state = STATE_DEFAULT; + // FIXME: caller needs to understand if this is a real miss. + code = MCMC_CODE_MISS; + r->type = MCMC_RESP_END; + rv = MCMC_OK; + } + break; + case 4: + if (memcmp(buf, "STAT", 4) == 0) { + r->type = MCMC_RESP_STAT; + ctx->state = STATE_STAT_RESP; + // TODO: initialize stat reader mode. + } + break; + case 5: + if (memcmp(buf, "VALUE", 5) == 0) { + if (more) { + // <key> <flags> <bytes> [<cas unique>] + rv = _mcmc_parse_value_line(ctx); + } else { + rv = MCMC_ERR; // FIXME: parse error. + } + } + break; + case 6: + if (memcmp(buf, "STORED", 6) == 0) { + code = MCMC_CODE_STORED; + } else if (memcmp(buf, "EXISTS", 6) == 0) { + code = MCMC_CODE_EXISTS; + // TODO: type -> ASCII? + } + break; + case 7: + if (memcmp(buf, "DELETED", 7) == 0) { + code = MCMC_CODE_DELETED; + } else if (memcmp(buf, "TOUCHED", 7) == 0) { + code = MCMC_CODE_TOUCHED; + } else if (memcmp(buf, "VERSION", 7) == 0) { + code = MCMC_CODE_VERSION; + r->type = MCMC_RESP_VERSION; + // TODO: prep the version line for return + } + break; + case 9: + if (memcmp(buf, "NOT_FOUND", 9) == 0) { + code = MCMC_CODE_NOT_FOUND; + } + break; + case 10: + if (memcmp(buf, "NOT_STORED", 10) == 0) { + code = MCMC_CODE_NOT_STORED; + } + break; + default: + // Unknown code, assume error. 
+ break; + } + + r->code = code; + if (rv == -1) { + // TODO: Finish this. + ctx->status_flags |= FLAG_BUF_IS_ERROR; + rv = MCMC_ERR; + } + + return rv; +} + +// EXTERNAL API + +int mcmc_fd(void *c) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + return ctx->fd; +} + +size_t mcmc_size(int options) { + return sizeof(mcmc_ctx_t); +} + +// Allow returning this dynamically based on options set. +// FIXME: it might be more flexible to call this after mcmc_connect()... +// but this is probably more convenient for the caller if it's less dynamic. +size_t mcmc_min_buffer_size(int options) { + return MIN_BUFFER_SIZE; +} + +char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) { + mcmc_ctx_t *ctx = c; + char *b = buf + ctx->buffer_used; + *bufremain = bufsize - ctx->buffer_used; + return b; +} + +// Directly parse a buffer with read data of size len. +// r->reslen + r->vlen_read is the bytes consumed from the buffer read. +// Caller manages how to retry if MCMC_WANT_READ or an error happens. +// FIXME: not sure if to keep this command to a fixed buffer size, or continue +// to use the ctx->buffer_used bits... if we keep the buffer_used stuff caller can +// loop without memmove'ing the buffer? +int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r) { + mcmc_ctx_t *ctx = c; + char *el; + ctx->buffer_used += read; + + el = memchr(buf, '\n', ctx->buffer_used); + if (el == NULL) { + return MCMC_WANT_READ; + } + + memset(r, 0, sizeof(*r)); + + // Consume through the newline. + // buffer_tail now points to where value could start. + // FIXME: ctx->value ? + ctx->buffer_tail = el+1; + + // FIXME: the server must be stricter in what it sends back. should always + // have a \r. check for it and fail? + ctx->buffer_request_len = ctx->buffer_tail - buf; + // leave the \r\n in the line end cache. + ctx->buffer_head = buf; + // TODO: handling for nonblock case. + + // We have a result line. Now pass it through the parser. 
+ // Then we indicate to the user that a response is ready. + ctx->resp = r; + return _mcmc_parse_response(ctx); +} + +/*** Functions wrapping syscalls **/ + +// TODO: should be able to flip between block and nonblock. + +// used for checking on async connections. +int mcmc_check_nonblock_connect(void *c, int *err) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + socklen_t errsize = sizeof(*err); + if (getsockopt(ctx->fd, SOL_SOCKET, SO_ERROR, err, &errsize) == 0) { + if (*err == 0) { + return MCMC_OK; + } + } else { + // getsockopt failed. still need to pass up the error. + *err = errno; + } + + return MCMC_ERR; +} + +// TODO: +// - option for connecting 4 -> 6 or 6 -> 4 +// connect_unix() +// connect_bind_tcp() +// ^ fill an internal struct from the stack and call into this central +// connect? +int mcmc_connect(void *c, char *host, char *port, int options) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + + int s; + int sock; + int res = MCMC_CONNECTED; + struct addrinfo hints; + struct addrinfo *ai; + struct addrinfo *next; + + // Since our cx memory was likely malloc'ed, ensure we start clear. 
+ memset(ctx, 0, sizeof(mcmc_ctx_t)); + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + s = getaddrinfo(host, port, &hints, &ai); + + if (s != 0) { + hints.ai_family = AF_INET6; + s = getaddrinfo(host, port, &hints, &ai); + if (s != 0) { + // TODO: gai_strerror(s) + ctx->gai_status = s; + res = MCMC_ERR; + goto end; + } + } + + for (next = ai; next != NULL; next = next->ai_next) { + sock = socket(next->ai_family, next->ai_socktype, + next->ai_protocol); + if (sock == -1) + continue; + + // TODO: NONBLOCK + if (options & MCMC_OPTION_NONBLOCK) { + int flags = fcntl(sock, F_GETFL); + if (flags < 0) { + res = MCMC_ERR; + close(sock); + goto end; + } + if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) { + res = MCMC_ERR; + close(sock); + goto end; + } + res = MCMC_CONNECTING; + + if (connect(sock, next->ai_addr, next->ai_addrlen) != -1) { + if (errno == EINPROGRESS) { + break; // We're good, stop the loop. + } + } + + break; + } else { + // TODO: BIND local port. + if (connect(sock, next->ai_addr, next->ai_addrlen) != -1) + break; + } + + close(sock); + } + + // TODO: cache last connect status code? + if (next == NULL) { + res = MCMC_ERR; + goto end; + } + + ctx->fd = sock; +end: + if (ai) { + freeaddrinfo(ai); + } + return res; +} + +// NOTE: if WANT_WRITE returned, call with same arguments. +// FIXME: len -> size_t? +// TODO: rename to mcmc_request_send +int mcmc_send_request(void *c, const char *request, int len, int count) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + + // adjust our send buffer by how much has already been sent. + const char *r = request + ctx->sent_bytes_partial; + int l = len - ctx->sent_bytes_partial; + int sent = send(ctx->fd, r, l, 0); + if (sent == -1) { + // implicitly handle nonblock mode. + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return MCMC_WANT_WRITE; + } else { + return MCMC_ERR; + } + } + + if (sent < len) { + // can happen anytime, but mostly in nonblocking mode. 
+ ctx->sent_bytes_partial += sent; + return MCMC_WANT_WRITE; + } else { + ctx->request_queue += count; + ctx->sent_bytes_partial = 0; + } + + return MCMC_OK; +} + +// TODO: pretty sure I don't want this function chewing on a submitted iov +// stack, but it might make for less client code :( +// so for now, lets not. +int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + // need to track sent vs tosend to know when to update counters. + ssize_t tosend = 0; + for (int i = 0; i < iovcnt; i++) { + tosend += iov[i].iov_len; + } + + *sent = writev(ctx->fd, iov, iovcnt); + if (*sent == -1) { + // implicitly handle nonblock mode. + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return MCMC_WANT_WRITE; + } else { + return MCMC_ERR; + } + } + + if (*sent < tosend) { + // can happen anytime, but mostly in nonblocking mode. + return MCMC_WANT_WRITE; + } else { + // FIXME: user has to keep submitting the same count value... + // should decide on whether or not to give up on this. + ctx->request_queue += count; + } + + return MCMC_OK; +} + +// TODO: avoid recv if we have bytes in the buffer. +int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + char *el; + memset(r, 0, sizeof(*r)); + + // If there's still data in the buffer try to use it before potentially + // hanging on the network read. + // Also skip this check if we specifically wanted more bytes from net. + if (ctx->buffer_used && !(ctx->status_flags & FLAG_BUF_WANTED_READ)) { + el = memchr(buf, '\n', ctx->buffer_used); + if (el) { + goto parse; + } + } + + // adjust buffer by how far we've already consumed. + char *b = buf + ctx->buffer_used; + size_t l = bufsize - ctx->buffer_used; + + int read = recv(ctx->fd, b, l, 0); + if (read == 0) { + return MCMC_NOT_CONNECTED; + } else if (read == -1) { + // implicitly handle nonblocking configurations. 
+ if (errno == EAGAIN || errno == EWOULDBLOCK) { + return MCMC_WANT_READ; + } else { + return MCMC_ERR; + } + } + + ctx->buffer_used += read; + + // Always scan from the start of the original buffer. + el = memchr(buf, '\n', ctx->buffer_used); + if (!el) { + // FIXME: error if buffer is full but no \n is found. + ctx->status_flags |= FLAG_BUF_WANTED_READ; + return MCMC_WANT_READ; + } +parse: + // Consume through the newline. + // buffer_tail now points to where a value could start. + ctx->buffer_tail = el+1; + + // FIXME: the server must be stricter in what it sends back. should always + // have a \r. check for it and fail? + ctx->buffer_request_len = ctx->buffer_tail - buf; + // leave the \r\n in the line end cache. + ctx->buffer_head = buf; + // TODO: handling for nonblock case. + + // We have a result line. Now pass it through the parser. + // Then we indicate to the user that a response is ready. + ctx->resp = r; + return _mcmc_parse_response(ctx); +} + +void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen) { + code[0] = '\0'; + msg[0] = '\0'; +} + +int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + + // If the distance between tail/head is smaller than what we read into the + // main buffer, we have some value to copy out. + int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); + if (leftover > 0) { + int tocopy = leftover > vsize ? vsize : leftover; + memcpy(val + *read, ctx->buffer_tail, tocopy); + ctx->buffer_tail += tocopy; + *read += tocopy; + if (leftover > tocopy) { + // FIXME: think we need a specific code for "value didn't fit" + return MCMC_WANT_READ; + } + } + + return MCMC_OK; +} + +// read into the buffer, up to a max size of vsize. +// will read (vsize-read) into the buffer pointed to by (val+read). 
+// you are able to stream the value into different buffers, or process the +// value and reuse the same buffer, by adjusting vsize and *read between +// calls. +// vsize must not be larger than the remaining value size pending read. +int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + size_t l; + + // If the distance between tail/head is smaller than what we read into the + // main buffer, we have some value to copy out. + int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head); + if (leftover > 0) { + int tocopy = leftover > vsize ? vsize : leftover; + memcpy(val + *read, ctx->buffer_tail, tocopy); + ctx->buffer_tail += tocopy; + *read += tocopy; + if (leftover > tocopy) { + // FIXME: think we need a specific code for "value didn't fit" + return MCMC_WANT_READ; + } + } + + char *v = val + *read; + l = vsize - *read; + + int r = recv(ctx->fd, v, l, 0); + if (r == 0) { + // TODO: some internal disconnect work? + return MCMC_NOT_CONNECTED; + } + // FIXME: EAGAIN || EWOULDBLOCK! + if (r == -1) { + return MCMC_ERR; + } + + *read += r; + + if (*read < vsize) { + return MCMC_WANT_READ; + } else { + return MCMC_OK; + } +} + +char *mcmc_buffer_consume(void *c, int *remain) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + ctx->buffer_used -= ctx->buffer_tail - ctx->buffer_head; + int used = ctx->buffer_used; + char *newbuf = ctx->buffer_tail; + + // FIXME: request_queue-- is in the wrong place. + // TODO: which of these _must_ be reset between requests? I think very + // little? + ctx->request_queue--; + ctx->status_flags = 0; + ctx->buffer_head = NULL; + ctx->buffer_tail = NULL; + + if (used) { + *remain = used; + return newbuf; + } else { + return NULL; + } +} + +int mcmc_disconnect(void *c) { + mcmc_ctx_t *ctx = (mcmc_ctx_t *)c; + + // FIXME: I forget if 0 can be valid. 
+ if (ctx->fd != 0) { + close(ctx->fd); + return MCMC_OK; + } else { + return MCMC_NOT_CONNECTED; + } +} diff --git a/vendor/mcmc/mcmc.h b/vendor/mcmc/mcmc.h new file mode 100644 index 0000000..1769228 --- /dev/null +++ b/vendor/mcmc/mcmc.h @@ -0,0 +1,91 @@ +#ifndef MCMC_HEADER +#define MCMC_HEADER + +#define MCMC_OK 0 +#define MCMC_ERR -1 +#define MCMC_NOT_CONNECTED 1 +#define MCMC_CONNECTED 2 +#define MCMC_CONNECTING 3 // nonblock mode. +#define MCMC_WANT_WRITE 4 +#define MCMC_WANT_READ 5 +#define MCMC_HAS_RESULT 7 +// TODO: either internally set a flag for "ok" or "not ok" and use a func, +// or use a bitflag here (1<<6) for "OK", (1<<5) for "FAIL", etc. +// or, we directly return "OK" or "FAIL" and you can ask for specific error. +#define MCMC_CODE_STORED 8 +#define MCMC_CODE_EXISTS 9 +#define MCMC_CODE_DELETED 10 +#define MCMC_CODE_TOUCHED 11 +#define MCMC_CODE_VERSION 12 +#define MCMC_CODE_NOT_FOUND 13 +#define MCMC_CODE_NOT_STORED 14 +#define MCMC_CODE_OK 15 +#define MCMC_CODE_NOP 16 +#define MCMC_PARSE_ERROR_SHORT 17 +#define MCMC_PARSE_ERROR 18 +#define MCMC_CODE_MISS 19 // FIXME + + +// response types +#define MCMC_RESP_GET 100 +#define MCMC_RESP_META 101 +#define MCMC_RESP_STAT 102 +#define MCMC_RESP_GENERIC 104 +#define MCMC_RESP_END 105 +#define MCMC_RESP_VERSION 106 +#define MCMC_RESP_NUMERIC 107 // for weird incr/decr syntax. + +#define MCMC_OPTION_BLANK 0 +#define MCMC_OPTION_NONBLOCK 1 + +// convenience defines. if you want to save RAM you can set these smaller and +// error handler will only copy what you ask for. +#define MCMC_ERROR_CODE_MAX 32 +#define MCMC_ERROR_MSG_MAX 512 + +typedef struct { + unsigned short type; + unsigned short code; + char *value; // pointer to start of value in buffer. + size_t reslen; // full length of the response line + size_t vlen_read; // amount of value that was in supplied buffer. + size_t vlen; // reslen + vlen is the full length of the response. 
+ union { + // META response + struct { + char *rline; // start of meta response line. + size_t rlen; + }; + // GET response + struct { + char *key; + size_t klen; + uint32_t flags; + uint64_t cas; + // TODO: value info + }; + // STAT response + struct { + char *stat; + size_t slen; + }; + }; +} mcmc_resp_t; + +int mcmc_fd(void *c); +size_t mcmc_size(int options); +size_t mcmc_min_buffer_size(int options); +char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain); +int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r); +int mcmc_connect(void *c, char *host, char *port, int options); +int mcmc_check_nonblock_connect(void *c, int *err); +int mcmc_send_request(void *c, const char *request, int len, int count); +int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count); +int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r); +int mcmc_read_value(void *c, char *val, const size_t vsize, int *read); +int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read); +char *mcmc_buffer_consume(void *c, int *remain); +int mcmc_disconnect(void *c); +void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen); + +#endif |