summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-09 18:08:26 -0800
committerdormando <dormando@rydia.net>2023-01-11 13:24:22 -0800
commitfccf7b9efdfb0deb11f111496ce53c5892647dab (patch)
treea3c2f6bafee2d80a609806627bd3922e0aa6cd08
parent8bb9d9a3e5ca93c38db97181d4c15b03d48a644d (diff)
downloadmemcached-fccf7b9efdfb0deb11f111496ce53c5892647dab.tar.gz
core: remove *conn object from cache commands
We want to start using cache commands in contexts without a client connection, but the client object has always been passed to all functions. In most cases we only need the worker thread (LIBEVENT_THREAD *t), so this change adjusts the arguments passed in.
-rw-r--r--items.c36
-rw-r--r--items.h8
-rw-r--r--memcached.c62
-rw-r--r--memcached.h24
-rw-r--r--proto_bin.c22
-rw-r--r--proto_text.c32
-rwxr-xr-xt/watcher.t4
-rw-r--r--t/watcher_connid.t4
-rw-r--r--thread.c22
9 files changed, 116 insertions, 98 deletions
diff --git a/items.c b/items.c
index ce8814d..4e329b6 100644
--- a/items.c
+++ b/items.c
@@ -259,7 +259,7 @@ item_chunk *do_item_alloc_chunk(item_chunk *ch, const size_t bytes_remain) {
return nch;
}
-item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
+item *do_item_alloc(const char *key, const size_t nkey, const unsigned int flags,
const rel_time_t exptime, const int nbytes) {
uint8_t nsuffix;
item *it = NULL;
@@ -975,7 +975,7 @@ void item_stats_sizes(ADD_STAT add_stats, void *c) {
}
/** wrapper around assoc_find which does the lazy expiration logic */
-item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
+item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, LIBEVENT_THREAD *t, const bool do_update) {
item *it = assoc_find(key, nkey, hv);
if (it != NULL) {
refcount_incr(it);
@@ -1006,7 +1006,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
int ii;
if (it == NULL) {
fprintf(stderr, "> NOT FOUND ");
- } else {
+ } else if (was_found) {
fprintf(stderr, "> FOUND KEY ");
}
for (ii = 0; ii < nkey; ++ii) {
@@ -1018,31 +1018,31 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
was_found = 1;
if (item_is_flushed(it)) {
do_item_unlink(it, hv);
- STORAGE_delete(c->thread->storage, it);
+ STORAGE_delete(t->storage, it);
do_item_remove(it);
it = NULL;
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.get_flushed++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.get_flushed++;
+ pthread_mutex_unlock(&t->stats.mutex);
if (settings.verbose > 2) {
fprintf(stderr, " -nuked by flush");
}
was_found = 2;
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);
- STORAGE_delete(c->thread->storage, it);
+ STORAGE_delete(t->storage, it);
do_item_remove(it);
it = NULL;
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.get_expired++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.get_expired++;
+ pthread_mutex_unlock(&t->stats.mutex);
if (settings.verbose > 2) {
fprintf(stderr, " -nuked by expire");
}
was_found = 3;
} else {
if (do_update) {
- do_item_bump(c, it, hv);
+ do_item_bump(t, it, hv);
}
DEBUG_REFCNT(it, '+');
}
@@ -1051,8 +1051,8 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
if (settings.verbose > 2)
fprintf(stderr, "\n");
/* For now this is in addition to the above verbose logging. */
- LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key,
- nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, c->sfd);
+ LOGGER_LOG(t->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key,
+ nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, t->cur_sfd);
return it;
}
@@ -1060,7 +1060,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c
// Requires lock held for item.
// Split out of do_item_get() to allow mget functions to look through header
// data before losing state modified via the bump function.
-void do_item_bump(conn *c, item *it, const uint32_t hv) {
+void do_item_bump(LIBEVENT_THREAD *t, item *it, const uint32_t hv) {
/* We update the hit markers only during fetches.
* An item needs to be hit twice overall to be considered
* ACTIVE, but only needs a single hit to maintain activity
@@ -1075,7 +1075,7 @@ void do_item_bump(conn *c, item *it, const uint32_t hv) {
it->it_flags |= ITEM_ACTIVE;
if (ITEM_lruid(it) != COLD_LRU) {
it->time = current_time; // only need to bump time.
- } else if (!lru_bump_async(c->thread->lru_bump_buf, it, hv)) {
+ } else if (!lru_bump_async(t->lru_bump_buf, it, hv)) {
// add flag before async bump to avoid race.
it->it_flags &= ~ITEM_ACTIVE;
}
@@ -1088,8 +1088,8 @@ void do_item_bump(conn *c, item *it, const uint32_t hv) {
}
item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
- const uint32_t hv, conn *c) {
- item *it = do_item_get(key, nkey, hv, c, DO_UPDATE);
+ const uint32_t hv, LIBEVENT_THREAD *t) {
+ item *it = do_item_get(key, nkey, hv, t, DO_UPDATE);
if (it != NULL) {
it->exptime = exptime;
}
diff --git a/items.h b/items.h
index cb25cfa..e204af5 100644
--- a/items.h
+++ b/items.h
@@ -11,7 +11,7 @@ uint64_t get_cas_id(void);
void set_cas_id(uint64_t new_cas);
/*@null@*/
-item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags, const rel_time_t exptime, const int nbytes);
+item *do_item_alloc(const char *key, const size_t nkey, const unsigned int flags, const rel_time_t exptime, const int nbytes);
item_chunk *do_item_alloc_chunk(item_chunk *ch, const size_t bytes_remain);
item *do_item_alloc_pull(const size_t ntotal, const unsigned int id);
void item_free(item *it);
@@ -71,9 +71,9 @@ typedef struct {
} item_stats_automove;
void fill_item_stats_automove(item_stats_automove *am);
-item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update);
-item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv, conn *c);
-void do_item_bump(conn *c, item *it, const uint32_t hv);
+item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, LIBEVENT_THREAD *t, const bool do_update);
+item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv, LIBEVENT_THREAD *t);
+void do_item_bump(LIBEVENT_THREAD *t, item *it, const uint32_t hv);
void item_stats_reset(void);
extern pthread_mutex_t lru_locks[POWER_LARGEST];
diff --git a/memcached.c b/memcached.c
index 084aafd..7df7e55 100644
--- a/memcached.c
+++ b/memcached.c
@@ -1565,9 +1565,9 @@ static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add
*
* Returns the state of storage.
*/
-enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
+enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale) {
char *key = ITEM_key(it);
- item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
+ item *old_it = do_item_get(key, it->nkey, hv, t, DONT_UPDATE);
enum store_item_type stored = NOT_STORED;
enum cas_result { CAS_NONE, CAS_MATCH, CAS_BADVAL, CAS_STALE, CAS_MISS };
@@ -1587,7 +1587,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
cas_res = CAS_NONE;
} else if (it_cas == old_cas) {
cas_res = CAS_MATCH;
- } else if (c->set_stale && it_cas < old_cas) {
+ } else if (cas_stale && it_cas < old_cas) {
cas_res = CAS_STALE;
} else {
cas_res = CAS_BADVAL;
@@ -1603,9 +1603,9 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
// cas validates
// it and old_it may belong to different classes.
// I'm updating the stats for the one that's getting pushed out
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
+ pthread_mutex_unlock(&t->stats.mutex);
do_store = true;
} else if (cas_res == CAS_STALE) {
// if we're allowed to set a stale value, CAS must be lower than
@@ -1618,15 +1618,15 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
it->it_flags |= ITEM_TOKEN_SENT;
}
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
+ pthread_mutex_unlock(&t->stats.mutex);
do_store = true;
} else {
// NONE or BADVAL are the same for CAS cmd
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
+ pthread_mutex_unlock(&t->stats.mutex);
if (settings.verbose > 1) {
fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
@@ -1674,7 +1674,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
}
if (do_store) {
- STORAGE_delete(c->thread->storage, old_it);
+ STORAGE_delete(t->storage, old_it);
item_replace(old_it, it, hv);
stored = STORED;
}
@@ -1699,9 +1699,9 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
case NREAD_CAS:
// LRU expired
stored = NOT_FOUND;
- pthread_mutex_lock(&c->thread->stats.mutex);
- c->thread->stats.cas_misses++;
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
+ t->stats.cas_misses++;
+ pthread_mutex_unlock(&t->stats.mutex);
break;
case NREAD_REPLACE:
case NREAD_APPEND:
@@ -1716,12 +1716,12 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
}
}
- if (stored == STORED) {
- c->cas = ITEM_get_cas(it);
+ if (stored == STORED && cas != NULL) {
+ *cas = ITEM_get_cas(it);
}
- LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
+ LOGGER_LOG(t->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
stored, comm, ITEM_key(it), it->nkey, it->nbytes, it->exptime,
- ITEM_clsid(it), c->sfd);
+ ITEM_clsid(it), t->cur_sfd);
return stored;
}
@@ -2187,12 +2187,12 @@ void process_stats_conns(ADD_STAT add_stats, void *c) {
}
#define IT_REFCOUNT_LIMIT 60000
-item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow) {
+item* limited_get(const char *key, size_t nkey, LIBEVENT_THREAD *t, uint32_t exptime, bool should_touch, bool do_update, bool *overflow) {
item *it;
if (should_touch) {
- it = item_touch(key, nkey, exptime, c);
+ it = item_touch(key, nkey, exptime, t);
} else {
- it = item_get(key, nkey, c, do_update);
+ it = item_get(key, nkey, t, do_update);
}
if (it && it->refcount > IT_REFCOUNT_LIMIT) {
item_remove(it);
@@ -2208,9 +2208,9 @@ item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should
// locked, caller can directly change what it needs.
// though it might eventually be a better interface to sink it all into
// items.c.
-item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow) {
+item* limited_get_locked(const char *key, size_t nkey, LIBEVENT_THREAD *t, bool do_update, uint32_t *hv, bool *overflow) {
item *it;
- it = item_get_locked(key, nkey, c, do_update, hv);
+ it = item_get_locked(key, nkey, t, do_update, hv);
if (it && it->refcount > IT_REFCOUNT_LIMIT) {
do_item_remove(it);
it = NULL;
@@ -2233,7 +2233,7 @@ item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32
*
* returns a response string to send back to the client.
*/
-enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
+enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key, const size_t nkey,
const bool incr, const int64_t delta,
char *buf, uint64_t *cas,
const uint32_t hv,
@@ -2243,7 +2243,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
int res;
item *it;
- it = do_item_get(key, nkey, hv, c, DONT_UPDATE);
+ it = do_item_get(key, nkey, hv, t, DONT_UPDATE);
if (!it) {
return DELTA_ITEM_NOT_FOUND;
}
@@ -2283,13 +2283,13 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
}
- pthread_mutex_lock(&c->thread->stats.mutex);
+ pthread_mutex_lock(&t->stats.mutex);
if (incr) {
- c->thread->stats.slab_stats[ITEM_clsid(it)].incr_hits++;
+ t->stats.slab_stats[ITEM_clsid(it)].incr_hits++;
} else {
- c->thread->stats.slab_stats[ITEM_clsid(it)].decr_hits++;
+ t->stats.slab_stats[ITEM_clsid(it)].decr_hits++;
}
- pthread_mutex_unlock(&c->thread->stats.mutex);
+ pthread_mutex_unlock(&t->stats.mutex);
itoa_u64(value, buf);
res = strlen(buf);
diff --git a/memcached.h b/memcached.h
index 860fae4..94a2550 100644
--- a/memcached.h
+++ b/memcached.h
@@ -273,6 +273,9 @@ enum close_reasons {
#define NREAD_PREPEND 5
#define NREAD_CAS 6
+#define CAS_ALLOW_STALE true
+#define CAS_NO_STALE false
+
enum store_item_type {
NOT_STORED=0, STORED, EXISTS, NOT_FOUND, TOO_LARGE, NO_MEMORY
};
@@ -711,6 +714,7 @@ typedef struct {
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
#endif
+ int cur_sfd; /* client fd for logging commands */
struct thread_stats stats; /* Stats generated by this thread */
io_queue_cb_t io_queues[IO_QUEUE_COUNT];
struct conn_queue *ev_queue; /* Worker/conn event queue */
@@ -914,12 +918,12 @@ extern void *ext_storage;
* Functions
*/
void do_accept_new_conns(const bool do_accept);
-enum delta_result_type do_add_delta(conn *c, const char *key,
+enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key,
const size_t nkey, const bool incr,
const int64_t delta, char *buf,
uint64_t *cas, const uint32_t hv,
item **it_ret);
-enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
+enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale);
void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
void conn_io_queue_setup(conn *c);
io_queue_t *conn_io_queue_get(conn *c, int type);
@@ -961,19 +965,19 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, in
void sidethread_conn_close(conn *c);
/* Lock wrappers for cache functions that are called from main loop. */
-enum delta_result_type add_delta(conn *c, const char *key,
+enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
const size_t nkey, bool incr,
const int64_t delta, char *buf,
uint64_t *cas);
void accept_new_conns(const bool do_accept);
void conn_close_idle(conn *c);
void conn_close_all(void);
-item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+item *item_alloc(const char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
#define DO_UPDATE true
#define DONT_UPDATE false
-item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update);
-item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv);
-item *item_touch(const char *key, const size_t nkey, uint32_t exptime, conn *c);
+item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update);
+item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv);
+item *item_touch(const char *key, const size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t);
int item_link(item *it);
void item_remove(item *it);
int item_replace(item *it, item *new_it, const uint32_t hv);
@@ -1002,7 +1006,7 @@ LIBEVENT_THREAD *get_worker_thread(int id);
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
const char *fmt, ...);
-enum store_item_type store_item(item *item, int comm, conn *c);
+enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale);
/* Protocol related code */
void out_string(conn *c, const char *str);
@@ -1013,8 +1017,8 @@ void out_string(conn *c, const char *str);
#define EXPTIME_TO_POSITIVE_TIME(exptime) (exptime < 0) ? \
REALTIME_MAXDELTA + 1 : exptime
rel_time_t realtime(const time_t exptime);
-item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow);
-item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow);
+item* limited_get(const char *key, size_t nkey, LIBEVENT_THREAD *t, uint32_t exptime, bool should_touch, bool do_update, bool *overflow);
+item* limited_get_locked(const char *key, size_t nkey, LIBEVENT_THREAD *t, bool do_update, uint32_t *hv, bool *overflow);
// Read/Response object handlers.
void resp_reset(mc_resp *resp);
void resp_add_iov(mc_resp *resp, const void *buf, int len);
diff --git a/proto_bin.c b/proto_bin.c
index d8b37c5..0a30b8a 100644
--- a/proto_bin.c
+++ b/proto_bin.c
@@ -294,7 +294,7 @@ static void complete_incr_bin(conn *c, char *extbuf) {
if (c->binary_header.request.cas != 0) {
cas = c->binary_header.request.cas;
}
- switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
+ switch(add_delta(c->thread, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
req->message.body.delta, tmpbuf,
&cas)) {
case OK:
@@ -323,11 +323,13 @@ static void complete_incr_bin(conn *c, char *extbuf) {
res + 2);
if (it != NULL) {
+ uint64_t cas = 0;
memcpy(ITEM_data(it), tmpbuf, res);
memcpy(ITEM_data(it) + res, "\r\n", 2);
+ c->thread->cur_sfd = c->sfd; // for store_item logging.
- if (store_item(it, NREAD_ADD, c)) {
- c->cas = ITEM_get_cas(it);
+ if (store_item(it, NREAD_ADD, c->thread, &cas, CAS_NO_STALE)) {
+ c->cas = cas;
write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
} else {
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED,
@@ -382,7 +384,10 @@ static void complete_update_bin(conn *c) {
ch->used += 2;
}
- ret = store_item(it, c->cmd, c);
+ uint64_t cas = 0;
+ c->thread->cur_sfd = c->sfd; // for store_item logging.
+ ret = store_item(it, c->cmd, c->thread, &cas, CAS_NO_STALE);
+ c->cas = cas;
#ifdef ENABLE_DTRACE
uint64_t cas = ITEM_get_cas(it);
@@ -476,9 +481,9 @@ static void process_bin_get_or_touch(conn *c, char *extbuf) {
protocol_binary_request_touch *t = (void *)extbuf;
time_t exptime = ntohl(t->message.body.expiration);
- it = item_touch(key, nkey, realtime(exptime), c);
+ it = item_touch(key, nkey, realtime(exptime), c->thread);
} else {
- it = item_get(key, nkey, c, DO_UPDATE);
+ it = item_get(key, nkey, c->thread, DO_UPDATE);
}
if (it) {
@@ -888,6 +893,7 @@ static void dispatch_bin_command(conn *c, char *extbuf) {
uint8_t extlen = c->binary_header.request.extlen;
uint16_t keylen = c->binary_header.request.keylen;
uint32_t bodylen = c->binary_header.request.bodylen;
+ c->thread->cur_sfd = c->sfd; // cuddle sfd for logging.
if (keylen > bodylen || keylen + extlen > bodylen) {
write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, 0);
@@ -1136,7 +1142,7 @@ static void process_bin_update(conn *c, char *extbuf) {
/* Avoid stale data persisting in cache because we failed alloc.
* Unacceptable for SET. Anywhere else too? */
if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
- it = item_get(key, nkey, c, DONT_UPDATE);
+ it = item_get(key, nkey, c->thread, DONT_UPDATE);
if (it) {
item_unlink(it);
STORAGE_delete(c->thread->storage, it);
@@ -1303,7 +1309,7 @@ static void process_bin_delete(conn *c) {
stats_prefix_record_delete(key, nkey);
}
- it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv);
+ it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv);
if (it) {
uint64_t cas = c->binary_header.request.cas;
if (cas == 0 || cas == ITEM_get_cas(it)) {
diff --git a/proto_text.c b/proto_text.c
index 6fd1212..adb38a6 100644
--- a/proto_text.c
+++ b/proto_text.c
@@ -168,7 +168,9 @@ void complete_nread_ascii(conn *c) {
}
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
- ret = store_item(it, comm, c);
+ uint64_t cas = 0;
+ c->thread->cur_sfd = c->sfd; // cuddle sfd for logging.
+ ret = store_item(it, comm, c->thread, &cas, c->set_stale);
#ifdef ENABLE_DTRACE
uint64_t cas = ITEM_get_cas(it);
@@ -201,6 +203,7 @@ void complete_nread_ascii(conn *c) {
#endif
if (c->mset_res) {
+ c->cas = cas;
_finalize_mset(c, ret);
} else {
switch (ret) {
@@ -565,7 +568,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
goto stop;
}
- it = limited_get(key, nkey, c, exptime, should_touch, DO_UPDATE, &overflow);
+ it = limited_get(key, nkey, c->thread, exptime, should_touch, DO_UPDATE, &overflow);
if (settings.detail_enabled) {
stats_prefix_record_get(key, nkey, NULL != it);
}
@@ -844,7 +847,7 @@ static void process_meta_command(conn *c, token_t *tokens, const size_t ntokens)
}
bool overflow; // not used here.
- item *it = limited_get(key, nkey, c, 0, false, DONT_UPDATE, &overflow);
+ item *it = limited_get(key, nkey, c->thread, 0, false, DONT_UPDATE, &overflow);
if (it) {
mc_resp *resp = c->resp;
size_t total = 0;
@@ -1093,10 +1096,10 @@ static void process_mget_command(conn *c, token_t *tokens, const size_t ntokens)
// I think we do, since an overflow shouldn't trigger an alloc/replace.
bool overflow = false;
if (!of.locked) {
- it = limited_get(key, nkey, c, 0, false, !of.no_update, &overflow);
+ it = limited_get(key, nkey, c->thread, 0, false, !of.no_update, &overflow);
} else {
// If we had to lock the item, we're doing our own bump later.
- it = limited_get_locked(key, nkey, c, DONT_UPDATE, &hv, &overflow);
+ it = limited_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv, &overflow);
}
// Since we're a new protocol, we can actually inform users that refcount
@@ -1294,7 +1297,7 @@ static void process_mget_command(conn *c, token_t *tokens, const size_t ntokens)
if (of.locked) {
// Delayed bump so we could get fetched/last access time pre-update.
if (!of.no_update && it != NULL) {
- do_item_bump(c, it, hv);
+ do_item_bump(c->thread, it, hv);
}
item_unlock(hv);
}
@@ -1513,7 +1516,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)
/* Avoid stale data persisting in cache because we failed alloc. */
// NOTE: only if SET mode?
- it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv);
+ it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv);
if (it) {
do_item_unlink(it, hv);
STORAGE_delete(c->thread->storage, it);
@@ -1620,7 +1623,7 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke
}
}
- it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv);
+ it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv);
if (it) {
MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
@@ -1759,7 +1762,7 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n
// return a referenced item if it exists, so we can modify it here, rather
// than adding even more parameters to do_add_delta.
bool item_created = false;
- switch(do_add_delta(c, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) {
+ switch(do_add_delta(c->thread, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) {
case OK:
if (c->noreply)
resp->skip = true;
@@ -1782,7 +1785,7 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n
if (it != NULL) {
memcpy(ITEM_data(it), tmpbuf, vlen);
memcpy(ITEM_data(it) + vlen, "\r\n", 2);
- if (do_store_item(it, NREAD_ADD, c, hv)) {
+ if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, CAS_NO_STALE)) {
item_created = true;
} else {
// Not sure how we can get here if we're holding the lock.
@@ -1987,7 +1990,7 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
/* Avoid stale data persisting in cache because we failed alloc.
* Unacceptable for SET. Anywhere else too? */
if (comm == NREAD_SET) {
- it = item_get(key, nkey, c, DONT_UPDATE);
+ it = item_get(key, nkey, c->thread, DONT_UPDATE);
if (it) {
item_unlink(it);
STORAGE_delete(c->thread->storage, it);
@@ -2039,7 +2042,7 @@ static void process_touch_command(conn *c, token_t *tokens, const size_t ntokens
}
exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));
- it = item_touch(key, nkey, exptime, c);
+ it = item_touch(key, nkey, exptime, c->thread);
if (it) {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.touch_cmds++;
@@ -2081,7 +2084,7 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
return;
}
- switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
+ switch(add_delta(c->thread, key, nkey, incr, delta, temp, NULL)) {
case OK:
out_string(c, temp);
break;
@@ -2141,7 +2144,7 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
stats_prefix_record_delete(key, nkey);
}
- it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv);
+ it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv);
if (it) {
MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
@@ -2734,6 +2737,7 @@ void process_command_ascii(conn *c, char *command) {
return;
}
+ c->thread->cur_sfd = c->sfd; // cuddle sfd for logging.
ntokens = tokenize_command(command, tokens, MAX_TOKENS);
// All commands need a minimum of two tokens: cmd and NULL finalizer
// There are also no valid commands shorter than two bytes.
diff --git a/t/watcher.t b/t/watcher.t
index ddde520..f977c2a 100755
--- a/t/watcher.t
+++ b/t/watcher.t
@@ -5,11 +5,13 @@ use strict;
use warnings;
use Socket qw/SO_RCVBUF/;
-use Test::More tests => 30;
+use Test::More;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
+plan tests => 30;
+
my $server = new_memcached('-m 60 -o watcher_logbuf_size=8');
my $client = $server->sock;
my $watcher = $server->new_sock;
diff --git a/t/watcher_connid.t b/t/watcher_connid.t
index a5f527b..c98f86b 100644
--- a/t/watcher_connid.t
+++ b/t/watcher_connid.t
@@ -7,11 +7,13 @@ use strict;
use warnings;
use Socket qw/SO_RCVBUF/;
-use Test::More tests => 4;
+use Test::More;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
+plan tests => 4;
+
my $server = new_memcached('-m 60 -o watcher_logbuf_size=8');
my $client_first = $server->sock;
diff --git a/thread.c b/thread.c
index 01b3a3b..f66aa8f 100644
--- a/thread.c
+++ b/thread.c
@@ -809,7 +809,7 @@ void sidethread_conn_close(conn *c) {
/*
* Allocates a new item.
*/
-item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
+item *item_alloc(const char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
item *it;
/* do_item_alloc handles its own locks */
it = do_item_alloc(key, nkey, flags, exptime, nbytes);
@@ -820,12 +820,12 @@ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbyt
* Returns an item if it hasn't been marked as expired,
* lazy-expiring as needed.
*/
-item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
+item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update) {
item *it;
uint32_t hv;
hv = hash(key, nkey);
item_lock(hv);
- it = do_item_get(key, nkey, hv, c, do_update);
+ it = do_item_get(key, nkey, hv, t, do_update);
item_unlock(hv);
return it;
}
@@ -833,20 +833,20 @@ item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update
// returns an item with the item lock held.
// lock will still be held even if return is NULL, allowing caller to replace
// an item atomically if desired.
-item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) {
+item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv) {
item *it;
*hv = hash(key, nkey);
item_lock(*hv);
- it = do_item_get(key, nkey, *hv, c, do_update);
+ it = do_item_get(key, nkey, *hv, t, do_update);
return it;
}
-item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
+item *item_touch(const char *key, size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t) {
item *it;
uint32_t hv;
hv = hash(key, nkey);
item_lock(hv);
- it = do_item_touch(key, nkey, exptime, hv, c);
+ it = do_item_touch(key, nkey, exptime, hv, t);
item_unlock(hv);
return it;
}
@@ -901,7 +901,7 @@ void item_unlink(item *item) {
/*
* Does arithmetic on a numeric item value.
*/
-enum delta_result_type add_delta(conn *c, const char *key,
+enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key,
const size_t nkey, bool incr,
const int64_t delta, char *buf,
uint64_t *cas) {
@@ -910,7 +910,7 @@ enum delta_result_type add_delta(conn *c, const char *key,
hv = hash(key, nkey);
item_lock(hv);
- ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv, NULL);
+ ret = do_add_delta(t, key, nkey, incr, delta, buf, cas, hv, NULL);
item_unlock(hv);
return ret;
}
@@ -918,13 +918,13 @@ enum delta_result_type add_delta(conn *c, const char *key,
/*
* Stores an item in the cache (high level, obeys set/add/replace semantics)
*/
-enum store_item_type store_item(item *item, int comm, conn* c) {
+enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale) {
enum store_item_type ret;
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);
item_lock(hv);
- ret = do_store_item(item, comm, c, hv);
+ ret = do_store_item(item, comm, t, hv, cas, cas_stale);
item_unlock(hv);
return ret;
}