diff options
author | Salvatore Sanfilippo <antirez@gmail.com> | 2019-09-27 11:24:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-27 11:24:06 +0200 |
commit | 61297585584f4e4bda3904d678ce001ca8e70758 (patch) | |
tree | 0e64cca1223e35b2ae3d29c9fcd8138b85b986cf /src | |
parent | 83e87bac762c84f2a2e9ee6922d038ee09ae9cd4 (diff) | |
parent | fddc4757c8c585d384889c1c7efba1ccf2121e6b (diff) | |
download | redis-61297585584f4e4bda3904d678ce001ca8e70758.tar.gz |
Merge branch 'unstable' into modules_fork
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/acl.c | 40 | ||||
-rw-r--r-- | src/aof.c | 10 | ||||
-rw-r--r-- | src/blocked.c | 405 | ||||
-rw-r--r-- | src/cluster.c | 22 | ||||
-rw-r--r-- | src/config.c | 28 | ||||
-rw-r--r-- | src/db.c | 10 | ||||
-rw-r--r-- | src/debug.c | 32 | ||||
-rw-r--r-- | src/expire.c | 2 | ||||
-rw-r--r-- | src/hyperloglog.c | 6 | ||||
-rw-r--r-- | src/latency.c | 4 | ||||
-rw-r--r-- | src/lolwut.c | 3 | ||||
-rw-r--r-- | src/lolwut5.c | 3 | ||||
-rw-r--r-- | src/module.c | 186 | ||||
-rw-r--r-- | src/multi.c | 14 | ||||
-rw-r--r-- | src/networking.c | 34 | ||||
-rw-r--r-- | src/object.c | 19 | ||||
-rw-r--r-- | src/rdb.c | 264 | ||||
-rw-r--r-- | src/rdb.h | 1 | ||||
-rw-r--r-- | src/redis-benchmark.c | 15 | ||||
-rw-r--r-- | src/redis-check-rdb.c | 6 | ||||
-rw-r--r-- | src/redis-cli.c | 108 | ||||
-rw-r--r-- | src/redismodule.h | 22 | ||||
-rw-r--r-- | src/replication.c | 51 | ||||
-rw-r--r-- | src/rio.c | 4 | ||||
-rw-r--r-- | src/rio.h | 35 | ||||
-rw-r--r-- | src/scripting.c | 254 | ||||
-rw-r--r-- | src/server.c | 63 | ||||
-rw-r--r-- | src/server.h | 72 | ||||
-rw-r--r-- | src/sha256.c | 158 | ||||
-rw-r--r-- | src/sha256.h | 35 | ||||
-rw-r--r-- | src/siphash.c | 3 | ||||
-rw-r--r-- | src/stream.h | 1 | ||||
-rw-r--r-- | src/t_hash.c | 4 | ||||
-rw-r--r-- | src/t_list.c | 6 | ||||
-rw-r--r-- | src/t_set.c | 10 | ||||
-rw-r--r-- | src/t_zset.c | 23 | ||||
-rw-r--r-- | src/tracking.c | 207 | ||||
-rw-r--r-- | src/zmalloc.c | 20 |
39 files changed, 1692 insertions, 490 deletions
diff --git a/src/Makefile b/src/Makefile index b6cc69e2f..198d85cd5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -164,7 +164,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.o sha256.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark @@ -28,6 +28,7 @@ */ #include "server.h" +#include "sha256.h" #include <fcntl.h> /* ============================================================================= @@ -139,6 +140,25 @@ int time_independent_strcmp(char *a, char *b) { return diff; /* If zero strings are the same. */ } +/* Given an SDS string, returns the SHA256 hex representation as a + * new SDS string. */ +sds ACLHashPassword(unsigned char *cleartext, size_t len) { + SHA256_CTX ctx; + unsigned char hash[SHA256_BLOCK_SIZE]; + char hex[SHA256_BLOCK_SIZE*2]; + char *cset = "0123456789abcdef"; + + sha256_init(&ctx); + sha256_update(&ctx,(unsigned char*)cleartext,len); + sha256_final(&ctx,hash); + + for (int j = 0; j < SHA256_BLOCK_SIZE; j++) { + hex[j*2] = cset[((hash[j]&0xF0)>>4)]; + hex[j*2+1] = cset[(hash[j]&0xF)]; + } + return sdsnewlen(hex,SHA256_BLOCK_SIZE*2); +} + /* ============================================================================= * Low level ACL API * ==========================================================================*/ @@ -701,13 +721,16 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { u->flags &= ~USER_FLAG_NOPASS; listEmpty(u->passwords); } else if (op[0] == '>') { - sds newpass = sdsnewlen(op+1,oplen-1); + sds newpass = ACLHashPassword((unsigned char*)op+1,oplen-1); listNode *ln = listSearchKey(u->passwords,newpass); /* Avoid re-adding the same password multiple times. */ - if (ln == NULL) listAddNodeTail(u->passwords,newpass); + if (ln == NULL) + listAddNodeTail(u->passwords,newpass); + else + sdsfree(newpass); u->flags &= ~USER_FLAG_NOPASS; } else if (op[0] == '<') { - sds delpass = sdsnewlen(op+1,oplen-1); + sds delpass = ACLHashPassword((unsigned char*)op+1,oplen-1); listNode *ln = listSearchKey(u->passwords,delpass); sdsfree(delpass); if (ln) { @@ -724,7 +747,10 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { sds newpat = sdsnewlen(op+1,oplen-1); listNode *ln = listSearchKey(u->patterns,newpat); /* Avoid re-adding the same pattern multiple times. */ - if (ln == NULL) listAddNodeTail(u->patterns,newpat); + if (ln == NULL) + listAddNodeTail(u->patterns,newpat); + else + sdsfree(newpat); u->flags &= ~USER_FLAG_ALLKEYS; } else if (op[0] == '+' && op[1] != '@') { if (strchr(op,'|') == NULL) { @@ -879,11 +905,15 @@ int ACLCheckUserCredentials(robj *username, robj *password) { listIter li; listNode *ln; listRewind(u->passwords,&li); + sds hashed = ACLHashPassword(password->ptr,sdslen(password->ptr)); while((ln = listNext(&li))) { sds thispass = listNodeValue(ln); - if (!time_independent_strcmp(password->ptr, thispass)) + if (!time_independent_strcmp(hashed, thispass)) { + sdsfree(hashed); return C_OK; + } } + sdsfree(hashed); /* If we reached this point, no password matched. */ errno = EINVAL; @@ -303,9 +303,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) { nwritten = write(fd, buf, len); if (nwritten < 0) { - if (errno == EINTR) { - continue; - } + if (errno == EINTR) continue; return totwritten ? totwritten : -1; } @@ -863,6 +861,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */ if (!feof(fp)) { if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */ + fclose(fp); serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno)); exit(1); } @@ -893,11 +892,13 @@ uxeof: /* Unexpected AOF end of file. */ } } if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */ + fclose(fp); serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server."); exit(1); fmterr: /* Format error. */ if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */ + fclose(fp); serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>"); exit(1); } @@ -1612,7 +1613,8 @@ void bgrewriteaofCommand(client *c) { } else if (rewriteAppendOnlyFileBackground() == C_OK) { addReplyStatus(c,"Background append only file rewriting started"); } else { - addReply(c,shared.err); + addReplyError(c,"Can't execute an AOF background rewriting. " + "Please check the server logs for more information."); } } diff --git a/src/blocked.c b/src/blocked.c index 1db657869..867f03de6 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -229,6 +229,207 @@ void disconnectAllBlockedClients(void) { } } +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * when there may be clients blocked on a list key, and there may be new + * data to fetch (the key is ready). */ +void serveClientsBlockedOnListKey(robj *o, readyList *rl) { + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (receiver->btype != BLOCKED_LIST) { + /* Put at the tail, so that at the next call + * we'll not run into it again. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + continue; + } + + robj *dstkey = receiver->bpop.target; + int where = (receiver->lastcmd && + receiver->lastcmd->proc == blpopCommand) ? + LIST_HEAD : LIST_TAIL; + robj *value = listTypePop(o,where); + + if (value) { + /* Protect receiver->bpop.target, that will be + * freed by the next unblockClient() + * call. */ + if (dstkey) incrRefCount(dstkey); + unblockClient(receiver); + + if (serveClientBlockedOnList(receiver, + rl->key,dstkey,rl->db,value, + where) == C_ERR) + { + /* If we failed serving the client we need + * to also undo the POP operation. */ + listTypePush(o,value,where); + } + + if (dstkey) decrRefCount(dstkey); + decrRefCount(value); + } else { + break; + } + } + } + + if (listTypeLength(o) == 0) { + dbDelete(rl->db,rl->key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id); + } + /* We don't call signalModifiedKey() as it was already called + * when an element was pushed on the list. */ +} + +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * when there may be clients blocked on a sorted set key, and there may be new + * data to fetch (the key is ready). */ +void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + unsigned long zcard = zsetLength(o); + + while(numclients-- && zcard) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (receiver->btype != BLOCKED_ZSET) { + /* Put at the tail, so that at the next call + * we'll not run into it again. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + continue; + } + + int where = (receiver->lastcmd && + receiver->lastcmd->proc == bzpopminCommand) + ? ZSET_MIN : ZSET_MAX; + unblockClient(receiver); + genericZpopCommand(receiver,&rl->key,1,where,1,NULL); + zcard--; + + /* Replicate the command. */ + robj *argv[2]; + struct redisCommand *cmd = where == ZSET_MIN ? + server.zpopminCommand : + server.zpopmaxCommand; + argv[0] = createStringObject(cmd->name,strlen(cmd->name)); + argv[1] = rl->key; + incrRefCount(rl->key); + propagate(cmd,receiver->db->id, + argv,2,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + } + } +} + +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * when there may be clients blocked on a stream key, and there may be new + * data to fetch (the key is ready). */ +void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + stream *s = o->ptr; + + /* We need to provide the new data arrived on the stream + * to all the clients that are waiting for an offset smaller + * than the current top item. */ + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + if (receiver->btype != BLOCKED_STREAM) continue; + streamID *gt = dictFetchValue(receiver->bpop.keys, + rl->key); + + /* If we blocked in the context of a consumer + * group, we need to resolve the group and update the + * last ID the client is blocked for: this is needed + * because serving other clients in the same consumer + * group will alter the "last ID" of the consumer + * group, and clients blocked in a consumer group are + * always blocked for the ">" ID: we need to deliver + * only new messages and avoid unblocking the client + * otherwise. */ + streamCG *group = NULL; + if (receiver->bpop.xread_group) { + group = streamLookupCG(s, + receiver->bpop.xread_group->ptr); + /* If the group was not found, send an error + * to the consumer. */ + if (!group) { + addReplyError(receiver, + "-NOGROUP the consumer group this client " + "was blocked on no longer exists"); + unblockClient(receiver); + continue; + } else { + *gt = group->last_id; + } + } + + if (streamCompareID(&s->last_id, gt) > 0) { + streamID start = *gt; + start.seq++; /* Can't overflow, it's an uint64_t */ + + /* Lookup the consumer for the group, if any. */ + streamConsumer *consumer = NULL; + int noack = 0; + + if (group) { + consumer = streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr, + 1); + noack = receiver->bpop.xread_group_noack; + } + + /* Emit the two elements sub-array consisting of + * the name of the stream and the data we + * extracted from it. Wrapped in a single-item + * array, since we have just one key. */ + if (receiver->resp == 2) { + addReplyArrayLen(receiver,1); + addReplyArrayLen(receiver,2); + } else { + addReplyMapLen(receiver,1); + } + addReplyBulk(receiver,rl->key); + + streamPropInfo pi = { + rl->key, + receiver->bpop.xread_group + }; + streamReplyWithRange(receiver,s,&start,NULL, + receiver->bpop.xread_count, + 0, group, consumer, noack, &pi); + + /* Note that after we unblock the client, 'gt' + * and other receiver->bpop stuff are no longer + * valid, so we must do the setup above before + * this call. */ + unblockClient(receiver); + } + } + } +} + /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after * being called by a client. It handles serving clients blocked in @@ -271,202 +472,14 @@ void handleClientsBlockedOnKeys(void) { /* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); - if (o != NULL && o->type == OBJ_LIST) { - dictEntry *de; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - int numclients = listLength(clients); - - while(numclients--) { - listNode *clientnode = listFirst(clients); - client *receiver = clientnode->value; - - if (receiver->btype != BLOCKED_LIST) { - /* Put at the tail, so that at the next call - * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); - continue; - } - - robj *dstkey = receiver->bpop.target; - int where = (receiver->lastcmd && - receiver->lastcmd->proc == blpopCommand) ? - LIST_HEAD : LIST_TAIL; - robj *value = listTypePop(o,where); - - if (value) { - /* Protect receiver->bpop.target, that will be - * freed by the next unblockClient() - * call. */ - if (dstkey) incrRefCount(dstkey); - unblockClient(receiver); - - if (serveClientBlockedOnList(receiver, - rl->key,dstkey,rl->db,value, - where) == C_ERR) - { - /* If we failed serving the client we need - * to also undo the POP operation. */ - listTypePush(o,value,where); - } - - if (dstkey) decrRefCount(dstkey); - decrRefCount(value); - } else { - break; - } - } - } - - if (listTypeLength(o) == 0) { - dbDelete(rl->db,rl->key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id); - } - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ - } - /* Serve clients blocked on sorted set key. */ - else if (o != NULL && o->type == OBJ_ZSET) { - dictEntry *de; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - int numclients = listLength(clients); - unsigned long zcard = zsetLength(o); - - while(numclients-- && zcard) { - listNode *clientnode = listFirst(clients); - client *receiver = clientnode->value; - - if (receiver->btype != BLOCKED_ZSET) { - /* Put at the tail, so that at the next call - * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); - continue; - } - - int where = (receiver->lastcmd && - receiver->lastcmd->proc == bzpopminCommand) - ? ZSET_MIN : ZSET_MAX; - unblockClient(receiver); - genericZpopCommand(receiver,&rl->key,1,where,1,NULL); - zcard--; - - /* Replicate the command. */ - robj *argv[2]; - struct redisCommand *cmd = where == ZSET_MIN ? - server.zpopminCommand : - server.zpopmaxCommand; - argv[0] = createStringObject(cmd->name,strlen(cmd->name)); - argv[1] = rl->key; - incrRefCount(rl->key); - propagate(cmd,receiver->db->id, - argv,2,PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - } - } - } - - /* Serve clients blocked on stream key. */ - else if (o != NULL && o->type == OBJ_STREAM) { - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - stream *s = o->ptr; - - /* We need to provide the new data arrived on the stream - * to all the clients that are waiting for an offset smaller - * than the current top item. */ - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_STREAM) continue; - streamID *gt = dictFetchValue(receiver->bpop.keys, - rl->key); - - /* If we blocked in the context of a consumer - * group, we need to resolve the group and update the - * last ID the client is blocked for: this is needed - * because serving other clients in the same consumer - * group will alter the "last ID" of the consumer - * group, and clients blocked in a consumer group are - * always blocked for the ">" ID: we need to deliver - * only new messages and avoid unblocking the client - * otherwise. */ - streamCG *group = NULL; - if (receiver->bpop.xread_group) { - group = streamLookupCG(s, - receiver->bpop.xread_group->ptr); - /* If the group was not found, send an error - * to the consumer. */ - if (!group) { - addReplyError(receiver, - "-NOGROUP the consumer group this client " - "was blocked on no longer exists"); - unblockClient(receiver); - continue; - } else { - *gt = group->last_id; - } - } - - if (streamCompareID(&s->last_id, gt) > 0) { - streamID start = *gt; - start.seq++; /* Can't overflow, it's an uint64_t */ - - /* Lookup the consumer for the group, if any. */ - streamConsumer *consumer = NULL; - int noack = 0; - - if (group) { - consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr, - 1); - noack = receiver->bpop.xread_group_noack; - } - - /* Emit the two elements sub-array consisting of - * the name of the stream and the data we - * extracted from it. Wrapped in a single-item - * array, since we have just one key. */ - if (receiver->resp == 2) { - addReplyArrayLen(receiver,1); - addReplyArrayLen(receiver,2); - } else { - addReplyMapLen(receiver,1); - } - addReplyBulk(receiver,rl->key); - - streamPropInfo pi = { - rl->key, - receiver->bpop.xread_group - }; - streamReplyWithRange(receiver,s,&start,NULL, - receiver->bpop.xread_count, - 0, group, consumer, noack, &pi); - - /* Note that after we unblock the client, 'gt' - * and other receiver->bpop stuff are no longer - * valid, so we must do the setup above before - * this call. */ - unblockClient(receiver); - } - } - } + if (o != NULL) { + if (o->type == OBJ_LIST) + serveClientsBlockedOnListKey(o,rl); + else if (o->type == OBJ_ZSET) + serveClientsBlockedOnSortedSetKey(o,rl); + else if (o->type == OBJ_STREAM) + serveClientsBlockedOnStreamKey(o,rl); } /* Free this item. */ @@ -592,7 +605,7 @@ void unblockClientWaitingData(client *c) { * the same key again and again in the list in case of multiple pushes * made by a script or in the context of MULTI/EXEC. * - * The list will be finally processed by handleClientsBlockedOnLists() */ + * The list will be finally processed by handleClientsBlockedOnKeys() */ void signalKeyAsReady(redisDb *db, robj *key) { readyList *rl; diff --git a/src/cluster.c b/src/cluster.c index c85e3791d..1e7dcd50e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -138,6 +138,7 @@ int clusterLoadConfig(char *filename) { /* Handle the special "vars" line. Don't pretend it is the last * line even if it actually is when generated by Redis. */ if (strcasecmp(argv[0],"vars") == 0) { + if (!(argc % 2)) goto fmterr; for (j = 1; j < argc; j += 2) { if (strcasecmp(argv[j],"currentEpoch") == 0) { server.cluster->currentEpoch = @@ -4251,12 +4252,9 @@ NULL } } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { /* CLUSTER NODES */ - robj *o; - sds ci = clusterGenNodesDescription(0); - - o = createObject(OBJ_STRING,ci); - addReplyBulk(c,o); - decrRefCount(o); + sds nodes = clusterGenNodesDescription(0); + addReplyVerbatim(c,nodes,sdslen(nodes),"txt"); + sdsfree(nodes); } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { /* CLUSTER MYID */ addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN); @@ -4498,10 +4496,8 @@ NULL "cluster_stats_messages_received:%lld\r\n", tot_msg_received); /* Produce the reply protocol. */ - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", - (unsigned long)sdslen(info))); - addReplySds(c,info); - addReply(c,shared.crlf); + addReplyVerbatim(c,info,sdslen(info),"txt"); + sdsfree(info); } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { int retval = clusterSaveConfig(1); @@ -4832,7 +4828,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) { * DUMP is actually not used by Redis Cluster but it is the obvious * complement of RESTORE and can be useful for different applications. */ void dumpCommand(client *c) { - robj *o, *dumpobj; + robj *o; rio payload; /* Check if the key is here. */ @@ -4845,9 +4841,7 @@ void dumpCommand(client *c) { createDumpPayload(&payload,o,c->argv[1]); /* Transfer to the client */ - dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); - addReplyBulk(c,dumpobj); - decrRefCount(dumpobj); + addReplyBulkSds(c,payload.io.buffer.ptr); return; } diff --git a/src/config.c b/src/config.c index fde00ddf5..d37e3c566 100644 --- a/src/config.c +++ b/src/config.c @@ -672,6 +672,10 @@ void loadServerConfigFromString(char *config) { server.lua_time_limit = strtoll(argv[1],NULL,10); } else if (!strcasecmp(argv[0],"lua-replicate-commands") && argc == 2) { server.lua_always_replicate_commands = yesnotoi(argv[1]); + if (server.lua_always_replicate_commands == -1) { + err = "argument must be 'yes' or 'no'"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"slowlog-log-slower-than") && argc == 2) { @@ -686,6 +690,17 @@ void loadServerConfigFromString(char *config) { } } else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) { server.slowlog_max_len = strtoll(argv[1],NULL,10); + } else if (!strcasecmp(argv[0],"tracking-table-max-fill") && + argc == 2) + { + server.tracking_table_max_fill = strtoll(argv[1],NULL,10); + if (server.tracking_table_max_fill > 100 || + server.tracking_table_max_fill < 0) + { + err = "The tracking table fill percentage must be an " + "integer between 0 and 100"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"client-output-buffer-limit") && argc == 5) { @@ -1134,6 +1149,8 @@ void configSetCommand(client *c) { /* Cast to unsigned. */ server.slowlog_max_len = (unsigned long)ll; } config_set_numerical_field( + "tracking-table-max-fill",server.tracking_table_max_fill,0,100) { + } config_set_numerical_field( "latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){ } config_set_numerical_field( "repl-ping-slave-period",server.repl_ping_slave_period,1,INT_MAX) { @@ -1338,8 +1355,8 @@ void configGetCommand(client *c) { server.slowlog_log_slower_than); config_get_numerical_field("latency-monitor-threshold", server.latency_monitor_threshold); - config_get_numerical_field("slowlog-max-len", - server.slowlog_max_len); + config_get_numerical_field("slowlog-max-len", server.slowlog_max_len); + config_get_numerical_field("tracking-table-max-fill", server.tracking_table_max_fill); config_get_numerical_field("port",server.port); config_get_numerical_field("cluster-announce-port",server.cluster_announce_port); config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port); @@ -1470,12 +1487,10 @@ void configGetCommand(client *c) { matches++; } if (stringmatch(pattern,"notify-keyspace-events",1)) { - robj *flagsobj = createObject(OBJ_STRING, - keyspaceEventsFlagsToString(server.notify_keyspace_events)); + sds flags = keyspaceEventsFlagsToString(server.notify_keyspace_events); addReplyBulkCString(c,"notify-keyspace-events"); - addReplyBulk(c,flagsobj); - decrRefCount(flagsobj); + addReplyBulkSds(c,flags); matches++; } if (stringmatch(pattern,"bind",1)) { @@ -2167,6 +2182,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"slowlog-log-slower-than",server.slowlog_log_slower_than,CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN); rewriteConfigNumericalOption(state,"latency-monitor-threshold",server.latency_monitor_threshold,CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD); rewriteConfigNumericalOption(state,"slowlog-max-len",server.slowlog_max_len,CONFIG_DEFAULT_SLOWLOG_MAX_LEN); + rewriteConfigNumericalOption(state,"tracking-table-max-fill",server.tracking_table_max_fill,CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL); rewriteConfigNotifykeyspaceeventsOption(state); rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES); rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE); @@ -350,6 +350,11 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( return -1; } + /* Make sure the WATCHed keys are affected by the FLUSH* commands. + * Note that we need to call the function while the keys are still + * there. */ + signalFlushedDb(dbnum); + int startdb, enddb; if (dbnum == -1) { startdb = 0; @@ -409,11 +414,12 @@ long long dbTotalServerKeyCount() { void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); - if (server.tracking_clients) trackingInvalidateKey(key); + trackingInvalidateKey(key); } void signalFlushedDb(int dbid) { touchWatchedKeysOnFlush(dbid); + trackingInvalidateKeysOnFlush(dbid); } /*----------------------------------------------------------------------------- @@ -449,7 +455,6 @@ void flushdbCommand(client *c) { int flags; if (getFlushCommandFlags(c,&flags) == C_ERR) return; - signalFlushedDb(c->db->id); server.dirty += emptyDb(c->db->id,flags,NULL); addReply(c,shared.ok); } @@ -461,7 +466,6 @@ void flushallCommand(client *c) { int flags; if (getFlushCommandFlags(c,&flags) == C_ERR) return; - signalFlushedDb(-1); server.dirty += emptyDb(-1,flags,NULL); addReply(c,shared.ok); if (server.rdb_child_pid != -1) killRDBChild(); diff --git a/src/debug.c b/src/debug.c index 1f1157d4a..0d29165de 100644 --- a/src/debug.c +++ b/src/debug.c @@ -638,7 +638,8 @@ NULL dictGetStats(buf,sizeof(buf),server.db[dbid].expires); stats = sdscat(stats,buf); - addReplyBulkSds(c,stats); + addReplyVerbatim(c,stats,sdslen(stats),"txt"); + sdsfree(stats); } else if (!strcasecmp(c->argv[1]->ptr,"htstats-key") && c->argc == 3) { robj *o; dict *ht = NULL; @@ -665,7 +666,7 @@ NULL } else { char buf[4096]; dictGetStats(buf,sizeof(buf),ht); - addReplyBulkCString(c,buf); + addReplyVerbatim(c,buf,strlen(buf),"txt"); } } else if (!strcasecmp(c->argv[1]->ptr,"change-repl-id") && c->argc == 2) { serverLog(LL_WARNING,"Changing replication IDs after receiving DEBUG change-repl-id"); @@ -1110,6 +1111,33 @@ void logRegisters(ucontext_t *uc) { (unsigned long) uc->uc_mcontext.mc_cs ); logStackContent((void**)uc->uc_mcontext.mc_rsp); +#elif defined(__aarch64__) /* Linux AArch64 */ + serverLog(LL_WARNING, + "\n" + "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n" + "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n" + "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n" + "X30:%016lx\n" + "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n", + (unsigned long) uc->uc_mcontext.regs[18], + (unsigned long) uc->uc_mcontext.regs[19], + (unsigned long) uc->uc_mcontext.regs[20], + (unsigned long) uc->uc_mcontext.regs[21], + (unsigned long) uc->uc_mcontext.regs[22], + (unsigned long) uc->uc_mcontext.regs[23], + (unsigned long) uc->uc_mcontext.regs[24], + (unsigned long) uc->uc_mcontext.regs[25], + (unsigned long) uc->uc_mcontext.regs[26], + (unsigned long) uc->uc_mcontext.regs[27], + (unsigned long) uc->uc_mcontext.regs[28], + (unsigned long) uc->uc_mcontext.regs[29], + (unsigned long) uc->uc_mcontext.regs[30], + (unsigned long) uc->uc_mcontext.pc, + (unsigned long) uc->uc_mcontext.sp, + (unsigned long) uc->uc_mcontext.pstate, + (unsigned long) uc->uc_mcontext.fault_address + ); + logStackContent((void**)uc->uc_mcontext.sp); #else serverLog(LL_WARNING, " Dumping of registers not supported for this OS/arch"); diff --git a/src/expire.c b/src/expire.c index b23117a3c..598b27f96 100644 --- a/src/expire.c +++ b/src/expire.c @@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); - if (server.tracking_clients) trackingInvalidateKey(keyobj); + trackingInvalidateKey(keyobj); decrRefCount(keyobj); server.stat_expiredkeys++; return 1; diff --git a/src/hyperloglog.c b/src/hyperloglog.c index e01ea6042..a44d15646 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -700,7 +700,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) { p += oplen; first += span; } - if (span == 0) return -1; /* Invalid format. */ + if (span == 0 || p >= end) return -1; /* Invalid format. */ next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1; if (next >= end) next = NULL; @@ -1242,7 +1242,7 @@ void pfcountCommand(client *c) { if (o == NULL) continue; /* Assume empty HLL for non existing var.*/ if (isHLLObjectOrReply(c,o) != C_OK) return; - /* Merge with this HLL with our 'max' HHL by setting max[i] + /* Merge with this HLL with our 'max' HLL by setting max[i] * to MAX(max[i],hll[i]). */ if (hllMerge(registers,o) == C_ERR) { addReplySds(c,sdsnew(invalid_hll_err)); @@ -1329,7 +1329,7 @@ void pfmergeCommand(client *c) { hdr = o->ptr; if (hdr->encoding == HLL_DENSE) use_dense = 1; - /* Merge with this HLL with our 'max' HHL by setting max[i] + /* Merge with this HLL with our 'max' HLL by setting max[i] * to MAX(max[i],hll[i]). */ if (hllMerge(max,o) == C_ERR) { addReplySds(c,sdsnew(invalid_hll_err)); diff --git a/src/latency.c b/src/latency.c index 33aa1245b..b834da5c7 100644 --- a/src/latency.c +++ b/src/latency.c @@ -599,7 +599,7 @@ NULL event = dictGetKey(de); graph = latencyCommandGenSparkeline(event,ts); - addReplyBulkCString(c,graph); + addReplyVerbatim(c,graph,sdslen(graph),"txt"); sdsfree(graph); } else if (!strcasecmp(c->argv[1]->ptr,"latest") && c->argc == 2) { /* LATENCY LATEST */ @@ -608,7 +608,7 @@ NULL /* LATENCY DOCTOR */ sds report = createLatencyReport(); - addReplyBulkCBuffer(c,report,sdslen(report)); + addReplyVerbatim(c,report,sdslen(report),"txt"); sdsfree(report); } else if (!strcasecmp(c->argv[1]->ptr,"reset") && c->argc >= 2) { /* LATENCY RESET */ diff --git a/src/lolwut.c b/src/lolwut.c index 19cbcf642..ba7e1069e 100644 --- a/src/lolwut.c +++ b/src/lolwut.c @@ -43,7 +43,8 @@ void lolwutUnstableCommand(client *c) { sds rendered = sdsnew("Redis ver. "); rendered = sdscat(rendered,REDIS_VERSION); rendered = sdscatlen(rendered,"\n",1); - addReplyBulkSds(c,rendered); + addReplyVerbatim(c,rendered,sdslen(rendered),"txt"); + sdsfree(rendered); } void lolwutCommand(client *c) { diff --git a/src/lolwut5.c b/src/lolwut5.c index 8408b378d..52a98c0d7 100644 --- a/src/lolwut5.c +++ b/src/lolwut5.c @@ -277,6 +277,7 @@ void lolwut5Command(client *c) { "\nGeorg Nees - schotter, plotter on paper, 1968. Redis ver. "); rendered = sdscat(rendered,REDIS_VERSION); rendered = sdscatlen(rendered,"\n",1); - addReplyBulkSds(c,rendered); + addReplyVerbatim(c,rendered,sdslen(rendered),"txt"); + sdsfree(rendered); lwFreeCanvas(canvas); } diff --git a/src/module.c b/src/module.c index 6f3be61af..854989e73 100644 --- a/src/module.c +++ b/src/module.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "rdb.h" #include <dlfcn.h> #include <wait.h> @@ -52,6 +53,7 @@ struct RedisModule { list *using; /* List of modules we use some APIs of. */ list *filters; /* List of filters the module has registered. */ int in_call; /* RM_Call() nesting level */ + int options; /* Module options and capabilities. */ }; typedef struct RedisModule RedisModule; @@ -780,6 +782,19 @@ long long RM_Milliseconds(void) { return mstime(); } +/* Set flags defining capabilities or behavior bit flags. + * + * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS: + * Generally, modules don't need to bother with this, as the process will just + * terminate if a read error happens, however, setting this flag would allow + * repl-diskless-load to work if enabled. + * The module should use RedisModule_IsIOError after reads, before using the + * data that was read, and in case of error, propagate it upwards, and also be + * able to release the partially populated value and all it's allocations. */ +void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { + ctx->module->options = options; +} + /* -------------------------------------------------------------------------- * Automatic memory management for modules * -------------------------------------------------------------------------- */ @@ -2397,7 +2412,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { * * REDISMODULE_HASH_EXISTS: instead of setting the value of the field * expecting a RedisModuleString pointer to pointer, the function just - * reports if the field esists or not and expects an integer pointer + * reports if the field exists or not and expects an integer pointer * as the second element of each pair. * * Example of REDISMODULE_HASH_CFIELD: @@ -3087,6 +3102,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeMemUsageFunc mem_usage; moduleTypeDigestFunc digest; moduleTypeFreeFunc free; + struct { + moduleTypeAuxLoadFunc aux_load; + moduleTypeAuxSaveFunc aux_save; + int aux_save_triggers; + } v2; } *tms = (struct typemethods*) typemethods_ptr; moduleType *mt = zcalloc(sizeof(*mt)); @@ -3098,6 +3118,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, mt->mem_usage = tms->mem_usage; mt->digest = tms->digest; mt->free = tms->free; + if (tms->version >= 2) { + mt->aux_load = tms->v2.aux_load; + mt->aux_save = tms->v2.aux_save; + mt->aux_save_triggers = tms->v2.aux_save_triggers; + } memcpy(mt->name,name,sizeof(mt->name)); listAddNodeTail(ctx->module->types,mt); return mt; @@ -3148,9 +3173,14 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) { * RDB loading and saving functions * -------------------------------------------------------------------------- */ -/* Called when there is a load error in the context of a module. This cannot - * be recovered like for the built-in types. */ +/* Called when there is a load error in the context of a module. On some + * modules this cannot be recovered, but if the module declared capability + * to handle errors, we'll raise a flag rather than exiting. */ void moduleRDBLoadError(RedisModuleIO *io) { + if (io->ctx->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) { + io->error = 1; + return; + } serverLog(LL_WARNING, "Error loading data from RDB (short read or EOF). " "Read performed by module '%s' about type '%s' " @@ -3161,6 +3191,33 @@ void moduleRDBLoadError(RedisModuleIO *io) { exit(1); } +/* Returns 0 if there's at least one registered data type that did not declare + * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should + * be avoided since it could cause data loss. */ +int moduleAllDatatypesHandleErrors() { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + if (listLength(module->types) && + !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)) + { + dictReleaseIterator(di); + return 0; + } + } + dictReleaseIterator(di); + return 1; +} + +/* Returns true if any previous IO API failed. + * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with + * RediModule_SetModuleOptions first. */ +int RM_IsIOError(RedisModuleIO *io) { + return io->error; +} + /* Save an unsigned 64 bit value into the RDB file. This function should only * be called in the context of the rdb_save method of modules implementing new * data types. */ @@ -3184,6 +3241,7 @@ saveerr: * be called in the context of the rdb_load method of modules implementing * new data types. */ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr; @@ -3195,7 +3253,7 @@ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */ @@ -3254,6 +3312,7 @@ saveerr: /* Implements RM_LoadString() and RM_LoadStringBuffer() */ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { + if (io->error) return NULL; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr; @@ -3265,7 +3324,7 @@ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { loaderr: moduleRDBLoadError(io); - return NULL; /* Never reached. */ + return NULL; } /* In the context of the rdb_load method of a module data type, loads a string @@ -3286,7 +3345,7 @@ RedisModuleString *RM_LoadString(RedisModuleIO *io) { * RedisModule_Realloc() or RedisModule_Free(). * * The size of the string is stored at '*lenptr' if not NULL. - * The returned string is not automatically NULL termianted, it is loaded + * The returned string is not automatically NULL terminated, it is loaded * exactly as it was stored inisde the RDB file. */ char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) { return moduleLoadString(io,1,lenptr); @@ -3314,6 +3373,7 @@ saveerr: /* In the context of the rdb_save method of a module data type, loads back the * double value saved by RedisModule_SaveDouble(). */ double RM_LoadDouble(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr; @@ -3325,7 +3385,7 @@ double RM_LoadDouble(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* In the context of the rdb_save method of a module data type, saves a float @@ -3350,6 +3410,7 @@ saveerr: /* In the context of the rdb_save method of a module data type, loads back the * float value saved by RedisModule_SaveFloat(). */ float RM_LoadFloat(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr; @@ -3361,7 +3422,37 @@ float RM_LoadFloat(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; +} + +/* Iterate over modules, and trigger rdb aux saving for the ones modules types + * who asked for it. */ +ssize_t rdbSaveModulesAux(rio *rdb, int when) { + size_t total_written = 0; + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + listIter li; + listNode *ln; + + listRewind(module->types,&li); + while((ln = listNext(&li))) { + moduleType *mt = ln->value; + if (!mt->aux_save || !(mt->aux_save_triggers & when)) + continue; + ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt); + if (ret==-1) { + dictReleaseIterator(di); + return -1; + } + total_written += ret; + } + } + + dictReleaseIterator(di); + return total_written; } /* -------------------------------------------------------------------------- @@ -3524,7 +3615,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li if (level < server.verbosity) return; - name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name); + name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module"); vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap); serverLogRaw(level,msg); } @@ -3542,13 +3633,15 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li * There is a fixed limit to the length of the log line this function is able * to emit, this limit is not specified but is guaranteed to be more than * a few lines of text. + * + * The ctx argument may be NULL if cannot be provided in the context of the + * caller for instance threads or callbacks, in which case a generic "module" + * will be used instead of the module name. */ void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { - if (!ctx->module) return; /* Can only log if module is initialized */ - va_list ap; va_start(ap, fmt); - RM_LogRaw(ctx->module,levelstr,fmt,ap); + RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap); va_end(ap); } @@ -3564,6 +3657,15 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ... va_end(ap); } +/* Redis-like assert function. + * + * A failed assertion will shut down the server and produce logging information + * that looks identical to information generated by Redis itself. + */ +void RM__Assert(const char *estr, const char *file, int line) { + _serverAssert(estr, file, line); +} + /* -------------------------------------------------------------------------- * Blocking clients from modules * -------------------------------------------------------------------------- */ @@ -5362,6 +5464,62 @@ void addReplyLoadedModules(client *c) { dictReleaseIterator(di); } +/* Helper for genModulesInfoString(): given a list of modules, return + * am SDS string in the form "[modulename|modulename2|...]" */ +sds genModulesInfoStringRenderModulesList(list *l) { + listIter li; + listNode *ln; + listRewind(l,&li); + sds output = sdsnew("["); + while((ln = listNext(&li))) { + RedisModule *module = ln->value; + output = sdscat(output,module->name); + } + output = sdstrim(output,"|"); + output = sdscat(output,"]"); + return output; +} + +/* Helper for genModulesInfoString(): render module options as an SDS string. */ +sds genModulesInfoStringRenderModuleOptions(struct RedisModule *module) { + sds output = sdsnew("["); + if (module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) + output = sdscat(output,"handle-io-errors|"); + output = sdstrim(output,"|"); + output = sdscat(output,"]"); + return output; +} + + +/* Helper function for the INFO command: adds loaded modules as to info's + * output. + * + * After the call, the passed sds info string is no longer valid and all the + * references must be substituted with the new pointer returned by the call. */ +sds genModulesInfoString(sds info) { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + sds name = dictGetKey(de); + struct RedisModule *module = dictGetVal(de); + + sds usedby = genModulesInfoStringRenderModulesList(module->usedby); + sds using = genModulesInfoStringRenderModulesList(module->using); + sds options = genModulesInfoStringRenderModuleOptions(module); + info = sdscatprintf(info, + "module:name=%s,ver=%d,api=%d,filters=%d," + "usedby=%s,using=%s,options=%s\r\n", + name, module->ver, module->apiver, + (int)listLength(module->filters), usedby, using, options); + sdsfree(usedby); + sdsfree(using); + sdsfree(options); + } + dictReleaseIterator(di); + return info; +} + /* Redis MODULE command. * * MODULE LOAD <path> [args...] */ @@ -5447,6 +5605,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ReplySetArrayLength); REGISTER_API(ReplyWithString); REGISTER_API(ReplyWithStringBuffer); + REGISTER_API(ReplyWithCString); REGISTER_API(ReplyWithNull); REGISTER_API(ReplyWithCallReply); REGISTER_API(ReplyWithDouble); @@ -5509,6 +5668,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ModuleTypeSetValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); + REGISTER_API(IsIOError); + REGISTER_API(SetModuleOptions); REGISTER_API(SaveUnsigned); REGISTER_API(LoadUnsigned); REGISTER_API(SaveSigned); @@ -5524,6 +5685,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); + REGISTER_API(_Assert); REGISTER_API(StringAppendBuffer); REGISTER_API(RetainString); REGISTER_API(StringCompare); diff --git a/src/multi.c b/src/multi.c index 71090d8ed..f885fa19c 100644 --- a/src/multi.c +++ b/src/multi.c @@ -175,7 +175,19 @@ void execCommand(client *c) { must_propagate = 1; } - call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int acl_retval = ACLCheckCommandPerm(c); + if (acl_retval != ACL_OK) { + addReplyErrorFormat(c, + "-NOPERM ACLs rules changed between the moment the " + "transaction was accumulated and the EXEC call. " + "This command is no longer allowed for the " + "following reason: %s", + (acl_retval == ACL_DENIED_CMD) ? + "no permission to execute the command or subcommand" : + "no permission to touch the specified keys"); + } else { + call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); + } /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; diff --git a/src/networking.c b/src/networking.c index 7976caf29..a959d557a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1990,7 +1990,7 @@ NULL return; } sds o = getAllClientsInfoString(type); - addReplyBulkCBuffer(c,o,sdslen(o)); + addReplyVerbatim(c,o,sdslen(o),"txt"); sdsfree(o); } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) { /* CLIENT REPLY ON|OFF|SKIP */ @@ -2468,17 +2468,27 @@ void flushSlavesOutputBuffers(void) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = listNodeValue(ln); - int events; - - /* Note that the following will not flush output buffers of slaves - * in STATE_ONLINE but having put_online_on_ack set to true: in this - * case the writable event is never installed, since the purpose - * of put_online_on_ack is to postpone the moment it is installed. - * This is what we want since slaves in this state should not receive - * writes before the first ACK. */ - events = aeGetFileEvents(server.el,slave->fd); - if (events & AE_WRITABLE && - slave->replstate == SLAVE_STATE_ONLINE && + int events = aeGetFileEvents(server.el,slave->fd); + int can_receive_writes = (events & AE_WRITABLE) || + (slave->flags & CLIENT_PENDING_WRITE); + + /* We don't want to send the pending data to the replica in a few + * cases: + * + * 1. For some reason there is neither the write handler installed + * nor the client is flagged as to have pending writes: for some + * reason this replica may not be set to receive data. This is + * just for the sake of defensive programming. + * + * 2. The put_online_on_ack flag is true. To know why we don't want + * to send data to the replica in this case, please grep for the + * flag for this flag. + * + * 3. Obviously if the slave is not ONLINE. + */ + if (slave->replstate == SLAVE_STATE_ONLINE && + can_receive_writes && + !slave->repl_put_online_on_ack && clientHasPendingReplies(slave)) { writeToClient(slave->fd,slave,0); diff --git a/src/object.c b/src/object.c index 10209a6c8..697429b84 100644 --- a/src/object.c +++ b/src/object.c @@ -467,10 +467,15 @@ robj *tryObjectEncoding(robj *o) { incrRefCount(shared.integers[value]); return shared.integers[value]; } else { - if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr); - o->encoding = OBJ_ENCODING_INT; - o->ptr = (void*) value; - return o; + if (o->encoding == OBJ_ENCODING_RAW) { + sdsfree(o->ptr); + o->encoding = OBJ_ENCODING_INT; + o->ptr = (void*) value; + return o; + } else if (o->encoding == OBJ_ENCODING_EMBSTR) { + decrRefCount(o); + return createStringObjectFromLongLongForValue(value); + } } } @@ -1435,13 +1440,15 @@ NULL #if defined(USE_JEMALLOC) sds info = sdsempty(); je_malloc_stats_print(inputCatSds, &info, NULL); - addReplyBulkSds(c, info); + addReplyVerbatim(c,info,sdslen(info),"txt"); + sdsfree(info); #else addReplyBulkCString(c,"Stats not supported for the current allocator"); #endif } else if (!strcasecmp(c->argv[1]->ptr,"doctor") && c->argc == 2) { sds report = getMemoryDoctorReport(); - addReplyBulkSds(c,report); + addReplyVerbatim(c,report,sdslen(report),"txt"); + sdsfree(report); } else if (!strcasecmp(c->argv[1]->ptr,"purge") && c->argc == 2) { #if defined(USE_JEMALLOC) char tmp[32]; @@ -42,31 +42,35 @@ #include <sys/stat.h> #include <sys/param.h> -#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +/* This macro is called when the internal RDB stracture is corrupt */ +#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__) +/* This macro is called when RDB read failed (possibly a short read) */ +#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__) char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); -void rdbCheckThenExit(int linenum, char *reason, ...) { +void rdbReportError(int corruption_error, int linenum, char *reason, ...) { va_list ap; char msg[1024]; int len; len = snprintf(msg,sizeof(msg), - "Internal error in RDB reading function at rdb.c:%d -> ", linenum); + "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ", + (unsigned long long)server.loading_loaded_bytes, linenum); va_start(ap,reason); vsnprintf(msg+len,sizeof(msg)-len,reason,ap); va_end(ap); if (!rdbCheckMode) { - serverLog(LL_WARNING, "%s", msg); - if (rdbFileBeingLoaded) { + if (rdbFileBeingLoaded || corruption_error) { + serverLog(LL_WARNING, "%s", msg); char *argv[2] = {"",rdbFileBeingLoaded}; redis_check_rdb_main(2,argv,NULL); } else { - serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg); return; } } else { @@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) { return len; } -/* This is just a wrapper for the low level function rioRead() that will - * automatically abort if it is not possible to read the specified amount - * of bytes. */ -void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) { - if (rioRead(rdb,buf,len) == 0) { - rdbExitReportCorruptRDB( - "Impossible to read %llu bytes in rdbLoadRaw()", - (unsigned long long) len); - return; /* Not reached. */ - } -} - int rdbSaveType(rio *rdb, unsigned char type) { return rdbWriteRaw(rdb,&type,1); } @@ -109,10 +101,12 @@ int rdbLoadType(rio *rdb) { /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS - * opcode. */ + * opcode. On error -1 is returned, however this could be a valid time, so + * to check for loading errors the caller should call rioGetReadError() after + * calling this function. */ time_t rdbLoadTime(rio *rdb) { int32_t t32; - rdbLoadRaw(rdb,&t32,4); + if (rioRead(rdb,&t32,4) == 0) return -1; return (time_t)t32; } @@ -132,10 +126,14 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) { * after upgrading to Redis version 5 they will no longer be able to load their * own old RDB files. Because of that, we instead fix the function only for new * RDB versions, and load older RDB versions as we used to do in the past, - * allowing big endian systems to load their own old RDB files. */ + * allowing big endian systems to load their own old RDB files. + * + * On I/O error the function returns LLONG_MAX, however if this is also a + * valid stored value, the caller should use rioGetReadError() to check for + * errors after calling this function. */ long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { int64_t t64; - rdbLoadRaw(rdb,&t64,8); + if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX; if (rdbver >= 9) /* Check the top comment of this function. */ memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ return (long long)t64; @@ -262,7 +260,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) { /* Loads an integer-encoded object with the specified encoding type "enctype". * The returned value changes according to the flags, see - * rdbGenerincLoadStringObject() for more info. */ + * rdbGenericLoadStringObject() for more info. */ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { int plain = flags & RDB_LOAD_PLAIN; int sds = flags & RDB_LOAD_SDS; @@ -284,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); val = (int32_t)v; } else { - val = 0; /* anti-warning */ rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); + return NULL; /* Never reached. */ } if (plain || sds) { char buf[LONG_STR_SIZE], *p; @@ -388,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { /* Load the compressed representation and uncompress it to target. */ if (rioRead(rdb,c,clen) == 0) goto err; if (lzf_decompress(c,clen,val,len) == 0) { - if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string"); - goto err; + rdbExitReportCorruptRDB("Invalid LZF compressed string"); } zfree(c); @@ -503,6 +500,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { return rdbLoadLzfStringObject(rdb,flags,lenptr); default: rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len); + return NULL; /* Never reached. */ } } @@ -973,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { RedisModuleIO io; moduleValue *mv = o->ptr; moduleType *mt = mv->type; - moduleInitIOContext(io,mt,rdb,key); /* Write the "module" identifier as prefix, so that we'll be able * to call the right module during loading. */ @@ -982,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { io.bytes += retval; /* Then write the module-specific representation + EOF marker. */ + moduleInitIOContext(io,mt,rdb,key); mt->rdb_save(&io,mv->value); retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); - if (retval == -1) return -1; - io.bytes += retval; + if (retval == -1) + io.error = 1; + else + io.bytes += retval; if (io.ctx) { moduleFreeContext(io.ctx); @@ -1103,6 +1103,45 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { return 1; } +ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { + /* Save a module-specific aux value. */ + RedisModuleIO io; + int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX); + + /* Write the "module" identifier as prefix, so that we'll be able + * to call the right module during loading. */ + retval = rdbSaveLen(rdb,mt->id); + if (retval == -1) return -1; + io.bytes += retval; + + /* write the 'when' so that we can provide it on loading. add a UINT opcode + * for backwards compatibility, everything after the MT needs to be prefixed + * by an opcode. */ + retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT); + if (retval == -1) return -1; + io.bytes += retval; + retval = rdbSaveLen(rdb,when); + if (retval == -1) return -1; + io.bytes += retval; + + /* Then write the module-specific representation + EOF marker. */ + moduleInitIOContext(io,mt,rdb,NULL); + mt->aux_save(&io,when); + retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); + if (retval == -1) + io.error = 1; + else + io.bytes += retval; + + if (io.ctx) { + moduleFreeContext(io.ctx); + zfree(io.ctx); + } + if (io.error) + return -1; + return io.bytes; +} + /* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be @@ -1124,6 +1163,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; + if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -1185,6 +1225,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { di = NULL; /* So that we don't release it again on error. */ } + if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; + /* EOF opcode */ if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; @@ -1628,6 +1670,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { hashTypeConvert(o, OBJ_ENCODING_HT); break; default: + /* totally unreachable */ rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } @@ -1635,6 +1678,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); + if (listpacks == RDB_LENERR) { + rdbReportReadError("Stream listpacks len loading failed."); + decrRefCount(o); + return NULL; + } while(listpacks--) { /* Get the master ID, the one we'll use as key of the radix tree @@ -1642,7 +1690,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * relatively to this ID. */ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (nodekey == NULL) { - rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); + rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error."); + decrRefCount(o); + return NULL; } if (sdslen(nodekey) != sizeof(streamID)) { rdbExitReportCorruptRDB("Stream node key entry is not the " @@ -1652,7 +1702,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Load the listpack. */ unsigned char *lp = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (lp == NULL) return NULL; + if (lp == NULL) { + rdbReportReadError("Stream listpacks loading failed."); + sdsfree(nodekey); + decrRefCount(o); + return NULL; + } unsigned char *first = lpFirst(lp); if (first == NULL) { /* Serialized listpacks should never be empty, since on @@ -1670,12 +1725,24 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } /* Load total number of items inside the stream. */ s->length = rdbLoadLen(rdb,NULL); + /* Load the last entry ID. */ s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream object metadata loading failed."); + decrRefCount(o); + return NULL; + } + /* Consumer groups loading */ - size_t cgroups_count = rdbLoadLen(rdb,NULL); + uint64_t cgroups_count = rdbLoadLen(rdb,NULL); + if (cgroups_count == RDB_LENERR) { + rdbReportReadError("Stream cgroup count loading failed."); + decrRefCount(o); + return NULL; + } while(cgroups_count--) { /* Get the consumer group name and ID. We can then create the * consumer group ASAP and populate its structure as @@ -1683,11 +1750,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { streamID cg_id; sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cgname == NULL) { - rdbExitReportCorruptRDB( + rdbReportReadError( "Error reading the consumer group name from Stream"); + decrRefCount(o); + return NULL; } + cg_id.ms = rdbLoadLen(rdb,NULL); cg_id.seq = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream cgroup ID loading failed."); + sdsfree(cgname); + decrRefCount(o); + return NULL; + } + streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); if (cgroup == NULL) rdbExitReportCorruptRDB("Duplicated consumer group name %s", @@ -1699,13 +1776,28 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * owner, since consumers for this group and their messages will * be read as a next step. So for now leave them not resolved * and later populate it. */ - size_t pel_size = rdbLoadLen(rdb,NULL); + uint64_t pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError("Stream PEL size loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError("Stream PEL ID loading failed."); + decrRefCount(o); + return NULL; + } streamNACK *nack = streamCreateNACK(NULL); nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); nack->delivery_count = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream PEL NACK loading failed."); + decrRefCount(o); + streamFreeNACK(nack); + return NULL; + } if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) rdbExitReportCorruptRDB("Duplicated gobal PEL entry " "loading stream consumer group"); @@ -1713,24 +1805,47 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Now that we loaded our global PEL, we need to load the * consumers and their local PELs. */ - size_t consumers_num = rdbLoadLen(rdb,NULL); + uint64_t consumers_num = rdbLoadLen(rdb,NULL); + if (consumers_num == RDB_LENERR) { + rdbReportReadError("Stream consumers num loading failed."); + decrRefCount(o); + return NULL; + } while(consumers_num--) { sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cname == NULL) { - rdbExitReportCorruptRDB( - "Error reading the consumer name from Stream group"); + rdbReportReadError( + "Error reading the consumer name from Stream group."); + decrRefCount(o); + return NULL; } streamConsumer *consumer = streamLookupConsumer(cgroup,cname, 1); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream short read reading seen time."); + decrRefCount(o); + return NULL; + } /* Load the PEL about entries owned by this specific * consumer. */ pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError( + "Stream consumer PEL num loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError( + "Stream short read reading PEL streamID."); + decrRefCount(o); + return NULL; + } streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid)); if (nack == raxNotFound) rdbExitReportCorruptRDB("Consumer entry not found in " @@ -1749,6 +1864,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { uint64_t moduleid = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) return NULL; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; @@ -1776,6 +1892,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Module v2 serialization has an EOF mark at the end. */ if (io.ver == 2) { uint64_t eof = rdbLoadLen(rdb,NULL); + if (eof == RDB_LENERR) { + o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */ + decrRefCount(o); + return NULL; + } if (eof != RDB_MODULE_OPCODE_EOF) { serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name); exit(1); @@ -1789,7 +1910,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } o = createModuleObject(mt,ptr); } else { - rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); + rdbReportReadError("Unknown RDB encoding type %d",rdbtype); + return NULL; } return o; } @@ -1888,11 +2010,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * load the actual type, and continue. */ expiretime = rdbLoadTime(rdb); expiretime *= 1000; + if (rioGetReadError(rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ expiretime = rdbLoadMillisecondTime(rdb,rdbver); + if (rioGetReadError(rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ @@ -1993,15 +2117,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(auxval); continue; /* Read type again. */ } else if (type == RDB_OPCODE_MODULE_AUX) { - /* This is just for compatibility with the future: we have plans - * to add the ability for modules to store anything in the RDB - * file, like data that is not related to the Redis key space. - * Such data will potentially be stored both before and after the - * RDB keys-values section. For this reason since RDB version 9, - * we have the ability to read a MODULE_AUX opcode followed by an - * identifier of the module, and a serialized value in "MODULE V2" - * format. */ + /* Load module data that is not related to the Redis key space. + * Such data can be potentially be stored both before and after the + * RDB keys-values section. */ uint64_t moduleid = rdbLoadLen(rdb,NULL); + int when_opcode = rdbLoadLen(rdb,NULL); + int when = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) goto eoferr; + if (when_opcode != RDB_MODULE_OPCODE_UINT) + rdbReportReadError("bad when_opcode"); moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; moduleTypeNameByID(name,moduleid); @@ -2011,14 +2135,37 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name); exit(1); } else if (!rdbCheckMode && mt != NULL) { - /* This version of Redis actually does not know what to do - * with modules AUX data... */ - serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name); - exit(1); + if (!mt->aux_load) { + /* Module doesn't support AUX. */ + serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name); + exit(1); + } + + RedisModuleIO io; + moduleInitIOContext(io,mt,rdb,NULL); + io.ver = 2; + /* Call the rdb_load method of the module providing the 10 bit + * encoding version in the lower 10 bits of the module ID. */ + if (mt->aux_load(&io,moduleid&1023, when) || io.error) { + moduleTypeNameByID(name,moduleid); + serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name); + exit(1); + } + if (io.ctx) { + moduleFreeContext(io.ctx); + zfree(io.ctx); + } + uint64_t eof = rdbLoadLen(rdb,NULL); + if (eof != RDB_MODULE_OPCODE_EOF) { + serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name); + exit(1); + } + continue; } else { /* RDB check mode. */ robj *aux = rdbLoadCheckModuleValue(rdb,name); decrRefCount(aux); + continue; /* Read next opcode. */ } } @@ -2072,10 +2219,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } return C_OK; -eoferr: /* unexpected end of file is handled here with a fatal exit */ - serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); - rdbExitReportCorruptRDB("Unexpected EOF reading RDB file"); - return C_ERR; /* Just to avoid warning */ + /* Unexpected end of file is handled here calling rdbReportReadError(): + * this will in turn either abort Redis in most cases, or if we are loading + * the RDB file from a socket during initial SYNC (diskless replica mode), + * we'll report the error to the caller, so that we can retry. */ +eoferr: + serverLog(LL_WARNING, + "Short read or OOM loading DB. Unrecoverable error, aborting now."); + rdbReportReadError("Unexpected EOF reading RDB file"); + return C_ERR; } /* Like rdbLoadRio() but takes a filename instead of a rio stream. The @@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o); robj *rdbLoadObject(int type, rio *rdb, robj *key); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); +ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); robj *rdbLoadStringObject(rio *rdb); ssize_t rdbSaveStringObject(rio *rdb, robj *obj); ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len); diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 1d16fa4ee..2df41580b 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -104,6 +104,7 @@ static struct config { int is_fetching_slots; int is_updating_slots; int slots_last_update; + int enable_tracking; /* Thread mutexes to be used as fallbacks by atomicvar.h */ pthread_mutex_t requests_issued_mutex; pthread_mutex_t requests_finished_mutex; @@ -255,7 +256,7 @@ static redisConfig *getRedisConfig(const char *ip, int port, goto fail; } - if(config.auth){ + if(config.auth) { void *authReply = NULL; redisAppendCommand(c, "AUTH %s", config.auth); if (REDIS_OK != redisGetReply(c, &authReply)) goto fail; @@ -633,6 +634,14 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } + if (config.enable_tracking) { + char *buf = NULL; + int len = redisFormatCommand(&buf, "CLIENT TRACKING on"); + c->obuf = sdscatlen(c->obuf, buf, len); + free(buf); + c->prefix_pending++; + } + /* If a DB number different than zero is selected, prefix our request * buffer with the SELECT command, that will be discarded the first * time the replies are received, so if the client is reused the @@ -1350,6 +1359,8 @@ int parseOptions(int argc, const char **argv) { } else if (config.num_threads < 0) config.num_threads = 0; } else if (!strcmp(argv[i],"--cluster")) { config.cluster_mode = 1; + } else if (!strcmp(argv[i],"--enable-tracking")) { + config.enable_tracking = 1; } else if (!strcmp(argv[i],"--help")) { exit_status = 0; goto usage; @@ -1380,6 +1391,7 @@ usage: " --dbnum <db> SELECT the specified db number (default 0)\n" " --threads <num> Enable multi-thread mode.\n" " --cluster Enable cluster mode.\n" +" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k <boolean> 1=keep alive 0=reconnect (default 1)\n" " -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD\n" " Using this option the benchmark will expand the string __rand_int__\n" @@ -1504,6 +1516,7 @@ int main(int argc, const char **argv) { config.is_fetching_slots = 0; config.is_updating_slots = 0; config.slots_last_update = 0; + config.enable_tracking = 0; i = parseOptions(argc,argv); argc -= i; diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index e2d71b5a5..5e7415046 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -216,14 +216,16 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; + expiretime = rdbLoadTime(&rdb); expiretime *= 1000; + if (rioGetReadError(&rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; - if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr; + expiretime = rdbLoadMillisecondTime(&rdb, rdbver); + if (rioGetReadError(&rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ diff --git a/src/redis-cli.c b/src/redis-cli.c index e363a2795..c183155cb 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -218,6 +218,7 @@ static struct config { int hotkeys; int stdinarg; /* get last arg from stdin. (-x option) */ char *auth; + char *user; int output; /* output mode, see OUTPUT_* defines */ sds mb_delim; char prompt[128]; @@ -230,6 +231,7 @@ static struct config { int verbose; clusterManagerCommand cluster_manager_command; int no_auth_warning; + int resp3; } config; /* User preferences. */ @@ -728,8 +730,13 @@ static int cliAuth(void) { redisReply *reply; if (config.auth == NULL) return REDIS_OK; - reply = redisCommand(context,"AUTH %s",config.auth); + if (config.user == NULL) + reply = redisCommand(context,"AUTH %s",config.auth); + else + reply = redisCommand(context,"AUTH %s %s",config.user,config.auth); if (reply != NULL) { + if (reply->type == REDIS_REPLY_ERROR) + fprintf(stderr,"Warning: AUTH failed\n"); freeReplyObject(reply); return REDIS_OK; } @@ -751,6 +758,21 @@ static int cliSelect(void) { return REDIS_ERR; } +/* Select RESP3 mode if redis-cli was started with the -3 option. */ +static int cliSwitchProto(void) { + redisReply *reply; + if (config.resp3 == 0) return REDIS_OK; + + reply = redisCommand(context,"HELLO 3"); + if (reply != NULL) { + int result = REDIS_OK; + if (reply->type == REDIS_REPLY_ERROR) result = REDIS_ERR; + freeReplyObject(reply); + return result; + } + return REDIS_ERR; +} + /* Connect to the server. It is possible to pass certain flags to the function: * CC_FORCE: The connection is performed even if there is already * a connected socket. @@ -788,11 +810,13 @@ static int cliConnect(int flags) { * errors. */ anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); - /* Do AUTH and select the right DB. */ + /* Do AUTH, select the right DB, switch to RESP3 if needed. */ if (cliAuth() != REDIS_OK) return REDIS_ERR; if (cliSelect() != REDIS_OK) return REDIS_ERR; + if (cliSwitchProto() != REDIS_OK) + return REDIS_ERR; } return REDIS_OK; } @@ -819,10 +843,17 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) { out = sdscatprintf(out,"(double) %s\n",r->str); break; case REDIS_REPLY_STRING: + case REDIS_REPLY_VERB: /* If you are producing output for the standard output we want - * a more interesting output with quoted characters and so forth */ - out = sdscatrepr(out,r->str,r->len); - out = sdscat(out,"\n"); + * a more interesting output with quoted characters and so forth, + * unless it's a verbatim string type. */ + if (r->type == REDIS_REPLY_STRING) { + out = sdscatrepr(out,r->str,r->len); + out = sdscat(out,"\n"); + } else { + out = sdscatlen(out,r->str,r->len); + out = sdscat(out,"\n"); + } break; case REDIS_REPLY_NIL: out = sdscat(out,"(nil)\n"); @@ -961,6 +992,7 @@ static sds cliFormatReplyRaw(redisReply *r) { break; case REDIS_REPLY_STATUS: case REDIS_REPLY_STRING: + case REDIS_REPLY_VERB: if (r->type == REDIS_REPLY_STATUS && config.eval_ldb) { /* The Lua debugger replies with arrays of simple (status) * strings. We colorize the output for more fun if this @@ -980,9 +1012,15 @@ static sds cliFormatReplyRaw(redisReply *r) { out = sdscatlen(out,r->str,r->len); } break; + case REDIS_REPLY_BOOL: + out = sdscat(out,r->integer ? "(true)" : "(false)"); + break; case REDIS_REPLY_INTEGER: out = sdscatprintf(out,"%lld",r->integer); break; + case REDIS_REPLY_DOUBLE: + out = sdscatprintf(out,"%s",r->str); + break; case REDIS_REPLY_ARRAY: for (i = 0; i < r->elements; i++) { if (i > 0) out = sdscat(out,config.mb_delim); @@ -991,6 +1029,19 @@ static sds cliFormatReplyRaw(redisReply *r) { sdsfree(tmp); } break; + case REDIS_REPLY_MAP: + for (i = 0; i < r->elements; i += 2) { + if (i > 0) out = sdscat(out,config.mb_delim); + tmp = cliFormatReplyRaw(r->element[i]); + out = sdscatlen(out,tmp,sdslen(tmp)); + sdsfree(tmp); + + out = sdscatlen(out," ",1); + tmp = cliFormatReplyRaw(r->element[i+1]); + out = sdscatlen(out,tmp,sdslen(tmp)); + sdsfree(tmp); + } + break; default: fprintf(stderr,"Unknown reply type: %d\n", r->type); exit(1); @@ -1013,13 +1064,21 @@ static sds cliFormatReplyCSV(redisReply *r) { case REDIS_REPLY_INTEGER: out = sdscatprintf(out,"%lld",r->integer); break; + case REDIS_REPLY_DOUBLE: + out = sdscatprintf(out,"%s",r->str); + break; case REDIS_REPLY_STRING: + case REDIS_REPLY_VERB: out = sdscatrepr(out,r->str,r->len); break; case REDIS_REPLY_NIL: - out = sdscat(out,"NIL"); + out = sdscat(out,"NULL"); + break; + case REDIS_REPLY_BOOL: + out = sdscat(out,r->integer ? "true" : "false"); break; case REDIS_REPLY_ARRAY: + case REDIS_REPLY_MAP: /* CSV has no map type, just output flat list. */ for (i = 0; i < r->elements; i++) { sds tmp = cliFormatReplyCSV(r->element[i]); out = sdscatlen(out,tmp,sdslen(tmp)); @@ -1213,7 +1272,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) { if (!strcasecmp(command,"select") && argc == 2 && config.last_cmd_type != REDIS_REPLY_ERROR) { config.dbnum = atoi(argv[1]); cliRefreshPrompt(); - } else if (!strcasecmp(command,"auth") && argc == 2) { + } else if (!strcasecmp(command,"auth") && (argc == 2 || argc == 3)) + { cliSelect(); } } @@ -1296,8 +1356,12 @@ static int parseOptions(int argc, char **argv) { config.dbnum = atoi(argv[++i]); } else if (!strcmp(argv[i], "--no-auth-warning")) { config.no_auth_warning = 1; - } else if (!strcmp(argv[i],"-a") && !lastarg) { + } else if ((!strcmp(argv[i],"-a") || !strcmp(argv[i],"--pass")) + && !lastarg) + { config.auth = argv[++i]; + } else if (!strcmp(argv[i],"--user") && !lastarg) { + config.user = argv[++i]; } else if (!strcmp(argv[i],"-u") && !lastarg) { parseRedisUri(argv[++i]); } else if (!strcmp(argv[i],"--raw")) { @@ -1439,6 +1503,8 @@ static int parseOptions(int argc, char **argv) { printf("redis-cli %s\n", version); sdsfree(version); exit(0); + } else if (!strcmp(argv[i],"-3")) { + config.resp3 = 1; } else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') { if (config.cluster_manager_command.argc == 0) { int j = i + 1; @@ -1514,11 +1580,14 @@ static void usage(void) { " You can also use the " REDIS_CLI_AUTH_ENV " environment\n" " variable to pass this password more safely\n" " (if both are used, this argument takes predecence).\n" +" -user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n" +" -pass <password> Alias of -a for consistency with the new --user option.\n" " -u <uri> Server URI.\n" " -r <repeat> Execute specified command N times.\n" " -i <interval> When -r is used, waits <interval> seconds per command.\n" " It is possible to specify sub-second times like -i 0.1.\n" " -n <db> Database number.\n" +" -3 Start session in RESP3 protocol mode.\n" " -x Read last argument from STDIN.\n" " -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n).\n" " -c Enable cluster mode (follow -ASK and -MOVED redirections).\n" @@ -1533,7 +1602,9 @@ static void usage(void) { " --csv is specified, or if you redirect the output to a non\n" " TTY, it samples the latency for 1 second (you can use\n" " -i to change the interval), then produces a single output\n" -" and exits.\n" +" and exits.\n",version); + + fprintf(stderr, " --latency-history Like --latency but tracking latency changes over time.\n" " Default time interval is 15 sec. Change it using -i.\n" " --latency-dist Shows latency as a spectrum, requires xterm 256 colors.\n" @@ -1568,7 +1639,7 @@ static void usage(void) { " --help Output this help and exit.\n" " --version Output version and exit.\n" "\n", - version, REDIS_CLI_DEFAULT_PIPE_TIMEOUT); + REDIS_CLI_DEFAULT_PIPE_TIMEOUT); /* Using another fprintf call to avoid -Woverlength-strings compile warning */ fprintf(stderr, "Cluster Manager Commands:\n" @@ -2350,7 +2421,12 @@ static int clusterManagerNodeConnect(clusterManagerNode *node) { * errors. */ anetKeepAlive(NULL, node->context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); if (config.auth) { - redisReply *reply = redisCommand(node->context,"AUTH %s",config.auth); + redisReply *reply; + if (config.user == NULL) + reply = redisCommand(node->context,"AUTH %s", config.auth); + else + reply = redisCommand(node->context,"AUTH %s %s", + config.user,config.auth); int ok = clusterManagerCheckRedisReply(node, reply, NULL); if (reply != NULL) freeReplyObject(reply); if (!ok) return 0; @@ -6724,6 +6800,7 @@ static void pipeMode(void) { /* Handle the readable state: we can read replies from the server. */ if (mask & AE_READABLE) { ssize_t nread; + int read_error = 0; /* Read from socket and feed the hiredis reader. */ do { @@ -6731,7 +6808,8 @@ static void pipeMode(void) { if (nread == -1 && errno != EAGAIN && errno != EINTR) { fprintf(stderr, "Error reading from the server: %s\n", strerror(errno)); - exit(1); + read_error = 1; + break; } if (nread > 0) { redisReaderFeed(reader,ibuf,nread); @@ -6764,6 +6842,11 @@ static void pipeMode(void) { freeReplyObject(reply); } } while(reply); + + /* Abort on read errors. We abort here because it is important + * to consume replies even after a read error: this way we can + * show a potential problem to the user. */ + if (read_error) exit(1); } /* Handle the writable state: we can send protocol to the server. */ @@ -7671,6 +7754,7 @@ int main(int argc, char **argv) { config.hotkeys = 0; config.stdinarg = 0; config.auth = NULL; + config.user = NULL; config.eval = NULL; config.eval_ldb = 0; config.eval_ldb_end = 0; diff --git a/src/redismodule.h b/src/redismodule.h index 60681da7c..6a3a164b5 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -129,6 +129,10 @@ #define REDISMODULE_NOT_USED(V) ((void) V) +/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */ +#define REDISMODULE_AUX_BEFORE_RDB (1<<0) +#define REDISMODULE_AUX_AFTER_RDB (1<<1) + /* This type represents a timer handle, and is returned when a timer is * registered and used in order to invalidate a timer. It's just a 64 bit * number, because this is how each timer is represented inside the radix tree @@ -140,6 +144,9 @@ typedef uint64_t RedisModuleTimerID; /* Do filter RedisModule_Call() commands initiated by module itself. */ #define REDISMODULE_CMDFILTER_NOSELF (1<<0) +/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */ +#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0) + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -166,6 +173,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); +typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when); +typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); @@ -175,7 +184,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); -#define REDISMODULE_TYPE_METHOD_VERSION 1 +#define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { uint64_t version; RedisModuleTypeLoadFunc rdb_load; @@ -184,6 +193,9 @@ typedef struct RedisModuleTypeMethods { RedisModuleTypeMemUsageFunc mem_usage; RedisModuleTypeDigestFunc digest; RedisModuleTypeFreeFunc free; + RedisModuleTypeAuxLoadFunc aux_load; + RedisModuleTypeAuxSaveFunc aux_save; + int aux_save_triggers; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ @@ -272,6 +284,8 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); +int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_SetModuleOptions)(RedisModuleCtx *ctx, int options); void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); @@ -287,6 +301,7 @@ void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value) float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); +void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len); void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b); @@ -448,6 +463,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ModuleTypeSetValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); + REDISMODULE_GET_API(IsIOError); + REDISMODULE_GET_API(SetModuleOptions); REDISMODULE_GET_API(SaveUnsigned); REDISMODULE_GET_API(LoadUnsigned); REDISMODULE_GET_API(SaveSigned); @@ -463,6 +480,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); + REDISMODULE_GET_API(_Assert); REDISMODULE_GET_API(StringAppendBuffer); REDISMODULE_GET_API(RetainString); REDISMODULE_GET_API(StringCompare); @@ -542,6 +560,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int return REDISMODULE_OK; } +#define RedisModule_Assert(_e) ((_e)?(void)0 : (RedisModule__Assert(#_e,__FILE__,__LINE__),exit(1))) + #else /* Things only defined for the modules core, not exported to modules diff --git a/src/replication.c b/src/replication.c index 7adf5ba38..8039e06ae 100644 --- a/src/replication.c +++ b/src/replication.c @@ -823,7 +823,9 @@ void replconfCommand(client *c) { c->repl_ack_time = server.unixtime; /* If this was a diskless replication, we need to really put * the slave online when the first ACK is received (which - * confirms slave is online and ready to get more data). */ + * confirms slave is online and ready to get more data). This + * allows for simpler and less CPU intensive EOF detection + * when streaming RDB files. */ if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) putSlaveOnline(c); /* Note: this command does not reply anything! */ @@ -842,18 +844,20 @@ void replconfCommand(client *c) { addReply(c,shared.ok); } -/* This function puts a slave in the online state, and should be called just - * after a slave received the RDB file for the initial synchronization, and +/* This function puts a replica in the online state, and should be called just + * after a replica received the RDB file for the initial synchronization, and * we are finally ready to send the incremental stream of commands. * * It does a few things: * - * 1) Put the slave in ONLINE state (useless when the function is called - * because state is already ONLINE but repl_put_online_on_ack is true). + * 1) Put the slave in ONLINE state. Note that the function may also be called + * for a replicas that are already in ONLINE state, but having the flag + * repl_put_online_on_ack set to true: we still have to install the write + * handler in that case. This function will take care of that. * 2) Make sure the writable event is re-installed, since calling the SYNC * command disables it, so that we can accumulate output buffer without - * sending it to the slave. - * 3) Update the count of good slaves. */ + * sending it to the replica. + * 3) Update the count of "good replicas". */ void putSlaveOnline(client *slave) { slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 0; @@ -965,11 +969,31 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { serverLog(LL_NOTICE, "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", replicationGetSlaveName(slave)); - /* Note: we wait for a REPLCONF ACK message from slave in + /* Note: we wait for a REPLCONF ACK message from the replica in * order to really put it online (install the write handler * so that the accumulated data can be transferred). However * we change the replication state ASAP, since our slave - * is technically online now. */ + * is technically online now. + * + * So things work like that: + * + * 1. We end trasnferring the RDB file via socket. + * 2. The replica is put ONLINE but the write handler + * is not installed. + * 3. The replica however goes really online, and pings us + * back via REPLCONF ACK commands. + * 4. Now we finally install the write handler, and send + * the buffers accumulated so far to the replica. + * + * But why we do that? Because the replica, when we stream + * the RDB directly via the socket, must detect the RDB + * EOF (end of file), that is a special random string at the + * end of the RDB (for streamed RDBs we don't know the length + * in advance). Detecting such final EOF string is much + * simpler and less CPU intensive if no more data is sent + * after such final EOF. So we don't want to glue the end of + * the RDB trasfer with the start of the other replication + * data. */ slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 1; slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ @@ -1115,8 +1139,15 @@ void restartAOFAfterSYNC() { static int useDisklessLoad() { /* compute boolean decision to use diskless load */ - return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); + /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ + if (enabled && !moduleAllDatatypesHandleErrors()) { + serverLog(LL_WARNING, + "Skipping diskless-load because there are modules that don't handle read errors."); + enabled = 0; + } + return enabled; } /* Helper function for readSyncBulkPayload() to make backups of the current @@ -92,6 +92,7 @@ static const rio rioBufferIO = { rioBufferFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -145,6 +146,7 @@ static const rio rioFileIO = { rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -239,6 +241,7 @@ static const rio rioFdIO = { rioFdFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -374,6 +377,7 @@ static const rio rioFdsetIO = { rioFdsetFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -1,6 +1,6 @@ /* * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com> - * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * Copyright (c) 2009-2019, Salvatore Sanfilippo <antirez at gmail dot com> * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,6 +36,9 @@ #include <stdint.h> #include "sds.h" +#define RIO_FLAG_READ_ERROR (1<<0) +#define RIO_FLAG_WRITE_ERROR (1<<1) + struct _rio { /* Backend functions. * Since this functions do not tolerate short writes or reads the return @@ -51,8 +54,8 @@ struct _rio { * computation. */ void (*update_cksum)(struct _rio *, const void *buf, size_t len); - /* The current checksum */ - uint64_t cksum; + /* The current checksum and flags (see RIO_FLAG_*) */ + uint64_t cksum, flags; /* number of bytes read or written */ size_t processed_bytes; @@ -99,11 +102,14 @@ typedef struct _rio rio; * if needed. */ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { + if (r->flags & RIO_FLAG_WRITE_ERROR) return 0; while (len) { size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write); - if (r->write(r,buf,bytes_to_write) == 0) + if (r->write(r,buf,bytes_to_write) == 0) { + r->flags |= RIO_FLAG_WRITE_ERROR; return 0; + } buf = (char*)buf + bytes_to_write; len -= bytes_to_write; r->processed_bytes += bytes_to_write; @@ -112,10 +118,13 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { } static inline size_t rioRead(rio *r, void *buf, size_t len) { + if (r->flags & RIO_FLAG_READ_ERROR) return 0; while (len) { size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; - if (r->read(r,buf,bytes_to_read) == 0) + if (r->read(r,buf,bytes_to_read) == 0) { + r->flags |= RIO_FLAG_READ_ERROR; return 0; + } if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read); buf = (char*)buf + bytes_to_read; len -= bytes_to_read; @@ -132,6 +141,22 @@ static inline int rioFlush(rio *r) { return r->flush(r); } +/* This function allows to know if there was a read error in any past + * operation, since the rio stream was created or since the last call + * to rioClearError(). */ +static inline int rioGetReadError(rio *r) { + return (r->flags & RIO_FLAG_READ_ERROR) != 0; +} + +/* Like rioGetReadError() but for write errors. */ +static inline int rioGetWriteError(rio *r) { + return (r->flags & RIO_FLAG_WRITE_ERROR) != 0; +} + +static inline void rioClearErrors(rio *r) { + r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR); +} + void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); void rioInitWithFd(rio *r, int fd, size_t read_limit); diff --git a/src/scripting.c b/src/scripting.c index deb406457..3129e4f47 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -42,7 +42,10 @@ char *redisProtocolToLuaType_Int(lua_State *lua, char *reply); char *redisProtocolToLuaType_Bulk(lua_State *lua, char *reply); char *redisProtocolToLuaType_Status(lua_State *lua, char *reply); char *redisProtocolToLuaType_Error(lua_State *lua, char *reply); -char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype); +char *redisProtocolToLuaType_Aggregate(lua_State *lua, char *reply, int atype); +char *redisProtocolToLuaType_Null(lua_State *lua, char *reply); +char *redisProtocolToLuaType_Bool(lua_State *lua, char *reply, int tf); +char *redisProtocolToLuaType_Double(lua_State *lua, char *reply); int redis_math_random (lua_State *L); int redis_math_randomseed (lua_State *L); void ldbInit(void); @@ -132,9 +135,12 @@ char *redisProtocolToLuaType(lua_State *lua, char* reply) { case '$': p = redisProtocolToLuaType_Bulk(lua,reply); break; case '+': p = redisProtocolToLuaType_Status(lua,reply); break; case '-': p = redisProtocolToLuaType_Error(lua,reply); break; - case '*': p = redisProtocolToLuaType_MultiBulk(lua,reply,*p); break; - case '%': p = redisProtocolToLuaType_MultiBulk(lua,reply,*p); break; - case '~': p = redisProtocolToLuaType_MultiBulk(lua,reply,*p); break; + case '*': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break; + case '%': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break; + case '~': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break; + case '_': p = redisProtocolToLuaType_Null(lua,reply); break; + case '#': p = redisProtocolToLuaType_Bool(lua,reply,p[1]); break; + case ',': p = redisProtocolToLuaType_Double(lua,reply); break; } return p; } @@ -182,13 +188,13 @@ char *redisProtocolToLuaType_Error(lua_State *lua, char *reply) { return p+2; } -char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) { +char *redisProtocolToLuaType_Aggregate(lua_State *lua, char *reply, int atype) { char *p = strchr(reply+1,'\r'); long long mbulklen; int j = 0; string2ll(reply+1,p-reply-1,&mbulklen); - if (server.lua_caller->resp == 2 || atype == '*') { + if (server.lua_client->resp == 2 || atype == '*') { p += 2; if (mbulklen == -1) { lua_pushboolean(lua,0); @@ -200,11 +206,15 @@ char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) { p = redisProtocolToLuaType(lua,p); lua_settable(lua,-3); } - } else if (server.lua_caller->resp == 3) { + } else if (server.lua_client->resp == 3) { /* Here we handle only Set and Map replies in RESP3 mode, since arrays - * follow the above RESP2 code path. */ + * follow the above RESP2 code path. Note that those are represented + * as a table with the "map" or "set" field populated with the actual + * table representing the set or the map type. */ p += 2; lua_newtable(lua); + lua_pushstring(lua,atype == '%' ? "map" : "set"); + lua_newtable(lua); for (j = 0; j < mbulklen; j++) { p = redisProtocolToLuaType(lua,p); if (atype == '%') { @@ -214,10 +224,44 @@ char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) { } lua_settable(lua,-3); } + lua_settable(lua,-3); } return p; } +char *redisProtocolToLuaType_Null(lua_State *lua, char *reply) { + char *p = strchr(reply+1,'\r'); + lua_pushnil(lua); + return p+2; +} + +char *redisProtocolToLuaType_Bool(lua_State *lua, char *reply, int tf) { + char *p = strchr(reply+1,'\r'); + lua_pushboolean(lua,tf == 't'); + return p+2; +} + +char *redisProtocolToLuaType_Double(lua_State *lua, char *reply) { + char *p = strchr(reply+1,'\r'); + char buf[MAX_LONG_DOUBLE_CHARS+1]; + size_t len = p-reply-1; + double d; + + if (len <= MAX_LONG_DOUBLE_CHARS) { + memcpy(buf,reply+1,len); + buf[len] = '\0'; + d = strtod(buf,NULL); /* We expect a valid representation. */ + } else { + d = 0; + } + + lua_newtable(lua); + lua_pushstring(lua,"double"); + lua_pushnumber(lua,d); + lua_settable(lua,-3); + return p+2; +} + /* This function is used in order to push an error on the Lua stack in the * format used by redis.pcall to return errors, which is a lua table * with a single "err" field set to the error string. Note that this @@ -292,6 +336,8 @@ void luaSortArray(lua_State *lua) { * Lua reply to Redis reply conversion functions. * ------------------------------------------------------------------------- */ +/* Reply to client 'c' converting the top element in the Lua stack to a + * Redis reply. As a side effect the element is consumed from the stack. */ void luaReplyToRedisReply(client *c, lua_State *lua) { int t = lua_type(lua,-1); @@ -300,7 +346,11 @@ void luaReplyToRedisReply(client *c, lua_State *lua) { addReplyBulkCBuffer(c,(char*)lua_tostring(lua,-1),lua_strlen(lua,-1)); break; case LUA_TBOOLEAN: - addReply(c,lua_toboolean(lua,-1) ? shared.cone : shared.null[c->resp]); + if (server.lua_client->resp == 2) + addReply(c,lua_toboolean(lua,-1) ? shared.cone : + shared.null[c->resp]); + else + addReplyBool(c,lua_toboolean(lua,-1)); break; case LUA_TNUMBER: addReplyLongLong(c,(long long)lua_tonumber(lua,-1)); @@ -310,6 +360,8 @@ void luaReplyToRedisReply(client *c, lua_State *lua) { * Error are returned as a single element table with 'err' field. * Status replies are returned as single element table with 'ok' * field. */ + + /* Handle error reply. */ lua_pushstring(lua,"err"); lua_gettable(lua,-2); t = lua_type(lua,-1); @@ -321,8 +373,9 @@ void luaReplyToRedisReply(client *c, lua_State *lua) { lua_pop(lua,2); return; } + lua_pop(lua,1); /* Discard field name pushed before. */ - lua_pop(lua,1); + /* Handle status reply. */ lua_pushstring(lua,"ok"); lua_gettable(lua,-2); t = lua_type(lua,-1); @@ -331,25 +384,81 @@ void luaReplyToRedisReply(client *c, lua_State *lua) { sdsmapchars(ok,"\r\n"," ",2); addReplySds(c,sdscatprintf(sdsempty(),"+%s\r\n",ok)); sdsfree(ok); - lua_pop(lua,1); - } else { + lua_pop(lua,2); + return; + } + lua_pop(lua,1); /* Discard field name pushed before. */ + + /* Handle double reply. */ + lua_pushstring(lua,"double"); + lua_gettable(lua,-2); + t = lua_type(lua,-1); + if (t == LUA_TNUMBER) { + addReplyDouble(c,lua_tonumber(lua,-1)); + lua_pop(lua,2); + return; + } + lua_pop(lua,1); /* Discard field name pushed before. */ + + /* Handle map reply. */ + lua_pushstring(lua,"map"); + lua_gettable(lua,-2); + t = lua_type(lua,-1); + if (t == LUA_TTABLE) { + int maplen = 0; void *replylen = addReplyDeferredLen(c); - int j = 1, mbulklen = 0; - - lua_pop(lua,1); /* Discard the 'ok' field value we popped */ - while(1) { - lua_pushnumber(lua,j++); - lua_gettable(lua,-2); - t = lua_type(lua,-1); - if (t == LUA_TNIL) { - lua_pop(lua,1); - break; - } - luaReplyToRedisReply(c, lua); - mbulklen++; + lua_pushnil(lua); /* Use nil to start iteration. */ + while (lua_next(lua,-2)) { + /* Stack now: table, key, value */ + luaReplyToRedisReply(c, lua); /* Return value. */ + lua_pushvalue(lua,-1); /* Dup key before consuming. */ + luaReplyToRedisReply(c, lua); /* Return key. */ + /* Stack now: table, key. */ + maplen++; + } + setDeferredMapLen(c,replylen,maplen); + lua_pop(lua,2); + return; + } + lua_pop(lua,1); /* Discard field name pushed before. */ + + /* Handle set reply. */ + lua_pushstring(lua,"set"); + lua_gettable(lua,-2); + t = lua_type(lua,-1); + if (t == LUA_TTABLE) { + int setlen = 0; + void *replylen = addReplyDeferredLen(c); + lua_pushnil(lua); /* Use nil to start iteration. */ + while (lua_next(lua,-2)) { + /* Stack now: table, key, true */ + lua_pop(lua,1); /* Discard the boolean value. */ + lua_pushvalue(lua,-1); /* Dup key before consuming. */ + luaReplyToRedisReply(c, lua); /* Return key. */ + /* Stack now: table, key. */ + setlen++; } - setDeferredArrayLen(c,replylen,mbulklen); + setDeferredSetLen(c,replylen,setlen); + lua_pop(lua,2); + return; } + lua_pop(lua,1); /* Discard field name pushed before. */ + + /* Handle the array reply. */ + void *replylen = addReplyDeferredLen(c); + int j = 1, mbulklen = 0; + while(1) { + lua_pushnumber(lua,j++); + lua_gettable(lua,-2); + t = lua_type(lua,-1); + if (t == LUA_TNIL) { + lua_pop(lua,1); + break; + } + luaReplyToRedisReply(c, lua); + mbulklen++; + } + setDeferredArrayLen(c,replylen,mbulklen); break; default: addReplyNull(c); @@ -859,6 +968,25 @@ int luaLogCommand(lua_State *lua) { return 0; } +/* redis.setresp() */ +int luaSetResp(lua_State *lua) { + int argc = lua_gettop(lua); + + if (argc != 1) { + lua_pushstring(lua, "redis.setresp() requires one argument."); + return lua_error(lua); + } + + int resp = lua_tonumber(lua,-argc); + if (resp != 2 && resp != 3) { + lua_pushstring(lua, "RESP version must be 2 or 3."); + return lua_error(lua); + } + + server.lua_client->resp = resp; + return 0; +} + /* --------------------------------------------------------------------------- * Lua engine initialization and reset. * ------------------------------------------------------------------------- */ @@ -986,6 +1114,11 @@ void scriptingInit(int setup) { lua_pushcfunction(lua,luaLogCommand); lua_settable(lua,-3); + /* redis.setresp */ + lua_pushstring(lua,"setresp"); + lua_pushcfunction(lua,luaSetResp); + lua_settable(lua,-3); + lua_pushstring(lua,"LOG_DEBUG"); lua_pushnumber(lua,LL_DEBUG); lua_settable(lua,-3); @@ -1379,8 +1512,9 @@ void evalGenericCommand(client *c, int evalsha) { luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys); luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys); - /* Select the right DB in the context of the Lua client */ + /* Set the Lua client database and protocol. */ selectDb(server.lua_client,c->db->id); + server.lua_client->resp = 2; /* Default is RESP2, scripts can change it. */ /* Set a hook in order to be able to stop the script execution if it * is running for too much time. @@ -2052,6 +2186,11 @@ char *ldbRedisProtocolToHuman_Int(sds *o, char *reply); char *ldbRedisProtocolToHuman_Bulk(sds *o, char *reply); char *ldbRedisProtocolToHuman_Status(sds *o, char *reply); char *ldbRedisProtocolToHuman_MultiBulk(sds *o, char *reply); +char *ldbRedisProtocolToHuman_Set(sds *o, char *reply); +char *ldbRedisProtocolToHuman_Map(sds *o, char *reply); +char *ldbRedisProtocolToHuman_Null(sds *o, char *reply); +char *ldbRedisProtocolToHuman_Bool(sds *o, char *reply); +char *ldbRedisProtocolToHuman_Double(sds *o, char *reply); /* Get Redis protocol from 'reply' and appends it in human readable form to * the passed SDS string 'o'. @@ -2066,6 +2205,11 @@ char *ldbRedisProtocolToHuman(sds *o, char *reply) { case '+': p = ldbRedisProtocolToHuman_Status(o,reply); break; case '-': p = ldbRedisProtocolToHuman_Status(o,reply); break; case '*': p = ldbRedisProtocolToHuman_MultiBulk(o,reply); break; + case '~': p = ldbRedisProtocolToHuman_Set(o,reply); break; + case '%': p = ldbRedisProtocolToHuman_Map(o,reply); break; + case '_': p = ldbRedisProtocolToHuman_Null(o,reply); break; + case '#': p = ldbRedisProtocolToHuman_Bool(o,reply); break; + case ',': p = ldbRedisProtocolToHuman_Double(o,reply); break; } return p; } @@ -2120,6 +2264,62 @@ char *ldbRedisProtocolToHuman_MultiBulk(sds *o, char *reply) { return p; } +char *ldbRedisProtocolToHuman_Set(sds *o, char *reply) { + char *p = strchr(reply+1,'\r'); + long long mbulklen; + int j = 0; + + string2ll(reply+1,p-reply-1,&mbulklen); + p += 2; + *o = sdscatlen(*o,"~(",2); + for (j = 0; j < mbulklen; j++) { + p = ldbRedisProtocolToHuman(o,p); + if (j != mbulklen-1) *o = sdscatlen(*o,",",1); + } + *o = sdscatlen(*o,")",1); + return p; +} + +char *ldbRedisProtocolToHuman_Map(sds *o, char *reply) { + char *p = strchr(reply+1,'\r'); + long long mbulklen; + int j = 0; + + string2ll(reply+1,p-reply-1,&mbulklen); + p += 2; + *o = sdscatlen(*o,"{",1); + for (j = 0; j < mbulklen; j++) { + p = ldbRedisProtocolToHuman(o,p); + *o = sdscatlen(*o," => ",4); + p = ldbRedisProtocolToHuman(o,p); + if (j != mbulklen-1) *o = sdscatlen(*o,",",1); + } + *o = sdscatlen(*o,"}",1); + return p; +} + +char *ldbRedisProtocolToHuman_Null(sds *o, char *reply) { + char *p = strchr(reply+1,'\r'); + *o = sdscatlen(*o,"(null)",6); + return p+2; +} + +char *ldbRedisProtocolToHuman_Bool(sds *o, char *reply) { + char *p = strchr(reply+1,'\r'); + if (reply[1] == 't') + *o = sdscatlen(*o,"#true",5); + else + *o = sdscatlen(*o,"#false",6); + return p+2; +} + +char *ldbRedisProtocolToHuman_Double(sds *o, char *reply) { + char *p = strchr(reply+1,'\r'); + *o = sdscatlen(*o,"(double) ",9); + *o = sdscatlen(*o,reply+1,p-reply-1); + return p+2; +} + /* Log a Redis reply as debugger output, in an human readable format. * If the resulting string is longer than 'len' plus a few more chars * used as prefix, it gets truncated. */ diff --git a/src/server.c b/src/server.c index c0e59c86c..f38ed7897 100644 --- a/src/server.c +++ b/src/server.c @@ -146,6 +146,8 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */ * in this condition but just a few. * * no-monitor: Do not automatically propagate the command on MONITOR. + * + * no-slowlog: Do not automatically propagate the command to the slowlog. * * cluster-asking: Perform an implicit ASKING for this command, so the * command will be accepted in cluster mode if the slot is marked @@ -627,7 +629,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"auth",authCommand,-2, - "no-script ok-loading ok-stale fast @connection", + "no-script ok-loading ok-stale fast no-monitor no-slowlog @connection", 0,NULL,0,0,0,0,0,0}, /* We don't allow PING during loading since in Redis PING is used as @@ -670,7 +672,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"exec",execCommand,1, - "no-script no-monitor @transaction", + "no-script no-monitor no-slowlog @transaction", 0,NULL,0,0,0,0,0,0}, {"discard",discardCommand,1, @@ -822,7 +824,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"hello",helloCommand,-2, - "no-script fast @connection", + "no-script fast no-monitor no-slowlog @connection", 0,NULL,0,0,0,0,0,0}, /* EVAL can modify the dataset, however it is not flagged as a write @@ -2174,6 +2176,16 @@ void createSharedObjects(void) { shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n")); shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n")); + shared.emptymap[0] = NULL; + shared.emptymap[1] = NULL; + shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n")); + shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n")); + + shared.emptyset[0] = NULL; + shared.emptyset[1] = NULL; + shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n")); + shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n")); + for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) { char dictid_str[64]; int dictid_len; @@ -2418,6 +2430,9 @@ void initServerConfig(void) { /* Latency monitor */ server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; + /* Tracking. */ + server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL; + /* Debugging */ server.assert_failed = "<no assertion failed>"; server.assert_file = "<no file>"; @@ -2926,6 +2941,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) { c->flags |= CMD_STALE; } else if (!strcasecmp(flag,"no-monitor")) { c->flags |= CMD_SKIP_MONITOR; + } else if (!strcasecmp(flag,"no-slowlog")) { + c->flags |= CMD_SKIP_SLOWLOG; } else if (!strcasecmp(flag,"cluster-asking")) { c->flags |= CMD_ASKING; } else if (!strcasecmp(flag,"fast")) { @@ -3210,7 +3227,7 @@ void call(client *c, int flags) { /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ - if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { + if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); @@ -3341,9 +3358,10 @@ int processCommand(client *c) { /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ - int auth_required = !(DefaultUser->flags & USER_FLAG_NOPASS) && + int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || + DefaultUser->flags & USER_FLAG_DISABLED) && !c->authenticated; - if (auth_required || DefaultUser->flags & USER_FLAG_DISABLED) { + if (auth_required) { /* AUTH and HELLO are valid even in non authenticated state. */ if (c->cmd->proc != authCommand || c->cmd->proc == helloCommand) { flagTransaction(c); @@ -3411,13 +3429,20 @@ int processCommand(client *c) { * is in MULTI/EXEC context? Error. */ if (out_of_memory && (c->cmd->flags & CMD_DENYOOM || - (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) { + (c->flags & CLIENT_MULTI && + c->cmd->proc != execCommand && + c->cmd->proc != discardCommand))) + { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } + /* Make sure to use a reasonable amount of memory for client side + * caching metadata. */ + if (server.tracking_clients) trackingLimitUsedSlots(); + /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ int deny_write_type = writeCommandsDeniedByDiskError(); @@ -3718,6 +3743,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) { flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading"); flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale"); flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor"); + flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog"); flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) || @@ -4167,7 +4193,8 @@ sds genRedisInfoString(char *section) { "active_defrag_hits:%lld\r\n" "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" - "active_defrag_key_misses:%lld\r\n", + "active_defrag_key_misses:%lld\r\n" + "tracking_used_slots:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4193,7 +4220,8 @@ sds genRedisInfoString(char *section) { server.stat_active_defrag_hits, server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, - server.stat_active_defrag_key_misses); + server.stat_active_defrag_key_misses, + trackingGetUsedSlots()); } /* Replication */ @@ -4339,6 +4367,13 @@ sds genRedisInfoString(char *section) { (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec); } + /* Modules */ + if (allsections || defsections || !strcasecmp(section,"modules")) { + if (sections++) info = sdscat(info,"\r\n"); + info = sdscatprintf(info,"# Modules\r\n"); + info = genModulesInfoString(info); + } + /* Command statistics */ if (allsections || !strcasecmp(section,"commandstats")) { if (sections++) info = sdscat(info,"\r\n"); @@ -4394,7 +4429,9 @@ void infoCommand(client *c) { addReply(c,shared.syntaxerr); return; } - addReplyBulkSds(c, genRedisInfoString(section)); + sds info = genRedisInfoString(section); + addReplyVerbatim(c,info,sdslen(info),"txt"); + sdsfree(info); } void monitorCommand(client *c) { @@ -4840,9 +4877,9 @@ int main(int argc, char **argv) { srand(time(NULL)^getpid()); gettimeofday(&tv,NULL); - char hashseed[16]; - getRandomHexChars(hashseed,sizeof(hashseed)); - dictSetHashFunctionSeed((uint8_t*)hashseed); + uint8_t hashseed[16]; + getRandomBytes(hashseed,sizeof(hashseed)); + dictSetHashFunctionSeed(hashseed); server.sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); ACLInit(); /* The ACL subsystem must be initialized ASAP because the diff --git a/src/server.h b/src/server.h index 7aa4bc2b7..d132cf09c 100644 --- a/src/server.h +++ b/src/server.h @@ -171,6 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ +#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ @@ -219,35 +220,36 @@ typedef long long mstime_t; /* millisecond time type. */ #define CMD_LOADING (1ULL<<9) /* "ok-loading" flag */ #define CMD_STALE (1ULL<<10) /* "ok-stale" flag */ #define CMD_SKIP_MONITOR (1ULL<<11) /* "no-monitor" flag */ -#define CMD_ASKING (1ULL<<12) /* "cluster-asking" flag */ -#define CMD_FAST (1ULL<<13) /* "fast" flag */ +#define CMD_SKIP_SLOWLOG (1ULL<<12) /* "no-slowlog" flag */ +#define CMD_ASKING (1ULL<<13) /* "cluster-asking" flag */ +#define CMD_FAST (1ULL<<14) /* "fast" flag */ /* Command flags used by the module system. */ -#define CMD_MODULE_GETKEYS (1ULL<<14) /* Use the modules getkeys interface. */ -#define CMD_MODULE_NO_CLUSTER (1ULL<<15) /* Deny on Redis Cluster. */ +#define CMD_MODULE_GETKEYS (1ULL<<15) /* Use the modules getkeys interface. */ +#define CMD_MODULE_NO_CLUSTER (1ULL<<16) /* Deny on Redis Cluster. */ /* Command flags that describe ACLs categories. */ -#define CMD_CATEGORY_KEYSPACE (1ULL<<16) -#define CMD_CATEGORY_READ (1ULL<<17) -#define CMD_CATEGORY_WRITE (1ULL<<18) -#define CMD_CATEGORY_SET (1ULL<<19) -#define CMD_CATEGORY_SORTEDSET (1ULL<<20) -#define CMD_CATEGORY_LIST (1ULL<<21) -#define CMD_CATEGORY_HASH (1ULL<<22) -#define CMD_CATEGORY_STRING (1ULL<<23) -#define CMD_CATEGORY_BITMAP (1ULL<<24) -#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<25) -#define CMD_CATEGORY_GEO (1ULL<<26) -#define CMD_CATEGORY_STREAM (1ULL<<27) -#define CMD_CATEGORY_PUBSUB (1ULL<<28) -#define CMD_CATEGORY_ADMIN (1ULL<<29) -#define CMD_CATEGORY_FAST (1ULL<<30) -#define CMD_CATEGORY_SLOW (1ULL<<31) -#define CMD_CATEGORY_BLOCKING (1ULL<<32) -#define CMD_CATEGORY_DANGEROUS (1ULL<<33) -#define CMD_CATEGORY_CONNECTION (1ULL<<34) -#define CMD_CATEGORY_TRANSACTION (1ULL<<35) -#define CMD_CATEGORY_SCRIPTING (1ULL<<36) +#define CMD_CATEGORY_KEYSPACE (1ULL<<17) +#define CMD_CATEGORY_READ (1ULL<<18) +#define CMD_CATEGORY_WRITE (1ULL<<19) +#define CMD_CATEGORY_SET (1ULL<<20) +#define CMD_CATEGORY_SORTEDSET (1ULL<<21) +#define CMD_CATEGORY_LIST (1ULL<<22) +#define CMD_CATEGORY_HASH (1ULL<<23) +#define CMD_CATEGORY_STRING (1ULL<<24) +#define CMD_CATEGORY_BITMAP (1ULL<<25) +#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<26) +#define CMD_CATEGORY_GEO (1ULL<<27) +#define CMD_CATEGORY_STREAM (1ULL<<28) +#define CMD_CATEGORY_PUBSUB (1ULL<<29) +#define CMD_CATEGORY_ADMIN (1ULL<<30) +#define CMD_CATEGORY_FAST (1ULL<<31) +#define CMD_CATEGORY_SLOW (1ULL<<32) +#define CMD_CATEGORY_BLOCKING (1ULL<<33) +#define CMD_CATEGORY_DANGEROUS (1ULL<<34) +#define CMD_CATEGORY_CONNECTION (1ULL<<35) +#define CMD_CATEGORY_TRANSACTION (1ULL<<36) +#define CMD_CATEGORY_SCRIPTING (1ULL<<37) /* AOF states */ #define AOF_OFF 0 /* AOF is off */ @@ -536,6 +538,10 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK) #define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS) +/* Bit flags for moduleTypeAuxSaveFunc */ +#define REDISMODULE_AUX_BEFORE_RDB (1<<0) +#define REDISMODULE_AUX_AFTER_RDB (1<<1) + struct RedisModule; struct RedisModuleIO; struct RedisModuleDigest; @@ -548,6 +554,8 @@ struct redisObject; * is deleted. */ typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver); typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value); +typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when); +typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when); typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value); typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value); typedef size_t (*moduleTypeMemUsageFunc)(const void *value); @@ -564,6 +572,9 @@ typedef struct RedisModuleType { moduleTypeMemUsageFunc mem_usage; moduleTypeDigestFunc digest; moduleTypeFreeFunc free; + moduleTypeAuxLoadFunc aux_load; + moduleTypeAuxSaveFunc aux_save; + int aux_save_triggers; char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */ } moduleType; @@ -837,7 +848,7 @@ typedef struct client { uint64_t flags; /* Client flags: CLIENT_* macros. */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. */ - int repl_put_online_on_ack; /* Install slave write handler on ACK. */ + int repl_put_online_on_ack; /* Install slave write handler on first ACK. */ int repldbfd; /* Replication DB file descriptor. */ off_t repldboff; /* Replication DB file offset. */ off_t repldbsize; /* Replication DB file size. */ @@ -886,7 +897,7 @@ struct moduleLoadQueueEntry { struct sharedObjectsStruct { robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, - *colon, *queued, *null[4], *nullarray[4], + *colon, *queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4], *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr, *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, @@ -1319,6 +1330,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ + int tracking_table_max_fill; /* Max fill percentage. */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1533,6 +1545,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) void moduleCallCommandFilters(client *c); void ModuleForkDoneHandler(int exitcode, int bysignal); void TerminateModuleForkChild(int wait); +ssize_t rdbSaveModulesAux(rio *rdb, int when); +int moduleAllDatatypesHandleErrors(); /* Utils */ long long ustime(void); @@ -1643,6 +1657,9 @@ void enableTracking(client *c, uint64_t redirect_to); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); +void trackingInvalidateKeysOnFlush(int dbid); +void trackingLimitUsedSlots(void); +unsigned long long trackingGetUsedSlots(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); @@ -2328,6 +2345,7 @@ void bugReportStart(void); void serverLogObjectDebugInfo(const robj *o); void sigsegvHandler(int sig, siginfo_t *info, void *secret); sds genRedisInfoString(char *section); +sds genModulesInfoString(sds info); void enableWatchdog(int period); void disableWatchdog(void); void watchdogScheduleSignal(int period); diff --git a/src/sha256.c b/src/sha256.c new file mode 100644 index 000000000..d644d2d4e --- /dev/null +++ b/src/sha256.c @@ -0,0 +1,158 @@ +/********************************************************************* +* Filename: sha256.c +* Author: Brad Conte (brad AT bradconte.com) +* Copyright: +* Disclaimer: This code is presented "as is" without any guarantees. +* Details: Implementation of the SHA-256 hashing algorithm. + SHA-256 is one of the three algorithms in the SHA2 + specification. The others, SHA-384 and SHA-512, are not + offered in this implementation. + Algorithm specification can be found here: + * http://csrc.nist.gov/publications/fips/fips180-2/fips180-2withchangenotice.pdf + This implementation uses little endian byte order. +*********************************************************************/ + +/*************************** HEADER FILES ***************************/ +#include <stdlib.h> +#include <string.h> +#include "sha256.h" + +/****************************** MACROS ******************************/ +#define ROTLEFT(a,b) (((a) << (b)) | ((a) >> (32-(b)))) +#define ROTRIGHT(a,b) (((a) >> (b)) | ((a) << (32-(b)))) + +#define CH(x,y,z) (((x) & (y)) ^ (~(x) & (z))) +#define MAJ(x,y,z) (((x) & (y)) ^ ((x) & (z)) ^ ((y) & (z))) +#define EP0(x) (ROTRIGHT(x,2) ^ ROTRIGHT(x,13) ^ ROTRIGHT(x,22)) +#define EP1(x) (ROTRIGHT(x,6) ^ ROTRIGHT(x,11) ^ ROTRIGHT(x,25)) +#define SIG0(x) (ROTRIGHT(x,7) ^ ROTRIGHT(x,18) ^ ((x) >> 3)) +#define SIG1(x) (ROTRIGHT(x,17) ^ ROTRIGHT(x,19) ^ ((x) >> 10)) + +/**************************** VARIABLES *****************************/ +static const WORD k[64] = { + 0x428a2f98,0x71374491,0xb5c0fbcf,0xe9b5dba5,0x3956c25b,0x59f111f1,0x923f82a4,0xab1c5ed5, + 0xd807aa98,0x12835b01,0x243185be,0x550c7dc3,0x72be5d74,0x80deb1fe,0x9bdc06a7,0xc19bf174, + 0xe49b69c1,0xefbe4786,0x0fc19dc6,0x240ca1cc,0x2de92c6f,0x4a7484aa,0x5cb0a9dc,0x76f988da, + 0x983e5152,0xa831c66d,0xb00327c8,0xbf597fc7,0xc6e00bf3,0xd5a79147,0x06ca6351,0x14292967, + 0x27b70a85,0x2e1b2138,0x4d2c6dfc,0x53380d13,0x650a7354,0x766a0abb,0x81c2c92e,0x92722c85, + 0xa2bfe8a1,0xa81a664b,0xc24b8b70,0xc76c51a3,0xd192e819,0xd6990624,0xf40e3585,0x106aa070, + 0x19a4c116,0x1e376c08,0x2748774c,0x34b0bcb5,0x391c0cb3,0x4ed8aa4a,0x5b9cca4f,0x682e6ff3, + 0x748f82ee,0x78a5636f,0x84c87814,0x8cc70208,0x90befffa,0xa4506ceb,0xbef9a3f7,0xc67178f2 +}; + +/*********************** FUNCTION DEFINITIONS ***********************/ +void sha256_transform(SHA256_CTX *ctx, const BYTE data[]) +{ + WORD a, b, c, d, e, f, g, h, i, j, t1, t2, m[64]; + + for (i = 0, j = 0; i < 16; ++i, j += 4) + m[i] = (data[j] << 24) | (data[j + 1] << 16) | (data[j + 2] << 8) | (data[j + 3]); + for ( ; i < 64; ++i) + m[i] = SIG1(m[i - 2]) + m[i - 7] + SIG0(m[i - 15]) + m[i - 16]; + + a = ctx->state[0]; + b = ctx->state[1]; + c = ctx->state[2]; + d = ctx->state[3]; + e = ctx->state[4]; + f = ctx->state[5]; + g = ctx->state[6]; + h = ctx->state[7]; + + for (i = 0; i < 64; ++i) { + t1 = h + EP1(e) + CH(e,f,g) + k[i] + m[i]; + t2 = EP0(a) + MAJ(a,b,c); + h = g; + g = f; + f = e; + e = d + t1; + d = c; + c = b; + b = a; + a = t1 + t2; + } + + ctx->state[0] += a; + ctx->state[1] += b; + ctx->state[2] += c; + ctx->state[3] += d; + ctx->state[4] += e; + ctx->state[5] += f; + ctx->state[6] += g; + ctx->state[7] += h; +} + +void sha256_init(SHA256_CTX *ctx) +{ + ctx->datalen = 0; + ctx->bitlen = 0; + ctx->state[0] = 0x6a09e667; + ctx->state[1] = 0xbb67ae85; + ctx->state[2] = 0x3c6ef372; + ctx->state[3] = 0xa54ff53a; + ctx->state[4] = 0x510e527f; + ctx->state[5] = 0x9b05688c; + ctx->state[6] = 0x1f83d9ab; + ctx->state[7] = 0x5be0cd19; +} + +void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len) +{ + WORD i; + + for (i = 0; i < len; ++i) { + ctx->data[ctx->datalen] = data[i]; + ctx->datalen++; + if (ctx->datalen == 64) { + sha256_transform(ctx, ctx->data); + ctx->bitlen += 512; + ctx->datalen = 0; + } + } +} + +void sha256_final(SHA256_CTX *ctx, BYTE hash[]) +{ + WORD i; + + i = ctx->datalen; + + // Pad whatever data is left in the buffer. + if (ctx->datalen < 56) { + ctx->data[i++] = 0x80; + while (i < 56) + ctx->data[i++] = 0x00; + } + else { + ctx->data[i++] = 0x80; + while (i < 64) + ctx->data[i++] = 0x00; + sha256_transform(ctx, ctx->data); + memset(ctx->data, 0, 56); + } + + // Append to the padding the total message's length in bits and transform. + ctx->bitlen += ctx->datalen * 8; + ctx->data[63] = ctx->bitlen; + ctx->data[62] = ctx->bitlen >> 8; + ctx->data[61] = ctx->bitlen >> 16; + ctx->data[60] = ctx->bitlen >> 24; + ctx->data[59] = ctx->bitlen >> 32; + ctx->data[58] = ctx->bitlen >> 40; + ctx->data[57] = ctx->bitlen >> 48; + ctx->data[56] = ctx->bitlen >> 56; + sha256_transform(ctx, ctx->data); + + // Since this implementation uses little endian byte ordering and SHA uses big endian, + // reverse all the bytes when copying the final state to the output hash. + for (i = 0; i < 4; ++i) { + hash[i] = (ctx->state[0] >> (24 - i * 8)) & 0x000000ff; + hash[i + 4] = (ctx->state[1] >> (24 - i * 8)) & 0x000000ff; + hash[i + 8] = (ctx->state[2] >> (24 - i * 8)) & 0x000000ff; + hash[i + 12] = (ctx->state[3] >> (24 - i * 8)) & 0x000000ff; + hash[i + 16] = (ctx->state[4] >> (24 - i * 8)) & 0x000000ff; + hash[i + 20] = (ctx->state[5] >> (24 - i * 8)) & 0x000000ff; + hash[i + 24] = (ctx->state[6] >> (24 - i * 8)) & 0x000000ff; + hash[i + 28] = (ctx->state[7] >> (24 - i * 8)) & 0x000000ff; + } +} diff --git a/src/sha256.h b/src/sha256.h new file mode 100644 index 000000000..dc53ead2b --- /dev/null +++ b/src/sha256.h @@ -0,0 +1,35 @@ +/********************************************************************* +* Filename: sha256.h +* Author: Brad Conte (brad AT bradconte.com) +* Copyright: +* Disclaimer: This code is presented "as is" without any guarantees. +* Details: Defines the API for the corresponding SHA1 implementation. +*********************************************************************/ + +#ifndef SHA256_H +#define SHA256_H + +/*************************** HEADER FILES ***************************/ +#include <stddef.h> +#include <stdint.h> + +/****************************** MACROS ******************************/ +#define SHA256_BLOCK_SIZE 32 // SHA256 outputs a 32 byte digest + +/**************************** DATA TYPES ****************************/ +typedef uint8_t BYTE; // 8-bit byte +typedef uint32_t WORD; // 32-bit word + +typedef struct { + BYTE data[64]; + WORD datalen; + unsigned long long bitlen; + WORD state[8]; +} SHA256_CTX; + +/*********************** FUNCTION DECLARATIONS **********************/ +void sha256_init(SHA256_CTX *ctx); +void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len); +void sha256_final(SHA256_CTX *ctx, BYTE hash[]); + +#endif // SHA256_H diff --git a/src/siphash.c b/src/siphash.c index 6b9419031..357741132 100644 --- a/src/siphash.c +++ b/src/siphash.c @@ -58,7 +58,8 @@ int siptlw(int c) { /* Test of the CPU is Little Endian and supports not aligned accesses. * Two interesting conditions to speedup the function that happen to be * in most of x86 servers. */ -#if defined(__X86_64__) || defined(__x86_64__) || defined (__i386__) +#if defined(__X86_64__) || defined(__x86_64__) || defined (__i386__) \ + || defined (__aarch64__) || defined (__arm64__) #define UNALIGNED_LE_CPU #endif diff --git a/src/stream.h b/src/stream.h index ef08753b5..8ae90ce77 100644 --- a/src/stream.h +++ b/src/stream.h @@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); +void streamFreeNACK(streamNACK *na); #endif diff --git a/src/t_hash.c b/src/t_hash.c index bc70e4051..e6ed33819 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -772,8 +772,8 @@ void genericHgetallCommand(client *c, int flags) { hashTypeIterator *hi; int length, count = 0; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL - || checkType(c,o,OBJ_HASH)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymap[c->resp])) + == NULL || checkType(c,o,OBJ_HASH)) return; /* We return a map if the user requested keys and values, like in the * HGETALL case. Otherwise to use a flat array makes more sense. */ diff --git a/src/t_list.c b/src/t_list.c index 54e4959b9..9bbd61de3 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -402,7 +402,7 @@ void lrangeCommand(client *c) { if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL || checkType(c,o,OBJ_LIST)) return; llen = listTypeLength(o); @@ -414,7 +414,7 @@ void lrangeCommand(client *c) { /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { - addReplyNull(c); + addReply(c,shared.emptyarray); return; } if (end >= llen) end = llen-1; @@ -606,7 +606,7 @@ void rpoplpushCommand(client *c) { * Blocking POP operations *----------------------------------------------------------------------------*/ -/* This is a helper function for handleClientsBlockedOnLists(). It's work +/* This is a helper function for handleClientsBlockedOnKeys(). It's work * is to serve a specific client (receiver) that is blocked on 'key' * in the context of the specified 'db', doing the following: * diff --git a/src/t_set.c b/src/t_set.c index 05d9ee243..abbf82fde 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -418,10 +418,10 @@ void spopWithCountCommand(client *c) { if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) == NULL || checkType(c,set,OBJ_SET)) return; - /* If count is zero, serve an empty multibulk ASAP to avoid special + /* If count is zero, serve an empty set ASAP to avoid special * cases later. */ if (count == 0) { - addReplyNull(c); + addReply(c,shared.emptyset[c->resp]); return; } @@ -632,13 +632,13 @@ void srandmemberWithCountCommand(client *c) { uniq = 0; } - if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) + if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyset[c->resp])) == NULL || checkType(c,set,OBJ_SET)) return; size = setTypeSize(set); /* If count is zero, serve it ASAP to avoid special cases later. */ if (count == 0) { - addReplyNull(c); + addReply(c,shared.emptyset[c->resp]); return; } @@ -813,7 +813,7 @@ void sinterGenericCommand(client *c, robj **setkeys, } addReply(c,shared.czero); } else { - addReplyNull(c); + addReply(c,shared.emptyset[c->resp]); } return; } diff --git a/src/t_zset.c b/src/t_zset.c index fb7078abd..ea6f4b848 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1357,9 +1357,8 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); - if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) - zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); - if (sdslen(ele) > server.zset_max_ziplist_value) + if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries || + sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (newscore) *newscore = score; *flags |= ZADD_ADDED; @@ -2427,7 +2426,7 @@ void zrangeGenericCommand(client *c, int reverse) { return; } - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL + if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; /* Sanitize indexes. */ @@ -2439,7 +2438,7 @@ void zrangeGenericCommand(client *c, int reverse) { /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { - addReplyNull(c); + addReply(c,shared.emptyarray); return; } if (end >= llen) end = llen-1; @@ -2575,7 +2574,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) { } /* Ok, lookup the key and get the range */ - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || + if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { @@ -2595,7 +2594,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) { /* No "first" element in the specified interval. */ if (eptr == NULL) { - addReplyNull(c); + addReply(c,shared.emptyarray); return; } @@ -2662,7 +2661,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) { /* No "first" element in the specified interval. */ if (ln == NULL) { - addReplyNull(c); + addReply(c,shared.emptyarray); return; } @@ -2920,7 +2919,7 @@ void genericZrangebylexCommand(client *c, int reverse) { } /* Ok, lookup the key and get the range */ - if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL || + if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL || checkType(c,zobj,OBJ_ZSET)) { zslFreeLexRange(&range); @@ -2943,7 +2942,7 @@ void genericZrangebylexCommand(client *c, int reverse) { /* No "first" element in the specified interval. */ if (eptr == NULL) { - addReplyNull(c); + addReply(c,shared.emptyarray); zslFreeLexRange(&range); return; } @@ -3007,7 +3006,7 @@ void genericZrangebylexCommand(client *c, int reverse) { /* No "first" element in the specified interval. */ if (ln == NULL) { - addReplyNull(c); + addReply(c,shared.emptyarray); zslFreeLexRange(&range); return; } @@ -3161,7 +3160,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey /* No candidate for zpopping, return empty. */ if (!zobj) { - addReplyNull(c); + addReply(c,shared.emptyarray); return; } diff --git a/src/tracking.c b/src/tracking.c index bbfc66a72..f7f0fc755 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -60,6 +60,7 @@ * use the most significant bits instead of the full 24 bits. */ #define TRACKING_TABLE_SIZE (1<<24) rax **TrackingTable = NULL; +unsigned long TrackingTableUsedSlots = 0; robj *TrackingChannelName; /* Remove the tracking state from the client 'c'. Note that there is not much @@ -109,67 +110,187 @@ void trackingRememberKeys(client *c) { sds sdskey = c->argv[idx]->ptr; uint64_t hash = crc64(0, (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) + if (TrackingTable[hash] == NULL) { TrackingTable[hash] = raxNew(); + TrackingTableUsedSlots++; + } raxTryInsert(TrackingTable[hash], (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); } getKeysFreeResult(keys); } -/* This function is called from signalModifiedKey() or other places in Redis - * when a key changes value. In the context of keys tracking, our task here is - * to send a notification to every client that may have keys about such . */ -void trackingInvalidateKey(robj *keyobj) { - sds sdskey = keyobj->ptr; - uint64_t hash = crc64(0, - (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable == NULL || TrackingTable[hash] == NULL) return; +void sendTrackingMessage(client *c, long long hash) { + int using_redirection = 0; + if (c->client_tracking_redirection) { + client *redir = lookupClientByID(c->client_tracking_redirection); + if (!redir) { + /* We need to signal to the original connection that we + * are unable to send invalidation messages to the redirected + * connection, because the client no longer exist. */ + if (c->resp > 2) { + addReplyPushLen(c,3); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); + } + return; + } + c = redir; + using_redirection = 1; + } + + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); + addReplyLongLong(c,hash); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + robj *msg = createStringObjectFromLongLong(hash); + addReplyPubsubMessage(c,TrackingChannelName,msg); + decrRefCount(msg); + } +} + +/* Invalidates a caching slot: this is actually the low level implementation + * of the API that Redis calls externally, that is trackingInvalidateKey(). */ +void trackingInvalidateSlot(uint64_t slot) { + if (TrackingTable == NULL || TrackingTable[slot] == NULL) return; raxIterator ri; - raxStart(&ri,TrackingTable[hash]); + raxStart(&ri,TrackingTable[slot]); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { uint64_t id; memcpy(&id,ri.key,ri.key_len); client *c = lookupClientByID(id); - if (c == NULL) continue; - int using_redirection = 0; - if (c->client_tracking_redirection) { - client *redir = lookupClientByID(c->client_tracking_redirection); - if (!redir) { - /* We need to signal to the original connection that we - * are unable to send invalidation messages to the redirected - * connection, because the client no longer exist. */ - if (c->resp > 2) { - addReplyPushLen(c,3); - addReplyBulkCBuffer(c,"tracking-redir-broken",21); - addReplyLongLong(c,c->client_tracking_redirection); - } - continue; + if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; + sendTrackingMessage(c,slot); + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this caching slot. */ + raxFree(TrackingTable[slot]); + TrackingTable[slot] = NULL; + TrackingTableUsedSlots--; +} + +/* This function is called from signalModifiedKey() or other places in Redis + * when a key changes value. In the context of keys tracking, our task here is + * to send a notification to every client that may have keys about such caching + * slot. */ +void trackingInvalidateKey(robj *keyobj) { + if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return; + + sds sdskey = keyobj->ptr; + uint64_t hash = crc64(0, + (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); + trackingInvalidateSlot(hash); +} + +/* This function is called when one or all the Redis databases are flushed + * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for + * each DB but are global: currently what we do is sending a special + * notification to clients with tracking enabled, invalidating the caching + * slot "-1", which means, "all the keys", in order to avoid flooding clients + * with many invalidation messages for all the keys they may hold. + * + * However trying to flush the tracking table here is very costly: + * we need scanning 16 million caching slots in the table to check + * if they are used, this introduces a big delay. So what we do is to really + * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead + * we just send the invalidation message to all the clients, but don't + * flush the table: it will slowly get garbage collected as more keys + * are modified in the used caching slots. */ +void trackingInvalidateKeysOnFlush(int dbid) { + if (server.tracking_clients) { + listNode *ln; + listIter li; + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_TRACKING) { + sendTrackingMessage(c,-1); + } + } + } + + /* In case of FLUSHALL, reclaim all the memory used by tracking. */ + if (dbid == -1 && TrackingTable) { + for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) { + if (TrackingTable[j] != NULL) { + raxFree(TrackingTable[j]); + TrackingTable[j] = NULL; + TrackingTableUsedSlots--; } - c = redir; - using_redirection = 1; } - /* Only send such info for clients in RESP version 3 or more. However - * if redirection is active, and the connection we redirect to is - * in Pub/Sub mode, we can support the feature with RESP 2 as well, - * by sending Pub/Sub messages in the __redis__:invalidate channel. */ - if (c->resp > 2) { - addReplyPushLen(c,2); - addReplyBulkCBuffer(c,"invalidate",10); - addReplyLongLong(c,hash); - } else if (using_redirection && c->flags & CLIENT_PUBSUB) { - robj *msg = createStringObjectFromLongLong(hash); - addReplyPubsubMessage(c,TrackingChannelName,msg); - decrRefCount(msg); + /* If there are no clients with tracking enabled, we can even + * reclaim the memory used by the table itself. The code assumes + * the table is allocated only if there is at least one client alive + * with tracking enabled. */ + if (server.tracking_clients == 0) { + zfree(TrackingTable); + TrackingTable = NULL; } } - raxStop(&ri); +} - /* Free the tracking table: we'll create the radix tree and populate it - * again if more keys will be modified in this hash slot. */ - raxFree(TrackingTable[hash]); - TrackingTable[hash] = NULL; +/* Tracking forces Redis to remember information about which client may have + * keys about certian caching slots. In workloads where there are a lot of + * reads, but keys are hardly modified, the amount of information we have + * to remember server side could be a lot: for each 16 millions of caching + * slots we may end with a radix tree containing many entries. + * + * So Redis allows the user to configure a maximum fill rate for the + * invalidation table. This function makes sure that we don't go over the + * specified fill rate: if we are over, we can just evict informations about + * random caching slots, and send invalidation messages to clients like if + * the key was modified. */ +void trackingLimitUsedSlots(void) { + static unsigned int timeout_counter = 0; + + if (server.tracking_table_max_fill == 0) return; /* No limits set. */ + unsigned int max_slots = + (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill; + if (TrackingTableUsedSlots <= max_slots) { + timeout_counter = 0; + return; /* Limit not reached. */ + } + + /* We have to invalidate a few slots to reach the limit again. The effort + * we do here is proportional to the number of times we entered this + * function and found that we are still over the limit. */ + int effort = 100 * (timeout_counter+1); + + /* Let's start at a random position, and perform linear probing, in order + * to improve cache locality. However once we are able to find an used + * slot, jump again randomly, in order to avoid creating big holes in the + * table (that will make this funciton use more resourced later). */ + while(effort > 0) { + unsigned int idx = rand() % TRACKING_TABLE_SIZE; + do { + effort--; + idx = (idx+1) % TRACKING_TABLE_SIZE; + if (TrackingTable[idx] != NULL) { + trackingInvalidateSlot(idx); + if (TrackingTableUsedSlots <= max_slots) { + timeout_counter = 0; + return; /* Return ASAP: we are again under the limit. */ + } else { + break; /* Jump to next random position. */ + } + } + } while(effort > 0); + } + timeout_counter++; +} + +/* This is just used in order to access the amount of used slots in the + * tracking table. */ +unsigned long long trackingGetUsedSlots(void) { + return TrackingTableUsedSlots; } diff --git a/src/zmalloc.c b/src/zmalloc.c index 5e6010278..fd8bb6938 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -294,6 +294,26 @@ size_t zmalloc_get_rss(void) { return t_info.resident_size; } +#elif defined(__FreeBSD__) +#include <sys/types.h> +#include <sys/sysctl.h> +#include <sys/user.h> +#include <unistd.h> + +size_t zmalloc_get_rss(void) { + struct kinfo_proc info; + size_t infolen = sizeof(info); + int mib[4]; + mib[0] = CTL_KERN; + mib[1] = KERN_PROC; + mib[2] = KERN_PROC_PID; + mib[3] = getpid(); + + if (sysctl(mib, 4, &info, &infolen, NULL, 0) == 0) + return (size_t)info.ki_rssize; + + return 0L; +} #else size_t zmalloc_get_rss(void) { /* If we can't get the RSS in an OS-specific way for this system just |