summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2019-09-27 11:24:06 +0200
committerGitHub <noreply@github.com>2019-09-27 11:24:06 +0200
commit61297585584f4e4bda3904d678ce001ca8e70758 (patch)
tree0e64cca1223e35b2ae3d29c9fcd8138b85b986cf /src
parent83e87bac762c84f2a2e9ee6922d038ee09ae9cd4 (diff)
parentfddc4757c8c585d384889c1c7efba1ccf2121e6b (diff)
downloadredis-61297585584f4e4bda3904d678ce001ca8e70758.tar.gz
Merge branch 'unstable' into modules_fork
Diffstat (limited to 'src')
-rw-r--r--src/Makefile2
-rw-r--r--src/acl.c40
-rw-r--r--src/aof.c10
-rw-r--r--src/blocked.c405
-rw-r--r--src/cluster.c22
-rw-r--r--src/config.c28
-rw-r--r--src/db.c10
-rw-r--r--src/debug.c32
-rw-r--r--src/expire.c2
-rw-r--r--src/hyperloglog.c6
-rw-r--r--src/latency.c4
-rw-r--r--src/lolwut.c3
-rw-r--r--src/lolwut5.c3
-rw-r--r--src/module.c186
-rw-r--r--src/multi.c14
-rw-r--r--src/networking.c34
-rw-r--r--src/object.c19
-rw-r--r--src/rdb.c264
-rw-r--r--src/rdb.h1
-rw-r--r--src/redis-benchmark.c15
-rw-r--r--src/redis-check-rdb.c6
-rw-r--r--src/redis-cli.c108
-rw-r--r--src/redismodule.h22
-rw-r--r--src/replication.c51
-rw-r--r--src/rio.c4
-rw-r--r--src/rio.h35
-rw-r--r--src/scripting.c254
-rw-r--r--src/server.c63
-rw-r--r--src/server.h72
-rw-r--r--src/sha256.c158
-rw-r--r--src/sha256.h35
-rw-r--r--src/siphash.c3
-rw-r--r--src/stream.h1
-rw-r--r--src/t_hash.c4
-rw-r--r--src/t_list.c6
-rw-r--r--src/t_set.c10
-rw-r--r--src/t_zset.c23
-rw-r--r--src/tracking.c207
-rw-r--r--src/zmalloc.c20
39 files changed, 1692 insertions, 490 deletions
diff --git a/src/Makefile b/src/Makefile
index b6cc69e2f..198d85cd5 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -164,7 +164,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o gopher.o tracking.o sha256.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/acl.c b/src/acl.c
index a2ee65dd0..2cd729e77 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -28,6 +28,7 @@
*/
#include "server.h"
+#include "sha256.h"
#include <fcntl.h>
/* =============================================================================
@@ -139,6 +140,25 @@ int time_independent_strcmp(char *a, char *b) {
return diff; /* If zero strings are the same. */
}
+/* Given a cleartext buffer of 'len' bytes, returns the hex representation
+ * of its SHA256 digest as a new SDS string. */
+sds ACLHashPassword(unsigned char *cleartext, size_t len) {
+ SHA256_CTX ctx;
+ unsigned char hash[SHA256_BLOCK_SIZE];
+ char hex[SHA256_BLOCK_SIZE*2];
+ char *cset = "0123456789abcdef";
+
+ sha256_init(&ctx);
+ sha256_update(&ctx,(unsigned char*)cleartext,len);
+ sha256_final(&ctx,hash);
+
+ for (int j = 0; j < SHA256_BLOCK_SIZE; j++) {
+ hex[j*2] = cset[((hash[j]&0xF0)>>4)];
+ hex[j*2+1] = cset[(hash[j]&0xF)];
+ }
+ return sdsnewlen(hex,SHA256_BLOCK_SIZE*2);
+}
+
/* =============================================================================
* Low level ACL API
* ==========================================================================*/
@@ -701,13 +721,16 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
u->flags &= ~USER_FLAG_NOPASS;
listEmpty(u->passwords);
} else if (op[0] == '>') {
- sds newpass = sdsnewlen(op+1,oplen-1);
+ sds newpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
listNode *ln = listSearchKey(u->passwords,newpass);
/* Avoid re-adding the same password multiple times. */
- if (ln == NULL) listAddNodeTail(u->passwords,newpass);
+ if (ln == NULL)
+ listAddNodeTail(u->passwords,newpass);
+ else
+ sdsfree(newpass);
u->flags &= ~USER_FLAG_NOPASS;
} else if (op[0] == '<') {
- sds delpass = sdsnewlen(op+1,oplen-1);
+ sds delpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
listNode *ln = listSearchKey(u->passwords,delpass);
sdsfree(delpass);
if (ln) {
@@ -724,7 +747,10 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
sds newpat = sdsnewlen(op+1,oplen-1);
listNode *ln = listSearchKey(u->patterns,newpat);
/* Avoid re-adding the same pattern multiple times. */
- if (ln == NULL) listAddNodeTail(u->patterns,newpat);
+ if (ln == NULL)
+ listAddNodeTail(u->patterns,newpat);
+ else
+ sdsfree(newpat);
u->flags &= ~USER_FLAG_ALLKEYS;
} else if (op[0] == '+' && op[1] != '@') {
if (strchr(op,'|') == NULL) {
@@ -879,11 +905,15 @@ int ACLCheckUserCredentials(robj *username, robj *password) {
listIter li;
listNode *ln;
listRewind(u->passwords,&li);
+ sds hashed = ACLHashPassword(password->ptr,sdslen(password->ptr));
while((ln = listNext(&li))) {
sds thispass = listNodeValue(ln);
- if (!time_independent_strcmp(password->ptr, thispass))
+ if (!time_independent_strcmp(hashed, thispass)) {
+ sdsfree(hashed);
return C_OK;
+ }
}
+ sdsfree(hashed);
/* If we reached this point, no password matched. */
errno = EINVAL;
diff --git a/src/aof.c b/src/aof.c
index fc62d86ed..8bc6c543d 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -303,9 +303,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
nwritten = write(fd, buf, len);
if (nwritten < 0) {
- if (errno == EINTR) {
- continue;
- }
+ if (errno == EINTR) continue;
return totwritten ? totwritten : -1;
}
@@ -863,6 +861,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
if (!feof(fp)) {
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
+ fclose(fp);
serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
exit(1);
}
@@ -893,11 +892,13 @@ uxeof: /* Unexpected AOF end of file. */
}
}
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
+ fclose(fp);
serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
exit(1);
fmterr: /* Format error. */
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
+ fclose(fp);
serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
exit(1);
}
@@ -1612,7 +1613,8 @@ void bgrewriteaofCommand(client *c) {
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
} else {
- addReply(c,shared.err);
+ addReplyError(c,"Can't execute an AOF background rewriting. "
+ "Please check the server logs for more information.");
}
}
diff --git a/src/blocked.c b/src/blocked.c
index 1db657869..867f03de6 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -229,6 +229,207 @@ void disconnectAllBlockedClients(void) {
}
}
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * when there may be clients blocked on a list key, and there may be new
+ * data to fetch (the key is ready). */
+void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ if (receiver->btype != BLOCKED_LIST) {
+ /* Put at the tail, so that at the next call
+ * we'll not run into it again. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+ continue;
+ }
+
+ robj *dstkey = receiver->bpop.target;
+ int where = (receiver->lastcmd &&
+ receiver->lastcmd->proc == blpopCommand) ?
+ LIST_HEAD : LIST_TAIL;
+ robj *value = listTypePop(o,where);
+
+ if (value) {
+ /* Protect receiver->bpop.target, that will be
+ * freed by the next unblockClient()
+ * call. */
+ if (dstkey) incrRefCount(dstkey);
+ unblockClient(receiver);
+
+ if (serveClientBlockedOnList(receiver,
+ rl->key,dstkey,rl->db,value,
+ where) == C_ERR)
+ {
+ /* If we failed serving the client we need
+ * to also undo the POP operation. */
+ listTypePush(o,value,where);
+ }
+
+ if (dstkey) decrRefCount(dstkey);
+ decrRefCount(value);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (listTypeLength(o) == 0) {
+ dbDelete(rl->db,rl->key);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
+ }
+ /* We don't call signalModifiedKey() as it was already called
+ * when an element was pushed on the list. */
+}
+
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * when there may be clients blocked on a sorted set key, and there may be new
+ * data to fetch (the key is ready). */
+void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+ unsigned long zcard = zsetLength(o);
+
+ while(numclients-- && zcard) {
+ listNode *clientnode = listFirst(clients);
+ client *receiver = clientnode->value;
+
+ if (receiver->btype != BLOCKED_ZSET) {
+ /* Put at the tail, so that at the next call
+ * we'll not run into it again. */
+ listDelNode(clients,clientnode);
+ listAddNodeTail(clients,receiver);
+ continue;
+ }
+
+ int where = (receiver->lastcmd &&
+ receiver->lastcmd->proc == bzpopminCommand)
+ ? ZSET_MIN : ZSET_MAX;
+ unblockClient(receiver);
+ genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
+ zcard--;
+
+ /* Replicate the command. */
+ robj *argv[2];
+ struct redisCommand *cmd = where == ZSET_MIN ?
+ server.zpopminCommand :
+ server.zpopmaxCommand;
+ argv[0] = createStringObject(cmd->name,strlen(cmd->name));
+ argv[1] = rl->key;
+ incrRefCount(rl->key);
+ propagate(cmd,receiver->db->id,
+ argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
+ decrRefCount(argv[0]);
+ decrRefCount(argv[1]);
+ }
+ }
+}
+
+/* Helper function for handleClientsBlockedOnKeys(). This function is called
+ * when there may be clients blocked on a stream key, and there may be new
+ * data to fetch (the key is ready). */
+void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
+ dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
+ stream *s = o->ptr;
+
+ /* We need to provide the new data arrived on the stream
+ * to all the clients that are waiting for an offset smaller
+ * than the current top item. */
+ if (de) {
+ list *clients = dictGetVal(de);
+ listNode *ln;
+ listIter li;
+ listRewind(clients,&li);
+
+ while((ln = listNext(&li))) {
+ client *receiver = listNodeValue(ln);
+ if (receiver->btype != BLOCKED_STREAM) continue;
+ streamID *gt = dictFetchValue(receiver->bpop.keys,
+ rl->key);
+
+ /* If we blocked in the context of a consumer
+ * group, we need to resolve the group and update the
+ * last ID the client is blocked for: this is needed
+ * because serving other clients in the same consumer
+ * group will alter the "last ID" of the consumer
+ * group, and clients blocked in a consumer group are
+ * always blocked for the ">" ID: we need to deliver
+ * only new messages and avoid unblocking the client
+ * otherwise. */
+ streamCG *group = NULL;
+ if (receiver->bpop.xread_group) {
+ group = streamLookupCG(s,
+ receiver->bpop.xread_group->ptr);
+ /* If the group was not found, send an error
+ * to the consumer. */
+ if (!group) {
+ addReplyError(receiver,
+ "-NOGROUP the consumer group this client "
+ "was blocked on no longer exists");
+ unblockClient(receiver);
+ continue;
+ } else {
+ *gt = group->last_id;
+ }
+ }
+
+ if (streamCompareID(&s->last_id, gt) > 0) {
+ streamID start = *gt;
+ start.seq++; /* Can't overflow, it's an uint64_t */
+
+ /* Lookup the consumer for the group, if any. */
+ streamConsumer *consumer = NULL;
+ int noack = 0;
+
+ if (group) {
+ consumer = streamLookupConsumer(group,
+ receiver->bpop.xread_consumer->ptr,
+ 1);
+ noack = receiver->bpop.xread_group_noack;
+ }
+
+ /* Emit the two elements sub-array consisting of
+ * the name of the stream and the data we
+ * extracted from it. Wrapped in a single-item
+ * array, since we have just one key. */
+ if (receiver->resp == 2) {
+ addReplyArrayLen(receiver,1);
+ addReplyArrayLen(receiver,2);
+ } else {
+ addReplyMapLen(receiver,1);
+ }
+ addReplyBulk(receiver,rl->key);
+
+ streamPropInfo pi = {
+ rl->key,
+ receiver->bpop.xread_group
+ };
+ streamReplyWithRange(receiver,s,&start,NULL,
+ receiver->bpop.xread_count,
+ 0, group, consumer, noack, &pi);
+
+ /* Note that after we unblock the client, 'gt'
+ * and other receiver->bpop stuff are no longer
+ * valid, so we must do the setup above before
+ * this call. */
+ unblockClient(receiver);
+ }
+ }
+ }
+}
+
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@@ -271,202 +472,14 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on list key. */
robj *o = lookupKeyWrite(rl->db,rl->key);
- if (o != NULL && o->type == OBJ_LIST) {
- dictEntry *de;
-
- /* We serve clients in the same order they blocked for
- * this key, from the first blocked to the last. */
- de = dictFind(rl->db->blocking_keys,rl->key);
- if (de) {
- list *clients = dictGetVal(de);
- int numclients = listLength(clients);
-
- while(numclients--) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
-
- if (receiver->btype != BLOCKED_LIST) {
- /* Put at the tail, so that at the next call
- * we'll not run into it again. */
- listDelNode(clients,clientnode);
- listAddNodeTail(clients,receiver);
- continue;
- }
-
- robj *dstkey = receiver->bpop.target;
- int where = (receiver->lastcmd &&
- receiver->lastcmd->proc == blpopCommand) ?
- LIST_HEAD : LIST_TAIL;
- robj *value = listTypePop(o,where);
-
- if (value) {
- /* Protect receiver->bpop.target, that will be
- * freed by the next unblockClient()
- * call. */
- if (dstkey) incrRefCount(dstkey);
- unblockClient(receiver);
-
- if (serveClientBlockedOnList(receiver,
- rl->key,dstkey,rl->db,value,
- where) == C_ERR)
- {
- /* If we failed serving the client we need
- * to also undo the POP operation. */
- listTypePush(o,value,where);
- }
-
- if (dstkey) decrRefCount(dstkey);
- decrRefCount(value);
- } else {
- break;
- }
- }
- }
-
- if (listTypeLength(o) == 0) {
- dbDelete(rl->db,rl->key);
- notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
- }
- /* We don't call signalModifiedKey() as it was already called
- * when an element was pushed on the list. */
- }
- /* Serve clients blocked on sorted set key. */
- else if (o != NULL && o->type == OBJ_ZSET) {
- dictEntry *de;
-
- /* We serve clients in the same order they blocked for
- * this key, from the first blocked to the last. */
- de = dictFind(rl->db->blocking_keys,rl->key);
- if (de) {
- list *clients = dictGetVal(de);
- int numclients = listLength(clients);
- unsigned long zcard = zsetLength(o);
-
- while(numclients-- && zcard) {
- listNode *clientnode = listFirst(clients);
- client *receiver = clientnode->value;
-
- if (receiver->btype != BLOCKED_ZSET) {
- /* Put at the tail, so that at the next call
- * we'll not run into it again. */
- listDelNode(clients,clientnode);
- listAddNodeTail(clients,receiver);
- continue;
- }
-
- int where = (receiver->lastcmd &&
- receiver->lastcmd->proc == bzpopminCommand)
- ? ZSET_MIN : ZSET_MAX;
- unblockClient(receiver);
- genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
- zcard--;
-
- /* Replicate the command. */
- robj *argv[2];
- struct redisCommand *cmd = where == ZSET_MIN ?
- server.zpopminCommand :
- server.zpopmaxCommand;
- argv[0] = createStringObject(cmd->name,strlen(cmd->name));
- argv[1] = rl->key;
- incrRefCount(rl->key);
- propagate(cmd,receiver->db->id,
- argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
- decrRefCount(argv[0]);
- decrRefCount(argv[1]);
- }
- }
- }
-
- /* Serve clients blocked on stream key. */
- else if (o != NULL && o->type == OBJ_STREAM) {
- dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
- stream *s = o->ptr;
-
- /* We need to provide the new data arrived on the stream
- * to all the clients that are waiting for an offset smaller
- * than the current top item. */
- if (de) {
- list *clients = dictGetVal(de);
- listNode *ln;
- listIter li;
- listRewind(clients,&li);
-
- while((ln = listNext(&li))) {
- client *receiver = listNodeValue(ln);
- if (receiver->btype != BLOCKED_STREAM) continue;
- streamID *gt = dictFetchValue(receiver->bpop.keys,
- rl->key);
-
- /* If we blocked in the context of a consumer
- * group, we need to resolve the group and update the
- * last ID the client is blocked for: this is needed
- * because serving other clients in the same consumer
- * group will alter the "last ID" of the consumer
- * group, and clients blocked in a consumer group are
- * always blocked for the ">" ID: we need to deliver
- * only new messages and avoid unblocking the client
- * otherwise. */
- streamCG *group = NULL;
- if (receiver->bpop.xread_group) {
- group = streamLookupCG(s,
- receiver->bpop.xread_group->ptr);
- /* If the group was not found, send an error
- * to the consumer. */
- if (!group) {
- addReplyError(receiver,
- "-NOGROUP the consumer group this client "
- "was blocked on no longer exists");
- unblockClient(receiver);
- continue;
- } else {
- *gt = group->last_id;
- }
- }
-
- if (streamCompareID(&s->last_id, gt) > 0) {
- streamID start = *gt;
- start.seq++; /* Can't overflow, it's an uint64_t */
-
- /* Lookup the consumer for the group, if any. */
- streamConsumer *consumer = NULL;
- int noack = 0;
-
- if (group) {
- consumer = streamLookupConsumer(group,
- receiver->bpop.xread_consumer->ptr,
- 1);
- noack = receiver->bpop.xread_group_noack;
- }
-
- /* Emit the two elements sub-array consisting of
- * the name of the stream and the data we
- * extracted from it. Wrapped in a single-item
- * array, since we have just one key. */
- if (receiver->resp == 2) {
- addReplyArrayLen(receiver,1);
- addReplyArrayLen(receiver,2);
- } else {
- addReplyMapLen(receiver,1);
- }
- addReplyBulk(receiver,rl->key);
-
- streamPropInfo pi = {
- rl->key,
- receiver->bpop.xread_group
- };
- streamReplyWithRange(receiver,s,&start,NULL,
- receiver->bpop.xread_count,
- 0, group, consumer, noack, &pi);
-
- /* Note that after we unblock the client, 'gt'
- * and other receiver->bpop stuff are no longer
- * valid, so we must do the setup above before
- * this call. */
- unblockClient(receiver);
- }
- }
- }
+ if (o != NULL) {
+ if (o->type == OBJ_LIST)
+ serveClientsBlockedOnListKey(o,rl);
+ else if (o->type == OBJ_ZSET)
+ serveClientsBlockedOnSortedSetKey(o,rl);
+ else if (o->type == OBJ_STREAM)
+ serveClientsBlockedOnStreamKey(o,rl);
}
/* Free this item. */
@@ -592,7 +605,7 @@ void unblockClientWaitingData(client *c) {
* the same key again and again in the list in case of multiple pushes
* made by a script or in the context of MULTI/EXEC.
*
- * The list will be finally processed by handleClientsBlockedOnLists() */
+ * The list will be finally processed by handleClientsBlockedOnKeys() */
void signalKeyAsReady(redisDb *db, robj *key) {
readyList *rl;
diff --git a/src/cluster.c b/src/cluster.c
index c85e3791d..1e7dcd50e 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -138,6 +138,7 @@ int clusterLoadConfig(char *filename) {
/* Handle the special "vars" line. Don't pretend it is the last
* line even if it actually is when generated by Redis. */
if (strcasecmp(argv[0],"vars") == 0) {
+ if (!(argc % 2)) goto fmterr;
for (j = 1; j < argc; j += 2) {
if (strcasecmp(argv[j],"currentEpoch") == 0) {
server.cluster->currentEpoch =
@@ -4251,12 +4252,9 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
/* CLUSTER NODES */
- robj *o;
- sds ci = clusterGenNodesDescription(0);
-
- o = createObject(OBJ_STRING,ci);
- addReplyBulk(c,o);
- decrRefCount(o);
+ sds nodes = clusterGenNodesDescription(0);
+ addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
+ sdsfree(nodes);
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
/* CLUSTER MYID */
addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
@@ -4498,10 +4496,8 @@ NULL
"cluster_stats_messages_received:%lld\r\n", tot_msg_received);
/* Produce the reply protocol. */
- addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
- (unsigned long)sdslen(info)));
- addReplySds(c,info);
- addReply(c,shared.crlf);
+ addReplyVerbatim(c,info,sdslen(info),"txt");
+ sdsfree(info);
} else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
int retval = clusterSaveConfig(1);
@@ -4832,7 +4828,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
* DUMP is actually not used by Redis Cluster but it is the obvious
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
- robj *o, *dumpobj;
+ robj *o;
rio payload;
/* Check if the key is here. */
@@ -4845,9 +4841,7 @@ void dumpCommand(client *c) {
createDumpPayload(&payload,o,c->argv[1]);
/* Transfer to the client */
- dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
- addReplyBulk(c,dumpobj);
- decrRefCount(dumpobj);
+ addReplyBulkSds(c,payload.io.buffer.ptr);
return;
}
diff --git a/src/config.c b/src/config.c
index fde00ddf5..d37e3c566 100644
--- a/src/config.c
+++ b/src/config.c
@@ -672,6 +672,10 @@ void loadServerConfigFromString(char *config) {
server.lua_time_limit = strtoll(argv[1],NULL,10);
} else if (!strcasecmp(argv[0],"lua-replicate-commands") && argc == 2) {
server.lua_always_replicate_commands = yesnotoi(argv[1]);
+ if (server.lua_always_replicate_commands == -1) {
+ err = "argument must be 'yes' or 'no'";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"slowlog-log-slower-than") &&
argc == 2)
{
@@ -686,6 +690,17 @@ void loadServerConfigFromString(char *config) {
}
} else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) {
server.slowlog_max_len = strtoll(argv[1],NULL,10);
+ } else if (!strcasecmp(argv[0],"tracking-table-max-fill") &&
+ argc == 2)
+ {
+ server.tracking_table_max_fill = strtoll(argv[1],NULL,10);
+ if (server.tracking_table_max_fill > 100 ||
+ server.tracking_table_max_fill < 0)
+ {
+ err = "The tracking table fill percentage must be an "
+ "integer between 0 and 100";
+ goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"client-output-buffer-limit") &&
argc == 5)
{
@@ -1134,6 +1149,8 @@ void configSetCommand(client *c) {
/* Cast to unsigned. */
server.slowlog_max_len = (unsigned long)ll;
} config_set_numerical_field(
+ "tracking-table-max-fill",server.tracking_table_max_fill,0,100) {
+ } config_set_numerical_field(
"latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){
} config_set_numerical_field(
"repl-ping-slave-period",server.repl_ping_slave_period,1,INT_MAX) {
@@ -1338,8 +1355,8 @@ void configGetCommand(client *c) {
server.slowlog_log_slower_than);
config_get_numerical_field("latency-monitor-threshold",
server.latency_monitor_threshold);
- config_get_numerical_field("slowlog-max-len",
- server.slowlog_max_len);
+ config_get_numerical_field("slowlog-max-len", server.slowlog_max_len);
+ config_get_numerical_field("tracking-table-max-fill", server.tracking_table_max_fill);
config_get_numerical_field("port",server.port);
config_get_numerical_field("cluster-announce-port",server.cluster_announce_port);
config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port);
@@ -1470,12 +1487,10 @@ void configGetCommand(client *c) {
matches++;
}
if (stringmatch(pattern,"notify-keyspace-events",1)) {
- robj *flagsobj = createObject(OBJ_STRING,
- keyspaceEventsFlagsToString(server.notify_keyspace_events));
+ sds flags = keyspaceEventsFlagsToString(server.notify_keyspace_events);
addReplyBulkCString(c,"notify-keyspace-events");
- addReplyBulk(c,flagsobj);
- decrRefCount(flagsobj);
+ addReplyBulkSds(c,flags);
matches++;
}
if (stringmatch(pattern,"bind",1)) {
@@ -2167,6 +2182,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"slowlog-log-slower-than",server.slowlog_log_slower_than,CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN);
rewriteConfigNumericalOption(state,"latency-monitor-threshold",server.latency_monitor_threshold,CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD);
rewriteConfigNumericalOption(state,"slowlog-max-len",server.slowlog_max_len,CONFIG_DEFAULT_SLOWLOG_MAX_LEN);
+ rewriteConfigNumericalOption(state,"tracking-table-max-fill",server.tracking_table_max_fill,CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL);
rewriteConfigNotifykeyspaceeventsOption(state);
rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES);
rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE);
diff --git a/src/db.c b/src/db.c
index 4a489036a..a46e0251f 100644
--- a/src/db.c
+++ b/src/db.c
@@ -350,6 +350,11 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
return -1;
}
+ /* Make sure the WATCHed keys are affected by the FLUSH* commands.
+ * Note that we need to call the function while the keys are still
+ * there. */
+ signalFlushedDb(dbnum);
+
int startdb, enddb;
if (dbnum == -1) {
startdb = 0;
@@ -409,11 +414,12 @@ long long dbTotalServerKeyCount() {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
- if (server.tracking_clients) trackingInvalidateKey(key);
+ trackingInvalidateKey(key);
}
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
+ trackingInvalidateKeysOnFlush(dbid);
}
/*-----------------------------------------------------------------------------
@@ -449,7 +455,6 @@ void flushdbCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
- signalFlushedDb(c->db->id);
server.dirty += emptyDb(c->db->id,flags,NULL);
addReply(c,shared.ok);
}
@@ -461,7 +466,6 @@ void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
- signalFlushedDb(-1);
server.dirty += emptyDb(-1,flags,NULL);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) killRDBChild();
diff --git a/src/debug.c b/src/debug.c
index 1f1157d4a..0d29165de 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -638,7 +638,8 @@ NULL
dictGetStats(buf,sizeof(buf),server.db[dbid].expires);
stats = sdscat(stats,buf);
- addReplyBulkSds(c,stats);
+ addReplyVerbatim(c,stats,sdslen(stats),"txt");
+ sdsfree(stats);
} else if (!strcasecmp(c->argv[1]->ptr,"htstats-key") && c->argc == 3) {
robj *o;
dict *ht = NULL;
@@ -665,7 +666,7 @@ NULL
} else {
char buf[4096];
dictGetStats(buf,sizeof(buf),ht);
- addReplyBulkCString(c,buf);
+ addReplyVerbatim(c,buf,strlen(buf),"txt");
}
} else if (!strcasecmp(c->argv[1]->ptr,"change-repl-id") && c->argc == 2) {
serverLog(LL_WARNING,"Changing replication IDs after receiving DEBUG change-repl-id");
@@ -1110,6 +1111,33 @@ void logRegisters(ucontext_t *uc) {
(unsigned long) uc->uc_mcontext.mc_cs
);
logStackContent((void**)uc->uc_mcontext.mc_rsp);
+#elif defined(__aarch64__) /* Linux AArch64 */
+ serverLog(LL_WARNING,
+ "\n"
+ "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n"
+ "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n"
+ "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n"
+ "X30:%016lx\n"
+ "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n",
+ (unsigned long) uc->uc_mcontext.regs[18],
+ (unsigned long) uc->uc_mcontext.regs[19],
+ (unsigned long) uc->uc_mcontext.regs[20],
+ (unsigned long) uc->uc_mcontext.regs[21],
+ (unsigned long) uc->uc_mcontext.regs[22],
+ (unsigned long) uc->uc_mcontext.regs[23],
+ (unsigned long) uc->uc_mcontext.regs[24],
+ (unsigned long) uc->uc_mcontext.regs[25],
+ (unsigned long) uc->uc_mcontext.regs[26],
+ (unsigned long) uc->uc_mcontext.regs[27],
+ (unsigned long) uc->uc_mcontext.regs[28],
+ (unsigned long) uc->uc_mcontext.regs[29],
+ (unsigned long) uc->uc_mcontext.regs[30],
+ (unsigned long) uc->uc_mcontext.pc,
+ (unsigned long) uc->uc_mcontext.sp,
+ (unsigned long) uc->uc_mcontext.pstate,
+ (unsigned long) uc->uc_mcontext.fault_address
+ );
+ logStackContent((void**)uc->uc_mcontext.sp);
#else
serverLog(LL_WARNING,
" Dumping of registers not supported for this OS/arch");
diff --git a/src/expire.c b/src/expire.c
index b23117a3c..598b27f96 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
- if (server.tracking_clients) trackingInvalidateKey(keyobj);
+ trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index e01ea6042..a44d15646 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -700,7 +700,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) {
p += oplen;
first += span;
}
- if (span == 0) return -1; /* Invalid format. */
+ if (span == 0 || p >= end) return -1; /* Invalid format. */
next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
if (next >= end) next = NULL;
@@ -1242,7 +1242,7 @@ void pfcountCommand(client *c) {
if (o == NULL) continue; /* Assume empty HLL for non existing var.*/
if (isHLLObjectOrReply(c,o) != C_OK) return;
- /* Merge with this HLL with our 'max' HHL by setting max[i]
+ /* Merge this HLL with our 'max' HLL by setting max[i]
* to MAX(max[i],hll[i]). */
if (hllMerge(registers,o) == C_ERR) {
addReplySds(c,sdsnew(invalid_hll_err));
@@ -1329,7 +1329,7 @@ void pfmergeCommand(client *c) {
hdr = o->ptr;
if (hdr->encoding == HLL_DENSE) use_dense = 1;
- /* Merge with this HLL with our 'max' HHL by setting max[i]
+ /* Merge this HLL with our 'max' HLL by setting max[i]
* to MAX(max[i],hll[i]). */
if (hllMerge(max,o) == C_ERR) {
addReplySds(c,sdsnew(invalid_hll_err));
diff --git a/src/latency.c b/src/latency.c
index 33aa1245b..b834da5c7 100644
--- a/src/latency.c
+++ b/src/latency.c
@@ -599,7 +599,7 @@ NULL
event = dictGetKey(de);
graph = latencyCommandGenSparkeline(event,ts);
- addReplyBulkCString(c,graph);
+ addReplyVerbatim(c,graph,sdslen(graph),"txt");
sdsfree(graph);
} else if (!strcasecmp(c->argv[1]->ptr,"latest") && c->argc == 2) {
/* LATENCY LATEST */
@@ -608,7 +608,7 @@ NULL
/* LATENCY DOCTOR */
sds report = createLatencyReport();
- addReplyBulkCBuffer(c,report,sdslen(report));
+ addReplyVerbatim(c,report,sdslen(report),"txt");
sdsfree(report);
} else if (!strcasecmp(c->argv[1]->ptr,"reset") && c->argc >= 2) {
/* LATENCY RESET */
diff --git a/src/lolwut.c b/src/lolwut.c
index 19cbcf642..ba7e1069e 100644
--- a/src/lolwut.c
+++ b/src/lolwut.c
@@ -43,7 +43,8 @@ void lolwutUnstableCommand(client *c) {
sds rendered = sdsnew("Redis ver. ");
rendered = sdscat(rendered,REDIS_VERSION);
rendered = sdscatlen(rendered,"\n",1);
- addReplyBulkSds(c,rendered);
+ addReplyVerbatim(c,rendered,sdslen(rendered),"txt");
+ sdsfree(rendered);
}
void lolwutCommand(client *c) {
diff --git a/src/lolwut5.c b/src/lolwut5.c
index 8408b378d..52a98c0d7 100644
--- a/src/lolwut5.c
+++ b/src/lolwut5.c
@@ -277,6 +277,7 @@ void lolwut5Command(client *c) {
"\nGeorg Nees - schotter, plotter on paper, 1968. Redis ver. ");
rendered = sdscat(rendered,REDIS_VERSION);
rendered = sdscatlen(rendered,"\n",1);
- addReplyBulkSds(c,rendered);
+ addReplyVerbatim(c,rendered,sdslen(rendered),"txt");
+ sdsfree(rendered);
lwFreeCanvas(canvas);
}
diff --git a/src/module.c b/src/module.c
index 6f3be61af..854989e73 100644
--- a/src/module.c
+++ b/src/module.c
@@ -29,6 +29,7 @@
#include "server.h"
#include "cluster.h"
+#include "rdb.h"
#include <dlfcn.h>
#include <wait.h>
@@ -52,6 +53,7 @@ struct RedisModule {
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
int in_call; /* RM_Call() nesting level */
+ int options; /* Module options and capabilities. */
};
typedef struct RedisModule RedisModule;
@@ -780,6 +782,19 @@ long long RM_Milliseconds(void) {
return mstime();
}
+/* Set flags defining capabilities or behavior bit flags.
+ * The 'options' value is assigned as-is to the module of 'ctx', so it
+ * replaces (rather than being OR-ed with) any previously set options.
+ *
+ * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
+ * Generally, modules don't need to bother with this, as the process will just
+ * terminate if a read error happens, however, setting this flag would allow
+ * repl-diskless-load to work if enabled.
+ * The module should use RedisModule_IsIOError after reads, before using the
+ * data that was read, and in case of error, propagate it upwards, and also be
+ * able to release the partially populated value and all its allocations. */
+void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
+ ctx->module->options = options;
+}
+
/* --------------------------------------------------------------------------
* Automatic memory management for modules
* -------------------------------------------------------------------------- */
@@ -2397,7 +2412,7 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) {
*
* REDISMODULE_HASH_EXISTS: instead of setting the value of the field
* expecting a RedisModuleString pointer to pointer, the function just
- * reports if the field esists or not and expects an integer pointer
+ * reports if the field exists or not and expects an integer pointer
* as the second element of each pair.
*
* Example of REDISMODULE_HASH_CFIELD:
@@ -3087,6 +3102,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
+ struct {
+ moduleTypeAuxLoadFunc aux_load;
+ moduleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
+ } v2;
} *tms = (struct typemethods*) typemethods_ptr;
moduleType *mt = zcalloc(sizeof(*mt));
@@ -3098,6 +3118,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
mt->mem_usage = tms->mem_usage;
mt->digest = tms->digest;
mt->free = tms->free;
+ if (tms->version >= 2) {
+ mt->aux_load = tms->v2.aux_load;
+ mt->aux_save = tms->v2.aux_save;
+ mt->aux_save_triggers = tms->v2.aux_save_triggers;
+ }
memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt);
return mt;
@@ -3148,9 +3173,14 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
* RDB loading and saving functions
* -------------------------------------------------------------------------- */
-/* Called when there is a load error in the context of a module. This cannot
- * be recovered like for the built-in types. */
+/* Called when there is a load error in the context of a module. On some
+ * modules this cannot be recovered, but if the module declared capability
+ * to handle errors, we'll raise a flag rather than exiting. */
void moduleRDBLoadError(RedisModuleIO *io) {
+ if (io->ctx->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) {
+ io->error = 1;
+ return;
+ }
serverLog(LL_WARNING,
"Error loading data from RDB (short read or EOF). "
"Read performed by module '%s' about type '%s' "
@@ -3161,6 +3191,33 @@ void moduleRDBLoadError(RedisModuleIO *io) {
exit(1);
}
+/* Returns 0 if there's at least one module that registered one or more data
+ * types without declaring REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case
+ * diskless loading should be avoided since it could cause data loss.
+ * Returns 1 otherwise. The option is checked per module (module->options),
+ * and modules with an empty 'types' list are ignored. */
+int moduleAllDatatypesHandleErrors() {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ if (listLength(module->types) &&
+ !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS))
+ {
+ dictReleaseIterator(di);
+ return 0;
+ }
+ }
+ dictReleaseIterator(di);
+ return 1;
+}
+
+/* Returns true if any previous IO API failed (the value of io->error).
+ * For the Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be
+ * set with RedisModule_SetModuleOptions first. */
+int RM_IsIOError(RedisModuleIO *io) {
+ return io->error;
+}
+
/* Save an unsigned 64 bit value into the RDB file. This function should only
* be called in the context of the rdb_save method of modules implementing new
* data types. */
@@ -3184,6 +3241,7 @@ saveerr:
* be called in the context of the rdb_load method of modules implementing
* new data types. */
uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
@@ -3195,7 +3253,7 @@ uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
@@ -3254,6 +3312,7 @@ saveerr:
/* Implements RM_LoadString() and RM_LoadStringBuffer() */
void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
+ if (io->error) return NULL;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
@@ -3265,7 +3324,7 @@ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
loaderr:
moduleRDBLoadError(io);
- return NULL; /* Never reached. */
+ return NULL;
}
/* In the context of the rdb_load method of a module data type, loads a string
@@ -3286,7 +3345,7 @@ RedisModuleString *RM_LoadString(RedisModuleIO *io) {
* RedisModule_Realloc() or RedisModule_Free().
*
* The size of the string is stored at '*lenptr' if not NULL.
- * The returned string is not automatically NULL termianted, it is loaded
+ * The returned string is not automatically NULL terminated, it is loaded
* exactly as it was stored inside the RDB file. */
char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
return moduleLoadString(io,1,lenptr);
@@ -3314,6 +3373,7 @@ saveerr:
/* In the context of the rdb_save method of a module data type, loads back the
* double value saved by RedisModule_SaveDouble(). */
double RM_LoadDouble(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
@@ -3325,7 +3385,7 @@ double RM_LoadDouble(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* In the context of the rdb_save method of a module data type, saves a float
@@ -3350,6 +3410,7 @@ saveerr:
/* In the context of the rdb_save method of a module data type, loads back the
* float value saved by RedisModule_SaveFloat(). */
float RM_LoadFloat(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
@@ -3361,7 +3422,37 @@ float RM_LoadFloat(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
+}
+
+/* Iterate over all the modules and trigger RDB aux saving for the module
+ * types that asked for it: a type is saved only if it has an aux_save
+ * callback and its aux_save_triggers mask includes 'when'.
+ *
+ * Returns the sum of the values returned by rdbSaveSingleModuleAux(), or -1
+ * as soon as one of those calls returns -1. */
+ssize_t rdbSaveModulesAux(rio *rdb, int when) {
+ size_t total_written = 0;
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ listIter li;
+ listNode *ln;
+
+ listRewind(module->types,&li);
+ while((ln = listNext(&li))) {
+ moduleType *mt = ln->value;
+ if (!mt->aux_save || !(mt->aux_save_triggers & when))
+ continue;
+ ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
+ if (ret==-1) {
+ dictReleaseIterator(di);
+ return -1;
+ }
+ total_written += ret;
+ }
+ }
+
+ dictReleaseIterator(di);
+ return total_written;
}
/* --------------------------------------------------------------------------
@@ -3524,7 +3615,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
if (level < server.verbosity) return;
- name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
+ name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module");
vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
serverLogRaw(level,msg);
}
@@ -3542,13 +3633,15 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
* There is a fixed limit to the length of the log line this function is able
* to emit, this limit is not specified but is guaranteed to be more than
* a few lines of text.
+ *
+ * The ctx argument may be NULL if it cannot be provided in the context of
+ * the caller, for instance in threads or callbacks, in which case a generic
+ * "module" will be used instead of the module name.
*/
void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
- if (!ctx->module) return; /* Can only log if module is initialized */
-
va_list ap;
va_start(ap, fmt);
- RM_LogRaw(ctx->module,levelstr,fmt,ap);
+ RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap);
va_end(ap);
}
@@ -3564,6 +3657,15 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...
va_end(ap);
}
+/* Redis-like assert function.
+ *
+ * A failed assertion will shut down the server and produce logging information
+ * that looks identical to information generated by Redis itself.
+ *
+ * The arguments are forwarded unchanged to _serverAssert(); presumably 'estr'
+ * is the text of the failed expression and 'file'/'line' its source location.
+ */
+void RM__Assert(const char *estr, const char *file, int line) {
+ _serverAssert(estr, file, line);
+}
+
/* --------------------------------------------------------------------------
* Blocking clients from modules
* -------------------------------------------------------------------------- */
@@ -5362,6 +5464,62 @@ void addReplyLoadedModules(client *c) {
dictReleaseIterator(di);
}
+/* Helper for genModulesInfoString(): given a list of modules, return
+ * an SDS string in the form "[modulename|modulename2|...]".
+ *
+ * 'l' is a list whose node values are RedisModule pointers. An empty list
+ * renders as "[]". The returned string is a new SDS string. */
+sds genModulesInfoStringRenderModulesList(list *l) {
+    listIter li;
+    listNode *ln;
+    listRewind(l,&li);
+    sds output = sdsnew("[");
+    while((ln = listNext(&li))) {
+        RedisModule *module = ln->value;
+        output = sdscat(output,module->name);
+        /* Always append the separator: without it the names would be
+         * glued together. The trailing one is removed by sdstrim() below. */
+        output = sdscat(output,"|");
+    }
+    output = sdstrim(output,"|");
+    output = sdscat(output,"]");
+    return output;
+}
+
+/* Helper for genModulesInfoString(): render module options as an SDS string.
+ * Each option set in module->options is emitted as a name followed by "|",
+ * then the trailing "|" is trimmed, giving "[opt1|opt2]" or "[]" when no
+ * known option is set. Currently only "handle-io-errors" is rendered. */
+sds genModulesInfoStringRenderModuleOptions(struct RedisModule *module) {
+ sds output = sdsnew("[");
+ if (module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)
+ output = sdscat(output,"handle-io-errors|");
+ output = sdstrim(output,"|");
+ output = sdscat(output,"]");
+ return output;
+}
+
+
+/* Helper function for the INFO command: appends one "module:..." line per
+ * loaded module to the info output, reporting its name, version, API
+ * version, number of filters, the usedby/using module lists and its options.
+ *
+ * After the call, the passed sds info string is no longer valid and all the
+ * references must be substituted with the new pointer returned by the call. */
+sds genModulesInfoString(sds info) {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ sds name = dictGetKey(de);
+ struct RedisModule *module = dictGetVal(de);
+
+ sds usedby = genModulesInfoStringRenderModulesList(module->usedby);
+ sds using = genModulesInfoStringRenderModulesList(module->using);
+ sds options = genModulesInfoStringRenderModuleOptions(module);
+ info = sdscatprintf(info,
+ "module:name=%s,ver=%d,api=%d,filters=%d,"
+ "usedby=%s,using=%s,options=%s\r\n",
+ name, module->ver, module->apiver,
+ (int)listLength(module->filters), usedby, using, options);
+ sdsfree(usedby);
+ sdsfree(using);
+ sdsfree(options);
+ }
+ dictReleaseIterator(di);
+ return info;
+}
+
/* Redis MODULE command.
*
* MODULE LOAD <path> [args...] */
@@ -5447,6 +5605,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ReplySetArrayLength);
REGISTER_API(ReplyWithString);
REGISTER_API(ReplyWithStringBuffer);
+ REGISTER_API(ReplyWithCString);
REGISTER_API(ReplyWithNull);
REGISTER_API(ReplyWithCallReply);
REGISTER_API(ReplyWithDouble);
@@ -5509,6 +5668,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ModuleTypeSetValue);
REGISTER_API(ModuleTypeGetType);
REGISTER_API(ModuleTypeGetValue);
+ REGISTER_API(IsIOError);
+ REGISTER_API(SetModuleOptions);
REGISTER_API(SaveUnsigned);
REGISTER_API(LoadUnsigned);
REGISTER_API(SaveSigned);
@@ -5524,6 +5685,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(EmitAOF);
REGISTER_API(Log);
REGISTER_API(LogIOError);
+ REGISTER_API(_Assert);
REGISTER_API(StringAppendBuffer);
REGISTER_API(RetainString);
REGISTER_API(StringCompare);
diff --git a/src/multi.c b/src/multi.c
index 71090d8ed..f885fa19c 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -175,7 +175,19 @@ void execCommand(client *c) {
must_propagate = 1;
}
- call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
+ int acl_retval = ACLCheckCommandPerm(c);
+ if (acl_retval != ACL_OK) {
+ addReplyErrorFormat(c,
+ "-NOPERM ACLs rules changed between the moment the "
+ "transaction was accumulated and the EXEC call. "
+ "This command is no longer allowed for the "
+ "following reason: %s",
+ (acl_retval == ACL_DENIED_CMD) ?
+ "no permission to execute the command or subcommand" :
+ "no permission to touch the specified keys");
+ } else {
+ call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
+ }
/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;
diff --git a/src/networking.c b/src/networking.c
index 7976caf29..a959d557a 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1990,7 +1990,7 @@ NULL
return;
}
sds o = getAllClientsInfoString(type);
- addReplyBulkCBuffer(c,o,sdslen(o));
+ addReplyVerbatim(c,o,sdslen(o),"txt");
sdsfree(o);
} else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
/* CLIENT REPLY ON|OFF|SKIP */
@@ -2468,17 +2468,27 @@ void flushSlavesOutputBuffers(void) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = listNodeValue(ln);
- int events;
-
- /* Note that the following will not flush output buffers of slaves
- * in STATE_ONLINE but having put_online_on_ack set to true: in this
- * case the writable event is never installed, since the purpose
- * of put_online_on_ack is to postpone the moment it is installed.
- * This is what we want since slaves in this state should not receive
- * writes before the first ACK. */
- events = aeGetFileEvents(server.el,slave->fd);
- if (events & AE_WRITABLE &&
- slave->replstate == SLAVE_STATE_ONLINE &&
+ int events = aeGetFileEvents(server.el,slave->fd);
+ int can_receive_writes = (events & AE_WRITABLE) ||
+ (slave->flags & CLIENT_PENDING_WRITE);
+
+ /* We don't want to send the pending data to the replica in a few
+ * cases:
+ *
+ * 1. For some reason there is neither the write handler installed
+ * nor the client is flagged as to have pending writes: for some
+ * reason this replica may not be set to receive data. This is
+ * just for the sake of defensive programming.
+ *
+ * 2. The put_online_on_ack flag is true. To know why we don't want
+ * to send data to the replica in this case, please grep for
+ * this flag.
+ *
+ * 3. Obviously if the slave is not ONLINE.
+ */
+ if (slave->replstate == SLAVE_STATE_ONLINE &&
+ can_receive_writes &&
+ !slave->repl_put_online_on_ack &&
clientHasPendingReplies(slave))
{
writeToClient(slave->fd,slave,0);
diff --git a/src/object.c b/src/object.c
index 10209a6c8..697429b84 100644
--- a/src/object.c
+++ b/src/object.c
@@ -467,10 +467,15 @@ robj *tryObjectEncoding(robj *o) {
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else {
- if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr);
- o->encoding = OBJ_ENCODING_INT;
- o->ptr = (void*) value;
- return o;
+ if (o->encoding == OBJ_ENCODING_RAW) {
+ sdsfree(o->ptr);
+ o->encoding = OBJ_ENCODING_INT;
+ o->ptr = (void*) value;
+ return o;
+ } else if (o->encoding == OBJ_ENCODING_EMBSTR) {
+ decrRefCount(o);
+ return createStringObjectFromLongLongForValue(value);
+ }
}
}
@@ -1435,13 +1440,15 @@ NULL
#if defined(USE_JEMALLOC)
sds info = sdsempty();
je_malloc_stats_print(inputCatSds, &info, NULL);
- addReplyBulkSds(c, info);
+ addReplyVerbatim(c,info,sdslen(info),"txt");
+ sdsfree(info);
#else
addReplyBulkCString(c,"Stats not supported for the current allocator");
#endif
} else if (!strcasecmp(c->argv[1]->ptr,"doctor") && c->argc == 2) {
sds report = getMemoryDoctorReport();
- addReplyBulkSds(c,report);
+ addReplyVerbatim(c,report,sdslen(report),"txt");
+ sdsfree(report);
} else if (!strcasecmp(c->argv[1]->ptr,"purge") && c->argc == 2) {
#if defined(USE_JEMALLOC)
char tmp[32];
diff --git a/src/rdb.c b/src/rdb.c
index 0c3a80d01..d9164b21c 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -42,31 +42,35 @@
#include <sys/stat.h>
#include <sys/param.h>
-#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
+/* This macro is called when the internal RDB structure is corrupt */
+#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
+/* This macro is called when RDB read failed (possibly a short read) */
+#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
-void rdbCheckThenExit(int linenum, char *reason, ...) {
+void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
va_list ap;
char msg[1024];
int len;
len = snprintf(msg,sizeof(msg),
- "Internal error in RDB reading function at rdb.c:%d -> ", linenum);
+ "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ",
+ (unsigned long long)server.loading_loaded_bytes, linenum);
va_start(ap,reason);
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
va_end(ap);
if (!rdbCheckMode) {
- serverLog(LL_WARNING, "%s", msg);
- if (rdbFileBeingLoaded) {
+ if (rdbFileBeingLoaded || corruption_error) {
+ serverLog(LL_WARNING, "%s", msg);
char *argv[2] = {"",rdbFileBeingLoaded};
redis_check_rdb_main(2,argv,NULL);
} else {
- serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
+ serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg);
return;
}
} else {
@@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
return len;
}
-/* This is just a wrapper for the low level function rioRead() that will
- * automatically abort if it is not possible to read the specified amount
- * of bytes. */
-void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
- if (rioRead(rdb,buf,len) == 0) {
- rdbExitReportCorruptRDB(
- "Impossible to read %llu bytes in rdbLoadRaw()",
- (unsigned long long) len);
- return; /* Not reached. */
- }
-}
-
int rdbSaveType(rio *rdb, unsigned char type) {
return rdbWriteRaw(rdb,&type,1);
}
@@ -109,10 +101,12 @@ int rdbLoadType(rio *rdb) {
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
- * opcode. */
+ * opcode. On error -1 is returned, however this could be a valid time, so
+ * to check for loading errors the caller should call rioGetReadError() after
+ * calling this function. */
time_t rdbLoadTime(rio *rdb) {
int32_t t32;
- rdbLoadRaw(rdb,&t32,4);
+ if (rioRead(rdb,&t32,4) == 0) return -1;
return (time_t)t32;
}
@@ -132,10 +126,14 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
* after upgrading to Redis version 5 they will no longer be able to load their
* own old RDB files. Because of that, we instead fix the function only for new
* RDB versions, and load older RDB versions as we used to do in the past,
- * allowing big endian systems to load their own old RDB files. */
+ * allowing big endian systems to load their own old RDB files.
+ *
+ * On I/O error the function returns LLONG_MAX, however if this is also a
+ * valid stored value, the caller should use rioGetReadError() to check for
+ * errors after calling this function. */
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
int64_t t64;
- rdbLoadRaw(rdb,&t64,8);
+ if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
if (rdbver >= 9) /* Check the top comment of this function. */
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
return (long long)t64;
@@ -262,7 +260,7 @@ int rdbEncodeInteger(long long value, unsigned char *enc) {
/* Loads an integer-encoded object with the specified encoding type "enctype".
* The returned value changes according to the flags, see
- * rdbGenerincLoadStringObject() for more info. */
+ * rdbGenericLoadStringObject() for more info. */
void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
@@ -284,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
val = (int32_t)v;
} else {
- val = 0; /* anti-warning */
rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
+ return NULL; /* Never reached. */
}
if (plain || sds) {
char buf[LONG_STR_SIZE], *p;
@@ -388,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
/* Load the compressed representation and uncompress it to target. */
if (rioRead(rdb,c,clen) == 0) goto err;
if (lzf_decompress(c,clen,val,len) == 0) {
- if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
- goto err;
+ rdbExitReportCorruptRDB("Invalid LZF compressed string");
}
zfree(c);
@@ -503,6 +500,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
return rdbLoadLzfStringObject(rdb,flags,lenptr);
default:
rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
+ return NULL; /* Never reached. */
}
}
@@ -973,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
- moduleInitIOContext(io,mt,rdb,key);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
@@ -982,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
+ moduleInitIOContext(io,mt,rdb,key);
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
- if (retval == -1) return -1;
- io.bytes += retval;
+ if (retval == -1)
+ io.error = 1;
+ else
+ io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
@@ -1103,6 +1103,45 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
+/* Save the auxiliary (not key space related) data of the module type 'mt'
+ * for the stage 'when'. The function writes, in order: the MODULE_AUX opcode,
+ * the module type id, the 'when' value prefixed by a UINT opcode, whatever
+ * mt->aux_save() emits, and finally the module EOF opcode.
+ *
+ * Returns io.bytes (the bytes accounted in the module IO context) on
+ * success, or -1 if a write fails or the IO context is flagged with an
+ * error. */
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
+    /* Save a module-specific aux value. */
+    RedisModuleIO io;
+    int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
+    if (retval == -1) return -1;
+
+    /* Initialize the IO context before the first use of io.bytes: 'io' is an
+     * automatic variable, so accumulating into it before this point would
+     * read an indeterminate value. moduleInitIOContext() is assumed to reset
+     * io.bytes, io.error and io.ctx, which are all read below. */
+    moduleInitIOContext(io,mt,rdb,NULL);
+    io.bytes += retval;
+
+    /* Write the "module" identifier as prefix, so that we'll be able
+     * to call the right module during loading. */
+    retval = rdbSaveLen(rdb,mt->id);
+    if (retval == -1) return -1;
+    io.bytes += retval;
+
+    /* write the 'when' so that we can provide it on loading. add a UINT opcode
+     * for backwards compatibility, everything after the MT needs to be prefixed
+     * by an opcode. */
+    retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT);
+    if (retval == -1) return -1;
+    io.bytes += retval;
+    retval = rdbSaveLen(rdb,when);
+    if (retval == -1) return -1;
+    io.bytes += retval;
+
+    /* Then write the module-specific representation + EOF marker. */
+    mt->aux_save(&io,when);
+    retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
+    if (retval == -1)
+        io.error = 1;
+    else
+        io.bytes += retval;
+
+    /* Release the module context if one was attached to the IO context. */
+    if (io.ctx) {
+        moduleFreeContext(io.ctx);
+        zfree(io.ctx);
+    }
+    if (io.error)
+        return -1;
+    return io.bytes;
+}
+
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@@ -1124,6 +1163,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+ if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -1185,6 +1225,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
di = NULL; /* So that we don't release it again on error. */
}
+ if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
+
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@@ -1628,6 +1670,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
default:
+ /* totally unreachable */
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
@@ -1635,6 +1678,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);
+ if (listpacks == RDB_LENERR) {
+ rdbReportReadError("Stream listpacks len loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(listpacks--) {
/* Get the master ID, the one we'll use as key of the radix tree
@@ -1642,7 +1690,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* relatively to this ID. */
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (nodekey == NULL) {
- rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
+ rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error.");
+ decrRefCount(o);
+ return NULL;
}
if (sdslen(nodekey) != sizeof(streamID)) {
rdbExitReportCorruptRDB("Stream node key entry is not the "
@@ -1652,7 +1702,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Load the listpack. */
unsigned char *lp =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
- if (lp == NULL) return NULL;
+ if (lp == NULL) {
+ rdbReportReadError("Stream listpacks loading failed.");
+ sdsfree(nodekey);
+ decrRefCount(o);
+ return NULL;
+ }
unsigned char *first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
@@ -1670,12 +1725,24 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
/* Load total number of items inside the stream. */
s->length = rdbLoadLen(rdb,NULL);
+
/* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream object metadata loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
+
/* Consumer groups loading */
- size_t cgroups_count = rdbLoadLen(rdb,NULL);
+ uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
+ if (cgroups_count == RDB_LENERR) {
+ rdbReportReadError("Stream cgroup count loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(cgroups_count--) {
/* Get the consumer group name and ID. We can then create the
* consumer group ASAP and populate its structure as
@@ -1683,11 +1750,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
streamID cg_id;
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cgname == NULL) {
- rdbExitReportCorruptRDB(
+ rdbReportReadError(
"Error reading the consumer group name from Stream");
+ decrRefCount(o);
+ return NULL;
}
+
cg_id.ms = rdbLoadLen(rdb,NULL);
cg_id.seq = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream cgroup ID loading failed.");
+ sdsfree(cgname);
+ decrRefCount(o);
+ return NULL;
+ }
+
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
if (cgroup == NULL)
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
@@ -1699,13 +1776,28 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* owner, since consumers for this group and their messages will
* be read as a next step. So for now leave them not resolved
* and later populate it. */
- size_t pel_size = rdbLoadLen(rdb,NULL);
+ uint64_t pel_size = rdbLoadLen(rdb,NULL);
+ if (pel_size == RDB_LENERR) {
+ rdbReportReadError("Stream PEL size loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
- rdbLoadRaw(rdb,rawid,sizeof(rawid));
+ if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
+ rdbReportReadError("Stream PEL ID loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
streamNACK *nack = streamCreateNACK(NULL);
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
nack->delivery_count = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream PEL NACK loading failed.");
+ decrRefCount(o);
+ streamFreeNACK(nack);
+ return NULL;
+ }
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
"loading stream consumer group");
@@ -1713,24 +1805,47 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Now that we loaded our global PEL, we need to load the
* consumers and their local PELs. */
- size_t consumers_num = rdbLoadLen(rdb,NULL);
+ uint64_t consumers_num = rdbLoadLen(rdb,NULL);
+ if (consumers_num == RDB_LENERR) {
+ rdbReportReadError("Stream consumers num loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(consumers_num--) {
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cname == NULL) {
- rdbExitReportCorruptRDB(
- "Error reading the consumer name from Stream group");
+ rdbReportReadError(
+ "Error reading the consumer name from Stream group.");
+ decrRefCount(o);
+ return NULL;
}
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Stream short read reading seen time.");
+ decrRefCount(o);
+ return NULL;
+ }
/* Load the PEL about entries owned by this specific
* consumer. */
pel_size = rdbLoadLen(rdb,NULL);
+ if (pel_size == RDB_LENERR) {
+ rdbReportReadError(
+ "Stream consumer PEL num loading failed.");
+ decrRefCount(o);
+ return NULL;
+ }
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
- rdbLoadRaw(rdb,rawid,sizeof(rawid));
+ if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
+ rdbReportReadError(
+ "Stream short read reading PEL streamID.");
+ decrRefCount(o);
+ return NULL;
+ }
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
if (nack == raxNotFound)
rdbExitReportCorruptRDB("Consumer entry not found in "
@@ -1749,6 +1864,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) return NULL;
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
@@ -1776,6 +1892,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Module v2 serialization has an EOF mark at the end. */
if (io.ver == 2) {
uint64_t eof = rdbLoadLen(rdb,NULL);
+ if (eof == RDB_LENERR) {
+ o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
+ decrRefCount(o);
+ return NULL;
+ }
if (eof != RDB_MODULE_OPCODE_EOF) {
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
exit(1);
@@ -1789,7 +1910,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
o = createModuleObject(mt,ptr);
} else {
- rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
+ rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
+ return NULL;
}
return o;
}
@@ -1888,11 +2010,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* load the actual type, and continue. */
expiretime = rdbLoadTime(rdb);
expiretime *= 1000;
+ if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
+ if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */
@@ -1993,15 +2117,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(auxval);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_MODULE_AUX) {
- /* This is just for compatibility with the future: we have plans
- * to add the ability for modules to store anything in the RDB
- * file, like data that is not related to the Redis key space.
- * Such data will potentially be stored both before and after the
- * RDB keys-values section. For this reason since RDB version 9,
- * we have the ability to read a MODULE_AUX opcode followed by an
- * identifier of the module, and a serialized value in "MODULE V2"
- * format. */
+ /* Load module data that is not related to the Redis key space.
+ * Such data can potentially be stored both before and after the
+ * RDB keys-values section. */
uint64_t moduleid = rdbLoadLen(rdb,NULL);
+ int when_opcode = rdbLoadLen(rdb,NULL);
+ int when = rdbLoadLen(rdb,NULL);
+ if (rioGetReadError(rdb)) goto eoferr;
+ if (when_opcode != RDB_MODULE_OPCODE_UINT)
+ rdbReportReadError("bad when_opcode");
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
moduleTypeNameByID(name,moduleid);
@@ -2011,14 +2135,37 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
exit(1);
} else if (!rdbCheckMode && mt != NULL) {
- /* This version of Redis actually does not know what to do
- * with modules AUX data... */
- serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
- exit(1);
+ if (!mt->aux_load) {
+ /* Module doesn't support AUX. */
+ serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
+ exit(1);
+ }
+
+ RedisModuleIO io;
+ moduleInitIOContext(io,mt,rdb,NULL);
+ io.ver = 2;
+ /* Call the aux_load method of the module providing the 10 bit
+ * encoding version in the lower 10 bits of the module ID. */
+ if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
+ moduleTypeNameByID(name,moduleid);
+ serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
+ exit(1);
+ }
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
+ uint64_t eof = rdbLoadLen(rdb,NULL);
+ if (eof != RDB_MODULE_OPCODE_EOF) {
+ serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
+ exit(1);
+ }
+ continue;
} else {
/* RDB check mode. */
robj *aux = rdbLoadCheckModuleValue(rdb,name);
decrRefCount(aux);
+ continue; /* Read next opcode. */
}
}
@@ -2072,10 +2219,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
}
return C_OK;
-eoferr: /* unexpected end of file is handled here with a fatal exit */
- serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
- rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
- return C_ERR; /* Just to avoid warning */
+ /* Unexpected end of file is handled here calling rdbReportReadError():
+ * this will in turn either abort Redis in most cases, or if we are loading
+ * the RDB file from a socket during initial SYNC (diskless replica mode),
+ * we'll report the error to the caller, so that we can retry. */
+eoferr:
+ serverLog(LL_WARNING,
+ "Short read or OOM loading DB. Unrecoverable error, aborting now.");
+ rdbReportReadError("Unexpected EOF reading RDB file");
+ return C_ERR;
}
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
diff --git a/src/rdb.h b/src/rdb.h
index 0acddf9ab..40a50f7ba 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
robj *rdbLoadStringObject(rio *rdb);
ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 1d16fa4ee..2df41580b 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -104,6 +104,7 @@ static struct config {
int is_fetching_slots;
int is_updating_slots;
int slots_last_update;
+ int enable_tracking;
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
@@ -255,7 +256,7 @@ static redisConfig *getRedisConfig(const char *ip, int port,
goto fail;
}
- if(config.auth){
+ if(config.auth) {
void *authReply = NULL;
redisAppendCommand(c, "AUTH %s", config.auth);
if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
@@ -633,6 +634,14 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->prefix_pending++;
}
+ if (config.enable_tracking) {
+ char *buf = NULL;
+ int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
+ c->obuf = sdscatlen(c->obuf, buf, len);
+ free(buf);
+ c->prefix_pending++;
+ }
+
/* If a DB number different than zero is selected, prefix our request
* buffer with the SELECT command, that will be discarded the first
* time the replies are received, so if the client is reused the
@@ -1350,6 +1359,8 @@ int parseOptions(int argc, const char **argv) {
} else if (config.num_threads < 0) config.num_threads = 0;
} else if (!strcmp(argv[i],"--cluster")) {
config.cluster_mode = 1;
+ } else if (!strcmp(argv[i],"--enable-tracking")) {
+ config.enable_tracking = 1;
} else if (!strcmp(argv[i],"--help")) {
exit_status = 0;
goto usage;
@@ -1380,6 +1391,7 @@ usage:
" --dbnum <db> SELECT the specified db number (default 0)\n"
" --threads <num> Enable multi-thread mode.\n"
" --cluster Enable cluster mode.\n"
+" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD\n"
" Using this option the benchmark will expand the string __rand_int__\n"
@@ -1504,6 +1516,7 @@ int main(int argc, const char **argv) {
config.is_fetching_slots = 0;
config.is_updating_slots = 0;
config.slots_last_update = 0;
+ config.enable_tracking = 0;
i = parseOptions(argc,argv);
argc -= i;
diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c
index e2d71b5a5..5e7415046 100644
--- a/src/redis-check-rdb.c
+++ b/src/redis-check-rdb.c
@@ -216,14 +216,16 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
/* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to
* load the actual type, and continue. */
- if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
+ expiretime = rdbLoadTime(&rdb);
expiretime *= 1000;
+ if (rioGetReadError(&rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
- if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr;
+ expiretime = rdbLoadMillisecondTime(&rdb, rdbver);
+ if (rioGetReadError(&rdb)) goto eoferr;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */
diff --git a/src/redis-cli.c b/src/redis-cli.c
index e363a2795..c183155cb 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -218,6 +218,7 @@ static struct config {
int hotkeys;
int stdinarg; /* get last arg from stdin. (-x option) */
char *auth;
+ char *user;
int output; /* output mode, see OUTPUT_* defines */
sds mb_delim;
char prompt[128];
@@ -230,6 +231,7 @@ static struct config {
int verbose;
clusterManagerCommand cluster_manager_command;
int no_auth_warning;
+ int resp3;
} config;
/* User preferences. */
@@ -728,8 +730,13 @@ static int cliAuth(void) {
redisReply *reply;
if (config.auth == NULL) return REDIS_OK;
- reply = redisCommand(context,"AUTH %s",config.auth);
+ if (config.user == NULL)
+ reply = redisCommand(context,"AUTH %s",config.auth);
+ else
+ reply = redisCommand(context,"AUTH %s %s",config.user,config.auth);
if (reply != NULL) {
+ if (reply->type == REDIS_REPLY_ERROR)
+ fprintf(stderr,"Warning: AUTH failed\n");
freeReplyObject(reply);
return REDIS_OK;
}
@@ -751,6 +758,21 @@ static int cliSelect(void) {
return REDIS_ERR;
}
+/* Select RESP3 mode if redis-cli was started with the -3 option. */
+static int cliSwitchProto(void) {
+ redisReply *reply;
+ if (config.resp3 == 0) return REDIS_OK;
+
+ reply = redisCommand(context,"HELLO 3");
+ if (reply != NULL) {
+ int result = REDIS_OK;
+ if (reply->type == REDIS_REPLY_ERROR) result = REDIS_ERR;
+ freeReplyObject(reply);
+ return result;
+ }
+ return REDIS_ERR;
+}
+
/* Connect to the server. It is possible to pass certain flags to the function:
* CC_FORCE: The connection is performed even if there is already
* a connected socket.
@@ -788,11 +810,13 @@ static int cliConnect(int flags) {
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
- /* Do AUTH and select the right DB. */
+ /* Do AUTH, select the right DB, switch to RESP3 if needed. */
if (cliAuth() != REDIS_OK)
return REDIS_ERR;
if (cliSelect() != REDIS_OK)
return REDIS_ERR;
+ if (cliSwitchProto() != REDIS_OK)
+ return REDIS_ERR;
}
return REDIS_OK;
}
@@ -819,10 +843,17 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
out = sdscatprintf(out,"(double) %s\n",r->str);
break;
case REDIS_REPLY_STRING:
+ case REDIS_REPLY_VERB:
/* If you are producing output for the standard output we want
- * a more interesting output with quoted characters and so forth */
- out = sdscatrepr(out,r->str,r->len);
- out = sdscat(out,"\n");
+ * a more interesting output with quoted characters and so forth,
+ * unless it's a verbatim string type. */
+ if (r->type == REDIS_REPLY_STRING) {
+ out = sdscatrepr(out,r->str,r->len);
+ out = sdscat(out,"\n");
+ } else {
+ out = sdscatlen(out,r->str,r->len);
+ out = sdscat(out,"\n");
+ }
break;
case REDIS_REPLY_NIL:
out = sdscat(out,"(nil)\n");
@@ -961,6 +992,7 @@ static sds cliFormatReplyRaw(redisReply *r) {
break;
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
+ case REDIS_REPLY_VERB:
if (r->type == REDIS_REPLY_STATUS && config.eval_ldb) {
/* The Lua debugger replies with arrays of simple (status)
* strings. We colorize the output for more fun if this
@@ -980,9 +1012,15 @@ static sds cliFormatReplyRaw(redisReply *r) {
out = sdscatlen(out,r->str,r->len);
}
break;
+ case REDIS_REPLY_BOOL:
+ out = sdscat(out,r->integer ? "(true)" : "(false)");
+ break;
case REDIS_REPLY_INTEGER:
out = sdscatprintf(out,"%lld",r->integer);
break;
+ case REDIS_REPLY_DOUBLE:
+ out = sdscatprintf(out,"%s",r->str);
+ break;
case REDIS_REPLY_ARRAY:
for (i = 0; i < r->elements; i++) {
if (i > 0) out = sdscat(out,config.mb_delim);
@@ -991,6 +1029,19 @@ static sds cliFormatReplyRaw(redisReply *r) {
sdsfree(tmp);
}
break;
+ case REDIS_REPLY_MAP:
+ for (i = 0; i < r->elements; i += 2) {
+ if (i > 0) out = sdscat(out,config.mb_delim);
+ tmp = cliFormatReplyRaw(r->element[i]);
+ out = sdscatlen(out,tmp,sdslen(tmp));
+ sdsfree(tmp);
+
+ out = sdscatlen(out," ",1);
+ tmp = cliFormatReplyRaw(r->element[i+1]);
+ out = sdscatlen(out,tmp,sdslen(tmp));
+ sdsfree(tmp);
+ }
+ break;
default:
fprintf(stderr,"Unknown reply type: %d\n", r->type);
exit(1);
@@ -1013,13 +1064,21 @@ static sds cliFormatReplyCSV(redisReply *r) {
case REDIS_REPLY_INTEGER:
out = sdscatprintf(out,"%lld",r->integer);
break;
+ case REDIS_REPLY_DOUBLE:
+ out = sdscatprintf(out,"%s",r->str);
+ break;
case REDIS_REPLY_STRING:
+ case REDIS_REPLY_VERB:
out = sdscatrepr(out,r->str,r->len);
break;
case REDIS_REPLY_NIL:
- out = sdscat(out,"NIL");
+ out = sdscat(out,"NULL");
+ break;
+ case REDIS_REPLY_BOOL:
+ out = sdscat(out,r->integer ? "true" : "false");
break;
case REDIS_REPLY_ARRAY:
+ case REDIS_REPLY_MAP: /* CSV has no map type, just output flat list. */
for (i = 0; i < r->elements; i++) {
sds tmp = cliFormatReplyCSV(r->element[i]);
out = sdscatlen(out,tmp,sdslen(tmp));
@@ -1213,7 +1272,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"select") && argc == 2 && config.last_cmd_type != REDIS_REPLY_ERROR) {
config.dbnum = atoi(argv[1]);
cliRefreshPrompt();
- } else if (!strcasecmp(command,"auth") && argc == 2) {
+ } else if (!strcasecmp(command,"auth") && (argc == 2 || argc == 3))
+ {
cliSelect();
}
}
@@ -1296,8 +1356,12 @@ static int parseOptions(int argc, char **argv) {
config.dbnum = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--no-auth-warning")) {
config.no_auth_warning = 1;
- } else if (!strcmp(argv[i],"-a") && !lastarg) {
+ } else if ((!strcmp(argv[i],"-a") || !strcmp(argv[i],"--pass"))
+ && !lastarg)
+ {
config.auth = argv[++i];
+ } else if (!strcmp(argv[i],"--user") && !lastarg) {
+ config.user = argv[++i];
} else if (!strcmp(argv[i],"-u") && !lastarg) {
parseRedisUri(argv[++i]);
} else if (!strcmp(argv[i],"--raw")) {
@@ -1439,6 +1503,8 @@ static int parseOptions(int argc, char **argv) {
printf("redis-cli %s\n", version);
sdsfree(version);
exit(0);
+ } else if (!strcmp(argv[i],"-3")) {
+ config.resp3 = 1;
} else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') {
if (config.cluster_manager_command.argc == 0) {
int j = i + 1;
@@ -1514,11 +1580,14 @@ static void usage(void) {
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
" variable to pass this password more safely\n"
" (if both are used, this argument takes predecence).\n"
+" -user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
+" -pass <password> Alias of -a for consistency with the new --user option.\n"
" -u <uri> Server URI.\n"
" -r <repeat> Execute specified command N times.\n"
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
" It is possible to specify sub-second times like -i 0.1.\n"
" -n <db> Database number.\n"
+" -3 Start session in RESP3 protocol mode.\n"
" -x Read last argument from STDIN.\n"
" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n).\n"
" -c Enable cluster mode (follow -ASK and -MOVED redirections).\n"
@@ -1533,7 +1602,9 @@ static void usage(void) {
" --csv is specified, or if you redirect the output to a non\n"
" TTY, it samples the latency for 1 second (you can use\n"
" -i to change the interval), then produces a single output\n"
-" and exits.\n"
+" and exits.\n",version);
+
+ fprintf(stderr,
" --latency-history Like --latency but tracking latency changes over time.\n"
" Default time interval is 15 sec. Change it using -i.\n"
" --latency-dist Shows latency as a spectrum, requires xterm 256 colors.\n"
@@ -1568,7 +1639,7 @@ static void usage(void) {
" --help Output this help and exit.\n"
" --version Output version and exit.\n"
"\n",
- version, REDIS_CLI_DEFAULT_PIPE_TIMEOUT);
+ REDIS_CLI_DEFAULT_PIPE_TIMEOUT);
/* Using another fprintf call to avoid -Woverlength-strings compile warning */
fprintf(stderr,
"Cluster Manager Commands:\n"
@@ -2350,7 +2421,12 @@ static int clusterManagerNodeConnect(clusterManagerNode *node) {
* errors. */
anetKeepAlive(NULL, node->context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
if (config.auth) {
- redisReply *reply = redisCommand(node->context,"AUTH %s",config.auth);
+ redisReply *reply;
+ if (config.user == NULL)
+ reply = redisCommand(node->context,"AUTH %s", config.auth);
+ else
+ reply = redisCommand(node->context,"AUTH %s %s",
+ config.user,config.auth);
int ok = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply != NULL) freeReplyObject(reply);
if (!ok) return 0;
@@ -6724,6 +6800,7 @@ static void pipeMode(void) {
/* Handle the readable state: we can read replies from the server. */
if (mask & AE_READABLE) {
ssize_t nread;
+ int read_error = 0;
/* Read from socket and feed the hiredis reader. */
do {
@@ -6731,7 +6808,8 @@ static void pipeMode(void) {
if (nread == -1 && errno != EAGAIN && errno != EINTR) {
fprintf(stderr, "Error reading from the server: %s\n",
strerror(errno));
- exit(1);
+ read_error = 1;
+ break;
}
if (nread > 0) {
redisReaderFeed(reader,ibuf,nread);
@@ -6764,6 +6842,11 @@ static void pipeMode(void) {
freeReplyObject(reply);
}
} while(reply);
+
+ /* Abort on read errors. We abort here because it is important
+ * to consume replies even after a read error: this way we can
+ * show a potential problem to the user. */
+ if (read_error) exit(1);
}
/* Handle the writable state: we can send protocol to the server. */
@@ -7671,6 +7754,7 @@ int main(int argc, char **argv) {
config.hotkeys = 0;
config.stdinarg = 0;
config.auth = NULL;
+ config.user = NULL;
config.eval = NULL;
config.eval_ldb = 0;
config.eval_ldb_end = 0;
diff --git a/src/redismodule.h b/src/redismodule.h
index 60681da7c..6a3a164b5 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -129,6 +129,10 @@
#define REDISMODULE_NOT_USED(V) ((void) V)
+/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */
+#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
+#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
/* This type represents a timer handle, and is returned when a timer is
* registered and used in order to invalidate a timer. It's just a 64 bit
* number, because this is how each timer is represented inside the radix tree
@@ -140,6 +144,9 @@ typedef uint64_t RedisModuleTimerID;
/* Do filter RedisModule_Call() commands initiated by module itself. */
#define REDISMODULE_CMDFILTER_NOSELF (1<<0)
+/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */
+#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0)
+
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@@ -166,6 +173,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
+typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
+typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
@@ -175,7 +184,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
-#define REDISMODULE_TYPE_METHOD_VERSION 1
+#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods {
uint64_t version;
RedisModuleTypeLoadFunc rdb_load;
@@ -184,6 +193,9 @@ typedef struct RedisModuleTypeMethods {
RedisModuleTypeMemUsageFunc mem_usage;
RedisModuleTypeDigestFunc digest;
RedisModuleTypeFreeFunc free;
+ RedisModuleTypeAuxLoadFunc aux_load;
+ RedisModuleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
} RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \
@@ -272,6 +284,8 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx
int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
+int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io);
+void REDISMODULE_API_FUNC(RedisModule_SetModuleOptions)(RedisModuleCtx *ctx, int options);
void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value);
uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value);
@@ -287,6 +301,7 @@ void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value)
float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
+void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line);
int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len);
void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b);
@@ -448,6 +463,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ModuleTypeSetValue);
REDISMODULE_GET_API(ModuleTypeGetType);
REDISMODULE_GET_API(ModuleTypeGetValue);
+ REDISMODULE_GET_API(IsIOError);
+ REDISMODULE_GET_API(SetModuleOptions);
REDISMODULE_GET_API(SaveUnsigned);
REDISMODULE_GET_API(LoadUnsigned);
REDISMODULE_GET_API(SaveSigned);
@@ -463,6 +480,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(EmitAOF);
REDISMODULE_GET_API(Log);
REDISMODULE_GET_API(LogIOError);
+ REDISMODULE_GET_API(_Assert);
REDISMODULE_GET_API(StringAppendBuffer);
REDISMODULE_GET_API(RetainString);
REDISMODULE_GET_API(StringCompare);
@@ -542,6 +560,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
return REDISMODULE_OK;
}
+#define RedisModule_Assert(_e) ((_e)?(void)0 : (RedisModule__Assert(#_e,__FILE__,__LINE__),exit(1)))
+
#else
/* Things only defined for the modules core, not exported to modules
diff --git a/src/replication.c b/src/replication.c
index 7adf5ba38..8039e06ae 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -823,7 +823,9 @@ void replconfCommand(client *c) {
c->repl_ack_time = server.unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
- * confirms slave is online and ready to get more data). */
+ * confirms slave is online and ready to get more data). This
+ * allows for simpler and less CPU intensive EOF detection
+ * when streaming RDB files. */
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c);
/* Note: this command does not reply anything! */
@@ -842,18 +844,20 @@ void replconfCommand(client *c) {
addReply(c,shared.ok);
}
-/* This function puts a slave in the online state, and should be called just
- * after a slave received the RDB file for the initial synchronization, and
+/* This function puts a replica in the online state, and should be called just
+ * after a replica received the RDB file for the initial synchronization, and
* we are finally ready to send the incremental stream of commands.
*
* It does a few things:
*
- * 1) Put the slave in ONLINE state (useless when the function is called
- * because state is already ONLINE but repl_put_online_on_ack is true).
+ * 1) Put the slave in ONLINE state. Note that the function may also be called
+ * for replicas that are already in ONLINE state, but having the flag
+ * repl_put_online_on_ack set to true: we still have to install the write
+ * handler in that case. This function will take care of that.
* 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
- * sending it to the slave.
- * 3) Update the count of good slaves. */
+ * sending it to the replica.
+ * 3) Update the count of "good replicas". */
void putSlaveOnline(client *slave) {
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0;
@@ -965,11 +969,31 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
serverLog(LL_NOTICE,
"Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));
- /* Note: we wait for a REPLCONF ACK message from slave in
+ /* Note: we wait for a REPLCONF ACK message from the replica in
* order to really put it online (install the write handler
* so that the accumulated data can be transferred). However
* we change the replication state ASAP, since our slave
- * is technically online now. */
+ * is technically online now.
+ *
+ * So things work like that:
+ *
+ * 1. We finish transferring the RDB file via socket.
+ * 2. The replica is put ONLINE but the write handler
+ * is not installed.
+ * 3. The replica however goes really online, and pings us
+ * back via REPLCONF ACK commands.
+ * 4. Now we finally install the write handler, and send
+ * the buffers accumulated so far to the replica.
+ *
+ * But why do we do that? Because the replica, when we stream
+ * the RDB directly via the socket, must detect the RDB
+ * EOF (end of file), that is a special random string at the
+ * end of the RDB (for streamed RDBs we don't know the length
+ * in advance). Detecting such final EOF string is much
+ * simpler and less CPU intensive if no more data is sent
+ * after such final EOF. So we don't want to glue the end of
+ * the RDB transfer with the start of the other replication
+ * data. */
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
@@ -1115,8 +1139,15 @@ void restartAOFAfterSYNC() {
static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
- return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
+ int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
+ /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
+ if (enabled && !moduleAllDatatypesHandleErrors()) {
+ serverLog(LL_WARNING,
+ "Skipping diskless-load because there are modules that don't handle read errors.");
+ enabled = 0;
+ }
+ return enabled;
}
/* Helper function for readSyncBulkPayload() to make backups of the current
diff --git a/src/rio.c b/src/rio.c
index 5359bc3d6..bdbc5d0e9 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -92,6 +92,7 @@ static const rio rioBufferIO = {
rioBufferFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
@@ -145,6 +146,7 @@ static const rio rioFileIO = {
rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
@@ -239,6 +241,7 @@ static const rio rioFdIO = {
rioFdFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
@@ -374,6 +377,7 @@ static const rio rioFdsetIO = {
rioFdsetFlush,
NULL, /* update_checksum */
0, /* current checksum */
+ 0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
diff --git a/src/rio.h b/src/rio.h
index beea06888..eb7a05748 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
- * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,6 +36,9 @@
#include <stdint.h>
#include "sds.h"
+#define RIO_FLAG_READ_ERROR (1<<0)
+#define RIO_FLAG_WRITE_ERROR (1<<1)
+
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
@@ -51,8 +54,8 @@ struct _rio {
* computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
- /* The current checksum */
- uint64_t cksum;
+ /* The current checksum and flags (see RIO_FLAG_*) */
+ uint64_t cksum, flags;
/* number of bytes read or written */
size_t processed_bytes;
@@ -99,11 +102,14 @@ typedef struct _rio rio;
* if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
+ if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
while (len) {
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
- if (r->write(r,buf,bytes_to_write) == 0)
+ if (r->write(r,buf,bytes_to_write) == 0) {
+ r->flags |= RIO_FLAG_WRITE_ERROR;
return 0;
+ }
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
r->processed_bytes += bytes_to_write;
@@ -112,10 +118,13 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}
static inline size_t rioRead(rio *r, void *buf, size_t len) {
+ if (r->flags & RIO_FLAG_READ_ERROR) return 0;
while (len) {
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
- if (r->read(r,buf,bytes_to_read) == 0)
+ if (r->read(r,buf,bytes_to_read) == 0) {
+ r->flags |= RIO_FLAG_READ_ERROR;
return 0;
+ }
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
@@ -132,6 +141,22 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}
+/* This function allows to know if there was a read error in any past
+ * operation, since the rio stream was created or since the last call
+ * to rioClearErrors(). */
+static inline int rioGetReadError(rio *r) {
+ return (r->flags & RIO_FLAG_READ_ERROR) != 0;
+}
+
+/* Like rioGetReadError() but for write errors. */
+static inline int rioGetWriteError(rio *r) {
+ return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
+}
+
+static inline void rioClearErrors(rio *r) {
+ r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
+}
+
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithFd(rio *r, int fd, size_t read_limit);
diff --git a/src/scripting.c b/src/scripting.c
index deb406457..3129e4f47 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -42,7 +42,10 @@ char *redisProtocolToLuaType_Int(lua_State *lua, char *reply);
char *redisProtocolToLuaType_Bulk(lua_State *lua, char *reply);
char *redisProtocolToLuaType_Status(lua_State *lua, char *reply);
char *redisProtocolToLuaType_Error(lua_State *lua, char *reply);
-char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype);
+char *redisProtocolToLuaType_Aggregate(lua_State *lua, char *reply, int atype);
+char *redisProtocolToLuaType_Null(lua_State *lua, char *reply);
+char *redisProtocolToLuaType_Bool(lua_State *lua, char *reply, int tf);
+char *redisProtocolToLuaType_Double(lua_State *lua, char *reply);
int redis_math_random (lua_State *L);
int redis_math_randomseed (lua_State *L);
void ldbInit(void);
@@ -132,9 +135,12 @@ char *redisProtocolToLuaType(lua_State *lua, char* reply) {
case '$': p = redisProtocolToLuaType_Bulk(lua,reply); break;
case '+': p = redisProtocolToLuaType_Status(lua,reply); break;
case '-': p = redisProtocolToLuaType_Error(lua,reply); break;
- case '*': p = redisProtocolToLuaType_MultiBulk(lua,reply,*p); break;
- case '%': p = redisProtocolToLuaType_MultiBulk(lua,reply,*p); break;
- case '~': p = redisProtocolToLuaType_MultiBulk(lua,reply,*p); break;
+ case '*': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break;
+ case '%': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break;
+ case '~': p = redisProtocolToLuaType_Aggregate(lua,reply,*p); break;
+ case '_': p = redisProtocolToLuaType_Null(lua,reply); break;
+ case '#': p = redisProtocolToLuaType_Bool(lua,reply,p[1]); break;
+ case ',': p = redisProtocolToLuaType_Double(lua,reply); break;
}
return p;
}
@@ -182,13 +188,13 @@ char *redisProtocolToLuaType_Error(lua_State *lua, char *reply) {
return p+2;
}
-char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) {
+char *redisProtocolToLuaType_Aggregate(lua_State *lua, char *reply, int atype) {
char *p = strchr(reply+1,'\r');
long long mbulklen;
int j = 0;
string2ll(reply+1,p-reply-1,&mbulklen);
- if (server.lua_caller->resp == 2 || atype == '*') {
+ if (server.lua_client->resp == 2 || atype == '*') {
p += 2;
if (mbulklen == -1) {
lua_pushboolean(lua,0);
@@ -200,11 +206,15 @@ char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) {
p = redisProtocolToLuaType(lua,p);
lua_settable(lua,-3);
}
- } else if (server.lua_caller->resp == 3) {
+ } else if (server.lua_client->resp == 3) {
/* Here we handle only Set and Map replies in RESP3 mode, since arrays
- * follow the above RESP2 code path. */
+ * follow the above RESP2 code path. Note that those are represented
+ * as a table with the "map" or "set" field populated with the actual
+ * table representing the set or the map type. */
p += 2;
lua_newtable(lua);
+ lua_pushstring(lua,atype == '%' ? "map" : "set");
+ lua_newtable(lua);
for (j = 0; j < mbulklen; j++) {
p = redisProtocolToLuaType(lua,p);
if (atype == '%') {
@@ -214,10 +224,44 @@ char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) {
}
lua_settable(lua,-3);
}
+ lua_settable(lua,-3);
}
return p;
}
+char *redisProtocolToLuaType_Null(lua_State *lua, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ lua_pushnil(lua);
+ return p+2;
+}
+
+char *redisProtocolToLuaType_Bool(lua_State *lua, char *reply, int tf) {
+ char *p = strchr(reply+1,'\r');
+ lua_pushboolean(lua,tf == 't');
+ return p+2;
+}
+
+char *redisProtocolToLuaType_Double(lua_State *lua, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ char buf[MAX_LONG_DOUBLE_CHARS+1];
+ size_t len = p-reply-1;
+ double d;
+
+ if (len <= MAX_LONG_DOUBLE_CHARS) {
+ memcpy(buf,reply+1,len);
+ buf[len] = '\0';
+ d = strtod(buf,NULL); /* We expect a valid representation. */
+ } else {
+ d = 0;
+ }
+
+ lua_newtable(lua);
+ lua_pushstring(lua,"double");
+ lua_pushnumber(lua,d);
+ lua_settable(lua,-3);
+ return p+2;
+}
+
/* This function is used in order to push an error on the Lua stack in the
* format used by redis.pcall to return errors, which is a lua table
* with a single "err" field set to the error string. Note that this
@@ -292,6 +336,8 @@ void luaSortArray(lua_State *lua) {
* Lua reply to Redis reply conversion functions.
* ------------------------------------------------------------------------- */
+/* Reply to client 'c' converting the top element in the Lua stack to a
+ * Redis reply. As a side effect the element is consumed from the stack. */
void luaReplyToRedisReply(client *c, lua_State *lua) {
int t = lua_type(lua,-1);
@@ -300,7 +346,11 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
addReplyBulkCBuffer(c,(char*)lua_tostring(lua,-1),lua_strlen(lua,-1));
break;
case LUA_TBOOLEAN:
- addReply(c,lua_toboolean(lua,-1) ? shared.cone : shared.null[c->resp]);
+ if (server.lua_client->resp == 2)
+ addReply(c,lua_toboolean(lua,-1) ? shared.cone :
+ shared.null[c->resp]);
+ else
+ addReplyBool(c,lua_toboolean(lua,-1));
break;
case LUA_TNUMBER:
addReplyLongLong(c,(long long)lua_tonumber(lua,-1));
@@ -310,6 +360,8 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
* Error are returned as a single element table with 'err' field.
* Status replies are returned as single element table with 'ok'
* field. */
+
+ /* Handle error reply. */
lua_pushstring(lua,"err");
lua_gettable(lua,-2);
t = lua_type(lua,-1);
@@ -321,8 +373,9 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
lua_pop(lua,2);
return;
}
+ lua_pop(lua,1); /* Discard field name pushed before. */
- lua_pop(lua,1);
+ /* Handle status reply. */
lua_pushstring(lua,"ok");
lua_gettable(lua,-2);
t = lua_type(lua,-1);
@@ -331,25 +384,81 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
sdsmapchars(ok,"\r\n"," ",2);
addReplySds(c,sdscatprintf(sdsempty(),"+%s\r\n",ok));
sdsfree(ok);
- lua_pop(lua,1);
- } else {
+ lua_pop(lua,2);
+ return;
+ }
+ lua_pop(lua,1); /* Discard field name pushed before. */
+
+ /* Handle double reply. */
+ lua_pushstring(lua,"double");
+ lua_gettable(lua,-2);
+ t = lua_type(lua,-1);
+ if (t == LUA_TNUMBER) {
+ addReplyDouble(c,lua_tonumber(lua,-1));
+ lua_pop(lua,2);
+ return;
+ }
+ lua_pop(lua,1); /* Discard field name pushed before. */
+
+ /* Handle map reply. */
+ lua_pushstring(lua,"map");
+ lua_gettable(lua,-2);
+ t = lua_type(lua,-1);
+ if (t == LUA_TTABLE) {
+ int maplen = 0;
void *replylen = addReplyDeferredLen(c);
- int j = 1, mbulklen = 0;
-
- lua_pop(lua,1); /* Discard the 'ok' field value we popped */
- while(1) {
- lua_pushnumber(lua,j++);
- lua_gettable(lua,-2);
- t = lua_type(lua,-1);
- if (t == LUA_TNIL) {
- lua_pop(lua,1);
- break;
- }
- luaReplyToRedisReply(c, lua);
- mbulklen++;
+ lua_pushnil(lua); /* Use nil to start iteration. */
+ while (lua_next(lua,-2)) {
+ /* Stack now: table, key, value */
+ luaReplyToRedisReply(c, lua); /* Return value. */
+ lua_pushvalue(lua,-1); /* Dup key before consuming. */
+ luaReplyToRedisReply(c, lua); /* Return key. */
+ /* Stack now: table, key. */
+ maplen++;
+ }
+ setDeferredMapLen(c,replylen,maplen);
+ lua_pop(lua,2);
+ return;
+ }
+ lua_pop(lua,1); /* Discard field name pushed before. */
+
+ /* Handle set reply. */
+ lua_pushstring(lua,"set");
+ lua_gettable(lua,-2);
+ t = lua_type(lua,-1);
+ if (t == LUA_TTABLE) {
+ int setlen = 0;
+ void *replylen = addReplyDeferredLen(c);
+ lua_pushnil(lua); /* Use nil to start iteration. */
+ while (lua_next(lua,-2)) {
+ /* Stack now: table, key, true */
+ lua_pop(lua,1); /* Discard the boolean value. */
+ lua_pushvalue(lua,-1); /* Dup key before consuming. */
+ luaReplyToRedisReply(c, lua); /* Return key. */
+ /* Stack now: table, key. */
+ setlen++;
}
- setDeferredArrayLen(c,replylen,mbulklen);
+ setDeferredSetLen(c,replylen,setlen);
+ lua_pop(lua,2);
+ return;
}
+ lua_pop(lua,1); /* Discard field name pushed before. */
+
+ /* Handle the array reply. */
+ void *replylen = addReplyDeferredLen(c);
+ int j = 1, mbulklen = 0;
+ while(1) {
+ lua_pushnumber(lua,j++);
+ lua_gettable(lua,-2);
+ t = lua_type(lua,-1);
+ if (t == LUA_TNIL) {
+ lua_pop(lua,1);
+ break;
+ }
+ luaReplyToRedisReply(c, lua);
+ mbulklen++;
+ }
+ setDeferredArrayLen(c,replylen,mbulklen);
break;
default:
addReplyNull(c);
@@ -859,6 +968,25 @@ int luaLogCommand(lua_State *lua) {
return 0;
}
+/* redis.setresp() */
+int luaSetResp(lua_State *lua) {
+ int argc = lua_gettop(lua);
+
+ if (argc != 1) {
+ lua_pushstring(lua, "redis.setresp() requires one argument.");
+ return lua_error(lua);
+ }
+
+ int resp = lua_tonumber(lua,-argc);
+ if (resp != 2 && resp != 3) {
+ lua_pushstring(lua, "RESP version must be 2 or 3.");
+ return lua_error(lua);
+ }
+
+ server.lua_client->resp = resp;
+ return 0;
+}
+
/* ---------------------------------------------------------------------------
* Lua engine initialization and reset.
* ------------------------------------------------------------------------- */
@@ -986,6 +1114,11 @@ void scriptingInit(int setup) {
lua_pushcfunction(lua,luaLogCommand);
lua_settable(lua,-3);
+ /* redis.setresp */
+ lua_pushstring(lua,"setresp");
+ lua_pushcfunction(lua,luaSetResp);
+ lua_settable(lua,-3);
+
lua_pushstring(lua,"LOG_DEBUG");
lua_pushnumber(lua,LL_DEBUG);
lua_settable(lua,-3);
@@ -1379,8 +1512,9 @@ void evalGenericCommand(client *c, int evalsha) {
luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys);
luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys);
- /* Select the right DB in the context of the Lua client */
+ /* Set the Lua client database and protocol. */
selectDb(server.lua_client,c->db->id);
+ server.lua_client->resp = 2; /* Default is RESP2, scripts can change it. */
/* Set a hook in order to be able to stop the script execution if it
* is running for too much time.
@@ -2052,6 +2186,11 @@ char *ldbRedisProtocolToHuman_Int(sds *o, char *reply);
char *ldbRedisProtocolToHuman_Bulk(sds *o, char *reply);
char *ldbRedisProtocolToHuman_Status(sds *o, char *reply);
char *ldbRedisProtocolToHuman_MultiBulk(sds *o, char *reply);
+char *ldbRedisProtocolToHuman_Set(sds *o, char *reply);
+char *ldbRedisProtocolToHuman_Map(sds *o, char *reply);
+char *ldbRedisProtocolToHuman_Null(sds *o, char *reply);
+char *ldbRedisProtocolToHuman_Bool(sds *o, char *reply);
+char *ldbRedisProtocolToHuman_Double(sds *o, char *reply);
/* Get Redis protocol from 'reply' and appends it in human readable form to
* the passed SDS string 'o'.
@@ -2066,6 +2205,11 @@ char *ldbRedisProtocolToHuman(sds *o, char *reply) {
case '+': p = ldbRedisProtocolToHuman_Status(o,reply); break;
case '-': p = ldbRedisProtocolToHuman_Status(o,reply); break;
case '*': p = ldbRedisProtocolToHuman_MultiBulk(o,reply); break;
+ case '~': p = ldbRedisProtocolToHuman_Set(o,reply); break;
+ case '%': p = ldbRedisProtocolToHuman_Map(o,reply); break;
+ case '_': p = ldbRedisProtocolToHuman_Null(o,reply); break;
+ case '#': p = ldbRedisProtocolToHuman_Bool(o,reply); break;
+ case ',': p = ldbRedisProtocolToHuman_Double(o,reply); break;
}
return p;
}
@@ -2120,6 +2264,62 @@ char *ldbRedisProtocolToHuman_MultiBulk(sds *o, char *reply) {
return p;
}
+char *ldbRedisProtocolToHuman_Set(sds *o, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ long long mbulklen;
+ int j = 0;
+
+ string2ll(reply+1,p-reply-1,&mbulklen);
+ p += 2;
+ *o = sdscatlen(*o,"~(",2);
+ for (j = 0; j < mbulklen; j++) {
+ p = ldbRedisProtocolToHuman(o,p);
+ if (j != mbulklen-1) *o = sdscatlen(*o,",",1);
+ }
+ *o = sdscatlen(*o,")",1);
+ return p;
+}
+
+char *ldbRedisProtocolToHuman_Map(sds *o, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ long long mbulklen;
+ int j = 0;
+
+ string2ll(reply+1,p-reply-1,&mbulklen);
+ p += 2;
+ *o = sdscatlen(*o,"{",1);
+ for (j = 0; j < mbulklen; j++) {
+ p = ldbRedisProtocolToHuman(o,p);
+ *o = sdscatlen(*o," => ",4);
+ p = ldbRedisProtocolToHuman(o,p);
+ if (j != mbulklen-1) *o = sdscatlen(*o,",",1);
+ }
+ *o = sdscatlen(*o,"}",1);
+ return p;
+}
+
+char *ldbRedisProtocolToHuman_Null(sds *o, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ *o = sdscatlen(*o,"(null)",6);
+ return p+2;
+}
+
+char *ldbRedisProtocolToHuman_Bool(sds *o, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ if (reply[1] == 't')
+ *o = sdscatlen(*o,"#true",5);
+ else
+ *o = sdscatlen(*o,"#false",6);
+ return p+2;
+}
+
+char *ldbRedisProtocolToHuman_Double(sds *o, char *reply) {
+ char *p = strchr(reply+1,'\r');
+ *o = sdscatlen(*o,"(double) ",9);
+ *o = sdscatlen(*o,reply+1,p-reply-1);
+ return p+2;
+}
+
/* Log a Redis reply as debugger output, in an human readable format.
* If the resulting string is longer than 'len' plus a few more chars
* used as prefix, it gets truncated. */
diff --git a/src/server.c b/src/server.c
index c0e59c86c..f38ed7897 100644
--- a/src/server.c
+++ b/src/server.c
@@ -146,6 +146,8 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */
* in this condition but just a few.
*
* no-monitor: Do not automatically propagate the command on MONITOR.
+ *
+ * no-slowlog: Do not automatically log the command in the slowlog.
*
* cluster-asking: Perform an implicit ASKING for this command, so the
* command will be accepted in cluster mode if the slot is marked
@@ -627,7 +629,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"auth",authCommand,-2,
- "no-script ok-loading ok-stale fast @connection",
+ "no-script ok-loading ok-stale fast no-monitor no-slowlog @connection",
0,NULL,0,0,0,0,0,0},
/* We don't allow PING during loading since in Redis PING is used as
@@ -670,7 +672,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"exec",execCommand,1,
- "no-script no-monitor @transaction",
+ "no-script no-monitor no-slowlog @transaction",
0,NULL,0,0,0,0,0,0},
{"discard",discardCommand,1,
@@ -822,7 +824,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"hello",helloCommand,-2,
- "no-script fast @connection",
+ "no-script fast no-monitor no-slowlog @connection",
0,NULL,0,0,0,0,0,0},
/* EVAL can modify the dataset, however it is not flagged as a write
@@ -2174,6 +2176,16 @@ void createSharedObjects(void) {
shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));
+ shared.emptymap[0] = NULL;
+ shared.emptymap[1] = NULL;
+ shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
+ shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n"));
+
+ shared.emptyset[0] = NULL;
+ shared.emptyset[1] = NULL;
+ shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n"));
+ shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n"));
+
for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
char dictid_str[64];
int dictid_len;
@@ -2418,6 +2430,9 @@ void initServerConfig(void) {
/* Latency monitor */
server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
+ /* Tracking. */
+ server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL;
+
/* Debugging */
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
@@ -2926,6 +2941,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) {
c->flags |= CMD_STALE;
} else if (!strcasecmp(flag,"no-monitor")) {
c->flags |= CMD_SKIP_MONITOR;
+ } else if (!strcasecmp(flag,"no-slowlog")) {
+ c->flags |= CMD_SKIP_SLOWLOG;
} else if (!strcasecmp(flag,"cluster-asking")) {
c->flags |= CMD_ASKING;
} else if (!strcasecmp(flag,"fast")) {
@@ -3210,7 +3227,7 @@ void call(client *c, int flags) {
/* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
- if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
+ if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
@@ -3341,9 +3358,10 @@ int processCommand(client *c) {
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
- int auth_required = !(DefaultUser->flags & USER_FLAG_NOPASS) &&
+ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
+ DefaultUser->flags & USER_FLAG_DISABLED) &&
!c->authenticated;
- if (auth_required || DefaultUser->flags & USER_FLAG_DISABLED) {
+ if (auth_required) {
/* AUTH and HELLO are valid even in non authenticated state. */
if (c->cmd->proc != authCommand || c->cmd->proc == helloCommand) {
flagTransaction(c);
@@ -3411,13 +3429,20 @@ int processCommand(client *c) {
* is in MULTI/EXEC context? Error. */
if (out_of_memory &&
(c->cmd->flags & CMD_DENYOOM ||
- (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
+ (c->flags & CLIENT_MULTI &&
+ c->cmd->proc != execCommand &&
+ c->cmd->proc != discardCommand)))
+ {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
+ /* Make sure to use a reasonable amount of memory for client side
+ * caching metadata. */
+ if (server.tracking_clients) trackingLimitUsedSlots();
+
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
int deny_write_type = writeCommandsDeniedByDiskError();
@@ -3718,6 +3743,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) {
flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
+ flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog");
flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
@@ -4167,7 +4193,8 @@ sds genRedisInfoString(char *section) {
"active_defrag_hits:%lld\r\n"
"active_defrag_misses:%lld\r\n"
"active_defrag_key_hits:%lld\r\n"
- "active_defrag_key_misses:%lld\r\n",
+ "active_defrag_key_misses:%lld\r\n"
+ "tracking_used_slots:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -4193,7 +4220,8 @@ sds genRedisInfoString(char *section) {
server.stat_active_defrag_hits,
server.stat_active_defrag_misses,
server.stat_active_defrag_key_hits,
- server.stat_active_defrag_key_misses);
+ server.stat_active_defrag_key_misses,
+ trackingGetUsedSlots());
}
/* Replication */
@@ -4339,6 +4367,13 @@ sds genRedisInfoString(char *section) {
(long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec);
}
+ /* Modules */
+ if (allsections || defsections || !strcasecmp(section,"modules")) {
+ if (sections++) info = sdscat(info,"\r\n");
+ info = sdscatprintf(info,"# Modules\r\n");
+ info = genModulesInfoString(info);
+ }
+
/* Command statistics */
if (allsections || !strcasecmp(section,"commandstats")) {
if (sections++) info = sdscat(info,"\r\n");
@@ -4394,7 +4429,9 @@ void infoCommand(client *c) {
addReply(c,shared.syntaxerr);
return;
}
- addReplyBulkSds(c, genRedisInfoString(section));
+ sds info = genRedisInfoString(section);
+ addReplyVerbatim(c,info,sdslen(info),"txt");
+ sdsfree(info);
}
void monitorCommand(client *c) {
@@ -4840,9 +4877,9 @@ int main(int argc, char **argv) {
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
- char hashseed[16];
- getRandomHexChars(hashseed,sizeof(hashseed));
- dictSetHashFunctionSeed((uint8_t*)hashseed);
+ uint8_t hashseed[16];
+ getRandomBytes(hashseed,sizeof(hashseed));
+ dictSetHashFunctionSeed(hashseed);
server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
ACLInit(); /* The ACL subsystem must be initialized ASAP because the
diff --git a/src/server.h b/src/server.h
index 7aa4bc2b7..d132cf09c 100644
--- a/src/server.h
+++ b/src/server.h
@@ -171,6 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
+#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
@@ -219,35 +220,36 @@ typedef long long mstime_t; /* millisecond time type. */
#define CMD_LOADING (1ULL<<9) /* "ok-loading" flag */
#define CMD_STALE (1ULL<<10) /* "ok-stale" flag */
#define CMD_SKIP_MONITOR (1ULL<<11) /* "no-monitor" flag */
-#define CMD_ASKING (1ULL<<12) /* "cluster-asking" flag */
-#define CMD_FAST (1ULL<<13) /* "fast" flag */
+#define CMD_SKIP_SLOWLOG (1ULL<<12) /* "no-slowlog" flag */
+#define CMD_ASKING (1ULL<<13) /* "cluster-asking" flag */
+#define CMD_FAST (1ULL<<14) /* "fast" flag */
/* Command flags used by the module system. */
-#define CMD_MODULE_GETKEYS (1ULL<<14) /* Use the modules getkeys interface. */
-#define CMD_MODULE_NO_CLUSTER (1ULL<<15) /* Deny on Redis Cluster. */
+#define CMD_MODULE_GETKEYS (1ULL<<15) /* Use the modules getkeys interface. */
+#define CMD_MODULE_NO_CLUSTER (1ULL<<16) /* Deny on Redis Cluster. */
/* Command flags that describe ACLs categories. */
-#define CMD_CATEGORY_KEYSPACE (1ULL<<16)
-#define CMD_CATEGORY_READ (1ULL<<17)
-#define CMD_CATEGORY_WRITE (1ULL<<18)
-#define CMD_CATEGORY_SET (1ULL<<19)
-#define CMD_CATEGORY_SORTEDSET (1ULL<<20)
-#define CMD_CATEGORY_LIST (1ULL<<21)
-#define CMD_CATEGORY_HASH (1ULL<<22)
-#define CMD_CATEGORY_STRING (1ULL<<23)
-#define CMD_CATEGORY_BITMAP (1ULL<<24)
-#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<25)
-#define CMD_CATEGORY_GEO (1ULL<<26)
-#define CMD_CATEGORY_STREAM (1ULL<<27)
-#define CMD_CATEGORY_PUBSUB (1ULL<<28)
-#define CMD_CATEGORY_ADMIN (1ULL<<29)
-#define CMD_CATEGORY_FAST (1ULL<<30)
-#define CMD_CATEGORY_SLOW (1ULL<<31)
-#define CMD_CATEGORY_BLOCKING (1ULL<<32)
-#define CMD_CATEGORY_DANGEROUS (1ULL<<33)
-#define CMD_CATEGORY_CONNECTION (1ULL<<34)
-#define CMD_CATEGORY_TRANSACTION (1ULL<<35)
-#define CMD_CATEGORY_SCRIPTING (1ULL<<36)
+#define CMD_CATEGORY_KEYSPACE (1ULL<<17)
+#define CMD_CATEGORY_READ (1ULL<<18)
+#define CMD_CATEGORY_WRITE (1ULL<<19)
+#define CMD_CATEGORY_SET (1ULL<<20)
+#define CMD_CATEGORY_SORTEDSET (1ULL<<21)
+#define CMD_CATEGORY_LIST (1ULL<<22)
+#define CMD_CATEGORY_HASH (1ULL<<23)
+#define CMD_CATEGORY_STRING (1ULL<<24)
+#define CMD_CATEGORY_BITMAP (1ULL<<25)
+#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<26)
+#define CMD_CATEGORY_GEO (1ULL<<27)
+#define CMD_CATEGORY_STREAM (1ULL<<28)
+#define CMD_CATEGORY_PUBSUB (1ULL<<29)
+#define CMD_CATEGORY_ADMIN (1ULL<<30)
+#define CMD_CATEGORY_FAST (1ULL<<31)
+#define CMD_CATEGORY_SLOW (1ULL<<32)
+#define CMD_CATEGORY_BLOCKING (1ULL<<33)
+#define CMD_CATEGORY_DANGEROUS (1ULL<<34)
+#define CMD_CATEGORY_CONNECTION (1ULL<<35)
+#define CMD_CATEGORY_TRANSACTION (1ULL<<36)
+#define CMD_CATEGORY_SCRIPTING (1ULL<<37)
/* AOF states */
#define AOF_OFF 0 /* AOF is off */
@@ -536,6 +538,10 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK)
#define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS)
+/* Bit flags for moduleTypeAuxSaveFunc */
+#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
+#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
struct RedisModule;
struct RedisModuleIO;
struct RedisModuleDigest;
@@ -548,6 +554,8 @@ struct redisObject;
* is deleted. */
typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver);
typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value);
+typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when);
+typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when);
typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value);
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
@@ -564,6 +572,9 @@ typedef struct RedisModuleType {
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
+ moduleTypeAuxLoadFunc aux_load;
+ moduleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
} moduleType;
@@ -837,7 +848,7 @@ typedef struct client {
uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
- int repl_put_online_on_ack; /* Install slave write handler on ACK. */
+ int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
@@ -886,7 +897,7 @@ struct moduleLoadQueueEntry {
struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
- *colon, *queued, *null[4], *nullarray[4],
+ *colon, *queued, *null[4], *nullarray[4], *emptymap[4], *emptyset[4],
*emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
@@ -1319,6 +1330,7 @@ struct redisServer {
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
+ int tracking_table_max_fill; /* Max fill percentage. */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@@ -1533,6 +1545,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
void moduleCallCommandFilters(client *c);
void ModuleForkDoneHandler(int exitcode, int bysignal);
void TerminateModuleForkChild(int wait);
+ssize_t rdbSaveModulesAux(rio *rdb, int when);
+int moduleAllDatatypesHandleErrors();
/* Utils */
long long ustime(void);
@@ -1643,6 +1657,9 @@ void enableTracking(client *c, uint64_t redirect_to);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
+void trackingInvalidateKeysOnFlush(int dbid);
+void trackingLimitUsedSlots(void);
+unsigned long long trackingGetUsedSlots(void);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
@@ -2328,6 +2345,7 @@ void bugReportStart(void);
void serverLogObjectDebugInfo(const robj *o);
void sigsegvHandler(int sig, siginfo_t *info, void *secret);
sds genRedisInfoString(char *section);
+sds genModulesInfoString(sds info);
void enableWatchdog(int period);
void disableWatchdog(void);
void watchdogScheduleSignal(int period);
diff --git a/src/sha256.c b/src/sha256.c
new file mode 100644
index 000000000..d644d2d4e
--- /dev/null
+++ b/src/sha256.c
@@ -0,0 +1,158 @@
+/*********************************************************************
+* Filename: sha256.c
+* Author: Brad Conte (brad AT bradconte.com)
+* Copyright:
+* Disclaimer: This code is presented "as is" without any guarantees.
+* Details: Implementation of the SHA-256 hashing algorithm.
+ SHA-256 is one of the three algorithms in the SHA2
+ specification. The others, SHA-384 and SHA-512, are not
+ offered in this implementation.
+ Algorithm specification can be found here:
+ * http://csrc.nist.gov/publications/fips/fips180-2/fips180-2withchangenotice.pdf
+ This implementation uses little endian byte order.
+*********************************************************************/
+
+/*************************** HEADER FILES ***************************/
+#include <stdlib.h>
+#include <string.h>
+#include "sha256.h"
+
+/****************************** MACROS ******************************/
+#define ROTLEFT(a,b) (((a) << (b)) | ((a) >> (32-(b))))
+#define ROTRIGHT(a,b) (((a) >> (b)) | ((a) << (32-(b))))
+
+#define CH(x,y,z) (((x) & (y)) ^ (~(x) & (z)))
+#define MAJ(x,y,z) (((x) & (y)) ^ ((x) & (z)) ^ ((y) & (z)))
+#define EP0(x) (ROTRIGHT(x,2) ^ ROTRIGHT(x,13) ^ ROTRIGHT(x,22))
+#define EP1(x) (ROTRIGHT(x,6) ^ ROTRIGHT(x,11) ^ ROTRIGHT(x,25))
+#define SIG0(x) (ROTRIGHT(x,7) ^ ROTRIGHT(x,18) ^ ((x) >> 3))
+#define SIG1(x) (ROTRIGHT(x,17) ^ ROTRIGHT(x,19) ^ ((x) >> 10))
+
+/**************************** VARIABLES *****************************/
+static const WORD k[64] = {
+ 0x428a2f98,0x71374491,0xb5c0fbcf,0xe9b5dba5,0x3956c25b,0x59f111f1,0x923f82a4,0xab1c5ed5,
+ 0xd807aa98,0x12835b01,0x243185be,0x550c7dc3,0x72be5d74,0x80deb1fe,0x9bdc06a7,0xc19bf174,
+ 0xe49b69c1,0xefbe4786,0x0fc19dc6,0x240ca1cc,0x2de92c6f,0x4a7484aa,0x5cb0a9dc,0x76f988da,
+ 0x983e5152,0xa831c66d,0xb00327c8,0xbf597fc7,0xc6e00bf3,0xd5a79147,0x06ca6351,0x14292967,
+ 0x27b70a85,0x2e1b2138,0x4d2c6dfc,0x53380d13,0x650a7354,0x766a0abb,0x81c2c92e,0x92722c85,
+ 0xa2bfe8a1,0xa81a664b,0xc24b8b70,0xc76c51a3,0xd192e819,0xd6990624,0xf40e3585,0x106aa070,
+ 0x19a4c116,0x1e376c08,0x2748774c,0x34b0bcb5,0x391c0cb3,0x4ed8aa4a,0x5b9cca4f,0x682e6ff3,
+ 0x748f82ee,0x78a5636f,0x84c87814,0x8cc70208,0x90befffa,0xa4506ceb,0xbef9a3f7,0xc67178f2
+};
+
+/*********************** FUNCTION DEFINITIONS ***********************/
+void sha256_transform(SHA256_CTX *ctx, const BYTE data[])
+{
+ WORD a, b, c, d, e, f, g, h, i, j, t1, t2, m[64];
+
+ for (i = 0, j = 0; i < 16; ++i, j += 4)
+ m[i] = (data[j] << 24) | (data[j + 1] << 16) | (data[j + 2] << 8) | (data[j + 3]);
+ for ( ; i < 64; ++i)
+ m[i] = SIG1(m[i - 2]) + m[i - 7] + SIG0(m[i - 15]) + m[i - 16];
+
+ a = ctx->state[0];
+ b = ctx->state[1];
+ c = ctx->state[2];
+ d = ctx->state[3];
+ e = ctx->state[4];
+ f = ctx->state[5];
+ g = ctx->state[6];
+ h = ctx->state[7];
+
+ for (i = 0; i < 64; ++i) {
+ t1 = h + EP1(e) + CH(e,f,g) + k[i] + m[i];
+ t2 = EP0(a) + MAJ(a,b,c);
+ h = g;
+ g = f;
+ f = e;
+ e = d + t1;
+ d = c;
+ c = b;
+ b = a;
+ a = t1 + t2;
+ }
+
+ ctx->state[0] += a;
+ ctx->state[1] += b;
+ ctx->state[2] += c;
+ ctx->state[3] += d;
+ ctx->state[4] += e;
+ ctx->state[5] += f;
+ ctx->state[6] += g;
+ ctx->state[7] += h;
+}
+
+void sha256_init(SHA256_CTX *ctx)
+{
+ ctx->datalen = 0;
+ ctx->bitlen = 0;
+ ctx->state[0] = 0x6a09e667;
+ ctx->state[1] = 0xbb67ae85;
+ ctx->state[2] = 0x3c6ef372;
+ ctx->state[3] = 0xa54ff53a;
+ ctx->state[4] = 0x510e527f;
+ ctx->state[5] = 0x9b05688c;
+ ctx->state[6] = 0x1f83d9ab;
+ ctx->state[7] = 0x5be0cd19;
+}
+
+void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len)
+{
+ WORD i;
+
+ for (i = 0; i < len; ++i) {
+ ctx->data[ctx->datalen] = data[i];
+ ctx->datalen++;
+ if (ctx->datalen == 64) {
+ sha256_transform(ctx, ctx->data);
+ ctx->bitlen += 512;
+ ctx->datalen = 0;
+ }
+ }
+}
+
+void sha256_final(SHA256_CTX *ctx, BYTE hash[])
+{
+ WORD i;
+
+ i = ctx->datalen;
+
+ // Pad whatever data is left in the buffer.
+ if (ctx->datalen < 56) {
+ ctx->data[i++] = 0x80;
+ while (i < 56)
+ ctx->data[i++] = 0x00;
+ }
+ else {
+ ctx->data[i++] = 0x80;
+ while (i < 64)
+ ctx->data[i++] = 0x00;
+ sha256_transform(ctx, ctx->data);
+ memset(ctx->data, 0, 56);
+ }
+
+ // Append to the padding the total message's length in bits and transform.
+ ctx->bitlen += ctx->datalen * 8;
+ ctx->data[63] = ctx->bitlen;
+ ctx->data[62] = ctx->bitlen >> 8;
+ ctx->data[61] = ctx->bitlen >> 16;
+ ctx->data[60] = ctx->bitlen >> 24;
+ ctx->data[59] = ctx->bitlen >> 32;
+ ctx->data[58] = ctx->bitlen >> 40;
+ ctx->data[57] = ctx->bitlen >> 48;
+ ctx->data[56] = ctx->bitlen >> 56;
+ sha256_transform(ctx, ctx->data);
+
+ // Since this implementation uses little endian byte ordering and SHA uses big endian,
+ // reverse all the bytes when copying the final state to the output hash.
+ for (i = 0; i < 4; ++i) {
+ hash[i] = (ctx->state[0] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 4] = (ctx->state[1] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 8] = (ctx->state[2] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 12] = (ctx->state[3] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 16] = (ctx->state[4] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 20] = (ctx->state[5] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 24] = (ctx->state[6] >> (24 - i * 8)) & 0x000000ff;
+ hash[i + 28] = (ctx->state[7] >> (24 - i * 8)) & 0x000000ff;
+ }
+}
diff --git a/src/sha256.h b/src/sha256.h
new file mode 100644
index 000000000..dc53ead2b
--- /dev/null
+++ b/src/sha256.h
@@ -0,0 +1,35 @@
+/*********************************************************************
+* Filename: sha256.h
+* Author: Brad Conte (brad AT bradconte.com)
+* Copyright:
+* Disclaimer: This code is presented "as is" without any guarantees.
+* Details: Defines the API for the corresponding SHA256 implementation.
+*********************************************************************/
+
+#ifndef SHA256_H
+#define SHA256_H
+
+/*************************** HEADER FILES ***************************/
+#include <stddef.h>
+#include <stdint.h>
+
+/****************************** MACROS ******************************/
+#define SHA256_BLOCK_SIZE 32 // SHA256 outputs a 32 byte digest
+
+/**************************** DATA TYPES ****************************/
+typedef uint8_t BYTE; // 8-bit byte
+typedef uint32_t WORD; // 32-bit word
+
+typedef struct { // running state of one SHA-256 computation
+ BYTE data[64]; // input bytes buffered until a full 64-byte block is available
+ WORD datalen; // number of bytes currently buffered in data[]
+ unsigned long long bitlen; // message length in bits: +512 per processed block, leftover added by sha256_final()
+ WORD state[8]; // eight 32-bit hash words, serialised into the digest by sha256_final()
+} SHA256_CTX;
+
+/*********************** FUNCTION DECLARATIONS **********************/
+void sha256_init(SHA256_CTX *ctx);
+void sha256_update(SHA256_CTX *ctx, const BYTE data[], size_t len);
+void sha256_final(SHA256_CTX *ctx, BYTE hash[]);
+
+#endif // SHA256_H
diff --git a/src/siphash.c b/src/siphash.c
index 6b9419031..357741132 100644
--- a/src/siphash.c
+++ b/src/siphash.c
@@ -58,7 +58,8 @@ int siptlw(int c) {
/* Test of the CPU is Little Endian and supports not aligned accesses.
* Two interesting conditions to speedup the function that happen to be
* in most of x86 servers. */
-#if defined(__X86_64__) || defined(__x86_64__) || defined (__i386__)
+#if defined(__X86_64__) || defined(__x86_64__) || defined (__i386__) \
+ || defined (__aarch64__) || defined (__arm64__)
#define UNALIGNED_LE_CPU
#endif
diff --git a/src/stream.h b/src/stream.h
index ef08753b5..8ae90ce77 100644
--- a/src/stream.h
+++ b/src/stream.h
@@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
+void streamFreeNACK(streamNACK *na);
#endif
diff --git a/src/t_hash.c b/src/t_hash.c
index bc70e4051..e6ed33819 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -772,8 +772,8 @@ void genericHgetallCommand(client *c, int flags) {
hashTypeIterator *hi;
int length, count = 0;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL
- || checkType(c,o,OBJ_HASH)) return;
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymap[c->resp]))
+ == NULL || checkType(c,o,OBJ_HASH)) return;
/* We return a map if the user requested keys and values, like in the
* HGETALL case. Otherwise to use a flat array makes more sense. */
diff --git a/src/t_list.c b/src/t_list.c
index 54e4959b9..9bbd61de3 100644
--- a/src/t_list.c
+++ b/src/t_list.c
@@ -402,7 +402,7 @@ void lrangeCommand(client *c) {
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL
+ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
|| checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);
@@ -414,7 +414,7 @@ void lrangeCommand(client *c) {
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
return;
}
if (end >= llen) end = llen-1;
@@ -606,7 +606,7 @@ void rpoplpushCommand(client *c) {
* Blocking POP operations
*----------------------------------------------------------------------------*/
-/* This is a helper function for handleClientsBlockedOnLists(). It's work
+/* This is a helper function for handleClientsBlockedOnKeys(). Its work
* is to serve a specific client (receiver) that is blocked on 'key'
* in the context of the specified 'db', doing the following:
*
diff --git a/src/t_set.c b/src/t_set.c
index 05d9ee243..abbf82fde 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -418,10 +418,10 @@ void spopWithCountCommand(client *c) {
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
- /* If count is zero, serve an empty multibulk ASAP to avoid special
+ /* If count is zero, serve an empty set ASAP to avoid special
* cases later. */
if (count == 0) {
- addReplyNull(c);
+ addReply(c,shared.emptyset[c->resp]);
return;
}
@@ -632,13 +632,13 @@ void srandmemberWithCountCommand(client *c) {
uniq = 0;
}
- if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyset[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
size = setTypeSize(set);
/* If count is zero, serve it ASAP to avoid special cases later. */
if (count == 0) {
- addReplyNull(c);
+ addReply(c,shared.emptyset[c->resp]);
return;
}
@@ -813,7 +813,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
}
addReply(c,shared.czero);
} else {
- addReplyNull(c);
+ addReply(c,shared.emptyset[c->resp]);
}
return;
}
diff --git a/src/t_zset.c b/src/t_zset.c
index fb7078abd..ea6f4b848 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1357,9 +1357,8 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
- if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
- zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
- if (sdslen(ele) > server.zset_max_ziplist_value)
+ if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
+ sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
@@ -2427,7 +2426,7 @@ void zrangeGenericCommand(client *c, int reverse) {
return;
}
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL
|| checkType(c,zobj,OBJ_ZSET)) return;
/* Sanitize indexes. */
@@ -2439,7 +2438,7 @@ void zrangeGenericCommand(client *c, int reverse) {
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
return;
}
if (end >= llen) end = llen-1;
@@ -2575,7 +2574,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
}
/* Ok, lookup the key and get the range */
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
@@ -2595,7 +2594,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
/* No "first" element in the specified interval. */
if (eptr == NULL) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
return;
}
@@ -2662,7 +2661,7 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
/* No "first" element in the specified interval. */
if (ln == NULL) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
return;
}
@@ -2920,7 +2919,7 @@ void genericZrangebylexCommand(client *c, int reverse) {
}
/* Ok, lookup the key and get the range */
- if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL ||
checkType(c,zobj,OBJ_ZSET))
{
zslFreeLexRange(&range);
@@ -2943,7 +2942,7 @@ void genericZrangebylexCommand(client *c, int reverse) {
/* No "first" element in the specified interval. */
if (eptr == NULL) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
zslFreeLexRange(&range);
return;
}
@@ -3007,7 +3006,7 @@ void genericZrangebylexCommand(client *c, int reverse) {
/* No "first" element in the specified interval. */
if (ln == NULL) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
zslFreeLexRange(&range);
return;
}
@@ -3161,7 +3160,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
/* No candidate for zpopping, return empty. */
if (!zobj) {
- addReplyNull(c);
+ addReply(c,shared.emptyarray);
return;
}
diff --git a/src/tracking.c b/src/tracking.c
index bbfc66a72..f7f0fc755 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -60,6 +60,7 @@
* use the most significant bits instead of the full 24 bits. */
#define TRACKING_TABLE_SIZE (1<<24)
rax **TrackingTable = NULL;
+unsigned long TrackingTableUsedSlots = 0;
robj *TrackingChannelName;
/* Remove the tracking state from the client 'c'. Note that there is not much
@@ -109,67 +110,187 @@ void trackingRememberKeys(client *c) {
sds sdskey = c->argv[idx]->ptr;
uint64_t hash = crc64(0,
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable[hash] == NULL)
+ if (TrackingTable[hash] == NULL) {
TrackingTable[hash] = raxNew();
+ TrackingTableUsedSlots++;
+ }
raxTryInsert(TrackingTable[hash],
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
}
getKeysFreeResult(keys);
}
-/* This function is called from signalModifiedKey() or other places in Redis
- * when a key changes value. In the context of keys tracking, our task here is
- * to send a notification to every client that may have keys about such . */
-void trackingInvalidateKey(robj *keyobj) {
- sds sdskey = keyobj->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
+/* Send to client 'c' the notification that caching slot 'hash' is no longer
+ * valid. If 'c' redirects its tracking messages to another client, the
+ * message is delivered to that client instead; when the target of the
+ * redirection cannot be found, 'c' itself is told that the redirection is
+ * broken (RESP3 connections only) and nothing else is sent. */
+void sendTrackingMessage(client *c, long long hash) {
+    int using_redirection = 0;
+    if (c->client_tracking_redirection) {
+        client *redir = lookupClientByID(c->client_tracking_redirection);
+        if (!redir) {
+            /* We need to signal to the original connection that we
+             * are unable to send invalidation messages to the redirected
+             * connection, because the client no longer exists. */
+            if (c->resp > 2) {
+                /* The push carries exactly two elements (the message name
+                 * and the ID of the missing client), so the announced
+                 * length must be 2: a larger header would make the client
+                 * wait for an element that is never sent. */
+                addReplyPushLen(c,2);
+                addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+                addReplyLongLong(c,c->client_tracking_redirection);
+            }
+            return;
+        }
+        c = redir;
+        using_redirection = 1;
+    }
+
+    /* Only send such info for clients in RESP version 3 or more. However
+     * if redirection is active, and the connection we redirect to is
+     * in Pub/Sub mode, we can support the feature with RESP 2 as well,
+     * by sending Pub/Sub messages in the __redis__:invalidate channel. */
+    if (c->resp > 2) {
+        addReplyPushLen(c,2);
+        addReplyBulkCBuffer(c,"invalidate",10);
+        addReplyLongLong(c,hash);
+    } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
+        robj *msg = createStringObjectFromLongLong(hash);
+        addReplyPubsubMessage(c,TrackingChannelName,msg);
+        decrRefCount(msg);
+    }
+}
+
+/* Invalidates a caching slot: this is actually the low level implementation
+ * of the API that Redis calls externally, that is trackingInvalidateKey(). */
+void trackingInvalidateSlot(uint64_t slot) {
+ if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
raxIterator ri;
- raxStart(&ri,TrackingTable[hash]);
+ raxStart(&ri,TrackingTable[slot]);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
uint64_t id;
memcpy(&id,ri.key,ri.key_len);
client *c = lookupClientByID(id);
- if (c == NULL) continue;
- int using_redirection = 0;
- if (c->client_tracking_redirection) {
- client *redir = lookupClientByID(c->client_tracking_redirection);
- if (!redir) {
- /* We need to signal to the original connection that we
- * are unable to send invalidation messages to the redirected
- * connection, because the client no longer exist. */
- if (c->resp > 2) {
- addReplyPushLen(c,3);
- addReplyBulkCBuffer(c,"tracking-redir-broken",21);
- addReplyLongLong(c,c->client_tracking_redirection);
- }
- continue;
+ if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
+ sendTrackingMessage(c,slot);
+ }
+ raxStop(&ri);
+
+ /* Free the tracking table: we'll create the radix tree and populate it
+ * again if more keys will be modified in this caching slot. */
+ raxFree(TrackingTable[slot]);
+ TrackingTable[slot] = NULL;
+ TrackingTableUsedSlots--;
+}
+
+/* Entry point used by signalModifiedKey() and by the other places in Redis
+ * where the value of a key changes. The key name is mapped to its caching
+ * slot and the slot is invalidated, so that every client that may hold keys
+ * belonging to it gets notified. Nothing is done when the tracking table
+ * does not exist or has no used slots. */
+void trackingInvalidateKey(robj *keyobj) {
+    if (TrackingTable != NULL && TrackingTableUsedSlots != 0) {
+        sds name = keyobj->ptr;
+        uint64_t crc = crc64(0,(unsigned char*)name,sdslen(name));
+        trackingInvalidateSlot(crc & (TRACKING_TABLE_SIZE-1));
+    }
+}
+
+/* This function is called when one or all the Redis databases are flushed
+ * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for
+ * each DB but are global: currently what we do is sending a special
+ * notification to clients with tracking enabled, invalidating the caching
+ * slot "-1", which means, "all the keys", in order to avoid flooding clients
+ * with many invalidation messages for all the keys they may hold.
+ *
+ * However trying to flush the tracking table here is very costly:
+ * we need to scan 16 million caching slots in the table to check
+ * if they are used, this introduces a big delay. So what we do is to really
+ * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead
+ * we just send the invalidation message to all the clients, but don't
+ * flush the table: it will slowly get garbage collected as more keys
+ * are modified in the used caching slots. */
+void trackingInvalidateKeysOnFlush(int dbid) {
+ if (server.tracking_clients) {
+ listNode *ln;
+ listIter li;
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_TRACKING) {
+ sendTrackingMessage(c,-1);
+ }
+ }
+ }
+
+ /* In case of FLUSHALL, reclaim all the memory used by tracking. */
+ if (dbid == -1 && TrackingTable) {
+ for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
+ if (TrackingTable[j] != NULL) {
+ raxFree(TrackingTable[j]);
+ TrackingTable[j] = NULL;
+ TrackingTableUsedSlots--;
}
- c = redir;
- using_redirection = 1;
}
- /* Only send such info for clients in RESP version 3 or more. However
- * if redirection is active, and the connection we redirect to is
- * in Pub/Sub mode, we can support the feature with RESP 2 as well,
- * by sending Pub/Sub messages in the __redis__:invalidate channel. */
- if (c->resp > 2) {
- addReplyPushLen(c,2);
- addReplyBulkCBuffer(c,"invalidate",10);
- addReplyLongLong(c,hash);
- } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
- robj *msg = createStringObjectFromLongLong(hash);
- addReplyPubsubMessage(c,TrackingChannelName,msg);
- decrRefCount(msg);
+ /* If there are no clients with tracking enabled, we can even
+ * reclaim the memory used by the table itself. The code assumes
+ * the table is allocated only if there is at least one client alive
+ * with tracking enabled. */
+ if (server.tracking_clients == 0) {
+ zfree(TrackingTable);
+ TrackingTable = NULL;
}
}
- raxStop(&ri);
+}
- /* Free the tracking table: we'll create the radix tree and populate it
- * again if more keys will be modified in this hash slot. */
- raxFree(TrackingTable[hash]);
- TrackingTable[hash] = NULL;
+/* Tracking forces Redis to remember which clients may hold keys belonging
+ * to each caching slot. In workloads with a lot of reads where keys are
+ * hardly ever modified, the amount of information kept server side can be
+ * large: each of the 16 million caching slots may end up with a radix tree
+ * containing many entries.
+ *
+ * So Redis allows the user to configure a maximum fill rate for the
+ * invalidation table. This function makes sure that we don't go over the
+ * specified fill rate: if we are over, we evict the information about
+ * random caching slots, sending invalidation messages to clients as if
+ * the keys were modified. */
+void trackingLimitUsedSlots(void) {
+    static unsigned int rounds_over_limit = 0;
+
+    if (server.tracking_table_max_fill == 0) return; /* No limits set. */
+    unsigned int max_slots =
+        (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
+    if (TrackingTableUsedSlots <= max_slots) {
+        rounds_over_limit = 0;
+        return; /* Limit not reached. */
+    }
+
+    /* The number of slots we are willing to inspect grows with the number
+     * of consecutive calls that found the table still over the limit. */
+    int budget = 100 * (rounds_over_limit+1);
+
+    /* Probe linearly from a random position to improve cache locality, but
+     * pick a new random position after every used slot we find, in order to
+     * avoid creating big holes in the table (that would make this function
+     * use more resources later). */
+    int reseed = 1;
+    unsigned int pos = 0;
+    while (budget > 0) {
+        if (reseed) {
+            pos = rand() % TRACKING_TABLE_SIZE;
+            reseed = 0;
+        }
+        budget--;
+        pos = (pos+1) % TRACKING_TABLE_SIZE;
+        if (TrackingTable[pos] == NULL) continue;
+
+        trackingInvalidateSlot(pos);
+        if (TrackingTableUsedSlots <= max_slots) {
+            rounds_over_limit = 0;
+            return; /* Return ASAP: we are again under the limit. */
+        }
+        reseed = 1; /* Jump to the next random position. */
+    }
+    rounds_over_limit++;
+}
+
+/* This is just used in order to access the amount of used slots in the
+ * tracking table. */
+unsigned long long trackingGetUsedSlots(void) {
+ return TrackingTableUsedSlots; /* unsigned long global counter, widened to the return type */
}
diff --git a/src/zmalloc.c b/src/zmalloc.c
index 5e6010278..fd8bb6938 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -294,6 +294,26 @@ size_t zmalloc_get_rss(void) {
return t_info.resident_size;
}
+#elif defined(__FreeBSD__)
+#include <sys/types.h>
+#include <sys/sysctl.h>
+#include <sys/user.h>
+#include <unistd.h>
+
+/* FreeBSD implementation: return the resident set size of the current
+ * process in bytes, or 0 when it cannot be obtained. The process
+ * information is read with the kern.proc.pid sysctl. */
+size_t zmalloc_get_rss(void) {
+    struct kinfo_proc info;
+    size_t infolen = sizeof(info);
+    long pagesize = sysconf(_SC_PAGESIZE);
+    int mib[4];
+    mib[0] = CTL_KERN;
+    mib[1] = KERN_PROC;
+    mib[2] = KERN_PROC_PID;
+    mib[3] = getpid();
+
+    /* ki_rssize is expressed in pages, not in bytes: scale it by the page
+     * size so that the value is a byte count. */
+    if (pagesize > 0 && sysctl(mib, 4, &info, &infolen, NULL, 0) == 0)
+        return (size_t)info.ki_rssize * (size_t)pagesize;
+
+    return 0L;
+}
#else
size_t zmalloc_get_rss(void) {
/* If we can't get the RSS in an OS-specific way for this system just