/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /** \file * The main memcached header holding commonly used data * structures and function prototypes. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include #include #include #include #include #include #include "protocol_binary.h" #include "cache.h" #include "logger.h" #include "sasl_defs.h" /** Maximum length of a key. */ #define KEY_MAX_LENGTH 250 /** Size of an incr buf. */ #define INCR_MAX_STORAGE_LEN 24 #define DATA_BUFFER_SIZE 2048 #define UDP_READ_BUFFER_SIZE 65536 #define UDP_MAX_PAYLOAD_SIZE 1400 #define UDP_HEADER_SIZE 8 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* I'm told the max length of a 64-bit num converted to string is 20 bytes. * Plus a few for spaces, \r\n, \0 */ #define SUFFIX_SIZE 24 /** Initial size of list of items being returned by "get". */ #define ITEM_LIST_INITIAL 200 /** Initial size of list of CAS suffixes appended to "gets" lines. */ #define SUFFIX_LIST_INITIAL 20 /** Initial size of the sendmsg() scatter/gather array. */ #define IOV_LIST_INITIAL 400 /** Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 /** High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 /* Binary protocol stuff */ #define MIN_BIN_PKT_LENGTH 16 #define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t)) /* Initial power multiplier for the hash table */ #define HASHPOWER_DEFAULT 16 /* * We only reposition items in the LRU queue if they haven't been repositioned * in this many seconds. That saves us from churning on frequently-accessed * items. */ #define ITEM_UPDATE_INTERVAL 60 /* unistd.h is here */ #if HAVE_UNISTD_H # include #endif /* Slab sizing definitions. */ #define POWER_SMALLEST 1 #define POWER_LARGEST 256 /* actual cap is 255 */ #define SLAB_GLOBAL_PAGE_POOL 0 /* magic slab class for storing pages for reassignment */ #define CHUNK_ALIGN_BYTES 8 /* slab class max is a 6-bit number, -1. */ #define MAX_NUMBER_OF_SLAB_CLASSES (63 + 1) /** How long an object can reasonably be assumed to be locked before harvesting it on a low memory condition. Default: disabled. */ #define TAIL_REPAIR_TIME_DEFAULT 0 /* warning: don't use these macros with a function, as it evals its arg twice */ #define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \ (i)->data->cas : (uint64_t)0) #define ITEM_set_cas(i,v) { \ if ((i)->it_flags & ITEM_CAS) { \ (i)->data->cas = v; \ } \ } #define ITEM_key(item) (((char*)&((item)->data)) \ + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0)) #define ITEM_suffix(item) ((char*) &((item)->data) + (item)->nkey + 1 \ + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0)) #define ITEM_data(item) ((char*) &((item)->data) + (item)->nkey + 1 \ + (item)->nsuffix \ + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0)) #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \ + (item)->nsuffix + (item)->nbytes \ + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0)) #define ITEM_clsid(item) ((item)->slabs_clsid & ~(3<<6)) #define STAT_KEY_LEN 128 #define STAT_VAL_LEN 128 /** Append a simple stat with a stat name, value format and value */ #define APPEND_STAT(name, fmt, val) \ append_stat(name, add_stats, c, fmt, val); /** Append an indexed stat with a stat name (with format), value format and value */ #define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \ klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name); \ vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val); \ add_stats(key_str, klen, val_str, vlen, c); /** Common APPEND_NUM_FMT_STAT format. */ #define APPEND_NUM_STAT(num, name, fmt, val) \ APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val) /** * Callback for any function producing stats. * * @param key the stat's key * @param klen length of the key * @param val the stat's value in an ascii form (e.g. text form of a number) * @param vlen length of the value * @parm cookie magic callback cookie */ typedef void (*ADD_STAT)(const char *key, const uint16_t klen, const char *val, const uint32_t vlen, const void *cookie); /* * NOTE: If you modify this table you _MUST_ update the function state_text */ /** * Possible states of a connection. */ enum conn_states { conn_listening, /**< the socket which listens for connections */ conn_new_cmd, /**< Prepare connection for next command */ conn_waiting, /**< waiting for a readable socket */ conn_read, /**< reading in a command line */ conn_parse_cmd, /**< try to parse a command from the input buffer */ conn_write, /**< writing out a simple response */ conn_nread, /**< reading in a fixed number of bytes */ conn_swallow, /**< swallowing unnecessary bytes w/o storing */ conn_closing, /**< closing this connection */ conn_mwrite, /**< writing out many items sequentially */ conn_closed, /**< connection is closed */ conn_watch, /**< held by the logger thread as a watcher */ conn_max_state /**< Max state value (used for assertion) */ }; enum bin_substates { bin_no_state, bin_reading_set_header, bin_reading_cas_header, bin_read_set_value, bin_reading_get_key, bin_reading_stat, bin_reading_del_header, bin_reading_incr_header, bin_read_flush_exptime, bin_reading_sasl_auth, bin_reading_sasl_auth_data, bin_reading_touch_key, }; enum protocol { ascii_prot = 3, /* arbitrary value. */ binary_prot, negotiating_prot /* Discovering the protocol */ }; enum network_transport { local_transport, /* Unix sockets*/ tcp_transport, udp_transport }; enum pause_thread_types { PAUSE_WORKER_THREADS = 0, PAUSE_ALL_THREADS, RESUME_ALL_THREADS, RESUME_WORKER_THREADS }; #define IS_UDP(x) (x == udp_transport) #define NREAD_ADD 1 #define NREAD_SET 2 #define NREAD_REPLACE 3 #define NREAD_APPEND 4 #define NREAD_PREPEND 5 #define NREAD_CAS 6 enum store_item_type { NOT_STORED=0, STORED, EXISTS, NOT_FOUND }; enum delta_result_type { OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH }; /** Time relative to server start. Smaller than time_t on 64-bit systems. */ typedef unsigned int rel_time_t; /** Stats stored per slab (and per thread). */ struct slab_stats { uint64_t set_cmds; uint64_t get_hits; uint64_t touch_hits; uint64_t delete_hits; uint64_t cas_hits; uint64_t cas_badval; uint64_t incr_hits; uint64_t decr_hits; }; /** * Stats stored per-thread. */ struct thread_stats { pthread_mutex_t mutex; uint64_t get_cmds; uint64_t get_misses; uint64_t get_expired; uint64_t touch_cmds; uint64_t touch_misses; uint64_t delete_misses; uint64_t incr_misses; uint64_t decr_misses; uint64_t cas_misses; uint64_t bytes_read; uint64_t bytes_written; uint64_t flush_cmds; uint64_t conn_yields; /* # of yields for connections (-R option)*/ uint64_t auth_cmds; uint64_t auth_errors; struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES]; }; /** * Global stats. */ struct stats { pthread_mutex_t mutex; uint64_t curr_items; uint64_t total_items; uint64_t curr_bytes; uint64_t curr_conns; uint64_t total_conns; uint64_t rejected_conns; uint64_t malloc_fails; unsigned int reserved_fds; unsigned int conn_structs; uint64_t get_cmds; uint64_t set_cmds; uint64_t touch_cmds; uint64_t get_hits; uint64_t get_misses; uint64_t get_expired; uint64_t touch_hits; uint64_t touch_misses; uint64_t evictions; uint64_t reclaimed; time_t started; /* when the process was started */ bool accepting_conns; /* whether we are currently accepting */ uint64_t listen_disabled_num; unsigned int hash_power_level; /* Better hope it's not over 9000 */ uint64_t hash_bytes; /* size used for hash tables */ bool hash_is_expanding; /* If the hash table is being expanded */ uint64_t expired_unfetched; /* items reclaimed but never touched */ uint64_t evicted_unfetched; /* items evicted but never touched */ bool slab_reassign_running; /* slab reassign in progress */ uint64_t slabs_moved; /* times slabs were moved around */ uint64_t slab_reassign_rescues; /* items rescued during slab move */ uint64_t slab_reassign_evictions_nomem; /* valid items lost during slab move */ uint64_t slab_reassign_inline_reclaim; /* valid items lost during slab move */ uint64_t slab_reassign_busy_items; /* valid temporarily unmovable */ uint64_t lru_crawler_starts; /* Number of item crawlers kicked off */ bool lru_crawler_running; /* crawl in progress */ uint64_t lru_maintainer_juggles; /* number of LRU bg pokes */ uint64_t time_in_listen_disabled_us; /* elapsed time in microseconds while server unable to process new connections */ struct timeval maxconns_entered; /* last time maxconns entered */ }; #define MAX_VERBOSITY_LEVEL 2 /* When adding a setting, be sure to update process_stat_settings */ /** * Globally accessible settings as derived from the commandline. */ struct settings { size_t maxbytes; int maxconns; int port; int udpport; char *inter; int verbose; rel_time_t oldest_live; /* ignore existing items older than this */ uint64_t oldest_cas; /* ignore existing items with CAS values lower than this */ int evict_to_free; char *socketpath; /* path to unix socket if using local socket */ int access; /* access mask (a la chmod) for unix domain socket */ double factor; /* chunk size growth factor */ int chunk_size; int num_threads; /* number of worker (without dispatcher) libevent threads to run */ int num_threads_per_udp; /* number of worker threads serving each udp socket */ char prefix_delimiter; /* character that marks a key prefix (for stats) */ int detail_enabled; /* nonzero if we're collecting detailed stats */ int reqs_per_event; /* Maximum number of io to process on each io-event. */ bool use_cas; enum protocol binding_protocol; int backlog; int item_size_max; /* Maximum item size, and upper end for slabs */ bool sasl; /* SASL on/off */ bool maxconns_fast; /* Whether or not to early close connections */ bool lru_crawler; /* Whether or not to enable the autocrawler thread */ bool lru_maintainer_thread; /* LRU maintainer background thread */ bool slab_reassign; /* Whether or not slab reassignment is allowed */ int slab_automove; /* Whether or not to automatically move slabs */ int hashpower_init; /* Starting hash power level */ bool shutdown_command; /* allow shutdown command */ int tail_repair_time; /* LRU tail refcount leak repair time */ bool flush_enabled; /* flush_all enabled */ char *hash_algorithm; /* Hash algorithm in use */ int lru_crawler_sleep; /* Microsecond sleep between items */ uint32_t lru_crawler_tocrawl; /* Number of items to crawl per run */ int hot_lru_pct; /* percentage of slab space for HOT_LRU */ int warm_lru_pct; /* percentage of slab space for WARM_LRU */ int crawls_persleep; /* Number of LRU crawls to run before sleeping */ bool expirezero_does_not_evict; /* exptime == 0 goes into NOEXP_LRU */ }; extern struct stats stats; extern time_t process_started; extern struct settings settings; #define ITEM_LINKED 1 #define ITEM_CAS 2 /* temp */ #define ITEM_SLABBED 4 /* Item was fetched at least once in its lifetime */ #define ITEM_FETCHED 8 /* Appended on fetch, removed on LRU shuffling */ #define ITEM_ACTIVE 16 /** * Structure for storing items within memcached. */ typedef struct _stritem { /* Protected by LRU locks */ struct _stritem *next; struct _stritem *prev; /* Rest are protected by an item lock */ struct _stritem *h_next; /* hash chain next */ rel_time_t time; /* least recent access */ rel_time_t exptime; /* expire time */ int nbytes; /* size of data */ unsigned short refcount; uint8_t nsuffix; /* length of flags-and-length string */ uint8_t it_flags; /* ITEM_* above */ uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ /* this odd type prevents type-punning issues when we do * the little shuffle to save space when not using CAS. */ union { uint64_t cas; char end; } data[]; /* if it_flags & ITEM_CAS we have 8 bytes CAS */ /* then null-terminated key */ /* then " flags length\r\n" (no terminating null) */ /* then data with terminating \r\n (no terminating null; it's binary!) */ } item; typedef struct { struct _stritem *next; struct _stritem *prev; struct _stritem *h_next; /* hash chain next */ rel_time_t time; /* least recent access */ rel_time_t exptime; /* expire time */ int nbytes; /* size of data */ unsigned short refcount; uint8_t nsuffix; /* length of flags-and-length string */ uint8_t it_flags; /* ITEM_* above */ uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ uint32_t remaining; /* Max keys to crawl per slab per invocation */ } crawler; typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ struct event notify_event; /* listen event for notify pipe */ int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ logger *l; /* logger buffer */ } LIBEVENT_THREAD; typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ } LIBEVENT_DISPATCHER_THREAD; /** * The structure representing a connection into memcached. */ typedef struct conn conn; struct conn { int sfd; sasl_conn_t *sasl_conn; bool authenticated; enum conn_states state; enum bin_substates substate; rel_time_t last_cmd_time; struct event event; short ev_flags; short which; /** which events were just triggered */ char *rbuf; /** buffer to read commands into */ char *rcurr; /** but if we parsed some already, this is where we stopped */ int rsize; /** total allocated size of rbuf */ int rbytes; /** how much data, starting from rcur, do we have unparsed */ char *wbuf; char *wcurr; int wsize; int wbytes; /** which state to go into after finishing current write */ enum conn_states write_and_go; void *write_and_free; /** free this memory after finishing writing */ char *ritem; /** when we read in an item's value, it goes here */ int rlbytes; /* data for the nread state */ /** * item is used to hold an item structure created after reading the command * line of set/add/replace commands, but before we finished reading the actual * data. The data is read into ITEM_data(item) to avoid extra copying. */ void *item; /* for commands set/add/replace */ /* data for the swallow state */ int sbytes; /* how many bytes to swallow */ /* data for the mwrite state */ struct iovec *iov; int iovsize; /* number of elements allocated in iov[] */ int iovused; /* number of elements used in iov[] */ struct msghdr *msglist; int msgsize; /* number of elements allocated in msglist[] */ int msgused; /* number of elements used in msglist[] */ int msgcurr; /* element in msglist[] being transmitted now */ int msgbytes; /* number of bytes in current msg */ item **ilist; /* list of items to write out */ int isize; item **icurr; int ileft; char **suffixlist; int suffixsize; char **suffixcurr; int suffixleft; enum protocol protocol; /* which protocol this connection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer; size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ }; /* array of conn structures, indexed by file descriptor */ extern conn **conns; /* current time of day (updated periodically) */ extern volatile rel_time_t current_time; /* TODO: Move to slabs.h? */ extern volatile int slab_rebalance_signal; struct slab_rebalance { void *slab_start; void *slab_end; void *slab_pos; int s_clsid; int d_clsid; uint32_t busy_items; uint32_t rescues; uint32_t evictions_nomem; uint32_t inline_reclaim; uint8_t done; }; extern struct slab_rebalance slab_rebal; /* * Functions */ void do_accept_new_conns(const bool do_accept); enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, const bool incr, const int64_t delta, char *buf, uint64_t *cas, const uint32_t hv); enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base); extern int daemonize(int nochdir, int noclose); #define mutex_lock(x) pthread_mutex_lock(x) #define mutex_unlock(x) pthread_mutex_unlock(x) #include "stats.h" #include "slabs.h" #include "assoc.h" #include "items.h" #include "trace.h" #include "hash.h" #include "util.h" /* * Functions such as the libevent-related calls that need to do cross-thread * communication in multithreaded mode (rather than actually doing the work * in the current thread) are called via "dispatch_" frontends, which are * also #define-d to directly call the underlying code in singlethreaded mode. */ void memcached_thread_init(int nthreads, struct event_base *main_base); int dispatch_event_add(int thread, conn *c); void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport); void sidethread_conn_close(conn *c); /* Lock wrappers for cache functions that are called from main loop. */ enum delta_result_type add_delta(conn *c, const char *key, const size_t nkey, const int incr, const int64_t delta, char *buf, uint64_t *cas); void accept_new_conns(const bool do_accept); conn *conn_from_freelist(void); bool conn_add_to_freelist(conn *c); int is_listen_thread(void); item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); item *item_get(const char *key, const size_t nkey, conn *c); item *item_touch(const char *key, const size_t nkey, uint32_t exptime, conn *c); int item_link(item *it); void item_remove(item *it); int item_replace(item *it, item *new_it, const uint32_t hv); void item_unlink(item *it); void item_update(item *it); void item_lock(uint32_t hv); void *item_trylock(uint32_t hv); void item_trylock_unlock(void *arg); void item_unlock(uint32_t hv); void pause_threads(enum pause_thread_types type); unsigned short refcount_incr(unsigned short *refcount); unsigned short refcount_decr(unsigned short *refcount); void STATS_LOCK(void); void STATS_UNLOCK(void); void threadlocal_stats_reset(void); void threadlocal_stats_aggregate(struct thread_stats *stats); void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out); /* Stat processing functions */ void append_stat(const char *name, ADD_STAT add_stats, conn *c, const char *fmt, ...); enum store_item_type store_item(item *item, int comm, conn *c); #if HAVE_DROP_PRIVILEGES extern void drop_privileges(void); #else #define drop_privileges() #endif /* If supported, give compiler hints for branch prediction. */ #if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96) #define __builtin_expect(x, expected_value) (x) #endif #define likely(x) __builtin_expect((x),1) #define unlikely(x) __builtin_expect((x),0)