Diffstat (limited to 'proxy.h')
-rw-r--r--  proxy.h  471
1 file changed, 471 insertions, 0 deletions
diff --git a/proxy.h b/proxy.h
new file mode 100644
index 0000000..f64952c
--- /dev/null
+++ b/proxy.h
@@ -0,0 +1,471 @@
+#ifndef PROXY_H
+#define PROXY_H
+
+#include "memcached.h"
+#include <string.h>
+#include <stdlib.h>
+#include <ctype.h>
+
+#include <lua.h>
+#include <lualib.h>
+#include <lauxlib.h>
+
+#include "config.h"
+
+#if defined(__linux__)
+#define USE_EVENTFD 1
+#include <sys/eventfd.h>
+#endif
+
+#ifdef HAVE_LIBURING
+#include <liburing.h>
+#include <poll.h> // POLLOUT for liburing.
+#define PRING_QUEUE_SQ_ENTRIES 2048
+#define PRING_QUEUE_CQ_ENTRIES 16384
+#endif
+
+#include "proto_proxy.h"
+#include "proto_text.h"
+#include "queue.h"
+#define XXH_INLINE_ALL // modifier for xxh3's include below
+#include "xxhash.h"
+
+#ifdef PROXY_DEBUG
+#define P_DEBUG(...) \
+ do { \
+ fprintf(stderr, __VA_ARGS__); \
+ } while (0)
+#else
+#define P_DEBUG(...)
+#endif
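+
+/* Usage sketch (editor's note): P_DEBUG takes printf-style arguments and
+ * compiles away entirely unless PROXY_DEBUG is defined, e.g.:
+ *
+ *   P_DEBUG("%s: marking backend bad [%s:%s]\n", __func__, be->ip, be->port);
+ */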
+
+#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex);
+#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex);
+#define WSTAT_INCR(c, stat, amount) { \
+ pthread_mutex_lock(&c->thread->stats.mutex); \
+ c->thread->stats.stat += amount; \
+ pthread_mutex_unlock(&c->thread->stats.mutex); \
+}
+#define WSTAT_DECR(c, stat, amount) { \
+ pthread_mutex_lock(&c->thread->stats.mutex); \
+ c->thread->stats.stat -= amount; \
+ pthread_mutex_unlock(&c->thread->stats.mutex); \
+}
+#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
+#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
+#define STAT_INCR(ctx, stat, amount) { \
+ pthread_mutex_lock(&ctx->stats_lock); \
+ ctx->global_stats.stat += amount; \
+ pthread_mutex_unlock(&ctx->stats_lock); \
+}
+
+#define STAT_DECR(ctx, stat, amount) { \
+ pthread_mutex_lock(&ctx->stats_lock); \
+ ctx->global_stats.stat -= amount; \
+ pthread_mutex_unlock(&ctx->stats_lock); \
+}
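+
+/* Usage sketch (editor's note): WSTAT_* bumps counters on a conn's worker
+ * thread, STAT_* bumps the proxy-wide counters below. The worker counter
+ * name here is illustrative only; the real fields live in the thread stats
+ * struct in memcached.h.
+ *
+ *   STAT_INCR(ctx, backend_requests, 1);   // ctx: proxy_ctx_t *
+ *   WSTAT_INCR(c, proxy_conn_requests, 1); // c: conn *, field name assumed
+ */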
+
+// FIXME (v2): do include dir properly.
+#include "vendor/mcmc/mcmc.h"
+
+// Note: value created from thin air. Could be shorter.
+#define MCP_REQUEST_MAXLEN (KEY_MAX_LENGTH * 2)
+
+#define ENDSTR "END\r\n"
+#define ENDLEN (sizeof(ENDSTR)-1)
+
+#define MCP_THREAD_UPVALUE 1
+#define MCP_ATTACH_UPVALUE 2
+#define MCP_BACKEND_UPVALUE 3
+
+// all possible commands.
+#define CMD_FIELDS \
+ X(CMD_MG) \
+ X(CMD_MS) \
+ X(CMD_MD) \
+ X(CMD_MN) \
+ X(CMD_MA) \
+ X(CMD_ME) \
+ X(CMD_GET) \
+ X(CMD_GAT) \
+ X(CMD_SET) \
+ X(CMD_ADD) \
+ X(CMD_CAS) \
+ X(CMD_GETS) \
+ X(CMD_GATS) \
+ X(CMD_INCR) \
+ X(CMD_DECR) \
+ X(CMD_TOUCH) \
+ X(CMD_APPEND) \
+ X(CMD_DELETE) \
+ X(CMD_REPLACE) \
+ X(CMD_PREPEND) \
+ X(CMD_END_STORAGE) \
+ X(CMD_QUIT) \
+ X(CMD_STATS) \
+ X(CMD_SLABS) \
+ X(CMD_WATCH) \
+ X(CMD_LRU) \
+ X(CMD_VERSION) \
+ X(CMD_SHUTDOWN) \
+ X(CMD_EXTSTORE) \
+ X(CMD_FLUSH_ALL) \
+ X(CMD_VERBOSITY) \
+ X(CMD_LRU_CRAWLER) \
+ X(CMD_REFRESH_CERTS) \
+ X(CMD_CACHE_MEMLIMIT)
+
+#define X(name) name,
+enum proxy_defines {
+ P_OK = 0,
+ CMD_FIELDS
+ CMD_SIZE, // used to define array size for command hooks.
+ CMD_ANY, // override _all_ commands
+ CMD_ANY_STORAGE, // override commands specific to key storage.
+ CMD_FINAL, // end cap for convenience.
+};
+#undef X
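+
+/* Editor's sketch: the same X-macro can generate a parallel name table so the
+ * strings never drift from the enum (not necessarily how the .c files do it):
+ *
+ *   #define X(name) #name,
+ *   static const char *cmd_names[CMD_FINAL] = { "P_OK", CMD_FIELDS };
+ *   #undef X
+ */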
+
+// Certain classes of ASCII commands have similar parsing (i.e.
+// get/gets/gat/gats). Use types so we don't have to test a ton of them.
+enum proxy_cmd_types {
+ CMD_TYPE_GENERIC = 0,
+ CMD_TYPE_GET, // get/gets/gat/gats
+ CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace
+ CMD_TYPE_META, // m*'s.
+};
+
+enum proxy_be_failures {
+ P_BE_FAIL_TIMEOUT = 0,
+ P_BE_FAIL_DISCONNECTED,
+ P_BE_FAIL_CONNECTING,
+ P_BE_FAIL_WRITING,
+ P_BE_FAIL_READING,
+ P_BE_FAIL_PARSING,
+};
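+
+/* Editor's sketch: a designated-initializer text table keeps these failure
+ * reasons loggable by enum value; the array name here is hypothetical.
+ *
+ *   static const char *be_failure_text[] = {
+ *       [P_BE_FAIL_TIMEOUT]      = "timeout",
+ *       [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ *       [P_BE_FAIL_CONNECTING]   = "connecting",
+ *       [P_BE_FAIL_WRITING]      = "writing",
+ *       [P_BE_FAIL_READING]      = "reading",
+ *       [P_BE_FAIL_PARSING]      = "parsing",
+ *   };
+ */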
+
+typedef struct _io_pending_proxy_t io_pending_proxy_t;
+typedef struct proxy_event_thread_s proxy_event_thread_t;
+
+#ifdef HAVE_LIBURING
+typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe);
+typedef struct {
+ void *udata;
+ proxy_event_cb cb;
+ bool set; // NOTE: not sure if necessary if code structured properly
+} proxy_event_t;
+
+static struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
+static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
+static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
+static void *proxy_event_thread_ur(void *arg);
+static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
+#endif
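+
+/* Editor's sketch of how the proxy_event_t callback indirection is meant to
+ * be driven: each SQE's user_data points at a proxy_event_t so completions
+ * can be dispatched generically. Assumed shape, not copied from the .c files:
+ *
+ *   struct io_uring_cqe *cqe;
+ *   while (io_uring_peek_cqe(&t->ring, &cqe) == 0) {
+ *       proxy_event_t *pe = io_uring_cqe_get_data(cqe);
+ *       pe->set = false;
+ *       pe->cb(pe->udata, cqe);
+ *       io_uring_cqe_seen(&t->ring, cqe);
+ *   }
+ */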
+
+// Note: This ends up wasting a few counters, but simplifies the rest of the
+// process for handling internal worker stats.
+struct proxy_int_stats {
+ uint64_t counters[CMD_FINAL];
+};
+
+struct proxy_user_stats {
+ size_t num_stats; // number of stats, for sizing various arrays
+ char **names; // not needed for worker threads
+ uint64_t *counters; // array of counters.
+};
+
+struct proxy_global_stats {
+ uint64_t config_reloads;
+ uint64_t config_reload_fails;
+ uint64_t backend_total;
+ uint64_t backend_disconn; // backends with no connections
+ uint64_t backend_requests; // reqs sent to backends
+ uint64_t backend_responses; // responses received from backends
+ uint64_t backend_errors; // errors from backends
+ uint64_t backend_marked_bad; // backend set to autofail
+ uint64_t backend_failed; // an error caused a backend reset
+};
+
+struct proxy_tunables {
+ struct timeval connect;
+ struct timeval retry; // wait time before retrying a dead backend
+ struct timeval read;
+#ifdef HAVE_LIBURING
+ struct __kernel_timespec connect_ur;
+ struct __kernel_timespec retry_ur;
+ struct __kernel_timespec read_ur;
+#endif // HAVE_LIBURING
+ int backend_failure_limit;
+ bool tcp_keepalive;
+};
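+
+/* Editor's sketch: the libevent path reads the timeval members directly while
+ * the uring path needs the mirrored __kernel_timespec values, so config code
+ * presumably keeps both in sync along these lines:
+ *
+ *   tunables->connect.tv_sec = 5;
+ *   tunables->connect.tv_usec = 0;
+ *   #ifdef HAVE_LIBURING
+ *   tunables->connect_ur.tv_sec  = tunables->connect.tv_sec;
+ *   tunables->connect_ur.tv_nsec = tunables->connect.tv_usec * 1000;
+ *   #endif
+ */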
+
+typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
+typedef struct {
+ lua_State *proxy_state;
+ void *proxy_code;
+ proxy_event_thread_t *proxy_threads;
+ pthread_mutex_t config_lock;
+ pthread_cond_t config_cond;
+ pthread_t config_tid;
+ pthread_mutex_t worker_lock;
+ pthread_cond_t worker_cond;
+ pthread_t manager_tid; // deallocation management thread
+ pthread_mutex_t manager_lock;
+ pthread_cond_t manager_cond;
+ pool_head_t manager_head; // stack for pool deallocation.
+ bool worker_done; // signal variable for the worker lock/cond system.
+ bool worker_failed; // covered by worker_lock as well.
+ bool use_uring; // use IO_URING for backend connections.
+ struct proxy_global_stats global_stats;
+ struct proxy_user_stats user_stats;
+ struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
+ pthread_mutex_t stats_lock; // used for rare global counters
+} proxy_ctx_t;
+
+struct proxy_hook {
+ int lua_ref;
+ bool is_lua; // pull the lua reference and call it as a lua function.
+};
+
+// TODO (v2): some hash functions (crc?) might require initializers. If we run
+// into any, the interface might need expanding.
+typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed);
+struct proxy_hash_func {
+ key_hash_func func;
+};
+typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen);
+typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx);
+struct proxy_hash_caller {
+ hash_selector_func selector_func;
+ void *ctx;
+};
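+
+/* Editor's sketch of a minimal selector that fits this interface; the shipped
+ * selectors (e.g. the jump hash behind mcplib_open_dist_jump_hash()) live in
+ * their own files, and the names below are illustrative only.
+ *
+ *   static uint32_t example_modulo_selector(uint64_t hash, void *ctx) {
+ *       uint32_t buckets = *(uint32_t *)ctx; // e.g. the pool size
+ *       return (uint32_t)(hash % buckets);
+ *   }
+ *   // struct proxy_hash_caller phc = {
+ *   //     .selector_func = example_modulo_selector, .ctx = &buckets };
+ */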
+
+enum mcp_backend_states {
+ mcp_backend_read = 0, // waiting to read any response
+ mcp_backend_parse, // have some buffered data to check
+ mcp_backend_read_end, // looking for an "END" marker for GET
+ mcp_backend_want_read, // read more data to complete command
+ mcp_backend_next, // advance to the next IO
+};
+
+typedef struct mcp_backend_s mcp_backend_t;
+typedef struct mcp_request_s mcp_request_t;
+typedef struct mcp_parser_s mcp_parser_t;
+
+#define PARSER_MAX_TOKENS 24
+
+struct mcp_parser_meta_s {
+ uint64_t flags;
+};
+
+// Note that we must use offsets into request for tokens,
+// as *request can change between parsing and later accessors.
+struct mcp_parser_s {
+ const char *request;
+ void *vbuf; // temporary buffer for holding value lengths.
+ uint8_t command;
+ uint8_t cmd_type; // command class.
+ uint8_t ntokens;
+ uint8_t keytoken; // because GAT. sigh. also cmds without a key.
+ uint32_t parsed; // how far into the request we parsed already
+ uint32_t reqlen; // full length of request buffer.
+ int vlen;
+ uint32_t klen; // length of key.
+ uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
+ bool has_space; // a space was found after the last byte parsed.
+ union {
+ struct mcp_parser_meta_s meta;
+ } t;
+};
+
+#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]])
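+
+/* Editor's note: MCP_PARSER_KEY resolves the key from the stored offsets,
+ * e.g. for an mcp_request_t *rq:
+ *
+ *   const char *key = MCP_PARSER_KEY(rq->pr);
+ *   uint32_t klen = rq->pr.klen;
+ */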
+
+#define MAX_REQ_TOKENS 2
+struct mcp_request_s {
+ mcp_parser_t pr; // non-lua-specific parser handling.
+ struct timeval start; // time this object was created.
+ mcp_backend_t *be; // backend handling this request.
+ bool ascii_multiget; // ascii multiget mode. (hide errors/END)
+ bool was_modified; // need to rewrite the request
+ int tokent_ref; // reference to token table if modified.
+ char request[];
+};
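+
+/* Editor's sketch: the flexible request[] member carries the raw request text
+ * with the struct, so sizing an allocation looks roughly like the below. The
+ * real constructor is mcp_new_request(), declared later, which presumably
+ * builds the object as a Lua userdata given its lua_State argument.
+ *
+ *   mcp_request_t *rq = calloc(1, sizeof(mcp_request_t) + MCP_REQUEST_MAXLEN);
+ *   memcpy(rq->request, command, cmdlen);
+ */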
+
+typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
+#define MAX_IPLEN 45
+#define MAX_PORTLEN 6
+// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
+// don't have a good temporary space and don't want to malloc/free on every
+// write. transmit() uses the stack but we can't do that for uring's use case.
+#if (IOV_MAX > 128)
+#define BE_IOV_MAX 128
+#else
+#define BE_IOV_MAX IOV_MAX
+#endif
+struct mcp_backend_s {
+ char ip[MAX_IPLEN+1];
+ char port[MAX_PORTLEN+1];
+ double weight;
+ int depth;
+ int failed_count; // number of fails (timeouts) in a row
+ pthread_mutex_t mutex; // covers stack.
+ proxy_event_thread_t *event_thread; // event thread owning this backend.
+ void *client; // mcmc client
+ STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
+ io_head_t io_head; // stack of requests.
+ char *rbuf; // static allocated read buffer.
+ struct event event; // libevent
+#ifdef HAVE_LIBURING
+ proxy_event_t ur_rd_ev; // liburing.
+ proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
+ proxy_event_t ur_te_ev; // for timeout handling
+#endif
+ enum mcp_backend_states state; // readback state machine
+ int connect_flags; // flags to pass to mcmc_connect
+    bool connecting; // in the process of an async connection.
+ bool can_write; // recently got a WANT_WRITE or are connecting.
+ bool stacked; // if backend already queued for syscalls.
+ bool bad; // timed out, marked as bad.
+ struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
+};
+typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
+
+struct proxy_event_thread_s {
+ pthread_t thread_id;
+ struct event_base *base;
+ struct event notify_event; // listen event for the notify pipe/eventfd.
+ struct event clock_event; // timer for updating event thread data.
+#ifdef HAVE_LIBURING
+ struct io_uring ring;
+ proxy_event_t ur_notify_event; // listen on eventfd.
+ proxy_event_t ur_clock_event; // timer for updating event thread data.
+ eventfd_t event_counter;
+ bool use_uring;
+#endif
+ pthread_mutex_t mutex; // covers stack.
+ pthread_cond_t cond; // condition to wait on while stack drains.
+ io_head_t io_head_in; // inbound requests to process.
+ be_head_t be_head; // stack of backends for processing.
+#ifdef USE_EVENTFD
+ int event_fd;
+#else
+ int notify_receive_fd;
+ int notify_send_fd;
+#endif
+ proxy_ctx_t *ctx; // main context.
+ struct proxy_tunables tunables; // periodically copied from main ctx
+};
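+
+/* Editor's sketch: waking an event thread differs by platform; with eventfd a
+ * counter bump suffices, otherwise a byte goes down the notify pipe. The
+ * exact wakeup protocol lives in the .c files; this is illustrative only.
+ *
+ * #ifdef USE_EVENTFD
+ *   eventfd_write(t->event_fd, 1);
+ * #else
+ *   if (write(t->notify_send_fd, "w", 1) != 1) {
+ *       // wakeup failed; the caller would need to log/retry
+ *   }
+ * #endif
+ */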
+
+#define RESP_CMD_MAX 8
+typedef struct {
+ mcmc_resp_t resp;
+ struct timeval start; // start time inherited from paired request
+ char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
+ int status; // status code from mcmc_read()
+ char *buf; // response line + potentially value.
+ size_t blen; // total size of the value to read.
+ int bread; // amount of bytes read into value so far.
+} mcp_resp_t;
+
+// re-cast an io_pending_t into this more descriptive structure.
+// the first few items _must_ match the original struct.
+struct _io_pending_proxy_t {
+ int io_queue_type;
+ LIBEVENT_THREAD *thread;
+ conn *c;
+ mc_resp *resp; // original struct ends here
+
+ struct _io_pending_proxy_t *next; // stack for IO submission
+ STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
+ int coro_ref; // lua registry reference to the coroutine
+ int mcpres_ref; // mcp.res reference used for await()
+ lua_State *coro; // pointer directly to the coroutine
+ mcp_backend_t *backend; // backend server to request from
+ struct iovec iov[2]; // request string + tail buffer
+ int iovcnt; // 1 or 2...
+ unsigned int iovbytes; // total bytes in the iovec
+ int await_ref; // lua reference if we were an await object
+ mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
+ bool flushed; // whether we've fully written this request to a backend.
+ bool ascii_multiget; // passed on from mcp_r_t
+ bool is_await; // are we an await object?
+ bool await_first; // are we the main route for an await object?
+};
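+
+/* Editor's note: because the leading members mirror io_pending_t, a queued
+ * pointer can be narrowed back with a plain cast, roughly:
+ *
+ *   void example_return_cb(io_pending_t *pending) {
+ *       io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
+ *       // ... resume p->coro, fill in p->resp, etc.
+ *   }
+ *
+ * The callback name/signature is illustrative; the real queue hooks are
+ * registered in the proxy .c files.
+ */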
+
+// Note: does *be have to be a sub-struct? how stable are userdata pointers?
+// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
+// - says no.
+typedef struct {
+ int ref; // luaL_ref reference.
+ mcp_backend_t *be;
+} mcp_pool_be_t;
+
+#define KEY_HASH_FILTER_MAX 5
+typedef struct mcp_pool_s mcp_pool_t;
+struct mcp_pool_s {
+ struct proxy_hash_caller phc;
+ key_hash_filter_func key_filter;
+ key_hash_func key_hasher;
+ pthread_mutex_t lock; // protects refcount.
+ proxy_ctx_t *ctx; // main context.
+ STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
+ char key_filter_conf[KEY_HASH_FILTER_MAX+1];
+ uint64_t hash_seed; // calculated from a string.
+ int refcount;
+ int phc_ref;
+ int self_ref; // TODO (v2): double check that this is needed.
+ int pool_size;
+ mcp_pool_be_t pool[];
+};
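+
+/* Editor's sketch of the key-to-backend path this struct implies; the real
+ * routing helper is mcplib_pool_proxy_call_helper(), declared below. Assumes
+ * the selector was configured with p->pool_size buckets.
+ *
+ *   size_t klen = len;
+ *   if (p->key_filter) {
+ *       key = p->key_filter(p->key_filter_conf, key, len, &klen);
+ *   }
+ *   uint64_t hash = p->key_hasher(key, klen, p->hash_seed);
+ *   uint32_t i = p->phc.selector_func(hash, p->phc.ctx);
+ *   mcp_backend_t *be = p->pool[i].be;
+ */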
+
+typedef struct {
+ mcp_pool_t *main; // ptr to original
+} mcp_pool_proxy_t;
+
+// networking interface
+void proxy_init_evthread_events(proxy_event_thread_t *t);
+void *proxy_event_thread(void *arg);
+
+// await interface
+enum mcp_await_e {
+ AWAIT_GOOD = 0, // looks for OK + NOT MISS
+    AWAIT_ANY, // any response, including errors.
+ AWAIT_OK, // any non-error response
+ AWAIT_FIRST, // return the result from the first pool
+};
+int mcplib_await(lua_State *L);
+int mcplib_await_run(conn *c, lua_State *L, int coro_ref);
+int mcplib_await_return(io_pending_proxy_t *p);
+
+// user stats interface
+int mcplib_add_stat(lua_State *L);
+int mcplib_stat(lua_State *L);
+
+// request parser interface
+size_t _process_request_next_key(mcp_parser_t *pr);
+int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen);
+mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen);
+
+// request interface
+int mcplib_request(lua_State *L);
+int mcplib_request_command(lua_State *L);
+int mcplib_request_key(lua_State *L);
+int mcplib_request_ltrimkey(lua_State *L);
+int mcplib_request_rtrimkey(lua_State *L);
+int mcplib_request_token(lua_State *L);
+int mcplib_request_ntokens(lua_State *L);
+int mcplib_request_gc(lua_State *L);
+
+int mcplib_open_dist_jump_hash(lua_State *L);
+
+int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c);
+mcp_backend_t *mcplib_pool_proxy_call_helper(lua_State *L, mcp_pool_t *p, const char *key, size_t len);
+void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
+void proxy_lua_error(lua_State *L, const char *s);
+void proxy_lua_ferror(lua_State *L, const char *fmt, ...);
+int _start_proxy_config_threads(proxy_ctx_t *ctx);
+int proxy_thread_loadconf(LIBEVENT_THREAD *thr);
+
+// TODO (v2): more .h files, perhaps?
+int mcplib_open_hash_xxhash(lua_State *L);
+
+__attribute__((unused)) void dump_stack(lua_State *L);
+#endif // PROXY_H