summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--BUILD13
-rw-r--r--Makefile.am19
-rw-r--r--configure.ac20
-rw-r--r--logger.c37
-rw-r--r--logger.h14
-rw-r--r--memcached.c68
-rw-r--r--memcached.h31
-rw-r--r--proto_proxy.c4191
-rw-r--r--proto_proxy.h26
-rw-r--r--proto_text.c14
-rw-r--r--proto_text.h1
-rw-r--r--t/lib/MemcachedTest.pm8
-rw-r--r--t/proxy.t263
-rw-r--r--t/startfile.lua280
-rw-r--r--thread.c38
-rw-r--r--vendor/.gitignore4
-rw-r--r--vendor/Makefile10
-rwxr-xr-xvendor/fetch.sh5
-rw-r--r--vendor/lua/.gitignore2
-rw-r--r--vendor/mcmc/LICENSE30
-rw-r--r--vendor/mcmc/Makefile13
-rw-r--r--vendor/mcmc/README.md52
-rw-r--r--vendor/mcmc/example.c214
-rw-r--r--vendor/mcmc/mcmc.c728
-rw-r--r--vendor/mcmc/mcmc.h91
25 files changed, 6162 insertions, 10 deletions
diff --git a/BUILD b/BUILD
index c29f8ed..5e3730a 100644
--- a/BUILD
+++ b/BUILD
@@ -1,3 +1,5 @@
+See below if building the proxy
+
To build memcached in your machine from local repo you will have to install
autotools, automake and libevent. In a debian based system that will look
like this
@@ -22,3 +24,14 @@ You can telnet into that memcached to ensure it is up and running
telnet 127.0.0.1 11211
stats
+
+IF BUILDING PROXY, AN EXTRA STEP IS NECESSARY:
+
+cd memcached
+cd vendor
+./fetch.sh
+cd ..
+./autogen.sh
+./configure --enable-proxy
+make
+make test
diff --git a/Makefile.am b/Makefile.am
index 005df7f..e2c1bd8 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -51,6 +51,10 @@ if ENABLE_SASL
memcached_SOURCES += sasl_defs.c
endif
+if ENABLE_PROXY
+memcached_SOURCES += proto_proxy.c proto_proxy.h vendor/mcmc/mcmc.h
+endif
+
if ENABLE_EXTSTORE
memcached_SOURCES += extstore.c extstore.h \
crc32c.c crc32c.h \
@@ -90,6 +94,16 @@ memcached_debug_DEPENDENCIES += memcached_debug_dtrace.o
CLEANFILES += memcached_dtrace.o memcached_debug_dtrace.o
endif
+if ENABLE_PROXY
+memcached_LDADD += vendor/lua/src/liblua.a vendor/mcmc/mcmc.o
+memcached_debug_LDADD += vendor/lua/src/liblua.a vendor/mcmc/mcmc.o
+endif
+
+if ENABLE_PROXY_URING
+memcached_LDADD += vendor/liburing/src/liburing.a
+memcached_debug_LDADD += vendor/liburing/src/liburing.a
+endif
+
memcached_debug_CFLAGS += -DMEMCACHED_DEBUG
memcached_dtrace.h: memcached_dtrace.d
@@ -108,6 +122,11 @@ memcached_debug_dtrace.o: $(memcached_debug_OBJECTS)
SUBDIRS = doc
DIST_DIRS = scripts
EXTRA_DIST = doc scripts t memcached.spec memcached_dtrace.d version.m4 README.md LICENSE.bipbuffer
+EXTRA_DIST += vendor/Makefile vendor/lua vendor/mcmc
+
+if ENABLE_PROXY
+SUBDIRS += vendor
+endif
MOSTLYCLEANFILES = *.gcov *.gcno *.gcda *.tcov
diff --git a/configure.ac b/configure.ac
index 0985f07..a0851f2 100644
--- a/configure.ac
+++ b/configure.ac
@@ -114,6 +114,12 @@ AC_ARG_ENABLE(static,
AC_ARG_ENABLE(unix_socket,
[AS_HELP_STRING([--disable-unix-socket], [Disable unix domain socket])])
+AC_ARG_ENABLE(proxy,
+ [AS_HELP_STRING([--enable-proxy], [Enable proxy code EXPERIMENTAL])])
+
+AC_ARG_ENABLE(proxy-uring,
+ [AS_HELP_STRING([--enable-proxy-uring], [Enable proxy io_uring code EXPERIMENTAL])])
+
dnl **********************************************************************
dnl DETECT_SASL_CB_GETCONF
dnl
@@ -217,6 +223,18 @@ if test "x$enable_unix_socket" = "xno"; then
AC_DEFINE([DISABLE_UNIX_SOCKET],1,[Set to nonzero if you want to disable unix domain socket])
fi
+if test "x$enable_proxy" = "xyes"; then
+ AC_DEFINE([PROXY],1,[Set to nonzero if you want to enable proxy code])
+ CPPFLAGS="-Ivendor/lua/src -Ivendor/liburing/src/include $CPPFLAGS"
+ dnl lua needs math lib.
+ LIBS="$LIBS -lm -ldl"
+fi
+
+if test "x$enable_proxy_uring" = "xyes"; then
+ AC_DEFINE([HAVE_LIBURING],1,[Set to nonzero if you want to enable proxy uring handling])
+ CPPFLAGS="-Ivendor/liburing/src/include $CPPFLAGS"
+fi
+
AM_CONDITIONAL([BUILD_DTRACE],[test "$build_dtrace" = "yes"])
AM_CONDITIONAL([DTRACE_INSTRUMENT_OBJ],[test "$dtrace_instrument_obj" = "yes"])
AM_CONDITIONAL([ENABLE_SASL],[test "$enable_sasl" = "yes"])
@@ -226,6 +244,8 @@ AM_CONDITIONAL([ENABLE_TLS],[test "$enable_tls" = "yes"])
AM_CONDITIONAL([ENABLE_ASAN],[test "$enable_asan" = "yes"])
AM_CONDITIONAL([ENABLE_STATIC],[test "$enable_static" = "yes"])
AM_CONDITIONAL([DISABLE_UNIX_SOCKET],[test "$enable_unix_socket" = "no"])
+AM_CONDITIONAL([ENABLE_PROXY],[test "$enable_proxy" = "yes"])
+AM_CONDITIONAL([ENABLE_PROXY_URING],[test "$enable_proxy_uring" = "yes"])
AC_SUBST(DTRACE)
diff --git a/logger.c b/logger.c
index 667f3c7..0a6cdff 100644
--- a/logger.c
+++ b/logger.c
@@ -303,6 +303,34 @@ static int _logger_parse_cce(logentry *e, char *scratch) {
return total;
}
+#ifdef PROXY
+static void _logger_log_proxy_raw(logentry *e, const entry_details *d, const void *entry, va_list ap) {
+ struct timeval start = va_arg(ap, struct timeval);
+ char *cmd = va_arg(ap, char *);
+ unsigned short type = va_arg(ap, int);
+ unsigned short code = va_arg(ap, int);
+
+ struct logentry_proxy_raw *le = (void *)e->data;
+ struct timeval end;
+ gettimeofday(&end, NULL);
+ le->type = type;
+ le->code = code;
+ le->elapsed = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
+ memcpy(le->cmd, cmd, 9);
+}
+
+static int _logger_parse_prx_raw(logentry *e, char *scratch) {
+ int total;
+ struct logentry_proxy_raw *le = (void *)e->data;
+
+ total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
+ "ts=%d.%d gid=%llu type=proxy_raw elapsed=%lu cmd=%s type=%d code=%d\n",
+ (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
+ le->elapsed, le->cmd, le->type, le->code);
+ return total;
+}
+#endif
+
/* Should this go somewhere else? */
static const entry_details default_entries[] = {
[LOGGER_ASCII_CMD] = {512, LOG_RAWCMDS, _logger_log_text, _logger_parse_text, "<%d %s"},
@@ -338,6 +366,15 @@ static const entry_details default_entries[] = {
"type=compact_fraginfo ratio=%.2f bytes=%lu"
},
#endif
+#ifdef PROXY
+ [LOGGER_PROXY_CONFIG] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
+ "type=proxy_conf status=%s"
+ },
+ [LOGGER_PROXY_RAW] = {512, LOG_RAWCMDS, _logger_log_proxy_raw, _logger_parse_prx_raw, NULL},
+ [LOGGER_PROXY_ERROR] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
+ "type=proxy_error msg=%s"
+ },
+#endif
};
/*************************
diff --git a/logger.h b/logger.h
index 039f443..6def6f3 100644
--- a/logger.h
+++ b/logger.h
@@ -31,6 +31,11 @@ enum log_entry_type {
LOGGER_COMPACT_END,
LOGGER_COMPACT_FRAGINFO,
#endif
+#ifdef PROXY
+ LOGGER_PROXY_CONFIG,
+ LOGGER_PROXY_RAW,
+ LOGGER_PROXY_ERROR,
+#endif
};
enum logger_ret_type {
@@ -106,7 +111,14 @@ struct logentry_conn_event {
int sfd;
struct sockaddr_in6 addr;
};
-
+#ifdef PROXY
+struct logentry_proxy_raw {
+ unsigned short type;
+ unsigned short code;
+ long elapsed; // elapsed time in usec
+ char cmd[8];
+};
+#endif
/* end intermediary structures */
/* WARNING: cuddled items aren't compatible with warm restart. more code
diff --git a/memcached.c b/memcached.c
index 8bbdccd..bf6617d 100644
--- a/memcached.c
+++ b/memcached.c
@@ -56,6 +56,7 @@
#include "proto_text.h"
#include "proto_bin.h"
+#include "proto_proxy.h"
#if defined(__FreeBSD__)
#include <sys/sysctl.h>
@@ -85,7 +86,6 @@ static enum try_read_result try_read_udp(conn *c);
static int start_conn_timeout_thread();
-
/* stats */
static void stats_init(void);
static void conn_to_str(const conn *c, char *addr, char *svr_addr);
@@ -500,6 +500,11 @@ static const char *prot_text(enum protocol prot) {
case negotiating_prot:
rv = "auto-negotiate";
break;
+#ifdef PROXY
+ case proxy_prot:
+ rv = "proxy";
+ break;
+#endif
}
return rv;
}
@@ -797,6 +802,11 @@ conn *conn_new(const int sfd, enum conn_states init_state,
case negotiating_prot:
c->try_read_command = try_read_command_negotiate;
break;
+#ifdef PROXY
+ case proxy_prot:
+ c->try_read_command = try_read_command_proxy;
+ break;
+#endif
}
}
@@ -1417,13 +1427,22 @@ static void reset_cmd_handler(conn *c) {
static void complete_nread(conn *c) {
assert(c != NULL);
+#ifdef PROXY
+ assert(c->protocol == ascii_prot
+ || c->protocol == binary_prot
+ || c->protocol == proxy_prot);
+#else
assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);
-
+#endif
if (c->protocol == ascii_prot) {
complete_nread_ascii(c);
} else if (c->protocol == binary_prot) {
complete_nread_binary(c);
+#ifdef PROXY
+ } else if (c->protocol == proxy_prot) {
+ complete_nread_proxy(c);
+#endif
}
}
@@ -1788,6 +1807,12 @@ void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("badcrc_from_extstore", "%llu", (unsigned long long)thread_stats.badcrc_from_extstore);
}
#endif
+#ifdef PROXY
+ if (settings.proxy_enabled) {
+ APPEND_STAT("proxy_conn_requests", "%llu", (unsigned long long)thread_stats.proxy_conn_requests);
+ APPEND_STAT("proxy_conn_errors", "%llu", (unsigned long long)thread_stats.proxy_conn_errors);
+ }
+#endif
APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
@@ -1843,6 +1868,9 @@ void server_stats(ADD_STAT add_stats, conn *c) {
#ifdef EXTSTORE
storage_stats(add_stats, c);
#endif
+#ifdef PROXY
+ proxy_stats(add_stats, c);
+#endif
#ifdef TLS
if (settings.ssl_enabled) {
if (settings.ssl_session_cache) {
@@ -3801,6 +3829,9 @@ static void clock_handler(const evutil_socket_t fd, const short which, void *arg
settings.sig_hup = false;
authfile_load(settings.auth_file);
+#ifdef PROXY
+ proxy_start_reload(settings.proxy_ctx);
+#endif
}
evtimer_set(&clockevent, clock_handler, 0);
@@ -3999,6 +4030,9 @@ static void usage(void) {
settings.ext_max_frag, settings.slab_automove_freeratio);
verify_default("ext_item_age", settings.ext_item_age == UINT_MAX);
#endif
+#ifdef PROXY
+ printf(" - proxy_config: path to lua config file.\n");
+#endif
#ifdef TLS
printf(" - ssl_chain_cert: certificate chain file in PEM format\n"
" - ssl_key: private key, if not part of the -ssl_chain_cert\n"
@@ -4663,6 +4697,9 @@ int main (int argc, char **argv) {
SSL_SESSION_CACHE,
SSL_MIN_VERSION,
#endif
+#ifdef PROXY
+ PROXY_CONFIG,
+#endif
#ifdef MEMCACHED_DEBUG
RELAXED_PRIVILEGES,
#endif
@@ -4718,6 +4755,9 @@ int main (int argc, char **argv) {
[SSL_SESSION_CACHE] = "ssl_session_cache",
[SSL_MIN_VERSION] = "ssl_min_version",
#endif
+#ifdef PROXY
+ [PROXY_CONFIG] = "proxy_config",
+#endif
#ifdef MEMCACHED_DEBUG
[RELAXED_PRIVILEGES] = "relaxed_privileges",
#endif
@@ -5461,6 +5501,22 @@ int main (int argc, char **argv) {
}
settings.read_buf_mem_limit *= 1024 * 1024; /* megabytes */
break;
+#ifdef PROXY
+ case PROXY_CONFIG:
+ if (subopts_value == NULL) {
+ fprintf(stderr, "Missing proxy_config file argument\n");
+ return 1;
+ }
+ if (protocol_specified) {
+ fprintf(stderr, "Cannot specify a protocol with proxy mode enabled\n");
+ return 1;
+ }
+ settings.proxy_startfile = strdup(subopts_value);
+ settings.proxy_enabled = true;
+ settings.binding_protocol = proxy_prot;
+ protocol_specified = true;
+ break;
+#endif
#ifdef MEMCACHED_DEBUG
case RELAXED_PRIVILEGES:
settings.relaxed_privileges = true;
@@ -5868,6 +5924,14 @@ int main (int argc, char **argv) {
exit(EX_OSERR);
}
/* start up worker threads if MT mode */
+#ifdef PROXY
+ if (settings.proxy_enabled) {
+ proxy_init();
+ if (proxy_load_config(settings.proxy_ctx) != 0) {
+ exit(EXIT_FAILURE);
+ }
+ }
+#endif
#ifdef EXTSTORE
slabs_set_storage(storage);
memcached_thread_init(settings.num_threads, storage);
diff --git a/memcached.h b/memcached.h
index 12385db..b7d4ad5 100644
--- a/memcached.h
+++ b/memcached.h
@@ -223,7 +223,10 @@ enum bin_substates {
enum protocol {
ascii_prot = 3, /* arbitrary value. */
binary_prot,
- negotiating_prot /* Discovering the protocol */
+ negotiating_prot, /* Discovering the protocol */
+#ifdef PROXY
+ proxy_prot,
+#endif
};
enum network_transport {
@@ -329,6 +332,12 @@ struct slab_stats {
X(badcrc_from_extstore)
#endif
+#ifdef PROXY
+#define PROXY_THREAD_STATS_FIELDS \
+ X(proxy_conn_requests) \
+ X(proxy_conn_errors)
+#endif
+
/**
* Stats stored per-thread.
*/
@@ -339,6 +348,9 @@ struct thread_stats {
#ifdef EXTSTORE
EXTSTORE_THREAD_STATS_FIELDS
#endif
+#ifdef PROXY
+ PROXY_THREAD_STATS_FIELDS
+#endif
#undef X
struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
uint64_t lru_hits[POWER_LARGEST];
@@ -501,6 +513,11 @@ struct settings {
#endif
int num_napi_ids; /* maximum number of NAPI IDs */
char *memory_file; /* warm restart memory file path */
+#ifdef PROXY
+ bool proxy_enabled;
+ char *proxy_startfile; /* lua file to run when workers start */
+ void *proxy_ctx; /* proxy's state context */
+#endif
};
extern struct stats stats;
@@ -628,6 +645,7 @@ typedef struct {
#define IO_QUEUE_NONE 0
#define IO_QUEUE_EXTSTORE 1
+#define IO_QUEUE_PROXY 2
typedef struct _io_pending_t io_pending_t;
typedef struct io_queue_s io_queue_t;
@@ -690,7 +708,12 @@ typedef struct {
char *ssl_wbuf;
#endif
int napi_id; /* napi id associated with this thread */
-
+#ifdef PROXY
+ void *L;
+ void *proxy_hooks;
+ void *proxy_stats;
+ // TODO: add ctx object so we can attach to queue.
+#endif
} LIBEVENT_THREAD;
/**
@@ -905,6 +928,9 @@ extern int daemonize(int nochdir, int noclose);
void memcached_thread_init(int nthreads, void *arg);
void redispatch_conn(conn *c);
void timeout_conn(conn *c);
+#ifdef PROXY
+void proxy_reload_notify(LIBEVENT_THREAD *t);
+#endif
void return_io_pending(io_pending_t *io);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl);
@@ -945,6 +971,7 @@ void STATS_UNLOCK(void);
void threadlocal_stats_reset(void);
void threadlocal_stats_aggregate(struct thread_stats *stats);
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
+LIBEVENT_THREAD *get_worker_thread(int id);
/* Stat processing functions */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
diff --git a/proto_proxy.c b/proto_proxy.c
new file mode 100644
index 0000000..428e01d
--- /dev/null
+++ b/proto_proxy.c
@@ -0,0 +1,4191 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Functions for handling the proxy layer. wraps text protocols
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+
+#include <lua.h>
+#include <lualib.h>
+#include <lauxlib.h>
+
+#include "config.h"
+
+#if defined(__linux__)
+#define USE_EVENTFD 1
+#include <sys/eventfd.h>
+#endif
+
+#ifdef HAVE_LIBURING
+#include <liburing.h>
+#include <poll.h> // POLLOUT for liburing.
+#define PRING_QUEUE_SQ_ENTRIES 2048
+#define PRING_QUEUE_CQ_ENTRIES 16384
+#endif
+
+#include "memcached.h"
+#include "proto_proxy.h"
+#include "proto_text.h"
+#include "murmur3_hash.h"
+#include "queue.h"
+#define XXH_INLINE_ALL // modifier for xxh3's include below
+#include "xxhash.h"
+
+#ifdef PROXY_DEBUG
+#define P_DEBUG(...) \
+ do { \
+ fprintf(stderr, __VA_ARGS__); \
+ } while (0)
+#else
+#define P_DEBUG(...)
+#endif
+
+#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex);
+#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex);
+#define WSTAT_INCR(c, stat, amount) { \
+ pthread_mutex_lock(&c->thread->stats.mutex); \
+ c->thread->stats.stat += amount; \
+ pthread_mutex_unlock(&c->thread->stats.mutex); \
+}
+#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
+#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
+#define STAT_INCR(ctx, stat, amount) { \
+ pthread_mutex_lock(&ctx->stats_lock); \
+ ctx->global_stats.stat += amount; \
+ pthread_mutex_unlock(&ctx->stats_lock); \
+}
+
+#define STAT_DECR(ctx, stat, amount) { \
+ pthread_mutex_lock(&ctx->stats_lock); \
+ ctx->global_stats.stat -= amount; \
+ pthread_mutex_unlock(&ctx->stats_lock); \
+}
+
+// FIXME: do include dir properly.
+#include "vendor/mcmc/mcmc.h"
+
+// Note: value created from thin air. Could be shorter.
+#define MCP_REQUEST_MAXLEN KEY_MAX_LENGTH * 2
+
+#define ENDSTR "END\r\n"
+#define ENDLEN sizeof(ENDSTR)-1
+
+#define MCP_THREAD_UPVALUE 1
+#define MCP_ATTACH_UPVALUE 2
+#define MCP_BACKEND_UPVALUE 3
+
+// all possible commands.
+#define CMD_FIELDS \
+ X(CMD_MG) \
+ X(CMD_MS) \
+ X(CMD_MD) \
+ X(CMD_MN) \
+ X(CMD_MA) \
+ X(CMD_ME) \
+ X(CMD_GET) \
+ X(CMD_GAT) \
+ X(CMD_SET) \
+ X(CMD_ADD) \
+ X(CMD_CAS) \
+ X(CMD_GETS) \
+ X(CMD_GATS) \
+ X(CMD_INCR) \
+ X(CMD_DECR) \
+ X(CMD_TOUCH) \
+ X(CMD_APPEND) \
+ X(CMD_DELETE) \
+ X(CMD_REPLACE) \
+ X(CMD_PREPEND) \
+ X(CMD_END_STORAGE) \
+ X(CMD_QUIT) \
+ X(CMD_STATS) \
+ X(CMD_SLABS) \
+ X(CMD_WATCH) \
+ X(CMD_LRU) \
+ X(CMD_VERSION) \
+ X(CMD_SHUTDOWN) \
+ X(CMD_EXTSTORE) \
+ X(CMD_FLUSH_ALL) \
+ X(CMD_VERBOSITY) \
+ X(CMD_LRU_CRAWLER) \
+ X(CMD_REFRESH_CERTS) \
+ X(CMD_CACHE_MEMLIMIT)
+
+#define X(name) name,
+enum proxy_defines {
+ P_OK = 0,
+ CMD_FIELDS
+ CMD_SIZE, // used to define array size for command hooks.
+ CMD_ANY, // override _all_ commands
+ CMD_ANY_STORAGE, // override commands specific to key storage.
+};
+#undef X
+
+// certain classes of ascii commands have similar parsing (ie;
+// get/gets/gat/gats). Use types so we don't have to test a ton of them.
+enum proxy_cmd_types {
+ CMD_TYPE_GENERIC = 0,
+ CMD_TYPE_GET, // get/gets/gat/gats
+ CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace
+ CMD_TYPE_META, // m*'s.
+};
+
+typedef struct _io_pending_proxy_t io_pending_proxy_t;
+typedef struct proxy_event_thread_s proxy_event_thread_t;
+
+#ifdef HAVE_LIBURING
+typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe);
+typedef struct {
+ void *udata;
+ proxy_event_cb cb;
+ bool set; // NOTE: not sure if necessary if code structured properly
+} proxy_event_t;
+
+static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
+static void *proxy_event_thread_ur(void *arg);
+#endif
+
+struct proxy_user_stats {
+ size_t num_stats; // number of stats, for sizing various arrays
+ char **names; // not needed for worker threads
+ uint64_t *counters; // array of counters.
+};
+
+struct proxy_global_stats {
+ uint64_t config_reloads;
+ uint64_t config_reload_fails;
+ uint64_t backend_total;
+ uint64_t backend_disconn; // backends with no connections
+ uint64_t backend_requests; // reqs sent to backends
+ uint64_t backend_responses; // responses received from backends
+ uint64_t backend_errors; // errors from backends
+};
+
+struct proxy_timeouts {
+ struct timeval connect;
+ struct timeval retry; // wait time before retrying a dead backend
+ struct timeval read;
+};
+
+typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
+typedef struct {
+ lua_State *proxy_state;
+ void *proxy_code;
+ proxy_event_thread_t *proxy_threads;
+ pthread_mutex_t config_lock;
+ pthread_cond_t config_cond;
+ pthread_t config_tid;
+ pthread_mutex_t worker_lock;
+ pthread_cond_t worker_cond;
+ pthread_t manager_tid; // deallocation management thread
+ pthread_mutex_t manager_lock;
+ pthread_cond_t manager_cond;
+ pool_head_t manager_head; // stack for pool deallocation.
+ bool worker_done; // signal variable for the worker lock/cond system.
+ bool worker_failed; // covered by worker_lock as well.
+ struct proxy_global_stats global_stats;
+ struct proxy_user_stats user_stats;
+ struct proxy_timeouts timeouts; // NOTE: updates covered by stats_lock
+ pthread_mutex_t stats_lock; // used for rare global counters
+} proxy_ctx_t;
+
+struct proxy_hook {
+ int lua_ref;
+ bool is_lua; // pull the lua reference and call it as a lua function.
+};
+
+typedef uint32_t (*hash_selector_func)(const void *key, size_t len, void *ctx);
+struct proxy_hash_caller {
+ hash_selector_func selector_func;
+ void *ctx;
+};
+
+// A default hash function for backends.
+static uint32_t mcplib_hashfunc_murmur3_func(const void *key, size_t len, void *ctx) {
+ return MurmurHash3_x86_32(key, len);
+}
+static struct proxy_hash_caller mcplib_hashfunc_murmur3 = { mcplib_hashfunc_murmur3_func, NULL};
+
+enum mcp_backend_states {
+ mcp_backend_read = 0, // waiting to read any response
+ mcp_backend_parse, // have some buffered data to check
+ mcp_backend_read_end, // looking for an "END" marker for GET
+ mcp_backend_want_read, // read more data to complete command
+ mcp_backend_next, // advance to the next IO
+};
+
+typedef struct mcp_backend_s mcp_backend_t;
+typedef struct mcp_request_s mcp_request_t;
+typedef struct mcp_parser_s mcp_parser_t;
+
+#define PARSER_MAX_TOKENS 24
+
+struct mcp_parser_meta_s {
+ uint64_t flags;
+};
+
+// Note that we must use offsets into request for tokens,
+// as *request can change between parsing and later accessors.
+struct mcp_parser_s {
+ const char *request;
+ void *vbuf; // temporary buffer for holding value lengths.
+ uint8_t command;
+ uint8_t cmd_type; // command class.
+ uint8_t ntokens;
+ uint8_t keytoken; // because GAT. sigh. also cmds without a key.
+ uint32_t parsed; // how far into the request we parsed already
+ uint32_t reqlen; // full length of request buffer.
+ int vlen;
+ uint32_t klen; // length of key.
+ uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
+ bool has_space; // a space was found after the last byte parsed.
+ union {
+ struct mcp_parser_meta_s meta;
+ } t;
+};
+
+#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]])
+
+#define MAX_REQ_TOKENS 2
+struct mcp_request_s {
+ mcp_parser_t pr; // non-lua-specific parser handling.
+ struct timeval start; // time this object was created.
+ mcp_backend_t *be; // backend handling this request.
+ bool ascii_multiget; // ascii multiget mode. (hide errors/END)
+ bool was_modified; // need to rewrite the request
+ int tokent_ref; // reference to token table if modified.
+ char request[];
+};
+
+typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
+#define MAX_IPLEN 45
+#define MAX_PORTLEN 6
+struct mcp_backend_s {
+ char ip[MAX_IPLEN+1];
+ char port[MAX_PORTLEN+1];
+ double weight;
+ int depth;
+ int failed_count; // number of fails (timeouts) in a row
+ pthread_mutex_t mutex; // covers stack.
+ proxy_event_thread_t *event_thread; // event thread owning this backend.
+ void *client; // mcmc client
+ STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
+ io_head_t io_head; // stack of requests.
+ char *rbuf; // static allocated read buffer.
+ struct event event; // libevent
+#ifdef HAVE_LIBURING
+ proxy_event_t ur_rd_ev; // liburing.
+ proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
+#endif
+ enum mcp_backend_states state; // readback state machine
+ bool connecting; // in the process of an asynch connection.
+ bool can_write; // recently got a WANT_WRITE or are connecting.
+ bool stacked; // if backend already queued for syscalls.
+ bool bad; // timed out, marked as bad.
+};
+typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
+
+struct proxy_event_thread_s {
+ pthread_t thread_id;
+ struct event_base *base;
+ struct event notify_event; // listen event for the notify pipe/eventfd.
+ struct event clock_event; // timer for updating event thread data.
+#ifdef HAVE_LIBURING
+ struct io_uring ring;
+ proxy_event_t ur_notify_event; // listen on eventfd.
+ eventfd_t event_counter;
+ bool use_uring;
+#endif
+ pthread_mutex_t mutex; // covers stack.
+ pthread_cond_t cond; // condition to wait on while stack drains.
+ io_head_t io_head_in; // inbound requests to process.
+ be_head_t be_head; // stack of backends for processing.
+#ifdef USE_EVENTFD
+ int event_fd;
+#else
+ int notify_receive_fd;
+ int notify_send_fd;
+#endif
+ proxy_ctx_t *ctx; // main context.
+ struct proxy_timeouts timeouts; // periodically copied from main ctx
+};
+
+#define RESP_CMD_MAX 8
+typedef struct {
+ mcmc_resp_t resp;
+ struct timeval start; // start time inherited from paired request
+ char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
+ int status; // status code from mcmc_read()
+ char *buf; // response line + potentially value.
+ size_t blen; // total size of the value to read.
+ int bread; // amount of bytes read into value so far.
+} mcp_resp_t;
+
+// re-cast an io_pending_t into this more descriptive structure.
+// the first few items _must_ match the original struct.
+struct _io_pending_proxy_t {
+ int io_queue_type;
+ LIBEVENT_THREAD *thread;
+ conn *c;
+ mc_resp *resp; // original struct ends here
+
+ struct _io_pending_proxy_t *next; // stack for IO submission
+ STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
+ int coro_ref; // lua registry reference to the coroutine
+ int mcpres_ref; // mcp.res reference used for await()
+ lua_State *coro; // pointer directly to the coroutine
+ mcp_backend_t *backend; // backend server to request from
+ struct iovec iov[2]; // request string + tail buffer
+ int iovcnt; // 1 or 2...
+ int await_ref; // lua reference if we were an await object
+ mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
+ bool flushed; // whether we've fully written this request to a backend.
+ bool ascii_multiget; // passed on from mcp_r_t
+ bool is_await; // are we an await object?
+};
+
+// Note: does *be have to be a sub-struct? how stable are userdata pointers?
+// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
+// - says no.
+typedef struct {
+ int ref; // luaL_ref reference.
+ mcp_backend_t *be;
+} mcp_pool_be_t;
+
+typedef struct mcp_pool_s mcp_pool_t;
+struct mcp_pool_s {
+ struct proxy_hash_caller phc;
+ pthread_mutex_t lock; // protects refcount.
+ proxy_ctx_t *ctx; // main context.
+ STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
+ int refcount;
+ int phc_ref;
+ int self_ref; // TODO: double check that this is needed.
+ int pool_size;
+ mcp_pool_be_t pool[];
+};
+
+typedef struct {
+ mcp_pool_t *main; // ptr to original
+} mcp_pool_proxy_t;
+
+static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c);
+#define PROCESS_MULTIGET true
+#define PROCESS_NORMAL false
+static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
+static size_t _process_request_next_key(mcp_parser_t *pr);
+static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen);
+static void dump_stack(lua_State *L);
+static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
+static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen);
+static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
+static int mcplib_await_run(conn *c, lua_State *L, int coro_ref);
+static int mcplib_await_return(io_pending_proxy_t *p);
+static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
+static void *proxy_event_thread(void *arg);
+static void proxy_out_errstring(mc_resp *resp, const char *str);
+static int _flush_pending_write(mcp_backend_t *be, io_pending_proxy_t *p);
+static int _reset_bad_backend(mcp_backend_t *be);
+static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
+static int proxy_thread_loadconf(LIBEVENT_THREAD *thr);
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread);
+
+/******** EXTERNAL FUNCTIONS ******/
+// functions starting with _ are breakouts for the public functions.
+
+// see also: process_extstore_stats()
+// FIXME: get context off of conn? global variables
+// FIXME: stat coverage
+void proxy_stats(ADD_STAT add_stats, conn *c) {
+ if (!settings.proxy_enabled) {
+ return;
+ }
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ STAT_L(ctx);
+
+ APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads);
+ APPEND_STAT("proxy_config_reload_fails", "%llu", (unsigned long long)ctx->global_stats.config_reload_fails);
+ APPEND_STAT("proxy_backend_total", "%llu", (unsigned long long)ctx->global_stats.backend_total);
+ STAT_UL(ctx);
+}
+
+void process_proxy_stats(ADD_STAT add_stats, conn *c) {
+ char key_str[STAT_KEY_LEN];
+
+ if (!settings.proxy_enabled) {
+ return;
+ }
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ STAT_L(ctx);
+
+ // prepare aggregated counters.
+ struct proxy_user_stats *us = &ctx->user_stats;
+ uint64_t counters[us->num_stats];
+ memset(counters, 0, sizeof(counters));
+
+ // aggregate worker thread counters.
+ for (int x = 0; x < settings.num_threads; x++) {
+ LIBEVENT_THREAD *t = get_worker_thread(x);
+ struct proxy_user_stats *tus = t->proxy_stats;
+ WSTAT_L(t);
+ for (int i = 0; i < tus->num_stats; i++) {
+ counters[i] += tus->counters[i];
+ }
+ WSTAT_UL(t);
+ }
+
+ // return all of the stats
+ for (int x = 0; x < us->num_stats; x++) {
+ snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
+ APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
+ }
+ STAT_UL(ctx);
+}
+
+struct _dumpbuf {
+ size_t size;
+ size_t used;
+ char *buf;
+};
+
+static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) {
+ (void)L;
+ struct _dumpbuf *db = ud;
+ if (db->used + sz > db->size) {
+ db->size *= 2;
+ char *nb = realloc(db->buf, db->size);
+ if (nb == NULL) {
+ return -1;
+ }
+ db->buf = nb;
+ }
+ memcpy(db->buf + db->used, (const char *)p, sz);
+ db->used += sz;
+ return 0;
+}
+
+static const char * _load_helper(lua_State *L, void *data, size_t *size) {
+ (void)L;
+ struct _dumpbuf *db = data;
+ if (db->used == 0) {
+ *size = 0;
+ return NULL;
+ }
+ *size = db->used;
+ db->used = 0;
+ return db->buf;
+}
+
+void proxy_start_reload(void *arg) {
+ proxy_ctx_t *ctx = arg;
+ if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
+ pthread_cond_signal(&ctx->config_cond);
+ pthread_mutex_unlock(&ctx->config_lock);
+ }
+}
+
+// Manages a queue of inbound objects destined to be deallocated.
+static void *_proxy_manager_thread(void *arg) {
+ proxy_ctx_t *ctx = arg;
+ pool_head_t head;
+
+ pthread_mutex_lock(&ctx->manager_lock);
+ while (1) {
+ STAILQ_INIT(&head);
+ while (STAILQ_EMPTY(&ctx->manager_head)) {
+ pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock);
+ }
+
+ // pull dealloc queue into local queue.
+ STAILQ_CONCAT(&head, &ctx->manager_head);
+ pthread_mutex_unlock(&ctx->manager_lock);
+
+ // Config lock is required for using config VM.
+ pthread_mutex_lock(&ctx->config_lock);
+ lua_State *L = ctx->proxy_state;
+ mcp_pool_t *p;
+ STAILQ_FOREACH(p, &head, next) {
+ // walk the hash selector backends and unref.
+ for (int x = 0; x < p->pool_size; x++) {
+ luaL_unref(L, LUA_REGISTRYINDEX, p->pool[x].ref);
+ }
+ // unref the phc ref.
+ luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref);
+ // need to... unref self.
+ // NOTE: double check if we really need to self-reference.
+ // this is a backup here to ensure the external refcounts hit zero
+ // before lua garbage collects the object. other things hold a
+ // reference to the object though.
+ luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref);
+ // that's it? let it float?
+ }
+ pthread_mutex_unlock(&ctx->config_lock);
+
+ // done.
+ pthread_mutex_lock(&ctx->manager_lock);
+ }
+
+ return NULL;
+}
+
+// Thread handling the configuration reload sequence.
+// TODO: get a logger instance.
+// TODO: making this "safer" will require a few phases of work.
+// 1) JFDI
+// 2) "test VM" -> from config thread, test the worker reload portion.
+// 3) "unit testing" -> from same temporary worker VM, execute set of
+// integration tests that must pass.
+// 4) run update on each worker, collecting new mcp.attach() hooks.
+// Once every worker has successfully executed and set new hooks, roll
+// through a _second_ time to actually swap the hook structures and unref
+// the old structures where marked dirty.
+static void *_proxy_config_thread(void *arg) {
+ proxy_ctx_t *ctx = arg;
+
+ logger_create();
+ pthread_mutex_lock(&ctx->config_lock);
+ while (1) {
+ pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
+ LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
+ STAT_INCR(ctx, config_reloads, 1);
+ lua_State *L = ctx->proxy_state;
+ lua_settop(L, 0); // clear off any crud that could have been left on the stack.
+
+ // The main stages of config reload are:
+ // - load and execute the config file
+ // - run mcp_config_pools()
+ // - for each worker:
+ // - copy and execute new lua code
+ // - copy selector table
+ // - run mcp_config_routes()
+
+ if (proxy_load_config(ctx) != 0) {
+ // Failed to load. log and wait for a retry.
+ STAT_INCR(ctx, config_reload_fails, 1);
+ LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
+ continue;
+ }
+
+ // TODO: create a temporary VM to test-load the worker code into.
+ // failing to load partway through the worker VM reloads can be
+ // critically bad if we're not careful about references.
+ // IE: the config VM _must_ hold references to selectors and backends
+ // as long as they exist in any worker for any reason.
+
+ for (int x = 0; x < settings.num_threads; x++) {
+ LIBEVENT_THREAD *thr = get_worker_thread(x);
+
+ pthread_mutex_lock(&ctx->worker_lock);
+ ctx->worker_done = false;
+ ctx->worker_failed = false;
+ proxy_reload_notify(thr);
+ while (!ctx->worker_done) {
+ // in case of spurious wakeup.
+ pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
+ }
+ pthread_mutex_unlock(&ctx->worker_lock);
+
+ // Code load bailed.
+ if (ctx->worker_failed) {
+ STAT_INCR(ctx, config_reload_fails, 1);
+ LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
+ continue;
+ }
+ }
+ LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
+ }
+
+ return NULL;
+}
+
+static int _start_proxy_config_threads(proxy_ctx_t *ctx) {
+ int ret;
+
+ pthread_mutex_lock(&ctx->config_lock);
+ if ((ret = pthread_create(&ctx->config_tid, NULL,
+ _proxy_config_thread, ctx)) != 0) {
+ fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
+ strerror(ret));
+ pthread_mutex_unlock(&ctx->config_lock);
+ return -1;
+ }
+ pthread_mutex_unlock(&ctx->config_lock);
+
+ pthread_mutex_lock(&ctx->manager_lock);
+ if ((ret = pthread_create(&ctx->manager_tid, NULL,
+ _proxy_manager_thread, ctx)) != 0) {
+ fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
+ strerror(ret));
+ pthread_mutex_unlock(&ctx->manager_lock);
+ return -1;
+ }
+ pthread_mutex_unlock(&ctx->manager_lock);
+
+ return 0;
+}
+
+// TODO: IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
+// event threads.
+static void _proxy_init_evthread_events(proxy_event_thread_t *t) {
+#ifdef HAVE_LIBURING
+ bool use_uring = true;
+ struct io_uring_params p = {0};
+ assert(t->event_fd); // uring only exists where eventfd also does.
+
+ // Setup the CQSIZE to be much larger than SQ size, since backpressure
+ // issues can cause us to block on SQ submissions and as a network server,
+ // stuff happens.
+ p.flags = IORING_SETUP_CQSIZE;
+ p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
+ int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
+ if (ret) {
+ perror("io_uring_queue_init_params");
+ exit(1);
+ }
+ if (!(p.features & IORING_FEAT_NODROP)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
+ use_uring = false;
+ }
+ if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
+ use_uring = false;
+ }
+ if (!(p.features & IORING_FEAT_FAST_POLL)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
+ use_uring = false;
+ }
+
+ if (use_uring) {
+ // FIXME: Sigh. we need a blocking event_fd for io_uring but we've a
+ // chicken and egg in here. need a better structure... in meantime
+ // re-create the event_fd.
+ close(t->event_fd);
+ t->event_fd = eventfd(0, 0);
+ // FIXME: hack for event init.
+ t->ur_notify_event.set = false;
+ _proxy_evthr_evset_notifier(t);
+ t->use_uring = true;
+ return;
+ } else {
+ // Decided to not use io_uring, so don't waste memory.
+ io_uring_queue_exit(&t->ring);
+ }
+#endif
+
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+
+ // listen for notifications.
+ // NULL was thread_libevent_process
+ // FIXME: use modern format? (event_assign)
+#ifdef USE_EVENTFD
+ event_set(&t->notify_event, t->event_fd,
+ EV_READ | EV_PERSIST, proxy_event_handler, t);
+#else
+ event_set(&t->notify_event, t->notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_handler, t);
+#endif
+
+ evtimer_set(&t->clock_event, proxy_event_updater, t);
+ event_base_set(t->base, &t->clock_event);
+ struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+ evtimer_add(&t->clock_event, &rate);
+
+ event_base_set(t->base, &t->notify_event);
+ if (event_add(&t->notify_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
+
+}
+
+// start the centralized lua state and config thread.
+// TODO: return ctx/state. avoid global vars.
+void proxy_init(void) {
+ proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t));
+ settings.proxy_ctx = ctx; // FIXME: return and deal with outside?
+
+ pthread_mutex_init(&ctx->config_lock, NULL);
+ pthread_cond_init(&ctx->config_cond, NULL);
+ pthread_mutex_init(&ctx->worker_lock, NULL);
+ pthread_cond_init(&ctx->worker_cond, NULL);
+ pthread_mutex_init(&ctx->manager_lock, NULL);
+ pthread_cond_init(&ctx->manager_cond, NULL);
+ pthread_mutex_init(&ctx->stats_lock, NULL);
+
+ // FIXME: default defines.
+ ctx->timeouts.connect.tv_sec = 5;
+ ctx->timeouts.retry.tv_sec = 3;
+ ctx->timeouts.read.tv_sec = 3;
+
+ STAILQ_INIT(&ctx->manager_head);
+ lua_State *L = luaL_newstate();
+ ctx->proxy_state = L;
+ luaL_openlibs(L);
+ // NOTE: might need to differentiate the libs yes?
+ proxy_register_libs(NULL, L);
+
+ // Create/start the backend threads, which we need before servers
+ // start getting created.
+ // Supporting N event threads should be possible, but it will be a
+ // low number of N to avoid too many wakeup syscalls.
+ // For now we hardcode to 1.
+ proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
+ ctx->proxy_threads = threads;
+ for (int i = 0; i < 1; i++) {
+ proxy_event_thread_t *t = &threads[i];
+#ifdef USE_EVENTFD
+ t->event_fd = eventfd(0, EFD_NONBLOCK);
+ // FIXME: eventfd can fail?
+#else
+ int fds[2];
+ if (pipe(fds)) {
+ perror("can't create proxy backend notify pipe");
+ exit(1);
+ }
+
+ t->notify_receive_fd = fds[0];
+ t->notify_send_fd = fds[1];
+#endif
+ _proxy_init_evthread_events(t);
+
+ // incoming request queue.
+ STAILQ_INIT(&t->io_head_in);
+ pthread_mutex_init(&t->mutex, NULL);
+ pthread_cond_init(&t->cond, NULL);
+
+ t->ctx = ctx;
+ memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts));
+
+#ifdef HAVE_LIBURING
+ if (t->use_uring) {
+ pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
+ } else {
+ pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
+ }
+#else
+ pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
+#endif // HAVE_LIBURING
+ }
+
+ _start_proxy_config_threads(ctx);
+}
+
+int proxy_load_config(void *arg) {
+ proxy_ctx_t *ctx = arg;
+ lua_State *L = ctx->proxy_state;
+ int res = luaL_loadfile(L, settings.proxy_startfile);
+ if (res != LUA_OK) {
+ fprintf(stderr, "Failed to load proxy_startfile: %s\n", lua_tostring(L, -1));
+ return -1;
+ }
+ // LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE
+
+ // Now we need to dump the compiled code into bytecode.
+ // This will then get loaded into worker threads.
+ struct _dumpbuf *db = malloc(sizeof(struct _dumpbuf));
+ db->size = 16384;
+ db->used = 0;
+ db->buf = malloc(db->size);
+ lua_dump(L, _dump_helper, db, 0);
+ // 0 means no error.
+ ctx->proxy_code = db;
+
+ // now we complete the data load by calling the function.
+ res = lua_pcall(L, 0, LUA_MULTRET, 0);
+ if (res != LUA_OK) {
+ fprintf(stderr, "Failed to load data into lua config state: %s\n", lua_tostring(L, -1));
+ exit(EXIT_FAILURE);
+ }
+
+ // call the mcp_config_pools function to get the central backends.
+ lua_getglobal(L, "mcp_config_pools");
+
+ // TODO: handle explicitly if function is missing.
+ lua_pushnil(L); // no "old" config yet.
+ if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
+ fprintf(stderr, "Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1));
+ exit(EXIT_FAILURE);
+ }
+
+ // result is our main config.
+ return 0;
+}
+
+// TODO: this will be done differently while implementing config reloading.
+static int _copy_pool(lua_State *from, lua_State *to) {
+ // from, -3 should have he userdata.
+ mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
+ size_t size = sizeof(mcp_pool_proxy_t);
+ mcp_pool_proxy_t *pp = lua_newuserdatauv(to, size, 0);
+ luaL_setmetatable(to, "mcp.pool_proxy");
+ // TODO: check pp.
+
+ pp->main = p;
+ pthread_mutex_lock(&p->lock);
+ p->refcount++;
+ pthread_mutex_unlock(&p->lock);
+ return 0;
+}
+
+static void _copy_config_table(lua_State *from, lua_State *to);
+// (from, -1) is the source value
+// should end with (to, -1) being the new value.
+// TODO: { foo = "bar", { thing = "foo" } } fails for lua_next() post final
+// table.
+static void _copy_config_table(lua_State *from, lua_State *to) {
+ int type = lua_type(from, -1);
+ bool found = false;
+ switch (type) {
+ case LUA_TNIL:
+ lua_pushnil(to);
+ break;
+ case LUA_TUSERDATA:
+ // see dump_stack() - check if it's something we handle.
+ if (lua_getmetatable(from, -1) != 0) {
+ lua_pushstring(from, "__name");
+ if (lua_rawget(from, -2) != LUA_TNIL) {
+ const char *name = lua_tostring(from, -1);
+ if (strcmp(name, "mcp.pool") == 0) {
+ // FIXME: check result
+ _copy_pool(from, to);
+ found = true;
+ }
+ }
+ lua_pop(from, 2);
+ }
+ if (!found) {
+ fprintf(stderr, "unhandled userdata type\n");
+ exit(1);
+ }
+ break;
+ case LUA_TNUMBER:
+ // FIXME: since 5.3 there's some sub-thing you need to do to push
+ // float vs int.
+ lua_pushnumber(to, lua_tonumber(from, -1));
+ break;
+ case LUA_TSTRING:
+ // FIXME: temp var + tolstring worth doing?
+ lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1));
+ break;
+ case LUA_TTABLE:
+ // TODO: huge table could cause stack exhaustion? have to do
+ // checkstack perhaps?
+ // TODO: copy the metatable first?
+ // TODO: size narr/nrec from old table and use createtable to
+ // pre-allocate.
+ lua_newtable(to); // throw new table on worker
+ int t = lua_absindex(from, -1); // static index of table to copy.
+ int nt = lua_absindex(to, -1); // static index of new table.
+ lua_pushnil(from); // start iterator for main
+ while (lua_next(from, t) != 0) {
+ // (key, -2), (val, -1)
+ // TODO: check what key is (it can be anything :|)
+ // to allow an optimization later lets restrict it to strings
+ // and numbers.
+ // don't coerce it to a string unless it already is one.
+ lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2));
+ // lua_settable(to, n) - n being the table
+ // takes -2 key -1 value, pops both.
+ // use lua_absindex(L, -1) and so to convert easier?
+ _copy_config_table(from, to); // push next value.
+ lua_settable(to, nt);
+ lua_pop(from, 1); // drop value, keep key.
+ }
+ // top of from is now the original table.
+ // top of to should be the new table.
+ break;
+ default:
+ // FIXME: error.
+ fprintf(stderr, "unhandled type\n");
+ exit(1);
+ }
+}
+
+// Run from proxy worker to coordinate code reload.
+// config_lock must be held first.
+void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
+ proxy_ctx_t *ctx = arg;
+ pthread_mutex_lock(&ctx->worker_lock);
+ if (proxy_thread_loadconf(thr) != 0) {
+ ctx->worker_failed = true;
+ }
+ ctx->worker_done = true;
+ pthread_cond_signal(&ctx->worker_cond);
+ pthread_mutex_unlock(&ctx->worker_lock);
+}
+
+// FIXME: need to test how to recover from an actual error here. stuff gets
+// left on the stack?
+static int proxy_thread_loadconf(LIBEVENT_THREAD *thr) {
+ lua_State *L = thr->L;
+ // load the precompiled config function.
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ struct _dumpbuf *db = ctx->proxy_code;
+ struct _dumpbuf db2; // copy because the helper modifies it.
+ memcpy(&db2, db, sizeof(struct _dumpbuf));
+
+ lua_load(L, _load_helper, &db2, "config", NULL);
+ // LUA_OK + all errs from loadfile except LUA_ERRFILE.
+ //dump_stack(L);
+ // - pcall the func (which should load it)
+ int res = lua_pcall(L, 0, LUA_MULTRET, 0);
+ if (res != LUA_OK) {
+ // FIXME: no crazy failure here!
+ fprintf(stderr, "Failed to load data into worker thread\n");
+ return -1;
+ }
+
+ lua_getglobal(L, "mcp_config_routes");
+ // create deepcopy of argument to pass into mcp_config_routes.
+ _copy_config_table(ctx->proxy_state, L);
+
+ // copied value is in front of route function, now call it.
+ if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
+ fprintf(stderr, "Failed to execute mcp_config_routes: %s\n", lua_tostring(L, -1));
+ return -1;
+ }
+
+ // update user stats
+ STAT_L(ctx);
+ struct proxy_user_stats *us = &ctx->user_stats;
+ struct proxy_user_stats *tus = NULL;
+ if (us->num_stats != 0) {
+ pthread_mutex_lock(&thr->stats.mutex);
+ if (thr->proxy_stats == NULL) {
+ tus = calloc(1, sizeof(struct proxy_user_stats));
+ thr->proxy_stats = tus;
+ } else {
+ tus = thr->proxy_stats;
+ }
+
+ // originally this was a realloc routine but it felt fragile.
+ // that might still be a better idea; still need to zero out the end.
+ uint64_t *counters = calloc(us->num_stats, sizeof(uint64_t));
+
+ // note that num_stats can _only_ grow in size.
+ // we also only care about counters on the worker threads.
+ if (tus->counters) {
+ assert(tus->num_stats <= us->num_stats);
+ memcpy(counters, tus->counters, tus->num_stats * sizeof(uint64_t));
+ free(tus->counters);
+ }
+
+ tus->counters = counters;
+ tus->num_stats = us->num_stats;
+ pthread_mutex_unlock(&thr->stats.mutex);
+ }
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+// Initialize the VM for an individual worker thread.
+void proxy_thread_init(LIBEVENT_THREAD *thr) {
+ // Create the hook table.
+ thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook));
+ if (thr->proxy_hooks == NULL) {
+ fprintf(stderr, "Failed to allocate proxy hooks\n");
+ exit(EXIT_FAILURE);
+ }
+
+ // Initialize the lua state.
+ lua_State *L = luaL_newstate();
+ thr->L = L;
+ luaL_openlibs(L);
+ proxy_register_libs(thr, L);
+
+ // kick off the configuration.
+ if (proxy_thread_loadconf(thr) != 0) {
+ exit(EXIT_FAILURE);
+ }
+}
+
+// ctx_stack is a stack of io_pending_proxy_t's.
+void proxy_submit_cb(io_queue_t *q) {
+ proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads;
+ io_pending_proxy_t *p = q->stack_ctx;
+ io_head_t head;
+ STAILQ_INIT(&head);
+
+ // NOTE: responses get returned in the correct order no matter what, since
+ // mc_resp's are linked.
+ // we just need to ensure stuff is parsed off the backend in the correct
+ // order.
+ // So we can do with a single list here, but we need to repair the list as
+ // responses are parsed. (in the req_remaining-- section)
+ // TODO:
+ // - except we can't do that because the deferred IO stack isn't
+ // compatible with queue.h.
+ // So for now we build the secondary list with an STAILQ, which
+ // can be transplanted/etc.
+ while (p) {
+ // insert into tail so head is oldest request.
+ STAILQ_INSERT_TAIL(&head, p, io_next);
+ if (!p->is_await) {
+ // funny workaround: awaiting IOP's don't count toward
+ // resuming a connection, only the completion of the await
+ // condition.
+ q->count++;
+ }
+
+ p = p->next;
+ }
+
+ // clear out the submit queue so we can re-queue new IO's inline.
+ q->stack_ctx = NULL;
+
+ // Transfer request stack to event thread.
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_CONCAT(&e->io_head_in, &head);
+ // No point in holding the lock since we're not doing a cond signal.
+ pthread_mutex_unlock(&e->mutex);
+
+ // Signal to check queue.
+ // TODO: error handling.
+#ifdef USE_EVENTFD
+ uint64_t u = 1;
+ // FIXME: check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
+#else
+ if (write(e->notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
+ }
+#endif
+
+ return;
+}
+
+void proxy_complete_cb(io_queue_t *q) {
+ /*
+ io_pending_proxy_t *p = q->stack_ctx;
+ q->stack_ctx = NULL;
+
+ while (p) {
+ io_pending_proxy_t *next = p->next;
+ mc_resp *resp = p->resp;
+ lua_State *Lc = p->coro;
+
+ // in order to resume we need to remove the objects that were
+ // originally returned
+ // what's currently on the top of the stack is what we want to keep.
+ lua_rotate(Lc, 1, 1);
+ // We kept the original results from the yield so lua would not
+ // collect them in the meantime. We can drop those now.
+ lua_settop(Lc, 1);
+
+ proxy_run_coroutine(Lc, resp, p, NULL);
+
+ // don't need to flatten main thread here, since the coro is gone.
+
+ p = next;
+ }
+ return;
+ */
+}
+
+// called from worker thread after an individual IO has been returned back to
+// the worker thread. Do post-IO run and cleanup work.
+void proxy_return_cb(io_pending_t *pending) {
+ io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+ if (p->is_await) {
+ mcplib_await_return(p);
+ } else {
+ lua_State *Lc = p->coro;
+
+ // in order to resume we need to remove the objects that were
+ // originally returned
+ // what's currently on the top of the stack is what we want to keep.
+ lua_rotate(Lc, 1, 1);
+ // We kept the original results from the yield so lua would not
+ // collect them in the meantime. We can drop those now.
+ lua_settop(Lc, 1);
+
+ // p can be freed/changed from the call below, so fetch the queue now.
+ io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
+ conn *c = p->c;
+ proxy_run_coroutine(Lc, p->resp, p, NULL);
+
+ q->count--;
+ if (q->count == 0) {
+ // call re-add directly since we're already in the worker thread.
+ conn_worker_readd(c);
+ }
+ }
+}
+
+// called from the worker thread as an mc_resp is being freed.
+// must let go of the coroutine reference if there is one.
+// caller frees the pending IO.
+void proxy_finalize_cb(io_pending_t *pending) {
+ io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+
+ // release our coroutine reference.
+ // TODO: coroutines are reusable in lua 5.4. we can stack this onto a freelist
+ // after a lua_resetthread(Lc) call.
+ if (p->coro_ref) {
+ // Note: lua registry is the same for main thread or a coroutine.
+ luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
+ }
+ return;
+}
+
+int try_read_command_proxy(conn *c) {
+ char *el, *cont;
+
+ if (c->rbytes == 0)
+ return 0;
+
+ el = memchr(c->rcurr, '\n', c->rbytes);
+ if (!el) {
+ if (c->rbytes > 1024) {
+ /*
+ * We didn't have a '\n' in the first k. This _has_ to be a
+ * large multiget, if not we should just nuke the connection.
+ */
+ char *ptr = c->rcurr;
+ while (*ptr == ' ') { /* ignore leading whitespaces */
+ ++ptr;
+ }
+
+ if (ptr - c->rcurr > 100 ||
+ (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
+
+ conn_set_state(c, conn_closing);
+ return 1;
+ }
+
+ // ASCII multigets are unbound, so our fixed size rbuf may not
+ // work for this particular workload... For backcompat we'll use a
+ // malloc/realloc/free routine just for this.
+ if (!c->rbuf_malloced) {
+ if (!rbuf_switch_to_malloc(c)) {
+ conn_set_state(c, conn_closing);
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+ }
+ cont = el + 1;
+ // TODO: we don't want to cut the \r\n here. lets see how lua handles
+ // non-terminated strings?
+ /*if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
+ el--;
+ }
+ *el = '\0';*/
+
+ assert(cont <= (c->rcurr + c->rbytes));
+
+ c->last_cmd_time = current_time;
+ proxy_process_command(c, c->rcurr, cont - c->rcurr, PROCESS_NORMAL);
+
+ c->rbytes -= (cont - c->rcurr);
+ c->rcurr = cont;
+
+ assert(c->rcurr <= (c->rbuf + c->rsize));
+
+ return 1;
+
+}
+
+// we buffered a SET of some kind.
+void complete_nread_proxy(conn *c) {
+ assert(c != NULL);
+
+ conn_set_state(c, conn_new_cmd);
+
+ LIBEVENT_THREAD *thr = c->thread;
+ lua_State *L = thr->L;
+ lua_State *Lc = lua_tothread(L, -1);
+ // FIXME: could use a quicker method to retrieve the request.
+ mcp_request_t *rq = luaL_checkudata(Lc, -1, "mcp.request");
+
+ // validate the data chunk.
+ if (strncmp((char *)c->item + rq->pr.vlen - 2, "\r\n", 2) != 0) {
+ // TODO: error handling.
+ lua_settop(L, 0); // clear anything remaining on the main thread.
+ return;
+ }
+ rq->pr.vbuf = c->item;
+ c->item = NULL;
+ c->item_malloced = false;
+
+ proxy_run_coroutine(Lc, c->resp, NULL, c);
+
+ lua_settop(L, 0); // clear anything remaining on the main thread.
+
+ return;
+}
+
+/******** NETWORKING AND INTERNAL FUNCTIONS ******/
+
+static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
+ io_head_t head;
+
+ STAILQ_INIT(&head);
+ STAILQ_INIT(&t->be_head);
+
+ // Pull the entire stack of inbound into local queue.
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->io_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ int io_count = 0;
+ int be_count = 0;
+ while (!STAILQ_EMPTY(&head)) {
+ io_pending_proxy_t *io = STAILQ_FIRST(&head);
+ io->flushed = false;
+ mcp_backend_t *be = io->backend;
+ // So the backend can retrieve its event base.
+ be->event_thread = t;
+
+ // _no_ mutex on backends. they are owned by the event thread.
+ STAILQ_REMOVE_HEAD(&head, io_next);
+ if (be->bad) {
+ P_DEBUG("%s: fast failing request to bad backend\n", __func__);
+ io->client_resp->status = MCMC_ERR;
+ return_io_pending((io_pending_t *)io);
+ continue;
+ }
+ STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
+ be->depth++;
+ io_count++;
+ if (!be->stacked) {
+ be->stacked = true;
+ STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
+ be_count++;
+ }
+ }
+ //P_DEBUG("%s: io/be counts for syscalls [%d/%d]\n", __func__, io_count, be_count);
+ return io_count;
+}
+
+#ifdef HAVE_LIBURING
+static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len);
+static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be);
+
+// read handler.
+static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int bread = cqe->res;
+ char *rbuf = NULL;
+ size_t toread = 0;
+ // TODO: check bread for disconnect/etc.
+
+ int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
+ P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);
+
+ if (res > 0) {
+ _proxy_evthr_evset_be_read(be, rbuf, toread);
+ } else if (res == -1) {
+ _reset_bad_backend(be);
+ return;
+ }
+
+ // FIXME: when exactly do we need to reset the backend handler?
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE);
+ }
+}
+
+static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int flags = 0;
+
+ be->can_write = true;
+ if (be->connecting) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ // kick the bad backend, clear the queue, retry later.
+ // FIXME: if a connect fails, anything currently in the queue
+ // should be safe to hold up until their timeout.
+ _reset_bad_backend(be);
+ //_backend_failed(be); // FIXME: need a uring version of this.
+ P_DEBUG("%s: backend failed to connect\n", __func__);
+ return;
+ }
+ P_DEBUG("%s: backend connected\n", __func__);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+ }
+ io_pending_proxy_t *io = NULL;
+ int res = 0;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ res = _flush_pending_write(be, io);
+ if (res != -1) {
+ flags |= res;
+ if (flags & EV_WRITE) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ if (res == -1) {
+ _reset_bad_backend(be);
+ return;
+ }
+
+ if (flags & EV_WRITE) {
+ _proxy_evthr_evset_be_wrpoll(be);
+ }
+
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE);
+}
+
+static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ proxy_event_thread_t *t = udata;
+
+ // liburing always uses eventfd for the notifier.
+ // *cqe has our result.
+ assert(cqe->res != -EINVAL);
+ if (cqe->res != sizeof(eventfd_t)) {
+ P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
+ // FIXME: figure out if this is impossible, and how to handle if not.
+ assert(1 == 0);
+ }
+
+ // need to re-arm the listener every time.
+ _proxy_evthr_evset_notifier(t);
+
+ // TODO: sqe queues for writing to backends
+ // - _ur handler for backend write completion is to set a read event and
+ // re-submit. ugh.
+ // Should be possible to have standing reads, but flow is harder and lets
+ // optimize that later. (ie; allow matching reads to a request but don't
+ // actually dequeue anything until both read and write are confirmed)
+ if (_proxy_event_handler_dequeue(t) == 0) {
+ //P_DEBUG("%s: no IO's to complete\n", __func__);
+ return;
+ }
+
+ // Re-walk each backend and check set event as required.
+ mcp_backend_t *be = NULL;
+ //struct timeval tmp_time = {5,0}; // FIXME: temporary hard coded timeout.
+
+ // TODO: for each backend, queue writev's into sqe's
+ // move the backend sqe bits into a write complete handler
+ STAILQ_FOREACH(be, &t->be_head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->connecting) {
+ P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ flags |= EV_WRITE;
+ } else {
+ io_pending_proxy_t *io = NULL;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ flags = _flush_pending_write(be, io);
+ if (flags == -1 || flags & EV_WRITE) {
+ break;
+ }
+ }
+ }
+
+ if (flags == -1) {
+ _reset_bad_backend(be);
+ } else {
+ // FIXME: needs a re-write to handle sqe starvation.
+ // FIXME: can't actually set the read here? need to confirm _some_
+ // write first?
+ if (flags & EV_WRITE) {
+ _proxy_evthr_evset_be_wrpoll(be);
+ }
+ if (flags & EV_READ) {
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE);
+ }
+ }
+ }
+}
+
+static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_wr_ev.set)
+ return;
+
+ be->ur_wr_ev.cb = proxy_backend_wrhandler_ur;
+ be->ur_wr_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME: NULL?
+
+ io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
+ io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
+ be->ur_wr_ev.set = true;
+}
+
+static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len) {
+ P_DEBUG("%s: setting: %lu\n", __func__, len);
+ struct io_uring_sqe *sqe;
+ if (be->ur_rd_ev.set) {
+ P_DEBUG("%s: already set\n", __func__);
+ return;
+ }
+
+ be->ur_rd_ev.cb = proxy_backend_handler_ur;
+ be->ur_rd_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME: NULL?
+ assert(be->rbuf != NULL);
+ io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
+ be->ur_rd_ev.set = true;
+}
+
+static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
+ struct io_uring_sqe *sqe;
+ P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
+ if (t->ur_notify_event.set)
+ return;
+
+ t->ur_notify_event.cb = proxy_event_handler_ur;
+ t->ur_notify_event.udata = t;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME: NULL?
+ io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0);
+ io_uring_sqe_set_data(sqe, &t->ur_notify_event);
+}
+
+// TODO: CQE's can generate many SQE's, so we might need to occasionally check
+// for space free in the sqe queue and submit in the middle of the cqe
+// foreach.
+// There might be better places to do this, but I think it's cleaner if
+// submission and cqe can stay in this function.
+// TODO: The problem is io_submit() can deadlock if too many cqe's are
+// waiting.
+// Need to understand if this means "CQE's ready to be picked up" or "CQE's in
+// flight", because the former is much easier to work around (ie; only run the
+// backend handler after dequeuing everything else)
+static void *proxy_event_thread_ur(void *arg) {
+ proxy_event_thread_t *t = arg;
+ struct io_uring_cqe *cqe;
+
+ P_DEBUG("%s: starting\n", __func__);
+
+ while (1) {
+ io_uring_submit_and_wait(&t->ring, 1);
+ //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret);
+
+ uint32_t head = 0;
+ uint32_t count = 0;
+
+ io_uring_for_each_cqe(&t->ring, head, cqe) {
+ P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
+
+ proxy_event_t *pe = io_uring_cqe_get_data(cqe);
+ pe->set = false;
+ pe->cb(pe->udata, cqe);
+
+ count++;
+ }
+
+ P_DEBUG("%s: advancing [count:%d]\n", __func__, count);
+ io_uring_cq_advance(&t->ring, count);
+ }
+
+ return NULL;
+}
+#endif // HAVE_LIBURING
+
+// We need to get timeout/retry/etc updates to the event thread(s)
+// occasionally. I'd like to have a better inteface around this where updates
+// are shipped directly; but this is good enough to start with.
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+ proxy_ctx_t *ctx = t->ctx;
+
+ // TODO: double check how much of this boilerplate is still necessary?
+ // reschedule the clock event.
+ evtimer_del(&t->clock_event);
+
+ evtimer_set(&t->clock_event, proxy_event_updater, t);
+ event_base_set(t->base, &t->clock_event);
+ struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+ evtimer_add(&t->clock_event, &rate);
+
+ // we reuse the "global stats" lock since it's hardly ever used.
+ STAT_L(ctx);
+ memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts));
+ STAT_UL(ctx);
+}
+
+// event handler for executing backend requests
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+ uint64_t u;
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // FIXME: figure out if this is impossible, and how to handle if not.
+ assert(1 == 0);
+ }
+#else
+ char buf[1];
+ // TODO: This is a lot more fatal than it should be. can it fail? can
+ // it blow up the server?
+ // FIXME: a cross-platform method of speeding this up would be nice. With
+ // event fds we can queue N events and wakeup once here.
+ // If we're pulling one byte out of the pipe at a time here it'll just
+ // wake us up too often.
+ // If the pipe is O_NONBLOCK then maybe just a larger read would work?
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ if (_proxy_event_handler_dequeue(t) == 0) {
+ //P_DEBUG("%s: no IO's to complete\n", __func__);
+ return;
+ }
+
+ // Re-walk each backend and check set event as required.
+ mcp_backend_t *be = NULL;
+ struct timeval tmp_time = t->timeouts.connect;
+
+ // FIXME: _set_event() is buggy, see notes on function.
+ STAILQ_FOREACH(be, &t->be_head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->connecting) {
+ P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ } else {
+ io_pending_proxy_t *io = NULL;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ flags = _flush_pending_write(be, io);
+ if (flags == -1 || flags & EV_WRITE) {
+ break;
+ }
+ }
+ }
+
+ if (flags == -1) {
+ _reset_bad_backend(be);
+ } else {
+ flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
+ _set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
+ }
+ }
+
+}
+
+static void *proxy_event_thread(void *arg) {
+ proxy_event_thread_t *t = arg;
+
+ event_base_loop(t->base, 0);
+ event_base_free(t->base);
+
+ // TODO: join bt threads, free array.
+
+ return NULL;
+}
+
+
+// Need a custom function so we can prefix lua strings easily.
+static void proxy_out_errstring(mc_resp *resp, const char *str) {
+ size_t len;
+ const static char error_prefix[] = "SERVER_ERROR ";
+ const static int error_prefix_len = sizeof(error_prefix) - 1;
+
+ assert(resp != NULL);
+
+ resp_reset(resp);
+ // avoid noreply since we're throwing important errors.
+
+ // Fill response object with static string.
+ len = strlen(str);
+ if ((len + error_prefix_len + 2) > WRITE_BUFFER_SIZE) {
+ /* ought to be always enough. just fail for simplicity */
+ str = "SERVER_ERROR output line too long";
+ len = strlen(str);
+ }
+
+ char *w = resp->wbuf;
+ memcpy(w, error_prefix, error_prefix_len);
+ w += error_prefix_len;
+
+ memcpy(w, str, len);
+ w += len;
+
+ memcpy(w, "\r\n", 2);
+ resp_add_iov(resp, resp->wbuf, len + error_prefix_len + 2);
+ return;
+}
+
+// Simple error wrapper for common failures.
+// lua_error() is a jump so this function never returns
+// for clarity add a 'return' after calls to this.
+static void proxy_lua_error(lua_State *L, const char *s) {
+ lua_pushstring(L, s);
+ lua_error(L);
+}
+
+static void proxy_lua_ferror(lua_State *L, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ lua_pushfstring(L, fmt, ap);
+ va_end(ap);
+ lua_error(L);
+}
+
+// FIXME: if we use the newer API the various pending checks can be adjusted.
+static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) {
+ // FIXME: chicken and egg.
+ // can't check if pending if the structure is was calloc'ed (sigh)
+ // don't want to double test here. should be able to event_assign but
+ // not add anything during initialization, but need the owner thread's
+ // event base.
+ int pending = 0;
+ if (event_initialized(&be->event)) {
+ pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
+ }
+ if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
+ event_del(&be->event); // replace existing event.
+ }
+
+ // if we can't write, we could be connecting.
+ // TODO: always check for READ in case some commands were sent
+ // successfully? The flags could be tracked on *be and reset in the
+ // handler, perhaps?
+ event_assign(&be->event, base, mcmc_fd(be->client),
+ flags, callback, be);
+ event_add(&be->event, &t);
+}
+
+// this resumes every yielded coroutine (and re-resumes if necessary).
+// called from the worker thread after responses have been pulled from the
+// network.
+// Flow:
+// - the response object should already be on the coroutine stack.
+// - fix up the stack.
+// - run coroutine.
+// - if LUA_YIELD, we need to swap out the pending IO from its mc_resp then call for a queue
+// again.
+// - if LUA_OK finalize the response and return
+// - else set error into mc_resp.
+static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c) {
+ int nresults = 0;
+ int cores = lua_resume(Lc, NULL, 1, &nresults);
+ size_t rlen = 0;
+
+ if (cores == LUA_OK) {
+ int type = lua_type(Lc, -1);
+ if (type == LUA_TUSERDATA) {
+ mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response");
+ LOGGER_LOG(NULL, LOG_RAWCMDS, LOGGER_PROXY_RAW, NULL, r->start, r->cmd, r->resp.type, r->resp.code);
+ if (r->buf) {
+ // response set from C.
+ // FIXME: write_and_free() ? it's a bit wrong for here.
+ resp->write_and_free = r->buf;
+ resp_add_iov(resp, r->buf, r->blen);
+ r->buf = NULL;
+ } else if (lua_getiuservalue(Lc, -1, 1) != LUA_TNIL) {
+ // uservalue slot 1 is pre-created, so we get TNIL instead of
+ // TNONE when nothing was set into it.
+ const char *s = lua_tolstring(Lc, -1, &rlen);
+ size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
+ memcpy(resp->wbuf, s, l);
+ resp_add_iov(resp, resp->wbuf, l);
+ lua_pop(Lc, 1);
+ } else if (r->status != MCMC_OK) {
+ proxy_out_errstring(resp, "backend failure");
+ } else {
+ // TODO: double check how this can get here?
+ // MCMC_OK but no buffer and no internal value set? still an
+ // error?
+ P_DEBUG("%s: unhandled response\n", __func__);
+ }
+ } else if (type == LUA_TSTRING) {
+ // response is a raw string from lua.
+ const char *s = lua_tolstring(Lc, -1, &rlen);
+ size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
+ memcpy(resp->wbuf, s, l);
+ resp_add_iov(resp, resp->wbuf, l);
+ lua_pop(Lc, 1);
+ } else {
+ proxy_out_errstring(resp, "bad response");
+ }
+ } else if (cores == LUA_YIELD) {
+ if (nresults == 1) {
+ // TODO: try harder to validate; but we have so few yield cases
+ // that I'm going to shortcut this here. A single yielded result
+ // means it's probably an await(), so attempt to process this.
+ // FIXME: if p, do we need to free it up from the resp?
+ // resp should not have an IOP I think...
+ assert(p == NULL);
+ // coroutine object sitting on the _main_ VM right now, so we grab
+ // the reference from there, which also pops it.
+ int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
+ mcplib_await_run(c, Lc, coro_ref);
+ } else {
+ // need to remove and free the io_pending, since c->resp owns it.
+ // so we call mcp_queue_io() again and let it override the
+ // mc_resp's io_pending object.
+
+ int coro_ref = 0;
+ mc_resp *resp;
+ if (p != NULL) {
+ coro_ref = p->coro_ref;
+ resp = p->resp;
+ c = p->c;
+ do_cache_free(p->c->thread->io_cache, p);
+ // *p is now dead.
+ } else {
+ // yielding from a top level call to the coroutine,
+ // so we need to grab a reference to the coroutine thread.
+ // TODO: make this more explicit?
+ // we only need to get the reference here, and error conditions
+ // should instead drop it, but now it's not obvious to users that
+ // we're reaching back into the main thread's stack.
+ assert(c != NULL);
+ coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
+ resp = c->resp;
+ }
+ // TODO: c only used for cache alloc? push the above into the func?
+ mcp_queue_io(c, resp, coro_ref, Lc);
+ }
+ } else {
+ P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
+ LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1));
+ proxy_out_errstring(resp, "lua failure");
+ }
+
+ return 0;
+}
+
+// NOTES:
+// - mcp_backend_read: grab req_stack_head, do things
+// read -> next, want_read -> next | read_end, etc.
+// issue: want read back to read_end as necessary. special state?
+// - it's fine: p->client_resp->type.
+// - mcp_backend_next: advance, consume, etc.
+// TODO: second argument with enum for a specific error.
+// - probably just for logging. for app if any of these errors shouldn't
+// result in killing the request stack!
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) {
+ bool stop = false;
+ io_pending_proxy_t *p = NULL;
+ mcmc_resp_t tmp_resp; // helper for testing for GET's END marker.
+ int flags = 0;
+
+ p = STAILQ_FIRST(&be->io_head);
+ if (p == NULL) {
+ // got a read event, but nothing was queued.
+ // probably means a disconnect event.
+ // TODO: could probably confirm this by attempting to read the
+ // socket, getsockopt, or something else simply for logging or
+ // statistical purposes.
+ // In this case we know it's going to be a close so error.
+ flags = -1;
+ P_DEBUG("%s: read event but nothing in IO queue\n", __func__);
+ return flags;
+ }
+
+ while (!stop) {
+ mcp_resp_t *r;
+ int res = 1;
+ int remain = 0;
+ char *newbuf = NULL;
+
+ switch(be->state) {
+ case mcp_backend_read:
+ assert(p != NULL);
+ P_DEBUG("%s: [read] bread: %d\n", __func__, bread);
+
+ if (bread == 0) {
+ // haven't actually done a read yet; figure out where/what.
+ *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
+ return EV_READ;
+ } else {
+ be->state = mcp_backend_parse;
+ }
+ break;
+ case mcp_backend_parse:
+ r = p->client_resp;
+ r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp);
+ // FIXME: Don't like this interface.
+ bread = 0; // only add the bread once per loop.
+ if (r->status != MCMC_OK) {
+ P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
+ if (r->status == MCMC_WANT_READ) {
+ flags |= EV_READ;
+ be->state = mcp_backend_read;
+ stop = true;
+ break;
+ } else {
+ flags = -1;
+ stop = true;
+ break;
+ }
+ }
+
+ // we actually don't care about anything but the value length
+ // TODO: if vlen != vlen_read, pull an item and copy the data.
+ int extra_space = 0;
+ switch (r->resp.type) {
+ case MCMC_RESP_GET:
+ // We're in GET mode. we only support one key per
+ // GET in the proxy backends, so we need to later check
+ // for an END.
+ extra_space = ENDLEN;
+ break;
+ case MCMC_RESP_END:
+ // this is a MISS from a GET request
+ // or final handler from a STAT request.
+ assert(r->resp.vlen == 0);
+ break;
+ case MCMC_RESP_META:
+ // we can handle meta responses easily since they're self
+ // contained.
+ break;
+ case MCMC_RESP_GENERIC:
+ case MCMC_RESP_NUMERIC:
+ break;
+ // TODO: No-op response?
+ default:
+ P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
+ // unhandled :(
+ flags = -1;
+ stop = true;
+ break;
+ }
+
+ if (res) {
+ if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) {
+ // Ascii multiget hack mode; consume END's
+ be->state = mcp_backend_next;
+ break;
+ }
+
+ // r->resp.reslen + r->resp.vlen is the total length of the response.
+ // TODO: need to associate a buffer with this response...
+ // for now lets abuse write_and_free on mc_resp and simply malloc the
+ // space we need, stuffing it into the resp object.
+
+ r->blen = r->resp.reslen + r->resp.vlen;
+ r->buf = malloc(r->blen + extra_space);
+ if (r->buf == NULL) {
+ flags = -1; // TODO: specific error.
+ stop = true;
+ break;
+ }
+
+ P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
+ if (r->resp.vlen != r->resp.vlen_read) {
+ P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
+ // copy the partial and advance mcmc's buffer digestion.
+ // FIXME: should call this for both cases?
+ // special function for advancing mcmc's buffer for
+ // reading a value? perhaps a flag to skip the data copy
+ // when it's unnecessary?
+ memcpy(r->buf, be->rbuf, r->resp.reslen);
+ r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread);
+ be->state = mcp_backend_want_read;
+ break;
+ } else {
+ // mcmc's already counted the value as read if it fit in
+ // the original buffer...
+ memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read);
+ }
+ } else {
+ // TODO: no response read?
+ // nothing currently sets res to 0. should remove if that
+ // never comes up and handle the error entirely above.
+ P_DEBUG("%s: no response read from backend\n", __func__);
+ flags = -1;
+ stop = true;
+ break;
+ }
+
+ if (r->resp.type == MCMC_RESP_GET) {
+ // advance the buffer
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ if (remain != 0) {
+ // TODO: don't need to shuffle buffer with better API
+ memmove(be->rbuf, newbuf, remain);
+ }
+
+ be->state = mcp_backend_read_end;
+ } else {
+ be->state = mcp_backend_next;
+ }
+
+ break;
+ case mcp_backend_read_end:
+ r = p->client_resp;
+ // we need to ensure the next data in the stream is "END\r\n"
+ // if not, the stack is desynced and we lose it.
+
+ r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp);
+ P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type);
+ if (r->status != MCMC_OK) {
+ if (r->status == MCMC_WANT_READ) {
+ *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
+ return EV_READ;
+ } else {
+ flags = -1; // TODO: specific error.
+ stop = true;
+ }
+ break;
+ } else if (tmp_resp.type != MCMC_RESP_END) {
+ // TODO: specific error about protocol desync
+ flags = -1;
+ stop = true;
+ break;
+ } else {
+ // response is good.
+ // FIXME: copy what the server actually sent?
+ if (!p->ascii_multiget) {
+ // sigh... if part of a multiget we need to eat the END
+ // markers down here.
+ memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
+ r->blen += 5;
+ }
+ }
+
+ be->state = mcp_backend_next;
+
+ break;
+ case mcp_backend_want_read:
+ // Continuing a read from earlier
+ r = p->client_resp;
+ // take bread input and see if we're done reading the value,
+ // else advance, set buffers, return next.
+ if (bread > 0) {
+ r->bread += bread;
+ bread = 0;
+ }
+ P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
+
+ if (r->bread >= r->resp.vlen) {
+ // all done copying data.
+ if (r->resp.type == MCMC_RESP_GET) {
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ // Shouldn't be anything in the buffer if we had to run to
+ // want_read to read the value.
+ assert(remain == 0);
+ be->state = mcp_backend_read_end;
+ } else {
+ be->state = mcp_backend_next;
+ }
+ } else {
+ // signal to caller to issue a read.
+ *rbuf = r->buf+r->resp.reslen+r->bread;
+ *toread = r->resp.vlen - r->bread;
+ // need to retry later.
+ flags |= EV_READ;
+ stop = true;
+ }
+
+ break;
+ case mcp_backend_next:
+ // set the head here. when we break the head will be correct.
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ be->depth--;
+ // have to do the q->count-- and == 0 and redispatch_conn()
+ // stuff here. The moment we call return_io here we
+ // don't own *p anymore.
+ return_io_pending((io_pending_t *)p);
+
+ if (STAILQ_EMPTY(&be->io_head)) {
+ // TODO: suspicious of this code. audit harder?
+ stop = true;
+ } else {
+ p = STAILQ_FIRST(&be->io_head);
+ }
+
+ // mcmc_buffer_consume() - if leftover, keep processing
+ // IO's.
+ // if no more data in buffer, need to re-set stack head and re-set
+ // event.
+ remain = 0;
+ // TODO: do we need to yield every N reads?
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ P_DEBUG("%s: [next] remain: %d\n", __func__, remain);
+ be->state = mcp_backend_read;
+ if (remain != 0) {
+ // data trailing in the buffer, for a different request.
+ memmove(be->rbuf, newbuf, remain);
+ be->state = mcp_backend_parse;
+ P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain);
+ } else {
+ // need to read more data, buffer is empty.
+ stop = true;
+ }
+
+ break;
+ default:
+ // TODO: at some point (after v1?) this should attempt to recover,
+ // though we should only get here from memory corruption and
+ // bailing may be the right thing to do.
+ fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state);
+ assert(false);
+ } // switch
+ } // while
+
+ return flags;
+}
+
+// TODO: surface to option
+#define BACKEND_FAILURE_LIMIT 3
+
+// All we need to do here is schedule the backend to attempt to connect again.
+static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ assert(which & EV_TIMEOUT);
+ struct timeval tmp_time = be->event_thread->timeouts.retry;
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+}
+
+// currently just for timeouts, but certain errors should consider a backend
+// to be "bad" as well.
+// must be called before _reset_bad_backend(), so the backend is currently
+// clear.
+// TODO: currently only notes for "bad backends" in cases of timeouts or
+// connect failures. We need a specific connect() handler that executes a
+// "version" call to at least check that the backend isn't speaking garbage.
+// In theory backends can fail such that responses are constantly garbage,
+// but it's more likely an app is doing something bad and culling the backend
+// may prevent any other clients from talking to that backend. In
+// that case we need to track if clients are causing errors consistently and
+// block them instead. That's more challenging so leaving a note instead
+// of doing this now :)
+static void _backend_failed(mcp_backend_t *be) {
+ struct timeval tmp_time = be->event_thread->timeouts.retry;
+ if (++be->failed_count > BACKEND_FAILURE_LIMIT) {
+ P_DEBUG("%s: marking backend as bad\n", __func__);
+ be->bad = true;
+ _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
+ } else {
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ }
+}
+
+// TODO: add a second argument for assigning a specific error to all pending
+// IO's (ie; timeout).
+// The backend has gotten into a bad state (timed out, protocol desync, or
+// some other supposedly unrecoverable error: purge the queue and
+// cycle the socket.
+// Note that some types of errors may not require flushing the queue and
+// should be fixed as they're figured out.
+// _must_ be called from within the event thread.
+static int _reset_bad_backend(mcp_backend_t *be) {
+ io_pending_proxy_t *io = NULL;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ // TODO: Unsure if this is the best way of surfacing errors to lua,
+ // but will do for V1.
+ io->client_resp->status = MCMC_ERR;
+ return_io_pending((io_pending_t *)io);
+ }
+
+ STAILQ_INIT(&be->io_head);
+
+ mcmc_disconnect(be->client);
+ int status = mcmc_connect(be->client, be->ip, be->port, MCMC_OPTION_NONBLOCK);
+ if (status == MCMC_CONNECTED) {
+ // TODO: unexpected but lets let it be here.
+ be->connecting = false;
+ be->can_write = true;
+ } else if (status == MCMC_CONNECTING) {
+ be->connecting = true;
+ be->can_write = false;
+ } else {
+ // TODO: failed to immediately re-establish the connection.
+ // need to put the BE into a bad/retry state.
+ // FIXME: until we get an event to specifically handle connecting and
+ // bad server handling, attempt to force a reconnect here the next
+ // time a request comes through.
+ // The event thread will attempt to write to the backend, fail, then
+ // end up in this routine again.
+ be->connecting = false;
+ be->can_write = true;
+ }
+
+ // TODO: configure the event as necessary internally.
+
+ return 0;
+}
+
+static int _flush_pending_write(mcp_backend_t *be, io_pending_proxy_t *p) {
+ int flags = 0;
+
+ if (p->flushed) {
+ return 0;
+ }
+
+ ssize_t sent = 0;
+ // FIXME: original send function internally tracked how much was sent, but
+ // doing this here would require copying all of the iovecs or modify what
+ // we supply.
+ // this is probably okay but I want to leave a note here in case I get a
+ // better idea.
+ int status = mcmc_request_writev(be->client, p->iov, p->iovcnt, &sent, 1);
+ if (sent > 0) {
+ // we need to save progress in case of WANT_WRITE.
+ for (int x = 0; x < p->iovcnt; x++) {
+ struct iovec *iov = &p->iov[x];
+ if (sent >= iov->iov_len) {
+ sent -= iov->iov_len;
+ iov->iov_len = 0;
+ } else {
+ iov->iov_len -= sent;
+ sent = 0;
+ break;
+ }
+ }
+ }
+
+ // request_writev() returns WANT_WRITE if we haven't fully flushed.
+ if (status == MCMC_WANT_WRITE) {
+ // avoid syscalls for any other queued requests.
+ be->can_write = false;
+ flags = EV_WRITE;
+ // can't continue for now.
+ } else if (status != MCMC_OK) {
+ flags = -1;
+ // TODO: specific error code
+ // s->error = code?
+ } else {
+ flags = EV_READ;
+ p->flushed = true;
+ }
+
+ return flags;
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->timeouts.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ // TODO: move connect routine to its own function?
+ // - hard to do right now because we can't (easily?) edit libevent
+ // events.
+ if (be->connecting) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ // kick the bad backend, clear the queue, retry later.
+ // FIXME: if a connect fails, anything currently in the queue
+ // should be safe to hold up until their timeout.
+ _reset_bad_backend(be);
+ _backend_failed(be);
+ P_DEBUG("%s: backend failed to connect\n", __func__);
+ return;
+ }
+ P_DEBUG("%s: backend connected\n", __func__);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+ }
+ io_pending_proxy_t *io = NULL;
+ int res = 0;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ res = _flush_pending_write(be, io);
+ if (res != -1) {
+ flags |= res;
+ if (flags & EV_WRITE) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ if (res == -1) {
+ _reset_bad_backend(be);
+ return;
+ }
+ }
+
+ if (which & EV_READ) {
+ // We do the syscall here before diving into the state machine to allow a
+ // common code path for io_uring/epoll
+ int res = 1;
+ int read = 0;
+ while (res > 0) {
+ char *rbuf = NULL;
+ size_t toread = 0;
+ // need to input how much was read since last call
+ // needs _output_ of the buffer to read into and how much.
+ res = proxy_backend_drive_machine(be, read, &rbuf, &toread);
+ P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread);
+
+ if (res > 0) {
+ read = recv(mcmc_fd(be->client), rbuf, toread, 0);
+ P_DEBUG("%s: read: %d\n", __func__, read);
+ if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be);
+ return;
+ } else if (read == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break; // sit on epoll again.
+ } else {
+ _reset_bad_backend(be);
+ return;
+ }
+ }
+ } else if (res == -1) {
+ _reset_bad_backend(be);
+ return;
+ } else {
+ break;
+ }
+ }
+
+#ifdef PROXY_DEBUG
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ P_DEBUG("backend has leftover IOs: %d\n", be->depth);
+ }
+#endif
+ }
+
+ // Still pending requests to read or write.
+ // TODO: need to handle errors from above so we don't go to sleep here.
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ flags |= EV_READ; // FIXME: might not be necessary here, but ensures we get a disconnect event.
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+}
+
+static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget) {
+ assert(c != NULL);
+ LIBEVENT_THREAD *thr = c->thread;
+ struct proxy_hook *hooks = thr->proxy_hooks;
+ lua_State *L = thr->L;
+ mcp_parser_t pr = {0};
+
+ // Avoid doing resp_start() here, instead do it a bit later or as-needed.
+ // This allows us to hop over to the internal text protocol parser, which
+ // also calls resp_start().
+ // Tighter integration later should obviate the need for this, it is not a
+ // permanent solution.
+ int ret = process_request(&pr, command, cmdlen);
+ if (ret != 0) {
+ WSTAT_INCR(c, proxy_conn_errors, 1);
+ if (!resp_start(c)) {
+ conn_set_state(c, conn_closing);
+ return;
+ }
+ proxy_out_errstring(c->resp, "parsing request");
+ if (ret == -2) {
+ // Kill connection on more critical parse failure.
+ conn_set_state(c, conn_closing);
+ }
+ return;
+ }
+
+ struct proxy_hook *hook = &hooks[pr.command];
+
+ if (!hook->is_lua) {
+ // need to pass our command string into the internal handler.
+ // to minimize the code change, this means allowing it to tokenize the
+ // full command. The proxy's indirect parser should be built out to
+ // become common code for both proxy and ascii handlers.
+ // For now this means we have to null-terminate the command string,
+ // then call into text protocol handler.
+ // FIXME: use a ptr or something; don't like this code.
+ if (cmdlen > 1 && command[cmdlen-2] == '\r') {
+ command[cmdlen-2] = '\0';
+ } else {
+ command[cmdlen-1] = '\0';
+ }
+ process_command_ascii(c, command);
+ return;
+ }
+
+ // Count requests handled by proxy vs local.
+ WSTAT_INCR(c, proxy_conn_requests, 1);
+
+ // If ascii multiget, we turn this into a self-calling loop :(
+ // create new request with next key, call this func again, then advance
+ // original string.
+ // might be better to split this function; the below bits turn into a
+ // function call, then we don't re-process the above bits in the same way?
+ // The way this is detected/passed on is very fragile.
+ if (!multiget && pr.cmd_type == CMD_TYPE_GET && pr.has_space) {
+ // TODO: need some way to abort this.
+ // FIXME: before the while loop, ensure pr.keytoken isn't too bug for
+ // the local temp buffer here.
+ uint32_t keyoff = pr.tokens[pr.keytoken];
+ while (pr.klen != 0) {
+ char temp[KEY_MAX_LENGTH + 30];
+ char *cur = temp;
+ // copy original request up until the original key token.
+ memcpy(cur, pr.request, pr.tokens[pr.keytoken]);
+ cur += pr.tokens[pr.keytoken];
+
+ // now copy in our "current" key.
+ memcpy(cur, &pr.request[keyoff], pr.klen);
+ cur += pr.klen;
+
+ memcpy(cur, "\r\n", 2);
+ cur += 2;
+
+ *cur = '\0';
+ P_DEBUG("%s: new multiget sub request: %s [%u/%u]\n", __func__, temp, keyoff, pr.klen);
+ proxy_process_command(c, temp, cur - temp, PROCESS_MULTIGET);
+
+ // now advance to the next key.
+ keyoff = _process_request_next_key(&pr);
+ }
+
+ if (!resp_start(c)) {
+ conn_set_state(c, conn_closing);
+ return;
+ }
+
+ // The above recursions should have created c->resp's in dispatch
+ // order.
+ // So now we add another one at the end to create the capping END
+ // string.
+ memcpy(c->resp->wbuf, ENDSTR, ENDLEN);
+ resp_add_iov(c->resp, c->resp->wbuf, ENDLEN);
+
+ return;
+ }
+
+ // We test the command length all the way down here because multigets can
+ // be very long, and they're chopped up by now.
+ if (cmdlen >= MCP_REQUEST_MAXLEN) {
+ WSTAT_INCR(c, proxy_conn_errors, 1);
+ if (!resp_start(c)) {
+ conn_set_state(c, conn_closing);
+ return;
+ }
+ proxy_out_errstring(c->resp, "request too long");
+ conn_set_state(c, conn_closing);
+ return;
+ }
+
+ if (!resp_start(c)) {
+ conn_set_state(c, conn_closing);
+ return;
+ }
+
+ // start a coroutine.
+ // TODO: This can pull from a cache.
+ lua_newthread(L);
+ lua_State *Lc = lua_tothread(L, -1);
+ // leave the thread first on the stack, so we can reference it if needed.
+ // pull the lua hook function onto the stack.
+ lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook->lua_ref);
+
+ mcp_request_t *rq = mcp_new_request(Lc, &pr, command, cmdlen);
+ if (multiget) {
+ rq->ascii_multiget = true;
+ }
+ // TODO: a better indicator of needing nread? pr->has_value?
+ // TODO: lift this to a post-processor?
+ if (rq->pr.vlen != 0) {
+ // relying on temporary malloc's not succumbing as poorly to
+ // fragmentation.
+ c->item = malloc(rq->pr.vlen);
+ if (c->item == NULL) {
+ lua_settop(L, 0);
+ proxy_out_errstring(c->resp, "out of memory");
+ return;
+ }
+ c->item_malloced = true;
+ c->ritem = c->item;
+ c->rlbytes = rq->pr.vlen;
+
+ conn_set_state(c, conn_nread);
+
+ // thread coroutine is still on (L, -1)
+ // FIXME: could get speedup from stashing Lc ptr.
+ return;
+ }
+
+ proxy_run_coroutine(Lc, c->resp, NULL, c);
+
+ lua_settop(L, 0); // clear anything remaining on the main thread.
+}
+
+// analogue for storage_get_item(); add a deferred IO object to the current
+// connection's response object. stack enough information to write to the
+// server on the submit callback, and enough to resume the lua state on the
+// completion callback.
+static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+
+ // stack: request, hash selector. latter just to hold a reference.
+
+ mcp_request_t *rq = luaL_checkudata(Lc, -1, "mcp.request");
+ mcp_backend_t *be = rq->be;
+
+ // Then we push a response object, which we'll re-use later.
+ // reserve one uservalue for a lua-supplied response.
+ mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
+ if (r == NULL) {
+ proxy_lua_error(Lc, "out of memory allocating response");
+ return;
+ }
+ // FIXME: debugging?
+ memset(r, 0, sizeof(mcp_resp_t));
+ // TODO: check *r
+ r->buf = NULL;
+ r->blen = 0;
+ r->start = rq->start; // need to inherit the original start time.
+ int x;
+ int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
+ for (x = 0; x < end; x++) {
+ if (rq->pr.request[x] == ' ') {
+ break;
+ }
+ r->cmd[x] = rq->pr.request[x];
+ }
+ r->cmd[x] = '\0';
+
+ luaL_getmetatable(Lc, "mcp.response");
+ lua_setmetatable(Lc, -2);
+
+ io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
+ // FIXME: can this fail?
+
+ // this is a re-cast structure, so assert that we never outsize it.
+ assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
+ memset(p, 0, sizeof(io_pending_proxy_t));
+ // set up back references.
+ p->io_queue_type = IO_QUEUE_PROXY;
+ p->thread = c->thread;
+ p->c = c;
+ p->resp = resp;
+ p->client_resp = r;
+ p->flushed = false;
+ p->ascii_multiget = rq->ascii_multiget;
+ resp->io_pending = (io_pending_t *)p;
+
+ // top of the main thread should be our coroutine.
+ // lets grab a reference to it and pop so it doesn't get gc'ed.
+ p->coro_ref = coro_ref;
+
+ // we'll drop the pointer to the coro on here to save some CPU
+ // on re-fetching it later. The pointer shouldn't change.
+ p->coro = Lc;
+
+ // The direct backend object. Lc is holding the reference in the stack
+ p->backend = be;
+
+ mcp_request_attach(Lc, rq, p);
+
+ // link into the batch chain.
+ p->next = q->stack_ctx;
+ q->stack_ctx = p;
+
+ return;
+}
+
+/******** LUA INTERFACE FUNCTIONS ******/
+
+__attribute__((unused)) static void dump_stack(lua_State *L) {
+ int top = lua_gettop(L);
+ int i = 1;
+ fprintf(stderr, "--TOP OF STACK [%d]\n", top);
+ for (; i < top + 1; i++) {
+ int type = lua_type(L, i);
+ // lets find the metatable of this userdata to identify it.
+ if (lua_getmetatable(L, i) != 0) {
+ lua_pushstring(L, "__name");
+ if (lua_rawget(L, -2) != LUA_TNIL) {
+ fprintf(stderr, "--|%d| [%s] (%s)\n", i, lua_typename(L, type), lua_tostring(L, -1));
+ lua_pop(L, 2);
+ continue;
+ }
+ lua_pop(L, 2);
+ }
+ if (type == LUA_TSTRING) {
+ fprintf(stderr, "--|%d| [%s] | %s\n", i, lua_typename(L, type), lua_tostring(L, i));
+ } else {
+ fprintf(stderr, "--|%d| [%s]\n", i, lua_typename(L, type));
+ }
+ }
+ fprintf(stderr, "-----------------\n");
+}
+
+// func prototype example:
+// static int fname (lua_State *L)
+// normal library open:
+// int luaopen_mcp(lua_State *L) { }
+
+// resp:ok()
+static int mcplib_response_ok(lua_State *L) {
+ mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
+
+ if (r->status == MCMC_OK) {
+ lua_pushboolean(L, 1);
+ } else {
+ lua_pushboolean(L, 0);
+ }
+
+ return 1;
+}
+
+static int mcplib_response_hit(lua_State *L) {
+ mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
+
+ if (r->status == MCMC_OK && r->resp.code != MCMC_CODE_MISS) {
+ lua_pushboolean(L, 1);
+ } else {
+ lua_pushboolean(L, 0);
+ }
+
+ return 1;
+}
+
+static int mcplib_response_gc(lua_State *L) {
+ mcp_resp_t *r = luaL_checkudata(L, -1, "mcp.response");
+
+ // On error/similar we might be holding the read buffer.
+ // If the buf is handed off to mc_resp for return, this pointer is NULL
+ if (r->buf != NULL) {
+ free(r->buf);
+ }
+
+ return 0;
+}
+
+static int mcplib_backend_gc(lua_State *L) {
+ mcp_backend_t *be = luaL_checkudata(L, -1, "mcp.backend");
+
+ // TODO: need to validate it's impossible to cause a backend to be garbage
+ // collected while outstanding requests exist.
+ // might need some kind of failsafe here to leak memory and warn instead
+ // of killing the object and crashing? or is that too late since we're in
+ // __gc?
+ assert(STAILQ_EMPTY(&be->io_head));
+
+ mcmc_disconnect(be->client);
+ free(be->client);
+
+ return 0;
+}
+
+static int mcplib_backend(lua_State *L) {
+ luaL_checkstring(L, -4); // label for indexing backends.
+ const char *ip = luaL_checkstring(L, -3); // FIXME: checklstring?
+ const char *port = luaL_checkstring(L, -2);
+ double weight = luaL_checknumber(L, -1);
+
+ // first check our reference table to compare.
+ lua_pushvalue(L, -4);
+ int ret = lua_gettable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
+ if (ret != LUA_TNIL) {
+ mcp_backend_t *be_orig = luaL_checkudata(L, -1, "mcp.backend");
+ if (strncmp(be_orig->ip, ip, MAX_IPLEN) == 0
+ && strncmp(be_orig->port, port, MAX_PORTLEN) == 0
+ && be_orig->weight == weight) {
+ // backend is the same, return it.
+ return 1;
+ } else {
+ // backend not the same, pop from stack and make new one.
+ lua_pop(L, 1);
+ }
+ } else {
+ lua_pop(L, 1);
+ }
+
+ // This might shift to internal objects?
+ mcp_backend_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_t), 0);
+ if (be == NULL) {
+ proxy_lua_error(L, "out of memory allocating backend");
+ return 0;
+ }
+
+ // FIXME: remove some of the excess zero'ing below?
+ memset(be, 0, sizeof(mcp_backend_t));
+ strncpy(be->ip, ip, MAX_IPLEN);
+ strncpy(be->port, port, MAX_PORTLEN);
+ be->weight = weight;
+ be->depth = 0;
+ be->rbuf = NULL;
+ be->failed_count = 0;
+ STAILQ_INIT(&be->io_head);
+ be->state = mcp_backend_read;
+ be->connecting = false;
+ be->can_write = false;
+ be->stacked = false;
+ be->bad = false;
+
+ // this leaves a permanent buffer on the backend, which is fine
+ // unless you have billions of backends.
+ // we can later optimize for pulling buffers from idle backends.
+ be->rbuf = malloc(READ_BUFFER_SIZE);
+ if (be->rbuf == NULL) {
+ proxy_lua_error(L, "out of memory allocating backend");
+ return 0;
+ }
+
+ // initialize libevent.
+ memset(&be->event, 0, sizeof(be->event));
+
+ // initialize the client
+ be->client = malloc(mcmc_size(MCMC_OPTION_BLANK));
+ if (be->client == NULL) {
+ proxy_lua_error(L, "out of memory allocating backend");
+ return 0;
+ }
+ // TODO: connect elsewhere. When there're multiple backend owners, or
+ // sockets per backend, etc. We'll want to kick off connects as use time.
+ int status = mcmc_connect(be->client, be->ip, be->port, MCMC_OPTION_NONBLOCK);
+ if (status == MCMC_CONNECTED) {
+ // FIXME: is this possible? do we ever want to allow blocking
+ // connections?
+ proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->ip, be->port);
+ return 0;
+ } else if (status == MCMC_CONNECTING) {
+ be->connecting = true;
+ be->can_write = false;
+ } else {
+ proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->ip, be->port);
+ return 0;
+ }
+
+ luaL_getmetatable(L, "mcp.backend");
+ lua_setmetatable(L, -2); // set metatable to userdata.
+
+ lua_pushvalue(L, 1); // put the label at the top for settable later.
+ lua_pushvalue(L, -2); // copy the backend reference to the top.
+ // set our new backend object into the reference table.
+ lua_settable(L, lua_upvalueindex(MCP_BACKEND_UPVALUE));
+ // stack is back to having backend on the top.
+
+ return 1;
+}
+
+static int mcplib_pool_gc(lua_State *L) {
+ mcp_pool_t *p = luaL_checkudata(L, -1, "mcp.pool");
+ assert(p->refcount == 0);
+ pthread_mutex_destroy(&p->lock);
+
+ return 0;
+}
+
+// p = mcp.pool(backends, { dist = f, hashfilter = f, seed = "a", hash = f })
+// TODO: hash and hashfilter
+static int mcplib_pool(lua_State *L) {
+ int argc = lua_gettop(L);
+ luaL_checktype(L, 1, LUA_TTABLE);
+ int n = luaL_len(L, 1); // get length of array table
+
+ mcp_pool_t *p = lua_newuserdatauv(L, sizeof(mcp_pool_t) + sizeof(mcp_pool_be_t) * n, 0);
+ // TODO: check null.
+ // FIXME: zero the memory? then __gc will fix up server references on
+ // errors.
+ p->pool_size = n;
+ p->refcount = 0;
+ pthread_mutex_init(&p->lock, NULL);
+ p->ctx = settings.proxy_ctx; // TODO: store ctx in upvalue.
+
+ luaL_setmetatable(L, "mcp.pool");
+
+ lua_pushvalue(L, -1); // dupe self for reference.
+ p->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+ // TODO: ensure to increment refcounts for servers.
+ // remember lua arrays are 1 indexed.
+ for (int x = 1; x <= n; x++) {
+ mcp_pool_be_t *s = &p->pool[x-1];
+ lua_geti(L, 1, x); // get next server into the stack.
+ // TODO: do we leak memory if we bail here?
+ // the stack should clear, then release the userdata + etc?
+ // - yes it should leak memory for the registry indexed items.
+ s->be = luaL_checkudata(L, -1, "mcp.backend");
+ s->ref = luaL_ref(L, LUA_REGISTRYINDEX); // references and pops object.
+ }
+
+ if (argc == 1) {
+ // Use default hash selector if none given.
+ p->phc = mcplib_hashfunc_murmur3;
+ return 1;
+ }
+
+ // Supplied with an options table. We inspect this table to decorate the
+ // pool, then pass it along to the a constructor if necessary.
+ luaL_checktype(L, 2, LUA_TTABLE);
+
+ // stack: backends, options, mcp.pool
+ if (lua_getfield(L, 2, "dist") != LUA_TNIL) {
+ // overriding the distribution function.
+ luaL_checktype(L, -1, LUA_TTABLE);
+ if (lua_getfield(L, -1, "new") != LUA_TFUNCTION) {
+ proxy_lua_error(L, "key distribution object missing 'new' function");
+ return 0;
+ }
+
+ // - now create the copy pool table
+ lua_createtable(L, p->pool_size, 0); // give the new pool table a sizing hint.
+ for (int x = 1; x <= p->pool_size; x++) {
+ mcp_backend_t *be = p->pool[x-1].be;
+ lua_createtable(L, 0, 4);
+ // stack = [p, h, f, optN, newpool, backend]
+ // the key should be fine for id? maybe don't need to duplicate
+ // this?
+ lua_pushinteger(L, x);
+ lua_setfield(L, -2, "id");
+ // we don't use the hostname for ketama hashing
+ // so passing ip for hostname is fine
+ lua_pushstring(L, be->ip);
+ lua_setfield(L, -2, "hostname");
+ lua_pushstring(L, be->ip);
+ lua_setfield(L, -2, "addr");
+ lua_pushstring(L, be->port);
+ lua_setfield(L, -2, "port");
+ // TODO: weight/etc?
+
+ // set the backend table into the new pool table.
+ lua_rawseti(L, -2, x);
+ }
+
+ // we can either use lua_insert() or possibly _rotate to shift
+ // things into the right place, but simplest is to just copy the
+ // option arg to the end of the stack.
+ lua_pushvalue(L, 2);
+ // - stack should be: pool, opts, func, pooltable, opts
+
+ // call the dist new function.
+ int res = lua_pcall(L, 2, 2, 0);
+
+ if (res != LUA_OK) {
+ lua_error(L); // error should be on the stack already.
+ return 0;
+ }
+
+ // TODO: validate response arguments.
+ // -1 is lightuserdata ptr to the struct (which must be owned by the
+ // userdata), which is later used for internal calls.
+ struct proxy_hash_caller *phc;
+ phc = lua_touserdata(L, -1);
+ memcpy(&p->phc, phc, sizeof(*phc));
+ lua_pop(L, 1);
+ // -2 was userdata we need to hold a reference to
+ p->phc_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ // UD now popped from stack.
+
+ lua_pop(L, 1); // remove the dist table from stack.
+ } else {
+ lua_pop(L, 1); // pop the nil.
+ }
+
+ // TODO: getfield("filter") and add the function
+ // TODO: getfield("hash") and add the function
+ // TODO: getfield("seed") and use the hash function to calculate a seed
+ // value.
+
+ return 1;
+}
+
+static int mcplib_pool_proxy_gc(lua_State *L) {
+ mcp_pool_proxy_t *pp = luaL_checkudata(L, -1, "mcp.pool_proxy");
+ mcp_pool_t *p = pp->main;
+ pthread_mutex_lock(&p->lock);
+ p->refcount--;
+ if (p->refcount == 0) {
+ proxy_ctx_t *ctx = p->ctx;
+ pthread_mutex_lock(&ctx->manager_lock);
+ STAILQ_INSERT_TAIL(&ctx->manager_head, p, next);
+ pthread_cond_signal(&ctx->manager_cond);
+ pthread_mutex_unlock(&ctx->manager_lock);
+ }
+ pthread_mutex_unlock(&p->lock);
+
+ return 0;
+}
+
+// hashfunc(request) -> backend(request)
+// needs key from request object.
+static int mcplib_pool_proxy_call(lua_State *L) {
+ // internal args are the hash selector (self)
+ mcp_pool_proxy_t *pp = luaL_checkudata(L, -2, "mcp.pool_proxy");
+ mcp_pool_t *p = pp->main;
+ // then request object.
+ mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
+
+ // we have a fast path to the key/length.
+ // FIXME: indicator for if request actually has a key token or not.
+ const char *key = MCP_PARSER_KEY(rq->pr);
+ size_t len = rq->pr.klen;
+ uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx);
+
+ // attach the backend to the request object.
+ // save CPU cycles over rolling it through lua.
+ if (p->phc.ctx == NULL) {
+ // TODO: if NULL, pass in pool_size as ctx?
+ // works because the % bit will return an id we can index here.
+ // FIXME: temporary? maybe?
+ // if no context, what we got back was a hash which we need to modulus
+ // against the pool, since the func has no info about the pool.
+ rq->be = p->pool[lookup % p->pool_size].be;
+ } else {
+ // else we have a direct id into our pool.
+ // the lua modules should "think" in 1 based indexes, so we need to
+ // subtract one here.
+ // TODO: bother validating the range?
+ rq->be = p->pool[lookup-1].be;
+ }
+
+ // now yield request, hash selector up.
+ return lua_yield(L, 2);
+}
+
+// TODO: take fractional time and convert.
+static int mcplib_backend_connect_timeout(lua_State *L) {
+ int seconds = luaL_checkinteger(L, -1);
+ proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME: get global ctx reference in thread/upvalue.
+
+ STAT_L(ctx);
+ ctx->timeouts.connect.tv_sec = seconds;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+static int mcplib_backend_retry_timeout(lua_State *L) {
+ int seconds = luaL_checkinteger(L, -1);
+ proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME: get global ctx reference in thread/upvalue.
+
+ STAT_L(ctx);
+ ctx->timeouts.retry.tv_sec = seconds;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+static int mcplib_backend_read_timeout(lua_State *L) {
+ int seconds = luaL_checkinteger(L, -1);
+ proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME: get global ctx reference in thread/upvalue.
+
+ STAT_L(ctx);
+ ctx->timeouts.read.tv_sec = seconds;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+// mcp.attach(mcp.HOOK_NAME, function)
+// fill hook structure: if lua function, use luaL_ref() to store the func
+static int mcplib_attach(lua_State *L) {
+ // Pull the original worker thread out of the shared mcplib upvalue.
+ LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
+
+ int hook = luaL_checkinteger(L, -2);
+ // pushvalue to dupe func and etc.
+ // can leave original func on stack afterward because it'll get cleared.
+ int loop_end = 0;
+ int loop_start = 1;
+ if (hook == CMD_ANY) {
+ // if CMD_ANY we need individually set loop 1 to CMD_SIZE.
+ loop_end = CMD_SIZE;
+ } else if (hook == CMD_ANY_STORAGE) {
+ // if CMD_ANY_STORAGE we only override get/set/etc.
+ loop_end = CMD_END_STORAGE;
+ } else {
+ loop_start = hook;
+ loop_end = hook + 1;
+ }
+
+ if (lua_isfunction(L, -1)) {
+ struct proxy_hook *hooks = t->proxy_hooks;
+
+ for (int x = loop_start; x < loop_end; x++) {
+ struct proxy_hook *h = &hooks[x];
+ lua_pushvalue(L, -1); // duplicate the function for the ref.
+ if (h->lua_ref) {
+ // remove existing reference.
+ luaL_unref(L, LUA_REGISTRYINDEX, h->lua_ref);
+ }
+
+ // pops the function from the stack and leaves us a ref. for later.
+ h->lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ h->is_lua = true;
+ }
+ } else {
+ proxy_lua_error(L, "Must pass a function to mcp.attach");
+ return 0;
+ }
+
+ return 0;
+}
+
+static void proxy_register_defines(lua_State *L) {
+#define X(x) \
+ lua_pushinteger(L, x); \
+ lua_setfield(L, -2, #x);
+
+ X(P_OK);
+ X(CMD_ANY);
+ X(CMD_ANY_STORAGE);
+ CMD_FIELDS
+#undef X
+}
+
+/*** REQUEST PARSER AND OBJECT ***/
+
+// Find the starting offsets of each token; ignoring length.
+// This creates a fast small (<= cacheline) index into the request,
+// where we later scan or directly feed data into API's.
+static int _process_tokenize(mcp_parser_t *pr, const size_t max) {
+ const char *s = pr->request;
+ int len = pr->reqlen - 2;
+ // FIXME: die if reqlen too long.
+ // reqlen could be huge if multiget so... need some special casing?
+ const char *end = s + len;
+ int curtoken = 0;
+
+ int state = 0;
+ while (s != end) {
+ switch (state) {
+ case 0:
+ if (*s != ' ') {
+ pr->tokens[curtoken] = s - pr->request;
+ if (++curtoken == max) {
+ goto endloop;
+ }
+ state = 1;
+ }
+ s++;
+ break;
+ case 1:
+ if (*s != ' ') {
+ s++;
+ } else {
+ state = 0;
+ }
+ break;
+ }
+ }
+endloop:
+
+ pr->ntokens = curtoken;
+ P_DEBUG("%s: cur_tokens: %d\n", __func__, curtoken);
+
+ return 0;
+}
+
+static int _process_token_len(mcp_parser_t *pr, size_t token) {
+ const char *cur = pr->request + pr->tokens[token];
+ int remain = pr->reqlen - pr->tokens[token] - 2; // CRLF
+
+ const char *s = memchr(cur, ' ', remain);
+ return (s != NULL) ? s - cur : remain;
+}
+
+static int _process_request_key(mcp_parser_t *pr) {
+ pr->klen = _process_token_len(pr, pr->keytoken);
+ // advance the parser in case of multikey.
+ pr->parsed = pr->tokens[pr->keytoken] + pr->klen + 1;
+
+ if (pr->request[pr->parsed-1] == ' ') {
+ P_DEBUG("%s: request_key found extra space\n", __func__);
+ pr->has_space = true;
+ } else {
+ pr->has_space = false;
+ }
+ return 0;
+}
+
+// Just for ascii multiget: search for next "key" beyond where we stopped
+// tokenizing before.
+// Returns the offset for the next key.
+static size_t _process_request_next_key(mcp_parser_t *pr) {
+ const char *cur = pr->request + pr->parsed;
+ int remain = pr->reqlen - pr->parsed - 2;
+
+ // chew off any leading whitespace.
+ while (remain) {
+ if (*cur == ' ') {
+ remain--;
+ cur++;
+ pr->parsed++;
+ } else {
+ break;
+ }
+ }
+
+ const char *s = memchr(cur, ' ', remain);
+ if (s != NULL) {
+ pr->klen = s - cur;
+ pr->parsed += s - cur;
+ } else {
+ pr->klen = remain;
+ pr->parsed += remain;
+ }
+
+ return cur - pr->request;
+}
+
+// for fast testing of existence of meta flags.
+// meta has all flags as final tokens
+static int _process_request_metaflags(mcp_parser_t *pr, int token) {
+ if (pr->ntokens <= token) {
+ pr->t.meta.flags = 0; // no flags found.
+ return 0;
+ }
+ const char *cur = pr->request + pr->tokens[token];
+ const char *end = pr->request + pr->reqlen - 2;
+
+ // We blindly convert flags into // bits, since the range of possible
+ // flags is deliberately < 64.
+ int state = 0;
+ while (cur != end) {
+ switch (state) {
+ case 0:
+ if (*cur == ' ') {
+ cur++;
+ } else {
+ if (*cur < 65 || *cur > 122) {
+ return -1;
+ }
+ P_DEBUG("%s: setting meta flag: %d\n", __func__, *cur - 65);
+ pr->t.meta.flags |= 1 << (*cur - 65);
+ state = 1;
+ }
+ break;
+ case 1:
+ if (*cur != ' ') {
+ cur++;
+ } else {
+ state = 0;
+ }
+ break;
+ }
+ }
+
+ return 0;
+}
+
+// All meta commands are of form: "cm key f l a g S100"
+static int _process_request_meta(mcp_parser_t *pr) {
+ _process_tokenize(pr, PARSER_MAX_TOKENS);
+ if (pr->ntokens < 2) {
+ P_DEBUG("%s: not enough tokens for meta command: %d\n", __func__, pr->ntokens);
+ return -1;
+ }
+ pr->keytoken = 1;
+ _process_request_key(pr);
+
+ // pass the first flag token.
+ return _process_request_metaflags(pr, 2);
+}
+
+// ms <key> <datalen> <flags>*\r\n
+static int _process_request_mset(mcp_parser_t *pr) {
+ _process_tokenize(pr, PARSER_MAX_TOKENS);
+ if (pr->ntokens < 3) {
+ P_DEBUG("%s: not enough tokens for meta set command: %d\n", __func__, pr->ntokens);
+ return -1;
+ }
+ pr->keytoken = 1;
+ _process_request_key(pr);
+
+ const char *cur = pr->request + pr->tokens[2];
+
+ errno = 0;
+ char *n = NULL;
+ int vlen = strtol(cur, &n, 10);
+ if ((errno == ERANGE) || (cur == n)) {
+ return -1;
+ }
+
+ if (vlen < 0 || vlen > (INT_MAX - 2)) {
+ return -1;
+ }
+ vlen += 2;
+
+ pr->vlen = vlen;
+
+ // pass the first flag token
+ return _process_request_metaflags(pr, 3);
+}
+
+// gat[s] <exptime> <key>*\r\n
+static int _process_request_gat(mcp_parser_t *pr) {
+ _process_tokenize(pr, 3);
+ if (pr->ntokens < 3) {
+ P_DEBUG("%s: not enough tokens for GAT: %d\n", __func__, pr->ntokens);
+ return -1;
+ }
+
+ pr->keytoken = 2;
+ _process_request_key(pr);
+ return 0;
+}
+
+// we need t find the bytes supplied immediately so we can read the request
+// from the client properly.
+// set <key> <flags> <exptime> <bytes> [noreply]\r\n
+static int _process_request_storage(mcp_parser_t *pr, size_t max) {
+ _process_tokenize(pr, max);
+ if (pr->ntokens < 5) {
+ P_DEBUG("%s: not enough tokens to storage command: %d\n", __func__, pr->ntokens);
+ return -1;
+ }
+ pr->keytoken = 1;
+ _process_request_key(pr);
+
+ errno = 0;
+ char *n = NULL;
+ const char *cur = pr->request + pr->tokens[4];
+
+ int vlen = strtol(cur, &n, 10);
+ if ((errno == ERANGE) || (cur == n)) {
+ return -1;
+ }
+
+ if (vlen < 0 || vlen > (INT_MAX - 2)) {
+ return -1;
+ }
+ vlen += 2;
+
+ pr->vlen = vlen;
+
+ return 0;
+}
+
+// common request with key: <cmd> <key> <args>
+static int _process_request_simple(mcp_parser_t *pr, const size_t max) {
+ _process_tokenize(pr, max);
+ pr->keytoken = 1; // second token is usually the key... stupid GAT.
+
+ _process_request_key(pr);
+ return 0;
+}
+
+// TODO: return code ENUM with error types.
+// FIXME: the mcp_parser_t bits have ended up being more fragile than I hoped.
+// careful zero'ing is required. revisit?
+static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen) {
+ // we want to "parse in place" as much as possible, which allows us to
+ // forward an unmodified request without having to rebuild it.
+
+ const char *cm = command;
+ size_t cl = 0;
+
+ const char *s = memchr(command, ' ', cmdlen-2);
+ // TODO: has_space -> has_tokens
+ // has_space resered for ascii multiget?
+ if (s != NULL) {
+ cl = s - command;
+ } else {
+ cl = cmdlen - 2; // FIXME: ensure cmdlen can never be < 2?
+ }
+ pr->has_space = false;
+ pr->parsed = cl + 1;
+ pr->request = command;
+ pr->reqlen = cmdlen;
+ int token_max = PARSER_MAX_TOKENS;
+
+ //pr->vlen = 0; // FIXME: remove this once set indicator is decided
+ int cmd = -1;
+ int type = CMD_TYPE_GENERIC;
+ int ret = 0;
+
+ switch (cl) {
+ case 0:
+ case 1:
+ // falls through with cmd as -1. should error.
+ break;
+ case 2:
+ if (cm[0] == 'm') {
+ switch (cm[1]) {
+ case 'g':
+ cmd = CMD_MG;
+ ret = _process_request_meta(pr);
+ break;
+ case 's':
+ cmd = CMD_MS;
+ ret = _process_request_mset(pr);
+ break;
+ case 'd':
+ cmd = CMD_MD;
+ ret = _process_request_meta(pr);
+ break;
+ case 'n':
+ // TODO: do we route/handle NOP's at all?
+ // they should simply reflect to the client.
+ cmd = CMD_MN;
+ break;
+ case 'a':
+ cmd = CMD_MA;
+ ret = _process_request_meta(pr);
+ break;
+ case 'e':
+ cmd = CMD_ME;
+ // TODO: not much special processing here; binary keys
+ ret = _process_request_meta(pr);
+ break;
+ }
+ }
+ break;
+ case 3:
+ if (cm[0] == 'g') {
+ if (cm[1] == 'e' && cm[2] == 't') {
+ cmd = CMD_GET;
+ type = CMD_TYPE_GET;
+ token_max = 2; // don't chew through multigets.
+ ret = _process_request_simple(pr, 2);
+ }
+ if (cm[1] == 'a' && cm[2] == 't') {
+ type = CMD_TYPE_GET;
+ cmd = CMD_GAT;
+ token_max = 2; // don't chew through multigets.
+ ret = _process_request_gat(pr);
+ }
+ } else if (cm[0] == 's' && cm[1] == 'e' && cm[2] == 't') {
+ cmd = CMD_SET;
+ ret = _process_request_storage(pr, token_max);
+ } else if (cm[0] == 'a' && cm[1] == 'd' && cm[2] == 'd') {
+ cmd = CMD_ADD;
+ ret = _process_request_storage(pr, token_max);
+ } else if (cm[0] == 'c' && cm[1] == 'a' && cm[2] == 's') {
+ cmd = CMD_CAS;
+ ret = _process_request_storage(pr, token_max);
+ }
+ break;
+ case 4:
+ if (strncmp(cm, "gets", 4) == 0) {
+ cmd = CMD_GETS;
+ type = CMD_TYPE_GET;
+ token_max = 2; // don't chew through multigets.
+ ret = _process_request_simple(pr, 2);
+ } else if (strncmp(cm, "incr", 4) == 0) {
+ cmd = CMD_INCR;
+ ret = _process_request_simple(pr, 4);
+ } else if (strncmp(cm, "decr", 4) == 0) {
+ cmd = CMD_DECR;
+ ret = _process_request_simple(pr, 4);
+ } else if (strncmp(cm, "gats", 4) == 0) {
+ cmd = CMD_GATS;
+ type = CMD_TYPE_GET;
+ ret = _process_request_gat(pr);
+ } else if (strncmp(cm, "quit", 4) == 0) {
+ cmd = CMD_QUIT;
+ }
+ break;
+ case 5:
+ if (strncmp(cm, "touch", 5) == 0) {
+ cmd = CMD_TOUCH;
+ ret = _process_request_simple(pr, 4);
+ } else if (strncmp(cm, "stats", 5) == 0) {
+ cmd = CMD_STATS;
+ // Don't process a key; fetch via arguments.
+ _process_tokenize(pr, token_max);
+ } else if (strncmp(cm, "watch", 5) == 0) {
+ cmd = CMD_WATCH;
+ _process_tokenize(pr, token_max);
+ }
+ break;
+ case 6:
+ if (strncmp(cm, "delete", 6) == 0) {
+ cmd = CMD_DELETE;
+ ret = _process_request_simple(pr, 4);
+ } else if (strncmp(cm, "append", 6) == 0) {
+ cmd = CMD_APPEND;
+ ret = _process_request_storage(pr, token_max);
+ }
+ break;
+ case 7:
+ if (strncmp(cm, "replace", 7) == 0) {
+ cmd = CMD_REPLACE;
+ ret = _process_request_storage(pr, token_max);
+ } else if (strncmp(cm, "prepend", 7) == 0) {
+ cmd = CMD_PREPEND;
+ ret = _process_request_storage(pr, token_max);
+ } else if (strncmp(cm, "version", 7) == 0) {
+ cmd = CMD_VERSION;
+ _process_tokenize(pr, token_max);
+ }
+ break;
+ }
+
+ // TODO: log more specific error code.
+ if (cmd == -1 || ret != 0) {
+ return -1;
+ }
+
+ pr->command = cmd;
+ pr->cmd_type = type;
+
+ return 0;
+}
+
+// FIXME: any reason to pass in command/cmdlen separately?
+static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen) {
+ // reserving an upvalue for key.
+ mcp_request_t *rq = lua_newuserdatauv(L, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN * 2 + KEY_MAX_LENGTH, 1);
+ memset(rq, 0, sizeof(mcp_request_t));
+ memcpy(&rq->pr, pr, sizeof(*pr));
+
+ memcpy(rq->request, command, cmdlen);
+ rq->pr.request = rq->request;
+ rq->pr.reqlen = cmdlen;
+ gettimeofday(&rq->start, NULL);
+
+ luaL_getmetatable(L, "mcp.request");
+ lua_setmetatable(L, -2);
+
+ // at this point we should know if we have to bounce through _nread to
+ // get item data or not.
+ return rq;
+}
+
+// TODO:
+// if modified, this will re-serialize every time it's accessed.
+// a simple opt could copy back over the original space
+// a "better" one could A/B the request ptr and clear the modified state
+// each time it gets serialized.
+static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p) {
+ mcp_parser_t *pr = &rq->pr;
+ char *r = (char *) pr->request;
+ size_t len = pr->reqlen;
+
+ // one or more of the tokens were changed
+ if (rq->was_modified) {
+ assert(rq->tokent_ref);
+ // option table to top of stack.
+ lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref);
+
+ // space was reserved in case of modification.
+ char *nr = rq->request + MCP_REQUEST_MAXLEN;
+ r = nr;
+ char *or = NULL;
+
+ for (int x = 0; x < pr->ntokens; x++) {
+ const char *newtok = NULL;
+ size_t newlen = 0;
+ if (x != 0 && x != pr->keytoken) {
+ int type = lua_rawgeti(L, -1, x+1);
+ if (type != LUA_TNIL) {
+ newtok = lua_tolstring(L, -1, &newlen);
+ memcpy(nr, newtok, newlen);
+ nr += newlen;
+ }
+ lua_pop(L, 1);
+ }
+
+ if (newtok == NULL) {
+ // TODO: if we add an extra "end" token that's just reqlen we can
+ // memcpy... however most args are short and that may not be worth
+ // it.
+ or = rq->request + pr->tokens[x];
+ // will walk past the end without the \r test.
+ // if we add the end token trick this can be changed.
+ while (*or != ' ' && *or != '\r') {
+ *nr = *or;
+ nr++;
+ or++;
+ }
+ }
+ *nr = ' ';
+ nr++;
+ }
+ // tag the end bits.
+ memcpy(nr-1, "\r\n\0", 3);
+ nr++;
+
+ len = nr - (rq->request + MCP_REQUEST_MAXLEN);
+ lua_pop(L, 1); // pop the table
+ }
+
+ // The stringified request. This is also referencing into the coroutine
+ // stack, which should be safe from gc.
+ p->iov[0].iov_base = r;
+ p->iov[0].iov_len = len;
+ p->iovcnt = 1;
+ if (pr->vlen != 0) {
+ p->iov[1].iov_base = pr->vbuf;
+ p->iov[1].iov_len = pr->vlen;
+ p->iovcnt = 2;
+ }
+
+}
+
+// second argument is optional, for building set requests.
+// TODO: append the \r\n for the VAL?
+static int mcplib_request(lua_State *L) {
+ size_t len = 0;
+ size_t vlen = 0;
+ mcp_parser_t pr = {0};
+ const char *cmd = luaL_checklstring(L, 1, &len);
+ const char *val = luaL_optlstring(L, 2, NULL, &vlen);
+
+ // FIXME: if we inline the userdata we can avoid memcpy'ing the parser
+ // structure from the stack? but causes some code duplication.
+ if (process_request(&pr, cmd, len) != 0) {
+ proxy_lua_error(L, "failed to parse request");
+ return 0;
+ }
+ mcp_request_t *rq = mcp_new_request(L, &pr, cmd, len);
+
+ if (val != NULL) {
+ rq->pr.vlen = vlen;
+ rq->pr.vbuf = malloc(vlen);
+ // TODO: check malloc failure.
+ memcpy(rq->pr.vbuf, val, vlen);
+ }
+ gettimeofday(&rq->start, NULL);
+
+ // rq is now created, parsed, and on the stack.
+ if (rq == NULL) {
+ // TODO: lua error.
+ }
+ return 1;
+}
+
+static int mcplib_request_key(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
+ lua_pushlstring(L, MCP_PARSER_KEY(rq->pr), rq->pr.klen);
+ return 1;
+}
+
+// NOTE: I've mixed up const/non-const strings in the request. During parsing
+// we want it to be const, but after that's done the request is no longer
+// const. It might be better to just remove the const higher up the chain, but
+// I'd rather not. So for now these functions will be dumping the const to
+// modify the string.
+static int mcplib_request_ltrimkey(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request");
+ int totrim = luaL_checkinteger(L, -1);
+ char *key = (char *) MCP_PARSER_KEY(rq->pr);
+
+ if (totrim > rq->pr.klen) {
+ proxy_lua_error(L, "ltrimkey cannot zero out key");
+ return 0;
+ } else {
+ memset(key, ' ', totrim);
+ rq->pr.klen -= totrim;
+ rq->pr.tokens[rq->pr.keytoken] += totrim;
+ }
+ return 1;
+}
+
+static int mcplib_request_rtrimkey(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, -2, "mcp.request");
+ int totrim = luaL_checkinteger(L, -1);
+ char *key = (char *) MCP_PARSER_KEY(rq->pr);
+
+ if (totrim > rq->pr.klen) {
+ proxy_lua_error(L, "rtrimkey cannot zero out key");
+ return 0;
+ } else {
+ memset(key + (rq->pr.klen - totrim), ' ', totrim);
+ rq->pr.klen -= totrim;
+ // don't need to change the key token.
+ }
+ return 1;
+}
+
+// Virtual table operations on the request.
+static int mcplib_request_token(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
+ int argc = lua_gettop(L);
+
+ if (argc == 1) {
+ lua_pushnil(L);
+ return 1;
+ }
+
+ int token = luaL_checkinteger(L, 2);
+
+ if (token < 1 || token > rq->pr.ntokens) {
+ // maybe an error?
+ lua_pushnil(L);
+ return 1;
+ }
+
+ // we hold overwritten or parsed tokens in a lua table.
+ if (rq->tokent_ref == 0) {
+ // create a presized table that can hold our tokens.
+ lua_createtable(L, rq->pr.ntokens, 0);
+ // duplicate value to set back
+ lua_pushvalue(L, -1);
+ rq->tokent_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ } else {
+ lua_rawgeti(L, LUA_REGISTRYINDEX, rq->tokent_ref);
+ }
+ // top of stack should be token table.
+
+ size_t vlen = 0;
+ if (argc > 2) {
+ // overwriting a token.
+ luaL_checklstring(L, 3, &vlen);
+ lua_pushvalue(L, 3); // copy to top of stack
+ lua_rawseti(L, -2, token);
+ rq->was_modified = true;
+ return 0;
+ } else {
+ // fetching a token.
+ if (lua_rawgeti(L, -1, token) != LUA_TSTRING) {
+ lua_pop(L, 1); // got a nil, drop it.
+
+ // token not uploaded yet. find the len.
+ char *s = (char *) &rq->pr.request[rq->pr.tokens[token-1]];
+ char *e = s;
+ while (*e != ' ') {
+ e++;
+ }
+ vlen = e - s;
+
+ P_DEBUG("%s: pushing token of len: %lu\n", __func__, vlen);
+ lua_pushlstring(L, s, vlen);
+ lua_pushvalue(L, -1); // copy
+
+ lua_rawseti(L, -3, token); // pops copy.
+ }
+
+ // return fetched token or copy of new token.
+ return 1;
+ }
+
+ return 0;
+}
+
+static int mcplib_request_ntokens(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
+ lua_pushinteger(L, rq->pr.ntokens);
+ return 1;
+}
+
+static int mcplib_request_command(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
+ lua_pushinteger(L, rq->pr.command);
+ return 1;
+}
+
+static int mcplib_request_gc(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, -1, "mcp.request");
+ // FIXME: during nread c->item is the malloc'ed buffer. not yet put into
+ // rq->buf - is this properly freed if the connection dies before
+ // complete_nread?
+ if (rq->pr.vbuf != NULL) {
+ free(rq->pr.vbuf);
+ }
+
+ if (rq->tokent_ref != 0) {
+ luaL_unref(L, LUA_REGISTRYINDEX, rq->tokent_ref);
+ }
+ return 0;
+}
+
+// TODO: check what lua does when it calls a function with a string argument
+// stored from a table/similar (ie; the prefix check code).
+// If it's not copying anything, we can add request-side functions to do most
+// forms of matching and avoid copying the key to lua space.
+
+/*** END REQUET PARSER AND OBJECT ***/
+
+/*** START jump consistent hash library ***/
+// TODO: easy candidate for splitting to another .c, but I want this built in
+// instead of as a .so so make sure it's linked directly.
+
+typedef struct {
+ struct proxy_hash_caller phc; // passed back to proxy API
+ uint64_t seed;
+ unsigned int buckets;
+} mcplib_jump_hash_t;
+
+static uint32_t mcplib_jump_hash_get_server(const void *key, size_t len, void *ctx) {
+ mcplib_jump_hash_t *jh = ctx;
+
+ uint64_t hash = XXH3_64bits_withSeed(key, len, jh->seed);
+
+ int64_t b = -1, j = 0;
+ while (j < jh->buckets) {
+ b = j;
+ hash = hash * 2862933555777941757ULL + 1;
+ j = (b + 1) * ((double)(1LL << 31) / (double)((hash >> 33) + 1));
+ }
+ return b+1; // FIXME: do the -1 just for ketama and remove from internal code?
+}
+
+// stack = [pool, option]
+static int mcplib_jump_hash_new(lua_State *L) {
+ uint64_t seed = 0;
+ const char *seedstr = NULL;
+ size_t seedlen = 0;
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+ lua_Unsigned buckets = lua_rawlen(L, 1);
+
+ int argc = lua_gettop(L);
+ if (argc > 1) {
+ // options supplied. to be specified as a table.
+ // { seed = "foo" }
+ luaL_checktype(L, 2, LUA_TTABLE);
+ // FIXME: adjust so we ensure/error on this being a string?
+ if (lua_getfield(L, 2, "seed") != LUA_TNIL) {
+ seedstr = lua_tolstring(L, -1, &seedlen);
+ seed = XXH3_64bits(seedstr, seedlen);
+ } else {
+ dump_stack(L);
+ }
+ lua_pop(L, 1);
+ }
+
+ mcplib_jump_hash_t *jh = lua_newuserdatauv(L, sizeof(mcplib_jump_hash_t), 0);
+ // TODO: check jh.
+
+ // don't need to loop through the table at all, just need its length.
+ // could optimize startup time by adding hints to the module for how to
+ // format pool (ie; just a total count or the full table)
+ jh->seed = seed;
+ jh->buckets = buckets;
+ jh->phc.ctx = jh;
+ jh->phc.selector_func = mcplib_jump_hash_get_server;
+
+ lua_pushlightuserdata(L, &jh->phc);
+
+ // - return [UD, lightuserdata]
+ return 2;
+}
+
+static int mcplib_open_jump_hash(lua_State *L) {
+ const struct luaL_Reg jump_f[] = {
+ {"new", mcplib_jump_hash_new},
+ {NULL, NULL},
+ };
+
+ luaL_newlib(L, jump_f);
+
+ return 1;
+}
+
+/*** END jump consistent hash library ***/
+
+/*** START lua interface to user stats ***/
+
+// mcp.add_stat(index, name)
+// creates a custom lua stats counter
+static int mcplib_add_stat(lua_State *L) {
+ LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
+ if (t != NULL) {
+ proxy_lua_error(L, "add_stat must be called from config_pools");
+ return 0;
+ }
+ int idx = luaL_checkinteger(L, -2);
+ const char *name = luaL_checkstring(L, -1);
+
+ if (idx < 1) {
+ proxy_lua_error(L, "stat index must be 1 or higher");
+ return 0;
+ }
+ // max user counters? 1024? some weird number.
+ if (idx > 1024) {
+ proxy_lua_error(L, "stat index must be 1024 or less");
+ return 0;
+ }
+ // max name length? avoids errors if something huge gets thrown in.
+ if (strlen(name) > STAT_KEY_LEN - 6) {
+ // we prepend "user_" to the output. + null byte.
+ proxy_lua_ferror(L, "stat name too long: %s\n", name);
+ return 0;
+ }
+ // restrict characters, at least no spaces/newlines.
+ for (int x = 0; x < strlen(name); x++) {
+ if (isspace(name[x])) {
+ proxy_lua_error(L, "stat cannot contain spaces or newlines");
+ return 0;
+ }
+ }
+
+ proxy_ctx_t *ctx = settings.proxy_ctx; // TODO: store ctx in upvalue.
+
+ // just to save some typing.
+ STAT_L(ctx);
+ struct proxy_user_stats *us = &ctx->user_stats;
+
+ // if num_stats is 0 we need to init sizes.
+ // TODO: malloc fail checking.
+ if (us->num_stats < idx) {
+ // don't allocate counters memory for the global ctx.
+ char **nnames = calloc(idx, sizeof(char *));
+ if (us->names != NULL) {
+ for (int x = 0; x < us->num_stats; x++) {
+ nnames[x] = us->names[x];
+ }
+ free(us->names);
+ }
+ us->names = nnames;
+ us->num_stats = idx;
+ }
+
+ idx--; // real slot start as 0.
+ // if slot has string in it, free first
+ if (us->names[idx] != NULL) {
+ free(us->names[idx]);
+ }
+ // strdup name into string slot
+ // TODO: malloc failure.
+ us->names[idx] = strdup(name);
+ STAT_UL(ctx);
+
+ return 0;
+}
+
+static int mcplib_stat(lua_State *L) {
+ LIBEVENT_THREAD *t = lua_touserdata(L, lua_upvalueindex(MCP_THREAD_UPVALUE));
+ if (t == NULL) {
+ proxy_lua_error(L, "stat must be called from router handlers");
+ return 0;
+ }
+
+ struct proxy_user_stats *tus = t->proxy_stats;
+ if (tus == NULL) {
+ proxy_lua_error(L, "no stats counters initialized");
+ return 0;
+ }
+
+ int idx = luaL_checkinteger(L, -2);
+ int change = luaL_checkinteger(L, -1);
+
+ if (idx < 1 || idx > tus->num_stats) {
+ proxy_lua_error(L, "stat index out of range");
+ return 0;
+ }
+
+ idx--; // actual array is 0 indexed.
+ WSTAT_L(t);
+ tus->counters[idx] += change;
+ WSTAT_UL(t);
+
+ return 0;
+}
+
+/*** END lua interface to user stats ***/
+
+/*** START lua await() object interface ***/
+
+typedef struct mcp_await_s {
+ int pending;
+ int wait_for;
+ int req_ref;
+ int argtable_ref; // need to hold refs to any potential hash selectors
+ int restable_ref; // table of result objects
+ int coro_ref; // reference to parent coroutine
+ bool completed; // have we completed the parent coroutine or not
+ mcp_request_t *rq;
+ mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
+} mcp_await_t;
+
+// local restable = mcp.await(request, hashselectors, num_wait)
+// NOTE: need to hold onto the hash selector objects since those hold backend
+// references. Here we just keep a reference to the argument table.
+static int mcplib_await(lua_State *L) {
+ mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
+ luaL_checktype(L, 2, LUA_TTABLE);
+ int n = luaL_len(L, 2); // length of hash selector table
+ int wait_for = 0; // 0 means wait for all responses
+
+ if (lua_isnumber(L, 3)) {
+ wait_for = lua_tointeger(L, 3);
+ lua_pop(L, 1);
+ if (wait_for > n) {
+ wait_for = n;
+ }
+ }
+ // TODO: bail if selector table was 0 len? else bad things can happen.
+
+ // TODO: quickly loop table once and ensure they're all hash selectors?
+ int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
+ int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.
+
+ // stack will be only the await object now
+ mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0);
+ memset(aw, 0, sizeof(mcp_await_t));
+ aw->wait_for = wait_for;
+ aw->pending = n;
+ aw->argtable_ref = argtable_ref;
+ aw->rq = rq;
+ aw->req_ref = req_ref;
+ P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
+ //dump_stack(L);
+
+ return lua_yield(L, 1);
+}
+
+static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref) {
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+
+ mcp_backend_t *be = rq->be;
+
+ // Then we push a response object, which we'll re-use later.
+ // reserve one uservalue for a lua-supplied response.
+ mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
+ if (r == NULL) {
+ proxy_lua_error(Lc, "out of memory allocating response");
+ return;
+ }
+ memset(r, 0, sizeof(mcp_resp_t));
+ r->buf = NULL;
+ r->blen = 0;
+ r->start = rq->start;
+ int x;
+ int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
+ for (x = 0; x < end; x++) {
+ if (rq->pr.request[x] == ' ') {
+ break;
+ }
+ r->cmd[x] = rq->pr.request[x];
+ }
+ r->cmd[x] = '\0';
+
+ luaL_getmetatable(Lc, "mcp.response");
+ lua_setmetatable(Lc, -2);
+
+ io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
+ // FIXME: can this fail?
+
+ // this is a re-cast structure, so assert that we never outsize it.
+ assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
+ memset(p, 0, sizeof(io_pending_proxy_t));
+ // set up back references.
+ p->io_queue_type = IO_QUEUE_PROXY;
+ p->thread = c->thread;
+ p->c = c;
+ p->resp = NULL;
+ p->client_resp = r;
+ p->flushed = false;
+ p->ascii_multiget = rq->ascii_multiget;
+
+ // io_p needs to hold onto its own response reference, because we may or
+ // may not include it in the final await() result.
+ p->mcpres_ref = luaL_ref(Lc, LUA_REGISTRYINDEX); // pops mcp.response
+
+ // avoiding coroutine reference for sub-IO
+ p->coro_ref = 0;
+ p->coro = NULL;
+
+ // await specific
+ p->is_await = true;
+ p->await_ref = await_ref;
+
+ // The direct backend object. await object is holding reference
+ p->backend = be;
+
+ mcp_request_attach(Lc, rq, p);
+
+ // link into the batch chain.
+ p->next = q->stack_ctx;
+ q->stack_ctx = p;
+ P_DEBUG("%s: queued\n", __func__);
+
+ return;
+}
+
+static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
+ P_DEBUG("%s: start\n", __func__);
+ mcp_await_t *aw = lua_touserdata(L, -1);
+ int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped.
+ assert(aw != NULL);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->argtable_ref); // -> 1
+ //dump_stack(L);
+ P_DEBUG("%s: argtable len: %d\n", __func__, (int)lua_rawlen(L, -1));
+ mcp_request_t *rq = aw->rq;
+ aw->coro_ref = coro_ref;
+
+ // create result table
+ lua_newtable(L); // -> 2
+ aw->restable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pop the result table
+
+ // prepare the request key
+ const char *key = MCP_PARSER_KEY(rq->pr);
+ size_t len = rq->pr.klen;
+ // loop arg table and run each hash selector
+ lua_pushnil(L); // -> 3
+ while (lua_next(L, 1) != 0) {
+ P_DEBUG("%s: top of loop\n", __func__);
+ // (key, -2), (val, -1)
+ // FIXME: move to a func. mostly redundant with hsp_call()?
+ mcp_pool_proxy_t *pp = luaL_testudata(L, -1, "mcp.pool_proxy");
+ if (pp == NULL) {
+ // TODO: fatal! wasn't correct object type
+ }
+ mcp_pool_t *p = pp->main;
+
+ uint32_t lookup = p->phc.selector_func(key, len, p->phc.ctx);
+ // NOTE: rq->be is only held to help pass the backend into the IOP in
+ // mcp_queue call. Could be a local variable and an argument too.
+ if (p->phc.ctx == NULL) {
+ rq->be = p->pool[lookup % p->pool_size].be;
+ } else {
+ rq->be = p->pool[lookup-1].be;
+ }
+
+ mcp_queue_await_io(c, L, rq, await_ref);
+
+ // pop value, keep key.
+ lua_pop(L, 1);
+ }
+
+ lua_pop(L, 1); // remove table key.
+ aw->resp = c->resp; // cuddle the current mc_resp to fill later
+
+ // we count the await as the "response pending" since it covers a single
+ // response object. the sub-IO's don't count toward the redispatch of *c
+ io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
+ q->count++;
+
+ P_DEBUG("%s\n", __func__);
+ //dump_stack(L); // should be empty
+
+ return 0;
+}
+
+//lua_rawseti(L, -2, x++);
+static int mcplib_await_return(io_pending_proxy_t *p) {
+ mcp_await_t *aw;
+ lua_State *L = p->thread->L; // use the main VM coroutine for work
+ bool cleanup = false;
+ bool valid = false;
+ bool completing = false;
+
+ // TODO: just push the await ptr into *p?
+ lua_rawgeti(L, LUA_REGISTRYINDEX, p->await_ref);
+ aw = lua_touserdata(L, -1);
+ lua_pop(L, 1); // remove AW object from stack
+ assert(aw != NULL);
+ P_DEBUG("%s: start [pending: %d]\n", __func__, aw->pending);
+ //dump_stack(L);
+
+ aw->pending--;
+ // Await not yet satisfied.
+ // If wait_for != 0 check for response success
+ // if success and wait_for is *now* 0, we complete.
+ // add successful response to response table
+ // Also, if no wait_for, add response to response table
+ if (!aw->completed) {
+ if (aw->wait_for > 0) {
+ if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
+ valid = true;
+ }
+ aw->wait_for--;
+
+ if (aw->wait_for == 0) {
+ completing = true;
+ }
+ } else {
+ valid = true;
+ }
+ }
+
+ // note that post-completion, we stop gathering responses into the
+ // resposne table... because it's already been returned.
+ // So "valid" can only be true if also !completed
+ if (aw->pending == 0) {
+ if (!aw->completed) {
+ // were waiting for all responses.
+ completing = true;
+ }
+ cleanup = true;
+ P_DEBUG("%s: pending == 0\n", __func__);
+ }
+
+ // a valid response to add to the result table.
+ if (valid) {
+ P_DEBUG("%s: valid\n", __func__);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1
+ lua_rawgeti(L, LUA_REGISTRYINDEX, p->mcpres_ref); // -> 2
+ // couldn't find a table.insert() equivalent; so this is
+ // inserting into the length + 1 position manually.
+ //dump_stack(L);
+ lua_rawseti(L, 1, lua_rawlen(L, 1) + 1); // pops mcpres
+ lua_pop(L, 1); // pops restable
+ }
+
+ // lose our internal mcpres reference regardless.
+ luaL_unref(L, LUA_REGISTRYINDEX, p->mcpres_ref);
+ // our await_ref is shared, so we don't need to release it.
+
+ if (completing) {
+ P_DEBUG("%s: completing\n", __func__);
+ aw->completed = true;
+ // if we haven't completed yet, the connection reference is still
+ // valid. So now we pull it, reduce count, and readd if necessary.
+ // here is also the point where we resume the coroutine.
+ lua_rawgeti(L, LUA_REGISTRYINDEX, aw->coro_ref);
+ lua_State *Lc = lua_tothread(L, -1);
+ lua_rawgeti(Lc, LUA_REGISTRYINDEX, aw->restable_ref); // -> 1
+ proxy_run_coroutine(Lc, aw->resp, NULL, p->c);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->coro_ref);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->restable_ref);
+
+ io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
+ q->count--;
+ if (q->count == 0) {
+ // call re-add directly since we're already in the worker thread.
+ conn_worker_readd(p->c);
+ }
+
+ }
+
+ if (cleanup) {
+ P_DEBUG("%s: cleanup [completed: %d]\n", __func__, aw->completed);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->argtable_ref);
+ luaL_unref(L, LUA_REGISTRYINDEX, aw->req_ref);
+ luaL_unref(L, LUA_REGISTRYINDEX, p->await_ref);
+ }
+
+ // Just remove anything we could have left on the primary VM stack
+ lua_settop(L, 0);
+
+ // always return free this sub-IO object.
+ do_cache_free(p->thread->io_cache, p);
+
+ return 0;
+}
+
+/*** END lua await() object interface ***/
+
+// Creates and returns the top level "mcp" module
+int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
+ lua_State *L = ctx;
+
+ const struct luaL_Reg mcplib_backend_m[] = {
+ {"set", NULL},
+ {"__gc", mcplib_backend_gc},
+ {NULL, NULL}
+ };
+
+ const struct luaL_Reg mcplib_request_m[] = {
+ {"command", mcplib_request_command},
+ {"key", mcplib_request_key},
+ {"ltrimkey", mcplib_request_ltrimkey},
+ {"rtrimkey", mcplib_request_rtrimkey},
+ {"token", mcplib_request_token},
+ {"ntokens", mcplib_request_ntokens},
+ {"__tostring", NULL},
+ {"__gc", mcplib_request_gc},
+ {NULL, NULL}
+ };
+
+ const struct luaL_Reg mcplib_response_m[] = {
+ {"ok", mcplib_response_ok},
+ {"hit", mcplib_response_hit},
+ {"__gc", mcplib_response_gc},
+ {NULL, NULL}
+ };
+
+ const struct luaL_Reg mcplib_pool_m[] = {
+ {"__gc", mcplib_pool_gc},
+ {NULL, NULL}
+ };
+
+ const struct luaL_Reg mcplib_pool_proxy_m[] = {
+ {"__call", mcplib_pool_proxy_call},
+ {"__gc", mcplib_pool_proxy_gc},
+ {NULL, NULL}
+ };
+
+ const struct luaL_Reg mcplib_f [] = {
+ {"pool", mcplib_pool},
+ {"backend", mcplib_backend},
+ {"request", mcplib_request},
+ {"attach", mcplib_attach},
+ {"add_stat", mcplib_add_stat},
+ {"stat", mcplib_stat},
+ {"await", mcplib_await},
+ {"backend_connect_timeout", mcplib_backend_connect_timeout},
+ {"backend_retry_timeout", mcplib_backend_retry_timeout},
+ {"backend_read_timeout", mcplib_backend_read_timeout},
+ {NULL, NULL}
+ };
+
+ // TODO: function + loop.
+ luaL_newmetatable(L, "mcp.backend");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_backend_m, 0); // register methods
+ lua_pop(L, 1);
+
+ luaL_newmetatable(L, "mcp.request");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_request_m, 0); // register methods
+ lua_pop(L, 1);
+
+ luaL_newmetatable(L, "mcp.response");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_response_m, 0); // register methods
+ lua_pop(L, 1);
+
+ luaL_newmetatable(L, "mcp.pool");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_pool_m, 0); // register methods
+ lua_pop(L, 1); // drop the hash selector metatable
+
+ luaL_newmetatable(L, "mcp.pool_proxy");
+ lua_pushvalue(L, -1); // duplicate metatable.
+ lua_setfield(L, -2, "__index"); // mt.__index = mt
+ luaL_setfuncs(L, mcplib_pool_proxy_m, 0); // register methods
+ lua_pop(L, 1); // drop the hash selector metatable
+
+ // create main library table.
+ //luaL_newlib(L, mcplib_f);
+ // TODO: luaL_newlibtable() just pre-allocs the exact number of things
+ // here.
+ // can replace with createtable and add the num. of the constant
+ // definitions.
+ luaL_newlibtable(L, mcplib_f);
+ proxy_register_defines(L);
+
+ // hash function for selectors.
+ // have to wrap the function in a struct because function pointers aren't
+ // pointer pointers :)
+ mcplib_open_jump_hash(L);
+ lua_setfield(L, -2, "hash_jump");
+ // FIXME: remove this once multi-probe is in, use that as default instead.
+ lua_pushlightuserdata(L, &mcplib_hashfunc_murmur3);
+ lua_setfield(L, -2, "hash_murmur3");
+
+ lua_pushlightuserdata(L, (void *)t); // upvalue for original thread
+ lua_newtable(L); // upvalue for mcp.attach() table.
+
+ // create weak table for storing backends by label.
+ lua_newtable(L); // {}
+ lua_newtable(L); // {}, {} for metatable
+ lua_pushstring(L, "v"); // {}, {}, "v" for weak values.
+ lua_setfield(L, -2, "__mode"); // {}, {__mode = "v"}
+ lua_setmetatable(L, -2); // {__mt = {__mode = "v"} }
+
+ luaL_setfuncs(L, mcplib_f, 3); // store upvalues.
+
+ lua_setglobal(L, "mcp"); // set the lib table to mcp global.
+ return 1;
+}
diff --git a/proto_proxy.h b/proto_proxy.h
new file mode 100644
index 0000000..aa58323
--- /dev/null
+++ b/proto_proxy.h
@@ -0,0 +1,26 @@
+#ifndef PROTO_PROXY_H
+#define PROTO_PROXY_H
+
+void proxy_stats(ADD_STAT add_stats, conn *c);
+void process_proxy_stats(ADD_STAT add_stats, conn *c);
+
+/* proxy mode handlers */
+int try_read_command_proxy(conn *c);
+void complete_nread_proxy(conn *c);
+void proxy_thread_init(LIBEVENT_THREAD *thr);
+void proxy_init(void);
+// TODO: need better names or a better interface for these. can be confusing
+// to reason about the order.
+void proxy_start_reload(void *arg);
+int proxy_load_config(void *arg);
+void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr);
+
+void proxy_submit_cb(io_queue_t *q);
+void proxy_complete_cb(io_queue_t *q);
+void proxy_return_cb(io_pending_t *pending);
+void proxy_finalize_cb(io_pending_t *pending);
+
+/* lua */
+int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx);
+
+#endif
diff --git a/proto_text.c b/proto_text.c
index 000ddd1..cfe5d97 100644
--- a/proto_text.c
+++ b/proto_text.c
@@ -5,6 +5,10 @@
#include "memcached.h"
#include "proto_text.h"
+// FIXME: only for process_proxy_stats()
+// - some better/different structure for stats subcommands
+// would remove this abstraction leak.
+#include "proto_proxy.h"
#include "authfile.h"
#include "storage.h"
#include "base64.h"
@@ -42,8 +46,6 @@
} \
}
-static void process_command(conn *c, char *command);
-
typedef struct token_s {
char *value;
size_t length;
@@ -481,7 +483,7 @@ int try_read_command_ascii(conn *c) {
assert(cont <= (c->rcurr + c->rbytes));
c->last_cmd_time = current_time;
- process_command(c, c->rcurr);
+ process_command_ascii(c, c->rcurr);
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
@@ -792,6 +794,10 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
} else if (strcmp(subcommand, "extstore") == 0) {
process_extstore_stats(&append_stats, c);
#endif
+#ifdef PROXY
+ } else if (strcmp(subcommand, "proxy") == 0) {
+ process_proxy_stats(&append_stats, c);
+#endif
} else {
/* getting here means that the subcommand is either engine specific or
is invalid. query the engine and see. */
@@ -2684,7 +2690,7 @@ static void process_refresh_certs_command(conn *c, token_t *tokens, const size_t
// we can't drop out and back in again.
// Leaving this note here to spend more time on a fix when necessary, or if an
// opportunity becomes obvious.
-static void process_command(conn *c, char *command) {
+void process_command_ascii(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
size_t ntokens;
diff --git a/proto_text.h b/proto_text.h
index b626f97..aae0ed6 100644
--- a/proto_text.h
+++ b/proto_text.h
@@ -5,5 +5,6 @@
void complete_nread_ascii(conn *c);
int try_read_command_asciiauth(conn *c);
int try_read_command_ascii(conn *c);
+void process_command_ascii(conn *c, char *command);
#endif
diff --git a/t/lib/MemcachedTest.pm b/t/lib/MemcachedTest.pm
index 2a0fcb7..c5cb6fb 100644
--- a/t/lib/MemcachedTest.pm
+++ b/t/lib/MemcachedTest.pm
@@ -18,7 +18,7 @@ my @unixsockets = ();
mem_get_is mem_gets mem_gets_is mem_stats mem_move_time
supports_sasl free_port supports_drop_priv supports_extstore
wait_ext_flush supports_tls enabled_tls_testing run_help
- supports_unix_socket);
+ supports_unix_socket get_memcached_exe supports_proxy);
use constant MAX_READ_WRITE_SIZE => 16384;
use constant SRV_CRT => "server_crt.pem";
@@ -215,6 +215,12 @@ sub supports_extstore {
return 0;
}
+sub supports_proxy {
+ my $output = print_help();
+ return 1 if $output =~ /proxy_config/i;
+ return 0;
+}
+
sub supports_tls {
my $output = print_help();
return 1 if $output =~ /enable-ssl/i;
diff --git a/t/proxy.t b/t/proxy.t
new file mode 100644
index 0000000..c85796d
--- /dev/null
+++ b/t/proxy.t
@@ -0,0 +1,263 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+use Test::More;
+use FindBin qw($Bin);
+use lib "$Bin/lib";
+use Carp qw(croak);
+use MemcachedTest;
+
+# TODO: to module?
+# or "gettimedrun" etc
+use Cwd;
+my $builddir = getcwd;
+
+if (!supports_proxy()) {
+ plan skip_all => 'proxy not enabled';
+ exit 0;
+}
+
+# TODO: the lua file has hardcoded ports. any way to make this dynamic?
+# TODO: once basic tests are done, actually split out the instances rather
+# than the shared backend; validate keys go where they should be going.
+
+# FIXME: this listend on unix socket still. either need a manual runner or a
+# fix upstream.
+my @srv = ();
+for (2 .. 6) {
+ my $srv = run_server("-p 1121$_", 11210 + $_);
+ push(@srv, $srv);
+}
+#my $sock = $srv->sock;
+
+my $p_srv = new_memcached('-o proxy_config=./t/startfile.lua -l 127.0.0.1', 11211);
+my $p_sock = $p_srv->sock;
+
+# hack to help me use T_MEMD_USE_DAEMON for proxy.
+#print STDERR "Sleeping\n";
+#sleep 900;
+
+# cmds to test:
+# - noreply for main text commands?
+# meta:
+# me
+# mn
+# mg
+# ms
+# md
+# ma
+# - noreply?
+# stats
+# pass-thru?
+
+# incr/decr
+{
+ print $p_sock "set /foo/num 0 0 1\r\n1\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored num");
+ mem_get_is($p_sock, "/foo/num", 1, "stored 1");
+
+ print $p_sock "incr /foo/num 1\r\n";
+ is(scalar <$p_sock>, "2\r\n", "+ 1 = 2");
+ mem_get_is($p_sock, "/foo/num", 2);
+
+ print $p_sock "incr /foo/num 8\r\n";
+ is(scalar <$p_sock>, "10\r\n", "+ 8 = 10");
+ mem_get_is($p_sock, "/foo/num", 10);
+
+ print $p_sock "decr /foo/num 1\r\n";
+ is(scalar <$p_sock>, "9\r\n", "- 1 = 9");
+
+ print $p_sock "decr /foo/num 9\r\n";
+ is(scalar <$p_sock>, "0\r\n", "- 9 = 0");
+
+ print $p_sock "decr /foo/num 5\r\n";
+ is(scalar <$p_sock>, "0\r\n", "- 5 = 0");
+}
+
+# gat
+{
+ # cache miss
+ print $p_sock "gat 10 /foo/foo1\r\n";
+ is(scalar <$p_sock>, "END\r\n", "cache miss");
+
+ # set /foo/foo1 and /foo/foo2 (and should get it)
+ print $p_sock "set /foo/foo1 0 2 7\r\nfooval1\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored foo");
+
+ print $p_sock "set /foo/foo2 0 2 7\r\nfooval2\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored /foo/foo2");
+
+ # get and touch it with cas
+ print $p_sock "gats 10 /foo/foo1 /foo/foo2\r\n";
+ like(scalar <$p_sock>, qr/VALUE \/foo\/foo1 0 7 (\d+)\r\n/, "get and touch foo1 with cas regexp success");
+ is(scalar <$p_sock>, "fooval1\r\n","value");
+ like(scalar <$p_sock>, qr/VALUE \/foo\/foo2 0 7 (\d+)\r\n/, "get and touch foo2 with cas regexp success");
+ is(scalar <$p_sock>, "fooval2\r\n","value");
+ is(scalar <$p_sock>, "END\r\n", "end");
+
+ # get and touch it without cas
+ print $p_sock "gat 10 /foo/foo1 /foo/foo2\r\n";
+ like(scalar <$p_sock>, qr/VALUE \/foo\/foo1 0 7\r\n/, "get and touch foo1 without cas regexp success");
+ is(scalar <$p_sock>, "fooval1\r\n","value");
+ like(scalar <$p_sock>, qr/VALUE \/foo\/foo2 0 7\r\n/, "get and touch foo2 without cas regexp success");
+ is(scalar <$p_sock>, "fooval2\r\n","value");
+ is(scalar <$p_sock>, "END\r\n", "end");
+}
+
+# gets/cas
+{
+ print $p_sock "add /foo/moo 0 0 6\r\nmooval\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored mooval");
+ mem_get_is($p_sock, "/foo/moo", "mooval");
+
+ # check-and-set (cas) failure case, try to set value with incorrect cas unique val
+ print $p_sock "cas /foo/moo 0 0 6 0\r\nMOOVAL\r\n";
+ is(scalar <$p_sock>, "EXISTS\r\n", "check and set with invalid id");
+
+ # test "gets", grab unique ID
+ print $p_sock "gets /foo/moo\r\n";
+ # VALUE moo 0 6 3084947704
+ #
+ my @retvals = split(/ /, scalar <$p_sock>);
+ my $data = scalar <$p_sock>; # grab data
+ my $dot = scalar <$p_sock>; # grab dot on line by itself
+ is($retvals[0], "VALUE", "get value using 'gets'");
+ my $unique_id = $retvals[4];
+ # clean off \r\n
+ $unique_id =~ s/\r\n$//;
+ ok($unique_id =~ /^\d+$/, "unique ID '$unique_id' is an integer");
+ # now test that we can store moo with the correct unique id
+ print $p_sock "cas /foo/moo 0 0 6 $unique_id\r\nMOOVAL\r\n";
+ is(scalar <$p_sock>, "STORED\r\n");
+ mem_get_is($p_sock, "/foo/moo", "MOOVAL");
+}
+
+# touch
+{
+ print $p_sock "set /foo/t 0 2 6\r\nfooval\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored foo");
+ mem_get_is($p_sock, "/foo/t", "fooval");
+
+ # touch it
+ print $p_sock "touch /foo/t 10\r\n";
+ is(scalar <$p_sock>, "TOUCHED\r\n", "touched foo");
+
+ # don't need to sleep/validate the touch worked. We're testing the
+ # protocol, not the functionality.
+}
+
+# command endings
+# NOTE: memcached always allowed [\r]\n for single command lines, but payloads
+# (set/etc) require exactly \r\n as termination.
+# doc/protocol.txt has always specified \r\n for command/response.
+# Proxy is more strict than normal server in this case.
+{
+ my $s = $srv[0]->sock;
+ print $s "version\n";
+ like(<$s>, qr/VERSION/, "direct server version cmd with just newline");
+ print $p_sock "version\n";
+ like(<$p_sock>, qr/SERVER_ERROR/, "proxy version cmd with just newline");
+ print $p_sock "version\r\n";
+ like(<$p_sock>, qr/VERSION/, "proxy version cmd with full CRLF");
+}
+
+# set through proxy.
+{
+ print $p_sock "set /foo/z 0 0 5\r\nhello\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy");
+ # ensure it's fetchable.
+ mem_get_is($p_sock, "/foo/z", "hello");
+ # delete it.
+ print $p_sock "delete /foo/z\r\n";
+ is(scalar <$p_sock>, "DELETED\r\n", "removed test value");
+ # ensure it's deleted.
+ mem_get_is($p_sock, "/foo/z", undef);
+}
+
+# test add.
+{
+ print $p_sock "add /foo/a 0 0 3\r\nmoo\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "add test value through proxy");
+ # ensure it's fetchable
+ mem_get_is($p_sock, "/foo/a", "moo");
+ # check re-adding fails.
+ print $p_sock "add /foo/a 0 0 3\r\ngoo\r\n";
+ is(scalar <$p_sock>, "NOT_STORED\r\n", "re-add fails");
+ # ensure we still hae the old value
+ mem_get_is($p_sock, "/foo/a", "moo");
+}
+
+# pipelined set.
+{
+ my $str = "set /foo/k 0 0 5\r\nhello\r\n";
+ print $p_sock "$str$str$str$str$str";
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy");
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy");
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy");
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy");
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value through proxy");
+}
+
+# Load some keys through proxy.
+my $bdata = 'x' x 256000;
+{
+ for (1..20) {
+ print $p_sock "set /foo/a$_ 0 0 2\r\nhi\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value");
+ print $p_sock "set /bar/b$_ 0 0 2\r\nhi\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored test value");
+ }
+
+ # load a couple larger values
+ for (1..4) {
+ print $p_sock "set /foo/big$_ 0 0 256000\r\n$bdata\r\n";
+ is(scalar <$p_sock>, "STORED\r\n", "stored big value");
+ }
+ diag "set large values";
+}
+
+# fetch through proxy.
+{
+ for (1..20) {
+ mem_get_is($p_sock, "/foo/a$_", "hi");
+ }
+ diag "fetched small values";
+ mem_get_is($p_sock, "/foo/big1", $bdata);
+ diag "fetched big value";
+}
+
+sub run_server {
+ my ($args, $port) = @_;
+
+ my $exe = get_memcached_exe();
+
+ my $childpid = fork();
+
+ my $root = '';
+ $root = "-u root" if ($< == 0);
+
+ # test build requires more privileges
+ $args .= " -o relaxed_privileges";
+
+ my $cmd = "$builddir/timedrun 120 $exe $root $args";
+
+ unless($childpid) {
+ exec $cmd;
+ exit; # NOTREACHED
+ }
+
+ for (1..20) {
+ my $conn = IO::Socket::INET->new(PeerAddr => "127.0.0.1:$port");
+ if ($conn) {
+ return Memcached::Handle->new(pid => $childpid,
+ conn => $conn,
+ host => "127.0.0.1",
+ port => $port);
+ }
+ select undef, undef, undef, 0.10;
+ }
+ croak "Failed to start server.";
+}
+
+done_testing();
diff --git a/t/startfile.lua b/t/startfile.lua
new file mode 100644
index 0000000..e267655
--- /dev/null
+++ b/t/startfile.lua
@@ -0,0 +1,280 @@
+-- xTODO: sets of zones behind a prefix.
+-- xTODO: zones with local vs other failover.
+-- xTODO: failover on get
+-- xTODO: all zone sync on set
+-- TODO: fallback cache for broken/overloaded zones.
+
+-- local zone could/should be fetched from environment or local file.
+-- doing so allows all configuration files to be identical, simplifying consistency checks.
+local my_zone = 'z1'
+
+local STAT_EXAMPLE <const> = 1
+local STAT_ANOTHER <const> = 2
+
+function mcp_config_pools(oldss)
+ mcp.add_stat(STAT_EXAMPLE, "example")
+ mcp.add_stat(STAT_ANOTHER, "another")
+ mcp.backend_connect_timeout(5) -- 5 second timeout.
+ -- alias mcp.backend for convenience.
+ -- important to alias global variables in routes where speed is concerned.
+ local srv = mcp.backend
+ -- local zones = { 'z1', 'z2', 'z3' }
+
+ -- IPs are "127" . "zone" . "pool" . "srv"
+ local pfx = 'fooz1'
+ local fooz1 = {
+ srv(pfx .. 'srv1', '127.1.1.1', 11212, 1),
+ srv(pfx .. 'srv2', '127.1.1.2', 11212, 1),
+ srv(pfx .. 'srv3', '127.1.1.3', 11212, 1),
+ }
+ pfx = 'fooz2'
+ local fooz2 = {
+ srv(pfx .. 'srv1', '127.2.1.1', 11213, 1),
+ srv(pfx .. 'srv2', '127.2.1.2', 11213, 1),
+ srv(pfx .. 'srv3', '127.2.1.3', 11213, 1),
+ }
+ pfx = 'fooz3'
+ local fooz3 = {
+ srv(pfx .. 'srv1', '127.3.1.1', 11214, 1),
+ srv(pfx .. 'srv2', '127.3.1.2', 11214, 1),
+ srv(pfx .. 'srv3', '127.3.1.3', 11214, 1),
+ }
+
+ pfx = 'barz1'
+ -- zone "/bar/"-s primary zone should fail; all down.
+ local barz1 = {
+ srv(pfx .. 'srv1', '127.1.2.1', 11210, 1),
+ srv(pfx .. 'srv2', '127.1.2.1', 11210, 1),
+ srv(pfx .. 'srv3', '127.1.2.1', 11210, 1),
+ }
+ pfx = 'barz2'
+ local barz2 = {
+ srv(pfx .. 'srv1', '127.2.2.2', 11215, 1),
+ srv(pfx .. 'srv2', '127.2.2.2', 11215, 1),
+ srv(pfx .. 'srv3', '127.2.2.2', 11215, 1),
+ }
+ pfx = 'barz3'
+ local barz3 = {
+ srv(pfx .. 'srv1', '127.3.2.3', 11216, 1),
+ srv(pfx .. 'srv2', '127.3.2.3', 11216, 1),
+ srv(pfx .. 'srv3', '127.3.2.3', 11216, 1),
+ }
+
+ -- fallback cache for any zone
+ -- NOT USED YET
+ pfx = 'fallz1'
+ local fallz1 = {
+ srv(pfx .. 'srv1', '127.0.2.1', 11212, 1),
+ }
+ pfx = 'fallz2'
+ local fallz2 = {
+ srv(pfx .. 'srv1', '127.0.2.2', 11212, 1),
+ }
+ pfx = 'fallz3'
+ local fallz3 = {
+ srv(pfx .. 'srv1', '127.0.2.3', 11212, 1),
+ }
+
+ local main_zones = {
+ foo = { z1 = fooz1, z2 = fooz2, z3 = fooz3 },
+ bar = { z1 = barz1, z2 = barz2, z3 = barz3 },
+ -- fall = { z1 = fallz1, z2 = fallz2, z3 = fallz3 },
+ }
+
+ -- FIXME: should we copy the table to keep the pool tables around?
+ -- does the hash selector hold a reference to the pool (but only available in main config?)
+
+ -- uncomment to use the ketama loadable module.
+ -- FIXME: passing an argument to the ketama module doesn't work yet.
+ -- local ketama = require("ketama")
+
+ -- convert the pools into hash selectors.
+ -- TODO: is this a good place to add prefixing/hash editing?
+ for _, subs in pairs(main_zones) do
+ for k, v in pairs(subs) do
+ -- use next line instead for a third party ketama hash
+ -- subs[k] = mcp.pool(v, { dist = ketama })
+ -- this line overrides the default bucket size for ketama
+ -- subs[k] = mcp.pool(v, { dist = ketama, obucket = 80 })
+ -- this line uses the default murmur3 straight hash.
+ subs[k] = mcp.pool(v)
+
+ -- use this next line instead for jump hash.
+ -- the order of servers in the pool argument _must_ not change!
+ -- adding the seed string will give a different key distribution
+ -- for each zone.
+ -- NOTE: 'k' may not be the right seed here:
+ -- instead stitch main_zone's key + the sub key?
+ -- subs[k] = mcp.pool(v, { dist = mcp.hash_jump, seed = k })
+ end
+ end
+
+ return main_zones
+end
+
+-- WORKER CODE:
+
+-- need to redefine main_zones using fetched selectors?
+
+-- TODO: Fallback zone here?
+function failover_factory(zones, local_zone)
+ local near_zone = zones[local_zone]
+ local far_zones = {}
+ -- NOTE: could shuffle/sort to re-order zone retry order
+ -- or use 'next(far_zones, idx)' via a stored upvalue here
+ for k, v in pairs(zones) do
+ if k ~= local_zone then
+ far_zones[k] = v
+ end
+ end
+ return function(r)
+ local res = near_zone(r)
+ if res:hit() == false then
+ for _, zone in pairs(far_zones) do
+ res = zone(r)
+ if res:hit() then
+ break
+ end
+ end
+ end
+ return res -- send result back to client
+ end
+end
+
+-- SET's to main zone, issues deletes to far zones.
+function setinvalidate_factory(zones, local_zone)
+ local near_zone = zones[local_zone]
+ local far_zones = {}
+ -- NOTE: could shuffle/sort to re-order zone retry order
+ -- or use 'next(far_zones, idx)' via a stored upvalue here
+ for k, v in pairs(zones) do
+ if k ~= local_zone then
+ far_zones[k] = v
+ end
+ end
+ local new_req = mcp.request
+ return function(r)
+ local res = near_zone(r)
+ if res:ok() == true then
+ -- create a new delete request
+ local dr = new_req("delete /testing/" .. r:key() .. "\r\n")
+ for _, zone in pairs(far_zones) do
+ -- NOTE: can check/do things on the specific response here.
+ zone(dr)
+ end
+ end
+ -- use original response for client, not DELETE's response.
+ -- else client won't understand.
+ return res -- send result back to client
+ end
+end
+
+-- NOTE: this function is culling key prefixes. it is an error to use it
+-- without a left anchored (^) pattern.
+function prefixtrim_factory(pattern, list, default)
+ local p = pattern
+ local l = list
+ local d = default
+ local s = mcp.stat
+ return function(r)
+ local i, j, match = string.find(r:key(), p)
+ local route
+ if match ~= nil then
+ -- remove the key prefix so we don't waste storage.
+ r:ltrimkey(j)
+ route = l[match]
+ if route == nil then
+ -- example counter: tick when default route hit.
+ s(STAT_EXAMPLE, 1)
+ return d(r)
+ end
+ end
+ return route(r)
+ end
+end
+
+function prefix_factory(pattern, list, default)
+ local p = pattern
+ local l = list
+ local d = default
+ local s = mcp.stat
+ return function(r)
+ local route = l[string.match(r:key(), p)]
+ if route == nil then
+ -- example counter: tick when default route hit.
+ s(STAT_EXAMPLE, 1)
+ return d(r)
+ end
+ return route(r)
+ end
+end
+
+-- TODO: Check tail call requirements?
+function command_factory(map, default)
+ local m = map
+ local d = default
+ return function(r)
+ local f = map[r:command()]
+ if f == nil then
+ -- print("default command")
+ return d(r)
+ end
+ -- testing options replacement...
+ if r:command() == mcp.CMD_SET then
+ r:token(4, "100") -- set exptime.
+ end
+ -- print("override command")
+ return f(r)
+ end
+end
+
+-- TODO: is the return value the average? anything special?
+-- walks a list of selectors and repeats the request.
+function walkall_factory(pool)
+ local p = {}
+ -- TODO: a shuffle could be useful here.
+ for _, v in pairs(pool) do
+ table.insert(p, v)
+ end
+ local x = #p
+ return function(r)
+ local restable = mcp.await(r, p)
+ -- walk results and return "best" result
+ -- print("length of await result table", #restable)
+ for _, res in pairs(restable) do
+ if res:ok() then
+ return res
+ end
+ end
+ -- else we return the first result.
+ return restable[1]
+ end
+end
+
+function mcp_config_routes(main_zones)
+ -- generate the prefix routes from zones.
+ local prefixes = {}
+ for pfx, z in pairs(main_zones) do
+ local failover = failover_factory(z, my_zone)
+ local all = walkall_factory(main_zones[pfx])
+ local setdel = setinvalidate_factory(z, my_zone)
+ local map = {}
+ map[mcp.CMD_SET] = all
+ -- NOTE: in t/proxy.t all the backends point to the same place
+ -- which makes replicating delete return NOT_FOUND
+ map[mcp.CMD_DELETE] = all
+ -- similar with ADD. will get an NOT_STORED back.
+ -- need better routes designed for the test suite (edit the key
+ -- prefix or something)
+ map[mcp.CMD_ADD] = failover_factory(z, my_zone)
+ prefixes[pfx] = command_factory(map, failover)
+ end
+
+ local routetop = prefix_factory("^/(%a+)/", prefixes, function(r) return "SERVER_ERROR no route\r\n" end)
+
+ -- internally run parser at top of tree
+ -- also wrap the request string with a convenience object until the C bits
+ -- are attached to the internal parser.
+ --mcp.attach(mcp.CMD_ANY, function (r) return routetop(r) end)
+ mcp.attach(mcp.CMD_ANY_STORAGE, routetop)
+end
diff --git a/thread.c b/thread.c
index 948e4fe..dc59062 100644
--- a/thread.c
+++ b/thread.c
@@ -9,6 +9,9 @@
#ifdef HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
+#ifdef PROXY
+#include "proto_proxy.h"
+#endif
#include <assert.h>
#include <stdio.h>
#include <errno.h>
@@ -36,6 +39,9 @@ enum conn_queue_item_modes {
queue_redispatch, /* return conn from side thread */
queue_stop, /* exit thread */
queue_return_io, /* returning a pending IO object immediately */
+#ifdef PROXY
+ queue_proxy_reload, /* signal proxy to reload worker VM */
+#endif
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
@@ -475,6 +481,16 @@ static void setup_thread(LIBEVENT_THREAD *me) {
storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
}
#endif
+#ifdef PROXY
+ thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb,
+ proxy_complete_cb, proxy_return_cb, proxy_finalize_cb);
+
+ // TODO: maybe register hooks to be called here from sub-packages? ie;
+ // extstore, TLS, proxy.
+ if (settings.proxy_enabled) {
+ proxy_thread_init(me);
+ }
+#endif
thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
}
@@ -600,12 +616,23 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
/* getting an individual IO object back */
conn_io_queue_return(item->io);
break;
+#ifdef PROXY
+ case queue_proxy_reload:
+ proxy_worker_reload(settings.proxy_ctx, me);
+ break;
+#endif
}
cqi_free(me->ev_queue, item);
}
}
+// NOTE: need better encapsulation.
+// used by the proxy module to iterate the worker threads.
+LIBEVENT_THREAD *get_worker_thread(int id) {
+ return &threads[id];
+}
+
/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;
@@ -726,6 +753,11 @@ void redispatch_conn(conn *c) {
void timeout_conn(conn *c) {
notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
+#ifdef PROXY
+void proxy_reload_notify(LIBEVENT_THREAD *t) {
+ notify_worker_fd(t, 0, queue_proxy_reload);
+}
+#endif
void return_io_pending(io_pending_t *io) {
CQ_ITEM *item = cqi_new(io->thread->ev_queue);
@@ -898,6 +930,9 @@ void threadlocal_stats_reset(void) {
#ifdef EXTSTORE
EXTSTORE_THREAD_STATS_FIELDS
#endif
+#ifdef PROXY
+ PROXY_THREAD_STATS_FIELDS
+#endif
#undef X
memset(&threads[ii].stats.slab_stats, 0,
@@ -923,6 +958,9 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) {
#ifdef EXTSTORE
EXTSTORE_THREAD_STATS_FIELDS
#endif
+#ifdef PROXY
+ PROXY_THREAD_STATS_FIELDS
+#endif
#undef X
for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
diff --git a/vendor/.gitignore b/vendor/.gitignore
new file mode 100644
index 0000000..6ed69cb
--- /dev/null
+++ b/vendor/.gitignore
@@ -0,0 +1,4 @@
+README.md
+/mcmc/example
+!/mcmc/Makefile
+!/Makefile
diff --git a/vendor/Makefile b/vendor/Makefile
new file mode 100644
index 0000000..9d51aee
--- /dev/null
+++ b/vendor/Makefile
@@ -0,0 +1,10 @@
+all:
+ cd lua && $(MAKE) all && cd ..
+ cd mcmc && $(MAKE) all && cd ..
+
+clean:
+ cd lua && $(MAKE) clean && cd ..
+ cd mcmc && $(MAKE) clean && cd ..
+
+dist: clean
+distdir: clean
diff --git a/vendor/fetch.sh b/vendor/fetch.sh
new file mode 100755
index 0000000..2d54142
--- /dev/null
+++ b/vendor/fetch.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+HASH="44a55dee1d41c3ae92524df9f0dd8a747db79f04"
+wget https://github.com/memcached/memcached-vendor/archive/${HASH}.tar.gz
+tar -zxf ./${HASH}.tar.gz --strip-components=1
+rm ${HASH}.tar.gz
diff --git a/vendor/lua/.gitignore b/vendor/lua/.gitignore
new file mode 100644
index 0000000..d6b7ef3
--- /dev/null
+++ b/vendor/lua/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
diff --git a/vendor/mcmc/LICENSE b/vendor/mcmc/LICENSE
new file mode 100644
index 0000000..656c8f7
--- /dev/null
+++ b/vendor/mcmc/LICENSE
@@ -0,0 +1,30 @@
+Copyright (c) 2021 Cache Forge LLC.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+ * Neither the name of the Danga Interactive nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/vendor/mcmc/Makefile b/vendor/mcmc/Makefile
new file mode 100644
index 0000000..8ac6290
--- /dev/null
+++ b/vendor/mcmc/Makefile
@@ -0,0 +1,13 @@
+# gcc -g -Wall -Werror -pedantic -o example example.c mcmc.c
+PREFIX=/usr/local
+
+all:
+ gcc -g -O2 -Wall -Werror -pedantic -o example example.c mcmc.c
+ gcc -g -O2 -Wall -Werror -pedantic -c mcmc.c
+
+clean:
+ rm -f example mcmc.o
+
+dist: clean
+
+distdir:
diff --git a/vendor/mcmc/README.md b/vendor/mcmc/README.md
new file mode 100644
index 0000000..10d4880
--- /dev/null
+++ b/vendor/mcmc/README.md
@@ -0,0 +1,52 @@
+# Minimal (C) Client for MemCached
+
+WARNING: WORK IN PROGRESS. Missing features or testing!
+
+MCMC is a minimalistic allocation-free modern client for memcached. It uses a
+generic response parser, allowing a single code path regardless of the command
+sent to memcached. It has no 3rd party dependencies and is designed to
+integrate as a building block into full clients.
+
+MCMC does not (yet) include a typical memcached "selector". Meaning the
+ability to add many servers to a hash table of some kind and routing keys to
+specific servers. The MCMC base client is designed to be an object that
+selector objects hold and then issue commands against.
+
+Allocation-free (aside from a call to `getaddrinfo()`) means it does not
+_internally_ do any allocations, relying only on the stack. It requires you
+malloc a small structure and some buffers, but you are then free to manage
+them yourselves. Clients do not hold onto buffers when idle, cutting their
+memory overhead to a handful of bytes plus the TCP socket.
+
+MCMC is designed to be a building block for users designing full clients.
+For example:
+
+* A client author wants to implement the "get" command
+* They write a function in their native language's wrapper which accepts the
+ key to fetch and embeds that into a text buffer to look like `get [key]\r\n`
+* They then call mcmc's functions to send and read the response, parsing and
+ returning it to the client.
+
+This should be the same, if not less, code than wrapping a full C client with
+every possible command broken out. It also means 3rd party clients can (and
+should!) embed mcmc.c/mcmc.h (and any selector code they want) rather than be
+dependent on system distribution of a more complex client.
+
+The allocation-free nature also makes unit testing the client code easier,
+hopefully leading to higher quality.
+
+Caveats:
+
+* Care should be taken when handling the buffers mcmc requires to operate.
+ Since there are few operators you should only have to pay attention once :)
+* It does not support the various maintenance/settings commands (ie; `lru_crawler`).
+ It may gain some generic support for this, but those commands were not
+designed with consistent response codes and are hard to implement.
+* Does not support the binary protocol, which has been deprecated as of 1.6.0.
+
+As of this writing the code is being released _early_ (perhaps too early?). It
+may not have proper makefiles, tests, or a fully implemented API. The code has
+been posted so client authors and users can give early feedback on the API in
+hopes of prodiving something high quality and stable.
+
+Again, looking for feedback! Open an issue or let me know what you think.
diff --git a/vendor/mcmc/example.c b/vendor/mcmc/example.c
new file mode 100644
index 0000000..958c351
--- /dev/null
+++ b/vendor/mcmc/example.c
@@ -0,0 +1,214 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <stdint.h>
+#include <poll.h>
+#include <signal.h>
+#include <sys/uio.h>
+
+#include "mcmc.h"
+
+static void show_response(void *c, char *rbuf, size_t bufsize) {
+ int status;
+ // buffer shouldn't change until the read is completed.
+ mcmc_resp_t resp;
+ int go = 1;
+ while (go) {
+ go = 0;
+ status = mcmc_read(c, rbuf, bufsize, &resp);
+ if (status == MCMC_OK) {
+ // OK means a response of some kind was read.
+ char *val;
+ // NOTE: is "it's not a miss, and vlen is 0" enough to indicate that
+ // a 0 byte value was returned?
+ if (resp.vlen != 0) {
+ if (resp.vlen == resp.vlen_read) {
+ val = resp.value;
+ } else {
+ val = malloc(resp.vlen);
+ int read = 0;
+ do {
+ status = mcmc_read_value(c, val, resp.vlen, &read);
+ } while (status == MCMC_WANT_READ);
+ }
+ if (resp.vlen > 0) {
+ val[resp.vlen-1] = '\0';
+ printf("Response value: %s\n", val);
+ }
+ }
+ switch (resp.type) {
+ case MCMC_RESP_GET:
+ // GET's need to continue until END is seen.
+ printf("in GET mode\n");
+ go = 1;
+ break;
+ case MCMC_RESP_END: // ascii done-with-get's
+ printf("END seen\n");
+ break;
+ case MCMC_RESP_META: // any meta command. they all return the same.
+ printf("META response seen\n");
+ if (resp.rlen > 0) {
+ resp.rline[resp.rlen-1] = '\0';
+ printf("META response line: %s\n", resp.rline);
+ }
+ break;
+ case MCMC_RESP_STAT:
+ // STAT responses. need to call mcmc_read() in loop until
+ // we get an end signal.
+ go = 1;
+ break;
+ default:
+ // TODO: type -> str func.
+ fprintf(stderr, "unknown response type: %d\n", resp.type);
+ break;
+ }
+ } else {
+ // some kind of command specific error code (management commands)
+ // or protocol error status.
+ char code[MCMC_ERROR_CODE_MAX];
+ char msg[MCMC_ERROR_MSG_MAX];
+ mcmc_get_error(c, code, MCMC_ERROR_CODE_MAX, msg, MCMC_ERROR_MSG_MAX);
+ fprintf(stderr, "Got error from mc: status [%d] code [%s] msg: [%s]\n", status, code, msg);
+ // some errors don't have a msg. in this case msg[0] will be \0
+ }
+
+ int remain = 0;
+ // advance us to the next command in the buffer, or ready for the next
+ // mc_read().
+ char *newbuf = mcmc_buffer_consume(c, &remain);
+ printf("remains in buffer: %d\n", remain);
+ if (remain == 0) {
+ assert(newbuf == NULL);
+ // we're done.
+ } else {
+ // there're still some bytes unconsumed by the client.
+ // ensure the next time we call the client, the buffer has those
+ // bytes at the front still.
+ // NOTE: this _could_ be an entirely different buffer if we copied
+ // the data off. The client is just tracking the # of bytes it
+ // didn't gobble.
+ // In this case we shuffle the bytes back to the front of our read
+ // buffer.
+ memmove(rbuf, newbuf, remain);
+ }
+ }
+}
+
+int main (int argc, char *agv[]) {
+ // TODO: detect if C is pre-C11?
+ printf("C version: %ld\n", __STDC_VERSION__);
+
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
+ perror("signal");
+ exit(1);
+ }
+
+ void *c = malloc(mcmc_size(MCMC_OPTION_BLANK));
+ // we only "need" the minimum buf size.
+ // buffers large enough to fit return values result in fewer syscalls.
+ size_t bufsize = mcmc_min_buffer_size(MCMC_OPTION_BLANK) * 2;
+ // buffers are also generally agnostic to clients. The buffer must be
+ // held and re-used when required by the API. When the buffer is empty,
+ // it may be released to a pool or reused with other connections.
+ char *rbuf = malloc(bufsize);
+
+ int status;
+
+ // API is blocking by default.
+ status = mcmc_connect(c, "127.0.0.1", "11211", MCMC_OPTION_BLANK);
+
+ if (status != MCMC_CONNECTED) {
+ // TODO: mc_strerr(c);
+ fprintf(stderr, "Failed to connect to memcached\n");
+ return -1;
+ }
+
+ char *requests[5] = {"get foo\r\n",
+ "get foob\r\n",
+ "mg foo s t v\r\n",
+ "mg doof s t v Omoo k\r\n",
+ ""};
+
+ for (int x = 0; strlen(requests[x]) != 0; x++) {
+ // provide a buffer, the buffer length, and the number of responses
+ // expected. ie; if pipelining many requests, or using noreply semantics.
+ // FIXME: not confident "number of expected responses" is worth tracking
+ // internally.
+ status = mcmc_send_request(c, requests[x], strlen(requests[x]), 1);
+ //printf("sent request: %s\n", requests[x]);
+
+ if (status != MCMC_OK) {
+ fprintf(stderr, "Failed to send request to memcached\n");
+ return -1;
+ }
+
+ // Regardless of what command we sent, this should print out the response.
+ show_response(c, rbuf, bufsize);
+
+ }
+
+ status = mcmc_disconnect(c);
+ // The only free'ing needed.
+ free(c);
+
+ // TODO: stats example.
+
+ // nonblocking example.
+ c = malloc(mcmc_size(MCMC_OPTION_BLANK));
+ // reuse bufsize/rbuf.
+ status = mcmc_connect(c, "127.0.0.1", "11211", MCMC_OPTION_NONBLOCK);
+ printf("nonblock connecting...\n");
+ struct pollfd pfds[1];
+ if (status == MCMC_CONNECTING) {
+ // need to wait for socket to become writeable.
+ pfds[0].fd = mcmc_fd(c);
+ pfds[0].events = POLLOUT;
+ if (poll(pfds, 1, 1000) != 1) {
+ fprintf(stderr, "poll on connect timed out or failed\n");
+ return -1;
+ }
+ int err = 0;
+ if (pfds[0].revents & POLLOUT && mcmc_check_nonblock_connect(c, &err) == MCMC_OK) {
+ printf("asynchronous connection completed: %d\n", err);
+ } else {
+ printf("failed to connect: %s\n", strerror(err));
+ return -1;
+ }
+ } else {
+ perror("connect");
+ fprintf(stderr, "bad response to nonblock connection: %d\n", status);
+ return -1;
+ }
+
+ // TODO: check socket for errors.
+
+ // TODO: send request
+ status = mcmc_send_request(c, requests[0], strlen(requests[0]), 1);
+ //printf("sent request: %s\n", requests[x]);
+
+ if (status != MCMC_OK) {
+ fprintf(stderr, "Failed to send request to memcached\n");
+ return -1;
+ }
+
+ mcmc_resp_t resp;
+ status = mcmc_read(c, rbuf, bufsize, &resp);
+ // this could race and fail, depending on the system.
+ if (status == MCMC_WANT_READ) {
+ printf("got MCMC_WANT_READ from a too-fast read as expected\n");
+ pfds[0].fd = mcmc_fd(c);
+ pfds[0].events = POLLIN;
+ if (poll(pfds, 1, 1000) != 1) {
+ fprintf(stderr, "poll on connect timed out or failed\n");
+ return -1;
+ }
+ if (pfds[0].revents & POLLIN) {
+ printf("asynchronous read ready\n");
+ }
+
+ show_response(c, rbuf, bufsize);
+ }
+
+ return 0;
+}
diff --git a/vendor/mcmc/mcmc.c b/vendor/mcmc/mcmc.c
new file mode 100644
index 0000000..65b7e68
--- /dev/null
+++ b/vendor/mcmc/mcmc.c
@@ -0,0 +1,728 @@
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+#include <stdio.h>
+
+#include "mcmc.h"
+
+// TODO: if there's a parse error or unknown status code, we likely have a
+// protocol desync and need to disconnect.
+
+// NOTE: this _will_ change a bit for adding TLS support.
+
+// A "reasonable" minimum buffer size to work with.
+// Callers are allowed to create a buffer of any size larger than this.
+// TODO: Put the math/documentation in here.
+// This is essentially the largest return value status line possible.
+// at least doubled for wiggle room.
+#define MIN_BUFFER_SIZE 2048
+
+#define FLAG_BUF_IS_ERROR 0x1
+#define FLAG_BUF_IS_NUMERIC 0x2
+#define FLAG_BUF_WANTED_READ 0x4
+
+#define STATE_DEFAULT 0 // looking for any kind of response
+#define STATE_GET_RESP 1 // processing VALUE's until END
+#define STATE_STAT_RESP 2 // processing STAT's until END
+#define STATE_STAT_RESP_DONE 3
+
+typedef struct mcmc_ctx {
+ int fd;
+ int gai_status; // getaddrinfo() last status.
+ int last_sys_error; // last syscall error (connect/etc?)
+ int sent_bytes_partial; // note for partially sent buffers.
+ int request_queue; // supposed outstanding replies.
+ int fail_code; // recent failure reason.
+ int error; // latest error code.
+ uint32_t status_flags; // internal only flags.
+ int state;
+
+ // FIXME: s/buffer_used/buffer_filled/ ?
+ size_t buffer_used; // amount of bytes read into the buffer so far.
+ size_t buffer_request_len; // cached endpoint for current request
+ char *buffer_head; // buffer pointer currently in use.
+ char *buffer_tail; // consumed tail of the buffer.
+
+ // request response detail.
+ mcmc_resp_t *resp;
+} mcmc_ctx_t;
+
+// INTERNAL FUNCTIONS
+
+static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) {
+ char *buf = ctx->buffer_head;
+ // we know that "VALUE " has matched, so skip that.
+ char *p = buf+6;
+ size_t l = ctx->buffer_request_len;
+
+ // <key> <flags> <bytes> [<cas unique>]
+ char *key = p;
+ int keylen;
+ p = memchr(p, ' ', l - 6);
+ if (p == NULL) {
+ // FIXME: these should return MCMC_ERR and set the internal parse
+ // error code.
+ return MCMC_PARSE_ERROR;
+ }
+
+ keylen = p - key;
+
+ // convert flags into something useful.
+ // FIXME: do we need to prevent overruns in strtoul?
+ // we know for sure the line will eventually end in a \n.
+ char *n = NULL;
+ errno = 0;
+ uint32_t flags = strtoul(p, &n, 10);
+ if ((errno == ERANGE) || (p == n) || (*n != ' ')) {
+ return MCMC_PARSE_ERROR;
+ }
+ p = n;
+
+ errno = 0;
+ uint32_t bytes = strtoul(p, &n, 10);
+ if ((errno == ERANGE) || (p == n)) {
+ return MCMC_PARSE_ERROR;
+ }
+ p = n;
+
+ // If next byte is a space, we read the optional CAS value.
+ uint64_t cas = 0;
+ if (*n == ' ') {
+ errno = 0;
+ cas = strtoull(p, &n, 10);
+ if ((errno == ERANGE) || (p == n)) {
+ return MCMC_PARSE_ERROR;
+ }
+ }
+
+ // If we made it this far, we've parsed everything, stuff the details into
+ // the context for fetching later.
+ mcmc_resp_t *r = ctx->resp;
+ // FIXME: set to NULL if we don't have the value?
+ r->value = ctx->buffer_tail;
+ r->vlen = bytes + 2; // add in the \r\n
+ int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
+ if (buffer_remain >= r->vlen) {
+ r->vlen_read = r->vlen;
+ ctx->buffer_tail += r->vlen;
+ } else {
+ r->vlen_read = buffer_remain;
+ }
+ r->key = key;
+ r->klen = keylen;
+ r->flags = flags;
+ r->cas = cas;
+ r->type = MCMC_RESP_GET;
+ ctx->state = STATE_GET_RESP;
+
+ // NOTE: if value_offset < buffer_used, has part of the value in the
+ // buffer already.
+
+ return MCMC_OK;
+}
+
+// FIXME: This is broken for ASCII multiget.
+// if we get VALUE back, we need to stay in ASCII GET read mode until an END
+// is seen.
+static int _mcmc_parse_response(mcmc_ctx_t *ctx) {
+ char *buf = ctx->buffer_head;
+ char *cur = buf;
+ size_t l = ctx->buffer_request_len;
+ int rlen; // response code length.
+ int more = 0;
+ mcmc_resp_t *r = ctx->resp;
+ r->reslen = ctx->buffer_request_len;
+ r->type = MCMC_RESP_GENERIC;
+
+ // walk until the \r\n
+ while (l-- > 2) {
+ if (*cur == ' ') {
+ more = 1;
+ break;
+ }
+ cur++;
+ }
+ rlen = cur - buf;
+
+ // incr/decr returns a number with no code :(
+ // not checking length first since buf must have at least one char to
+ // enter this function.
+ if (buf[0] >= '0' && buf[0] <= '9') {
+ // TODO: parse it as a number on request.
+ // TODO: validate whole thing as digits here?
+ ctx->status_flags |= FLAG_BUF_IS_NUMERIC;
+ r->type = MCMC_RESP_NUMERIC;
+ return MCMC_OK;
+ }
+
+ if (rlen < 2) {
+ ctx->error = MCMC_PARSE_ERROR_SHORT;
+ return MCMC_ERR;
+ }
+
+ int rv = MCMC_OK;
+ int code = MCMC_CODE_OK;
+ switch (rlen) {
+ case 2:
+ // meta, "OK"
+ // FIXME: adding new return codes would make the client completely
+ // fail. The rest of the client is agnostic to requests/flags for
+ // meta.
+ // can we make it agnostic for return codes outside of "read this
+ // data" types?
+ // As-is it should fail down to the "send the return code to the
+ // user". not sure that's right.
+ r->type = MCMC_RESP_META;
+ switch (buf[0]) {
+ case 'E':
+ if (buf[1] == 'N') {
+ code = MCMC_CODE_MISS;
+ // TODO: RESP type
+ } else if (buf[1] == 'X') {
+ code = MCMC_CODE_EXISTS;
+ }
+ break;
+ case 'H':
+ if (buf[1] == 'D') {
+ // typical meta response.
+ code = MCMC_CODE_OK;
+ }
+ break;
+ case 'M':
+ if (buf[1] == 'N') {
+ // specific return code so user can see pipeline end.
+ code = MCMC_CODE_NOP;
+ } else if (buf[1] == 'E') {
+ // ME is the debug output line.
+ // TODO: this just gets returned as an rline?
+ // specific code? specific type?
+ // ME <key> <key=value debug line>
+ rv = MCMC_OK;
+ }
+ break;
+ case 'N':
+ if (buf[1] == 'F') {
+ code = MCMC_CODE_NOT_FOUND;
+ } else if (buf[1] == 'S') {
+ code = MCMC_CODE_NOT_STORED;
+ }
+ break;
+ case 'O':
+ if (buf[1] == 'K') {
+ // Used by many random management commands
+ r->type = MCMC_RESP_GENERIC;
+ }
+ break;
+ case 'V':
+ if (buf[1] == 'A') {
+ // VA <size> <flags>*\r\n
+ if (more) {
+ errno = 0;
+ char *n = NULL;
+ uint32_t vsize = strtoul(cur, &n, 10);
+ if ((errno == ERANGE) || (cur == n)) {
+ rv = MCMC_ERR;
+ } else {
+ r->value = ctx->buffer_tail;
+ r->vlen = vsize + 2; // tag in the \r\n.
+ // FIXME: macro.
+ int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
+ if (buffer_remain >= r->vlen) {
+ r->vlen_read = r->vlen;
+ ctx->buffer_tail += r->vlen;
+ } else {
+ r->vlen_read = buffer_remain;
+ }
+ cur = n;
+ if (*cur != ' ') {
+ more = 0;
+ }
+ }
+ } else {
+ rv = MCMC_ERR;
+ }
+ }
+ break;
+ }
+ // maybe: if !rv and !fail, do something special?
+ // if (more), there are flags. shove them in the right place.
+ if (more) {
+ r->rline = cur+1; // eat the space.
+ r->rlen = l-1;
+ } else {
+ r->rline = NULL;
+ r->rlen = 0;
+ }
+ break;
+ case 3:
+ if (memcmp(buf, "END", 3) == 0) {
+ // Either end of STAT results, or end of ascii GET key list.
+ ctx->state = STATE_DEFAULT;
+ // FIXME: caller needs to understand if this is a real miss.
+ code = MCMC_CODE_MISS;
+ r->type = MCMC_RESP_END;
+ rv = MCMC_OK;
+ }
+ break;
+ case 4:
+ if (memcmp(buf, "STAT", 4) == 0) {
+ r->type = MCMC_RESP_STAT;
+ ctx->state = STATE_STAT_RESP;
+ // TODO: initialize stat reader mode.
+ }
+ break;
+ case 5:
+ if (memcmp(buf, "VALUE", 5) == 0) {
+ if (more) {
+ // <key> <flags> <bytes> [<cas unique>]
+ rv = _mcmc_parse_value_line(ctx);
+ } else {
+ rv = MCMC_ERR; // FIXME: parse error.
+ }
+ }
+ break;
+ case 6:
+ if (memcmp(buf, "STORED", 6) == 0) {
+ code = MCMC_CODE_STORED;
+ } else if (memcmp(buf, "EXISTS", 6) == 0) {
+ code = MCMC_CODE_EXISTS;
+ // TODO: type -> ASCII?
+ }
+ break;
+ case 7:
+ if (memcmp(buf, "DELETED", 7) == 0) {
+ code = MCMC_CODE_DELETED;
+ } else if (memcmp(buf, "TOUCHED", 7) == 0) {
+ code = MCMC_CODE_TOUCHED;
+ } else if (memcmp(buf, "VERSION", 7) == 0) {
+ code = MCMC_CODE_VERSION;
+ r->type = MCMC_RESP_VERSION;
+ // TODO: prep the version line for return
+ }
+ break;
+ case 9:
+ if (memcmp(buf, "NOT_FOUND", 9) == 0) {
+ code = MCMC_CODE_NOT_FOUND;
+ }
+ break;
+ case 10:
+ if (memcmp(buf, "NOT_STORED", 10) == 0) {
+ code = MCMC_CODE_NOT_STORED;
+ }
+ break;
+ default:
+ // Unknown code, assume error.
+ break;
+ }
+
+ r->code = code;
+ if (rv == -1) {
+ // TODO: Finish this.
+ ctx->status_flags |= FLAG_BUF_IS_ERROR;
+ rv = MCMC_ERR;
+ }
+
+ return rv;
+}
+
+// EXTERNAL API
+
+int mcmc_fd(void *c) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+ return ctx->fd;
+}
+
+size_t mcmc_size(int options) {
+ return sizeof(mcmc_ctx_t);
+}
+
+// Allow returning this dynamically based on options set.
+// FIXME: it might be more flexible to call this after mcmc_connect()...
+// but this is probably more convenient for the caller if it's less dynamic.
+size_t mcmc_min_buffer_size(int options) {
+ return MIN_BUFFER_SIZE;
+}
+
+char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) {
+ mcmc_ctx_t *ctx = c;
+ char *b = buf + ctx->buffer_used;
+ *bufremain = bufsize - ctx->buffer_used;
+ return b;
+}
+
+// Directly parse a buffer with read data of size len.
+// r->reslen + r->vlen_read is the bytes consumed from the buffer read.
+// Caller manages how to retry if MCMC_WANT_READ or an error happens.
+// FIXME: not sure if to keep this command to a fixed buffer size, or continue
+// to use the ctx->buffer_used bits... if we keep the buffer_used stuff caller can
+// loop without memmove'ing the buffer?
+int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r) {
+ mcmc_ctx_t *ctx = c;
+ char *el;
+ ctx->buffer_used += read;
+
+ el = memchr(buf, '\n', ctx->buffer_used);
+ if (el == NULL) {
+ return MCMC_WANT_READ;
+ }
+
+ memset(r, 0, sizeof(*r));
+
+ // Consume through the newline.
+ // buffer_tail now points to where value could start.
+ // FIXME: ctx->value ?
+ ctx->buffer_tail = el+1;
+
+ // FIXME: the server must be stricter in what it sends back. should always
+ // have a \r. check for it and fail?
+ ctx->buffer_request_len = ctx->buffer_tail - buf;
+ // leave the \r\n in the line end cache.
+ ctx->buffer_head = buf;
+ // TODO: handling for nonblock case.
+
+ // We have a result line. Now pass it through the parser.
+ // Then we indicate to the user that a response is ready.
+ ctx->resp = r;
+ return _mcmc_parse_response(ctx);
+}
+
+/*** Functions wrapping syscalls **/
+
+// TODO: should be able to flip between block and nonblock.
+
+// used for checking on async connections.
+int mcmc_check_nonblock_connect(void *c, int *err) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+ socklen_t errsize = sizeof(*err);
+ if (getsockopt(ctx->fd, SOL_SOCKET, SO_ERROR, err, &errsize) == 0) {
+ if (*err == 0) {
+ return MCMC_OK;
+ }
+ } else {
+ // getsockopt failed. still need to pass up the error.
+ *err = errno;
+ }
+
+ return MCMC_ERR;
+}
+
+// TODO:
+// - option for connecting 4 -> 6 or 6 -> 4
+// connect_unix()
+// connect_bind_tcp()
+// ^ fill an internal struct from the stack and call into this central
+// connect?
+int mcmc_connect(void *c, char *host, char *port, int options) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+
+ int s;
+ int sock;
+ int res = MCMC_CONNECTED;
+ struct addrinfo hints;
+ struct addrinfo *ai;
+ struct addrinfo *next;
+
+ // Since our cx memory was likely malloc'ed, ensure we start clear.
+ memset(ctx, 0, sizeof(mcmc_ctx_t));
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+
+ s = getaddrinfo(host, port, &hints, &ai);
+
+ if (s != 0) {
+ hints.ai_family = AF_INET6;
+ s = getaddrinfo(host, port, &hints, &ai);
+ if (s != 0) {
+ // TODO: gai_strerror(s)
+ ctx->gai_status = s;
+ res = MCMC_ERR;
+ goto end;
+ }
+ }
+
+ for (next = ai; next != NULL; next = next->ai_next) {
+ sock = socket(next->ai_family, next->ai_socktype,
+ next->ai_protocol);
+ if (sock == -1)
+ continue;
+
+ // TODO: NONBLOCK
+ if (options & MCMC_OPTION_NONBLOCK) {
+ int flags = fcntl(sock, F_GETFL);
+ if (flags < 0) {
+ res = MCMC_ERR;
+ close(sock);
+ goto end;
+ }
+ if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+ res = MCMC_ERR;
+ close(sock);
+ goto end;
+ }
+ res = MCMC_CONNECTING;
+
+ if (connect(sock, next->ai_addr, next->ai_addrlen) != -1) {
+ if (errno == EINPROGRESS) {
+ break; // We're good, stop the loop.
+ }
+ }
+
+ break;
+ } else {
+ // TODO: BIND local port.
+ if (connect(sock, next->ai_addr, next->ai_addrlen) != -1)
+ break;
+ }
+
+ close(sock);
+ }
+
+ // TODO: cache last connect status code?
+ if (next == NULL) {
+ res = MCMC_ERR;
+ goto end;
+ }
+
+ ctx->fd = sock;
+end:
+ if (ai) {
+ freeaddrinfo(ai);
+ }
+ return res;
+}
+
+// NOTE: if WANT_WRITE returned, call with same arguments.
+// FIXME: len -> size_t?
+// TODO: rename to mcmc_request_send
+int mcmc_send_request(void *c, const char *request, int len, int count) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+
+ // adjust our send buffer by how much has already been sent.
+ const char *r = request + ctx->sent_bytes_partial;
+ int l = len - ctx->sent_bytes_partial;
+ int sent = send(ctx->fd, r, l, 0);
+ if (sent == -1) {
+ // implicitly handle nonblock mode.
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return MCMC_WANT_WRITE;
+ } else {
+ return MCMC_ERR;
+ }
+ }
+
+ if (sent < len) {
+ // can happen anytime, but mostly in nonblocking mode.
+ ctx->sent_bytes_partial += sent;
+ return MCMC_WANT_WRITE;
+ } else {
+ ctx->request_queue += count;
+ ctx->sent_bytes_partial = 0;
+ }
+
+ return MCMC_OK;
+}
+
+// TODO: pretty sure I don't want this function chewing on a submitted iov
+// stack, but it might make for less client code :(
+// so for now, lets not.
+int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+ // need to track sent vs tosend to know when to update counters.
+ ssize_t tosend = 0;
+ for (int i = 0; i < iovcnt; i++) {
+ tosend += iov[i].iov_len;
+ }
+
+ *sent = writev(ctx->fd, iov, iovcnt);
+ if (*sent == -1) {
+ // implicitly handle nonblock mode.
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return MCMC_WANT_WRITE;
+ } else {
+ return MCMC_ERR;
+ }
+ }
+
+ if (*sent < tosend) {
+ // can happen anytime, but mostly in nonblocking mode.
+ return MCMC_WANT_WRITE;
+ } else {
+ // FIXME: user has to keep submitting the same count value...
+ // should decide on whether or not to give up on this.
+ ctx->request_queue += count;
+ }
+
+ return MCMC_OK;
+}
+
+// TODO: avoid recv if we have bytes in the buffer.
+int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+ char *el;
+ memset(r, 0, sizeof(*r));
+
+ // If there's still data in the buffer try to use it before potentially
+ // hanging on the network read.
+ // Also skip this check if we specifically wanted more bytes from net.
+ if (ctx->buffer_used && !(ctx->status_flags & FLAG_BUF_WANTED_READ)) {
+ el = memchr(buf, '\n', ctx->buffer_used);
+ if (el) {
+ goto parse;
+ }
+ }
+
+ // adjust buffer by how far we've already consumed.
+ char *b = buf + ctx->buffer_used;
+ size_t l = bufsize - ctx->buffer_used;
+
+ int read = recv(ctx->fd, b, l, 0);
+ if (read == 0) {
+ return MCMC_NOT_CONNECTED;
+ } else if (read == -1) {
+ // implicitly handle nonblocking configurations.
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return MCMC_WANT_READ;
+ } else {
+ return MCMC_ERR;
+ }
+ }
+
+ ctx->buffer_used += read;
+
+ // Always scan from the start of the original buffer.
+ el = memchr(buf, '\n', ctx->buffer_used);
+ if (!el) {
+ // FIXME: error if buffer is full but no \n is found.
+ ctx->status_flags |= FLAG_BUF_WANTED_READ;
+ return MCMC_WANT_READ;
+ }
+parse:
+ // Consume through the newline.
+ // buffer_tail now points to where a value could start.
+ ctx->buffer_tail = el+1;
+
+ // FIXME: the server must be stricter in what it sends back. should always
+ // have a \r. check for it and fail?
+ ctx->buffer_request_len = ctx->buffer_tail - buf;
+ // leave the \r\n in the line end cache.
+ ctx->buffer_head = buf;
+ // TODO: handling for nonblock case.
+
+ // We have a result line. Now pass it through the parser.
+ // Then we indicate to the user that a response is ready.
+ ctx->resp = r;
+ return _mcmc_parse_response(ctx);
+}
+
+void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen) {
+ code[0] = '\0';
+ msg[0] = '\0';
+}
+
+int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+
+ // If the distance between tail/head is smaller than what we read into the
+ // main buffer, we have some value to copy out.
+ int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
+ if (leftover > 0) {
+ int tocopy = leftover > vsize ? vsize : leftover;
+ memcpy(val + *read, ctx->buffer_tail, tocopy);
+ ctx->buffer_tail += tocopy;
+ *read += tocopy;
+ if (leftover > tocopy) {
+ // FIXME: think we need a specific code for "value didn't fit"
+ return MCMC_WANT_READ;
+ }
+ }
+
+ return MCMC_OK;
+}
+
+// read into the buffer, up to a max size of vsize.
+// will read (vsize-read) into the buffer pointed to by (val+read).
+// you are able to stream the value into different buffers, or process the
+// value and reuse the same buffer, by adjusting vsize and *read between
+// calls.
+// vsize must not be larger than the remaining value size pending read.
+int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+ size_t l;
+
+ // If the distance between tail/head is smaller than what we read into the
+ // main buffer, we have some value to copy out.
+ int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
+ if (leftover > 0) {
+ int tocopy = leftover > vsize ? vsize : leftover;
+ memcpy(val + *read, ctx->buffer_tail, tocopy);
+ ctx->buffer_tail += tocopy;
+ *read += tocopy;
+ if (leftover > tocopy) {
+ // FIXME: think we need a specific code for "value didn't fit"
+ return MCMC_WANT_READ;
+ }
+ }
+
+ char *v = val + *read;
+ l = vsize - *read;
+
+ int r = recv(ctx->fd, v, l, 0);
+ if (r == 0) {
+ // TODO: some internal disconnect work?
+ return MCMC_NOT_CONNECTED;
+ }
+ // FIXME: EAGAIN || EWOULDBLOCK!
+ if (r == -1) {
+ return MCMC_ERR;
+ }
+
+ *read += r;
+
+ if (*read < vsize) {
+ return MCMC_WANT_READ;
+ } else {
+ return MCMC_OK;
+ }
+}
+
+char *mcmc_buffer_consume(void *c, int *remain) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+ ctx->buffer_used -= ctx->buffer_tail - ctx->buffer_head;
+ int used = ctx->buffer_used;
+ char *newbuf = ctx->buffer_tail;
+
+ // FIXME: request_queue-- is in the wrong place.
+ // TODO: which of these _must_ be reset between requests? I think very
+ // little?
+ ctx->request_queue--;
+ ctx->status_flags = 0;
+ ctx->buffer_head = NULL;
+ ctx->buffer_tail = NULL;
+
+ if (used) {
+ *remain = used;
+ return newbuf;
+ } else {
+ return NULL;
+ }
+}
+
+int mcmc_disconnect(void *c) {
+ mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
+
+ // FIXME: I forget if 0 can be valid.
+ if (ctx->fd != 0) {
+ close(ctx->fd);
+ return MCMC_OK;
+ } else {
+ return MCMC_NOT_CONNECTED;
+ }
+}
diff --git a/vendor/mcmc/mcmc.h b/vendor/mcmc/mcmc.h
new file mode 100644
index 0000000..1769228
--- /dev/null
+++ b/vendor/mcmc/mcmc.h
@@ -0,0 +1,91 @@
+#ifndef MCMC_HEADER
+#define MCMC_HEADER
+
+#define MCMC_OK 0
+#define MCMC_ERR -1
+#define MCMC_NOT_CONNECTED 1
+#define MCMC_CONNECTED 2
+#define MCMC_CONNECTING 3 // nonblock mode.
+#define MCMC_WANT_WRITE 4
+#define MCMC_WANT_READ 5
+#define MCMC_HAS_RESULT 7
+// TODO: either internally set a flag for "ok" or "not ok" and use a func,
+// or use a bitflag here (1<<6) for "OK", (1<<5) for "FAIL", etc.
+// or, we directly return "OK" or "FAIL" and you can ask for specific error.
+#define MCMC_CODE_STORED 8
+#define MCMC_CODE_EXISTS 9
+#define MCMC_CODE_DELETED 10
+#define MCMC_CODE_TOUCHED 11
+#define MCMC_CODE_VERSION 12
+#define MCMC_CODE_NOT_FOUND 13
+#define MCMC_CODE_NOT_STORED 14
+#define MCMC_CODE_OK 15
+#define MCMC_CODE_NOP 16
+#define MCMC_PARSE_ERROR_SHORT 17
+#define MCMC_PARSE_ERROR 18
+#define MCMC_CODE_MISS 19 // FIXME
+
+
+// response types
+#define MCMC_RESP_GET 100
+#define MCMC_RESP_META 101
+#define MCMC_RESP_STAT 102
+#define MCMC_RESP_GENERIC 104
+#define MCMC_RESP_END 105
+#define MCMC_RESP_VERSION 106
+#define MCMC_RESP_NUMERIC 107 // for weird incr/decr syntax.
+
+#define MCMC_OPTION_BLANK 0
+#define MCMC_OPTION_NONBLOCK 1
+
+// convenience defines. if you want to save RAM you can set these smaller and
+// error handler will only copy what you ask for.
+#define MCMC_ERROR_CODE_MAX 32
+#define MCMC_ERROR_MSG_MAX 512
+
+typedef struct {
+ unsigned short type;
+ unsigned short code;
+ char *value; // pointer to start of value in buffer.
+ size_t reslen; // full length of the response line
+ size_t vlen_read; // amount of value that was in supplied buffer.
+ size_t vlen; // reslen + vlen is the full length of the response.
+ union {
+ // META response
+ struct {
+ char *rline; // start of meta response line.
+ size_t rlen;
+ };
+ // GET response
+ struct {
+ char *key;
+ size_t klen;
+ uint32_t flags;
+ uint64_t cas;
+ // TODO: value info
+ };
+ // STAT response
+ struct {
+ char *stat;
+ size_t slen;
+ };
+ };
+} mcmc_resp_t;
+
+int mcmc_fd(void *c);
+size_t mcmc_size(int options);
+size_t mcmc_min_buffer_size(int options);
+char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain);
+int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r);
+int mcmc_connect(void *c, char *host, char *port, int options);
+int mcmc_check_nonblock_connect(void *c, int *err);
+int mcmc_send_request(void *c, const char *request, int len, int count);
+int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count);
+int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r);
+int mcmc_read_value(void *c, char *val, const size_t vsize, int *read);
+int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read);
+char *mcmc_buffer_consume(void *c, int *remain);
+int mcmc_disconnect(void *c);
+void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen);
+
+#endif