diff options
author | dormando <dormando@rydia.net> | 2023-01-09 18:08:26 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2023-01-11 13:24:22 -0800 |
commit | fccf7b9efdfb0deb11f111496ce53c5892647dab (patch) | |
tree | a3c2f6bafee2d80a609806627bd3922e0aa6cd08 | |
parent | 8bb9d9a3e5ca93c38db97181d4c15b03d48a644d (diff) | |
download | memcached-fccf7b9efdfb0deb11f111496ce53c5892647dab.tar.gz |
core: remove *conn object from cache commands
We want to start using cache commands in contexts without a client
connection, but the client object has always been passed to all
functions.
In most cases we only need the worker thread (LIBEVENT_THREAD *t), so
this change adjusts the arguments passed in.
-rw-r--r-- | items.c | 36 | ||||
-rw-r--r-- | items.h | 8 | ||||
-rw-r--r-- | memcached.c | 62 | ||||
-rw-r--r-- | memcached.h | 24 | ||||
-rw-r--r-- | proto_bin.c | 22 | ||||
-rw-r--r-- | proto_text.c | 32 | ||||
-rwxr-xr-x | t/watcher.t | 4 | ||||
-rw-r--r-- | t/watcher_connid.t | 4 | ||||
-rw-r--r-- | thread.c | 22 |
9 files changed, 116 insertions, 98 deletions
@@ -259,7 +259,7 @@ item_chunk *do_item_alloc_chunk(item_chunk *ch, const size_t bytes_remain) { return nch; } -item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags, +item *do_item_alloc(const char *key, const size_t nkey, const unsigned int flags, const rel_time_t exptime, const int nbytes) { uint8_t nsuffix; item *it = NULL; @@ -975,7 +975,7 @@ void item_stats_sizes(ADD_STAT add_stats, void *c) { } /** wrapper around assoc_find which does the lazy expiration logic */ -item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) { +item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, LIBEVENT_THREAD *t, const bool do_update) { item *it = assoc_find(key, nkey, hv); if (it != NULL) { refcount_incr(it); @@ -1006,7 +1006,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c int ii; if (it == NULL) { fprintf(stderr, "> NOT FOUND "); - } else { + } else if (was_found) { fprintf(stderr, "> FOUND KEY "); } for (ii = 0; ii < nkey; ++ii) { @@ -1018,31 +1018,31 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c was_found = 1; if (item_is_flushed(it)) { do_item_unlink(it, hv); - STORAGE_delete(c->thread->storage, it); + STORAGE_delete(t->storage, it); do_item_remove(it); it = NULL; - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.get_flushed++; - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); + t->stats.get_flushed++; + pthread_mutex_unlock(&t->stats.mutex); if (settings.verbose > 2) { fprintf(stderr, " -nuked by flush"); } was_found = 2; } else if (it->exptime != 0 && it->exptime <= current_time) { do_item_unlink(it, hv); - STORAGE_delete(c->thread->storage, it); + STORAGE_delete(t->storage, it); do_item_remove(it); it = NULL; - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.get_expired++; - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); + t->stats.get_expired++; + pthread_mutex_unlock(&t->stats.mutex); if (settings.verbose > 2) { fprintf(stderr, " -nuked by expire"); } was_found = 3; } else { if (do_update) { - do_item_bump(c, it, hv); + do_item_bump(t, it, hv); } DEBUG_REFCNT(it, '+'); } @@ -1051,8 +1051,8 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c if (settings.verbose > 2) fprintf(stderr, "\n"); /* For now this is in addition to the above verbose logging. */ - LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key, - nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, c->sfd); + LOGGER_LOG(t->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key, + nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, t->cur_sfd); return it; } @@ -1060,7 +1060,7 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c // Requires lock held for item. // Split out of do_item_get() to allow mget functions to look through header // data before losing state modified via the bump function. -void do_item_bump(conn *c, item *it, const uint32_t hv) { +void do_item_bump(LIBEVENT_THREAD *t, item *it, const uint32_t hv) { /* We update the hit markers only during fetches. * An item needs to be hit twice overall to be considered * ACTIVE, but only needs a single hit to maintain activity @@ -1075,7 +1075,7 @@ void do_item_bump(conn *c, item *it, const uint32_t hv) { it->it_flags |= ITEM_ACTIVE; if (ITEM_lruid(it) != COLD_LRU) { it->time = current_time; // only need to bump time. - } else if (!lru_bump_async(c->thread->lru_bump_buf, it, hv)) { + } else if (!lru_bump_async(t->lru_bump_buf, it, hv)) { // add flag before async bump to avoid race. it->it_flags &= ~ITEM_ACTIVE; } @@ -1088,8 +1088,8 @@ void do_item_bump(conn *c, item *it, const uint32_t hv) { } item *do_item_touch(const char *key, size_t nkey, uint32_t exptime, - const uint32_t hv, conn *c) { - item *it = do_item_get(key, nkey, hv, c, DO_UPDATE); + const uint32_t hv, LIBEVENT_THREAD *t) { + item *it = do_item_get(key, nkey, hv, t, DO_UPDATE); if (it != NULL) { it->exptime = exptime; } @@ -11,7 +11,7 @@ uint64_t get_cas_id(void); void set_cas_id(uint64_t new_cas); /*@null@*/ -item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags, const rel_time_t exptime, const int nbytes); +item *do_item_alloc(const char *key, const size_t nkey, const unsigned int flags, const rel_time_t exptime, const int nbytes); item_chunk *do_item_alloc_chunk(item_chunk *ch, const size_t bytes_remain); item *do_item_alloc_pull(const size_t ntotal, const unsigned int id); void item_free(item *it); @@ -71,9 +71,9 @@ typedef struct { } item_stats_automove; void fill_item_stats_automove(item_stats_automove *am); -item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update); -item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv, conn *c); -void do_item_bump(conn *c, item *it, const uint32_t hv); +item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, LIBEVENT_THREAD *t, const bool do_update); +item *do_item_touch(const char *key, const size_t nkey, uint32_t exptime, const uint32_t hv, LIBEVENT_THREAD *t); +void do_item_bump(LIBEVENT_THREAD *t, item *it, const uint32_t hv); void item_stats_reset(void); extern pthread_mutex_t lru_locks[POWER_LARGEST]; diff --git a/memcached.c b/memcached.c index 084aafd..7df7e55 100644 --- a/memcached.c +++ b/memcached.c @@ -1565,9 +1565,9 @@ static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add * * Returns the state of storage. */ -enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) { +enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale) { char *key = ITEM_key(it); - item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE); + item *old_it = do_item_get(key, it->nkey, hv, t, DONT_UPDATE); enum store_item_type stored = NOT_STORED; enum cas_result { CAS_NONE, CAS_MATCH, CAS_BADVAL, CAS_STALE, CAS_MISS }; @@ -1587,7 +1587,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h cas_res = CAS_NONE; } else if (it_cas == old_cas) { cas_res = CAS_MATCH; - } else if (c->set_stale && it_cas < old_cas) { + } else if (cas_stale && it_cas < old_cas) { cas_res = CAS_STALE; } else { cas_res = CAS_BADVAL; @@ -1603,9 +1603,9 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h // cas validates // it and old_it may belong to different classes. // I'm updating the stats for the one that's getting pushed out - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++; - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++; + pthread_mutex_unlock(&t->stats.mutex); do_store = true; } else if (cas_res == CAS_STALE) { // if we're allowed to set a stale value, CAS must be lower than @@ -1618,15 +1618,15 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h it->it_flags |= ITEM_TOKEN_SENT; } - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++; - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++; + pthread_mutex_unlock(&t->stats.mutex); do_store = true; } else { // NONE or BADVAL are the same for CAS cmd - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++; - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); + t->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++; + pthread_mutex_unlock(&t->stats.mutex); if (settings.verbose > 1) { fprintf(stderr, "CAS: failure: expected %llu, got %llu\n", @@ -1674,7 +1674,7 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h } if (do_store) { - STORAGE_delete(c->thread->storage, old_it); + STORAGE_delete(t->storage, old_it); item_replace(old_it, it, hv); stored = STORED; } @@ -1699,9 +1699,9 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h case NREAD_CAS: // LRU expired stored = NOT_FOUND; - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.cas_misses++; - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); + t->stats.cas_misses++; + pthread_mutex_unlock(&t->stats.mutex); break; case NREAD_REPLACE: case NREAD_APPEND: @@ -1716,12 +1716,12 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h } } - if (stored == STORED) { - c->cas = ITEM_get_cas(it); + if (stored == STORED && cas != NULL) { + *cas = ITEM_get_cas(it); } - LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL, + LOGGER_LOG(t->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL, stored, comm, ITEM_key(it), it->nkey, it->nbytes, it->exptime, - ITEM_clsid(it), c->sfd); + ITEM_clsid(it), t->cur_sfd); return stored; } @@ -2187,12 +2187,12 @@ void process_stats_conns(ADD_STAT add_stats, void *c) { } #define IT_REFCOUNT_LIMIT 60000 -item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow) { +item* limited_get(const char *key, size_t nkey, LIBEVENT_THREAD *t, uint32_t exptime, bool should_touch, bool do_update, bool *overflow) { item *it; if (should_touch) { - it = item_touch(key, nkey, exptime, c); + it = item_touch(key, nkey, exptime, t); } else { - it = item_get(key, nkey, c, do_update); + it = item_get(key, nkey, t, do_update); } if (it && it->refcount > IT_REFCOUNT_LIMIT) { item_remove(it); @@ -2208,9 +2208,9 @@ item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should // locked, caller can directly change what it needs. // though it might eventually be a better interface to sink it all into // items.c. -item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow) { +item* limited_get_locked(const char *key, size_t nkey, LIBEVENT_THREAD *t, bool do_update, uint32_t *hv, bool *overflow) { item *it; - it = item_get_locked(key, nkey, c, do_update, hv); + it = item_get_locked(key, nkey, t, do_update, hv); if (it && it->refcount > IT_REFCOUNT_LIMIT) { do_item_remove(it); it = NULL; @@ -2233,7 +2233,7 @@ item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32 * * returns a response string to send back to the client. */ -enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, +enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key, const size_t nkey, const bool incr, const int64_t delta, char *buf, uint64_t *cas, const uint32_t hv, @@ -2243,7 +2243,7 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, int res; item *it; - it = do_item_get(key, nkey, hv, c, DONT_UPDATE); + it = do_item_get(key, nkey, hv, t, DONT_UPDATE); if (!it) { return DELTA_ITEM_NOT_FOUND; } @@ -2283,13 +2283,13 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value); } - pthread_mutex_lock(&c->thread->stats.mutex); + pthread_mutex_lock(&t->stats.mutex); if (incr) { - c->thread->stats.slab_stats[ITEM_clsid(it)].incr_hits++; + t->stats.slab_stats[ITEM_clsid(it)].incr_hits++; } else { - c->thread->stats.slab_stats[ITEM_clsid(it)].decr_hits++; + t->stats.slab_stats[ITEM_clsid(it)].decr_hits++; } - pthread_mutex_unlock(&c->thread->stats.mutex); + pthread_mutex_unlock(&t->stats.mutex); itoa_u64(value, buf); res = strlen(buf); diff --git a/memcached.h b/memcached.h index 860fae4..94a2550 100644 --- a/memcached.h +++ b/memcached.h @@ -273,6 +273,9 @@ enum close_reasons { #define NREAD_PREPEND 5 #define NREAD_CAS 6 +#define CAS_ALLOW_STALE true +#define CAS_NO_STALE false + enum store_item_type { NOT_STORED=0, STORED, EXISTS, NOT_FOUND, TOO_LARGE, NO_MEMORY }; @@ -711,6 +714,7 @@ typedef struct { int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ #endif + int cur_sfd; /* client fd for logging commands */ struct thread_stats stats; /* Stats generated by this thread */ io_queue_cb_t io_queues[IO_QUEUE_COUNT]; struct conn_queue *ev_queue; /* Worker/conn event queue */ @@ -914,12 +918,12 @@ extern void *ext_storage; * Functions */ void do_accept_new_conns(const bool do_accept); -enum delta_result_type do_add_delta(conn *c, const char *key, +enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key, const size_t nkey, const bool incr, const int64_t delta, char *buf, uint64_t *cas, const uint32_t hv, item **it_ret); -enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv); +enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, uint64_t *cas, bool cas_stale); void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb); void conn_io_queue_setup(conn *c); io_queue_t *conn_io_queue_get(conn *c, int type); @@ -961,19 +965,19 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, in void sidethread_conn_close(conn *c); /* Lock wrappers for cache functions that are called from main loop. */ -enum delta_result_type add_delta(conn *c, const char *key, +enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key, const size_t nkey, bool incr, const int64_t delta, char *buf, uint64_t *cas); void accept_new_conns(const bool do_accept); void conn_close_idle(conn *c); void conn_close_all(void); -item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); +item *item_alloc(const char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); #define DO_UPDATE true #define DONT_UPDATE false -item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update); -item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv); -item *item_touch(const char *key, const size_t nkey, uint32_t exptime, conn *c); +item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update); +item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv); +item *item_touch(const char *key, const size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t); int item_link(item *it); void item_remove(item *it); int item_replace(item *it, item *new_it, const uint32_t hv); @@ -1002,7 +1006,7 @@ LIBEVENT_THREAD *get_worker_thread(int id); void append_stat(const char *name, ADD_STAT add_stats, conn *c, const char *fmt, ...); -enum store_item_type store_item(item *item, int comm, conn *c); +enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale); /* Protocol related code */ void out_string(conn *c, const char *str); @@ -1013,8 +1017,8 @@ void out_string(conn *c, const char *str); #define EXPTIME_TO_POSITIVE_TIME(exptime) (exptime < 0) ? \ REALTIME_MAXDELTA + 1 : exptime rel_time_t realtime(const time_t exptime); -item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow); -item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow); +item* limited_get(const char *key, size_t nkey, LIBEVENT_THREAD *t, uint32_t exptime, bool should_touch, bool do_update, bool *overflow); +item* limited_get_locked(const char *key, size_t nkey, LIBEVENT_THREAD *t, bool do_update, uint32_t *hv, bool *overflow); // Read/Response object handlers. void resp_reset(mc_resp *resp); void resp_add_iov(mc_resp *resp, const void *buf, int len); diff --git a/proto_bin.c b/proto_bin.c index d8b37c5..0a30b8a 100644 --- a/proto_bin.c +++ b/proto_bin.c @@ -294,7 +294,7 @@ static void complete_incr_bin(conn *c, char *extbuf) { if (c->binary_header.request.cas != 0) { cas = c->binary_header.request.cas; } - switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT, + switch(add_delta(c->thread, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT, req->message.body.delta, tmpbuf, &cas)) { case OK: @@ -323,11 +323,13 @@ static void complete_incr_bin(conn *c, char *extbuf) { res + 2); if (it != NULL) { + uint64_t cas = 0; memcpy(ITEM_data(it), tmpbuf, res); memcpy(ITEM_data(it) + res, "\r\n", 2); + c->thread->cur_sfd = c->sfd; // for store_item logging. - if (store_item(it, NREAD_ADD, c)) { - c->cas = ITEM_get_cas(it); + if (store_item(it, NREAD_ADD, c->thread, &cas, CAS_NO_STALE)) { + c->cas = cas; write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value)); } else { write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, @@ -382,7 +384,10 @@ static void complete_update_bin(conn *c) { ch->used += 2; } - ret = store_item(it, c->cmd, c); + uint64_t cas = 0; + c->thread->cur_sfd = c->sfd; // for store_item logging. + ret = store_item(it, c->cmd, c->thread, &cas, CAS_NO_STALE); + c->cas = cas; #ifdef ENABLE_DTRACE uint64_t cas = ITEM_get_cas(it); @@ -476,9 +481,9 @@ static void process_bin_get_or_touch(conn *c, char *extbuf) { protocol_binary_request_touch *t = (void *)extbuf; time_t exptime = ntohl(t->message.body.expiration); - it = item_touch(key, nkey, realtime(exptime), c); + it = item_touch(key, nkey, realtime(exptime), c->thread); } else { - it = item_get(key, nkey, c, DO_UPDATE); + it = item_get(key, nkey, c->thread, DO_UPDATE); } if (it) { @@ -888,6 +893,7 @@ static void dispatch_bin_command(conn *c, char *extbuf) { uint8_t extlen = c->binary_header.request.extlen; uint16_t keylen = c->binary_header.request.keylen; uint32_t bodylen = c->binary_header.request.bodylen; + c->thread->cur_sfd = c->sfd; // cuddle sfd for logging. if (keylen > bodylen || keylen + extlen > bodylen) { write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, 0); @@ -1136,7 +1142,7 @@ static void process_bin_update(conn *c, char *extbuf) { /* Avoid stale data persisting in cache because we failed alloc. * Unacceptable for SET. Anywhere else too? */ if (c->cmd == PROTOCOL_BINARY_CMD_SET) { - it = item_get(key, nkey, c, DONT_UPDATE); + it = item_get(key, nkey, c->thread, DONT_UPDATE); if (it) { item_unlink(it); STORAGE_delete(c->thread->storage, it); @@ -1303,7 +1309,7 @@ static void process_bin_delete(conn *c) { stats_prefix_record_delete(key, nkey); } - it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv); + it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv); if (it) { uint64_t cas = c->binary_header.request.cas; if (cas == 0 || cas == ITEM_get_cas(it)) { diff --git a/proto_text.c b/proto_text.c index 6fd1212..adb38a6 100644 --- a/proto_text.c +++ b/proto_text.c @@ -168,7 +168,9 @@ void complete_nread_ascii(conn *c) { } out_string(c, "CLIENT_ERROR bad data chunk"); } else { - ret = store_item(it, comm, c); + uint64_t cas = 0; + c->thread->cur_sfd = c->sfd; // cuddle sfd for logging. + ret = store_item(it, comm, c->thread, &cas, c->set_stale); #ifdef ENABLE_DTRACE uint64_t cas = ITEM_get_cas(it); @@ -201,6 +203,7 @@ void complete_nread_ascii(conn *c) { #endif if (c->mset_res) { + c->cas = cas; _finalize_mset(c, ret); } else { switch (ret) { @@ -565,7 +568,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, goto stop; } - it = limited_get(key, nkey, c, exptime, should_touch, DO_UPDATE, &overflow); + it = limited_get(key, nkey, c->thread, exptime, should_touch, DO_UPDATE, &overflow); if (settings.detail_enabled) { stats_prefix_record_get(key, nkey, NULL != it); } @@ -844,7 +847,7 @@ static void process_meta_command(conn *c, token_t *tokens, const size_t ntokens) } bool overflow; // not used here. - item *it = limited_get(key, nkey, c, 0, false, DONT_UPDATE, &overflow); + item *it = limited_get(key, nkey, c->thread, 0, false, DONT_UPDATE, &overflow); if (it) { mc_resp *resp = c->resp; size_t total = 0; @@ -1093,10 +1096,10 @@ static void process_mget_command(conn *c, token_t *tokens, const size_t ntokens) // I think we do, since an overflow shouldn't trigger an alloc/replace. bool overflow = false; if (!of.locked) { - it = limited_get(key, nkey, c, 0, false, !of.no_update, &overflow); + it = limited_get(key, nkey, c->thread, 0, false, !of.no_update, &overflow); } else { // If we had to lock the item, we're doing our own bump later. - it = limited_get_locked(key, nkey, c, DONT_UPDATE, &hv, &overflow); + it = limited_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv, &overflow); } // Since we're a new protocol, we can actually inform users that refcount @@ -1294,7 +1297,7 @@ static void process_mget_command(conn *c, token_t *tokens, const size_t ntokens) if (of.locked) { // Delayed bump so we could get fetched/last access time pre-update. if (!of.no_update && it != NULL) { - do_item_bump(c, it, hv); + do_item_bump(c->thread, it, hv); } item_unlock(hv); } @@ -1513,7 +1516,7 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens) /* Avoid stale data persisting in cache because we failed alloc. */ // NOTE: only if SET mode? - it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv); + it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv); if (it) { do_item_unlink(it, hv); STORAGE_delete(c->thread->storage, it); @@ -1620,7 +1623,7 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke } } - it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv); + it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv); if (it) { MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); @@ -1759,7 +1762,7 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n // return a referenced item if it exists, so we can modify it here, rather // than adding even more parameters to do_add_delta. bool item_created = false; - switch(do_add_delta(c, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) { + switch(do_add_delta(c->thread, key, nkey, incr, of.delta, tmpbuf, &of.req_cas_id, hv, &it)) { case OK: if (c->noreply) resp->skip = true; @@ -1782,7 +1785,7 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n if (it != NULL) { memcpy(ITEM_data(it), tmpbuf, vlen); memcpy(ITEM_data(it) + vlen, "\r\n", 2); - if (do_store_item(it, NREAD_ADD, c, hv)) { + if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, CAS_NO_STALE)) { item_created = true; } else { // Not sure how we can get here if we're holding the lock. @@ -1987,7 +1990,7 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken /* Avoid stale data persisting in cache because we failed alloc. * Unacceptable for SET. Anywhere else too? */ if (comm == NREAD_SET) { - it = item_get(key, nkey, c, DONT_UPDATE); + it = item_get(key, nkey, c->thread, DONT_UPDATE); if (it) { item_unlink(it); STORAGE_delete(c->thread->storage, it); @@ -2039,7 +2042,7 @@ static void process_touch_command(conn *c, token_t *tokens, const size_t ntokens } exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int)); - it = item_touch(key, nkey, exptime, c); + it = item_touch(key, nkey, exptime, c->thread); if (it) { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.touch_cmds++; @@ -2081,7 +2084,7 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt return; } - switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) { + switch(add_delta(c->thread, key, nkey, incr, delta, temp, NULL)) { case OK: out_string(c, temp); break; @@ -2141,7 +2144,7 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken stats_prefix_record_delete(key, nkey); } - it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv); + it = item_get_locked(key, nkey, c->thread, DONT_UPDATE, &hv); if (it) { MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); @@ -2734,6 +2737,7 @@ void process_command_ascii(conn *c, char *command) { return; } + c->thread->cur_sfd = c->sfd; // cuddle sfd for logging. ntokens = tokenize_command(command, tokens, MAX_TOKENS); // All commands need a minimum of two tokens: cmd and NULL finalizer // There are also no valid commands shorter than two bytes. diff --git a/t/watcher.t b/t/watcher.t index ddde520..f977c2a 100755 --- a/t/watcher.t +++ b/t/watcher.t @@ -5,11 +5,13 @@ use strict; use warnings; use Socket qw/SO_RCVBUF/; -use Test::More tests => 30; +use Test::More; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; +plan tests => 30; + my $server = new_memcached('-m 60 -o watcher_logbuf_size=8'); my $client = $server->sock; my $watcher = $server->new_sock; diff --git a/t/watcher_connid.t b/t/watcher_connid.t index a5f527b..c98f86b 100644 --- a/t/watcher_connid.t +++ b/t/watcher_connid.t @@ -7,11 +7,13 @@ use strict; use warnings; use Socket qw/SO_RCVBUF/; -use Test::More tests => 4; +use Test::More; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; +plan tests => 4; + my $server = new_memcached('-m 60 -o watcher_logbuf_size=8'); my $client_first = $server->sock; @@ -809,7 +809,7 @@ void sidethread_conn_close(conn *c) { /* * Allocates a new item. */ -item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) { +item *item_alloc(const char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) { item *it; /* do_item_alloc handles its own locks */ it = do_item_alloc(key, nkey, flags, exptime, nbytes); @@ -820,12 +820,12 @@ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbyt * Returns an item if it hasn't been marked as expired, * lazy-expiring as needed. */ -item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) { +item *item_get(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update) { item *it; uint32_t hv; hv = hash(key, nkey); item_lock(hv); - it = do_item_get(key, nkey, hv, c, do_update); + it = do_item_get(key, nkey, hv, t, do_update); item_unlock(hv); return it; } @@ -833,20 +833,20 @@ item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update // returns an item with the item lock held. // lock will still be held even if return is NULL, allowing caller to replace // an item atomically if desired. -item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) { +item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, const bool do_update, uint32_t *hv) { item *it; *hv = hash(key, nkey); item_lock(*hv); - it = do_item_get(key, nkey, *hv, c, do_update); + it = do_item_get(key, nkey, *hv, t, do_update); return it; } -item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) { +item *item_touch(const char *key, size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t) { item *it; uint32_t hv; hv = hash(key, nkey); item_lock(hv); - it = do_item_touch(key, nkey, exptime, hv, c); + it = do_item_touch(key, nkey, exptime, hv, t); item_unlock(hv); return it; } @@ -901,7 +901,7 @@ void item_unlink(item *item) { /* * Does arithmetic on a numeric item value. */ -enum delta_result_type add_delta(conn *c, const char *key, +enum delta_result_type add_delta(LIBEVENT_THREAD *t, const char *key, const size_t nkey, bool incr, const int64_t delta, char *buf, uint64_t *cas) { @@ -910,7 +910,7 @@ enum delta_result_type add_delta(conn *c, const char *key, hv = hash(key, nkey); item_lock(hv); - ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv, NULL); + ret = do_add_delta(t, key, nkey, incr, delta, buf, cas, hv, NULL); item_unlock(hv); return ret; } @@ -918,13 +918,13 @@ enum delta_result_type add_delta(conn *c, const char *key, /* * Stores an item in the cache (high level, obeys set/add/replace semantics) */ -enum store_item_type store_item(item *item, int comm, conn* c) { +enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, uint64_t *cas, bool cas_stale) { enum store_item_type ret; uint32_t hv; hv = hash(ITEM_key(item), item->nkey); item_lock(hv); - ret = do_store_item(item, comm, c, hv); + ret = do_store_item(item, comm, t, hv, cas, cas_stale); item_unlock(hv); return ret; } |