00001
00002
00008 #ifdef HAVE_CONFIG_H
00009 #include "config.h"
00010 #endif
00011
00012 #include <sys/types.h>
00013 #include <sys/socket.h>
00014 #include <sys/time.h>
00015 #include <netinet/in.h>
00016 #include <event.h>
00017 #include <netdb.h>
00018 #include <stdbool.h>
00019 #include <stdint.h>
00020 #include <pthread.h>
00021
00022 #include "protocol_binary.h"
00023 #include "cache.h"
00024
00026 #define KEY_MAX_LENGTH 250
00027
00028 #define DATA_BUFFER_SIZE 2048
00029 #define UDP_READ_BUFFER_SIZE 65536
00030 #define UDP_MAX_PAYLOAD_SIZE 1400
00031 #define UDP_HEADER_SIZE 8
00032 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
00033
00034
00035 #define SUFFIX_SIZE 24
00036
00038 #define ITEM_LIST_INITIAL 200
00039
00041 #define SUFFIX_LIST_INITIAL 20
00042
00044 #define IOV_LIST_INITIAL 400
00045
00047 #define MSG_LIST_INITIAL 10
00048
00050 #define READ_BUFFER_HIGHWAT 8192
00051 #define ITEM_LIST_HIGHWAT 400
00052 #define IOV_LIST_HIGHWAT 600
00053 #define MSG_LIST_HIGHWAT 100
00054
00055
00056 #define MIN_BIN_PKT_LENGTH 16
00057 #define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t))
00058
00059
00060 #if HAVE_UNISTD_H
00061 # include <unistd.h>
00062 #endif
00063
00064
00065 #define POWER_SMALLEST 1
00066 #define POWER_LARGEST 200
00067 #define POWER_BLOCK 1048576
00068 #define CHUNK_ALIGN_BYTES 8
00069 #define DONT_PREALLOC_SLABS
00070 #define MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1)
00071
00074 #define TAIL_REPAIR_TIME (3 * 3600)
00075
00077 typedef unsigned int rel_time_t;
00078
00080 struct slab_stats {
00081 uint64_t set_cmds;
00082 uint64_t get_hits;
00083 uint64_t delete_hits;
00084 uint64_t cas_hits;
00085 uint64_t cas_badval;
00086 uint64_t incr_hits;
00087 uint64_t decr_hits;
00088 };
00089
00093 struct thread_stats {
00094 pthread_mutex_t mutex;
00095 uint64_t get_cmds;
00096 uint64_t get_misses;
00097 uint64_t delete_misses;
00098 uint64_t incr_misses;
00099 uint64_t decr_misses;
00100 uint64_t cas_misses;
00101 uint64_t bytes_read;
00102 uint64_t bytes_written;
00103 uint64_t flush_cmds;
00104 struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
00105 };
00106
00110 struct stats {
00111 pthread_mutex_t mutex;
00112 unsigned int curr_items;
00113 unsigned int total_items;
00114 uint64_t curr_bytes;
00115 unsigned int curr_conns;
00116 unsigned int total_conns;
00117 unsigned int conn_structs;
00118 uint64_t get_cmds;
00119 uint64_t set_cmds;
00120 uint64_t get_hits;
00121 uint64_t get_misses;
00122 uint64_t evictions;
00123 time_t started;
00124 bool accepting_conns;
00125 uint64_t listen_disabled_num;
00126 };
00127
00128 #define MAX_VERBOSITY_LEVEL 2
00129
00130
00134 struct settings {
00135 size_t maxbytes;
00136 int maxconns;
00137 int port;
00138 int udpport;
00139 char *inter;
00140 int verbose;
00141 rel_time_t oldest_live;
00142 int evict_to_free;
00143 char *socketpath;
00144 int access;
00145 double factor;
00146 int chunk_size;
00147 int num_threads;
00148 char prefix_delimiter;
00149 int detail_enabled;
00150 int reqs_per_event;
00151
00152 bool use_cas;
00153 int backlog;
00154 };
00155
00156 extern struct stats stats;
00157 extern time_t process_started;
00158 extern struct settings settings;
00159
00160 #define ITEM_LINKED 1
00161 #define ITEM_CAS 2
00162
00163
00164 #define ITEM_SLABBED 4
00165
00169 typedef struct _stritem {
00170 struct _stritem *next;
00171 struct _stritem *prev;
00172 struct _stritem *h_next;
00173 rel_time_t time;
00174 rel_time_t exptime;
00175 int nbytes;
00176 unsigned short refcount;
00177 uint8_t nsuffix;
00178 uint8_t it_flags;
00179 uint8_t slabs_clsid;
00180 uint8_t nkey;
00181 void * end[];
00182
00183
00184
00185
00186 } item;
00187
00188
00189 #define ITEM_get_cas(i) ((uint64_t)(((i)->it_flags & ITEM_CAS) ? \
00190 *(uint64_t*)&((i)->end[0]) : 0x0))
00191 #define ITEM_set_cas(i,v) { if ((i)->it_flags & ITEM_CAS) { \
00192 *(uint64_t*)&((i)->end[0]) = v; } }
00193
00194 #define ITEM_key(item) (((char*)&((item)->end[0])) \
00195 + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
00196
00197 #define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 \
00198 + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
00199
00200 #define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 \
00201 + (item)->nsuffix \
00202 + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
00203
00204 #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
00205 + (item)->nsuffix + (item)->nbytes \
00206 + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
00207
00209 #define APPEND_STAT(name, fmt, val) \
00210 append_stat(name, add_stats, c, fmt, val);
00211
00214 #define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \
00215 klen = sprintf(key_str, name_fmt, num, name); \
00216 vlen = sprintf(val_str, fmt, val); \
00217 add_stats(key_str, klen, val_str, vlen, c);
00218
00220 #define APPEND_NUM_STAT(num, name, fmt, val) \
00221 APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)
00222
00232 typedef void (*ADD_STAT)(const char *key, const uint16_t klen,
00233 const char *val, const uint32_t vlen,
00234 const void *cookie);
00235
00236
00237
00238
00242 enum conn_states {
00243 conn_listening,
00244 conn_new_cmd,
00245 conn_waiting,
00246 conn_read,
00247 conn_parse_cmd,
00248 conn_write,
00249 conn_nread,
00250 conn_swallow,
00251 conn_closing,
00252 conn_mwrite,
00253 conn_max_state
00254 };
00255
00256 enum bin_substates {
00257 bin_no_state,
00258 bin_reading_set_header,
00259 bin_reading_cas_header,
00260 bin_read_set_value,
00261 bin_reading_get_key,
00262 bin_reading_stat,
00263 bin_reading_del_header,
00264 bin_reading_incr_header,
00265 bin_read_flush_exptime
00266 };
00267
00268 enum protocol {
00269 ascii_prot = 3,
00270 ascii_udp_prot,
00271 binary_prot,
00272 negotiating_prot
00273 };
00274
00275 #define IS_UDP(x) (x == ascii_udp_prot)
00276
00277 #define NREAD_ADD 1
00278 #define NREAD_SET 2
00279 #define NREAD_REPLACE 3
00280 #define NREAD_APPEND 4
00281 #define NREAD_PREPEND 5
00282 #define NREAD_CAS 6
00283
00284 enum store_item_type {
00285 NOT_STORED=0, STORED, EXISTS, NOT_FOUND
00286 };
00287
00288 typedef struct {
00289 pthread_t thread_id;
00290 struct event_base *base;
00291 struct event notify_event;
00292 int notify_receive_fd;
00293 int notify_send_fd;
00294 struct thread_stats stats;
00295 struct conn_queue *new_conn_queue;
00296 cache_t *suffix_cache;
00297 } LIBEVENT_THREAD;
00298
00302 typedef struct conn conn;
00303 struct conn {
00304 int sfd;
00305 enum conn_states state;
00306 enum bin_substates substate;
00307 struct event event;
00308 short ev_flags;
00309 short which;
00311 char *rbuf;
00312 char *rcurr;
00313 int rsize;
00314 int rbytes;
00316 char *wbuf;
00317 char *wcurr;
00318 int wsize;
00319 int wbytes;
00321 enum conn_states write_and_go;
00322 void *write_and_free;
00324 char *ritem;
00325 int rlbytes;
00326
00327
00328
00335 void *item;
00336
00337
00338 int sbytes;
00339
00340
00341 struct iovec *iov;
00342 int iovsize;
00343 int iovused;
00344
00345 struct msghdr *msglist;
00346 int msgsize;
00347 int msgused;
00348 int msgcurr;
00349 int msgbytes;
00350
00351 item **ilist;
00352 int isize;
00353 item **icurr;
00354 int ileft;
00355
00356 char **suffixlist;
00357 int suffixsize;
00358 char **suffixcurr;
00359 int suffixleft;
00360
00361 enum protocol protocol;
00362
00363
00364 int request_id;
00365 struct sockaddr request_addr;
00366 socklen_t request_addr_size;
00367 unsigned char *hdrbuf;
00368 int hdrsize;
00369
00370 bool noreply;
00371
00372 struct {
00373 char *buffer;
00374 size_t size;
00375 size_t offset;
00376 } stats;
00377
00378
00379
00380 protocol_binary_request_header binary_header;
00381 uint64_t cas;
00382 short cmd;
00383 int opaque;
00384 int keylen;
00385 conn *next;
00386 LIBEVENT_THREAD *thread;
00387 };
00388
00389
00390
00391 extern volatile rel_time_t current_time;
00392
00393
00394
00395
00396 void do_accept_new_conns(const bool do_accept);
00397 char *do_add_delta(conn *c, item *item, const bool incr, const int64_t delta,
00398 char *buf);
00399 enum store_item_type do_store_item(item *item, int comm, conn* c);
00400 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum protocol prot, struct event_base *base);
00401 extern int daemonize(int nochdir, int noclose);
00402
00403
00404 #include "stats.h"
00405 #include "slabs.h"
00406 #include "assoc.h"
00407 #include "items.h"
00408 #include "trace.h"
00409 #include "hash.h"
00410 #include "util.h"
00411
00412
00413
00414
00415
00416
00417
00418
00419 void thread_init(int nthreads, struct event_base *main_base);
00420 int dispatch_event_add(int thread, conn *c);
00421 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum protocol prot);
00422
00423
00424 char *add_delta(conn *c, item *item, const int incr, const int64_t delta,
00425 char *buf);
00426 void accept_new_conns(const bool do_accept);
00427 conn *conn_from_freelist(void);
00428 bool conn_add_to_freelist(conn *c);
00429 int is_listen_thread(void);
00430 item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
00431 char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
00432 void item_flush_expired(void);
00433 item *item_get(const char *key, const size_t nkey);
00434 int item_link(item *it);
00435 void item_remove(item *it);
00436 int item_replace(item *it, item *new_it);
00437 void item_stats(ADD_STAT add_stats, void *c);
00438 void item_stats_sizes(ADD_STAT add_stats, void *c);
00439 void item_unlink(item *it);
00440 void item_update(item *it);
00441
00442 void STATS_LOCK(void);
00443 void STATS_UNLOCK(void);
00444 void threadlocal_stats_reset(void);
00445 void threadlocal_stats_aggregate(struct thread_stats *stats);
00446 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
00447
00448
00449 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
00450 const char *fmt, ...);
00451
00452 enum store_item_type store_item(item *item, int comm, conn *c);
00453
00454 #if HAVE_DROP_PRIVILEGES
00455 extern void drop_privileges();
00456 #else
00457 #define drop_privileges()
00458 #endif
00459
00460
00461 #if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
00462 #define __builtin_expect(x, expected_value) (x)
00463 #endif
00464
00465 #define likely(x) __builtin_expect((x),1)
00466 #define unlikely(x) __builtin_expect((x),0)