diff options
-rw-r--r-- | Makefile.am | 3 | ||||
-rw-r--r-- | memcached.c | 1446 | ||||
-rw-r--r-- | proto_bin.c | 1326 | ||||
-rw-r--r-- | proto_bin.h | 10 | ||||
-rw-r--r-- | proto_text.c | 141 | ||||
-rw-r--r-- | proto_text.h | 2 |
6 files changed, 1480 insertions, 1448 deletions
diff --git a/Makefile.am b/Makefile.am index 11e51b5..8ea4841 100644 --- a/Makefile.am +++ b/Makefile.am @@ -26,7 +26,8 @@ memcached_SOURCES = memcached.c memcached.h \ slab_automove.c slab_automove.h \ authfile.c authfile.h \ restart.c restart.h \ - proto_text.c proto_text.h + proto_text.c proto_text.h \ + proto_bin.c proto_bin.h if BUILD_SOLARIS_PRIVS memcached_SOURCES += solaris_priv.c diff --git a/memcached.c b/memcached.c index c0f1ed8..a21b4aa 100644 --- a/memcached.c +++ b/memcached.c @@ -57,6 +57,7 @@ #endif #include "proto_text.h" +#include "proto_bin.h" #if defined(__FreeBSD__) #include <sys/sysctl.h> @@ -80,7 +81,6 @@ enum try_read_result { static int try_read_command_negotiate(conn *c); static int try_read_command_udp(conn *c); -static int try_read_command_binary(conn *c); static enum try_read_result try_read_network(conn *c); static enum try_read_result try_read_udp(conn *c); @@ -101,25 +101,12 @@ static void conn_close(conn *c); static void conn_init(void); static bool update_event(conn *c, const int new_flags); static void complete_nread(conn *c); -static void write_bin_error(conn *c, protocol_binary_response_status err, - const char *errstr, int swallow); -static void write_bin_miss_response(conn *c, char *key, size_t nkey); #ifdef EXTSTORE static void _get_extstore_cb(void *e, obj_io *io, int ret); #endif static void conn_free(conn *c); -/** binprot handlers **/ -static void process_bin_flush(conn *c, char *extbuf); -static void process_bin_append_prepend(conn *c); -static void process_bin_update(conn *c, char *extbuf); -static void process_bin_get_or_touch(conn *c, char *extbuf); -static void process_bin_delete(conn *c); -static void complete_incr_bin(conn *c, char *extbuf); -static void process_bin_stat(conn *c); -static void process_bin_sasl_auth(conn *c); - /** exported globals **/ struct stats stats; struct stats_state stats_state; @@ -1262,608 +1249,6 @@ void out_of_memory(conn *c, char *ascii_error) { } } -/* - * we get here after reading the value in set/add/replace commands. The command - * has been stored in c->cmd, and the item is ready in c->item. - */ -static void complete_nread_ascii(conn *c) { - assert(c != NULL); - - item *it = c->item; - int comm = c->cmd; - enum store_item_type ret; - bool is_valid = false; - - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); - - if ((it->it_flags & ITEM_CHUNKED) == 0) { - if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) { - is_valid = true; - } - } else { - char buf[2]; - /* should point to the final item chunk */ - item_chunk *ch = (item_chunk *) c->ritem; - assert(ch->used != 0); - /* :( We need to look at the last two bytes. This could span two - * chunks. - */ - if (ch->used > 1) { - buf[0] = ch->data[ch->used - 2]; - buf[1] = ch->data[ch->used - 1]; - } else { - assert(ch->prev); - assert(ch->used == 1); - buf[0] = ch->prev->data[ch->prev->used - 1]; - buf[1] = ch->data[ch->used - 1]; - } - if (strncmp(buf, "\r\n", 2) == 0) { - is_valid = true; - } else { - assert(1 == 0); - } - } - - if (!is_valid) { - // metaset mode always returns errors. - if (c->mset_res) { - c->noreply = false; - } - out_string(c, "CLIENT_ERROR bad data chunk"); - } else { - ret = store_item(it, comm, c); - -#ifdef ENABLE_DTRACE - uint64_t cas = ITEM_get_cas(it); - switch (c->cmd) { - case NREAD_ADD: - MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey, - (ret == 1) ? it->nbytes : -1, cas); - break; - case NREAD_REPLACE: - MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey, - (ret == 1) ? it->nbytes : -1, cas); - break; - case NREAD_APPEND: - MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey, - (ret == 1) ? it->nbytes : -1, cas); - break; - case NREAD_PREPEND: - MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey, - (ret == 1) ? it->nbytes : -1, cas); - break; - case NREAD_SET: - MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey, - (ret == 1) ? it->nbytes : -1, cas); - break; - case NREAD_CAS: - MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes, - cas); - break; - } -#endif - - if (c->mset_res) { - // Replace the status code in the response. - // Rest was prepared during mset parsing. - mc_resp *resp = c->resp; - conn_set_state(c, conn_new_cmd); - switch (ret) { - case STORED: - memcpy(resp->wbuf, "OK ", 3); - // Only place noreply is used for meta cmds is a nominal response. - if (c->noreply) { - resp->skip = true; - } - break; - case EXISTS: - memcpy(resp->wbuf, "EX ", 3); - break; - case NOT_FOUND: - memcpy(resp->wbuf, "NF ", 3); - break; - case NOT_STORED: - memcpy(resp->wbuf, "NS ", 3); - break; - default: - c->noreply = false; - out_string(c, "SERVER_ERROR Unhandled storage type."); - } - } else { - switch (ret) { - case STORED: - out_string(c, "STORED"); - break; - case EXISTS: - out_string(c, "EXISTS"); - break; - case NOT_FOUND: - out_string(c, "NOT_FOUND"); - break; - case NOT_STORED: - out_string(c, "NOT_STORED"); - break; - default: - out_string(c, "SERVER_ERROR Unhandled storage type."); - } - } - - } - - c->set_stale = false; /* force flag to be off just in case */ - c->mset_res = false; - item_remove(c->item); /* release the c->item reference */ - c->item = 0; -} - -/** - * get a pointer to the key in this request - */ -static char* binary_get_key(conn *c) { - return c->rcurr - (c->binary_header.request.keylen); -} - -static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) { - protocol_binary_response_header* header; - mc_resp *resp = c->resp; - - assert(c); - - resp_reset(resp); - - header = (protocol_binary_response_header *)resp->wbuf; - - header->response.magic = (uint8_t)PROTOCOL_BINARY_RES; - header->response.opcode = c->binary_header.request.opcode; - header->response.keylen = (uint16_t)htons(key_len); - - header->response.extlen = (uint8_t)hdr_len; - header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES; - header->response.status = (uint16_t)htons(err); - - header->response.bodylen = htonl(body_len); - header->response.opaque = c->opaque; - header->response.cas = htonll(c->cas); - - if (settings.verbose > 1) { - int ii; - fprintf(stderr, ">%d Writing bin response:", c->sfd); - for (ii = 0; ii < sizeof(header->bytes); ++ii) { - if (ii % 4 == 0) { - fprintf(stderr, "\n>%d ", c->sfd); - } - fprintf(stderr, " 0x%02x", header->bytes[ii]); - } - fprintf(stderr, "\n"); - } - - resp->wbytes = sizeof(header->response); - resp_add_iov(resp, resp->wbuf, resp->wbytes); -} - -/** - * Writes a binary error response. If errstr is supplied, it is used as the - * error text; otherwise a generic description of the error status code is - * included. - */ -static void write_bin_error(conn *c, protocol_binary_response_status err, - const char *errstr, int swallow) { - size_t len; - - if (!errstr) { - switch (err) { - case PROTOCOL_BINARY_RESPONSE_ENOMEM: - errstr = "Out of memory"; - break; - case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: - errstr = "Unknown command"; - break; - case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: - errstr = "Not found"; - break; - case PROTOCOL_BINARY_RESPONSE_EINVAL: - errstr = "Invalid arguments"; - break; - case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: - errstr = "Data exists for key."; - break; - case PROTOCOL_BINARY_RESPONSE_E2BIG: - errstr = "Too large."; - break; - case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL: - errstr = "Non-numeric server-side value for incr or decr"; - break; - case PROTOCOL_BINARY_RESPONSE_NOT_STORED: - errstr = "Not stored."; - break; - case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR: - errstr = "Auth failure."; - break; - default: - assert(false); - errstr = "UNHANDLED ERROR"; - fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", c->sfd, err); - } - } - - if (settings.verbose > 1) { - fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, errstr); - } - - len = strlen(errstr); - add_bin_header(c, err, 0, 0, len); - if (len > 0) { - resp_add_iov(c->resp, errstr, len); - } - if (swallow > 0) { - c->sbytes = swallow; - conn_set_state(c, conn_swallow); - } else { - conn_set_state(c, conn_mwrite); - } -} - -/* Form and send a response to a command over the binary protocol */ -static void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) { - if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET || - c->cmd == PROTOCOL_BINARY_CMD_GETK) { - add_bin_header(c, 0, hlen, keylen, dlen); - mc_resp *resp = c->resp; - if (dlen > 0) { - resp_add_iov(resp, d, dlen); - } - } - - conn_set_state(c, conn_new_cmd); -} - -static void complete_incr_bin(conn *c, char *extbuf) { - item *it; - char *key; - size_t nkey; - /* Weird magic in add_delta forces me to pad here */ - char tmpbuf[INCR_MAX_STORAGE_LEN]; - uint64_t cas = 0; - - protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->resp->wbuf; - protocol_binary_request_incr* req = (void *)extbuf; - - assert(c != NULL); - //assert(c->wsize >= sizeof(*rsp)); - - /* fix byteorder in the request */ - req->message.body.delta = ntohll(req->message.body.delta); - req->message.body.initial = ntohll(req->message.body.initial); - req->message.body.expiration = ntohl(req->message.body.expiration); - key = binary_get_key(c); - nkey = c->binary_header.request.keylen; - - if (settings.verbose > 1) { - int i; - fprintf(stderr, "incr "); - - for (i = 0; i < nkey; i++) { - fprintf(stderr, "%c", key[i]); - } - fprintf(stderr, " %lld, %llu, %d\n", - (long long)req->message.body.delta, - (long long)req->message.body.initial, - req->message.body.expiration); - } - - if (c->binary_header.request.cas != 0) { - cas = c->binary_header.request.cas; - } - switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT, - req->message.body.delta, tmpbuf, - &cas)) { - case OK: - rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10)); - if (cas) { - c->cas = cas; - } - write_bin_response(c, &rsp->message.body, 0, 0, - sizeof(rsp->message.body.value)); - break; - case NON_NUMERIC: - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, NULL, 0); - break; - case EOM: - out_of_memory(c, "SERVER_ERROR Out of memory incrementing value"); - break; - case DELTA_ITEM_NOT_FOUND: - if (req->message.body.expiration != 0xffffffff) { - /* Save some room for the response */ - rsp->message.body.value = htonll(req->message.body.initial); - - snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu", - (unsigned long long)req->message.body.initial); - int res = strlen(tmpbuf); - it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration), - res + 2); - - if (it != NULL) { - memcpy(ITEM_data(it), tmpbuf, res); - memcpy(ITEM_data(it) + res, "\r\n", 2); - - if (store_item(it, NREAD_ADD, c)) { - c->cas = ITEM_get_cas(it); - write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value)); - } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, - NULL, 0); - } - item_remove(it); /* release our reference */ - } else { - out_of_memory(c, - "SERVER_ERROR Out of memory allocating new item"); - } - } else { - pthread_mutex_lock(&c->thread->stats.mutex); - if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) { - c->thread->stats.incr_misses++; - } else { - c->thread->stats.decr_misses++; - } - pthread_mutex_unlock(&c->thread->stats.mutex); - - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); - } - break; - case DELTA_ITEM_CAS_MISMATCH: - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); - break; - } -} - -static void complete_update_bin(conn *c) { - protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL; - enum store_item_type ret = NOT_STORED; - assert(c != NULL); - - item *it = c->item; - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); - - /* We don't actually receive the trailing two characters in the bin - * protocol, so we're going to just set them here */ - if ((it->it_flags & ITEM_CHUNKED) == 0) { - *(ITEM_data(it) + it->nbytes - 2) = '\r'; - *(ITEM_data(it) + it->nbytes - 1) = '\n'; - } else { - assert(c->ritem); - item_chunk *ch = (item_chunk *) c->ritem; - if (ch->size == ch->used) - ch = ch->next; - assert(ch->size - ch->used >= 2); - ch->data[ch->used] = '\r'; - ch->data[ch->used + 1] = '\n'; - ch->used += 2; - } - - ret = store_item(it, c->cmd, c); - -#ifdef ENABLE_DTRACE - uint64_t cas = ITEM_get_cas(it); - switch (c->cmd) { - case NREAD_ADD: - MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey, - (ret == STORED) ? it->nbytes : -1, cas); - break; - case NREAD_REPLACE: - MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey, - (ret == STORED) ? it->nbytes : -1, cas); - break; - case NREAD_APPEND: - MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey, - (ret == STORED) ? it->nbytes : -1, cas); - break; - case NREAD_PREPEND: - MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey, - (ret == STORED) ? it->nbytes : -1, cas); - break; - case NREAD_SET: - MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey, - (ret == STORED) ? it->nbytes : -1, cas); - break; - } -#endif - - switch (ret) { - case STORED: - /* Stored */ - write_bin_response(c, NULL, 0, 0, 0); - break; - case EXISTS: - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); - break; - case NOT_FOUND: - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); - break; - case NOT_STORED: - case TOO_LARGE: - case NO_MEMORY: - if (c->cmd == NREAD_ADD) { - eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; - } else if(c->cmd == NREAD_REPLACE) { - eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; - } else { - eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED; - } - write_bin_error(c, eno, NULL, 0); - } - - item_remove(c->item); /* release the c->item reference */ - c->item = 0; -} - -static void write_bin_miss_response(conn *c, char *key, size_t nkey) { - if (nkey) { - add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, - 0, nkey, nkey); - char *ofs = c->resp->wbuf + sizeof(protocol_binary_response_header); - memcpy(ofs, key, nkey); - resp_add_iov(c->resp, ofs, nkey); - conn_set_state(c, conn_new_cmd); - } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, - NULL, 0); - } -} - -static void process_bin_get_or_touch(conn *c, char *extbuf) { - item *it; - - protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->resp->wbuf; - char* key = binary_get_key(c); - size_t nkey = c->binary_header.request.keylen; - int should_touch = (c->cmd == PROTOCOL_BINARY_CMD_TOUCH || - c->cmd == PROTOCOL_BINARY_CMD_GAT || - c->cmd == PROTOCOL_BINARY_CMD_GATK); - int should_return_key = (c->cmd == PROTOCOL_BINARY_CMD_GETK || - c->cmd == PROTOCOL_BINARY_CMD_GATK); - int should_return_value = (c->cmd != PROTOCOL_BINARY_CMD_TOUCH); - bool failed = false; - - if (settings.verbose > 1) { - fprintf(stderr, "<%d %s ", c->sfd, should_touch ? "TOUCH" : "GET"); - if (fwrite(key, 1, nkey, stderr)) {} - fputc('\n', stderr); - } - - if (should_touch) { - protocol_binary_request_touch *t = (void *)extbuf; - time_t exptime = ntohl(t->message.body.expiration); - - it = item_touch(key, nkey, realtime(exptime), c); - } else { - it = item_get(key, nkey, c, DO_UPDATE); - } - - if (it) { - /* the length has two unnecessary bytes ("\r\n") */ - uint16_t keylen = 0; - uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2); - - pthread_mutex_lock(&c->thread->stats.mutex); - if (should_touch) { - c->thread->stats.touch_cmds++; - c->thread->stats.slab_stats[ITEM_clsid(it)].touch_hits++; - } else { - c->thread->stats.get_cmds++; - c->thread->stats.lru_hits[it->slabs_clsid]++; - } - pthread_mutex_unlock(&c->thread->stats.mutex); - - if (should_touch) { - MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey, - it->nbytes, ITEM_get_cas(it)); - } else { - MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey, - it->nbytes, ITEM_get_cas(it)); - } - - if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) { - bodylen -= it->nbytes - 2; - } else if (should_return_key) { - bodylen += nkey; - keylen = nkey; - } - - add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen); - rsp->message.header.response.cas = htonll(ITEM_get_cas(it)); - - // add the flags - FLAGS_CONV(it, rsp->message.body.flags); - rsp->message.body.flags = htonl(rsp->message.body.flags); - resp_add_iov(c->resp, &rsp->message.body, sizeof(rsp->message.body)); - - if (should_return_key) { - resp_add_iov(c->resp, ITEM_key(it), nkey); - } - - if (should_return_value) { - /* Add the data minus the CRLF */ -#ifdef EXTSTORE - if (it->it_flags & ITEM_HDR) { - if (_get_extstore(c, it, c->resp) != 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.get_oom_extstore++; - pthread_mutex_unlock(&c->thread->stats.mutex); - - failed = true; - } - } else if ((it->it_flags & ITEM_CHUNKED) == 0) { - resp_add_iov(c->resp, ITEM_data(it), it->nbytes - 2); - } else { - // Allow transmit handler to find the item and expand iov's - resp_add_chunked_iov(c->resp, it, it->nbytes - 2); - } -#else - if ((it->it_flags & ITEM_CHUNKED) == 0) { - resp_add_iov(c->resp, ITEM_data(it), it->nbytes - 2); - } else { - resp_add_chunked_iov(c->resp, it, it->nbytes - 2); - } -#endif - } - - if (!failed) { - conn_set_state(c, conn_new_cmd); - /* Remember this command so we can garbage collect it later */ -#ifdef EXTSTORE - if ((it->it_flags & ITEM_HDR) != 0 && should_return_value) { - // Only have extstore clean if header and returning value. - c->resp->item = NULL; - } else { - c->resp->item = it; - } -#else - c->resp->item = it; -#endif - } else { - item_remove(it); - } - } else { - failed = true; - } - - if (failed) { - pthread_mutex_lock(&c->thread->stats.mutex); - if (should_touch) { - c->thread->stats.touch_cmds++; - c->thread->stats.touch_misses++; - } else { - c->thread->stats.get_cmds++; - c->thread->stats.get_misses++; - } - pthread_mutex_unlock(&c->thread->stats.mutex); - - if (should_touch) { - MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0); - } else { - MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0); - } - - if (c->noreply) { - conn_set_state(c, conn_new_cmd); - } else { - if (should_return_key) { - write_bin_miss_response(c, key, nkey); - } else { - write_bin_miss_response(c, NULL, 0); - } - } - } - - if (settings.detail_enabled) { - stats_prefix_record_get(key, nkey, NULL != it); - } -} - static void append_bin_stats(const char *key, const uint16_t klen, const char *val, const uint32_t vlen, conn *c) { @@ -1973,762 +1358,6 @@ void append_stats(const char *key, const uint16_t klen, assert(c->stats.offset <= c->stats.size); } -static void process_bin_stat(conn *c) { - char *subcommand = binary_get_key(c); - size_t nkey = c->binary_header.request.keylen; - - if (settings.verbose > 1) { - int ii; - fprintf(stderr, "<%d STATS ", c->sfd); - for (ii = 0; ii < nkey; ++ii) { - fprintf(stderr, "%c", subcommand[ii]); - } - fprintf(stderr, "\n"); - } - - if (nkey == 0) { - /* request all statistics */ - server_stats(&append_stats, c); - (void)get_stats(NULL, 0, &append_stats, c); - } else if (strncmp(subcommand, "reset", 5) == 0) { - stats_reset(); - } else if (strncmp(subcommand, "settings", 8) == 0) { - process_stat_settings(&append_stats, c); - } else if (strncmp(subcommand, "detail", 6) == 0) { - char *subcmd_pos = subcommand + 6; - if (strncmp(subcmd_pos, " dump", 5) == 0) { - int len; - char *dump_buf = stats_prefix_dump(&len); - if (dump_buf == NULL || len <= 0) { - out_of_memory(c, "SERVER_ERROR Out of memory generating stats"); - if (dump_buf != NULL) - free(dump_buf); - return; - } else { - append_stats("detailed", strlen("detailed"), dump_buf, len, c); - free(dump_buf); - } - } else if (strncmp(subcmd_pos, " on", 3) == 0) { - settings.detail_enabled = 1; - } else if (strncmp(subcmd_pos, " off", 4) == 0) { - settings.detail_enabled = 0; - } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); - return; - } - } else { - if (get_stats(subcommand, nkey, &append_stats, c)) { - if (c->stats.buffer == NULL) { - out_of_memory(c, "SERVER_ERROR Out of memory generating stats"); - } else { - write_and_free(c, c->stats.buffer, c->stats.offset); - c->stats.buffer = NULL; - } - } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); - } - - return; - } - - /* Append termination package and start the transfer */ - append_stats(NULL, 0, NULL, 0, c); - if (c->stats.buffer == NULL) { - out_of_memory(c, "SERVER_ERROR Out of memory preparing to send stats"); - } else { - write_and_free(c, c->stats.buffer, c->stats.offset); - c->stats.buffer = NULL; - } -} - -/* Just write an error message and disconnect the client */ -static void handle_binary_protocol_error(conn *c) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, 0); - if (settings.verbose) { - fprintf(stderr, "Protocol error (opcode %02x), close connection %d\n", - c->binary_header.request.opcode, c->sfd); - } - c->close_after_write = true; -} - -static void init_sasl_conn(conn *c) { - assert(c); - /* should something else be returned? */ - if (!settings.sasl) - return; - - c->authenticated = false; - - if (!c->sasl_conn) { - int result=sasl_server_new("memcached", - NULL, - my_sasl_hostname[0] ? my_sasl_hostname : NULL, - NULL, NULL, - NULL, 0, &c->sasl_conn); - if (result != SASL_OK) { - if (settings.verbose) { - fprintf(stderr, "Failed to initialize SASL conn.\n"); - } - c->sasl_conn = NULL; - } - } -} - -static void bin_list_sasl_mechs(conn *c) { - // Guard against a disabled SASL. - if (!settings.sasl) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, - c->binary_header.request.bodylen - - c->binary_header.request.keylen); - return; - } - - init_sasl_conn(c); - const char *result_string = NULL; - unsigned int string_length = 0; - int result=sasl_listmech(c->sasl_conn, NULL, - "", /* What to prepend the string with */ - " ", /* What to separate mechanisms with */ - "", /* What to append to the string */ - &result_string, &string_length, - NULL); - if (result != SASL_OK) { - /* Perhaps there's a better error for this... */ - if (settings.verbose) { - fprintf(stderr, "Failed to list SASL mechanisms.\n"); - } - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); - return; - } - write_bin_response(c, (char*)result_string, 0, 0, string_length); -} - -static void process_bin_sasl_auth(conn *c) { - // Guard for handling disabled SASL on the server. - if (!settings.sasl) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, - c->binary_header.request.bodylen - - c->binary_header.request.keylen); - return; - } - - assert(c->binary_header.request.extlen == 0); - - int nkey = c->binary_header.request.keylen; - int vlen = c->binary_header.request.bodylen - nkey; - - if (nkey > MAX_SASL_MECH_LEN) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen); - conn_set_state(c, conn_swallow); - return; - } - - char *key = binary_get_key(c); - assert(key); - - item *it = item_alloc(key, nkey, 0, 0, vlen+2); - - /* Can't use a chunked item for SASL authentication. */ - if (it == 0 || (it->it_flags & ITEM_CHUNKED)) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, NULL, vlen); - conn_set_state(c, conn_swallow); - if (it) { - do_item_remove(it); - } - return; - } - - c->item = it; - c->ritem = ITEM_data(it); - c->rlbytes = vlen; - conn_set_state(c, conn_nread); - c->substate = bin_reading_sasl_auth_data; -} - -static void process_bin_complete_sasl_auth(conn *c) { - assert(settings.sasl); - const char *out = NULL; - unsigned int outlen = 0; - - assert(c->item); - init_sasl_conn(c); - - int nkey = c->binary_header.request.keylen; - int vlen = c->binary_header.request.bodylen - nkey; - - if (nkey > ((item*) c->item)->nkey) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen); - conn_set_state(c, conn_swallow); - return; - } - - char mech[nkey+1]; - memcpy(mech, ITEM_key((item*)c->item), nkey); - mech[nkey] = 0x00; - - if (settings.verbose) - fprintf(stderr, "mech: ``%s'' with %d bytes of data\n", mech, vlen); - - const char *challenge = vlen == 0 ? NULL : ITEM_data((item*) c->item); - - if (vlen > ((item*) c->item)->nbytes) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen); - conn_set_state(c, conn_swallow); - return; - } - - int result=-1; - - switch (c->cmd) { - case PROTOCOL_BINARY_CMD_SASL_AUTH: - result = sasl_server_start(c->sasl_conn, mech, - challenge, vlen, - &out, &outlen); - c->sasl_started = (result == SASL_OK || result == SASL_CONTINUE); - break; - case PROTOCOL_BINARY_CMD_SASL_STEP: - if (!c->sasl_started) { - if (settings.verbose) { - fprintf(stderr, "%d: SASL_STEP called but sasl_server_start " - "not called for this connection!\n", c->sfd); - } - break; - } - result = sasl_server_step(c->sasl_conn, - challenge, vlen, - &out, &outlen); - break; - default: - assert(false); /* CMD should be one of the above */ - /* This code is pretty much impossible, but makes the compiler - happier */ - if (settings.verbose) { - fprintf(stderr, "Unhandled command %d with challenge %s\n", - c->cmd, challenge); - } - break; - } - - if (settings.verbose) { - fprintf(stderr, "sasl result code: %d\n", result); - } - - switch(result) { - case SASL_OK: - c->authenticated = true; - write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated")); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.auth_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); - break; - case SASL_CONTINUE: - add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen); - if (outlen > 0) { - resp_add_iov(c->resp, out, outlen); - } - // Immediately flush our write. - conn_set_state(c, conn_mwrite); - break; - default: - if (settings.verbose) - fprintf(stderr, "Unknown sasl response: %d\n", result); - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.auth_cmds++; - c->thread->stats.auth_errors++; - pthread_mutex_unlock(&c->thread->stats.mutex); - } -} - -static bool authenticated(conn *c) { - assert(settings.sasl); - bool rv = false; - - switch (c->cmd) { - case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_SASL_AUTH: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_SASL_STEP: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_VERSION: /* FALLTHROUGH */ - rv = true; - break; - default: - rv = c->authenticated; - } - - if (settings.verbose > 1) { - fprintf(stderr, "authenticated() in cmd 0x%02x is %s\n", - c->cmd, rv ? "true" : "false"); - } - - return rv; -} - -static void dispatch_bin_command(conn *c, char *extbuf) { - int protocol_error = 0; - - uint8_t extlen = c->binary_header.request.extlen; - uint16_t keylen = c->binary_header.request.keylen; - uint32_t bodylen = c->binary_header.request.bodylen; - - if (keylen > bodylen || keylen + extlen > bodylen) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, 0); - c->close_after_write = true; - return; - } - - if (settings.sasl && !authenticated(c)) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); - c->close_after_write = true; - return; - } - - MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes); - c->noreply = true; - - /* binprot supports 16bit keys, but internals are still 8bit */ - if (keylen > KEY_MAX_LENGTH) { - handle_binary_protocol_error(c); - return; - } - - switch (c->cmd) { - case PROTOCOL_BINARY_CMD_SETQ: - c->cmd = PROTOCOL_BINARY_CMD_SET; - break; - case PROTOCOL_BINARY_CMD_ADDQ: - c->cmd = PROTOCOL_BINARY_CMD_ADD; - break; - case PROTOCOL_BINARY_CMD_REPLACEQ: - c->cmd = PROTOCOL_BINARY_CMD_REPLACE; - break; - case PROTOCOL_BINARY_CMD_DELETEQ: - c->cmd = PROTOCOL_BINARY_CMD_DELETE; - break; - case PROTOCOL_BINARY_CMD_INCREMENTQ: - c->cmd = PROTOCOL_BINARY_CMD_INCREMENT; - break; - case PROTOCOL_BINARY_CMD_DECREMENTQ: - c->cmd = PROTOCOL_BINARY_CMD_DECREMENT; - break; - case PROTOCOL_BINARY_CMD_QUITQ: - c->cmd = PROTOCOL_BINARY_CMD_QUIT; - break; - case PROTOCOL_BINARY_CMD_FLUSHQ: - c->cmd = PROTOCOL_BINARY_CMD_FLUSH; - break; - case PROTOCOL_BINARY_CMD_APPENDQ: - c->cmd = PROTOCOL_BINARY_CMD_APPEND; - break; - case PROTOCOL_BINARY_CMD_PREPENDQ: - c->cmd = PROTOCOL_BINARY_CMD_PREPEND; - break; - case PROTOCOL_BINARY_CMD_GETQ: - c->cmd = PROTOCOL_BINARY_CMD_GET; - break; - case PROTOCOL_BINARY_CMD_GETKQ: - c->cmd = PROTOCOL_BINARY_CMD_GETK; - break; - case PROTOCOL_BINARY_CMD_GATQ: - c->cmd = PROTOCOL_BINARY_CMD_GAT; - break; - case PROTOCOL_BINARY_CMD_GATKQ: - c->cmd = PROTOCOL_BINARY_CMD_GATK; - break; - default: - c->noreply = false; - } - - switch (c->cmd) { - case PROTOCOL_BINARY_CMD_VERSION: - if (extlen == 0 && keylen == 0 && bodylen == 0) { - write_bin_response(c, VERSION, 0, 0, strlen(VERSION)); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_FLUSH: - if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) { - process_bin_flush(c, extbuf); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_NOOP: - if (extlen == 0 && keylen == 0 && bodylen == 0) { - write_bin_response(c, NULL, 0, 0, 0); - // NOOP forces pipeline flush. - conn_set_state(c, conn_mwrite); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_REPLACE: - if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) { - process_bin_update(c, extbuf); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */ - case PROTOCOL_BINARY_CMD_GETK: - if (extlen == 0 && bodylen == keylen && keylen > 0) { - process_bin_get_or_touch(c, extbuf); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_DELETE: - if (keylen > 0 && extlen == 0 && bodylen == keylen) { - process_bin_delete(c); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_INCREMENT: - case PROTOCOL_BINARY_CMD_DECREMENT: - if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) { - complete_incr_bin(c, extbuf); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_APPEND: - case PROTOCOL_BINARY_CMD_PREPEND: - if (keylen > 0 && extlen == 0) { - process_bin_append_prepend(c); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_STAT: - if (extlen == 0) { - process_bin_stat(c); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_QUIT: - if (keylen == 0 && extlen == 0 && bodylen == 0) { - write_bin_response(c, NULL, 0, 0, 0); - conn_set_state(c, conn_mwrite); - c->close_after_write = true; - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: - if (extlen == 0 && keylen == 0 && bodylen == 0) { - bin_list_sasl_mechs(c); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_SASL_AUTH: - case PROTOCOL_BINARY_CMD_SASL_STEP: - if (extlen == 0 && keylen != 0) { - process_bin_sasl_auth(c); - } else { - protocol_error = 1; - } - break; - case PROTOCOL_BINARY_CMD_TOUCH: - case PROTOCOL_BINARY_CMD_GAT: - case PROTOCOL_BINARY_CMD_GATQ: - case PROTOCOL_BINARY_CMD_GATK: - case PROTOCOL_BINARY_CMD_GATKQ: - if (extlen == 4 && keylen != 0) { - process_bin_get_or_touch(c, extbuf); - } else { - protocol_error = 1; - } - break; - default: - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, - bodylen); - } - - if (protocol_error) - handle_binary_protocol_error(c); -} - -static void process_bin_update(conn *c, char *extbuf) { - char *key; - int nkey; - int vlen; - item *it; - protocol_binary_request_set* req = (void *)extbuf; - - assert(c != NULL); - - key = binary_get_key(c); - nkey = c->binary_header.request.keylen; - - /* fix byteorder in the request */ - req->message.body.flags = ntohl(req->message.body.flags); - req->message.body.expiration = ntohl(req->message.body.expiration); - - vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen); - - if (settings.verbose > 1) { - int ii; - if (c->cmd == PROTOCOL_BINARY_CMD_ADD) { - fprintf(stderr, "<%d ADD ", c->sfd); - } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) { - fprintf(stderr, "<%d SET ", c->sfd); - } else { - fprintf(stderr, "<%d REPLACE ", c->sfd); - } - for (ii = 0; ii < nkey; ++ii) { - fprintf(stderr, "%c", key[ii]); - } - - fprintf(stderr, " Value len is %d", vlen); - fprintf(stderr, "\n"); - } - - if (settings.detail_enabled) { - stats_prefix_record_set(key, nkey); - } - - it = item_alloc(key, nkey, req->message.body.flags, - realtime(req->message.body.expiration), vlen+2); - - if (it == 0) { - enum store_item_type status; - if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, NULL, vlen); - status = TOO_LARGE; - } else { - out_of_memory(c, "SERVER_ERROR Out of memory allocating item"); - /* This error generating method eats the swallow value. Add here. */ - c->sbytes = vlen; - status = NO_MEMORY; - } - /* FIXME: losing c->cmd since it's translated below. refactor? */ - LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, - NULL, status, 0, key, nkey, req->message.body.expiration, - ITEM_clsid(it), c->sfd); - - /* Avoid stale data persisting in cache because we failed alloc. - * Unacceptable for SET. Anywhere else too? */ - if (c->cmd == PROTOCOL_BINARY_CMD_SET) { - it = item_get(key, nkey, c, DONT_UPDATE); - if (it) { - item_unlink(it); - STORAGE_delete(c->thread->storage, it); - item_remove(it); - } - } - - /* swallow the data line */ - conn_set_state(c, conn_swallow); - return; - } - - ITEM_set_cas(it, c->binary_header.request.cas); - - switch (c->cmd) { - case PROTOCOL_BINARY_CMD_ADD: - c->cmd = NREAD_ADD; - break; - case PROTOCOL_BINARY_CMD_SET: - c->cmd = NREAD_SET; - break; - case PROTOCOL_BINARY_CMD_REPLACE: - c->cmd = NREAD_REPLACE; - break; - default: - assert(0); - } - - if (ITEM_get_cas(it) != 0) { - c->cmd = NREAD_CAS; - } - - c->item = it; -#ifdef NEED_ALIGN - if (it->it_flags & ITEM_CHUNKED) { - c->ritem = ITEM_schunk(it); - } else { - c->ritem = ITEM_data(it); - } -#else - c->ritem = ITEM_data(it); -#endif - c->rlbytes = vlen; - conn_set_state(c, conn_nread); - c->substate = bin_read_set_value; -} - -static void process_bin_append_prepend(conn *c) { - char *key; - int nkey; - int vlen; - item *it; - - assert(c != NULL); - - key = binary_get_key(c); - nkey = c->binary_header.request.keylen; - vlen = c->binary_header.request.bodylen - nkey; - - if (settings.verbose > 1) { - fprintf(stderr, "Value len is %d\n", vlen); - } - - if (settings.detail_enabled) { - stats_prefix_record_set(key, nkey); - } - - it = item_alloc(key, nkey, 0, 0, vlen+2); - - if (it == 0) { - if (! item_size_ok(nkey, 0, vlen + 2)) { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, NULL, vlen); - } else { - out_of_memory(c, "SERVER_ERROR Out of memory allocating item"); - /* OOM calls eat the swallow value. Add here. */ - c->sbytes = vlen; - } - /* swallow the data line */ - conn_set_state(c, conn_swallow); - return; - } - - ITEM_set_cas(it, c->binary_header.request.cas); - - switch (c->cmd) { - case PROTOCOL_BINARY_CMD_APPEND: - c->cmd = NREAD_APPEND; - break; - case PROTOCOL_BINARY_CMD_PREPEND: - c->cmd = NREAD_PREPEND; - break; - default: - assert(0); - } - - c->item = it; -#ifdef NEED_ALIGN - if (it->it_flags & ITEM_CHUNKED) { - c->ritem = ITEM_schunk(it); - } else { - c->ritem = ITEM_data(it); - } -#else - c->ritem = ITEM_data(it); -#endif - c->rlbytes = vlen; - conn_set_state(c, conn_nread); - c->substate = bin_read_set_value; -} - -static void process_bin_flush(conn *c, char *extbuf) { - time_t exptime = 0; - protocol_binary_request_flush* req = (void *)extbuf; - rel_time_t new_oldest = 0; - - if (!settings.flush_enabled) { - // flush_all is not allowed but we log it on stats - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); - return; - } - - if (c->binary_header.request.extlen == sizeof(req->message.body)) { - exptime = ntohl(req->message.body.expiration); - } - - if (exptime > 0) { - new_oldest = realtime(exptime); - } else { - new_oldest = current_time; - } - if (settings.use_cas) { - settings.oldest_live = new_oldest - 1; - if (settings.oldest_live <= current_time) - settings.oldest_cas = get_cas_id(); - } else { - settings.oldest_live = new_oldest; - } - - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.flush_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); - - write_bin_response(c, NULL, 0, 0, 0); -} - -static void process_bin_delete(conn *c) { - item *it; - uint32_t hv; - - char* key = binary_get_key(c); - size_t nkey = c->binary_header.request.keylen; - - assert(c != NULL); - - if (settings.verbose > 1) { - int ii; - fprintf(stderr, "Deleting "); - for (ii = 0; ii < nkey; ++ii) { - fprintf(stderr, "%c", key[ii]); - } - fprintf(stderr, "\n"); - } - - if (settings.detail_enabled) { - stats_prefix_record_delete(key, nkey); - } - - it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv); - if (it) { - uint64_t cas = c->binary_header.request.cas; - if (cas == 0 || cas == ITEM_get_cas(it)) { - MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++; - pthread_mutex_unlock(&c->thread->stats.mutex); - do_item_unlink(it, hv); - STORAGE_delete(c->thread->storage, it); - write_bin_response(c, NULL, 0, 0, 0); - } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); - } - do_item_remove(it); /* release our reference */ - } else { - write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.delete_misses++; - pthread_mutex_unlock(&c->thread->stats.mutex); - } - item_unlock(hv); -} - -static void complete_nread_binary(conn *c) { - assert(c != NULL); - assert(c->cmd >= 0); - - switch(c->substate) { - case bin_read_set_value: - complete_update_bin(c); - break; - case bin_reading_sasl_auth_data: - process_bin_complete_sasl_auth(c); - if (c->item) { - do_item_remove(c->item); - c->item = NULL; - } - break; - default: - fprintf(stderr, "Not handling substate %d\n", c->substate); - assert(0); - } -} - static void reset_cmd_handler(conn *c) { c->cmd = -1; c->substate = bin_no_state; @@ -3952,79 +2581,6 @@ static int try_read_command_udp(conn *c) { } } -static int try_read_command_binary(conn *c) { - /* Do we have the complete packet header? */ - if (c->rbytes < sizeof(c->binary_header)) { - /* need more data! */ - return 0; - } else { - memcpy(&c->binary_header, c->rcurr, sizeof(c->binary_header)); - protocol_binary_request_header* req; - req = &c->binary_header; - - if (settings.verbose > 1) { - /* Dump the packet before we convert it to host order */ - int ii; - fprintf(stderr, "<%d Read binary protocol data:", c->sfd); - for (ii = 0; ii < sizeof(req->bytes); ++ii) { - if (ii % 4 == 0) { - fprintf(stderr, "\n<%d ", c->sfd); - } - fprintf(stderr, " 0x%02x", req->bytes[ii]); - } - fprintf(stderr, "\n"); - } - - c->binary_header = *req; - c->binary_header.request.keylen = ntohs(req->request.keylen); - c->binary_header.request.bodylen = ntohl(req->request.bodylen); - c->binary_header.request.cas = ntohll(req->request.cas); - - if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) { - if (settings.verbose) { - fprintf(stderr, "Invalid magic: %x\n", - c->binary_header.request.magic); - } - conn_set_state(c, conn_closing); - return -1; - } - - uint8_t extlen = c->binary_header.request.extlen; - uint16_t keylen = c->binary_header.request.keylen; - if (c->rbytes < keylen + extlen + sizeof(c->binary_header)) { - // Still need more bytes. Let try_read_network() realign the - // read-buffer and fetch more data as necessary. - return 0; - } - - if (!resp_start(c)) { - conn_set_state(c, conn_closing); - return -1; - } - - c->cmd = c->binary_header.request.opcode; - c->keylen = c->binary_header.request.keylen; - c->opaque = c->binary_header.request.opaque; - /* clear the returned cas value */ - c->cas = 0; - - c->last_cmd_time = current_time; - // sigh. binprot has no "largest possible extlen" define, and I don't - // want to refactor a ton of code either. Header is only ever used out - // of c->binary_header, but the extlen stuff is used for the latter - // bytes. Just wastes 24 bytes on the stack this way. - char extbuf[sizeof(c->binary_header) + BIN_MAX_EXTLEN+1]; - memcpy(extbuf + sizeof(c->binary_header), c->rcurr + sizeof(c->binary_header), - extlen > BIN_MAX_EXTLEN ? BIN_MAX_EXTLEN : extlen); - c->rbytes -= sizeof(c->binary_header) + extlen + keylen; - c->rcurr += sizeof(c->binary_header) + extlen + keylen; - - dispatch_bin_command(c, extbuf); - } - - return 1; -} - /* * read a UDP request. */ diff --git a/proto_bin.c b/proto_bin.c new file mode 100644 index 0000000..3e9c38f --- /dev/null +++ b/proto_bin.c @@ -0,0 +1,1326 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Functions for handling the binary protocol. + * NOTE: The binary protocol is deprecated as of 1.6.0. + */ + +#include "memcached.h" +#include "proto_bin.h" +#ifdef TLS +#include "tls.h" +#endif +#include <string.h> +#include <stdlib.h> + +/** binprot handlers **/ +static void process_bin_flush(conn *c, char *extbuf); +static void process_bin_append_prepend(conn *c); +static void process_bin_update(conn *c, char *extbuf); +static void process_bin_get_or_touch(conn *c, char *extbuf); +static void process_bin_delete(conn *c); +static void complete_incr_bin(conn *c, char *extbuf); +static void process_bin_stat(conn *c); +static void process_bin_sasl_auth(conn *c); +static void dispatch_bin_command(conn *c, char *extbuf); +static void complete_update_bin(conn *c); +static void process_bin_complete_sasl_auth(conn *c); + +static void write_bin_miss_response(conn *c, char *key, size_t nkey); + +void complete_nread_binary(conn *c) { + assert(c != NULL); + assert(c->cmd >= 0); + + switch(c->substate) { + case bin_read_set_value: + complete_update_bin(c); + break; + case bin_reading_sasl_auth_data: + process_bin_complete_sasl_auth(c); + if (c->item) { + do_item_remove(c->item); + c->item = NULL; + } + break; + default: + fprintf(stderr, "Not handling substate %d\n", c->substate); + assert(0); + } +} + +int try_read_command_binary(conn *c) { + /* Do we have the complete packet header? */ + if (c->rbytes < sizeof(c->binary_header)) { + /* need more data! */ + return 0; + } else { + memcpy(&c->binary_header, c->rcurr, sizeof(c->binary_header)); + protocol_binary_request_header* req; + req = &c->binary_header; + + if (settings.verbose > 1) { + /* Dump the packet before we convert it to host order */ + int ii; + fprintf(stderr, "<%d Read binary protocol data:", c->sfd); + for (ii = 0; ii < sizeof(req->bytes); ++ii) { + if (ii % 4 == 0) { + fprintf(stderr, "\n<%d ", c->sfd); + } + fprintf(stderr, " 0x%02x", req->bytes[ii]); + } + fprintf(stderr, "\n"); + } + + c->binary_header = *req; + c->binary_header.request.keylen = ntohs(req->request.keylen); + c->binary_header.request.bodylen = ntohl(req->request.bodylen); + c->binary_header.request.cas = ntohll(req->request.cas); + + if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) { + if (settings.verbose) { + fprintf(stderr, "Invalid magic: %x\n", + c->binary_header.request.magic); + } + conn_set_state(c, conn_closing); + return -1; + } + + uint8_t extlen = c->binary_header.request.extlen; + uint16_t keylen = c->binary_header.request.keylen; + if (c->rbytes < keylen + extlen + sizeof(c->binary_header)) { + // Still need more bytes. Let try_read_network() realign the + // read-buffer and fetch more data as necessary. + return 0; + } + + if (!resp_start(c)) { + conn_set_state(c, conn_closing); + return -1; + } + + c->cmd = c->binary_header.request.opcode; + c->keylen = c->binary_header.request.keylen; + c->opaque = c->binary_header.request.opaque; + /* clear the returned cas value */ + c->cas = 0; + + c->last_cmd_time = current_time; + // sigh. binprot has no "largest possible extlen" define, and I don't + // want to refactor a ton of code either. Header is only ever used out + // of c->binary_header, but the extlen stuff is used for the latter + // bytes. Just wastes 24 bytes on the stack this way. + char extbuf[sizeof(c->binary_header) + BIN_MAX_EXTLEN+1]; + memcpy(extbuf + sizeof(c->binary_header), c->rcurr + sizeof(c->binary_header), + extlen > BIN_MAX_EXTLEN ? BIN_MAX_EXTLEN : extlen); + c->rbytes -= sizeof(c->binary_header) + extlen + keylen; + c->rcurr += sizeof(c->binary_header) + extlen + keylen; + + dispatch_bin_command(c, extbuf); + } + + return 1; +} + +/** + * get a pointer to the key in this request + */ +static char* binary_get_key(conn *c) { + return c->rcurr - (c->binary_header.request.keylen); +} + +static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) { + protocol_binary_response_header* header; + mc_resp *resp = c->resp; + + assert(c); + + resp_reset(resp); + + header = (protocol_binary_response_header *)resp->wbuf; + + header->response.magic = (uint8_t)PROTOCOL_BINARY_RES; + header->response.opcode = c->binary_header.request.opcode; + header->response.keylen = (uint16_t)htons(key_len); + + header->response.extlen = (uint8_t)hdr_len; + header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES; + header->response.status = (uint16_t)htons(err); + + header->response.bodylen = htonl(body_len); + header->response.opaque = c->opaque; + header->response.cas = htonll(c->cas); + + if (settings.verbose > 1) { + int ii; + fprintf(stderr, ">%d Writing bin response:", c->sfd); + for (ii = 0; ii < sizeof(header->bytes); ++ii) { + if (ii % 4 == 0) { + fprintf(stderr, "\n>%d ", c->sfd); + } + fprintf(stderr, " 0x%02x", header->bytes[ii]); + } + fprintf(stderr, "\n"); + } + + resp->wbytes = sizeof(header->response); + resp_add_iov(resp, resp->wbuf, resp->wbytes); +} + + +/** + * Writes a binary error response. If errstr is supplied, it is used as the + * error text; otherwise a generic description of the error status code is + * included. + */ +void write_bin_error(conn *c, protocol_binary_response_status err, + const char *errstr, int swallow) { + size_t len; + + if (!errstr) { + switch (err) { + case PROTOCOL_BINARY_RESPONSE_ENOMEM: + errstr = "Out of memory"; + break; + case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: + errstr = "Unknown command"; + break; + case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: + errstr = "Not found"; + break; + case PROTOCOL_BINARY_RESPONSE_EINVAL: + errstr = "Invalid arguments"; + break; + case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: + errstr = "Data exists for key."; + break; + case PROTOCOL_BINARY_RESPONSE_E2BIG: + errstr = "Too large."; + break; + case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL: + errstr = "Non-numeric server-side value for incr or decr"; + break; + case PROTOCOL_BINARY_RESPONSE_NOT_STORED: + errstr = "Not stored."; + break; + case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR: + errstr = "Auth failure."; + break; + default: + assert(false); + errstr = "UNHANDLED ERROR"; + fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", c->sfd, err); + } + } + + if (settings.verbose > 1) { + fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, errstr); + } + + len = strlen(errstr); + add_bin_header(c, err, 0, 0, len); + if (len > 0) { + resp_add_iov(c->resp, errstr, len); + } + if (swallow > 0) { + c->sbytes = swallow; + conn_set_state(c, conn_swallow); + } else { + conn_set_state(c, conn_mwrite); + } +} + +/* Just write an error message and disconnect the client */ +static void handle_binary_protocol_error(conn *c) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, 0); + if (settings.verbose) { + fprintf(stderr, "Protocol error (opcode %02x), close connection %d\n", + c->binary_header.request.opcode, c->sfd); + } + c->close_after_write = true; +} + +/* Form and send a response to a command over the binary protocol */ +static void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) { + if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET || + c->cmd == PROTOCOL_BINARY_CMD_GETK) { + add_bin_header(c, 0, hlen, keylen, dlen); + mc_resp *resp = c->resp; + if (dlen > 0) { + resp_add_iov(resp, d, dlen); + } + } + + conn_set_state(c, conn_new_cmd); +} + +static void complete_incr_bin(conn *c, char *extbuf) { + item *it; + char *key; + size_t nkey; + /* Weird magic in add_delta forces me to pad here */ + char tmpbuf[INCR_MAX_STORAGE_LEN]; + uint64_t cas = 0; + + protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->resp->wbuf; + protocol_binary_request_incr* req = (void *)extbuf; + + assert(c != NULL); + //assert(c->wsize >= sizeof(*rsp)); + + /* fix byteorder in the request */ + req->message.body.delta = ntohll(req->message.body.delta); + req->message.body.initial = ntohll(req->message.body.initial); + req->message.body.expiration = ntohl(req->message.body.expiration); + key = binary_get_key(c); + nkey = c->binary_header.request.keylen; + + if (settings.verbose > 1) { + int i; + fprintf(stderr, "incr "); + + for (i = 0; i < nkey; i++) { + fprintf(stderr, "%c", key[i]); + } + fprintf(stderr, " %lld, %llu, %d\n", + (long long)req->message.body.delta, + (long long)req->message.body.initial, + req->message.body.expiration); + } + + if (c->binary_header.request.cas != 0) { + cas = c->binary_header.request.cas; + } + switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT, + req->message.body.delta, tmpbuf, + &cas)) { + case OK: + rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10)); + if (cas) { + c->cas = cas; + } + write_bin_response(c, &rsp->message.body, 0, 0, + sizeof(rsp->message.body.value)); + break; + case NON_NUMERIC: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, NULL, 0); + break; + case EOM: + out_of_memory(c, "SERVER_ERROR Out of memory incrementing value"); + break; + case DELTA_ITEM_NOT_FOUND: + if (req->message.body.expiration != 0xffffffff) { + /* Save some room for the response */ + rsp->message.body.value = htonll(req->message.body.initial); + + snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu", + (unsigned long long)req->message.body.initial); + int res = strlen(tmpbuf); + it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration), + res + 2); + + if (it != NULL) { + memcpy(ITEM_data(it), tmpbuf, res); + memcpy(ITEM_data(it) + res, "\r\n", 2); + + if (store_item(it, NREAD_ADD, c)) { + c->cas = ITEM_get_cas(it); + write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value)); + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, + NULL, 0); + } + item_remove(it); /* release our reference */ + } else { + out_of_memory(c, + "SERVER_ERROR Out of memory allocating new item"); + } + } else { + pthread_mutex_lock(&c->thread->stats.mutex); + if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) { + c->thread->stats.incr_misses++; + } else { + c->thread->stats.decr_misses++; + } + pthread_mutex_unlock(&c->thread->stats.mutex); + + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); + } + break; + case DELTA_ITEM_CAS_MISMATCH: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); + break; + } +} + +static void complete_update_bin(conn *c) { + protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL; + enum store_item_type ret = NOT_STORED; + assert(c != NULL); + + item *it = c->item; + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + /* We don't actually receive the trailing two characters in the bin + * protocol, so we're going to just set them here */ + if ((it->it_flags & ITEM_CHUNKED) == 0) { + *(ITEM_data(it) + it->nbytes - 2) = '\r'; + *(ITEM_data(it) + it->nbytes - 1) = '\n'; + } else { + assert(c->ritem); + item_chunk *ch = (item_chunk *) c->ritem; + if (ch->size == ch->used) + ch = ch->next; + assert(ch->size - ch->used >= 2); + ch->data[ch->used] = '\r'; + ch->data[ch->used + 1] = '\n'; + ch->used += 2; + } + + ret = store_item(it, c->cmd, c); + +#ifdef ENABLE_DTRACE + uint64_t cas = ITEM_get_cas(it); + switch (c->cmd) { + case NREAD_ADD: + MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey, + (ret == STORED) ? it->nbytes : -1, cas); + break; + case NREAD_REPLACE: + MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey, + (ret == STORED) ? it->nbytes : -1, cas); + break; + case NREAD_APPEND: + MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey, + (ret == STORED) ? it->nbytes : -1, cas); + break; + case NREAD_PREPEND: + MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey, + (ret == STORED) ? it->nbytes : -1, cas); + break; + case NREAD_SET: + MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey, + (ret == STORED) ? it->nbytes : -1, cas); + break; + } +#endif + + switch (ret) { + case STORED: + /* Stored */ + write_bin_response(c, NULL, 0, 0, 0); + break; + case EXISTS: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); + break; + case NOT_FOUND: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); + break; + case NOT_STORED: + case TOO_LARGE: + case NO_MEMORY: + if (c->cmd == NREAD_ADD) { + eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS; + } else if(c->cmd == NREAD_REPLACE) { + eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT; + } else { + eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED; + } + write_bin_error(c, eno, NULL, 0); + } + + item_remove(c->item); /* release the c->item reference */ + c->item = 0; +} + +static void write_bin_miss_response(conn *c, char *key, size_t nkey) { + if (nkey) { + add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, + 0, nkey, nkey); + char *ofs = c->resp->wbuf + sizeof(protocol_binary_response_header); + memcpy(ofs, key, nkey); + resp_add_iov(c->resp, ofs, nkey); + conn_set_state(c, conn_new_cmd); + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, + NULL, 0); + } +} + +static void process_bin_get_or_touch(conn *c, char *extbuf) { + item *it; + + protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->resp->wbuf; + char* key = binary_get_key(c); + size_t nkey = c->binary_header.request.keylen; + int should_touch = (c->cmd == PROTOCOL_BINARY_CMD_TOUCH || + c->cmd == PROTOCOL_BINARY_CMD_GAT || + c->cmd == PROTOCOL_BINARY_CMD_GATK); + int should_return_key = (c->cmd == PROTOCOL_BINARY_CMD_GETK || + c->cmd == PROTOCOL_BINARY_CMD_GATK); + int should_return_value = (c->cmd != PROTOCOL_BINARY_CMD_TOUCH); + bool failed = false; + + if (settings.verbose > 1) { + fprintf(stderr, "<%d %s ", c->sfd, should_touch ? "TOUCH" : "GET"); + if (fwrite(key, 1, nkey, stderr)) {} + fputc('\n', stderr); + } + + if (should_touch) { + protocol_binary_request_touch *t = (void *)extbuf; + time_t exptime = ntohl(t->message.body.expiration); + + it = item_touch(key, nkey, realtime(exptime), c); + } else { + it = item_get(key, nkey, c, DO_UPDATE); + } + + if (it) { + /* the length has two unnecessary bytes ("\r\n") */ + uint16_t keylen = 0; + uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2); + + pthread_mutex_lock(&c->thread->stats.mutex); + if (should_touch) { + c->thread->stats.touch_cmds++; + c->thread->stats.slab_stats[ITEM_clsid(it)].touch_hits++; + } else { + c->thread->stats.get_cmds++; + c->thread->stats.lru_hits[it->slabs_clsid]++; + } + pthread_mutex_unlock(&c->thread->stats.mutex); + + if (should_touch) { + MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey, + it->nbytes, ITEM_get_cas(it)); + } else { + MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey, + it->nbytes, ITEM_get_cas(it)); + } + + if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) { + bodylen -= it->nbytes - 2; + } else if (should_return_key) { + bodylen += nkey; + keylen = nkey; + } + + add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen); + rsp->message.header.response.cas = htonll(ITEM_get_cas(it)); + + // add the flags + FLAGS_CONV(it, rsp->message.body.flags); + rsp->message.body.flags = htonl(rsp->message.body.flags); + resp_add_iov(c->resp, &rsp->message.body, sizeof(rsp->message.body)); + + if (should_return_key) { + resp_add_iov(c->resp, ITEM_key(it), nkey); + } + + if (should_return_value) { + /* Add the data minus the CRLF */ +#ifdef EXTSTORE + if (it->it_flags & ITEM_HDR) { + if (_get_extstore(c, it, c->resp) != 0) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.get_oom_extstore++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + failed = true; + } + } else if ((it->it_flags & ITEM_CHUNKED) == 0) { + resp_add_iov(c->resp, ITEM_data(it), it->nbytes - 2); + } else { + // Allow transmit handler to find the item and expand iov's + resp_add_chunked_iov(c->resp, it, it->nbytes - 2); + } +#else + if ((it->it_flags & ITEM_CHUNKED) == 0) { + resp_add_iov(c->resp, ITEM_data(it), it->nbytes - 2); + } else { + resp_add_chunked_iov(c->resp, it, it->nbytes - 2); + } +#endif + } + + if (!failed) { + conn_set_state(c, conn_new_cmd); + /* Remember this command so we can garbage collect it later */ +#ifdef EXTSTORE + if ((it->it_flags & ITEM_HDR) != 0 && should_return_value) { + // Only have extstore clean if header and returning value. + c->resp->item = NULL; + } else { + c->resp->item = it; + } +#else + c->resp->item = it; +#endif + } else { + item_remove(it); + } + } else { + failed = true; + } + + if (failed) { + pthread_mutex_lock(&c->thread->stats.mutex); + if (should_touch) { + c->thread->stats.touch_cmds++; + c->thread->stats.touch_misses++; + } else { + c->thread->stats.get_cmds++; + c->thread->stats.get_misses++; + } + pthread_mutex_unlock(&c->thread->stats.mutex); + + if (should_touch) { + MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0); + } else { + MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0); + } + + if (c->noreply) { + conn_set_state(c, conn_new_cmd); + } else { + if (should_return_key) { + write_bin_miss_response(c, key, nkey); + } else { + write_bin_miss_response(c, NULL, 0); + } + } + } + + if (settings.detail_enabled) { + stats_prefix_record_get(key, nkey, NULL != it); + } +} + +static void process_bin_stat(conn *c) { + char *subcommand = binary_get_key(c); + size_t nkey = c->binary_header.request.keylen; + + if (settings.verbose > 1) { + int ii; + fprintf(stderr, "<%d STATS ", c->sfd); + for (ii = 0; ii < nkey; ++ii) { + fprintf(stderr, "%c", subcommand[ii]); + } + fprintf(stderr, "\n"); + } + + if (nkey == 0) { + /* request all statistics */ + server_stats(&append_stats, c); + (void)get_stats(NULL, 0, &append_stats, c); + } else if (strncmp(subcommand, "reset", 5) == 0) { + stats_reset(); + } else if (strncmp(subcommand, "settings", 8) == 0) { + process_stat_settings(&append_stats, c); + } else if (strncmp(subcommand, "detail", 6) == 0) { + char *subcmd_pos = subcommand + 6; + if (strncmp(subcmd_pos, " dump", 5) == 0) { + int len; + char *dump_buf = stats_prefix_dump(&len); + if (dump_buf == NULL || len <= 0) { + out_of_memory(c, "SERVER_ERROR Out of memory generating stats"); + if (dump_buf != NULL) + free(dump_buf); + return; + } else { + append_stats("detailed", strlen("detailed"), dump_buf, len, c); + free(dump_buf); + } + } else if (strncmp(subcmd_pos, " on", 3) == 0) { + settings.detail_enabled = 1; + } else if (strncmp(subcmd_pos, " off", 4) == 0) { + settings.detail_enabled = 0; + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); + return; + } + } else { + if (get_stats(subcommand, nkey, &append_stats, c)) { + if (c->stats.buffer == NULL) { + out_of_memory(c, "SERVER_ERROR Out of memory generating stats"); + } else { + write_and_free(c, c->stats.buffer, c->stats.offset); + c->stats.buffer = NULL; + } + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); + } + + return; + } + + /* Append termination package and start the transfer */ + append_stats(NULL, 0, NULL, 0, c); + if (c->stats.buffer == NULL) { + out_of_memory(c, "SERVER_ERROR Out of memory preparing to send stats"); + } else { + write_and_free(c, c->stats.buffer, c->stats.offset); + c->stats.buffer = NULL; + } +} + +static void init_sasl_conn(conn *c) { + assert(c); + /* should something else be returned? */ + if (!settings.sasl) + return; + + c->authenticated = false; + + if (!c->sasl_conn) { + int result=sasl_server_new("memcached", + NULL, + my_sasl_hostname[0] ? my_sasl_hostname : NULL, + NULL, NULL, + NULL, 0, &c->sasl_conn); + if (result != SASL_OK) { + if (settings.verbose) { + fprintf(stderr, "Failed to initialize SASL conn.\n"); + } + c->sasl_conn = NULL; + } + } +} + +static void bin_list_sasl_mechs(conn *c) { + // Guard against a disabled SASL. + if (!settings.sasl) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, + c->binary_header.request.bodylen + - c->binary_header.request.keylen); + return; + } + + init_sasl_conn(c); + const char *result_string = NULL; + unsigned int string_length = 0; + int result=sasl_listmech(c->sasl_conn, NULL, + "", /* What to prepend the string with */ + " ", /* What to separate mechanisms with */ + "", /* What to append to the string */ + &result_string, &string_length, + NULL); + if (result != SASL_OK) { + /* Perhaps there's a better error for this... */ + if (settings.verbose) { + fprintf(stderr, "Failed to list SASL mechanisms.\n"); + } + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); + return; + } + write_bin_response(c, (char*)result_string, 0, 0, string_length); +} + +static void process_bin_sasl_auth(conn *c) { + // Guard for handling disabled SASL on the server. + if (!settings.sasl) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, + c->binary_header.request.bodylen + - c->binary_header.request.keylen); + return; + } + + assert(c->binary_header.request.extlen == 0); + + int nkey = c->binary_header.request.keylen; + int vlen = c->binary_header.request.bodylen - nkey; + + if (nkey > MAX_SASL_MECH_LEN) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen); + conn_set_state(c, conn_swallow); + return; + } + + char *key = binary_get_key(c); + assert(key); + + item *it = item_alloc(key, nkey, 0, 0, vlen+2); + + /* Can't use a chunked item for SASL authentication. */ + if (it == 0 || (it->it_flags & ITEM_CHUNKED)) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, NULL, vlen); + conn_set_state(c, conn_swallow); + if (it) { + do_item_remove(it); + } + return; + } + + c->item = it; + c->ritem = ITEM_data(it); + c->rlbytes = vlen; + conn_set_state(c, conn_nread); + c->substate = bin_reading_sasl_auth_data; +} + +static void process_bin_complete_sasl_auth(conn *c) { + assert(settings.sasl); + const char *out = NULL; + unsigned int outlen = 0; + + assert(c->item); + init_sasl_conn(c); + + int nkey = c->binary_header.request.keylen; + int vlen = c->binary_header.request.bodylen - nkey; + + if (nkey > ((item*) c->item)->nkey) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen); + conn_set_state(c, conn_swallow); + return; + } + + char mech[nkey+1]; + memcpy(mech, ITEM_key((item*)c->item), nkey); + mech[nkey] = 0x00; + + if (settings.verbose) + fprintf(stderr, "mech: ``%s'' with %d bytes of data\n", mech, vlen); + + const char *challenge = vlen == 0 ? NULL : ITEM_data((item*) c->item); + + if (vlen > ((item*) c->item)->nbytes) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, NULL, vlen); + conn_set_state(c, conn_swallow); + return; + } + + int result=-1; + + switch (c->cmd) { + case PROTOCOL_BINARY_CMD_SASL_AUTH: + result = sasl_server_start(c->sasl_conn, mech, + challenge, vlen, + &out, &outlen); + c->sasl_started = (result == SASL_OK || result == SASL_CONTINUE); + break; + case PROTOCOL_BINARY_CMD_SASL_STEP: + if (!c->sasl_started) { + if (settings.verbose) { + fprintf(stderr, "%d: SASL_STEP called but sasl_server_start " + "not called for this connection!\n", c->sfd); + } + break; + } + result = sasl_server_step(c->sasl_conn, + challenge, vlen, + &out, &outlen); + break; + default: + assert(false); /* CMD should be one of the above */ + /* This code is pretty much impossible, but makes the compiler + happier */ + if (settings.verbose) { + fprintf(stderr, "Unhandled command %d with challenge %s\n", + c->cmd, challenge); + } + break; + } + + if (settings.verbose) { + fprintf(stderr, "sasl result code: %d\n", result); + } + + switch(result) { + case SASL_OK: + c->authenticated = true; + write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated")); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.auth_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + break; + case SASL_CONTINUE: + add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen); + if (outlen > 0) { + resp_add_iov(c->resp, out, outlen); + } + // Immediately flush our write. + conn_set_state(c, conn_mwrite); + break; + default: + if (settings.verbose) + fprintf(stderr, "Unknown sasl response: %d\n", result); + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.auth_cmds++; + c->thread->stats.auth_errors++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } +} + +static bool authenticated(conn *c) { + assert(settings.sasl); + bool rv = false; + + switch (c->cmd) { + case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_SASL_AUTH: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_SASL_STEP: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_VERSION: /* FALLTHROUGH */ + rv = true; + break; + default: + rv = c->authenticated; + } + + if (settings.verbose > 1) { + fprintf(stderr, "authenticated() in cmd 0x%02x is %s\n", + c->cmd, rv ? "true" : "false"); + } + + return rv; +} + +static void dispatch_bin_command(conn *c, char *extbuf) { + int protocol_error = 0; + + uint8_t extlen = c->binary_header.request.extlen; + uint16_t keylen = c->binary_header.request.keylen; + uint32_t bodylen = c->binary_header.request.bodylen; + + if (keylen > bodylen || keylen + extlen > bodylen) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, 0); + c->close_after_write = true; + return; + } + + if (settings.sasl && !authenticated(c)) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); + c->close_after_write = true; + return; + } + + MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes); + c->noreply = true; + + /* binprot supports 16bit keys, but internals are still 8bit */ + if (keylen > KEY_MAX_LENGTH) { + handle_binary_protocol_error(c); + return; + } + + switch (c->cmd) { + case PROTOCOL_BINARY_CMD_SETQ: + c->cmd = PROTOCOL_BINARY_CMD_SET; + break; + case PROTOCOL_BINARY_CMD_ADDQ: + c->cmd = PROTOCOL_BINARY_CMD_ADD; + break; + case PROTOCOL_BINARY_CMD_REPLACEQ: + c->cmd = PROTOCOL_BINARY_CMD_REPLACE; + break; + case PROTOCOL_BINARY_CMD_DELETEQ: + c->cmd = PROTOCOL_BINARY_CMD_DELETE; + break; + case PROTOCOL_BINARY_CMD_INCREMENTQ: + c->cmd = PROTOCOL_BINARY_CMD_INCREMENT; + break; + case PROTOCOL_BINARY_CMD_DECREMENTQ: + c->cmd = PROTOCOL_BINARY_CMD_DECREMENT; + break; + case PROTOCOL_BINARY_CMD_QUITQ: + c->cmd = PROTOCOL_BINARY_CMD_QUIT; + break; + case PROTOCOL_BINARY_CMD_FLUSHQ: + c->cmd = PROTOCOL_BINARY_CMD_FLUSH; + break; + case PROTOCOL_BINARY_CMD_APPENDQ: + c->cmd = PROTOCOL_BINARY_CMD_APPEND; + break; + case PROTOCOL_BINARY_CMD_PREPENDQ: + c->cmd = PROTOCOL_BINARY_CMD_PREPEND; + break; + case PROTOCOL_BINARY_CMD_GETQ: + c->cmd = PROTOCOL_BINARY_CMD_GET; + break; + case PROTOCOL_BINARY_CMD_GETKQ: + c->cmd = PROTOCOL_BINARY_CMD_GETK; + break; + case PROTOCOL_BINARY_CMD_GATQ: + c->cmd = PROTOCOL_BINARY_CMD_GAT; + break; + case PROTOCOL_BINARY_CMD_GATKQ: + c->cmd = PROTOCOL_BINARY_CMD_GATK; + break; + default: + c->noreply = false; + } + + switch (c->cmd) { + case PROTOCOL_BINARY_CMD_VERSION: + if (extlen == 0 && keylen == 0 && bodylen == 0) { + write_bin_response(c, VERSION, 0, 0, strlen(VERSION)); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_FLUSH: + if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) { + process_bin_flush(c, extbuf); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_NOOP: + if (extlen == 0 && keylen == 0 && bodylen == 0) { + write_bin_response(c, NULL, 0, 0, 0); + // NOOP forces pipeline flush. + conn_set_state(c, conn_mwrite); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_REPLACE: + if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) { + process_bin_update(c, extbuf); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */ + case PROTOCOL_BINARY_CMD_GETK: + if (extlen == 0 && bodylen == keylen && keylen > 0) { + process_bin_get_or_touch(c, extbuf); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_DELETE: + if (keylen > 0 && extlen == 0 && bodylen == keylen) { + process_bin_delete(c); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_INCREMENT: + case PROTOCOL_BINARY_CMD_DECREMENT: + if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) { + complete_incr_bin(c, extbuf); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_APPEND: + case PROTOCOL_BINARY_CMD_PREPEND: + if (keylen > 0 && extlen == 0) { + process_bin_append_prepend(c); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_STAT: + if (extlen == 0) { + process_bin_stat(c); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_QUIT: + if (keylen == 0 && extlen == 0 && bodylen == 0) { + write_bin_response(c, NULL, 0, 0, 0); + conn_set_state(c, conn_mwrite); + c->close_after_write = true; + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: + if (extlen == 0 && keylen == 0 && bodylen == 0) { + bin_list_sasl_mechs(c); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_SASL_AUTH: + case PROTOCOL_BINARY_CMD_SASL_STEP: + if (extlen == 0 && keylen != 0) { + process_bin_sasl_auth(c); + } else { + protocol_error = 1; + } + break; + case PROTOCOL_BINARY_CMD_TOUCH: + case PROTOCOL_BINARY_CMD_GAT: + case PROTOCOL_BINARY_CMD_GATQ: + case PROTOCOL_BINARY_CMD_GATK: + case PROTOCOL_BINARY_CMD_GATKQ: + if (extlen == 4 && keylen != 0) { + process_bin_get_or_touch(c, extbuf); + } else { + protocol_error = 1; + } + break; + default: + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, NULL, + bodylen); + } + + if (protocol_error) + handle_binary_protocol_error(c); +} + +static void process_bin_update(conn *c, char *extbuf) { + char *key; + int nkey; + int vlen; + item *it; + protocol_binary_request_set* req = (void *)extbuf; + + assert(c != NULL); + + key = binary_get_key(c); + nkey = c->binary_header.request.keylen; + + /* fix byteorder in the request */ + req->message.body.flags = ntohl(req->message.body.flags); + req->message.body.expiration = ntohl(req->message.body.expiration); + + vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen); + + if (settings.verbose > 1) { + int ii; + if (c->cmd == PROTOCOL_BINARY_CMD_ADD) { + fprintf(stderr, "<%d ADD ", c->sfd); + } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) { + fprintf(stderr, "<%d SET ", c->sfd); + } else { + fprintf(stderr, "<%d REPLACE ", c->sfd); + } + for (ii = 0; ii < nkey; ++ii) { + fprintf(stderr, "%c", key[ii]); + } + + fprintf(stderr, " Value len is %d", vlen); + fprintf(stderr, "\n"); + } + + if (settings.detail_enabled) { + stats_prefix_record_set(key, nkey); + } + + it = item_alloc(key, nkey, req->message.body.flags, + realtime(req->message.body.expiration), vlen+2); + + if (it == 0) { + enum store_item_type status; + if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, NULL, vlen); + status = TOO_LARGE; + } else { + out_of_memory(c, "SERVER_ERROR Out of memory allocating item"); + /* This error generating method eats the swallow value. Add here. */ + c->sbytes = vlen; + status = NO_MEMORY; + } + /* FIXME: losing c->cmd since it's translated below. refactor? */ + LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, + NULL, status, 0, key, nkey, req->message.body.expiration, + ITEM_clsid(it), c->sfd); + + /* Avoid stale data persisting in cache because we failed alloc. + * Unacceptable for SET. Anywhere else too? */ + if (c->cmd == PROTOCOL_BINARY_CMD_SET) { + it = item_get(key, nkey, c, DONT_UPDATE); + if (it) { + item_unlink(it); + STORAGE_delete(c->thread->storage, it); + item_remove(it); + } + } + + /* swallow the data line */ + conn_set_state(c, conn_swallow); + return; + } + + ITEM_set_cas(it, c->binary_header.request.cas); + + switch (c->cmd) { + case PROTOCOL_BINARY_CMD_ADD: + c->cmd = NREAD_ADD; + break; + case PROTOCOL_BINARY_CMD_SET: + c->cmd = NREAD_SET; + break; + case PROTOCOL_BINARY_CMD_REPLACE: + c->cmd = NREAD_REPLACE; + break; + default: + assert(0); + } + + if (ITEM_get_cas(it) != 0) { + c->cmd = NREAD_CAS; + } + + c->item = it; +#ifdef NEED_ALIGN + if (it->it_flags & ITEM_CHUNKED) { + c->ritem = ITEM_schunk(it); + } else { + c->ritem = ITEM_data(it); + } +#else + c->ritem = ITEM_data(it); +#endif + c->rlbytes = vlen; + conn_set_state(c, conn_nread); + c->substate = bin_read_set_value; +} + +static void process_bin_append_prepend(conn *c) { + char *key; + int nkey; + int vlen; + item *it; + + assert(c != NULL); + + key = binary_get_key(c); + nkey = c->binary_header.request.keylen; + vlen = c->binary_header.request.bodylen - nkey; + + if (settings.verbose > 1) { + fprintf(stderr, "Value len is %d\n", vlen); + } + + if (settings.detail_enabled) { + stats_prefix_record_set(key, nkey); + } + + it = item_alloc(key, nkey, 0, 0, vlen+2); + + if (it == 0) { + if (! item_size_ok(nkey, 0, vlen + 2)) { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, NULL, vlen); + } else { + out_of_memory(c, "SERVER_ERROR Out of memory allocating item"); + /* OOM calls eat the swallow value. Add here. */ + c->sbytes = vlen; + } + /* swallow the data line */ + conn_set_state(c, conn_swallow); + return; + } + + ITEM_set_cas(it, c->binary_header.request.cas); + + switch (c->cmd) { + case PROTOCOL_BINARY_CMD_APPEND: + c->cmd = NREAD_APPEND; + break; + case PROTOCOL_BINARY_CMD_PREPEND: + c->cmd = NREAD_PREPEND; + break; + default: + assert(0); + } + + c->item = it; +#ifdef NEED_ALIGN + if (it->it_flags & ITEM_CHUNKED) { + c->ritem = ITEM_schunk(it); + } else { + c->ritem = ITEM_data(it); + } +#else + c->ritem = ITEM_data(it); +#endif + c->rlbytes = vlen; + conn_set_state(c, conn_nread); + c->substate = bin_read_set_value; +} + +static void process_bin_flush(conn *c, char *extbuf) { + time_t exptime = 0; + protocol_binary_request_flush* req = (void *)extbuf; + rel_time_t new_oldest = 0; + + if (!settings.flush_enabled) { + // flush_all is not allowed but we log it on stats + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, NULL, 0); + return; + } + + if (c->binary_header.request.extlen == sizeof(req->message.body)) { + exptime = ntohl(req->message.body.expiration); + } + + if (exptime > 0) { + new_oldest = realtime(exptime); + } else { + new_oldest = current_time; + } + if (settings.use_cas) { + settings.oldest_live = new_oldest - 1; + if (settings.oldest_live <= current_time) + settings.oldest_cas = get_cas_id(); + } else { + settings.oldest_live = new_oldest; + } + + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.flush_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + write_bin_response(c, NULL, 0, 0, 0); +} + +static void process_bin_delete(conn *c) { + item *it; + uint32_t hv; + + char* key = binary_get_key(c); + size_t nkey = c->binary_header.request.keylen; + + assert(c != NULL); + + if (settings.verbose > 1) { + int ii; + fprintf(stderr, "Deleting "); + for (ii = 0; ii < nkey; ++ii) { + fprintf(stderr, "%c", key[ii]); + } + fprintf(stderr, "\n"); + } + + if (settings.detail_enabled) { + stats_prefix_record_delete(key, nkey); + } + + it = item_get_locked(key, nkey, c, DONT_UPDATE, &hv); + if (it) { + uint64_t cas = c->binary_header.request.cas; + if (cas == 0 || cas == ITEM_get_cas(it)) { + MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++; + pthread_mutex_unlock(&c->thread->stats.mutex); + do_item_unlink(it, hv); + STORAGE_delete(c->thread->storage, it); + write_bin_response(c, NULL, 0, 0, 0); + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0); + } + do_item_remove(it); /* release our reference */ + } else { + write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0); + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.delete_misses++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } + item_unlock(hv); +} + + diff --git a/proto_bin.h b/proto_bin.h new file mode 100644 index 0000000..4f4e004 --- /dev/null +++ b/proto_bin.h @@ -0,0 +1,10 @@ +#ifndef PROTO_BIN_H +#define PROTO_BIN_H + +/* binary protocol handlers */ +int try_read_command_binary(conn *c); +void complete_nread_binary(conn *c); +void write_bin_error(conn *c, protocol_binary_response_status err, + const char *errstr, int swallow); + +#endif diff --git a/proto_text.c b/proto_text.c index 6cbed46..509619a 100644 --- a/proto_text.c +++ b/proto_text.c @@ -10,12 +10,151 @@ #include "tls.h" #endif #include <string.h> +#include <stdlib.h> + +static void process_command(conn *c, char *command); typedef struct token_s { char *value; size_t length; } token_t; +/* + * we get here after reading the value in set/add/replace commands. The command + * has been stored in c->cmd, and the item is ready in c->item. + */ +void complete_nread_ascii(conn *c) { + assert(c != NULL); + + item *it = c->item; + int comm = c->cmd; + enum store_item_type ret; + bool is_valid = false; + + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + + if ((it->it_flags & ITEM_CHUNKED) == 0) { + if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) { + is_valid = true; + } + } else { + char buf[2]; + /* should point to the final item chunk */ + item_chunk *ch = (item_chunk *) c->ritem; + assert(ch->used != 0); + /* :( We need to look at the last two bytes. This could span two + * chunks. + */ + if (ch->used > 1) { + buf[0] = ch->data[ch->used - 2]; + buf[1] = ch->data[ch->used - 1]; + } else { + assert(ch->prev); + assert(ch->used == 1); + buf[0] = ch->prev->data[ch->prev->used - 1]; + buf[1] = ch->data[ch->used - 1]; + } + if (strncmp(buf, "\r\n", 2) == 0) { + is_valid = true; + } else { + assert(1 == 0); + } + } + + if (!is_valid) { + // metaset mode always returns errors. + if (c->mset_res) { + c->noreply = false; + } + out_string(c, "CLIENT_ERROR bad data chunk"); + } else { + ret = store_item(it, comm, c); + +#ifdef ENABLE_DTRACE + uint64_t cas = ITEM_get_cas(it); + switch (c->cmd) { + case NREAD_ADD: + MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey, + (ret == 1) ? it->nbytes : -1, cas); + break; + case NREAD_REPLACE: + MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey, + (ret == 1) ? it->nbytes : -1, cas); + break; + case NREAD_APPEND: + MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey, + (ret == 1) ? it->nbytes : -1, cas); + break; + case NREAD_PREPEND: + MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey, + (ret == 1) ? it->nbytes : -1, cas); + break; + case NREAD_SET: + MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey, + (ret == 1) ? it->nbytes : -1, cas); + break; + case NREAD_CAS: + MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes, + cas); + break; + } +#endif + + if (c->mset_res) { + // Replace the status code in the response. + // Rest was prepared during mset parsing. + mc_resp *resp = c->resp; + conn_set_state(c, conn_new_cmd); + switch (ret) { + case STORED: + memcpy(resp->wbuf, "OK ", 3); + // Only place noreply is used for meta cmds is a nominal response. + if (c->noreply) { + resp->skip = true; + } + break; + case EXISTS: + memcpy(resp->wbuf, "EX ", 3); + break; + case NOT_FOUND: + memcpy(resp->wbuf, "NF ", 3); + break; + case NOT_STORED: + memcpy(resp->wbuf, "NS ", 3); + break; + default: + c->noreply = false; + out_string(c, "SERVER_ERROR Unhandled storage type."); + } + } else { + switch (ret) { + case STORED: + out_string(c, "STORED"); + break; + case EXISTS: + out_string(c, "EXISTS"); + break; + case NOT_FOUND: + out_string(c, "NOT_FOUND"); + break; + case NOT_STORED: + out_string(c, "NOT_STORED"); + break; + default: + out_string(c, "SERVER_ERROR Unhandled storage type."); + } + } + + } + + c->set_stale = false; /* force flag to be off just in case */ + c->mset_res = false; + item_remove(c->item); /* release the c->item reference */ + c->item = 0; +} + #define COMMAND_TOKEN 0 #define SUBCOMMAND_TOKEN 1 #define KEY_TOKEN 1 @@ -2392,7 +2531,7 @@ static void process_refresh_certs_command(conn *c, token_t *tokens, const size_t // we can't drop out and back in again. // Leaving this note here to spend more time on a fix when necessary, or if an // opportunity becomes obvious. -void process_command(conn *c, char *command) { +static void process_command(conn *c, char *command) { token_t tokens[MAX_TOKENS]; size_t ntokens; diff --git a/proto_text.h b/proto_text.h index a06cf15..b626f97 100644 --- a/proto_text.h +++ b/proto_text.h @@ -2,7 +2,7 @@ #define PROTO_TEXT_H /* text protocol handlers */ -void process_command(conn *c, char *command); +void complete_nread_ascii(conn *c); int try_read_command_asciiauth(conn *c); int try_read_command_ascii(conn *c); |