diff options
author | Trond Norbye <Trond.Norbye@sun.com> | 2009-03-04 10:50:47 +0100 |
---|---|---|
committer | Trond Norbye <Trond.Norbye@sun.com> | 2009-03-04 10:50:47 +0100 |
commit | 1fdfb7e91d29afbf3161bdbf3c7bcb4c8800c511 (patch) | |
tree | df71de8ff74e3d69fb65721578e2ec0793f6eacd | |
parent | 69aa542709745e7360b0cd9a81d7a407567106c4 (diff) | |
download | memcached-1fdfb7e91d29afbf3161bdbf3c7bcb4c8800c511.tar.gz |
Use threadlocal stats to count the commands
-rw-r--r-- | items.c | 2 | ||||
-rw-r--r-- | memcached.c | 88 | ||||
-rw-r--r-- | memcached.h | 36 | ||||
-rw-r--r-- | thread.c | 62 |
4 files changed, 126 insertions, 62 deletions
@@ -326,7 +326,7 @@ char *do_item_cachedump(const unsigned int slabs_clsid, const unsigned int limit key_temp[it->nkey] = 0x00; /* terminate */ len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n", key_temp, it->nbytes - 2, - (unsigned long)it->exptime + stats.started); + (unsigned long)it->exptime + process_started); if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */ break; strcpy(buffer + bufcurr, temp); diff --git a/memcached.c b/memcached.c index 32eea3d..ec01c94 100644 --- a/memcached.c +++ b/memcached.c @@ -66,7 +66,6 @@ static int try_read_udp(conn *c); static void conn_set_state(conn *c, enum conn_states state); /* stats */ -static void stats_reset(void); static void stats_init(void); static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key, const uint16_t klen, const char *val, @@ -101,6 +100,7 @@ static void conn_free(conn *c); /** exported globals **/ struct stats stats; struct settings settings; +time_t process_started; /* when the process was started */ /** file scope variables **/ static conn *listen_conn = NULL; @@ -130,9 +130,9 @@ static rel_time_t realtime(const time_t exptime) { underflow and wrap around to some large value way in the future, effectively making items expiring in the past really expiring never */ - if (exptime <= stats.started) + if (exptime <= process_started) return (rel_time_t)1; - return (rel_time_t)(exptime - stats.started); + return (rel_time_t)(exptime - process_started); } else { return (rel_time_t)(exptime + current_time); } @@ -140,24 +140,25 @@ static rel_time_t realtime(const time_t exptime) { static void stats_init(void) { stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0; - stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0; + stats.evictions = 0; stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0; /* make the time we started always be 2 seconds before we really did, so time(0) - time.started is never zero. if so, things like 'settings.oldest_live' which act as booleans as well as values are now false in boolean context... */ - stats.started = time(0) - 2; + process_started = time(0) - 2; stats_prefix_init(); } static void stats_reset(void) { STATS_LOCK(); stats.total_items = stats.total_conns = 0; - stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0; + stats.evictions = 0; stats.bytes_read = stats.bytes_written = 0; stats_prefix_clear(); STATS_UNLOCK(); + threadlocal_stats_reset(); } static void settings_init(void) { @@ -822,9 +823,9 @@ static void complete_nread_ascii(conn *c) { int comm = c->item_comm; enum store_item_type ret; - STATS_LOCK(); - stats.set_cmds++; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.set_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { out_string(c, "CLIENT_ERROR bad data chunk"); @@ -1110,9 +1111,9 @@ static void complete_update_bin(conn *c) { item *it = c->item; - STATS_LOCK(); - stats.set_cmds++; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.set_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); /* We don't actually receive the trailing two characters in the bin * protocol, so we're going to just set them here */ @@ -1195,10 +1196,11 @@ static void process_bin_get(conn *c) { uint16_t keylen = 0; uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2); - STATS_LOCK(); - stats.get_cmds++; - stats.get_hits++; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_cmds++; + c->thread->stats.get_hits++; + pthread_mutex_unlock(&c->thread->stats.mutex); + MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey, it->nbytes, ITEM_get_cas(it)); @@ -1223,10 +1225,11 @@ static void process_bin_get(conn *c) { /* Remember this command so we can garbage collect it later */ c->item = it; } else { - STATS_LOCK(); - stats.get_cmds++; - stats.get_misses++; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_cmds++; + c->thread->stats.get_misses++; + pthread_mutex_unlock(&c->thread->stats.mutex); + MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0); if (c->noreply) { @@ -2071,6 +2074,9 @@ static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key, rel_time_t now = current_time; *buflen = 0; + struct thread_stats thread_stats; + threadlocal_stats_aggregate(&thread_stats); + #ifndef WIN32 struct rusage usage; getrusage(RUSAGE_SELF, &usage); @@ -2089,7 +2095,7 @@ static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key, pos += nbytes; *buflen += nbytes; - vlen = sprintf(val, "%ld", now + (long)stats.started); + vlen = sprintf(val, "%ld", now + (long)process_started); nbytes = add_stats(pos, "time", strlen("time"), val, vlen, (void *)c); pos += nbytes; *buflen += nbytes; @@ -2139,23 +2145,23 @@ static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key, pos += nbytes; *buflen += nbytes; - vlen = sprintf(val, "%llu", (unsigned long long)stats.get_cmds); + vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.get_cmds); nbytes = add_stats(pos, "cmd_get", strlen("cmd_get"), val, vlen, (void *)c); pos += nbytes; *buflen += nbytes; - vlen = sprintf(val, "%llu", (unsigned long long)stats.set_cmds); + vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.set_cmds); nbytes = add_stats(pos, "cmd_set", strlen("cmd_set"), val, vlen, (void *)c); pos += nbytes; *buflen += nbytes; - vlen = sprintf(val, "%llu", (unsigned long long)stats.get_hits); + vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.get_hits); nbytes = add_stats(pos, "get_hits", strlen("get_hits"), val, vlen, (void *)c); pos += nbytes; *buflen += nbytes; - vlen = sprintf(val, "%llu", (unsigned long long)stats.get_misses); + vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.get_misses); nbytes = add_stats(pos, "get_misses", strlen("get_misses"), val, vlen, (void *)c); pos += nbytes; @@ -2346,11 +2352,11 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, nkey = key_token->length; if(nkey > KEY_MAX_LENGTH) { - STATS_LOCK(); - stats.get_cmds += stats_get_cmds; - stats.get_hits += stats_get_hits; - stats.get_misses += stats_get_misses; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_cmds += stats_get_cmds; + c->thread->stats.get_hits += stats_get_hits; + c->thread->stats.get_misses += stats_get_misses; + pthread_mutex_unlock(&c->thread->stats.mutex); out_string(c, "CLIENT_ERROR bad command line format"); return; } @@ -2399,11 +2405,11 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, suffix = suffix_from_freelist(); if (suffix == NULL) { - STATS_LOCK(); - stats.get_cmds += stats_get_cmds; - stats.get_hits += stats_get_hits; - stats.get_misses += stats_get_misses; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_cmds += stats_get_cmds; + c->thread->stats.get_hits += stats_get_hits; + c->thread->stats.get_misses += stats_get_misses; + pthread_mutex_unlock(&c->thread->stats.mutex); out_string(c, "SERVER_ERROR out of memory making CAS suffix"); item_remove(it); return; @@ -2486,11 +2492,11 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, c->msgcurr = 0; } - STATS_LOCK(); - stats.get_cmds += stats_get_cmds; - stats.get_hits += stats_get_hits; - stats.get_misses += stats_get_misses; - STATS_UNLOCK(); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_cmds += stats_get_cmds; + c->thread->stats.get_hits += stats_get_hits; + c->thread->stats.get_misses += stats_get_misses; + pthread_mutex_unlock(&c->thread->stats.mutex); return; } @@ -3736,7 +3742,7 @@ static void set_current_time(void) { struct timeval timer; gettimeofday(&timer, NULL); - current_time = (rel_time_t) (timer.tv_sec - stats.started); + current_time = (rel_time_t) (timer.tv_sec - process_started); } static void clock_handler(const int fd, const short which, void *arg) { diff --git a/memcached.h b/memcached.h index 0fe56cb..07fdc2d 100644 --- a/memcached.h +++ b/memcached.h @@ -13,6 +13,7 @@ #include <netdb.h> #include <stdbool.h> #include <stdint.h> +#include <pthread.h> #include "protocol_binary.h" @@ -58,19 +59,23 @@ /** Time relative to server start. Smaller than time_t on 64-bit systems. */ typedef unsigned int rel_time_t; +struct thread_stats { + pthread_mutex_t mutex; + uint64_t get_cmds; + uint64_t set_cmds; + uint64_t get_hits; + uint64_t get_misses; +}; + struct stats { + pthread_mutex_t mutex; unsigned int curr_items; unsigned int total_items; uint64_t curr_bytes; unsigned int curr_conns; unsigned int total_conns; unsigned int conn_structs; - uint64_t get_cmds; - uint64_t set_cmds; - uint64_t get_hits; - uint64_t get_misses; uint64_t evictions; - time_t started; /* when the process was started */ uint64_t bytes_read; uint64_t bytes_written; }; @@ -99,6 +104,7 @@ struct settings { }; extern struct stats stats; +extern time_t process_started; extern struct settings settings; #define ITEM_LINKED 1 @@ -195,6 +201,16 @@ enum store_item_type { NOT_STORED=0, STORED, EXISTS, NOT_FOUND }; +typedef struct { + pthread_t thread_id; /* unique ID of this thread */ + struct event_base *base; /* libevent handle this thread uses */ + struct event notify_event; /* listen event for notify pipe */ + int notify_receive_fd; /* receiving end of notify pipe */ + int notify_send_fd; /* sending end of notify pipe */ + struct thread_stats stats; /* Stats generated by this thread */ + struct conn_queue *new_conn_queue; /* queue of new connections to handle */ +} LIBEVENT_THREAD; + typedef struct conn conn; struct conn { int sfd; @@ -274,6 +290,7 @@ struct conn { int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ + LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ }; @@ -347,8 +364,13 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid); char *slabs_stats(uint32_t (*add_stats)(char *buf, const char *key, const uint16_t klen, const char *val, const uint32_t vlen, void *cookie), void *c, int *buflen); -void STATS_LOCK(void); -void STATS_UNLOCK(void); + +void STATS_LOCK(void); +void STATS_UNLOCK(void); +void threadlocal_stats_reset(void); +void threadlocal_stats_aggregate(struct thread_stats *stats); + + enum store_item_type store_item(item *item, int comm, conn *c); #if HAVE_DROP_PRIVILEGES @@ -58,15 +58,6 @@ static pthread_mutex_t cqi_freelist_lock; * Each libevent instance has a wakeup pipe, which other threads * can use to signal that they've put a new connection on its queue. */ -typedef struct { - pthread_t thread_id; /* unique ID of this thread */ - struct event_base *base; /* libevent handle this thread uses */ - struct event notify_event; /* listen event for notify pipe */ - int notify_receive_fd; /* receiving end of notify pipe */ - int notify_send_fd; /* sending end of notify pipe */ - CQ new_conn_queue; /* queue of new connections to handle */ -} LIBEVENT_THREAD; - static LIBEVENT_THREAD *threads; /* @@ -275,7 +266,17 @@ static void setup_thread(LIBEVENT_THREAD *me) { exit(1); } - cq_init(&me->new_conn_queue); + me->new_conn_queue = malloc(sizeof(struct conn_queue)); + if (me->new_conn_queue == NULL) { + perror("Failed to allocate memory for connection queue"); + exit(EXIT_FAILURE); + } + cq_init(me->new_conn_queue); + + if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) { + perror("Failed to initialize mutex"); + exit(EXIT_FAILURE); + } } @@ -312,7 +313,7 @@ static void thread_libevent_process(int fd, short which, void *arg) { if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); - item = cq_pop(&me->new_conn_queue); + item = cq_pop(me->new_conn_queue); if (NULL != item) { conn *c = conn_new(item->sfd, item->init_state, item->event_flags, @@ -328,6 +329,8 @@ static void thread_libevent_process(int fd, short which, void *arg) { } close(item->sfd); } + } else { + c->thread = me; } cqi_free(item); } @@ -358,7 +361,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, item->read_buffer_size = read_buffer_size; item->protocol = prot; - cq_push(&thread->new_conn_queue, item); + cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); if (write(thread->notify_send_fd, "", 1) != 1) { @@ -570,6 +573,39 @@ void STATS_UNLOCK() { pthread_mutex_unlock(&stats_lock); } +void threadlocal_stats_reset(void) { + for (int ii = 0; ii < settings.num_threads; ++ii) { + pthread_mutex_lock(&threads[ii].stats.mutex); + + threads[ii].stats.get_cmds = 0; + threads[ii].stats.set_cmds = 0; + threads[ii].stats.get_hits = 0; + threads[ii].stats.get_misses = 0; + + pthread_mutex_unlock(&threads[ii].stats.mutex); + } +} + +void threadlocal_stats_aggregate(struct thread_stats *stats) { + /* The struct contains a mutex, so I should probably not memset it.. */ + stats->get_cmds = 0; + stats->set_cmds = 0; + stats->get_hits = 0; + stats->get_misses = 0; + + for (int ii = 0; ii < settings.num_threads; ++ii) { + pthread_mutex_lock(&threads[ii].stats.mutex); + + stats->get_cmds += threads[ii].stats.get_cmds; + stats->set_cmds += threads[ii].stats.set_cmds; + stats->get_hits += threads[ii].stats.get_hits; + stats->get_misses += threads[ii].stats.get_misses; + + pthread_mutex_unlock(&threads[ii].stats.mutex); + } +} + + /* * Initializes the thread subsystem, creating various worker threads. * @@ -609,7 +645,7 @@ void thread_init(int nthreads, struct event_base *main_base) { threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; - setup_thread(&threads[i]); + setup_thread(&threads[i]); } /* Create threads after we've done all the libevent setup. */ |