-rw-r--r--  .github/workflows/ci.yml             |  21
-rw-r--r--  README.md                             |  18
-rw-r--r--  TLS.md                                |  45
-rw-r--r--  deps/lua/src/lua_struct.c             |  10
-rw-r--r--  redis.conf                            |  27
-rw-r--r--  src/acl.c                             | 202
-rw-r--r--  src/ae.c                              |   8
-rw-r--r--  src/aof.c                             |  10
-rw-r--r--  src/config.c                          | 140
-rw-r--r--  src/db.c                              |  79
-rw-r--r--  src/debug.c                           |  11
-rw-r--r--  src/defrag.c                          |  96
-rw-r--r--  src/module.c                          |  67
-rw-r--r--  src/multi.c                           |   4
-rw-r--r--  src/networking.c                      | 163
-rw-r--r--  src/notify.c                          |   2
-rw-r--r--  src/object.c                          |  49
-rw-r--r--  src/pubsub.c                          |   8
-rw-r--r--  src/quicklist.c                       | 150
-rw-r--r--  src/quicklist.h                       |  46
-rw-r--r--  src/rax.c                             |   1
-rw-r--r--  src/rdb.c                             |   5
-rw-r--r--  src/redis-cli.c                       |  32
-rw-r--r--  src/redismodule.h                     |   4
-rw-r--r--  src/replication.c                     |  30
-rw-r--r--  src/scripting.c                       |   5
-rw-r--r--  src/sds.h                             |   2
-rw-r--r--  src/server.c                          |  44
-rw-r--r--  src/server.h                          |  30
-rw-r--r--  src/t_stream.c                        |   6
-rw-r--r--  src/tracking.c                        | 421
-rw-r--r--  src/util.c                            |   3
-rw-r--r--  tests/modules/blockonkeys.c           |  13
-rw-r--r--  tests/modules/fork.c                  |   2
-rw-r--r--  tests/modules/misc.c                  |  13
-rw-r--r--  tests/support/server.tcl              | 126
-rw-r--r--  tests/test_helper.tcl                 |  15
-rw-r--r--  tests/unit/acl.tcl                    | 107
-rw-r--r--  tests/unit/memefficiency.tcl          | 133
-rw-r--r--  tests/unit/moduleapi/blockonkeys.tcl  |  35
-rw-r--r--  tests/unit/moduleapi/fork.tcl         |   3
-rw-r--r--  tests/unit/scripting.tcl              |   5
-rw-r--r--  tests/unit/tracking.tcl               |  66
-rw-r--r--  tests/unit/type/hash.tcl              |   7
-rw-r--r--  tests/unit/type/stream-cgroups.tcl    |   9
45 files changed, 1720 insertions, 553 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 847abcf02..cc4991606 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,11 +3,8 @@ name: CI
on: [push, pull_request]
jobs:
- build-ubuntu:
- strategy:
- matrix:
- platform: [ubuntu-latest, ubuntu-16.04]
- runs-on: ${{ matrix.platform }}
+ test-ubuntu-latest:
+ runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make
@@ -15,13 +12,17 @@ jobs:
- name: test
run: |
sudo apt-get install tcl8.5
- make test
+ ./runtest --clients 2 --verbose
+
+ build-ubuntu-old:
+ runs-on: ubuntu-16.04
+ steps:
+ - uses: actions/checkout@v1
+ - name: make
+ run: make
build-macos-latest:
- strategy:
- matrix:
- platform: [macos-latest, macOS-10.14]
- runs-on: ${{ matrix.platform }}
+ runs-on: macos-latest
steps:
- uses: actions/checkout@v1
- name: make
diff --git a/README.md b/README.md
index 3442659e6..c08013416 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,11 @@ It is as simple as:
% make
+To build with TLS support, you'll need OpenSSL development libraries (e.g.
+libssl-dev on Debian/Ubuntu) and run:
+
+ % make BUILD_TLS=yes
+
You can run a 32 bit Redis binary using:
% make 32bit
@@ -43,6 +48,13 @@ After building Redis, it is a good idea to test it using:
% make test
+If TLS is built, you can run the tests with TLS enabled (you will need
+`tcl-tls` installed):
+
+ % ./utils/gen-test-certs.sh
+ % ./runtest --tls
+
+
Fixing build problems with dependencies or cached build options
---------
@@ -125,6 +137,12 @@ as options using the command line. Examples:
All the options in redis.conf are also supported as options using the command
line, with exactly the same name.
+Running Redis with TLS:
+------------------
+
+Please consult the [TLS.md](TLS.md) file for more information on
+how to use Redis with TLS.
+
Playing with Redis
------------------
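As an illustration of the TLS build and test instructions added to the README above: assuming the certificates generated by `./utils/gen-test-certs.sh` end up under `tests/tls/` (the paths below are illustrative), a TLS-only server and a matching client session can be started along these lines:

    % ./src/redis-server --tls-port 6379 --port 0 \
        --tls-cert-file ./tests/tls/redis.crt \
        --tls-key-file ./tests/tls/redis.key \
        --tls-ca-cert-file ./tests/tls/ca.crt

    % ./src/redis-cli --tls \
        --cert ./tests/tls/redis.crt \
        --key ./tests/tls/redis.key \
        --cacert ./tests/tls/ca.crt

Any redis.conf directive can be passed on the command line this way, as the README section above notes.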
diff --git a/TLS.md b/TLS.md
index 76fe0be2e..e480c1e9d 100644
--- a/TLS.md
+++ b/TLS.md
@@ -1,8 +1,5 @@
-TLS Support -- Work In Progress
-===============================
-
-This is a brief note to capture current thoughts/ideas and track pending action
-items.
+TLS Support
+===========
Getting Started
---------------
@@ -69,37 +66,23 @@ probably not be so hard. For cluster keys migration it might be more difficult,
but there are probably other good reasons to improve that part anyway.
To-Do List
-==========
-
-Additional TLS Features
------------------------
-
-1. Add metrics to INFO?
-2. Add session caching support. Check if/how it's handled by clients to assess
- how useful/important it is.
-
-redis-benchmark
----------------
-
-The current implementation is a mix of using hiredis for parsing and basic
-networking (establishing connections), but directly manipulating sockets for
-most actions.
-
-This will need to be cleaned up for proper TLS support. The best approach is
-probably to migrate to hiredis async mode.
-
-redis-cli
----------
+----------
-1. Add support for TLS in --slave and --rdb modes.
+- [ ] Add session caching support. Check if/how it's handled by clients to
+ assess how useful/important it is.
+- [ ] redis-benchmark support. The current implementation is a mix of using
+ hiredis for parsing and basic networking (establishing connections), but
+ directly manipulating sockets for most actions. This will need to be cleaned
+ up for proper TLS support. The best approach is probably to migrate to hiredis
+ async mode.
+- [ ] redis-cli `--slave` and `--rdb` support.
-Others
-------
+Multi-port
+----------
Consider the implications of allowing TLS to be configured on a separate port,
-making Redis listening on multiple ports.
+making Redis listen on multiple ports:
-This impacts many things, like
1. Startup banner port notification
2. Proctitle
3. How slaves announce themselves
diff --git a/deps/lua/src/lua_struct.c b/deps/lua/src/lua_struct.c
index 4d5f027b8..c58c8e72b 100644
--- a/deps/lua/src/lua_struct.c
+++ b/deps/lua/src/lua_struct.c
@@ -89,12 +89,14 @@ typedef struct Header {
} Header;
-static int getnum (const char **fmt, int df) {
+static int getnum (lua_State *L, const char **fmt, int df) {
if (!isdigit(**fmt)) /* no number? */
return df; /* return default value */
else {
int a = 0;
do {
+ if (a > (INT_MAX / 10) || a * 10 > (INT_MAX - (**fmt - '0')))
+ luaL_error(L, "integral size overflow");
a = a*10 + *((*fmt)++) - '0';
} while (isdigit(**fmt));
return a;
@@ -115,9 +117,9 @@ static size_t optsize (lua_State *L, char opt, const char **fmt) {
case 'f': return sizeof(float);
case 'd': return sizeof(double);
case 'x': return 1;
- case 'c': return getnum(fmt, 1);
+ case 'c': return getnum(L, fmt, 1);
case 'i': case 'I': {
- int sz = getnum(fmt, sizeof(int));
+ int sz = getnum(L, fmt, sizeof(int));
if (sz > MAXINTSIZE)
luaL_error(L, "integral size %d is larger than limit of %d",
sz, MAXINTSIZE);
@@ -150,7 +152,7 @@ static void controloptions (lua_State *L, int opt, const char **fmt,
case '>': h->endian = BIG; return;
case '<': h->endian = LITTLE; return;
case '!': {
- int a = getnum(fmt, MAXALIGN);
+ int a = getnum(L, fmt, MAXALIGN);
if (!isp2(a))
luaL_error(L, "alignment %d is not a power of 2", a);
h->align = a;
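The overflow check added to getnum() above rejects digit sequences in struct format strings whose value would overflow an int while being accumulated, before the count is ever used as a size. As a rough sketch of the observable effect (run through Redis' Lua scripting; the error text shown is abridged), an absurdly large 'c' count now raises a clean Lua error instead of wrapping around:

    127.0.0.1:6379> EVAL "return struct.pack('c99999999999999999999', 'x')" 0
    (error) ... integral size overflow ...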
diff --git a/redis.conf b/redis.conf
index 07005cffe..c04880f32 100644
--- a/redis.conf
+++ b/redis.conf
@@ -155,23 +155,22 @@ tcp-keepalive 300
# tls-ca-cert-file ca.crt
# tls-ca-cert-dir /etc/ssl/certs
-# If TLS/SSL clients are required to authenticate using a client side
-# certificate, use this directive.
+# By default, clients (including replica servers) on a TLS port are required
+# to authenticate using valid client side certificates.
#
-# Note: this applies to all incoming clients, including replicas.
+# It is possible to disable authentication using this directive.
#
-# tls-auth-clients yes
+# tls-auth-clients no
-# If TLS/SSL should be used when connecting as a replica to a master, enable
-# this configuration directive:
+# By default, a Redis replica does not attempt to establish a TLS connection
+# with its master.
+#
+# Use the following directive to enable TLS on replication links.
#
# tls-replication yes
-# If TLS/SSL should be used for the Redis Cluster bus, enable this configuration
-# directive.
-#
-# NOTE: If TLS/SSL is enabled for Cluster Bus, mutual authentication is always
-# enforced.
+# By default, the Redis Cluster bus uses a plain TCP connection. To enable
+# TLS for the bus protocol, use the following directive:
#
# tls-cluster yes
@@ -1362,7 +1361,11 @@ latency-monitor-threshold 0
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
-# A Alias for g$lshzxe, so that the "AKE" string means all the events.
+# t Stream commands
+# m Key-miss events (Note: It is not included in the 'A' class)
+# A Alias for g$lshzxet, so that the "AKE" string means all the events
+# (Except key-miss events which are excluded from 'A' due to their
+# unique nature).
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
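With the new 't' (stream) and 'm' (key-miss) classes documented above, a deployment that also wants key-miss notifications has to name 'm' explicitly, because the 'A' alias deliberately excludes it. A hypothetical configuration line combining keyspace and keyevent channels with all classes plus key-miss events:

    notify-keyspace-events "KEAm"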
diff --git a/src/acl.c b/src/acl.c
index 1f395bd3f..b046785ff 100644
--- a/src/acl.c
+++ b/src/acl.c
@@ -49,6 +49,8 @@ list *UsersToLoad; /* This is a list of users found in the configuration file
array of SDS pointers: the first is the user name,
all the remaining pointers are ACL rules in the same
format as ACLSetUser(). */
+list *ACLLog; /* Our security log, the user is able to inspect that
+ using the ACL LOG command .*/
struct ACLCategoryItem {
const char *name;
@@ -93,6 +95,7 @@ struct ACLUserFlag {
void ACLResetSubcommandsForCommand(user *u, unsigned long id);
void ACLResetSubcommands(user *u);
void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub);
+void ACLFreeLogEntry(void *le);
/* The length of the string representation of a hashed password. */
#define HASH_PASSWORD_LEN SHA256_BLOCK_SIZE*2
@@ -920,6 +923,7 @@ void ACLInitDefaultUser(void) {
void ACLInit(void) {
Users = raxNew();
UsersToLoad = listCreate();
+ ACLLog = listCreate();
ACLInitDefaultUser();
}
@@ -978,6 +982,7 @@ int ACLAuthenticateUser(client *c, robj *username, robj *password) {
moduleNotifyUserChanged(c);
return C_OK;
} else {
+ addACLLogEntry(c,ACL_DENIED_AUTH,0,username->ptr);
return C_ERR;
}
}
@@ -1034,7 +1039,7 @@ user *ACLGetUserByName(const char *name, size_t namelen) {
* command cannot be executed because the user is not allowed to run such
* command, the second if the command is denied because the user is trying
* to access keys that are not among the specified patterns. */
-int ACLCheckCommandPerm(client *c) {
+int ACLCheckCommandPerm(client *c, int *keyidxptr) {
user *u = c->user;
uint64_t id = c->cmd->id;
@@ -1094,6 +1099,7 @@ int ACLCheckCommandPerm(client *c) {
}
}
if (!match) {
+ if (keyidxptr) *keyidxptr = keyidx[j];
getKeysFreeResult(keyidx);
return ACL_DENIED_KEY;
}
@@ -1455,12 +1461,138 @@ void ACLLoadUsersAtStartup(void) {
}
/* =============================================================================
+ * ACL log
+ * ==========================================================================*/
+
+#define ACL_LOG_CTX_TOPLEVEL 0
+#define ACL_LOG_CTX_LUA 1
+#define ACL_LOG_CTX_MULTI 2
+#define ACL_LOG_GROUPING_MAX_TIME_DELTA 60000
+
+/* This structure defines an entry inside the ACL log. */
+typedef struct ACLLogEntry {
+ uint64_t count; /* Number of times this happened recently. */
+ int reason; /* Reason for denying the command. ACL_DENIED_*. */
+ int context; /* Toplevel, Lua or MULTI/EXEC? ACL_LOG_CTX_*. */
+ sds object; /* The key name or command name. */
+ sds username; /* User the client is authenticated with. */
+ mstime_t ctime; /* Milliseconds time of last update to this entry. */
+ sds cinfo; /* Client info (last client if updated). */
+} ACLLogEntry;
+
+/* This function will check if ACL entries 'a' and 'b' are similar enough
+ * that we should actually update the existing entry in our ACL log instead
+ * of creating a new one. */
+int ACLLogMatchEntry(ACLLogEntry *a, ACLLogEntry *b) {
+ if (a->reason != b->reason) return 0;
+ if (a->context != b->context) return 0;
+ mstime_t delta = a->ctime - b->ctime;
+ if (delta < 0) delta = -delta;
+ if (delta > ACL_LOG_GROUPING_MAX_TIME_DELTA) return 0;
+ if (sdscmp(a->object,b->object) != 0) return 0;
+ if (sdscmp(a->username,b->username) != 0) return 0;
+ return 1;
+}
+
+/* Release an ACL log entry. */
+void ACLFreeLogEntry(void *leptr) {
+ ACLLogEntry *le = leptr;
+ sdsfree(le->object);
+ sdsfree(le->username);
+ sdsfree(le->cinfo);
+ zfree(le);
+}
+
+/* Adds a new entry in the ACL log, making sure to delete the old entry
+ * if we reach the maximum length allowed for the log. This function attempts
+ * to find similar entries in the current log in order to bump the counter of
+ * the log entry instead of creating many entries for very similar ACL
+ * rules issues.
+ *
+ * The keypos argument is only used when the reason is ACL_DENIED_KEY, since
+ * it allows the function to log the key name that caused the problem.
+ * Similarly the username is only passed when we failed to authenticate the
+ * user with AUTH or HELLO, for the ACL_DENIED_AUTH reason. Otherwise
+ * it will just be NULL.
+ */
+void addACLLogEntry(client *c, int reason, int keypos, sds username) {
+ /* Create a new entry. */
+ struct ACLLogEntry *le = zmalloc(sizeof(*le));
+ le->count = 1;
+ le->reason = reason;
+ le->username = sdsdup(reason == ACL_DENIED_AUTH ? username : c->user->name);
+ le->ctime = mstime();
+
+ switch(reason) {
+ case ACL_DENIED_CMD: le->object = sdsnew(c->cmd->name); break;
+ case ACL_DENIED_KEY: le->object = sdsnew(c->argv[keypos]->ptr); break;
+ case ACL_DENIED_AUTH: le->object = sdsnew(c->argv[0]->ptr); break;
+ default: le->object = sdsempty();
+ }
+
+ client *realclient = c;
+ if (realclient->flags & CLIENT_LUA) realclient = server.lua_caller;
+
+ le->cinfo = catClientInfoString(sdsempty(),realclient);
+ if (c->flags & CLIENT_MULTI) {
+ le->context = ACL_LOG_CTX_MULTI;
+ } else if (c->flags & CLIENT_LUA) {
+ le->context = ACL_LOG_CTX_LUA;
+ } else {
+ le->context = ACL_LOG_CTX_TOPLEVEL;
+ }
+
+ /* Try to match this entry with past ones, to see if we can just
+ * update an existing entry instead of creating a new one. */
+    long toscan = 10; /* Do limited work trying to find duplicates. */
+ listIter li;
+ listNode *ln;
+ listRewind(ACLLog,&li);
+ ACLLogEntry *match = NULL;
+ while (toscan-- && (ln = listNext(&li)) != NULL) {
+ ACLLogEntry *current = listNodeValue(ln);
+ if (ACLLogMatchEntry(current,le)) {
+ match = current;
+ listDelNode(ACLLog,ln);
+ listAddNodeHead(ACLLog,current);
+ break;
+ }
+ }
+
+ /* If there is a match update the entry, otherwise add it as a
+ * new one. */
+ if (match) {
+ /* We update a few fields of the existing entry and bump the
+ * counter of events for this entry. */
+ sdsfree(match->cinfo);
+ match->cinfo = le->cinfo;
+ match->ctime = le->ctime;
+ match->count++;
+
+ /* Release the old entry. */
+ le->cinfo = NULL;
+ ACLFreeLogEntry(le);
+ } else {
+        /* Add it to our list of entries. We'll have to trim the list
+ * to its maximum size. */
+ listAddNodeHead(ACLLog, le);
+ while(listLength(ACLLog) > server.acllog_max_len) {
+ listNode *ln = listLast(ACLLog);
+ ACLLogEntry *le = listNodeValue(ln);
+ ACLFreeLogEntry(le);
+ listDelNode(ACLLog,ln);
+ }
+ }
+}
+
+/* =============================================================================
* ACL related commands
* ==========================================================================*/
/* ACL -- show and modify the configuration of ACL users.
* ACL HELP
* ACL LOAD
+ * ACL SAVE
* ACL LIST
* ACL USERS
* ACL CAT [<category>]
@@ -1469,6 +1601,7 @@ void ACLLoadUsersAtStartup(void) {
* ACL GETUSER <username>
* ACL GENPASS
* ACL WHOAMI
+ * ACL LOG [<count> | RESET]
*/
void aclCommand(client *c) {
char *sub = c->argv[1]->ptr;
@@ -1655,9 +1788,75 @@ void aclCommand(client *c) {
char pass[32]; /* 128 bits of actual pseudo random data. */
getRandomHexChars(pass,sizeof(pass));
addReplyBulkCBuffer(c,pass,sizeof(pass));
+ } else if (!strcasecmp(sub,"log") && (c->argc == 2 || c->argc ==3)) {
+ long count = 10; /* Number of entries to emit by default. */
+
+ /* Parse the only argument that LOG may have: it could be either
+         * the number of entries the user wants to display, or alternatively
+         * the "RESET" command in order to flush the old entries. */
+ if (c->argc == 3) {
+ if (!strcasecmp(c->argv[2]->ptr,"reset")) {
+ listSetFreeMethod(ACLLog,ACLFreeLogEntry);
+ listEmpty(ACLLog);
+ listSetFreeMethod(ACLLog,NULL);
+ addReply(c,shared.ok);
+ return;
+ } else if (getLongFromObjectOrReply(c,c->argv[2],&count,NULL)
+ != C_OK)
+ {
+ return;
+ }
+ if (count < 0) count = 0;
+ }
+
+ /* Fix the count according to the number of entries we got. */
+ if ((size_t)count > listLength(ACLLog))
+ count = listLength(ACLLog);
+
+ addReplyArrayLen(c,count);
+ listIter li;
+ listNode *ln;
+ listRewind(ACLLog,&li);
+ mstime_t now = mstime();
+ while (count-- && (ln = listNext(&li)) != NULL) {
+ ACLLogEntry *le = listNodeValue(ln);
+ addReplyMapLen(c,7);
+ addReplyBulkCString(c,"count");
+ addReplyLongLong(c,le->count);
+
+ addReplyBulkCString(c,"reason");
+ char *reasonstr;
+ switch(le->reason) {
+ case ACL_DENIED_CMD: reasonstr="command"; break;
+ case ACL_DENIED_KEY: reasonstr="key"; break;
+ case ACL_DENIED_AUTH: reasonstr="auth"; break;
+ }
+ addReplyBulkCString(c,reasonstr);
+
+ addReplyBulkCString(c,"context");
+ char *ctxstr;
+ switch(le->context) {
+ case ACL_LOG_CTX_TOPLEVEL: ctxstr="toplevel"; break;
+ case ACL_LOG_CTX_MULTI: ctxstr="multi"; break;
+ case ACL_LOG_CTX_LUA: ctxstr="lua"; break;
+ default: ctxstr="unknown";
+ }
+ addReplyBulkCString(c,ctxstr);
+
+ addReplyBulkCString(c,"object");
+ addReplyBulkCBuffer(c,le->object,sdslen(le->object));
+ addReplyBulkCString(c,"username");
+ addReplyBulkCBuffer(c,le->username,sdslen(le->username));
+ addReplyBulkCString(c,"age-seconds");
+ double age = (double)(now - le->ctime)/1000;
+ addReplyDouble(c,age);
+ addReplyBulkCString(c,"client-info");
+ addReplyBulkCBuffer(c,le->cinfo,sdslen(le->cinfo));
+ }
} else if (!strcasecmp(sub,"help")) {
const char *help[] = {
"LOAD -- Reload users from the ACL file.",
+"SAVE -- Save the current config to the ACL file."
"LIST -- Show user details in config file format.",
"USERS -- List all the registered usernames.",
"SETUSER <username> [attribs ...] -- Create or modify a user.",
@@ -1667,6 +1866,7 @@ void aclCommand(client *c) {
"CAT <category> -- List commands inside category.",
"GENPASS -- Generate a secure user password.",
"WHOAMI -- Return the current connection username.",
+"LOG [<count> | RESET] -- Show the ACL log entries.",
NULL
};
addReplyHelp(c,help);
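A sketch of how the new ACL LOG subcommand behaves once a command or key access has been denied (field names and ordering follow the aclCommand code above; the concrete values and the client-info string are made up for illustration):

    127.0.0.1:6379> ACL LOG 1
    1)  1) "count"
        2) (integer) 1
        3) "reason"
        4) "command"
        5) "context"
        6) "toplevel"
        7) "object"
        8) "get"
        9) "username"
       10) "someuser"
       11) "age-seconds"
       12) "0.96"
       13) "client-info"
       14) "id=7 addr=127.0.0.1:51234 ... user=someuser"
    127.0.0.1:6379> ACL LOG RESET
    OK

The length of the log is bounded by the new acllog-max-len directive added in config.c further down (default 128).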
diff --git a/src/ae.c b/src/ae.c
index 2c1dae512..d2248fe5c 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -135,6 +135,14 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
+
+ /* Free the time events list. */
+ aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
+ while (te) {
+ next_te = te->next;
+ zfree(te);
+ te = next_te;
+ }
zfree(eventLoop);
}
diff --git a/src/aof.c b/src/aof.c
index 9eeb3f1e2..8ab9349f0 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -242,6 +242,7 @@ void stopAppendOnly(void) {
server.aof_fd = -1;
server.aof_selected_db = -1;
server.aof_state = AOF_OFF;
+ server.aof_rewrite_scheduled = 0;
killAppendOnlyChild();
}
@@ -1797,14 +1798,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
+ server.aof_lastbgrewrite_status = C_ERR;
+
+ serverLog(LL_WARNING,
+ "Background AOF rewrite terminated with error");
+ } else {
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* tirggering an error condition. */
if (bysignal != SIGUSR1)
server.aof_lastbgrewrite_status = C_ERR;
- serverLog(LL_WARNING,
- "Background AOF rewrite terminated with error");
- } else {
- server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
diff --git a/src/config.c b/src/config.c
index e4b1bf869..5841ae7a0 100644
--- a/src/config.c
+++ b/src/config.c
@@ -190,8 +190,9 @@ typedef struct typeInterface {
void (*init)(typeData data);
/* Called on server start, should return 1 on success, 0 on error and should set err */
int (*load)(typeData data, sds *argc, int argv, char **err);
- /* Called on CONFIG SET, returns 1 on success, 0 on error */
- int (*set)(typeData data, sds value, char **err);
+ /* Called on server startup and CONFIG SET, returns 1 on success, 0 on error
+ * and can set a verbose err string, update is true when called from CONFIG SET */
+ int (*set)(typeData data, sds value, int update, char **err);
/* Called on CONFIG GET, required to add output to the client */
void (*get)(client *c, typeData data);
/* Called on CONFIG REWRITE, required to rewrite the config state */
@@ -323,7 +324,11 @@ void loadServerConfigFromString(char *config) {
if ((!strcasecmp(argv[0],config->name) ||
(config->alias && !strcasecmp(argv[0],config->alias))))
{
- if (!config->interface.load(config->data, argv, argc, &err)) {
+ if (argc != 2) {
+ err = "wrong number of arguments";
+ goto loaderr;
+ }
+ if (!config->interface.set(config->data, argv[1], 0, &err)) {
goto loaderr;
}
@@ -344,6 +349,10 @@ void loadServerConfigFromString(char *config) {
if (addresses > CONFIG_BINDADDR_MAX) {
err = "Too many bind addresses specified"; goto loaderr;
}
+ /* Free old bind addresses */
+ for (j = 0; j < server.bindaddr_count; j++) {
+ zfree(server.bindaddr[j]);
+ }
for (j = 0; j < addresses; j++)
server.bindaddr[j] = zstrdup(argv[j+1]);
server.bindaddr_count = addresses;
@@ -599,7 +608,7 @@ void configSetCommand(client *c) {
if(config->modifiable && (!strcasecmp(c->argv[2]->ptr,config->name) ||
(config->alias && !strcasecmp(c->argv[2]->ptr,config->alias))))
{
- if (!config->interface.set(config->data,o->ptr, &errstr)) {
+ if (!config->interface.set(config->data,o->ptr,1,&errstr)) {
goto badfmt;
}
addReply(c,shared.ok);
@@ -1536,9 +1545,8 @@ static char loadbuf[LOADBUF_SIZE];
.alias = (config_alias), \
.modifiable = (is_modifiable),
-#define embedConfigInterface(initfn, loadfn, setfn, getfn, rewritefn) .interface = { \
+#define embedConfigInterface(initfn, setfn, getfn, rewritefn) .interface = { \
.init = (initfn), \
- .load = (loadfn), \
.set = (setfn), \
.get = (getfn), \
.rewrite = (rewritefn) \
@@ -1561,30 +1569,17 @@ static void boolConfigInit(typeData data) {
*data.yesno.config = data.yesno.default_value;
}
-static int boolConfigLoad(typeData data, sds *argv, int argc, char **err) {
- int yn;
- if (argc != 2) {
- *err = "wrong number of arguments";
- return 0;
- }
- if ((yn = yesnotoi(argv[1])) == -1) {
+static int boolConfigSet(typeData data, sds value, int update, char **err) {
+ int yn = yesnotoi(value);
+ if (yn == -1) {
*err = "argument must be 'yes' or 'no'";
return 0;
}
if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err))
return 0;
- *data.yesno.config = yn;
- return 1;
-}
-
-static int boolConfigSet(typeData data, sds value, char **err) {
- int yn = yesnotoi(value);
- if (yn == -1) return 0;
- if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err))
- return 0;
int prev = *(data.yesno.config);
*(data.yesno.config) = yn;
- if (data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) {
+ if (update && data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) {
*(data.yesno.config) = prev;
return 0;
}
@@ -1601,7 +1596,7 @@ static void boolConfigRewrite(typeData data, const char *name, struct rewriteCon
#define createBoolConfig(name, alias, modifiable, config_addr, default, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
- embedConfigInterface(boolConfigInit, boolConfigLoad, boolConfigSet, boolConfigGet, boolConfigRewrite) \
+ embedConfigInterface(boolConfigInit, boolConfigSet, boolConfigGet, boolConfigRewrite) \
.data.yesno = { \
.config = &(config_addr), \
.default_value = (default), \
@@ -1619,23 +1614,7 @@ static void stringConfigInit(typeData data) {
}
}
-static int stringConfigLoad(typeData data, sds *argv, int argc, char **err) {
- if (argc != 2) {
- *err = "wrong number of arguments";
- return 0;
- }
- if (data.string.is_valid_fn && !data.string.is_valid_fn(argv[1], err))
- return 0;
- zfree(*data.string.config);
- if (data.string.convert_empty_to_null) {
- *data.string.config = argv[1][0] ? zstrdup(argv[1]) : NULL;
- } else {
- *data.string.config = zstrdup(argv[1]);
- }
- return 1;
-}
-
-static int stringConfigSet(typeData data, sds value, char **err) {
+static int stringConfigSet(typeData data, sds value, int update, char **err) {
if (data.string.is_valid_fn && !data.string.is_valid_fn(value, err))
return 0;
char *prev = *data.string.config;
@@ -1644,7 +1623,7 @@ static int stringConfigSet(typeData data, sds value, char **err) {
} else {
*data.string.config = zstrdup(value);
}
- if (data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) {
+ if (update && data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) {
zfree(*data.string.config);
*data.string.config = prev;
return 0;
@@ -1666,7 +1645,7 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC
#define createStringConfig(name, alias, modifiable, empty_to_null, config_addr, default, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
- embedConfigInterface(stringConfigInit, stringConfigLoad, stringConfigSet, stringConfigGet, stringConfigRewrite) \
+ embedConfigInterface(stringConfigInit, stringConfigSet, stringConfigGet, stringConfigRewrite) \
.data.string = { \
.config = &(config_addr), \
.default_value = (default), \
@@ -1677,17 +1656,12 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC
}
/* Enum configs */
-static void configEnumInit(typeData data) {
+static void enumConfigInit(typeData data) {
*data.enumd.config = data.enumd.default_value;
}
-static int configEnumLoad(typeData data, sds *argv, int argc, char **err) {
- if (argc != 2) {
- *err = "wrong number of arguments";
- return 0;
- }
-
- int enumval = configEnumGetValue(data.enumd.enum_value, argv[1]);
+static int enumConfigSet(typeData data, sds value, int update, char **err) {
+ int enumval = configEnumGetValue(data.enumd.enum_value, value);
if (enumval == INT_MIN) {
sds enumerr = sdsnew("argument must be one of the following: ");
configEnum *enumNode = data.enumd.enum_value;
@@ -1709,35 +1683,26 @@ static int configEnumLoad(typeData data, sds *argv, int argc, char **err) {
}
if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err))
return 0;
- *(data.enumd.config) = enumval;
- return 1;
-}
-
-static int configEnumSet(typeData data, sds value, char **err) {
- int enumval = configEnumGetValue(data.enumd.enum_value, value);
- if (enumval == INT_MIN) return 0;
- if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err))
- return 0;
int prev = *(data.enumd.config);
*(data.enumd.config) = enumval;
- if (data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) {
+ if (update && data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) {
*(data.enumd.config) = prev;
return 0;
}
return 1;
}
-static void configEnumGet(client *c, typeData data) {
+static void enumConfigGet(client *c, typeData data) {
addReplyBulkCString(c, configEnumGetNameOrUnknown(data.enumd.enum_value,*data.enumd.config));
}
-static void configEnumRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
+static void enumConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
rewriteConfigEnumOption(state, name,*(data.enumd.config), data.enumd.enum_value, data.enumd.default_value);
}
#define createEnumConfig(name, alias, modifiable, enum, config_addr, default, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
- embedConfigInterface(configEnumInit, configEnumLoad, configEnumSet, configEnumGet, configEnumRewrite) \
+ embedConfigInterface(enumConfigInit, enumConfigSet, enumConfigGet, enumConfigRewrite) \
.data.enumd = { \
.config = &(config_addr), \
.default_value = (default), \
@@ -1832,23 +1797,17 @@ static int numericBoundaryCheck(typeData data, long long ll, char **err) {
return 1;
}
-static int numericConfigLoad(typeData data, sds *argv, int argc, char **err) {
- long long ll;
-
- if (argc != 2) {
- *err = "wrong number of arguments";
- return 0;
- }
-
+static int numericConfigSet(typeData data, sds value, int update, char **err) {
+ long long ll, prev = 0;
if (data.numeric.is_memory) {
int memerr;
- ll = memtoll(argv[1], &memerr);
+ ll = memtoll(value, &memerr);
if (memerr || ll < 0) {
*err = "argument must be a memory value";
return 0;
}
} else {
- if (!string2ll(argv[1], sdslen(argv[1]),&ll)) {
+ if (!string2ll(value, sdslen(value),&ll)) {
*err = "argument couldn't be parsed into an integer" ;
return 0;
}
@@ -1860,31 +1819,10 @@ static int numericConfigLoad(typeData data, sds *argv, int argc, char **err) {
if (data.numeric.is_valid_fn && !data.numeric.is_valid_fn(ll, err))
return 0;
- SET_NUMERIC_TYPE(ll)
-
- return 1;
-}
-
-static int numericConfigSet(typeData data, sds value, char **err) {
- long long ll, prev = 0;
- if (data.numeric.is_memory) {
- int memerr;
- ll = memtoll(value, &memerr);
- if (memerr || ll < 0) return 0;
- } else {
- if (!string2ll(value, sdslen(value),&ll)) return 0;
- }
-
- if (!numericBoundaryCheck(data, ll, err))
- return 0;
-
- if (data.numeric.is_valid_fn && !data.numeric.is_valid_fn(ll, err))
- return 0;
-
GET_NUMERIC_TYPE(prev)
SET_NUMERIC_TYPE(ll)
- if (data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) {
+ if (update && data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) {
SET_NUMERIC_TYPE(prev)
return 0;
}
@@ -1918,7 +1856,7 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
#define embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
- embedConfigInterface(numericConfigInit, numericConfigLoad, numericConfigSet, numericConfigGet, numericConfigRewrite) \
+ embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \
.data.numeric = { \
.lower_bound = (lower), \
.upper_bound = (upper), \
@@ -2061,8 +1999,9 @@ static int updateMaxmemory(long long val, long long prev, char **err) {
UNUSED(prev);
UNUSED(err);
if (val) {
- if ((unsigned long long)val < zmalloc_used_memory()) {
- serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
+ size_t used = zmalloc_used_memory()-freeMemoryGetNotCountedMemory();
+ if ((unsigned long long)val < used) {
+ serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", server.maxmemory, used);
}
freeMemoryIfNeededAndSafe();
}
@@ -2221,7 +2160,6 @@ standardConfig configs[] = {
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
- createIntConfig("tracking-table-max-fill", NULL, MODIFIABLE_CONFIG, 0, 100, server.tracking_table_max_fill, 10, INTEGER_CONFIG, NULL, NULL), /* Default: 10% tracking table max fill. */
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
@@ -2233,6 +2171,7 @@ standardConfig configs[] = {
/* Unsigned Long configs */
createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */
createULongConfig("slowlog-max-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.slowlog_max_len, 128, INTEGER_CONFIG, NULL, NULL),
+ createULongConfig("acllog-max-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.acllog_max_len, 128, INTEGER_CONFIG, NULL, NULL),
/* Long Long configs */
createLongLongConfig("lua-time-limit", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.lua_time_limit, 5000, INTEGER_CONFIG, NULL, NULL),/* milliseconds */
@@ -2255,6 +2194,7 @@ standardConfig configs[] = {
createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
+ createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
diff --git a/src/db.c b/src/db.c
index ba7be2725..8a0242d9e 100644
--- a/src/db.c
+++ b/src/db.c
@@ -347,7 +347,10 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* DB number if we want to flush only a single Redis database number.
*
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
- * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
+ * 1. EMPTYDB_ASYNC if we want the memory to be freed in a different thread.
+ * 2. EMPTYDB_BACKUP if we want to empty the backup dictionaries created by
+ * disklessLoadMakeBackups. In that case we only free memory and avoid
+ * firing module events.
* and the function to return ASAP.
*
* On success the fuction returns the number of keys removed from the
@@ -355,6 +358,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* DB number is out of range, and errno is set to EINVAL. */
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC);
+ int backup = (flags & EMPTYDB_BACKUP); /* Just free the memory, nothing else */
+ RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) {
@@ -362,16 +367,18 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
return -1;
}
- /* Fire the flushdb modules event. */
- RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
- moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
- REDISMODULE_SUBEVENT_FLUSHDB_START,
- &fi);
-
- /* Make sure the WATCHed keys are affected by the FLUSH* commands.
- * Note that we need to call the function while the keys are still
- * there. */
- signalFlushedDb(dbnum);
+ /* Pre-flush actions */
+ if (!backup) {
+ /* Fire the flushdb modules event. */
+ moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
+ REDISMODULE_SUBEVENT_FLUSHDB_START,
+ &fi);
+
+ /* Make sure the WATCHed keys are affected by the FLUSH* commands.
+ * Note that we need to call the function while the keys are still
+ * there. */
+ signalFlushedDb(dbnum);
+ }
int startdb, enddb;
if (dbnum == -1) {
@@ -390,20 +397,24 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(
dictEmpty(dbarray[j].expires,callback);
}
}
- if (server.cluster_enabled) {
- if (async) {
- slotToKeyFlushAsync();
- } else {
- slotToKeyFlush();
+
+ /* Post-flush actions */
+ if (!backup) {
+ if (server.cluster_enabled) {
+ if (async) {
+ slotToKeyFlushAsync();
+ } else {
+ slotToKeyFlush();
+ }
}
- }
- if (dbnum == -1) flushSlaveKeysWithExpireList();
+ if (dbnum == -1) flushSlaveKeysWithExpireList();
- /* Also fire the end event. Note that this event will fire almost
- * immediately after the start event if the flush is asynchronous. */
- moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
- REDISMODULE_SUBEVENT_FLUSHDB_END,
- &fi);
+ /* Also fire the end event. Note that this event will fire almost
+ * immediately after the start event if the flush is asynchronous. */
+ moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
+ REDISMODULE_SUBEVENT_FLUSHDB_END,
+ &fi);
+ }
return removed;
}
@@ -1285,8 +1296,10 @@ int expireIfNeeded(redisDb *db, robj *key) {
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
- return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
- dbSyncDelete(db,key);
+ int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
+ dbSyncDelete(db,key);
+ if (retval) signalModifiedKey(db,key);
+ return retval;
}
/* -----------------------------------------------------------------------------
@@ -1522,6 +1535,22 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk
return keys;
}
+/* Helper function to extract keys from memory command.
+ * MEMORY USAGE <key> */
+int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
+ int *keys;
+ UNUSED(cmd);
+
+ if (argc >= 3 && !strcasecmp(argv[1]->ptr,"usage")) {
+ keys = zmalloc(sizeof(int) * 1);
+ keys[0] = 2;
+ *numkeys = 1;
+ return keys;
+ }
+ *numkeys = 0;
+ return NULL;
+}
+
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */
int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
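The memoryGetKeys() helper above makes the single key of MEMORY USAGE visible to the generic key-extraction path used by ACL key-pattern checks and COMMAND GETKEYS. Assuming the memory command-table entry is wired to this helper (that hookup lives in server.c, only partially shown in this diff), the effect is observable as:

    127.0.0.1:6379> COMMAND GETKEYS MEMORY USAGE mykey
    1) "mykey"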
diff --git a/src/debug.c b/src/debug.c
index a2d37337d..36af35aec 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -355,6 +355,7 @@ void debugCommand(client *c) {
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
"DIGEST -- Output a hex signature representing the current DB content.",
"DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.",
+"DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false]",
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
"LOG <message> -- write message to the server log.",
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
@@ -362,6 +363,7 @@ void debugCommand(client *c) {
"LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.",
"LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.",
"OBJECT <key> -- Show low level info about key and associated value.",
+"OOM -- Crash the server simulating an out-of-memory error.",
"PANIC -- Crash the server simulating a panic.",
"POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.",
"RELOAD -- Save the RDB on disk and reload it back in memory.",
@@ -586,7 +588,7 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"protocol") && c->argc == 3) {
/* DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|
- * attrib|push|verbatim|true|false|state|err|bloberr] */
+ * attrib|push|verbatim|true|false] */
char *name = c->argv[2]->ptr;
if (!strcasecmp(name,"string")) {
addReplyBulkCString(c,"Hello World");
@@ -634,7 +636,7 @@ NULL
} else if (!strcasecmp(name,"verbatim")) {
addReplyVerbatim(c,"This is a verbatim\nstring",25,"txt");
} else {
- addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false|state|err|bloberr");
+ addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false");
}
} else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) {
double dtime = strtod(c->argv[2]->ptr,NULL);
@@ -683,9 +685,12 @@ NULL
sds stats = sdsempty();
char buf[4096];
- if (getLongFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
+ if (getLongFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) {
+ sdsfree(stats);
return;
+ }
if (dbid < 0 || dbid >= server.dbnum) {
+ sdsfree(stats);
addReplyError(c,"Out of range database");
return;
}
diff --git a/src/defrag.c b/src/defrag.c
index 04e57955b..e729297a5 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -5,8 +5,8 @@
* We do that by scanning the keyspace and for each pointer we have, we can try to
* ask the allocator if moving it to a new address will help reduce fragmentation.
*
- * Copyright (c) 2017, Oran Agra
- * Copyright (c) 2017, Redis Labs, Inc
+ * Copyright (c) 2020, Oran Agra
+ * Copyright (c) 2020, Redis Labs, Inc
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -408,25 +408,32 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL;
}
-long activeDefragQuickListNodes(quicklist *ql) {
- quicklistNode *node = ql->head, *newnode;
+long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
+ quicklistNode *newnode, *node = *node_ref;
long defragged = 0;
unsigned char *newzl;
+ if ((newnode = activeDefragAlloc(node))) {
+ if (newnode->prev)
+ newnode->prev->next = newnode;
+ else
+ ql->head = newnode;
+ if (newnode->next)
+ newnode->next->prev = newnode;
+ else
+ ql->tail = newnode;
+ *node_ref = node = newnode;
+ defragged++;
+ }
+ if ((newzl = activeDefragAlloc(node->zl)))
+ defragged++, node->zl = newzl;
+ return defragged;
+}
+
+long activeDefragQuickListNodes(quicklist *ql) {
+ quicklistNode *node = ql->head;
+ long defragged = 0;
while (node) {
- if ((newnode = activeDefragAlloc(node))) {
- if (newnode->prev)
- newnode->prev->next = newnode;
- else
- ql->head = newnode;
- if (newnode->next)
- newnode->next->prev = newnode;
- else
- ql->tail = newnode;
- node = newnode;
- defragged++;
- }
- if ((newzl = activeDefragAlloc(node->zl)))
- defragged++, node->zl = newzl;
+ defragged += activeDefragQuickListNode(ql, &node);
node = node->next;
}
return defragged;
@@ -440,12 +447,48 @@ void defragLater(redisDb *db, dictEntry *kde) {
listAddNodeTail(db->defrag_later, key);
}
-long scanLaterList(robj *ob) {
+/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
+long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
quicklist *ql = ob->ptr;
+ quicklistNode *node;
+ long iterations = 0;
+ int bookmark_failed = 0;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
return 0;
- server.stat_active_defrag_scanned+=ql->len;
- return activeDefragQuickListNodes(ql);
+
+ if (*cursor == 0) {
+ /* if cursor is 0, we start new iteration */
+ node = ql->head;
+ } else {
+ node = quicklistBookmarkFind(ql, "_AD");
+ if (!node) {
+ /* if the bookmark was deleted, it means we reached the end. */
+ *cursor = 0;
+ return 0;
+ }
+ node = node->next;
+ }
+
+ (*cursor)++;
+ while (node) {
+ (*defragged) += activeDefragQuickListNode(ql, &node);
+ server.stat_active_defrag_scanned++;
+ if (++iterations > 128 && !bookmark_failed) {
+ if (ustime() > endtime) {
+ if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
+ bookmark_failed = 1;
+ } else {
+ ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
+ return 1;
+ }
+ }
+ iterations = 0;
+ }
+ node = node->next;
+ }
+ quicklistBookmarkDelete(ql, "_AD");
+ *cursor = 0;
+ return bookmark_failed? 1: 0;
}
typedef struct {
@@ -638,7 +681,8 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime,
void *newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
- if (++iterations > 16) {
+ server.stat_active_defrag_scanned++;
+ if (++iterations > 128) {
if (ustime() > endtime) {
serverAssert(ri.key_len==sizeof(last));
memcpy(last,ri.key,ri.key_len);
@@ -900,8 +944,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
- server.stat_active_defrag_hits += scanLaterList(ob);
- *cursor = 0; /* list has no scan, we must finish it in one go */
+ return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else if (ob->type == OBJ_SET) {
server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
@@ -961,11 +1004,6 @@ int defragLaterStep(redisDb *db, long long endtime) {
if (defragLaterItem(de, &defrag_later_cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
- /* Don't start a new BIG key in this loop, this is because the
- * next key can be a list, and scanLaterList must be done in once cycle */
- if (!defrag_later_cursor)
- quit = 1;
-
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
* check if we reached the time limit. */
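The rewritten scanLaterList() above defragments big lists incrementally: it walks nodes until the time budget is exhausted, drops a "_AD" bookmark on the quicklist so the next cycle can resume, and reports completion by resetting the cursor to 0. A minimal sketch of the calling pattern (simplified from defragLaterItem/defragLaterStep; the 50 microsecond budget and surrounding names are illustrative):

    /* Called once per active-defrag cycle for the same big list object. */
    static unsigned long cursor = 0;
    long long endtime = ustime() + 50;               /* illustrative time budget */
    if (scanLaterList(ob, &cursor, endtime,
                      &server.stat_active_defrag_hits) == 0) {
        /* cursor is back to 0: the whole list was covered, move to the next key. */
    } else {
        /* Time ran out: the "_AD" bookmark keeps our place for the next cycle. */
    }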
diff --git a/src/module.c b/src/module.c
index 965bb4460..cce3d1c11 100644
--- a/src/module.c
+++ b/src/module.c
@@ -714,9 +714,9 @@ void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
* flags into the command flags used by the Redis core.
*
* It returns the set of flags, or -1 if unknown flags are found. */
-int commandFlagsFromString(char *s) {
+int64_t commandFlagsFromString(char *s) {
int count, j;
- int flags = 0;
+ int64_t flags = 0;
sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
for (j = 0; j < count; j++) {
char *t = tokens[j];
@@ -730,6 +730,7 @@ int commandFlagsFromString(char *s) {
else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
+ else if (!strcasecmp(t,"no-slowlog")) flags |= CMD_SKIP_SLOWLOG;
else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH;
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
@@ -781,6 +782,8 @@ int commandFlagsFromString(char *s) {
* this means.
* * **"no-monitor"**: Don't propagate the command on monitor. Use this if
* the command has sensible data among the arguments.
+ * * **"no-slowlog"**: Don't log this command in the slowlog. Use this if
+ *                    the command has sensitive data among the arguments.
* * **"fast"**: The command time complexity is not greater
* than O(log(N)) where N is the size of the collection or
* anything else representing the normal scalability
@@ -798,7 +801,7 @@ int commandFlagsFromString(char *s) {
* to authenticate a client.
*/
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
- int flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
+ int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
if (flags == -1) return REDISMODULE_ERR;
if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
return REDISMODULE_ERR;
@@ -859,6 +862,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
module->in_call = 0;
module->in_hook = 0;
module->options = 0;
+ module->info_cb = 0;
ctx->module = module;
}
@@ -889,7 +893,8 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
}
-/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH). */
+/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH
+ * and client side caching). */
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
signalModifiedKey(ctx->client->db,keyname);
return REDISMODULE_OK;
@@ -3648,14 +3653,15 @@ void moduleRDBLoadError(RedisModuleIO *io) {
io->error = 1;
return;
}
- serverLog(LL_WARNING,
+ serverPanic(
"Error loading data from RDB (short read or EOF). "
"Read performed by module '%s' about type '%s' "
- "after reading '%llu' bytes of a value.",
+ "after reading '%llu' bytes of a value "
+ "for key named: '%s'.",
io->type->module->name,
io->type->name,
- (unsigned long long)io->bytes);
- exit(1);
+ (unsigned long long)io->bytes,
+ io->key? (char*)io->key->ptr: "(null)");
}
/* Returns 0 if there's at least one registered data type that did not declare
@@ -3901,7 +3907,7 @@ void RM_SaveLongDouble(RedisModuleIO *io, long double value) {
/* Long double has different number of bits in different platforms, so we
* save it as a string type. */
size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX);
- RM_SaveStringBuffer(io,buf,len+1); /* len+1 for '\0' */
+ RM_SaveStringBuffer(io,buf,len);
}
/* In the context of the rdb_save method of a module data type, loads back the
@@ -4277,6 +4283,24 @@ void unblockClientFromModule(client *c) {
moduleFreeContext(&ctx);
}
+ /* If we made it here and client is still blocked it means that the command
+ * timed-out, client was killed or disconnected and disconnect_callback was
+ * not implemented (or it was, but RM_UnblockClient was not called from
+ * within it, as it should).
+ * We must call moduleUnblockClient in order to free privdata and
+ * RedisModuleBlockedClient.
+ *
+ * Note that we only do that for clients that are blocked on keys, for which
+ * the contract is that the module should not call RM_UnblockClient under
+ * normal circumstances.
+ * Clients implementing threads and working with private data should be
+ * aware that calling RM_UnblockClient for every blocked client is their
+ * responsibility, and if they fail to do so memory may leak. Ideally they
+ * should implement the disconnect and timeout callbacks and call
+ * RM_UnblockClient, but any other way is also acceptable. */
+ if (bc->blocked_on_keys && !bc->unblocked)
+ moduleUnblockClient(c);
+
bc->client = NULL;
/* Reset the client for a new query since, for blocking commands implemented
* into modules, we do not it immediately after the command returns (and
@@ -4388,6 +4412,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
*
* free_privdata: called in order to free the private data that is passed
* by RedisModule_UnblockClient() call.
+ *
+ * Note: RedisModule_UnblockClient should be called for every blocked client,
+ * even if client was killed, timed-out or disconnected. Failing to do so
+ * will result in memory leaks.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
@@ -4442,7 +4470,15 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* freed using the free_privdata callback provided by the user.
*
* However the reply callback will be able to access the argument vector of
- * the command, so the private data is often not needed. */
+ * the command, so the private data is often not needed.
+ *
+ * Note: Under normal circumstances RedisModule_UnblockClient should not be
+ * called for clients that are blocked on keys (Either the key will
+ * become ready or a timeout will occur). If for some reason you do want
+ * to call RedisModule_UnblockClient it is possible: Client will be
+ * handled as if it were timed-out (You must implement the timeout
+ * callback in that case).
+ */
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
}
@@ -4704,9 +4740,9 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
*
* To call non-reply APIs, the thread safe context must be prepared with:
*
- * RedisModule_ThreadSafeCallStart(ctx);
+ * RedisModule_ThreadSafeContextLock(ctx);
* ... make your call here ...
- * RedisModule_ThreadSafeCallStop(ctx);
+ * RedisModule_ThreadSafeContextUnlock(ctx);
*
* This is not needed when using `RedisModule_Reply*` functions, assuming
* that a blocked client was used when the context was created, otherwise
@@ -4789,7 +4825,8 @@ void moduleReleaseGIL(void) {
* - REDISMODULE_NOTIFY_EXPIRED: Expiration events
* - REDISMODULE_NOTIFY_EVICTED: Eviction events
* - REDISMODULE_NOTIFY_STREAM: Stream events
- * - REDISMODULE_NOTIFY_ALL: All events
+ * - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
+ * - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
*
* We do not distinguish between key events and keyspace events, and it is up
* to the module to filter the actions taken based on the key.
@@ -6687,7 +6724,7 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
server.module_child_pid = childpid;
moduleForkInfo.done_handler = cb;
moduleForkInfo.done_handler_user_data = user_data;
- serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid);
+ serverLog(LL_VERBOSE, "Module fork started pid: %d ", childpid);
}
return childpid;
}
@@ -6710,7 +6747,7 @@ int TerminateModuleForkChild(int child_pid, int wait) {
server.module_child_pid != child_pid) return C_ERR;
int statloc;
- serverLog(LL_NOTICE,"Killing running module fork child: %ld",
+ serverLog(LL_VERBOSE,"Killing running module fork child: %ld",
(long) server.module_child_pid);
if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) {
while(wait4(server.module_child_pid,&statloc,0,NULL) !=
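Among the module API updates above, the new "no-slowlog" command flag keeps a command's arguments out of the slowlog, complementing "no-monitor". A hypothetical registration inside a module's OnLoad using it (the command name and handler are made up):

    /* Arguments carry credentials, so keep them out of MONITOR and the slowlog. */
    if (RedisModule_CreateCommand(ctx, "mymodule.login", MyModuleLoginCommand,
                                  "no-monitor no-slowlog fast", 0, 0, 0)
        == REDISMODULE_ERR) return REDISMODULE_ERR;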
diff --git a/src/multi.c b/src/multi.c
index df11225bd..cbbd2c513 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -177,8 +177,10 @@ void execCommand(client *c) {
must_propagate = 1;
}
- int acl_retval = ACLCheckCommandPerm(c);
+ int acl_keypos;
+ int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
+ addACLLogEntry(c,acl_retval,acl_keypos,NULL);
addReplyErrorFormat(c,
"-NOPERM ACLs rules changed between the moment the "
"transaction was accumulated and the EXEC call. "
diff --git a/src/networking.c b/src/networking.c
index a2e454d4b..4c394af70 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -154,6 +154,7 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->client_list_node = NULL;
c->client_tracking_redirection = 0;
+ c->client_tracking_prefixes = NULL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
@@ -369,9 +370,10 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
* Where the master must propagate the first change even if the second
* will produce an error. However it is useful to log such events since
* they are rare and may hint at errors in a script or a bug in Redis. */
- if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
- char* to = c->flags & CLIENT_MASTER? "master": "replica";
- char* from = c->flags & CLIENT_MASTER? "replica": "master";
+ int ctype = getClientType(c);
+ if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE) {
+ char* to = ctype == CLIENT_TYPE_MASTER? "master": "replica";
+ char* from = ctype == CLIENT_TYPE_MASTER? "replica": "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command "
@@ -1074,7 +1076,7 @@ void freeClient(client *c) {
}
/* Log link disconnection with slave */
- if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c));
}
@@ -1121,7 +1123,7 @@ void freeClient(client *c) {
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
- if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
+ if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
@@ -1162,9 +1164,14 @@ void freeClientAsync(client *c) {
* may access the list while Redis uses I/O threads. All the other accesses
* are in the context of the main thread while the other threads are
* idle. */
- static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
c->flags |= CLIENT_CLOSE_ASAP;
+ if (server.io_threads_num == 1) {
+ /* no need to bother with locking if there's just one thread (the main thread) */
+ listAddNodeTail(server.clients_to_close,c);
+ return;
+ }
+ static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&async_free_queue_mutex);
listAddNodeTail(server.clients_to_close,c);
pthread_mutex_unlock(&async_free_queue_mutex);
@@ -1252,8 +1259,8 @@ int writeToClient(client *c, int handler_installed) {
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
- * a slave (otherwise, on high-speed traffic, the replication
- * buffer will grow indefinitely) */
+ * a slave or a monitor (otherwise, on high-speed traffic, the
+ * replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
@@ -1358,6 +1365,12 @@ void resetClient(client *c) {
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
c->flags &= ~CLIENT_ASKING;
+ /* We do the same for the CACHING command as well. It also affects
+ * the next command or transaction executed, in a way very similar
+ * to ASKING. */
+ if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
+ c->flags &= ~CLIENT_TRACKING_CACHING;
+
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
* to the next command will be sent, but set the flag if the command
* we just processed was "CLIENT REPLY SKIP". */
@@ -1439,7 +1452,7 @@ int processInlineBuffer(client *c) {
/* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big
* RDB file. */
- if (querylen == 0 && c->flags & CLIENT_SLAVE)
+ if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
c->repl_ack_time = server.unixtime;
/* Move querybuffer position to the next query in the buffer. */
@@ -2021,7 +2034,6 @@ int clientSetNameOrReply(client *c, robj *name) {
void clientCommand(client *c) {
listNode *ln;
listIter li;
- client *client;
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
@@ -2038,7 +2050,7 @@ void clientCommand(client *c) {
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
"SETNAME <name> -- Assign the name <name> to the current connection.",
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
-"TRACKING (on|off) [REDIRECT <id>] -- Enable client keys tracking for client side caching.",
+"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.",
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
NULL
};
@@ -2135,7 +2147,7 @@ NULL
/* Iterate clients killing all the matching clients. */
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
- client = listNodeValue(ln);
+ client *client = listNodeValue(ln);
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
if (type != -1 && getClientType(client) != type) continue;
if (id != 0 && client->id != id) continue;
@@ -2213,38 +2225,131 @@ NULL
UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
- } else if (!strcasecmp(c->argv[1]->ptr,"tracking") &&
- (c->argc == 3 || c->argc == 5))
- {
- /* CLIENT TRACKING (on|off) [REDIRECT <id>] */
+ } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
+ /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
+ * [PREFIX second] [OPTIN] [OPTOUT] ... */
long long redir = 0;
+ uint64_t options = 0;
+ robj **prefix = NULL;
+ size_t numprefix = 0;
+
+ /* Parse the options. */
+ for (int j = 3; j < c->argc; j++) {
+ int moreargs = (c->argc-1) - j;
+
+ if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
+ j++;
+ if (redir != 0) {
+ addReplyError(c,"A client can only redirect to a single "
+ "other client");
+ zfree(prefix);
+ return;
+ }
- /* Parse the redirection option: we'll require the client with
- * the specified ID to exist right now, even if it is possible
- * it will get disconnected later. */
- if (c->argc == 5) {
- if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) {
- addReply(c,shared.syntaxerr);
- return;
- } else {
- if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
- C_OK) return;
+ if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
+ C_OK)
+ {
+ zfree(prefix);
+ return;
+ }
+ /* We will require the client with the specified ID to exist
+ * right now, even if it is possible that it gets disconnected
+ * later. Still a valid sanity check. */
if (lookupClientByID(redir) == NULL) {
addReplyError(c,"The client ID you want redirect to "
"does not exist");
+ zfree(prefix);
return;
}
+ } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
+ options |= CLIENT_TRACKING_BCAST;
+ } else if (!strcasecmp(c->argv[j]->ptr,"optin")) {
+ options |= CLIENT_TRACKING_OPTIN;
+ } else if (!strcasecmp(c->argv[j]->ptr,"optout")) {
+ options |= CLIENT_TRACKING_OPTOUT;
+ } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
+ j++;
+ prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
+ prefix[numprefix++] = c->argv[j];
+ } else {
+ zfree(prefix);
+ addReply(c,shared.syntaxerr);
+ return;
}
}
+ /* Options are ok: enable or disable the tracking for this client. */
if (!strcasecmp(c->argv[2]->ptr,"on")) {
- enableTracking(c,redir);
+ /* Before enabling tracking, make sure options are compatible
+ * among each other and with the current state of the client. */
+ if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
+ addReplyError(c,
+ "PREFIX option requires BCAST mode to be enabled");
+ zfree(prefix);
+ return;
+ }
+
+ if (c->flags & CLIENT_TRACKING) {
+ int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
+ int newbcast = !!(options & CLIENT_TRACKING_BCAST);
+ if (oldbcast != newbcast) {
+ addReplyError(c,
+ "You can't switch BCAST mode on/off before disabling "
+ "tracking for this client, and then re-enabling it with "
+ "a different mode.");
+ zfree(prefix);
+ return;
+ }
+ }
+
+ if (options & CLIENT_TRACKING_BCAST &&
+ options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
+ {
+ addReplyError(c,
+ "OPTIN and OPTOUT are not compatible with BCAST");
+ zfree(prefix);
+ return;
+ }
+
+ enableTracking(c,redir,options,prefix,numprefix);
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
disableTracking(c);
} else {
+ zfree(prefix);
addReply(c,shared.syntaxerr);
return;
}
+ zfree(prefix);
+ addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) {
+ if (!(c->flags & CLIENT_TRACKING)) {
+ addReplyError(c,"CLIENT CACHING can be called only when the "
+ "client is in tracking mode with OPTIN or "
+ "OPTOUT mode enabled");
+ return;
+ }
+
+ char *opt = c->argv[2]->ptr;
+ if (!strcasecmp(opt,"yes")) {
+ if (c->flags & CLIENT_TRACKING_OPTIN) {
+ c->flags |= CLIENT_TRACKING_CACHING;
+ } else {
+ addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
+ return;
+ }
+ } else if (!strcasecmp(opt,"no")) {
+ if (c->flags & CLIENT_TRACKING_OPTOUT) {
+ c->flags |= CLIENT_TRACKING_CACHING;
+ } else {
+ addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
+ return;
+ }
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+
+ /* Common reply for when we succeeded. */
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
/* CLIENT GETREDIR */
@@ -2433,12 +2538,14 @@ unsigned long getClientOutputBufferMemoryUsage(client *c) {
*
* The function will return one of the following:
* CLIENT_TYPE_NORMAL -> Normal client
- * CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command
+ * CLIENT_TYPE_SLAVE -> Slave
* CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
* CLIENT_TYPE_MASTER -> The client representing our replication master.
*/
int getClientType(client *c) {
if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
+ /* Even though MONITOR clients are marked as replicas, we
+ * want to expose them as normal clients. */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
return CLIENT_TYPE_SLAVE;
if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
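The CLIENT TRACKING parser above accepts the new BCAST, PREFIX, OPTIN and OPTOUT modifiers, and the separate CLIENT CACHING subcommand toggles the per-command behavior. A rough usage sketch (key and prefix names are arbitrary, replies omitted) might be:

    CLIENT TRACKING on BCAST PREFIX user: PREFIX object:
    CLIENT TRACKING on OPTIN
    CLIENT CACHING yes
    GET user:42

The first form asks for broadcast invalidations limited to two prefixes; the second enables tracking in opt-in mode, so only the read command that follows CLIENT CACHING yes has its keys remembered. PREFIX without BCAST, or OPTIN/OPTOUT together with BCAST, is rejected by the checks above, and switching BCAST mode on an already-tracking client requires disabling tracking first.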
diff --git a/src/notify.c b/src/notify.c
index d6c3ad403..bb1055724 100644
--- a/src/notify.c
+++ b/src/notify.c
@@ -82,10 +82,10 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
- if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
}
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
+ if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
return res;
}
diff --git a/src/object.c b/src/object.c
index 2201a317a..11e335afc 100644
--- a/src/object.c
+++ b/src/object.c
@@ -640,21 +640,13 @@ int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *m
int getLongDoubleFromObject(robj *o, long double *target) {
long double value;
- char *eptr;
if (o == NULL) {
value = 0;
} else {
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
if (sdsEncodedObject(o)) {
- errno = 0;
- value = strtold(o->ptr, &eptr);
- if (sdslen(o->ptr) == 0 ||
- isspace(((const char*)o->ptr)[0]) ||
- (size_t)(eptr-(char*)o->ptr) != sdslen(o->ptr) ||
- (errno == ERANGE &&
- (value == HUGE_VAL || value == -HUGE_VAL || value == 0)) ||
- isnan(value))
+ if (!string2ld(o->ptr, sdslen(o->ptr), &value))
return C_ERR;
} else if (o->encoding == OBJ_ENCODING_INT) {
value = (long)o->ptr;
@@ -983,37 +975,28 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mem_total += mem;
mem = 0;
- if (listLength(server.slaves)) {
- listIter li;
- listNode *ln;
-
- listRewind(server.slaves,&li);
- while((ln = listNext(&li))) {
- client *c = listNodeValue(ln);
- mem += getClientOutputBufferMemoryUsage(c);
- mem += sdsAllocSize(c->querybuf);
- mem += sizeof(client);
- }
- }
- mh->clients_slaves = mem;
- mem_total+=mem;
-
- mem = 0;
if (listLength(server.clients)) {
listIter li;
listNode *ln;
+ size_t mem_normal = 0, mem_slaves = 0;
listRewind(server.clients,&li);
while((ln = listNext(&li))) {
+ size_t mem_curr = 0;
client *c = listNodeValue(ln);
- if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR))
- continue;
- mem += getClientOutputBufferMemoryUsage(c);
- mem += sdsAllocSize(c->querybuf);
- mem += sizeof(client);
+ int type = getClientType(c);
+ mem_curr += getClientOutputBufferMemoryUsage(c);
+ mem_curr += sdsAllocSize(c->querybuf);
+ mem_curr += sizeof(client);
+ if (type == CLIENT_TYPE_SLAVE)
+ mem_slaves += mem_curr;
+ else
+ mem_normal += mem_curr;
}
+ mh->clients_slaves = mem_slaves;
+ mh->clients_normal = mem_normal;
+ mem = mem_slaves + mem_normal;
}
- mh->clients_normal = mem;
mem_total+=mem;
mem = 0;
@@ -1119,13 +1102,13 @@ sds getMemoryDoctorReport(void) {
num_reports++;
}
- /* Allocator fss is higher than 1.1 and 10MB ? */
+ /* Allocator rss is higher than 1.1 and 10MB ? */
if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) {
high_alloc_rss = 1;
num_reports++;
}
- /* Non-Allocator fss is higher than 1.1 and 10MB ? */
+ /* Non-Allocator rss is higher than 1.1 and 10MB ? */
if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) {
high_proc_rss = 1;
num_reports++;
diff --git a/src/pubsub.c b/src/pubsub.c
index 994dd9734..5cb4298e0 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -35,7 +35,11 @@ int clientSubscriptionsCount(client *c);
* Pubsub client replies API
*----------------------------------------------------------------------------*/
-/* Send a pubsub message of type "message" to the client. */
+/* Send a pubsub message of type "message" to the client.
+ * Normally 'msg' is a Redis object containing the string to send as
+ * message. However if the caller sets 'msg' as NULL, it will be able
+ * to send a special message (for instance an Array type) by using the
+ * addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
@@ -43,7 +47,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
addReplyPushLen(c,3);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
- addReplyBulk(c,msg);
+ if (msg) addReplyBulk(c,msg);
}
/* Send a pubsub message of type "pmessage" to the client. The difference
diff --git a/src/quicklist.c b/src/quicklist.c
index 7b5484116..ae183ffd8 100644
--- a/src/quicklist.c
+++ b/src/quicklist.c
@@ -70,6 +70,12 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
} while (0);
#endif
+/* Bookmarks forward declarations */
+#define QL_MAX_BM ((1 << QL_BM_BITS)-1)
+quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name);
+quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
+void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
+
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e) \
do { \
@@ -100,10 +106,11 @@ quicklist *quicklistCreate(void) {
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
+ quicklist->bookmark_count = 0;
return quicklist;
}
-#define COMPRESS_MAX (1 << 16)
+#define COMPRESS_MAX (1 << QL_COMP_BITS)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
if (compress > COMPRESS_MAX) {
compress = COMPRESS_MAX;
@@ -113,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
quicklist->compress = compress;
}
-#define FILL_MAX (1 << 15)
+#define FILL_MAX (1 << (QL_FILL_BITS-1))
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
@@ -169,6 +176,7 @@ void quicklistRelease(quicklist *quicklist) {
quicklist->len--;
current = next;
}
+ quicklistBookmarksClear(quicklist);
zfree(quicklist);
}
@@ -578,6 +586,15 @@ quicklist *quicklistCreateFromZiplist(int fill, int compress,
REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
quicklistNode *node) {
+ /* Update the bookmark if any */
+ quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node);
+ if (bm) {
+ bm->node = node->next;
+ /* if the bookmark pointed to the last node, delete it. */
+ if (!bm->node)
+ _quicklistBookmarkDelete(quicklist, bm);
+ }
+
if (node->next)
node->next->prev = node->prev;
if (node->prev)
@@ -1410,6 +1427,87 @@ void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
}
}
+/* Create or update a bookmark in the list which will be updated to the next node
+ * automatically when the one referenced gets deleted.
+ * Returns 1 on success (creation of new bookmark or override of an existing one).
+ * Returns 0 on failure (reached the maximum supported number of bookmarks).
+ * NOTE: use short simple names, so that string compare on find is quick.
+ * NOTE: bookmark creation may re-allocate the quicklist, so the input pointer
+ may change and it's the caller's responsibility to update the reference.
+ */
+int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) {
+ quicklist *ql = *ql_ref;
+ if (ql->bookmark_count >= QL_MAX_BM)
+ return 0;
+ quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
+ if (bm) {
+ bm->node = node;
+ return 1;
+ }
+ ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark));
+ *ql_ref = ql;
+ ql->bookmarks[ql->bookmark_count].node = node;
+ ql->bookmarks[ql->bookmark_count].name = zstrdup(name);
+ ql->bookmark_count++;
+ return 1;
+}
+
+/* Find the quicklist node referenced by a named bookmark.
+ * When the bookmarked node is deleted the bookmark is updated to the next node,
+ * and if that's the last node, the bookmark is deleted (so find returns NULL). */
+quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name) {
+ quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
+ if (!bm) return NULL;
+ return bm->node;
+}
+
+/* Delete a named bookmark.
+ * returns 0 if bookmark was not found, and 1 if deleted.
+ * Note that the bookmark memory is not freed yet, and is kept for future use. */
+int quicklistBookmarkDelete(quicklist *ql, const char *name) {
+ quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
+ if (!bm)
+ return 0;
+ _quicklistBookmarkDelete(ql, bm);
+ return 1;
+}
+
+quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) {
+ unsigned i;
+ for (i=0; i<ql->bookmark_count; i++) {
+ if (!strcmp(ql->bookmarks[i].name, name)) {
+ return &ql->bookmarks[i];
+ }
+ }
+ return NULL;
+}
+
+quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) {
+ unsigned i;
+ for (i=0; i<ql->bookmark_count; i++) {
+ if (ql->bookmarks[i].node == node) {
+ return &ql->bookmarks[i];
+ }
+ }
+ return NULL;
+}
+
+void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm) {
+ int index = bm - ql->bookmarks;
+ zfree(bm->name);
+ ql->bookmark_count--;
+ memmove(bm, bm+1, (ql->bookmark_count - index)* sizeof(*bm));
+ /* NOTE: We do not shrink (realloc) the quicklist yet (to avoid resonance),
+ * as it may be re-used later (a call to realloc may NOP). */
+}
+
+void quicklistBookmarksClear(quicklist *ql) {
+ while (ql->bookmark_count)
+ zfree(ql->bookmarks[--ql->bookmark_count].name);
+ /* NOTE: We do not shrink (realloc) the quicklist. The main use case for this
+ * function is just before releasing the allocation. */
+}
+
/* The rest of this file is test cases and test helpers. */
#ifdef REDIS_TEST
#include <stdint.h>
@@ -2641,6 +2739,54 @@ int quicklistTest(int argc, char *argv[]) {
printf("Compressions: %0.2f seconds.\n", (float)(stop - start) / 1000);
printf("\n");
+ TEST("bookmark get updated to next item") {
+ quicklist *ql = quicklistNew(1, 0);
+ quicklistPushTail(ql, "1", 1);
+ quicklistPushTail(ql, "2", 1);
+ quicklistPushTail(ql, "3", 1);
+ quicklistPushTail(ql, "4", 1);
+ quicklistPushTail(ql, "5", 1);
+ assert(ql->len==5);
+ /* add two bookmarks, one pointing to the node before the last. */
+ assert(quicklistBookmarkCreate(&ql, "_dummy", ql->head->next));
+ assert(quicklistBookmarkCreate(&ql, "_test", ql->tail->prev));
+ /* test that the bookmark returns the right node, delete it and see that the bookmark points to the last node */
+ assert(quicklistBookmarkFind(ql, "_test") == ql->tail->prev);
+ assert(quicklistDelRange(ql, -2, 1));
+ assert(quicklistBookmarkFind(ql, "_test") == ql->tail);
+ /* delete the last node, and see that the bookmark was deleted. */
+ assert(quicklistDelRange(ql, -1, 1));
+ assert(quicklistBookmarkFind(ql, "_test") == NULL);
+ /* test that other bookmarks aren't affected */
+ assert(quicklistBookmarkFind(ql, "_dummy") == ql->head->next);
+ assert(quicklistBookmarkFind(ql, "_missing") == NULL);
+ assert(ql->len==3);
+ quicklistBookmarksClear(ql); /* for coverage */
+ assert(quicklistBookmarkFind(ql, "_dummy") == NULL);
+ quicklistRelease(ql);
+ }
+
+ TEST("bookmark limit") {
+ int i;
+ quicklist *ql = quicklistNew(1, 0);
+ quicklistPushHead(ql, "1", 1);
+ for (i=0; i<QL_MAX_BM; i++)
+ assert(quicklistBookmarkCreate(&ql, genstr("",i), ql->head));
+ /* when all bookmarks are used, creation fails */
+ assert(!quicklistBookmarkCreate(&ql, "_test", ql->head));
+ /* delete one and see that we can now create another */
+ assert(quicklistBookmarkDelete(ql, "0"));
+ assert(quicklistBookmarkCreate(&ql, "_test", ql->head));
+ /* delete one and see that the rest survive */
+ assert(quicklistBookmarkDelete(ql, "_test"));
+ for (i=1; i<QL_MAX_BM; i++)
+ assert(quicklistBookmarkFind(ql, genstr("",i)) == ql->head);
+ /* make sure the deleted ones are indeed gone */
+ assert(!quicklistBookmarkFind(ql, "0"));
+ assert(!quicklistBookmarkFind(ql, "_test"));
+ quicklistRelease(ql);
+ }
+
if (!err)
printf("ALL TESTS PASSED!\n");
else
diff --git a/src/quicklist.h b/src/quicklist.h
index a7e27a2dd..8b553c119 100644
--- a/src/quicklist.h
+++ b/src/quicklist.h
@@ -28,6 +28,8 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+#include <stdint.h> // for UINTPTR_MAX
+
#ifndef __QUICKLIST_H__
#define __QUICKLIST_H__
@@ -64,19 +66,51 @@ typedef struct quicklistLZF {
char compressed[];
} quicklistLZF;
+/* Bookmarks are padded with realloc at the end of the quicklist struct.
+ * They should only be used for very big lists of thousands of nodes, where
+ * the excess memory usage is negligible, and there's a real need to iterate
+ * on them in portions.
+ * When not used, they don't add any memory overhead, but when used and then
+ * deleted, some overhead remains (to avoid resonance).
+ * The number of bookmarks used should be kept to a minimum since it also adds
+ * overhead on node deletion (searching for a bookmark to update). */
+typedef struct quicklistBookmark {
+ quicklistNode *node;
+ char *name;
+} quicklistBookmark;
+
+#if UINTPTR_MAX == 0xffffffff
+/* 32-bit */
+# define QL_FILL_BITS 14
+# define QL_COMP_BITS 14
+# define QL_BM_BITS 4
+#elif UINTPTR_MAX == 0xffffffffffffffff
+/* 64-bit */
+# define QL_FILL_BITS 16
+# define QL_COMP_BITS 16
+# define QL_BM_BITS 4 /* we can encode more, but we'd rather limit the user
+ since too many bookmarks cause performance degradation. */
+#else
+# error unknown arch bits count
+#endif
+
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
- * 'fill' is the user-requested (or default) fill factor. */
+ * 'fill' is the user-requested (or default) fill factor.
+ * 'bookmarks' are an optional feature, added by reallocating this struct,
+ * so that they don't consume memory when not used. */
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
- int fill : 16; /* fill factor for individual nodes */
- unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
+ int fill : QL_FILL_BITS; /* fill factor for individual nodes */
+ unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
+ unsigned int bookmark_count: QL_BM_BITS;
+ quicklistBookmark bookmarks[];
} quicklist;
typedef struct quicklistIter {
@@ -158,6 +192,12 @@ unsigned long quicklistCount(const quicklist *ql);
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len);
size_t quicklistGetLzf(const quicklistNode *node, void **data);
+/* bookmarks */
+int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node);
+int quicklistBookmarkDelete(quicklist *ql, const char *name);
+quicklistNode *quicklistBookmarkFind(quicklist *ql, const char *name);
+void quicklistBookmarksClear(quicklist *ql);
+
#ifdef REDIS_TEST
int quicklistTest(int argc, char *argv[]);
#endif
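The prototypes above are the public surface of the bookmark feature. A minimal sketch of how a caller might use it, mirroring the unit tests added in quicklist.c (the pushed values and the bookmark name are arbitrary):

    quicklist *ql = quicklistNew(-2, 0);
    quicklistPushTail(ql, "a", 1);
    quicklistPushTail(ql, "b", 1);

    /* Creation may realloc the quicklist, hence the double pointer. */
    if (quicklistBookmarkCreate(&ql, "cursor", ql->tail)) {
        /* Later: resume from the remembered node (NULL if the node was
         * deleted and it was the last one). */
        quicklistNode *node = quicklistBookmarkFind(ql, "cursor");
        (void)node;
        quicklistBookmarkDelete(ql, "cursor");
    }
    quicklistRelease(ql);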
diff --git a/src/rax.c b/src/rax.c
index 29b74ae90..a560dde02 100644
--- a/src/rax.c
+++ b/src/rax.c
@@ -1766,6 +1766,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) {
if (n->iskey) steps--;
}
it->node = n;
+ it->data = raxGetData(it->node);
return 1;
}
diff --git a/src/rdb.c b/src/rdb.c
index 27e1b3135..61265433d 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1871,7 +1871,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
- if (rioGetReadError(rdb)) return NULL;
+ if (rioGetReadError(rdb)) {
+ rdbReportReadError("Short read module id");
+ return NULL;
+ }
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 065c389c6..1d79e0db0 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -49,6 +49,7 @@
#include <hiredis.h>
#ifdef USE_OPENSSL
#include <openssl/ssl.h>
+#include <openssl/err.h>
#include <hiredis_ssl.h>
#endif
#include <sds.h> /* use sds.h from hiredis, so that only one set of sds functions will be present in the binary */
@@ -1594,15 +1595,15 @@ static int parseOptions(int argc, char **argv) {
#ifdef USE_OPENSSL
} else if (!strcmp(argv[i],"--tls")) {
config.tls = 1;
- } else if (!strcmp(argv[i],"--sni")) {
+ } else if (!strcmp(argv[i],"--sni") && !lastarg) {
config.sni = argv[++i];
- } else if (!strcmp(argv[i],"--cacertdir")) {
+ } else if (!strcmp(argv[i],"--cacertdir") && !lastarg) {
config.cacertdir = argv[++i];
- } else if (!strcmp(argv[i],"--cacert")) {
+ } else if (!strcmp(argv[i],"--cacert") && !lastarg) {
config.cacert = argv[++i];
- } else if (!strcmp(argv[i],"--cert")) {
+ } else if (!strcmp(argv[i],"--cert") && !lastarg) {
config.cert = argv[++i];
- } else if (!strcmp(argv[i],"--key")) {
+ } else if (!strcmp(argv[i],"--key") && !lastarg) {
config.key = argv[++i];
#endif
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
@@ -1687,8 +1688,8 @@ static void usage(void) {
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
" variable to pass this password more safely\n"
" (if both are used, this argument takes predecence).\n"
-" -user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
-" -pass <password> Alias of -a for consistency with the new --user option.\n"
+" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
+" --pass <password> Alias of -a for consistency with the new --user option.\n"
" -u <uri> Server URI.\n"
" -r <repeat> Execute specified command N times.\n"
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
@@ -1700,12 +1701,13 @@ static void usage(void) {
" -c Enable cluster mode (follow -ASK and -MOVED redirections).\n"
#ifdef USE_OPENSSL
" --tls Establish a secure TLS connection.\n"
-" --cacert CA Certificate file to verify with.\n"
-" --cacertdir Directory where trusted CA certificates are stored.\n"
+" --sni <host> Server name indication for TLS.\n"
+" --cacert <file> CA Certificate file to verify with.\n"
+" --cacertdir <dir> Directory where trusted CA certificates are stored.\n"
" If neither cacert nor cacertdir are specified, the default\n"
" system-wide trusted root certs configuration will apply.\n"
-" --cert Client certificate to authenticate with.\n"
-" --key Private key file to authenticate with.\n"
+" --cert <file> Client certificate to authenticate with.\n"
+" --key <file> Private key file to authenticate with.\n"
#endif
" --raw Use raw formatting for replies (default when STDOUT is\n"
" not a tty).\n"
@@ -7933,6 +7935,14 @@ int main(int argc, char **argv) {
parseEnv();
+#ifdef USE_OPENSSL
+ if (config.tls) {
+ ERR_load_crypto_strings();
+ SSL_load_error_strings();
+ SSL_library_init();
+ }
+#endif
+
/* Cluster Manager mode */
if (CLUSTER_MANAGER_MODE()) {
clusterManagerCommandProc *proc = validateClusterManagerCommand();
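With the corrected option handling above, each of --sni, --cacert, --cacertdir, --cert and --key now requires a value, so passing one of them as the last argument no longer reads past the end of argv. A typical TLS invocation (certificate paths are placeholders) would be along the lines of:

    ./redis-cli --tls --cacert ca.crt --cert client.crt --key client.key \
        -h 127.0.0.1 -p 6379

The ERR_load_crypto_strings()/SSL_load_error_strings()/SSL_library_init() calls added to main() initialize the OpenSSL error strings and algorithms up front when --tls is given, which older OpenSSL versions require before creating a TLS context.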
diff --git a/src/redismodule.h b/src/redismodule.h
index 637078f2b..e74611f13 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -127,8 +127,8 @@
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
-#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */
-#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */
+#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */
+#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */
/* A special pointer that we can use between the core and the module to signal
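Since REDISMODULE_NOTIFY_KEY_MISS is now excluded from REDISMODULE_NOTIFY_ALL, a module that wants key-miss events has to request them explicitly. A sketch of what that could look like (the module name and callback are illustrative only):

    #include "redismodule.h"

    static int MissCallback(RedisModuleCtx *ctx, int type, const char *event,
                            RedisModuleString *key) {
        REDISMODULE_NOT_USED(type);
        REDISMODULE_NOT_USED(event);
        RedisModule_Log(ctx, "notice", "key miss: %s",
                        RedisModule_StringPtrLen(key, NULL));
        return REDISMODULE_OK;
    }

    int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
        REDISMODULE_NOT_USED(argv);
        REDISMODULE_NOT_USED(argc);
        if (RedisModule_Init(ctx, "misslog", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
            return REDISMODULE_ERR;
        return RedisModule_SubscribeToKeyspaceEvents(ctx,
            REDISMODULE_NOTIFY_ALL | REDISMODULE_NOTIFY_KEY_MISS, MissCallback);
    }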
diff --git a/src/replication.c b/src/replication.c
index b7e77184a..c497051c8 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1339,8 +1339,8 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags
server.db[i] = backup[i];
}
} else {
- /* Delete. */
- emptyDbGeneric(backup,-1,empty_db_flags,replicationEmptyDbCallback);
+ /* Delete (Pass EMPTYDB_BACKUP in order to avoid firing module events) . */
+ emptyDbGeneric(backup,-1,empty_db_flags|EMPTYDB_BACKUP,replicationEmptyDbCallback);
for (int i=0; i<server.dbnum; i++) {
dictRelease(backup[i].dict);
dictRelease(backup[i].expires);
@@ -1352,9 +1352,9 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) {
- char buf[4096];
+ char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
- int use_diskless_load;
+ int use_diskless_load = useDisklessLoad();
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
@@ -1411,19 +1411,18 @@ void readSyncBulkPayload(connection *conn) {
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
- useDisklessLoad()? "to parser":"to disk");
+ use_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
- useDisklessLoad()? "to parser":"to disk");
+ use_diskless_load? "to parser":"to disk");
}
return;
}
- use_diskless_load = useDisklessLoad();
if (!use_diskless_load) {
/* Read the data from the socket, store it to a file and search
* for the EOF. */
@@ -1525,7 +1524,6 @@ void readSyncBulkPayload(connection *conn) {
/* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
- signalFlushedDb(-1);
/* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away,
@@ -1533,10 +1531,15 @@ void readSyncBulkPayload(connection *conn) {
if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
+ /* Create a backup of server.db[] and initialize to empty
+ * dictionaries */
diskless_load_backup = disklessLoadMakeBackups();
- } else {
- emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
+ /* We call emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
+ * (where disklessLoadMakeBackups left server.db empty) because we
+ * want to execute all the auxiliary logic of emptyDb (namely,
+ * firing module events). */
+ emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
@@ -2399,6 +2402,10 @@ void replicationUnsetMaster(void) {
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);
+
+ /* Restart the AOF subsystem in case we shut it down during a sync when
+ * we were still a slave. */
+ if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
/* This function is called when the slave lose the connection with the
@@ -2436,9 +2443,6 @@ void replicaofCommand(client *c) {
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
- /* Restart the AOF subsystem in case we shut it down during a sync when
- * we were still a slave. */
- if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
} else {
long port;
diff --git a/src/scripting.c b/src/scripting.c
index 9282b7fd9..7f64e06db 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -606,8 +606,10 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
}
/* Check the ACLs. */
- int acl_retval = ACLCheckCommandPerm(c);
+ int acl_keypos;
+ int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
+ addACLLogEntry(c,acl_retval,acl_keypos,NULL);
if (acl_retval == ACL_DENIED_CMD)
luaPushError(lua, "The user executing the script can't run this "
"command or subcommand");
@@ -2473,6 +2475,7 @@ void ldbEval(lua_State *lua, sds *argv, int argc) {
ldbLog(sdscatfmt(sdsempty(),"<error> %s",lua_tostring(lua,-1)));
lua_pop(lua,1);
sdsfree(code);
+ sdsfree(expr);
return;
}
}
diff --git a/src/sds.h b/src/sds.h
index 1bdb60dec..adcc12c0a 100644
--- a/src/sds.h
+++ b/src/sds.h
@@ -34,7 +34,7 @@
#define __SDS_H
#define SDS_MAX_PREALLOC (1024*1024)
-const char *SDS_NOINIT;
+extern const char *SDS_NOINIT;
#include <sys/types.h>
#include <stdarg.h>
diff --git a/src/server.c b/src/server.c
index 5845a5485..22c81070c 100644
--- a/src/server.c
+++ b/src/server.c
@@ -579,7 +579,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"select",selectCommand,2,
- "ok-loading fast @keyspace",
+ "ok-loading fast ok-stale @keyspace",
0,NULL,0,0,0,0,0,0},
{"swapdb",swapdbCommand,3,
@@ -660,7 +660,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"lastsave",lastsaveCommand,1,
- "read-only random fast @admin @dangerous",
+ "read-only random fast ok-loading ok-stale @admin @dangerous",
0,NULL,0,0,0,0,0,0},
{"type",typeCommand,2,
@@ -708,7 +708,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"monitor",monitorCommand,1,
- "admin no-script",
+ "admin no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"ttl",ttlCommand,2,
@@ -740,7 +740,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"debug",debugCommand,-2,
- "admin no-script",
+ "admin no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"config",configCommand,-2,
@@ -817,14 +817,14 @@ struct redisCommand redisCommandTable[] = {
{"memory",memoryCommand,-2,
"random read-only",
- 0,NULL,0,0,0,0,0,0},
+ 0,memoryGetKeys,0,0,0,0,0,0},
{"client",clientCommand,-2,
- "admin no-script random @connection",
+ "admin no-script random ok-loading ok-stale @connection",
0,NULL,0,0,0,0,0,0},
{"hello",helloCommand,-2,
- "no-auth no-script fast no-monitor no-slowlog @connection",
+ "no-auth no-script fast no-monitor ok-loading ok-stale no-slowlog @connection",
0,NULL,0,0,0,0,0,0},
/* EVAL can modify the dataset, however it is not flagged as a write
@@ -838,7 +838,7 @@ struct redisCommand redisCommandTable[] = {
0,evalGetKeys,0,0,0,0,0,0},
{"slowlog",slowlogCommand,-2,
- "admin random",
+ "admin random ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"script",scriptCommand,-2,
@@ -846,7 +846,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"time",timeCommand,1,
- "read-only random fast",
+ "read-only random fast ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"bitop",bitopCommand,-4,
@@ -1498,7 +1498,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
time_t now = now_ms/1000;
if (server.maxidletime &&
- !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
+ !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
@@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (listLength(server.unblocked_clients))
processUnblockedClients();
+ /* Send the invalidation messages to clients participating to the
+ * client side caching protocol in broadcasting (BCAST) mode. */
+ trackingBroadcastInvalidationMessages();
+
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
@@ -3310,8 +3314,11 @@ void call(client *c, int flags) {
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
server.lua_caller : c;
- if (caller->flags & CLIENT_TRACKING)
+ if (caller->flags & CLIENT_TRACKING &&
+ !(caller->flags & CLIENT_TRACKING_BCAST))
+ {
trackingRememberKeys(caller);
+ }
}
server.fixed_time_expire--;
@@ -3377,8 +3384,10 @@ int processCommand(client *c) {
/* Check if the user can run this command according to the current
* ACLs. */
- int acl_retval = ACLCheckCommandPerm(c);
+ int acl_keypos;
+ int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
+ addACLLogEntry(c,acl_retval,acl_keypos,NULL);
flagTransaction(c);
if (acl_retval == ACL_DENIED_CMD)
addReplyErrorFormat(c,
@@ -3498,7 +3507,10 @@ int processCommand(client *c) {
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
- addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
+ addReplyErrorFormat(c,
+ "Can't execute '%s': only (P)SUBSCRIBE / "
+ "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
+ c->cmd->name);
return C_OK;
}
@@ -4216,7 +4228,8 @@ sds genRedisInfoString(const char *section) {
"active_defrag_misses:%lld\r\n"
"active_defrag_key_hits:%lld\r\n"
"active_defrag_key_misses:%lld\r\n"
- "tracking_used_slots:%lld\r\n",
+ "tracking_total_keys:%lld\r\n"
+ "tracking_total_items:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -4244,7 +4257,8 @@ sds genRedisInfoString(const char *section) {
server.stat_active_defrag_misses,
server.stat_active_defrag_key_hits,
server.stat_active_defrag_key_misses,
- trackingGetUsedSlots());
+ (unsigned long long) trackingGetTotalKeys(),
+ (unsigned long long) trackingGetTotalItems());
}
/* Replication */
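The two new counters replace tracking_used_slots in the stats section of INFO. On an instance with client side caching in use the output would contain lines along these lines (values are illustrative):

    tracking_total_keys:1250
    tracking_total_items:3840

tracking_total_keys is the number of keys currently held in the tracking table, while tracking_total_items is the total number of client IDs stored across all of those keys, as returned by trackingGetTotalKeys() and trackingGetTotalItems().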
diff --git a/src/server.h b/src/server.h
index 8e354c03d..87c293c26 100644
--- a/src/server.h
+++ b/src/server.h
@@ -247,6 +247,11 @@ typedef long long ustime_t; /* microsecond time type. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
+#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
+#define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */
+#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
+#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
+ depending on optin/optout mode. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -335,7 +340,7 @@ typedef long long ustime_t; /* microsecond time type. */
/* Anti-warning macro... */
#define UNUSED(V) ((void) V)
-#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
+#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* Append only defines */
@@ -413,8 +418,8 @@ typedef long long ustime_t; /* microsecond time type. */
#define NOTIFY_EXPIRED (1<<8) /* x */
#define NOTIFY_EVICTED (1<<9) /* e */
#define NOTIFY_STREAM (1<<10) /* t */
-#define NOTIFY_KEY_MISS (1<<11) /* m */
-#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */
+#define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */
+#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
/* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
@@ -822,7 +827,9 @@ typedef struct client {
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
-
+ rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
+ subscribed to in BCAST mode, in the
+ context of client side caching. */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
@@ -1306,7 +1313,7 @@ struct redisServer {
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
- int tracking_table_max_fill; /* Max fill percentage. */
+ size_t tracking_table_max_keys; /* Max number of keys in tracking table. */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@@ -1385,6 +1392,7 @@ struct redisServer {
dict *latency_events;
/* ACLs */
char *acl_filename; /* ACL Users file. NULL if not configured. */
+ unsigned long acllog_max_len; /* Maximum length of the ACL LOG list. */
/* Assert & bug reporting */
const char *assert_failed;
const char *assert_file;
@@ -1647,13 +1655,15 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
/* Client side caching (tracking mode) */
-void enableTracking(client *c, uint64_t redirect_to);
+void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
-unsigned long long trackingGetUsedSlots(void);
+uint64_t trackingGetTotalItems(void);
+uint64_t trackingGetTotalKeys(void);
+void trackingBroadcastInvalidationMessages(void);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
@@ -1820,11 +1830,12 @@ void ACLInit(void);
#define ACL_OK 0
#define ACL_DENIED_CMD 1
#define ACL_DENIED_KEY 2
+#define ACL_DENIED_AUTH 3 /* Only used for ACL LOG entries. */
int ACLCheckUserCredentials(robj *username, robj *password);
int ACLAuthenticateUser(client *c, robj *username, robj *password);
unsigned long ACLGetCommandID(const char *cmdname);
user *ACLGetUserByName(const char *name, size_t namelen);
-int ACLCheckCommandPerm(client *c);
+int ACLCheckCommandPerm(client *c, int *keyidxptr);
int ACLSetUser(user *u, const char *op, ssize_t oplen);
sds ACLDefaultUserFirstPassword(void);
uint64_t ACLGetCommandCategoryFlagByName(const char *name);
@@ -1836,6 +1847,7 @@ void ACLLoadUsersAtStartup(void);
void addReplyCommandCategories(client *c, struct redisCommand *cmd);
user *ACLCreateUnlinkedUser();
void ACLFreeUserAndKillClients(user *u);
+void addACLLogEntry(client *c, int reason, int keypos, sds username);
/* Sorted sets data type */
@@ -2042,6 +2054,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
+#define EMPTYDB_BACKUP (1<<2) /* DB array is a backup for REPL_DISKLESS_LOAD_SWAPDB. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
void flushAllDataAndResetRDB(int flags);
@@ -2074,6 +2087,7 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
+int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
/* Cluster */
void clusterInit(void);
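addACLLogEntry(), declared above, is the hook that multi.c, scripting.c and server.c now call whenever ACLCheckCommandPerm() (or authentication) fails, recording the reason as one of the ACL_DENIED_* constants. Assuming the log is exposed through an ACL LOG subcommand implemented in acl.c (not shown in this part of the diff), a denied access could be inspected roughly like this, with arbitrary key and user names:

    GET forbidden-key        (denied for the current user)
    ACL LOG                  (run from a connection with admin rights)

The new acllog_max_len field in redisServer bounds how many of these entries are retained.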
diff --git a/src/t_stream.c b/src/t_stream.c
index 0f0f97a1d..e1efc6bca 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -848,7 +848,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
argv[11] = createStringObject("JUSTID",6);
argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id);
- propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[3]);
decrRefCount(argv[4]);
@@ -875,7 +875,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
- propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
@@ -1850,6 +1850,8 @@ NULL
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
c->argv[2],c->db->id);
+ /* We want to unblock any XREADGROUP consumers with -NOGROUP. */
+ signalKeyAsReady(c->db,c->argv[2]);
} else {
addReply(c,shared.czero);
}
diff --git a/src/tracking.c b/src/tracking.c
index acb97800a..45f83103a 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -30,39 +30,34 @@
#include "server.h"
-/* The tracking table is constituted by 2^24 radix trees (each tree, and the
- * table itself, are allocated in a lazy way only when needed) tracking
- * clients that may have certain keys in their local, client side, cache.
- *
- * Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
- * slots, however here the function we use is crc64, taking the least
- * significant 24 bits of the output.
+/* The tracking table is constituted by a radix tree of keys, each pointing
+ * to a radix tree of client IDs, used to track the clients that may have
+ * certain keys in their local, client side, cache.
*
* When a client enables tracking with "CLIENT TRACKING on", each key served to
- * the client is hashed to one of such slots, and Redis will remember what
- * client may have keys about such slot. Later, when a key in a given slot is
- * modified, all the clients that may have local copies of keys in that slot
- * will receive an invalidation message. There is no distinction of database
- * number: a single table is used.
+ * the client is remembered in the table mapping the keys to the client IDs.
+ * Later, when a key is modified, all the clients that may have a local copy
+ * of such key will receive an invalidation message.
*
* Clients will normally take frequently requested objects in memory, removing
- * them when invalidation messages are received. A strategy clients may use is
- * to just cache objects in a dictionary, associating to each cached object
- * some incremental epoch, or just a timestamp. When invalidation messages are
- * received clients may store, in a different table, the timestamp (or epoch)
- * of the invalidation of such given slot: later when accessing objects, the
- * eviction of stale objects may be performed in a lazy way by checking if the
- * cached object timestamp is older than the invalidation timestamp for such
- * objects.
- *
- * The output of the 24 bit hash function is very large (more than 16 million
- * possible slots), so clients that may want to use less resources may only
- * use the most significant bits instead of the full 24 bits. */
-#define TRACKING_TABLE_SIZE (1<<24)
-rax **TrackingTable = NULL;
-unsigned long TrackingTableUsedSlots = 0;
+ * them when invalidation messages are received. */
+rax *TrackingTable = NULL;
+rax *PrefixTable = NULL;
+uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
+ the whole tracking table. This gives
+ a hint about the total memory we
+ are using server side for CSC. */
robj *TrackingChannelName;
+/* This is the structure that we have as value of the PrefixTable, and
+ * represents the list of keys modified, and the list of clients that need
+ * to be notified, for a given prefix. */
+typedef struct bcastState {
+ rax *keys; /* Keys modified in the current event loop cycle. */
+ rax *clients; /* Clients subscribed to the notification events for this
+ prefix. */
+} bcastState;
+
/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
@@ -70,9 +65,56 @@ robj *TrackingChannelName;
* client with many entries in the table is removed, it would cost a lot of
* time to do the cleanup. */
void disableTracking(client *c) {
+ /* If this client is in broadcasting mode, we need to unsubscribe it
+ * from all the prefixes it is registered to. */
+ if (c->flags & CLIENT_TRACKING_BCAST) {
+ raxIterator ri;
+ raxStart(&ri,c->client_tracking_prefixes);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
+ serverAssert(bs != raxNotFound);
+ raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
+ /* Was it the last client? Remove the prefix from the
+ * table. */
+ if (raxSize(bs->clients) == 0) {
+ raxFree(bs->clients);
+ raxFree(bs->keys);
+ zfree(bs);
+ raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
+ }
+ }
+ raxStop(&ri);
+ raxFree(c->client_tracking_prefixes);
+ c->client_tracking_prefixes = NULL;
+ }
+
+ /* Clear flags and adjust the count. */
if (c->flags & CLIENT_TRACKING) {
server.tracking_clients--;
- c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
+ c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
+ CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
+ CLIENT_TRACKING_OPTOUT);
+ }
+}
+
+/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
+ * already registered for the specified prefix, no operation is performed. */
+void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
+ bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
+ /* If this is the first client subscribing to such prefix, create
+ * the prefix in the table. */
+ if (bs == raxNotFound) {
+ bs = zmalloc(sizeof(*bs));
+ bs->keys = raxNew();
+ bs->clients = raxNew();
+ raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
+ }
+ if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
+ if (c->client_tracking_prefixes == NULL)
+ c->client_tracking_prefixes = raxNew();
+ raxInsert(c->client_tracking_prefixes,
+ (unsigned char*)prefix,plen,NULL,NULL);
}
}
@@ -83,24 +125,43 @@ void disableTracking(client *c) {
* eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
-void enableTracking(client *c, uint64_t redirect_to) {
- if (c->flags & CLIENT_TRACKING) return;
+void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
+ if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
- c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
+ c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
+ CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
c->client_tracking_redirection = redirect_to;
- server.tracking_clients++;
if (TrackingTable == NULL) {
- TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
+ TrackingTable = raxNew();
+ PrefixTable = raxNew();
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
+
+ if (options & CLIENT_TRACKING_BCAST) {
+ c->flags |= CLIENT_TRACKING_BCAST;
+ if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
+ for (size_t j = 0; j < numprefix; j++) {
+ sds sdsprefix = prefix[j]->ptr;
+ enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
+ }
+ }
+ c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
}
-/* This function is called after the excution of a readonly command in the
- * case the client 'c' has keys tracking enabled. It will populate the
- * tracking ivalidation table according to the keys the user fetched, so that
- * Redis will know what are the clients that should receive an invalidation
- * message with certain groups of keys are modified. */
+/* This function is called after the execution of a readonly command in the
+ * case the client 'c' has keys tracking enabled and the tracking is not
+ * in BCAST mode. It will populate the tracking invalidation table according
+ * to the keys the user fetched, so that Redis will know which clients
+ * should receive an invalidation message when certain groups of keys
+ * are modified. */
void trackingRememberKeys(client *c) {
+ /* Return if we are in optin/out mode and the right CACHING command
+ * was/wasn't given in order to modify the default behavior. */
+ uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
+ uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
+ uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
+ if ((optin && !caching_given) || (optout && caching_given)) return;
+
int numkeys;
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
if (keys == NULL) return;
@@ -108,19 +169,30 @@ void trackingRememberKeys(client *c) {
for(int j = 0; j < numkeys; j++) {
int idx = keys[j];
sds sdskey = c->argv[idx]->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- if (TrackingTable[hash] == NULL) {
- TrackingTable[hash] = raxNew();
- TrackingTableUsedSlots++;
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+ if (ids == raxNotFound) {
+ ids = raxNew();
+ int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
+ sdslen(sdskey),ids, NULL);
+ serverAssert(inserted == 1);
}
- raxTryInsert(TrackingTable[hash],
- (unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
+ if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
+ TrackingTableTotalItems++;
}
getKeysFreeResult(keys);
}
-void sendTrackingMessage(client *c, long long hash) {
+/* Given a key name, this function sends an invalidation message in the
+ * proper channel (depending on RESP version: PubSub or Push message) and
+ * to the proper client (in case of redirection), in the context of the
+ * client 'c' with tracking enabled.
+ *
+ * In case the 'proto' argument is non zero, the function will assume that
+ * 'keyname' points to a buffer of 'keylen' bytes already expressed in the
+ * form of Redis RESP protocol, representing an array of keys to send
+ * to the client as the value of the invalidation. This is used in BCAST mode
+ * in order to optimize the implementation to use less CPU time. */
+void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@@ -146,36 +218,45 @@ void sendTrackingMessage(client *c, long long hash) {
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
- addReplyLongLong(c,hash);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
- robj *msg = createStringObjectFromLongLong(hash);
- addReplyPubsubMessage(c,TrackingChannelName,msg);
- decrRefCount(msg);
+ /* We use a static object to speed things up, however we assume
+ * that addReplyPubsubMessage() will not take a reference. */
+ addReplyPubsubMessage(c,TrackingChannelName,NULL);
+ } else {
+ /* If we are here, the client is not using RESP3, nor is it
+ * redirecting to another client. We can't send anything to
+ * it since RESP2 does not support push messages in the same
+ * connection. */
+ return;
}
-}
-/* Invalidates a caching slot: this is actually the low level implementation
- * of the API that Redis calls externally, that is trackingInvalidateKey(). */
-void trackingInvalidateSlot(uint64_t slot) {
- if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
+ /* Send the "value" part, which is the array of keys. */
+ if (proto) {
+ addReplyProto(c,keyname,keylen);
+ } else {
+ addReplyArrayLen(c,1);
+ addReplyBulkCBuffer(c,keyname,keylen);
+ }
+}
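sendTrackingMessage() above is what produces the message a tracking client eventually receives. For a RESP3 client with a single invalidated key (named "foo" here, CRLF terminators implied) the non-proto path would emit roughly:

    >2
    $10
    invalidate
    *1
    $3
    foo

that is, a two element push whose first element is the string "invalidate" and whose second is the array of invalidated keys; in BCAST mode the array part is pre-built by the caller and passed through the 'proto' branch instead.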
+/* This function is called when a key is modified in Redis and in the case
+ * we have at least one client with the BCAST mode enabled.
+ * Its goal is to set the key in the right broadcast state if the key
+ * matches one or more prefixes in the prefix table. Later when we
+ * return to the event loop, we'll send invalidation messages to the
+ * clients subscribed to each prefix. */
+void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxIterator ri;
- raxStart(&ri,TrackingTable[slot]);
+ raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
- uint64_t id;
- memcpy(&id,ri.key,sizeof(id));
- client *c = lookupClientByID(id);
- if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
- sendTrackingMessage(c,slot);
+ if (ri.key_len > keylen) continue;
+ if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
+ continue;
+ bcastState *bs = ri.data;
+ raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
raxStop(&ri);
-
- /* Free the tracking table: we'll create the radix tree and populate it
- * again if more keys will be modified in this caching slot. */
- raxFree(TrackingTable[slot]);
- TrackingTable[slot] = NULL;
- TrackingTableUsedSlots--;
}
/* This function is called from signalModifiedKey() or other places in Redis
@@ -183,28 +264,55 @@ void trackingInvalidateSlot(uint64_t slot) {
* to send a notification to every client that may have keys about such caching
* slot. */
void trackingInvalidateKey(robj *keyobj) {
- if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
-
+ if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr;
- uint64_t hash = crc64(0,
- (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
- trackingInvalidateSlot(hash);
+
+ if (raxSize(PrefixTable) > 0)
+ trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
+
+ rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
+    if (ids == raxNotFound) return;
+
+ raxIterator ri;
+ raxStart(&ri,ids);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ uint64_t id;
+ memcpy(&id,ri.key,sizeof(id));
+ client *c = lookupClientByID(id);
+        /* Note that if the client is in BCAST mode, we don't want to
+         * send invalidation messages that were pending from before the
+         * client switched to BCAST mode. This can happen if TRACKING is
+         * first enabled normally, and the client later switches to
+         * BCAST mode. */
+ if (c == NULL ||
+ !(c->flags & CLIENT_TRACKING)||
+ c->flags & CLIENT_TRACKING_BCAST)
+ {
+ continue;
+ }
+ sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
+ }
+ raxStop(&ri);
+
+    /* Free the per-key radix tree of client IDs: it will be created and
+     * populated again if the key is tracked once more. */
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
}
/* This function is called when one or all the Redis databases are flushed
- * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for
- * each DB but are global: currently what we do is sending a special
+ * (dbid == -1 in case of FLUSHALL). Caching keys are not specific to
+ * each DB but are global: currently what we do is send a special
* notification to clients with tracking enabled, invalidating the caching
- * slot "-1", which means, "all the keys", in order to avoid flooding clients
+ * key "", which means, "all the keys", in order to avoid flooding clients
* with many invalidation messages for all the keys they may hold.
- *
- * However trying to flush the tracking table here is very costly:
- * we need scanning 16 million caching slots in the table to check
- * if they are used, this introduces a big delay. So what we do is to really
- * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead
- * we just send the invalidation message to all the clients, but don't
- * flush the table: it will slowly get garbage collected as more keys
- * are modified in the used caching slots. */
+ */
+void freeTrackingRadixTree(void *rt) {
+ raxFree(rt);
+}
+
void trackingInvalidateKeysOnFlush(int dbid) {
if (server.tracking_clients) {
listNode *ln;
@@ -213,84 +321,131 @@ void trackingInvalidateKeysOnFlush(int dbid) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
- sendTrackingMessage(c,-1);
+ sendTrackingMessage(c,"",1,0);
}
}
}
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
if (dbid == -1 && TrackingTable) {
- for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
- if (TrackingTable[j] != NULL) {
- raxFree(TrackingTable[j]);
- TrackingTable[j] = NULL;
- TrackingTableUsedSlots--;
- }
- }
-
- /* If there are no clients with tracking enabled, we can even
- * reclaim the memory used by the table itself. The code assumes
- * the table is allocated only if there is at least one client alive
- * with tracking enabled. */
- if (server.tracking_clients == 0) {
- zfree(TrackingTable);
- TrackingTable = NULL;
- }
+ raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
+ TrackingTable = raxNew();
+ TrackingTableTotalItems = 0;
}
}
/* Tracking forces Redis to remember information about which client may have
- * keys about certian caching slots. In workloads where there are a lot of
- * reads, but keys are hardly modified, the amount of information we have
- * to remember server side could be a lot: for each 16 millions of caching
- * slots we may end with a radix tree containing many entries.
+ * certain keys. In workloads where there are a lot of reads, but keys are
+ * hardly modified, the amount of information we have to remember server side
+ * could be significant, since the number of keys is not bounded.
*
- * So Redis allows the user to configure a maximum fill rate for the
+ * So Redis allows the user to configure a maximum number of keys for the
* invalidation table. This function makes sure that we don't go over the
* specified fill rate: if we are over, we can just evict informations about
- * random caching slots, and send invalidation messages to clients like if
- * the key was modified. */
+ * a random key, and send invalidation messages to clients as if the key had
+ * been modified. */
void trackingLimitUsedSlots(void) {
static unsigned int timeout_counter = 0;
-
- if (server.tracking_table_max_fill == 0) return; /* No limits set. */
- unsigned int max_slots =
- (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
- if (TrackingTableUsedSlots <= max_slots) {
+ if (TrackingTable == NULL) return;
+ if (server.tracking_table_max_keys == 0) return; /* No limits set. */
+ size_t max_keys = server.tracking_table_max_keys;
+ if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
return; /* Limit not reached. */
}
- /* We have to invalidate a few slots to reach the limit again. The effort
+ /* We have to invalidate a few keys to reach the limit again. The effort
* we do here is proportional to the number of times we entered this
* function and found that we are still over the limit. */
int effort = 100 * (timeout_counter+1);
- /* Let's start at a random position, and perform linear probing, in order
- * to improve cache locality. However once we are able to find an used
- * slot, jump again randomly, in order to avoid creating big holes in the
- * table (that will make this funciton use more resourced later). */
+ /* We just remove one key after another by using a random walk. */
+ raxIterator ri;
+ raxStart(&ri,TrackingTable);
while(effort > 0) {
- unsigned int idx = rand() % TRACKING_TABLE_SIZE;
- do {
- effort--;
- idx = (idx+1) % TRACKING_TABLE_SIZE;
- if (TrackingTable[idx] != NULL) {
- trackingInvalidateSlot(idx);
- if (TrackingTableUsedSlots <= max_slots) {
- timeout_counter = 0;
- return; /* Return ASAP: we are again under the limit. */
- } else {
- break; /* Jump to next random position. */
- }
- }
- } while(effort > 0);
+ effort--;
+ raxSeek(&ri,"^",NULL,0);
+ raxRandomWalk(&ri,0);
+ rax *ids = ri.data;
+ TrackingTableTotalItems -= raxSize(ids);
+ raxFree(ids);
+ raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
+ if (raxSize(TrackingTable) <= max_keys) {
+ timeout_counter = 0;
+ raxStop(&ri);
+ return; /* Return ASAP: we are again under the limit. */
+ }
}
+
+ /* If we reach this point, we were not able to go under the configured
+ * limit using the maximum effort we had for this run. */
+ raxStop(&ri);
timeout_counter++;
}
+/* This function iterates over the prefixes registered by clients in BCAST
+ * mode and the keys that were modified under each prefix, and sends the
+ * invalidation messages to every client subscribed to each prefix. */
+void trackingBroadcastInvalidationMessages(void) {
+ raxIterator ri, ri2;
+
+ /* Return ASAP if there is nothing to do here. */
+ if (TrackingTable == NULL || !server.tracking_clients) return;
+
+ raxStart(&ri,PrefixTable);
+ raxSeek(&ri,"^",NULL,0);
+ while(raxNext(&ri)) {
+ bcastState *bs = ri.data;
+ if (raxSize(bs->keys)) {
+ /* Create the array reply with the list of keys once, then send
+ * it to all the clients subscribed to this prefix. */
+ char buf[32];
+ size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
+ sds proto = sdsempty();
+ proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
+ proto = sdscatlen(proto,"*",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ raxStart(&ri2,bs->keys);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ len = ll2string(buf,sizeof(buf),ri2.key_len);
+ proto = sdscatlen(proto,"$",1);
+ proto = sdscatlen(proto,buf,len);
+ proto = sdscatlen(proto,"\r\n",2);
+ proto = sdscatlen(proto,ri2.key,ri2.key_len);
+ proto = sdscatlen(proto,"\r\n",2);
+ }
+ raxStop(&ri2);
+
+ /* Send this array of keys to every client in the list. */
+ raxStart(&ri2,bs->clients);
+ raxSeek(&ri2,"^",NULL,0);
+ while(raxNext(&ri2)) {
+ client *c;
+ memcpy(&c,ri2.key,sizeof(c));
+ sendTrackingMessage(c,proto,sdslen(proto),1);
+ }
+ raxStop(&ri2);
+
+ /* Clean up: we can remove everything from this state, because we
+ * want to only track the new keys that will be accumulated starting
+ * from now. */
+ sdsfree(proto);
+ }
+ raxFree(bs->keys);
+ bs->keys = raxNew();
+ }
+ raxStop(&ri);
+}
+
/* This is just used in order to access the amount of used slots in the
* tracking table. */
-unsigned long long trackingGetUsedSlots(void) {
- return TrackingTableUsedSlots;
+uint64_t trackingGetTotalItems(void) {
+ return TrackingTableTotalItems;
+}
+
+uint64_t trackingGetTotalKeys(void) {
+ if (TrackingTable == NULL) return 0;
+ return raxSize(TrackingTable);
}
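
The tracking rewrite above replaces the fixed array of 16 million caching
slots with two radix trees: TrackingTable maps each key name to a rax of
client IDs that may have cached it, and PrefixTable maps each BCAST prefix to
a bcastState holding the keys modified under that prefix and the subscribed
clients. As a rough illustration of the broadcast path, the sketch below
(the helper name and buffer size are ours, not part of the patch) assembles
the RESP array of keys the same way trackingBroadcastInvalidationMessages()
does before handing it to sendTrackingMessage() with proto set to 1:

    /* Illustrative sketch only: serialize 'count' sds keys into a RESP array,
     * mirroring the proto buffer built per prefix in the patch above. */
    sds buildInvalidationArray(sds *keys, size_t count) {
        char buf[32];
        sds proto = sdsempty();
        size_t len = ll2string(buf,sizeof(buf),count);
        proto = sdscatlen(proto,"*",1);
        proto = sdscatlen(proto,buf,len);
        proto = sdscatlen(proto,"\r\n",2);
        for (size_t j = 0; j < count; j++) {
            len = ll2string(buf,sizeof(buf),sdslen(keys[j]));
            proto = sdscatlen(proto,"$",1);
            proto = sdscatlen(proto,buf,len);
            proto = sdscatlen(proto,"\r\n",2);
            proto = sdscatlen(proto,keys[j],sdslen(keys[j]));
            proto = sdscatlen(proto,"\r\n",2);
        }
        /* Caller would pass it as sendTrackingMessage(c,proto,sdslen(proto),1)
         * and sdsfree() it afterwards. */
        return proto;
    }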
diff --git a/src/util.c b/src/util.c
index 20471b539..2be42a0df 100644
--- a/src/util.c
+++ b/src/util.c
@@ -471,13 +471,14 @@ int string2ld(const char *s, size_t slen, long double *dp) {
long double value;
char *eptr;
- if (slen >= sizeof(buf)) return 0;
+ if (slen == 0 || slen >= sizeof(buf)) return 0;
memcpy(buf,s,slen);
buf[slen] = '\0';
errno = 0;
value = strtold(buf, &eptr);
if (isspace(buf[0]) || eptr[0] != '\0' ||
+ (size_t)(eptr-buf) != slen ||
(errno == ERANGE &&
(value == HUGE_VAL || value == -HUGE_VAL || value == 0)) ||
errno == EINVAL ||
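
The string2ld() change rejects empty input and, via the new
(size_t)(eptr-buf) != slen check, any string that strtold() did not consume
entirely, so a value with an embedded NUL byte (the case the module test
below exercises) no longer parses. A minimal standalone sketch of the same
idea, assuming only the C standard library:

    #include <stdlib.h>
    #include <string.h>

    /* Accept 's' of length 'slen' as a long double only if strtold() consumed
     * every byte; an embedded '\0' or trailing garbage makes it fail. */
    static int strict_strtold(const char *s, size_t slen, long double *out) {
        char buf[256];
        char *eptr;
        if (slen == 0 || slen >= sizeof(buf)) return 0;
        memcpy(buf,s,slen);
        buf[slen] = '\0';
        long double v = strtold(buf,&eptr);
        if ((size_t)(eptr-buf) != slen) return 0;
        *out = v;
        return 1;
    }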
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c
index 959918b1c..10dc65b1a 100644
--- a/tests/modules/blockonkeys.c
+++ b/tests/modules/blockonkeys.c
@@ -172,13 +172,13 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
- long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx);
+ long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
fsl_t *fsl;
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
return REDISMODULE_ERR;
- if (!fsl || fsl->list[fsl->length-1] <= gt)
+ if (!fsl || fsl->list[fsl->length-1] <= *pgt)
return REDISMODULE_ERR;
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
@@ -192,10 +192,8 @@ int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int a
}
void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
- /* Nothing to do because privdata is actually a 'long long',
- * not a pointer to the heap */
REDISMODULE_NOT_USED(ctx);
- REDISMODULE_NOT_USED(privdata);
+ RedisModule_Free(privdata);
}
/* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
@@ -217,9 +215,12 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK;
if (!fsl || fsl->list[fsl->length-1] <= gt) {
+        /* We allocate on the heap so the tests in blockonkeys.tcl can check for memory leaks */
+ long long *pgt = RedisModule_Alloc(sizeof(long long));
+ *pgt = gt;
/* Key is empty or has <2 elements, we must block */
RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
- bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt);
+ bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
} else {
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
}
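
The fix above stops smuggling the threshold through the privdata pointer as a
cast integer: the value now lives on the heap, so the free_privdata callback
has real work to do and the leak checks added to blockonkeys.tcl can catch a
missing free. The pattern, in short (names taken from the module above):

    /* Allocate the private data, block the client on the key, and rely on the
     * free callback to release it whether the client is served, times out,
     * is killed, or is unblocked. */
    long long *pgt = RedisModule_Alloc(sizeof(*pgt));
    *pgt = gt;
    RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback,
                                  bpopgt_timeout_callback,
                                  bpopgt_free_privdata,  /* calls RedisModule_Free(privdata) */
                                  timeout, &argv[1], 1, pgt);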
diff --git a/tests/modules/fork.c b/tests/modules/fork.c
index 1a139ef1b..0443d9ef0 100644
--- a/tests/modules/fork.c
+++ b/tests/modules/fork.c
@@ -42,7 +42,7 @@ int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
/* child */
RedisModule_Log(ctx, "notice", "fork child started");
- usleep(200000);
+ usleep(500000);
RedisModule_Log(ctx, "notice", "fork child exiting");
RedisModule_ExitFromChild(code_to_exit_with);
/* unreachable */
diff --git a/tests/modules/misc.c b/tests/modules/misc.c
index b5a032f60..1048d5065 100644
--- a/tests/modules/misc.c
+++ b/tests/modules/misc.c
@@ -74,6 +74,19 @@ int test_ld_conv(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_ReplyWithError(ctx, err);
goto final;
}
+
+ /* Make sure we can't convert a string that has \0 in it */
+ char buf[4] = "123";
+ buf[1] = '\0';
+ RedisModuleString *s3 = RedisModule_CreateString(ctx, buf, 3);
+ long double ld3;
+ if (RedisModule_StringToLongDouble(s3, &ld3) == REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid string successfully converted to long double");
+ RedisModule_FreeString(ctx, s3);
+ goto final;
+ }
+ RedisModule_FreeString(ctx, s3);
+
RedisModule_ReplyWithLongDouble(ctx, ld2);
final:
RedisModule_FreeString(ctx, s1);
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index b20f1ad36..d086366dc 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -53,6 +53,7 @@ proc kill_server config {
}
# kill server and wait for the process to be totally exited
+ send_data_packet $::test_server_fd server-killing $pid
catch {exec kill $pid}
if {$::valgrind} {
set max_wait 60000
@@ -140,6 +141,18 @@ proc tags {tags code} {
uplevel 1 $code
set ::tags [lrange $::tags 0 end-[llength $tags]]
}
+
+# Write the configuration from the dictionary 'config' into the specified
+# file name.
+proc create_server_config_file {filename config} {
+ set fp [open $filename w+]
+ foreach directive [dict keys $config] {
+ puts -nonewline $fp "$directive "
+ puts $fp [dict get $config $directive]
+ }
+ close $fp
+}
+
proc start_server {options {code undefined}} {
# If we are running against an external server, we just push the
# host/port pair in the stack the first time
@@ -221,56 +234,91 @@ proc start_server {options {code undefined}} {
# write new configuration to temporary file
set config_file [tmpfile redis.conf]
- set fp [open $config_file w+]
- foreach directive [dict keys $config] {
- puts -nonewline $fp "$directive "
- puts $fp [dict get $config $directive]
- }
- close $fp
+ create_server_config_file $config_file $config
set stdout [format "%s/%s" [dict get $config "dir"] "stdout"]
set stderr [format "%s/%s" [dict get $config "dir"] "stderr"]
- if {$::valgrind} {
- set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
- } elseif ($::stack_logging) {
- set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &]
- } else {
- set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
- }
+ # We need a loop here to retry with different ports.
+ set server_started 0
+ while {$server_started == 0} {
+ if {$::verbose} {
+ puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
+ }
- # Tell the test server about this new instance.
- send_data_packet $::test_server_fd server-spawned $pid
+ send_data_packet $::test_server_fd "server-spawning" "port $::port"
- # check that the server actually started
- # ugly but tries to be as fast as possible...
- if {$::valgrind} {set retrynum 1000} else {set retrynum 100}
+ if {$::valgrind} {
+ set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
+ } elseif ($::stack_logging) {
+ set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &]
+ } else {
+ set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
+ }
- if {$::verbose} {
- puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
- }
+ # Tell the test server about this new instance.
+ send_data_packet $::test_server_fd server-spawned $pid
+
+ # check that the server actually started
+ # ugly but tries to be as fast as possible...
+ if {$::valgrind} {set retrynum 1000} else {set retrynum 100}
+
+ # Wait for actual startup
+ set checkperiod 100; # Milliseconds
+ set maxiter [expr {120*1000/100}] ; # Wait up to 2 minutes.
+ set port_busy 0
+ while {![info exists _pid]} {
+ regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid
+ after $checkperiod
+ incr maxiter -1
+ if {$maxiter == 0} {
+ start_server_error $config_file "No PID detected in log $stdout"
+ puts "--- LOG CONTENT ---"
+ puts [exec cat $stdout]
+ puts "-------------------"
+ break
+ }
- if {$code ne "undefined"} {
- set serverisup [server_is_up $::host $::port $retrynum]
- } else {
- set serverisup 1
- }
+ # Check if the port is actually busy and the server failed
+ # for this reason.
+ if {[regexp {Could not create server TCP} [exec cat $stdout]]} {
+ set port_busy 1
+ break
+ }
+ }
- if {$::verbose} {
- puts ""
- }
+ # Sometimes we have to try a different port, even if we checked
+ # for availability. Other test clients may grab the port before we
+        # are able to bind it, for example.
+ if {$port_busy} {
+ puts "Port $::port was already busy, trying another port..."
+ set ::port [find_available_port [expr {$::port+1}]]
+ if {$::tls} {
+ dict set config "tls-port" $::port
+ } else {
+ dict set config port $::port
+ }
+ create_server_config_file $config_file $config
+ continue; # Try again
+ }
- if {!$serverisup} {
- set err {}
- append err [exec cat $stdout] "\n" [exec cat $stderr]
- start_server_error $config_file $err
- return
- }
+ if {$code ne "undefined"} {
+ set serverisup [server_is_up $::host $::port $retrynum]
+ } else {
+ set serverisup 1
+ }
- # Wait for actual startup
- while {![info exists _pid]} {
- regexp {PID:\s(\d+)} [exec cat $stdout] _ _pid
- after 100
+ if {$::verbose} {
+ puts ""
+ }
+
+ if {!$serverisup} {
+ set err {}
+ append err [exec cat $stdout] "\n" [exec cat $stderr]
+ start_server_error $config_file $err
+ return
+ }
+ set server_started 1
}
# setup properties to be able to initialize a client object
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index cb7e4e328..fa5579669 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -87,7 +87,7 @@ set ::file ""; # If set, runs only the tests in this comma separated list
set ::curfile ""; # Hold the filename of the current suite
set ::accurate 0; # If true runs fuzz tests with more iterations
set ::force_failure 0
-set ::timeout 600; # 10 minutes without progresses will quit the test.
+set ::timeout 1200; # 20 minutes without progresses will quit the test.
set ::last_progress [clock seconds]
set ::active_servers {} ; # Pids of active Redis instances.
set ::dont_clean 0
@@ -289,7 +289,7 @@ proc read_from_test_client fd {
puts "\[$completed_tests_count/$all_tests_count [colorstr yellow $status]\]: $data ($elapsed seconds)"
lappend ::clients_time_history $elapsed $data
signal_idle_client $fd
- set ::active_clients_task($fd) DONE
+ set ::active_clients_task($fd) "(DONE) $data"
} elseif {$status eq {ok}} {
if {!$::quiet} {
puts "\[[colorstr green $status]\]: $data"
@@ -320,10 +320,16 @@ proc read_from_test_client fd {
exit 1
} elseif {$status eq {testing}} {
set ::active_clients_task($fd) "(IN PROGRESS) $data"
+ } elseif {$status eq {server-spawning}} {
+ set ::active_clients_task($fd) "(SPAWNING SERVER) $data"
} elseif {$status eq {server-spawned}} {
lappend ::active_servers $data
+ set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data"
+ } elseif {$status eq {server-killing}} {
+ set ::active_clients_task($fd) "(KILLING SERVER) pid:$data"
} elseif {$status eq {server-killed}} {
set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data]
+ set ::active_clients_task($fd) "(KILLED SERVER) pid:$data"
} else {
if {!$::quiet} {
puts "\[$status\]: $data"
@@ -333,7 +339,7 @@ proc read_from_test_client fd {
proc show_clients_state {} {
# The following loop is only useful for debugging tests that may
- # enter an infinite loop. Commented out normally.
+ # enter an infinite loop.
foreach x $::active_clients {
if {[info exist ::active_clients_task($x)]} {
puts "$x => $::active_clients_task($x)"
@@ -363,8 +369,6 @@ proc signal_idle_client fd {
set ::active_clients \
[lsearch -all -inline -not -exact $::active_clients $fd]
- if 0 {show_clients_state}
-
# New unit to process?
if {$::next_test != [llength $::all_tests]} {
if {!$::quiet} {
@@ -380,6 +384,7 @@ proc signal_idle_client fd {
}
} else {
lappend ::idle_clients $fd
+ set ::active_clients_task($fd) "SLEEPING, no more units to assign"
if {[llength $::active_clients] == 0} {
the_end
}
diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl
index 2205d2d86..fc1664a75 100644
--- a/tests/unit/acl.tcl
+++ b/tests/unit/acl.tcl
@@ -141,4 +141,111 @@ start_server {tags {"acl"}} {
r ACL setuser newuser -debug
# The test framework will detect a leak if any.
}
+
+ test {ACL LOG shows failed command executions at toplevel} {
+ r ACL LOG RESET
+ r ACL setuser antirez >foo on +set ~object:1234
+ r ACL setuser antirez +eval +multi +exec
+ r AUTH antirez foo
+ catch {r GET foo}
+ r AUTH default ""
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry username] eq {antirez}}
+ assert {[dict get $entry context] eq {toplevel}}
+ assert {[dict get $entry reason] eq {command}}
+ assert {[dict get $entry object] eq {get}}
+ }
+
+ test {ACL LOG is able to test similar events} {
+ r AUTH antirez foo
+ catch {r GET foo}
+ catch {r GET foo}
+ catch {r GET foo}
+ r AUTH default ""
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry count] == 4}
+ }
+
+ test {ACL LOG is able to log keys access violations and key name} {
+ r AUTH antirez foo
+ catch {r SET somekeynotallowed 1234}
+ r AUTH default ""
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry reason] eq {key}}
+ assert {[dict get $entry object] eq {somekeynotallowed}}
+ }
+
+ test {ACL LOG RESET is able to flush the entries in the log} {
+ r ACL LOG RESET
+ assert {[llength [r ACL LOG]] == 0}
+ }
+
+ test {ACL LOG can distinguish the transaction context (1)} {
+ r AUTH antirez foo
+ r MULTI
+ catch {r INCR foo}
+ catch {r EXEC}
+ r AUTH default ""
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry context] eq {multi}}
+ assert {[dict get $entry object] eq {incr}}
+ }
+
+ test {ACL LOG can distinguish the transaction context (2)} {
+ set rd1 [redis_deferring_client]
+ r ACL SETUSER antirez +incr
+
+ r AUTH antirez foo
+ r MULTI
+ r INCR object:1234
+ $rd1 ACL SETUSER antirez -incr
+ $rd1 read
+ catch {r EXEC}
+ $rd1 close
+ r AUTH default ""
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry context] eq {multi}}
+ assert {[dict get $entry object] eq {incr}}
+ r ACL SETUSER antirez -incr
+ }
+
+ test {ACL can log errors in the context of Lua scripting} {
+ r AUTH antirez foo
+ catch {r EVAL {redis.call('incr','foo')} 0}
+ r AUTH default ""
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry context] eq {lua}}
+ assert {[dict get $entry object] eq {incr}}
+ }
+
+ test {ACL LOG can accept a numerical argument to show less entries} {
+ r AUTH antirez foo
+ catch {r INCR foo}
+ catch {r INCR foo}
+ catch {r INCR foo}
+ catch {r INCR foo}
+ r AUTH default ""
+ assert {[llength [r ACL LOG]] > 1}
+ assert {[llength [r ACL LOG 2]] == 2}
+ }
+
+ test {ACL LOG can log failed auth attempts} {
+ catch {r AUTH antirez wrong-password}
+ set entry [lindex [r ACL LOG] 0]
+ assert {[dict get $entry context] eq {toplevel}}
+ assert {[dict get $entry reason] eq {auth}}
+ assert {[dict get $entry object] eq {AUTH}}
+ assert {[dict get $entry username] eq {antirez}}
+ }
+
+ test {ACL LOG entries are limited to a maximum amount} {
+ r ACL LOG RESET
+ r CONFIG SET acllog-max-len 5
+ r AUTH antirez foo
+ for {set j 0} {$j < 10} {incr j} {
+ catch {r SET obj:$j 123}
+ }
+ r AUTH default ""
+ assert {[llength [r ACL LOG]] == 5}
+ }
}
diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl
index d152e212c..06b0e07d7 100644
--- a/tests/unit/memefficiency.tcl
+++ b/tests/unit/memefficiency.tcl
@@ -39,6 +39,8 @@ start_server {tags {"memefficiency"}} {
start_server {tags {"defrag"}} {
if {[string match {*jemalloc*} [s mem_allocator]]} {
test "Active defrag" {
+            r config set save "" ;# prevent bgsave from interfering with save below
+ r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
@@ -46,8 +48,8 @@ start_server {tags {"defrag"}} {
r config set active-defrag-ignore-bytes 2mb
r config set maxmemory 100mb
r config set maxmemory-policy allkeys-lru
- r debug populate 700000 asdf 150
- r debug populate 170000 asdf 300
+ r debug populate 700000 asdf1 150
+ r debug populate 170000 asdf2 300
r ping ;# trigger eviction following the previous population
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
@@ -55,6 +57,11 @@ start_server {tags {"defrag"}} {
puts "frag $frag"
}
assert {$frag >= 1.4}
+
+ r config set latency-monitor-threshold 5
+ r latency reset
+ r config set maxmemory 110mb ;# prevent further eviction (not to fail the digest test)
+ set digest [r debug digest]
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# Wait for the active defrag to start working (decision once a
@@ -78,19 +85,37 @@ start_server {tags {"defrag"}} {
# Test that the fragmentation is lower.
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
+ set max_latency 0
+ foreach event [r latency latest] {
+ lassign $event eventname time latency max
+ if {$eventname == "active-defrag-cycle"} {
+ set max_latency $max
+ }
+ }
if {$::verbose} {
puts "frag $frag"
+ puts "max latency $max_latency"
+ puts [r latency latest]
+ puts [r latency history active-defrag-cycle]
}
assert {$frag < 1.1}
+ # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
+            # we expect max latency to be not much higher than 7.5ms, but the threshold is set higher to allow for occasional slowness
+ assert {$max_latency <= 30}
} else {
set _ ""
}
- } {}
+ # verify the data isn't corrupted or changed
+ set newdigest [r debug digest]
+ assert {$digest eq $newdigest}
+ r save ;# saving an rdb iterates over all the data / pointers
+ } {OK}
test "Active defrag big keys" {
r flushdb
r config resetstat
r config set save "" ;# prevent bgsave from interfering with save below
+ r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
@@ -142,7 +167,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies
}
- assert {[r dbsize] == 500010}
+ assert_equal [r dbsize] 500010
# create some fragmentation
for {set j 0} {$j < 500000} {incr j 2} {
@@ -151,7 +176,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j 2} {
$rd read ; # Discard replies
}
- assert {[r dbsize] == 250010}
+ assert_equal [r dbsize] 250010
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
@@ -200,14 +225,106 @@ start_server {tags {"defrag"}} {
puts [r latency history active-defrag-cycle]
}
assert {$frag < 1.1}
- # due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75,
- # we expect max latency to be not much higher than 75ms
- assert {$max_latency <= 120}
+ # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
+            # we expect max latency to be not much higher than 7.5ms, but the threshold is set higher to allow for occasional slowness
+ assert {$max_latency <= 30}
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
+
+ test "Active defrag big list" {
+ r flushdb
+ r config resetstat
+        r config set save "" ;# prevent bgsave from interfering with save below
+ r config set hz 100
+ r config set activedefrag no
+ r config set active-defrag-max-scan-fields 1000
+ r config set active-defrag-threshold-lower 5
+ r config set active-defrag-cycle-min 65
+ r config set active-defrag-cycle-max 75
+ r config set active-defrag-ignore-bytes 2mb
+ r config set maxmemory 0
+ r config set list-max-ziplist-size 5 ;# list of 500k items will have 100k quicklist nodes
+
+ # create big keys with 10k items
+ set rd [redis_deferring_client]
+
+ set expected_frag 1.7
+ # add a mass of list nodes to two lists (allocations are interlaced)
+ set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation
+ for {set j 0} {$j < 500000} {incr j} {
+ $rd lpush biglist1 $val
+ $rd lpush biglist2 $val
+ }
+ for {set j 0} {$j < 500000} {incr j} {
+ $rd read ; # Discard replies
+ $rd read ; # Discard replies
+ }
+
+ # create some fragmentation
+ r del biglist2
+
+ # start defrag
+ after 120 ;# serverCron only updates the info once in 100ms
+ set frag [s allocator_frag_ratio]
+ if {$::verbose} {
+ puts "frag $frag"
+ }
+
+ assert {$frag >= $expected_frag}
+ r config set latency-monitor-threshold 5
+ r latency reset
+
+ set digest [r debug digest]
+ catch {r config set activedefrag yes} e
+ if {![string match {DISABLED*} $e]} {
+ # wait for the active defrag to start working (decision once a second)
+ wait_for_condition 50 100 {
+ [s active_defrag_running] ne 0
+ } else {
+ fail "defrag not started."
+ }
+
+ # wait for the active defrag to stop working
+ wait_for_condition 500 100 {
+ [s active_defrag_running] eq 0
+ } else {
+ after 120 ;# serverCron only updates the info once in 100ms
+ puts [r info memory]
+ puts [r info stats]
+ puts [r memory malloc-stats]
+ fail "defrag didn't stop."
+ }
+
+        # test that the fragmentation is lower
+ after 120 ;# serverCron only updates the info once in 100ms
+ set frag [s allocator_frag_ratio]
+ set max_latency 0
+ foreach event [r latency latest] {
+ lassign $event eventname time latency max
+ if {$eventname == "active-defrag-cycle"} {
+ set max_latency $max
+ }
+ }
+ if {$::verbose} {
+ puts "frag $frag"
+ puts "max latency $max_latency"
+ puts [r latency latest]
+ puts [r latency history active-defrag-cycle]
+ }
+ assert {$frag < 1.1}
+ # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
+        # we expect max latency to be not much higher than 7.5ms, but the threshold is set higher to allow for occasional slowness
+ assert {$max_latency <= 30}
+ }
+ # verify the data isn't corrupted or changed
+ set newdigest [r debug digest]
+ assert {$digest eq $newdigest}
+ r save ;# saving an rdb iterates over all the data / pointers
+ r del biglist1 ;# coverage for quicklistBookmarksClear
+ } {1}
}
}
diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl
index cb99ab1c9..b380227e0 100644
--- a/tests/unit/moduleapi/blockonkeys.tcl
+++ b/tests/unit/moduleapi/blockonkeys.tcl
@@ -45,18 +45,24 @@ start_server {tags {"modules"}} {
test {Module client blocked on keys (with metadata): Timeout} {
r del k
set rd [redis_deferring_client]
+ $rd client id
+ set cid [$rd read]
r fsl.push k 33
$rd fsl.bpopgt k 35 1
assert_equal {Request timedout} [$rd read]
+ r client kill id $cid ;# try to smoke-out client-related memory leak
}
test {Module client blocked on keys (with metadata): Blocked, case 1} {
r del k
set rd [redis_deferring_client]
+ $rd client id
+ set cid [$rd read]
r fsl.push k 33
$rd fsl.bpopgt k 33 0
r fsl.push k 34
assert_equal {34} [$rd read]
+ r client kill id $cid ;# try to smoke-out client-related memory leak
}
test {Module client blocked on keys (with metadata): Blocked, case 2} {
@@ -70,6 +76,35 @@ start_server {tags {"modules"}} {
assert_equal {36} [$rd read]
}
+ test {Module client blocked on keys (with metadata): Blocked, CLIENT KILL} {
+ r del k
+ set rd [redis_deferring_client]
+ $rd client id
+ set cid [$rd read]
+ $rd fsl.bpopgt k 35 0
+ r client kill id $cid ;# try to smoke-out client-related memory leak
+ }
+
+ test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK TIMEOUT} {
+ r del k
+ set rd [redis_deferring_client]
+ $rd client id
+ set cid [$rd read]
+ $rd fsl.bpopgt k 35 0
+ r client unblock $cid timeout ;# try to smoke-out client-related memory leak
+ assert_equal {Request timedout} [$rd read]
+ }
+
+ test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK ERROR} {
+ r del k
+ set rd [redis_deferring_client]
+ $rd client id
+ set cid [$rd read]
+ $rd fsl.bpopgt k 35 0
+ r client unblock $cid error ;# try to smoke-out client-related memory leak
+ assert_error "*unblocked*" {$rd read}
+ }
+
test {Module client blocked on keys does not wake up on wrong type} {
r del k
set rd [redis_deferring_client]
diff --git a/tests/unit/moduleapi/fork.tcl b/tests/unit/moduleapi/fork.tcl
index f7d7e47d5..8535a3382 100644
--- a/tests/unit/moduleapi/fork.tcl
+++ b/tests/unit/moduleapi/fork.tcl
@@ -20,9 +20,8 @@ start_server {tags {"modules"}} {
test {Module fork kill} {
r fork.create 3
- after 20
+ after 250
r fork.kill
- after 100
assert {[count_log_message "fork child started"] eq "2"}
assert {[count_log_message "Received SIGUSR1 in child"] eq "1"}
diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl
index 2543a0377..fb36d0b80 100644
--- a/tests/unit/scripting.tcl
+++ b/tests/unit/scripting.tcl
@@ -741,3 +741,8 @@ start_server {tags {"scripting repl"}} {
}
}
+start_server {tags {"scripting"}} {
+ r script debug sync
+ r eval {return 'hello'} 0
+ r eval {return 'hello'} 0
+}
diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl
new file mode 100644
index 000000000..2058319f7
--- /dev/null
+++ b/tests/unit/tracking.tcl
@@ -0,0 +1,66 @@
+start_server {tags {"tracking"}} {
+ # Create a deferred client we'll use to redirect invalidation
+ # messages to.
+ set rd1 [redis_deferring_client]
+ $rd1 client id
+ set redir [$rd1 read]
+ $rd1 subscribe __redis__:invalidate
+ $rd1 read ; # Consume the SUBSCRIBE reply.
+
+ test {Clients are able to enable tracking and redirect it} {
+ r CLIENT TRACKING on REDIRECT $redir
+ } {*OK}
+
+ test {The other connection is able to get invalidations} {
+ r SET a 1
+ r GET a
+ r INCR a
+ r INCR b ; # This key should not be notified, since it wasn't fetched.
+ set keys [lindex [$rd1 read] 2]
+ assert {[llength $keys] == 1}
+ assert {[lindex $keys 0] eq {a}}
+ }
+
+ test {The client is now able to disable tracking} {
+ # Make sure to add a few more keys in the tracking list
+ # so that we can check for leaks, as a side effect.
+ r MGET a b c d e f g
+ r CLIENT TRACKING off
+ }
+
+ test {Clients can enable the BCAST mode with the empty prefix} {
+ r CLIENT TRACKING on BCAST REDIRECT $redir
+ } {*OK*}
+
+ test {The connection gets invalidation messages about all the keys} {
+ r MSET a 1 b 2 c 3
+ set keys [lsort [lindex [$rd1 read] 2]]
+ assert {$keys eq {a b c}}
+ }
+
+ test {Clients can enable the BCAST mode with prefixes} {
+ r CLIENT TRACKING off
+ r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX a: PREFIX b:
+ r MULTI
+ r INCR a:1
+ r INCR a:2
+ r INCR b:1
+ r INCR b:2
+ r EXEC
+ # Because of the internals, we know we are going to receive
+        # two separate notifications for the two different prefixes.
+ set keys1 [lsort [lindex [$rd1 read] 2]]
+ set keys2 [lsort [lindex [$rd1 read] 2]]
+ set keys [lsort [list {*}$keys1 {*}$keys2]]
+ assert {$keys eq {a:1 a:2 b:1 b:2}}
+ }
+
+ test {Adding prefixes to BCAST mode works} {
+ r CLIENT TRACKING on BCAST REDIRECT $redir PREFIX c:
+ r INCR c:1234
+ set keys [lsort [lindex [$rd1 read] 2]]
+ assert {$keys eq {c:1234}}
+ }
+
+ $rd1 close
+}
diff --git a/tests/unit/type/hash.tcl b/tests/unit/type/hash.tcl
index d2c679d32..9f8a21b1c 100644
--- a/tests/unit/type/hash.tcl
+++ b/tests/unit/type/hash.tcl
@@ -390,6 +390,13 @@ start_server {tags {"hash"}} {
lappend rv [string match "ERR*not*float*" $bigerr]
} {1 1}
+ test {HINCRBYFLOAT fails against hash value that contains a null-terminator in the middle} {
+ r hset h f "1\x002"
+ catch {r hincrbyfloat h f 1} err
+ set rv {}
+ lappend rv [string match "ERR*not*float*" $err]
+ } {1}
+
test {HSTRLEN against the small hash} {
set err {}
foreach k [array names smallhash *] {
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl
index a59e168ef..072ed14d6 100644
--- a/tests/unit/type/stream-cgroups.tcl
+++ b/tests/unit/type/stream-cgroups.tcl
@@ -161,6 +161,15 @@ start_server {
assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
}
+ test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
+ r del mystream
+ r XGROUP CREATE mystream mygroup $ MKSTREAM
+ set rd [redis_deferring_client]
+ $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">"
+ r XGROUP DESTROY mystream mygroup
+ assert_error "*NOGROUP*" {$rd read}
+ }
+
test {XCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream