summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrond Norbye <Trond.Norbye@sun.com>2009-03-04 10:50:47 +0100
committerTrond Norbye <Trond.Norbye@sun.com>2009-03-04 10:50:47 +0100
commit1fdfb7e91d29afbf3161bdbf3c7bcb4c8800c511 (patch)
treedf71de8ff74e3d69fb65721578e2ec0793f6eacd
parent69aa542709745e7360b0cd9a81d7a407567106c4 (diff)
downloadmemcached-1fdfb7e91d29afbf3161bdbf3c7bcb4c8800c511.tar.gz
Use threadlocal stats to count the commands
-rw-r--r--items.c2
-rw-r--r--memcached.c88
-rw-r--r--memcached.h36
-rw-r--r--thread.c62
4 files changed, 126 insertions, 62 deletions
diff --git a/items.c b/items.c
index 5a7a12e..cbb4274 100644
--- a/items.c
+++ b/items.c
@@ -326,7 +326,7 @@ char *do_item_cachedump(const unsigned int slabs_clsid, const unsigned int limit
key_temp[it->nkey] = 0x00; /* terminate */
len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
key_temp, it->nbytes - 2,
- (unsigned long)it->exptime + stats.started);
+ (unsigned long)it->exptime + process_started);
if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
break;
strcpy(buffer + bufcurr, temp);
diff --git a/memcached.c b/memcached.c
index 32eea3d..ec01c94 100644
--- a/memcached.c
+++ b/memcached.c
@@ -66,7 +66,6 @@ static int try_read_udp(conn *c);
static void conn_set_state(conn *c, enum conn_states state);
/* stats */
-static void stats_reset(void);
static void stats_init(void);
static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key,
const uint16_t klen, const char *val,
@@ -101,6 +100,7 @@ static void conn_free(conn *c);
/** exported globals **/
struct stats stats;
struct settings settings;
+time_t process_started; /* when the process was started */
/** file scope variables **/
static conn *listen_conn = NULL;
@@ -130,9 +130,9 @@ static rel_time_t realtime(const time_t exptime) {
underflow and wrap around to some large value way in the
future, effectively making items expiring in the past
really expiring never */
- if (exptime <= stats.started)
+ if (exptime <= process_started)
return (rel_time_t)1;
- return (rel_time_t)(exptime - stats.started);
+ return (rel_time_t)(exptime - process_started);
} else {
return (rel_time_t)(exptime + current_time);
}
@@ -140,24 +140,25 @@ static rel_time_t realtime(const time_t exptime) {
static void stats_init(void) {
stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
- stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
+ stats.evictions = 0;
stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
/* make the time we started always be 2 seconds before we really
did, so time(0) - time.started is never zero. if so, things
like 'settings.oldest_live' which act as booleans as well as
values are now false in boolean context... */
- stats.started = time(0) - 2;
+ process_started = time(0) - 2;
stats_prefix_init();
}
static void stats_reset(void) {
STATS_LOCK();
stats.total_items = stats.total_conns = 0;
- stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
+ stats.evictions = 0;
stats.bytes_read = stats.bytes_written = 0;
stats_prefix_clear();
STATS_UNLOCK();
+ threadlocal_stats_reset();
}
static void settings_init(void) {
@@ -822,9 +823,9 @@ static void complete_nread_ascii(conn *c) {
int comm = c->item_comm;
enum store_item_type ret;
- STATS_LOCK();
- stats.set_cmds++;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.set_cmds++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
@@ -1110,9 +1111,9 @@ static void complete_update_bin(conn *c) {
item *it = c->item;
- STATS_LOCK();
- stats.set_cmds++;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.set_cmds++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
/* We don't actually receive the trailing two characters in the bin
* protocol, so we're going to just set them here */
@@ -1195,10 +1196,11 @@ static void process_bin_get(conn *c) {
uint16_t keylen = 0;
uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
- STATS_LOCK();
- stats.get_cmds++;
- stats.get_hits++;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_cmds++;
+ c->thread->stats.get_hits++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
@@ -1223,10 +1225,11 @@ static void process_bin_get(conn *c) {
/* Remember this command so we can garbage collect it later */
c->item = it;
} else {
- STATS_LOCK();
- stats.get_cmds++;
- stats.get_misses++;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_cmds++;
+ c->thread->stats.get_misses++;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
+
MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
if (c->noreply) {
@@ -2071,6 +2074,9 @@ static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key,
rel_time_t now = current_time;
*buflen = 0;
+ struct thread_stats thread_stats;
+ threadlocal_stats_aggregate(&thread_stats);
+
#ifndef WIN32
struct rusage usage;
getrusage(RUSAGE_SELF, &usage);
@@ -2089,7 +2095,7 @@ static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key,
pos += nbytes;
*buflen += nbytes;
- vlen = sprintf(val, "%ld", now + (long)stats.started);
+ vlen = sprintf(val, "%ld", now + (long)process_started);
nbytes = add_stats(pos, "time", strlen("time"), val, vlen, (void *)c);
pos += nbytes;
*buflen += nbytes;
@@ -2139,23 +2145,23 @@ static char *server_stats(uint32_t (*add_stats)(char *buf, const char *key,
pos += nbytes;
*buflen += nbytes;
- vlen = sprintf(val, "%llu", (unsigned long long)stats.get_cmds);
+ vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.get_cmds);
nbytes = add_stats(pos, "cmd_get", strlen("cmd_get"), val, vlen, (void *)c);
pos += nbytes;
*buflen += nbytes;
- vlen = sprintf(val, "%llu", (unsigned long long)stats.set_cmds);
+ vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.set_cmds);
nbytes = add_stats(pos, "cmd_set", strlen("cmd_set"), val, vlen, (void *)c);
pos += nbytes;
*buflen += nbytes;
- vlen = sprintf(val, "%llu", (unsigned long long)stats.get_hits);
+ vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.get_hits);
nbytes = add_stats(pos, "get_hits", strlen("get_hits"), val, vlen,
(void *)c);
pos += nbytes;
*buflen += nbytes;
- vlen = sprintf(val, "%llu", (unsigned long long)stats.get_misses);
+ vlen = sprintf(val, "%llu", (unsigned long long)thread_stats.get_misses);
nbytes = add_stats(pos, "get_misses", strlen("get_misses"), val, vlen,
(void *)c);
pos += nbytes;
@@ -2346,11 +2352,11 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
nkey = key_token->length;
if(nkey > KEY_MAX_LENGTH) {
- STATS_LOCK();
- stats.get_cmds += stats_get_cmds;
- stats.get_hits += stats_get_hits;
- stats.get_misses += stats_get_misses;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_cmds += stats_get_cmds;
+ c->thread->stats.get_hits += stats_get_hits;
+ c->thread->stats.get_misses += stats_get_misses;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
@@ -2399,11 +2405,11 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
suffix = suffix_from_freelist();
if (suffix == NULL) {
- STATS_LOCK();
- stats.get_cmds += stats_get_cmds;
- stats.get_hits += stats_get_hits;
- stats.get_misses += stats_get_misses;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_cmds += stats_get_cmds;
+ c->thread->stats.get_hits += stats_get_hits;
+ c->thread->stats.get_misses += stats_get_misses;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
out_string(c, "SERVER_ERROR out of memory making CAS suffix");
item_remove(it);
return;
@@ -2486,11 +2492,11 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
c->msgcurr = 0;
}
- STATS_LOCK();
- stats.get_cmds += stats_get_cmds;
- stats.get_hits += stats_get_hits;
- stats.get_misses += stats_get_misses;
- STATS_UNLOCK();
+ pthread_mutex_lock(&c->thread->stats.mutex);
+ c->thread->stats.get_cmds += stats_get_cmds;
+ c->thread->stats.get_hits += stats_get_hits;
+ c->thread->stats.get_misses += stats_get_misses;
+ pthread_mutex_unlock(&c->thread->stats.mutex);
return;
}
@@ -3736,7 +3742,7 @@ static void set_current_time(void) {
struct timeval timer;
gettimeofday(&timer, NULL);
- current_time = (rel_time_t) (timer.tv_sec - stats.started);
+ current_time = (rel_time_t) (timer.tv_sec - process_started);
}
static void clock_handler(const int fd, const short which, void *arg) {
diff --git a/memcached.h b/memcached.h
index 0fe56cb..07fdc2d 100644
--- a/memcached.h
+++ b/memcached.h
@@ -13,6 +13,7 @@
#include <netdb.h>
#include <stdbool.h>
#include <stdint.h>
+#include <pthread.h>
#include "protocol_binary.h"
@@ -58,19 +59,23 @@
/** Time relative to server start. Smaller than time_t on 64-bit systems. */
typedef unsigned int rel_time_t;
+struct thread_stats {
+ pthread_mutex_t mutex;
+ uint64_t get_cmds;
+ uint64_t set_cmds;
+ uint64_t get_hits;
+ uint64_t get_misses;
+};
+
struct stats {
+ pthread_mutex_t mutex;
unsigned int curr_items;
unsigned int total_items;
uint64_t curr_bytes;
unsigned int curr_conns;
unsigned int total_conns;
unsigned int conn_structs;
- uint64_t get_cmds;
- uint64_t set_cmds;
- uint64_t get_hits;
- uint64_t get_misses;
uint64_t evictions;
- time_t started; /* when the process was started */
uint64_t bytes_read;
uint64_t bytes_written;
};
@@ -99,6 +104,7 @@ struct settings {
};
extern struct stats stats;
+extern time_t process_started;
extern struct settings settings;
#define ITEM_LINKED 1
@@ -195,6 +201,16 @@ enum store_item_type {
NOT_STORED=0, STORED, EXISTS, NOT_FOUND
};
+typedef struct {
+ pthread_t thread_id; /* unique ID of this thread */
+ struct event_base *base; /* libevent handle this thread uses */
+ struct event notify_event; /* listen event for notify pipe */
+ int notify_receive_fd; /* receiving end of notify pipe */
+ int notify_send_fd; /* sending end of notify pipe */
+ struct thread_stats stats; /* Stats generated by this thread */
+ struct conn_queue *new_conn_queue; /* queue of new connections to handle */
+} LIBEVENT_THREAD;
+
typedef struct conn conn;
struct conn {
int sfd;
@@ -274,6 +290,7 @@ struct conn {
int opaque;
int keylen;
conn *next; /* Used for generating a list of conn structures */
+ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
@@ -347,8 +364,13 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid);
char *slabs_stats(uint32_t (*add_stats)(char *buf,
const char *key, const uint16_t klen, const char *val,
const uint32_t vlen, void *cookie), void *c, int *buflen);
-void STATS_LOCK(void);
-void STATS_UNLOCK(void);
+
+void STATS_LOCK(void);
+void STATS_UNLOCK(void);
+void threadlocal_stats_reset(void);
+void threadlocal_stats_aggregate(struct thread_stats *stats);
+
+
enum store_item_type store_item(item *item, int comm, conn *c);
#if HAVE_DROP_PRIVILEGES
diff --git a/thread.c b/thread.c
index 28bf468..ad18eaf 100644
--- a/thread.c
+++ b/thread.c
@@ -58,15 +58,6 @@ static pthread_mutex_t cqi_freelist_lock;
* Each libevent instance has a wakeup pipe, which other threads
* can use to signal that they've put a new connection on its queue.
*/
-typedef struct {
- pthread_t thread_id; /* unique ID of this thread */
- struct event_base *base; /* libevent handle this thread uses */
- struct event notify_event; /* listen event for notify pipe */
- int notify_receive_fd; /* receiving end of notify pipe */
- int notify_send_fd; /* sending end of notify pipe */
- CQ new_conn_queue; /* queue of new connections to handle */
-} LIBEVENT_THREAD;
-
static LIBEVENT_THREAD *threads;
/*
@@ -275,7 +266,17 @@ static void setup_thread(LIBEVENT_THREAD *me) {
exit(1);
}
- cq_init(&me->new_conn_queue);
+ me->new_conn_queue = malloc(sizeof(struct conn_queue));
+ if (me->new_conn_queue == NULL) {
+ perror("Failed to allocate memory for connection queue");
+ exit(EXIT_FAILURE);
+ }
+ cq_init(me->new_conn_queue);
+
+ if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
+ perror("Failed to initialize mutex");
+ exit(EXIT_FAILURE);
+ }
}
@@ -312,7 +313,7 @@ static void thread_libevent_process(int fd, short which, void *arg) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
- item = cq_pop(&me->new_conn_queue);
+ item = cq_pop(me->new_conn_queue);
if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
@@ -328,6 +329,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
}
close(item->sfd);
}
+ } else {
+ c->thread = me;
}
cqi_free(item);
}
@@ -358,7 +361,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->read_buffer_size = read_buffer_size;
item->protocol = prot;
- cq_push(&thread->new_conn_queue, item);
+ cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
if (write(thread->notify_send_fd, "", 1) != 1) {
@@ -570,6 +573,39 @@ void STATS_UNLOCK() {
pthread_mutex_unlock(&stats_lock);
}
+void threadlocal_stats_reset(void) {
+ for (int ii = 0; ii < settings.num_threads; ++ii) {
+ pthread_mutex_lock(&threads[ii].stats.mutex);
+
+ threads[ii].stats.get_cmds = 0;
+ threads[ii].stats.set_cmds = 0;
+ threads[ii].stats.get_hits = 0;
+ threads[ii].stats.get_misses = 0;
+
+ pthread_mutex_unlock(&threads[ii].stats.mutex);
+ }
+}
+
+void threadlocal_stats_aggregate(struct thread_stats *stats) {
+ /* The struct contains a mutex, so I should probably not memset it.. */
+ stats->get_cmds = 0;
+ stats->set_cmds = 0;
+ stats->get_hits = 0;
+ stats->get_misses = 0;
+
+ for (int ii = 0; ii < settings.num_threads; ++ii) {
+ pthread_mutex_lock(&threads[ii].stats.mutex);
+
+ stats->get_cmds += threads[ii].stats.get_cmds;
+ stats->set_cmds += threads[ii].stats.set_cmds;
+ stats->get_hits += threads[ii].stats.get_hits;
+ stats->get_misses += threads[ii].stats.get_misses;
+
+ pthread_mutex_unlock(&threads[ii].stats.mutex);
+ }
+}
+
+
/*
* Initializes the thread subsystem, creating various worker threads.
*
@@ -609,7 +645,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
- setup_thread(&threads[i]);
+ setup_thread(&threads[i]);
}
/* Create threads after we've done all the libevent setup. */