diff options
author | dormando <dormando@rydia.net> | 2011-07-11 18:12:59 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2011-07-11 18:12:59 -0700 |
commit | ea2d42a50c07b862a465ba7926e9715a945304ef (patch) | |
tree | 7ceac67bb8cf3e9d138f76768f57d92835ba9af7 | |
parent | cbcd3872cbfa0dfc086b82ed08c370dc62fb9235 (diff) | |
download | memcached-ea2d42a50c07b862a465ba7926e9715a945304ef.tar.gz |
fix incr/decr race conditions for binary protocol
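There were two race conditions in the incr/decr binary protocol handler. The
first was the original "fetches item outside of add_delta" problem: the handler
looked up the item and checked its CAS before calling add_delta, so the item
could change between the check and the update. The second was in the
initializer, the path that creates the item when the key does not exist yet.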
For the initializer I went for the quick fix by changing the semantics of the
store request to be an ADD instead of a SET, so if another client creates the
key first in that very narrow race window, the request simply bounces with
NOT_STORED. Not perfect, but this is an improvement and good enough for now.
-rw-r--r-- | memcached.c | 127 | ||||
-rw-r--r-- | memcached.h | 8 | ||||
-rw-r--r-- | thread.c | 5 |
3 files changed, 73 insertions, 67 deletions
diff --git a/memcached.c b/memcached.c index d5c5904..0d484dc 100644 --- a/memcached.c +++ b/memcached.c @@ -998,6 +998,9 @@ static void complete_incr_bin(conn *c) { item *it; char *key; size_t nkey; + /* Weird magic in add_delta forces me to pad here */ + char tmpbuf[INCR_MAX_STORAGE_LEN]; + uint64_t cas = 0; protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf; protocol_binary_request_incr* req = binary_get_request(c); @@ -1025,72 +1028,62 @@ static void complete_incr_bin(conn *c) { req->message.body.expiration); } - it = item_get(key, nkey); - if (it && (c->binary_header.request.cas == 0 || - c->binary_header.request.cas == ITEM_get_cas(it))) { - /* Weird magic in add_delta forces me to pad here */ - char tmpbuf[INCR_MAX_STORAGE_LEN]; - protocol_binary_response_status st = PROTOCOL_BINARY_RESPONSE_SUCCESS; - - switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT, - req->message.body.delta, tmpbuf)) { - case OK: - break; - case NON_NUMERIC: - st = PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL; - break; - case EOM: - st = PROTOCOL_BINARY_RESPONSE_ENOMEM; - break; - case DELTA_ITEM_NOT_FOUND: - break; - } - - if (st != PROTOCOL_BINARY_RESPONSE_SUCCESS) { - write_bin_error(c, st, 0); - } else { - rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10)); - c->cas = ITEM_get_cas(it); - write_bin_response(c, &rsp->message.body, 0, 0, - sizeof(rsp->message.body.value)); + if (c->binary_header.request.cas != 0) { + cas = c->binary_header.request.cas; + } + switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT, + req->message.body.delta, tmpbuf, + &cas)) { + case OK: + rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10)); + if (cas) { + c->cas = cas; } - - item_remove(it); /* release our reference */ - } else if (!it && req->message.body.expiration != 0xffffffff) { - /* Save some room for the response */ - rsp->message.body.value = htonll(req->message.body.initial); - it = item_alloc(key, nkey, 0, 
realtime(req->message.body.expiration), - INCR_MAX_STORAGE_LEN); - - if (it != NULL) { - snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", - (unsigned long long)req->message.body.initial); - - if (store_item(it, NREAD_SET, c)) { - c->cas = ITEM_get_cas(it); - write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value)); + write_bin_response(c, &rsp->message.body, 0, 0, + sizeof(rsp->message.body.value)); + break; + case NON_NUMERIC: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0); + break; + case EOM: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0); + break; + case DELTA_ITEM_NOT_FOUND: + if (req->message.body.expiration != 0xffffffff) { + /* Save some room for the response */ + rsp->message.body.value = htonll(req->message.body.initial); + it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration), + INCR_MAX_STORAGE_LEN); + + if (it != NULL) { + snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", + (unsigned long long)req->message.body.initial); + + if (store_item(it, NREAD_ADD, c)) { + c->cas = ITEM_get_cas(it); + write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value)); + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0); + } + item_remove(it); /* release our reference */ } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0); + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0); } - item_remove(it); /* release our reference */ } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0); - } - } else if (it) { - /* incorrect CAS */ - item_remove(it); /* release our reference */ - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0); - } else { + pthread_mutex_lock(&c->thread->stats.mutex); + if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) { + c->thread->stats.incr_misses++; + } else { + c->thread->stats.decr_misses++; + } + pthread_mutex_unlock(&c->thread->stats.mutex); - pthread_mutex_lock(&c->thread->stats.mutex); - if (c->cmd 
== PROTOCOL_BINARY_CMD_INCREMENT) { - c->thread->stats.incr_misses++; - } else { - c->thread->stats.decr_misses++; + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0); } - pthread_mutex_unlock(&c->thread->stats.mutex); - - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0); + break; + case DELTA_ITEM_CAS_MISMATCH: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0); + break; } } @@ -2791,7 +2784,7 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt return; } - switch(add_delta(c, key, nkey, incr, delta, temp)) { + switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) { case OK: out_string(c, temp); break; @@ -2812,6 +2805,8 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt out_string(c, "NOT_FOUND"); break; + case DELTA_ITEM_CAS_MISMATCH: + break; /* Should never get here */ } } @@ -2828,7 +2823,7 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt */ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, const bool incr, const int64_t delta, - char *buf) { + char *buf, uint64_t *cas) { char *ptr; uint64_t value; int res; @@ -2839,6 +2834,11 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, return DELTA_ITEM_NOT_FOUND; } + if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) { + do_item_remove(it); + return DELTA_ITEM_CAS_MISMATCH; + } + ptr = ITEM_data(it); if (!safe_strtoull(ptr, &value)) { @@ -2888,6 +2888,9 @@ enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2); } + if (cas) { + *cas = ITEM_get_cas(it); /* swap the incoming CAS value */ + } do_item_remove(it); /* release our reference */ return OK; } diff --git a/memcached.h b/memcached.h index 27aef52..1146ecc 100644 --- a/memcached.h +++ b/memcached.h @@ -191,7 +191,7 @@ enum store_item_type { }; enum delta_result_type { - OK, 
NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND + OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH }; /** Time relative to server start. Smaller than time_t on 64-bit systems. */ @@ -437,7 +437,8 @@ extern volatile rel_time_t current_time; void do_accept_new_conns(const bool do_accept); enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey, const bool incr, - const int64_t delta, char *buf); + const int64_t delta, char *buf, + uint64_t *cas); enum store_item_type do_store_item(item *item, int comm, conn* c); conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base); extern int daemonize(int nochdir, int noclose); @@ -465,7 +466,8 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, in /* Lock wrappers for cache functions that are called from main loop. */ enum delta_result_type add_delta(conn *c, const char *key, const size_t nkey, const int incr, - const int64_t delta, char *buf); + const int64_t delta, char *buf, + uint64_t *cas); void accept_new_conns(const bool do_accept); conn *conn_from_freelist(void); bool conn_add_to_freelist(conn *c); @@ -400,11 +400,12 @@ void item_update(item *item) { */ enum delta_result_type add_delta(conn *c, const char *key, const size_t nkey, int incr, - const int64_t delta, char *buf) { + const int64_t delta, char *buf, + uint64_t *cas) { enum delta_result_type ret; pthread_mutex_lock(&cache_lock); - ret = do_add_delta(c, key, nkey, incr, delta, buf); + ret = do_add_delta(c, key, nkey, incr, delta, buf, cas); pthread_mutex_unlock(&cache_lock); return ret; } |