summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/anet.c17
-rw-r--r--src/anet.h1
-rw-r--r--src/aof.c187
-rw-r--r--src/bitops.c4
-rw-r--r--src/cli_common.c2
-rw-r--r--src/cluster.c36
-rw-r--r--src/cluster.h4
-rw-r--r--src/commands.c172
-rw-r--r--src/commands/bitfield.json158
-rw-r--r--src/commands/bitfield_ro.json1
-rw-r--r--src/commands/geosearch.json154
-rw-r--r--src/commands/geosearchstore.json154
-rw-r--r--src/commands/migrate.json40
-rw-r--r--src/commands/module-loadex.json1
-rw-r--r--src/commands/set.json50
-rw-r--r--src/commands/zrangebylex.json2
-rw-r--r--src/config.c131
-rw-r--r--src/config.h18
-rw-r--r--src/db.c1
-rw-r--r--src/debug.c6
-rw-r--r--src/eval.c52
-rw-r--r--src/evict.c4
-rw-r--r--src/function_lua.c119
-rw-r--r--src/help.h14
-rw-r--r--src/listpack.c3
-rw-r--r--src/listpack.h2
-rw-r--r--src/module.c146
-rw-r--r--src/modules/Makefile16
-rw-r--r--src/monotonic.c10
-rw-r--r--src/monotonic.h11
-rw-r--r--src/networking.c75
-rw-r--r--src/notify.c6
-rw-r--r--src/object.c2
-rw-r--r--src/pubsub.c32
-rw-r--r--src/quicklist.h4
-rw-r--r--src/rdb.c25
-rw-r--r--src/redismodule.h21
-rw-r--r--src/replication.c40
-rw-r--r--src/resp_parser.h4
-rw-r--r--src/script.c43
-rw-r--r--src/script.h2
-rw-r--r--src/script_lua.c318
-rw-r--r--src/script_lua.h5
-rw-r--r--src/sentinel.c2
-rw-r--r--src/server.c249
-rw-r--r--src/server.h34
-rw-r--r--src/t_hash.c77
-rw-r--r--src/t_stream.c32
-rw-r--r--src/t_string.c4
-rw-r--r--src/t_zset.c22
-rw-r--r--src/timeout.c2
-rw-r--r--src/util.c49
-rw-r--r--src/util.h1
-rw-r--r--src/version.h4
54 files changed, 1593 insertions, 976 deletions
diff --git a/src/anet.c b/src/anet.c
index bde460fc8..3ded135b0 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -49,6 +49,8 @@
#include "anet.h"
#include "config.h"
+#define UNUSED(x) (void)(x)
+
static void anetSetError(char *err, const char *fmt, ...)
{
va_list ap;
@@ -680,3 +682,18 @@ error:
close(fds[1]);
return -1;
}
+
+int anetSetSockMarkId(char *err, int fd, uint32_t id) {
+#ifdef HAVE_SOCKOPTMARKID
+ if (setsockopt(fd, SOL_SOCKET, SOCKOPTMARKID, (void *)&id, sizeof(id)) == -1) {
+ anetSetError(err, "setsockopt: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+#else
+ UNUSED(fd);
+ UNUSED(id);
+ anetSetError(err,"anetSetSockMarkid unsupported on this platform");
+ return ANET_OK;
+#endif
+}
diff --git a/src/anet.h b/src/anet.h
index 96238aaf4..ff86e2029 100644
--- a/src/anet.h
+++ b/src/anet.h
@@ -73,5 +73,6 @@ int anetKeepAlive(char *err, int fd, int interval);
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
int anetFormatFdAddr(int fd, char *buf, size_t buf_len, int fd_to_str_type);
int anetPipe(int fds[2], int read_flags, int write_flags);
+int anetSetSockMarkId(char *err, int fd, uint32_t id);
#endif
diff --git a/src/aof.c b/src/aof.c
index 39d452390..dcdcd8828 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -87,7 +87,7 @@ void aofManifestFreeAndUpdate(aofManifest *am);
#define RDB_FORMAT_SUFFIX ".rdb"
#define AOF_FORMAT_SUFFIX ".aof"
#define MANIFEST_NAME_SUFFIX ".manifest"
-#define MANIFEST_TEMP_NAME_PREFIX "temp_"
+#define TEMP_FILE_NAME_PREFIX "temp-"
/* AOF manifest key. */
#define AOF_MANIFEST_KEY_FILE_NAME "file"
@@ -169,7 +169,7 @@ sds getAofManifestFileName() {
}
sds getTempAofManifestFileName() {
- return sdscatprintf(sdsempty(), "%s%s%s", MANIFEST_TEMP_NAME_PREFIX,
+ return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX,
server.aof_filename, MANIFEST_NAME_SUFFIX);
}
@@ -462,6 +462,12 @@ sds getNewIncrAofName(aofManifest *am) {
return ai->file_name;
}
+/* Get temp INCR type AOF name. */
+sds getTempIncrAofName() {
+ return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename,
+ INCR_FILE_SUFFIX);
+}
+
/* Get the last INCR AOF name or create a new one. */
sds getLastIncrAofName(aofManifest *am) {
serverAssert(am != NULL);
@@ -674,6 +680,17 @@ int aofDelHistoryFiles(void) {
return persistAofManifest(server.aof_manifest);
}
+/* Used to clean up temp INCR AOF when AOFRW fails. */
+void aofDelTempIncrAofFile() {
+ sds aof_filename = getTempIncrAofName();
+ sds aof_filepath = makePath(server.aof_dirname, aof_filename);
+ serverLog(LL_NOTICE, "Removing the temp incr aof file %s in the background", aof_filename);
+ bg_unlink(aof_filepath);
+ sdsfree(aof_filepath);
+ sdsfree(aof_filename);
+ return;
+}
+
/* Called after `loadDataFromDisk` when redis start. If `server.aof_state` is
* 'AOF_ON', It will do three things:
* 1. Force create a BASE file when redis starts with an empty dataset
@@ -739,44 +756,52 @@ int aofFileExist(char *filename) {
}
/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state`
- * is 'AOF_ON' or 'AOF_WAIT_REWRITE', It will do two things:
+ * is 'AOF_ON', It will do two things:
* 1. Open a new INCR type AOF for writing
* 2. Synchronously update the manifest file to the disk
*
* The above two steps of modification are atomic, that is, if
* any step fails, the entire operation will rollback and returns
* C_ERR, and if all succeeds, it returns C_OK.
+ *
+ * If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF
+ * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be
+ * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file.
* */
int openNewIncrAofForAppend(void) {
serverAssert(server.aof_manifest != NULL);
- int newfd;
+ int newfd = -1;
+ aofManifest *temp_am = NULL;
+ sds new_aof_name = NULL;
/* Only open new INCR AOF when AOF enabled. */
if (server.aof_state == AOF_OFF) return C_OK;
- /* Dup a temp aof_manifest to modify. */
- aofManifest *temp_am = aofManifestDup(server.aof_manifest);
-
/* Open new AOF. */
- sds new_aof_name = getNewIncrAofName(temp_am);
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
+ new_aof_name = getTempIncrAofName();
+ } else {
+ /* Dup a temp aof_manifest to modify. */
+ temp_am = aofManifestDup(server.aof_manifest);
+ new_aof_name = sdsdup(getNewIncrAofName(temp_am));
+ }
sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
sdsfree(new_aof_filepath);
if (newfd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
new_aof_name, strerror(errno));
-
- aofManifestFree(temp_am);
- return C_ERR;
+ goto cleanup;
}
- /* Persist AOF Manifest. */
- int ret = persistAofManifest(temp_am);
- if (ret == C_ERR) {
- close(newfd);
- aofManifestFree(temp_am);
- return C_ERR;
+ if (temp_am) {
+ /* Persist AOF Manifest. */
+ if (persistAofManifest(temp_am) == C_ERR) {
+ goto cleanup;
+ }
}
+ sdsfree(new_aof_name);
/* If reaches here, we can safely modify the `server.aof_manifest`
* and `server.aof_fd`. */
@@ -788,8 +813,14 @@ int openNewIncrAofForAppend(void) {
/* Reset the aof_last_incr_size. */
server.aof_last_incr_size = 0;
/* Update `server.aof_manifest`. */
- aofManifestFreeAndUpdate(temp_am);
+ if (temp_am) aofManifestFreeAndUpdate(temp_am);
return C_OK;
+
+cleanup:
+ if (new_aof_name) sdsfree(new_aof_name);
+ if (newfd != -1) close(newfd);
+ if (temp_am) aofManifestFree(temp_am);
+ return C_ERR;
}
/* Whether to limit the execution of Background AOF rewrite.
@@ -815,38 +846,35 @@ int openNewIncrAofForAppend(void) {
#define AOF_REWRITE_LIMITE_THRESHOLD 3
#define AOF_REWRITE_LIMITE_MAX_MINUTES 60 /* 1 hour */
int aofRewriteLimited(void) {
- int limit = 0;
- static int limit_delay_minutes = 0;
+ static int next_delay_minutes = 0;
static time_t next_rewrite_time = 0;
- unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list);
- if (incr_aof_num >= AOF_REWRITE_LIMITE_THRESHOLD) {
+ if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) {
+ /* We may be recovering from limited state, so reset all states. */
+ next_delay_minutes = 0;
+ next_rewrite_time = 0;
+ return 0;
+ }
+
+ /* if it is in the limiting state, then check if the next_rewrite_time is reached */
+ if (next_rewrite_time != 0) {
if (server.unixtime < next_rewrite_time) {
- limit = 1;
+ return 1;
} else {
- if (limit_delay_minutes == 0) {
- limit = 1;
- limit_delay_minutes = 1;
- } else {
- limit_delay_minutes *= 2;
- }
-
- if (limit_delay_minutes > AOF_REWRITE_LIMITE_MAX_MINUTES) {
- limit_delay_minutes = AOF_REWRITE_LIMITE_MAX_MINUTES;
- }
-
- next_rewrite_time = server.unixtime + limit_delay_minutes * 60;
-
- serverLog(LL_WARNING,
- "Background AOF rewrite has repeatedly failed %ld times and triggered the limit, will retry in %d minutes",
- incr_aof_num, limit_delay_minutes);
+ next_rewrite_time = 0;
+ return 0;
}
- } else {
- limit_delay_minutes = 0;
- next_rewrite_time = 0;
}
- return limit;
+ next_delay_minutes = (next_delay_minutes == 0) ? 1 : (next_delay_minutes * 2);
+ if (next_delay_minutes > AOF_REWRITE_LIMITE_MAX_MINUTES) {
+ next_delay_minutes = AOF_REWRITE_LIMITE_MAX_MINUTES;
+ }
+
+ next_rewrite_time = server.unixtime + next_delay_minutes * 60;
+ serverLog(LL_WARNING,
+ "Background AOF rewrite has repeatedly failed and triggered the limit, will retry in %d minutes", next_delay_minutes);
+ return 1;
}
/* ----------------------------------------------------------------------------
@@ -1265,7 +1293,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON ||
- server.child_type == CHILD_TYPE_AOF)
+ (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
{
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
}
@@ -1558,7 +1586,7 @@ int loadAppendOnlyFiles(aofManifest *am) {
serverAssert(am != NULL);
int status, ret = C_OK;
long long start;
- off_t total_size = 0;
+ off_t total_size = 0, base_size = 0;
sds aof_name;
int total_num, aof_num = 0, last_file;
@@ -1607,6 +1635,7 @@ int loadAppendOnlyFiles(aofManifest *am) {
serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE);
aof_name = (char*)am->base_aof_info->file_name;
updateLoadingFileName(aof_name);
+ base_size = getAppendOnlyFileSize(aof_name, NULL);
last_file = ++aof_num == total_num;
start = ustime();
ret = loadSingleAppendOnlyFile(aof_name);
@@ -1659,7 +1688,16 @@ int loadAppendOnlyFiles(aofManifest *am) {
}
server.aof_current_size = total_size;
- server.aof_rewrite_base_size = server.aof_current_size;
+ /* Ideally, the aof_rewrite_base_size variable should hold the size of the
+ * AOF when the last rewrite ended, this should include the size of the
+ * incremental file that was created during the rewrite since otherwise we
+ * risk the next automatic rewrite to happen too soon (or immediately if
+ * auto-aof-rewrite-percentage is low). However, since we do not persist
+ * aof_rewrite_base_size information anywhere, we initialize it on restart
+ * to the size of BASE AOF file. This might cause the first AOFRW to be
+ * executed early, but that shouldn't be a problem since everything will be
+ * fine after the first AOFRW. */
+ server.aof_rewrite_base_size = base_size;
server.aof_fsync_offset = server.aof_current_size;
cleanup:
@@ -2393,6 +2431,9 @@ void bgrewriteaofCommand(client *c) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (hasActiveChildProcess() || server.in_exec) {
server.aof_rewrite_scheduled = 1;
+ /* When manually triggering AOFRW we reset the count
+ * so that it can be executed immediately. */
+ server.stat_aofrw_consecutive_failures = 0;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
@@ -2476,7 +2517,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
char tmpfile[256];
long long now = ustime();
- sds new_base_filename;
+ sds new_base_filepath = NULL;
+ sds new_incr_filepath = NULL;
aofManifest *temp_am;
mstime_t latency;
@@ -2493,9 +2535,9 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* Get a new BASE file name and mark the previous (if we have)
* as the HISTORY type. */
- new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
+ sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am);
serverAssert(new_base_filename != NULL);
- sds new_base_filepath = makePath(server.aof_dirname, new_base_filename);
+ new_base_filepath = makePath(server.aof_dirname, new_base_filename);
/* Rename the temporary aof file to 'new_base_filename'. */
latencyStartMonitor(latency);
@@ -2503,7 +2545,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
- new_base_filename,
+ new_base_filepath,
strerror(errno));
aofManifestFree(temp_am);
sdsfree(new_base_filepath);
@@ -2512,6 +2554,34 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename", latency);
+ /* Rename the temporary incr aof file to 'new_incr_filename'. */
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ /* Get temporary incr aof name. */
+ sds temp_incr_aof_name = getTempIncrAofName();
+ sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
+ sdsfree(temp_incr_aof_name);
+ /* Get next new incr aof name. */
+ sds new_incr_filename = getNewIncrAofName(temp_am);
+ new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
+ latencyStartMonitor(latency);
+ if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
+ serverLog(LL_WARNING,
+ "Error trying to rename the temporary incr AOF file %s into %s: %s",
+ temp_incr_filepath,
+ new_incr_filepath,
+ strerror(errno));
+ bg_unlink(new_base_filepath);
+ sdsfree(new_base_filepath);
+ aofManifestFree(temp_am);
+ sdsfree(temp_incr_filepath);
+ sdsfree(new_incr_filepath);
+ goto cleanup;
+ }
+ latencyEndMonitor(latency);
+ latencyAddSampleIfNeeded("aof-rename", latency);
+ sdsfree(temp_incr_filepath);
+ }
+
/* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
* to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
markRewrittenIncrAofAsHistory(temp_am);
@@ -2521,9 +2591,14 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
bg_unlink(new_base_filepath);
aofManifestFree(temp_am);
sdsfree(new_base_filepath);
+ if (new_incr_filepath) {
+ bg_unlink(new_incr_filepath);
+ sdsfree(new_incr_filepath);
+ }
goto cleanup;
}
sdsfree(new_base_filepath);
+ if (new_incr_filepath) sdsfree(new_incr_filepath);
/* We can safely let `server.aof_manifest` point to 'temp_am' and free the previous one. */
aofManifestFreeAndUpdate(temp_am);
@@ -2542,6 +2617,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
aofDelHistoryFiles();
server.aof_lastbgrewrite_status = C_OK;
+ server.stat_aofrw_consecutive_failures = 0;
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
@@ -2552,14 +2628,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
server.aof_lastbgrewrite_status = C_ERR;
+ server.stat_aofrw_consecutive_failures++;
serverLog(LL_WARNING,
"Background AOF rewrite terminated with error");
} else {
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* triggering an error condition. */
- if (bysignal != SIGUSR1)
+ if (bysignal != SIGUSR1) {
server.aof_lastbgrewrite_status = C_ERR;
+ server.stat_aofrw_consecutive_failures++;
+ }
serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
@@ -2567,6 +2646,12 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
cleanup:
aofRemoveTempFile(server.child_pid);
+ /* Clear AOF buffer and delete temp incr aof for next rewrite. */
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ sdsfree(server.aof_buf);
+ server.aof_buf = sdsempty();
+ aofDelTempIncrAofFile();
+ }
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
diff --git a/src/bitops.c b/src/bitops.c
index 8a6dee44d..e35001937 100644
--- a/src/bitops.c
+++ b/src/bitops.c
@@ -430,7 +430,7 @@ int getBitOffsetFromArgument(client *c, robj *o, uint64_t *offset, int hash, int
if (usehash) loffset *= bits;
/* Limit offset to server.proto_max_bulk_len (512MB in bytes by default) */
- if (loffset < 0 || (!(c->flags & CLIENT_MASTER) && (loffset >> 3) >= server.proto_max_bulk_len))
+ if (loffset < 0 || (!mustObeyClient(c) && (loffset >> 3) >= server.proto_max_bulk_len))
{
addReplyError(c,err);
return C_ERR;
@@ -1002,7 +1002,7 @@ void bitposCommand(client *c) {
}
}
-/* BITFIELD key subcommmand-1 arg ... subcommand-2 arg ... subcommand-N ...
+/* BITFIELD key subcommand-1 arg ... subcommand-2 arg ... subcommand-N ...
*
* Supported subcommands:
*
diff --git a/src/cli_common.c b/src/cli_common.c
index 33069017b..7b4775cde 100644
--- a/src/cli_common.c
+++ b/src/cli_common.c
@@ -390,7 +390,7 @@ sds escapeJsonString(sds s, const char *p, size_t len) {
case '\t': s = sdscatlen(s,"\\t",2); break;
case '\b': s = sdscatlen(s,"\\b",2); break;
default:
- s = sdscatprintf(s,(*p >= 0 && *p <= 0x1f) ? "\\u%04x" : "%c",*p);
+ s = sdscatprintf(s,*(unsigned char *)p <= 0x1f ? "\\u%04x" : "%c",*p);
}
p++;
}
diff --git a/src/cluster.c b/src/cluster.c
index 701871b36..adad07e19 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -960,7 +960,6 @@ clusterNode *createClusterNode(char *nodename, int flags) {
memset(node->slots,0,sizeof(node->slots));
node->slot_info_pairs = NULL;
node->slot_info_pairs_count = 0;
- node->slot_info_pairs_alloc = 0;
node->numslots = 0;
node->numslaves = 0;
node->slaves = NULL;
@@ -2507,11 +2506,7 @@ int clusterProcessPacket(clusterLink *link) {
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
- if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
- pubsubPublishMessageShard(channel, message);
- } else {
- pubsubPublishMessage(channel,message);
- }
+ pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
@@ -3200,20 +3195,19 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
/* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub support
*
- * For now we do very little, just propagating PUBLISH messages across the whole
+ * If `sharded` is 0:
+ * For now we do very little, just propagating [S]PUBLISH messages across the whole
* cluster. In the future we'll try to get smarter and avoiding propagating those
* messages to hosts without receives for a given channel.
- * -------------------------------------------------------------------------- */
-void clusterPropagatePublish(robj *channel, robj *message) {
- clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
-}
-
-/* -----------------------------------------------------------------------------
- * CLUSTER Pub/Sub shard support
- *
+ * Otherwise:
* Publish this message across the slot (primary/replica).
* -------------------------------------------------------------------------- */
-void clusterPropagatePublishShard(robj *channel, robj *message) {
+void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
+ if (!sharded) {
+ clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
+ return;
+ }
+
list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
if (listLength(nodes_for_slot) != 0) {
listIter li;
@@ -4726,13 +4720,10 @@ void clusterGenNodesSlotsInfo(int filter) {
* or end of slot. */
if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
if (!(n->flags & filter)) {
- if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) {
- if (n->slot_info_pairs_alloc == 0)
- n->slot_info_pairs_alloc = 8;
- else
- n->slot_info_pairs_alloc *= 2;
- n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t));
+ if (!n->slot_info_pairs) {
+ n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t));
}
+ serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots));
n->slot_info_pairs[n->slot_info_pairs_count++] = start;
n->slot_info_pairs[n->slot_info_pairs_count++] = i-1;
}
@@ -4747,7 +4738,6 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) {
zfree(n->slot_info_pairs);
n->slot_info_pairs = NULL;
n->slot_info_pairs_count = 0;
- n->slot_info_pairs_alloc = 0;
}
/* Generate a csv-alike representation of the nodes we are aware of,
diff --git a/src/cluster.h b/src/cluster.h
index 27e9e7770..1349a7a92 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -120,7 +120,6 @@ typedef struct clusterNode {
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
- int slot_info_pairs_alloc; /* Allocated number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
@@ -385,8 +384,7 @@ void migrateCloseTimedoutSockets(void);
int verifyClusterConfigWithData(void);
unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
-void clusterPropagatePublish(robj *channel, robj *message);
-void clusterPropagatePublishShard(robj *channel, robj *message);
+void clusterPropagatePublish(robj *channel, robj *message, int sharded);
unsigned int keyHashSlot(char *key, int keylen);
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
diff --git a/src/commands.c b/src/commands.c
index efc159ad8..dfec3cdea 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -47,44 +47,62 @@ struct redisCommandArg BITCOUNT_Args[] = {
/* BITFIELD tips */
#define BITFIELD_tips NULL
-/* BITFIELD encoding_offset argument table */
-struct redisCommandArg BITFIELD_encoding_offset_Subargs[] = {
+/* BITFIELD operation encoding_offset argument table */
+struct redisCommandArg BITFIELD_operation_encoding_offset_Subargs[] = {
{"encoding",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"offset",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
-/* BITFIELD encoding_offset_value argument table */
-struct redisCommandArg BITFIELD_encoding_offset_value_Subargs[] = {
+/* BITFIELD operation write wrap_sat_fail argument table */
+struct redisCommandArg BITFIELD_operation_write_wrap_sat_fail_Subargs[] = {
+{"wrap",ARG_TYPE_PURE_TOKEN,-1,"WRAP",NULL,NULL,CMD_ARG_NONE},
+{"sat",ARG_TYPE_PURE_TOKEN,-1,"SAT",NULL,NULL,CMD_ARG_NONE},
+{"fail",ARG_TYPE_PURE_TOKEN,-1,"FAIL",NULL,NULL,CMD_ARG_NONE},
+{0}
+};
+
+/* BITFIELD operation write write_operation encoding_offset_value argument table */
+struct redisCommandArg BITFIELD_operation_write_write_operation_encoding_offset_value_Subargs[] = {
{"encoding",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"offset",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"value",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
-/* BITFIELD encoding_offset_increment argument table */
-struct redisCommandArg BITFIELD_encoding_offset_increment_Subargs[] = {
+/* BITFIELD operation write write_operation encoding_offset_increment argument table */
+struct redisCommandArg BITFIELD_operation_write_write_operation_encoding_offset_increment_Subargs[] = {
{"encoding",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"offset",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"increment",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
-/* BITFIELD wrap_sat_fail argument table */
-struct redisCommandArg BITFIELD_wrap_sat_fail_Subargs[] = {
-{"wrap",ARG_TYPE_PURE_TOKEN,-1,"WRAP",NULL,NULL,CMD_ARG_NONE},
-{"sat",ARG_TYPE_PURE_TOKEN,-1,"SAT",NULL,NULL,CMD_ARG_NONE},
-{"fail",ARG_TYPE_PURE_TOKEN,-1,"FAIL",NULL,NULL,CMD_ARG_NONE},
+/* BITFIELD operation write write_operation argument table */
+struct redisCommandArg BITFIELD_operation_write_write_operation_Subargs[] = {
+{"encoding_offset_value",ARG_TYPE_BLOCK,-1,"SET",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_write_operation_encoding_offset_value_Subargs},
+{"encoding_offset_increment",ARG_TYPE_BLOCK,-1,"INCRBY",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_write_operation_encoding_offset_increment_Subargs},
+{0}
+};
+
+/* BITFIELD operation write argument table */
+struct redisCommandArg BITFIELD_operation_write_Subargs[] = {
+{"wrap_sat_fail",ARG_TYPE_ONEOF,-1,"OVERFLOW",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_operation_write_wrap_sat_fail_Subargs},
+{"write_operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_write_operation_Subargs},
+{0}
+};
+
+/* BITFIELD operation argument table */
+struct redisCommandArg BITFIELD_operation_Subargs[] = {
+{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_encoding_offset_Subargs},
+{"write",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_Subargs},
{0}
};
/* BITFIELD argument table */
struct redisCommandArg BITFIELD_Args[] = {
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
-{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_encoding_offset_Subargs},
-{"encoding_offset_value",ARG_TYPE_BLOCK,-1,"SET",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_encoding_offset_value_Subargs},
-{"encoding_offset_increment",ARG_TYPE_BLOCK,-1,"INCRBY",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_encoding_offset_increment_Subargs},
-{"wrap_sat_fail",ARG_TYPE_ONEOF,-1,"OVERFLOW",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_wrap_sat_fail_Subargs},
+{"operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,.subargs=BITFIELD_operation_Subargs},
{0}
};
@@ -106,7 +124,7 @@ struct redisCommandArg BITFIELD_RO_encoding_offset_Subargs[] = {
/* BITFIELD_RO argument table */
struct redisCommandArg BITFIELD_RO_Args[] = {
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
-{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_RO_encoding_offset_Subargs},
+{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_MULTIPLE,.subargs=BITFIELD_RO_encoding_offset_Subargs},
{0}
};
@@ -1313,13 +1331,20 @@ struct redisCommandArg MIGRATE_key_or_empty_string_Subargs[] = {
{0}
};
-/* MIGRATE username_password argument table */
-struct redisCommandArg MIGRATE_username_password_Subargs[] = {
+/* MIGRATE authentication username_password argument table */
+struct redisCommandArg MIGRATE_authentication_username_password_Subargs[] = {
{"username",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"password",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
+/* MIGRATE authentication argument table */
+struct redisCommandArg MIGRATE_authentication_Subargs[] = {
+{"password",ARG_TYPE_STRING,-1,"AUTH",NULL,"4.0.7",CMD_ARG_OPTIONAL},
+{"username_password",ARG_TYPE_BLOCK,-1,"AUTH2",NULL,"6.0.0",CMD_ARG_OPTIONAL,.subargs=MIGRATE_authentication_username_password_Subargs},
+{0}
+};
+
/* MIGRATE argument table */
struct redisCommandArg MIGRATE_Args[] = {
{"host",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
@@ -1329,8 +1354,7 @@ struct redisCommandArg MIGRATE_Args[] = {
{"timeout",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"copy",ARG_TYPE_PURE_TOKEN,-1,"COPY",NULL,"3.0.0",CMD_ARG_OPTIONAL},
{"replace",ARG_TYPE_PURE_TOKEN,-1,"REPLACE",NULL,"3.0.0",CMD_ARG_OPTIONAL},
-{"password",ARG_TYPE_STRING,-1,"AUTH",NULL,"4.0.7",CMD_ARG_OPTIONAL},
-{"username_password",ARG_TYPE_BLOCK,-1,"AUTH2",NULL,"6.0.0",CMD_ARG_OPTIONAL,.subargs=MIGRATE_username_password_Subargs},
+{"authentication",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=MIGRATE_authentication_Subargs},
{"key",ARG_TYPE_KEY,1,"KEYS",NULL,"3.0.6",CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE},
{0}
};
@@ -2089,15 +2113,22 @@ struct redisCommandArg GEORADIUS_RO_Args[] = {
/* GEOSEARCH tips */
#define GEOSEARCH_tips NULL
-/* GEOSEARCH longitude_latitude argument table */
-struct redisCommandArg GEOSEARCH_longitude_latitude_Subargs[] = {
+/* GEOSEARCH from longitude_latitude argument table */
+struct redisCommandArg GEOSEARCH_from_longitude_latitude_Subargs[] = {
{"longitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"latitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
-/* GEOSEARCH circle unit argument table */
-struct redisCommandArg GEOSEARCH_circle_unit_Subargs[] = {
+/* GEOSEARCH from argument table */
+struct redisCommandArg GEOSEARCH_from_Subargs[] = {
+{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_NONE},
+{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_from_longitude_latitude_Subargs},
+{0}
+};
+
+/* GEOSEARCH by circle unit argument table */
+struct redisCommandArg GEOSEARCH_by_circle_unit_Subargs[] = {
{"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE},
{"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE},
{"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE},
@@ -2105,15 +2136,15 @@ struct redisCommandArg GEOSEARCH_circle_unit_Subargs[] = {
{0}
};
-/* GEOSEARCH circle argument table */
-struct redisCommandArg GEOSEARCH_circle_Subargs[] = {
+/* GEOSEARCH by circle argument table */
+struct redisCommandArg GEOSEARCH_by_circle_Subargs[] = {
{"radius",ARG_TYPE_DOUBLE,-1,"BYRADIUS",NULL,NULL,CMD_ARG_NONE},
-{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_circle_unit_Subargs},
+{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_circle_unit_Subargs},
{0}
};
-/* GEOSEARCH box unit argument table */
-struct redisCommandArg GEOSEARCH_box_unit_Subargs[] = {
+/* GEOSEARCH by box unit argument table */
+struct redisCommandArg GEOSEARCH_by_box_unit_Subargs[] = {
{"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE},
{"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE},
{"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE},
@@ -2121,11 +2152,18 @@ struct redisCommandArg GEOSEARCH_box_unit_Subargs[] = {
{0}
};
-/* GEOSEARCH box argument table */
-struct redisCommandArg GEOSEARCH_box_Subargs[] = {
+/* GEOSEARCH by box argument table */
+struct redisCommandArg GEOSEARCH_by_box_Subargs[] = {
{"width",ARG_TYPE_DOUBLE,-1,"BYBOX",NULL,NULL,CMD_ARG_NONE},
{"height",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE},
-{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_box_unit_Subargs},
+{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_box_unit_Subargs},
+{0}
+};
+
+/* GEOSEARCH by argument table */
+struct redisCommandArg GEOSEARCH_by_Subargs[] = {
+{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_circle_Subargs},
+{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_box_Subargs},
{0}
};
@@ -2146,10 +2184,8 @@ struct redisCommandArg GEOSEARCH_count_Subargs[] = {
/* GEOSEARCH argument table */
struct redisCommandArg GEOSEARCH_Args[] = {
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
-{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_OPTIONAL},
-{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_longitude_latitude_Subargs},
-{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_circle_Subargs},
-{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_box_Subargs},
+{"from",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_from_Subargs},
+{"by",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_Subargs},
{"order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_order_Subargs},
{"count",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_count_Subargs},
{"withcoord",ARG_TYPE_PURE_TOKEN,-1,"WITHCOORD",NULL,NULL,CMD_ARG_OPTIONAL},
@@ -2166,15 +2202,22 @@ struct redisCommandArg GEOSEARCH_Args[] = {
/* GEOSEARCHSTORE tips */
#define GEOSEARCHSTORE_tips NULL
-/* GEOSEARCHSTORE longitude_latitude argument table */
-struct redisCommandArg GEOSEARCHSTORE_longitude_latitude_Subargs[] = {
+/* GEOSEARCHSTORE from longitude_latitude argument table */
+struct redisCommandArg GEOSEARCHSTORE_from_longitude_latitude_Subargs[] = {
{"longitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"latitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
-/* GEOSEARCHSTORE circle unit argument table */
-struct redisCommandArg GEOSEARCHSTORE_circle_unit_Subargs[] = {
+/* GEOSEARCHSTORE from argument table */
+struct redisCommandArg GEOSEARCHSTORE_from_Subargs[] = {
+{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_NONE},
+{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_from_longitude_latitude_Subargs},
+{0}
+};
+
+/* GEOSEARCHSTORE by circle unit argument table */
+struct redisCommandArg GEOSEARCHSTORE_by_circle_unit_Subargs[] = {
{"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE},
{"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE},
{"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE},
@@ -2182,15 +2225,15 @@ struct redisCommandArg GEOSEARCHSTORE_circle_unit_Subargs[] = {
{0}
};
-/* GEOSEARCHSTORE circle argument table */
-struct redisCommandArg GEOSEARCHSTORE_circle_Subargs[] = {
+/* GEOSEARCHSTORE by circle argument table */
+struct redisCommandArg GEOSEARCHSTORE_by_circle_Subargs[] = {
{"radius",ARG_TYPE_DOUBLE,-1,"BYRADIUS",NULL,NULL,CMD_ARG_NONE},
-{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_circle_unit_Subargs},
+{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_circle_unit_Subargs},
{0}
};
-/* GEOSEARCHSTORE box unit argument table */
-struct redisCommandArg GEOSEARCHSTORE_box_unit_Subargs[] = {
+/* GEOSEARCHSTORE by box unit argument table */
+struct redisCommandArg GEOSEARCHSTORE_by_box_unit_Subargs[] = {
{"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE},
{"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE},
{"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE},
@@ -2198,11 +2241,18 @@ struct redisCommandArg GEOSEARCHSTORE_box_unit_Subargs[] = {
{0}
};
-/* GEOSEARCHSTORE box argument table */
-struct redisCommandArg GEOSEARCHSTORE_box_Subargs[] = {
+/* GEOSEARCHSTORE by box argument table */
+struct redisCommandArg GEOSEARCHSTORE_by_box_Subargs[] = {
{"width",ARG_TYPE_DOUBLE,-1,"BYBOX",NULL,NULL,CMD_ARG_NONE},
{"height",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE},
-{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_box_unit_Subargs},
+{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_box_unit_Subargs},
+{0}
+};
+
+/* GEOSEARCHSTORE by argument table */
+struct redisCommandArg GEOSEARCHSTORE_by_Subargs[] = {
+{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_circle_Subargs},
+{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_box_Subargs},
{0}
};
@@ -2224,10 +2274,8 @@ struct redisCommandArg GEOSEARCHSTORE_count_Subargs[] = {
struct redisCommandArg GEOSEARCHSTORE_Args[] = {
{"destination",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
{"source",ARG_TYPE_KEY,1,NULL,NULL,NULL,CMD_ARG_NONE},
-{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_OPTIONAL},
-{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_longitude_latitude_Subargs},
-{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_circle_Subargs},
-{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_box_Subargs},
+{"from",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_from_Subargs},
+{"by",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_Subargs},
{"order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_order_Subargs},
{"count",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_count_Subargs},
{"storedist",ARG_TYPE_PURE_TOKEN,-1,"STOREDIST",NULL,NULL,CMD_ARG_OPTIONAL},
@@ -4790,7 +4838,7 @@ struct redisCommandArg MODULE_LOADEX_args_Subargs[] = {
/* MODULE LOADEX argument table */
struct redisCommandArg MODULE_LOADEX_Args[] = {
{"path",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
-{"configs",ARG_TYPE_BLOCK,-1,"CONFIG",NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE,.subargs=MODULE_LOADEX_configs_Subargs},
+{"configs",ARG_TYPE_BLOCK,-1,"CONFIG",NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE|CMD_ARG_MULTIPLE_TOKEN,.subargs=MODULE_LOADEX_configs_Subargs},
{"args",ARG_TYPE_BLOCK,-1,"ARGS",NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE,.subargs=MODULE_LOADEX_args_Subargs},
{0}
};
@@ -6919,6 +6967,13 @@ commandHistory SET_History[] = {
/* SET tips */
#define SET_tips NULL
+/* SET condition argument table */
+struct redisCommandArg SET_condition_Subargs[] = {
+{"nx",ARG_TYPE_PURE_TOKEN,-1,"NX",NULL,NULL,CMD_ARG_NONE},
+{"xx",ARG_TYPE_PURE_TOKEN,-1,"XX",NULL,NULL,CMD_ARG_NONE},
+{0}
+};
+
/* SET expiration argument table */
struct redisCommandArg SET_expiration_Subargs[] = {
{"seconds",ARG_TYPE_INTEGER,-1,"EX",NULL,"2.6.12",CMD_ARG_NONE},
@@ -6929,20 +6984,13 @@ struct redisCommandArg SET_expiration_Subargs[] = {
{0}
};
-/* SET condition argument table */
-struct redisCommandArg SET_condition_Subargs[] = {
-{"nx",ARG_TYPE_PURE_TOKEN,-1,"NX",NULL,NULL,CMD_ARG_NONE},
-{"xx",ARG_TYPE_PURE_TOKEN,-1,"XX",NULL,NULL,CMD_ARG_NONE},
-{0}
-};
-
/* SET argument table */
struct redisCommandArg SET_Args[] = {
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
{"value",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
-{"expiration",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SET_expiration_Subargs},
{"condition",ARG_TYPE_ONEOF,-1,NULL,NULL,"2.6.12",CMD_ARG_OPTIONAL,.subargs=SET_condition_Subargs},
{"get",ARG_TYPE_PURE_TOKEN,-1,"GET",NULL,"6.2.0",CMD_ARG_OPTIONAL},
+{"expiration",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SET_expiration_Subargs},
{0}
};
@@ -7268,7 +7316,7 @@ struct redisCommand redisCommandTable[] = {
{"zpopmin","Remove and return members with the lowest scores in a sorted set","O(log(N)*M) with N being the number of elements in the sorted set, and M being the number of elements popped.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZPOPMIN_History,ZPOPMIN_tips,zpopminCommand,-2,CMD_WRITE|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZPOPMIN_Args},
{"zrandmember","Get one or multiple random elements from a sorted set","O(N) where N is the number of elements returned","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANDMEMBER_History,ZRANDMEMBER_tips,zrandmemberCommand,-2,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANDMEMBER_Args},
{"zrange","Return a range of members in a sorted set","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements returned.","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANGE_History,ZRANGE_tips,zrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGE_Args},
-{"zrangebylex","Return a range of members in a sorted set, by lexicographical range","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).","2.8.9",CMD_DOC_DEPRECATED,"`ZRANGE` with the `BYSCORE` argument","6.2.0",COMMAND_GROUP_SORTED_SET,ZRANGEBYLEX_History,ZRANGEBYLEX_tips,zrangebylexCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGEBYLEX_Args},
+{"zrangebylex","Return a range of members in a sorted set, by lexicographical range","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).","2.8.9",CMD_DOC_DEPRECATED,"`ZRANGE` with the `BYLEX` argument","6.2.0",COMMAND_GROUP_SORTED_SET,ZRANGEBYLEX_History,ZRANGEBYLEX_tips,zrangebylexCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGEBYLEX_Args},
{"zrangebyscore","Return a range of members in a sorted set, by score","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).","1.0.5",CMD_DOC_DEPRECATED,"`ZRANGE` with the `BYSCORE` argument","6.2.0",COMMAND_GROUP_SORTED_SET,ZRANGEBYSCORE_History,ZRANGEBYSCORE_tips,zrangebyscoreCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGEBYSCORE_Args},
{"zrangestore","Store a range of members from sorted set into another key","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements stored into the destination key.","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANGESTORE_History,ZRANGESTORE_tips,zrangestoreCommand,-5,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGESTORE_Args},
{"zrank","Determine the index of a member in a sorted set","O(log(N))","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANK_History,ZRANK_tips,zrankCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANK_Args},
diff --git a/src/commands/bitfield.json b/src/commands/bitfield.json
index 1f667ce05..5c6fc6105 100644
--- a/src/commands/bitfield.json
+++ b/src/commands/bitfield.json
@@ -44,84 +44,100 @@
"key_spec_index": 0
},
{
- "token": "GET",
- "name": "encoding_offset",
- "type": "block",
- "optional": true,
- "arguments": [
- {
- "name": "encoding",
- "type": "string"
- },
- {
- "name": "offset",
- "type": "integer"
- }
- ]
- },
- {
- "token": "SET",
- "name": "encoding_offset_value",
- "type": "block",
- "optional": true,
- "arguments": [
- {
- "name": "encoding",
- "type": "string"
- },
- {
- "name": "offset",
- "type": "integer"
- },
- {
- "name": "value",
- "type": "integer"
- }
- ]
- },
- {
- "token": "INCRBY",
- "name": "encoding_offset_increment",
- "type": "block",
- "optional": true,
- "arguments": [
- {
- "name": "encoding",
- "type": "string"
- },
- {
- "name": "offset",
- "type": "integer"
- },
- {
- "name": "increment",
- "type": "integer"
- }
- ]
- },
- {
- "token": "OVERFLOW",
- "name": "wrap_sat_fail",
+ "name": "operation",
"type": "oneof",
- "optional": true,
+ "multiple": "true",
"arguments": [
{
- "name": "wrap",
- "type": "pure-token",
- "token": "WRAP"
- },
- {
- "name": "sat",
- "type": "pure-token",
- "token": "SAT"
+ "token": "GET",
+ "name": "encoding_offset",
+ "type": "block",
+ "arguments": [
+ {
+ "name": "encoding",
+ "type": "string"
+ },
+ {
+ "name": "offset",
+ "type": "integer"
+ }
+ ]
},
{
- "name": "fail",
- "type": "pure-token",
- "token": "FAIL"
+ "name": "write",
+ "type": "block",
+ "arguments": [
+ {
+ "token": "OVERFLOW",
+ "name": "wrap_sat_fail",
+ "type": "oneof",
+ "optional": true,
+ "arguments": [
+ {
+ "name": "wrap",
+ "type": "pure-token",
+ "token": "WRAP"
+ },
+ {
+ "name": "sat",
+ "type": "pure-token",
+ "token": "SAT"
+ },
+ {
+ "name": "fail",
+ "type": "pure-token",
+ "token": "FAIL"
+ }
+ ]
+ },
+ {
+ "name": "write_operation",
+ "type": "oneof",
+ "arguments": [
+ {
+ "token": "SET",
+ "name": "encoding_offset_value",
+ "type": "block",
+ "arguments": [
+ {
+ "name": "encoding",
+ "type": "string"
+ },
+ {
+ "name": "offset",
+ "type": "integer"
+ },
+ {
+ "name": "value",
+ "type": "integer"
+ }
+ ]
+ },
+ {
+ "token": "INCRBY",
+ "name": "encoding_offset_increment",
+ "type": "block",
+ "arguments": [
+ {
+ "name": "encoding",
+ "type": "string"
+ },
+ {
+ "name": "offset",
+ "type": "integer"
+ },
+ {
+ "name": "increment",
+ "type": "integer"
+ }
+ ]
+ }
+ ]
+ }
+ ]
}
]
}
]
}
-}
+} \ No newline at end of file
diff --git a/src/commands/bitfield_ro.json b/src/commands/bitfield_ro.json
index a8ec85c29..951fde0f5 100644
--- a/src/commands/bitfield_ro.json
+++ b/src/commands/bitfield_ro.json
@@ -43,6 +43,7 @@
"token": "GET",
"name": "encoding_offset",
"type": "block",
+ "multiple": "true",
"arguments": [
{
"name": "encoding",
diff --git a/src/commands/geosearch.json b/src/commands/geosearch.json
index 9730d214e..a83dcaadb 100644
--- a/src/commands/geosearch.json
+++ b/src/commands/geosearch.json
@@ -39,102 +39,110 @@
"key_spec_index": 0
},
{
- "token": "FROMMEMBER",
- "name": "member",
- "type": "string",
- "optional": true
- },
- {
- "token": "FROMLONLAT",
- "name": "longitude_latitude",
- "type": "block",
- "optional": true,
- "arguments": [
- {
- "name": "longitude",
- "type": "double"
- },
- {
- "name": "latitude",
- "type": "double"
- }
- ]
- },
- {
- "name": "circle",
- "type": "block",
- "optional": true,
+ "name": "from",
+ "type": "oneof",
"arguments": [
{
- "token": "BYRADIUS",
- "name": "radius",
- "type": "double"
+ "token": "FROMMEMBER",
+ "name": "member",
+ "type": "string"
},
{
- "name": "unit",
- "type": "oneof",
+ "token": "FROMLONLAT",
+ "name": "longitude_latitude",
+ "type": "block",
"arguments": [
{
- "name": "m",
- "type": "pure-token",
- "token": "m"
+ "name": "longitude",
+ "type": "double"
},
{
- "name": "km",
- "type": "pure-token",
- "token": "km"
- },
- {
- "name": "ft",
- "type": "pure-token",
- "token": "ft"
- },
- {
- "name": "mi",
- "type": "pure-token",
- "token": "mi"
+ "name": "latitude",
+ "type": "double"
}
]
}
]
},
{
- "name": "box",
- "type": "block",
- "optional": true,
+ "name": "by",
+ "type": "oneof",
"arguments": [
{
- "token": "BYBOX",
- "name": "width",
- "type": "double"
- },
- {
- "name": "height",
- "type": "double"
- },
- {
- "name": "unit",
- "type": "oneof",
+ "name": "circle",
+ "type": "block",
"arguments": [
{
- "name": "m",
- "type": "pure-token",
- "token": "m"
+ "token": "BYRADIUS",
+ "name": "radius",
+ "type": "double"
},
{
- "name": "km",
- "type": "pure-token",
- "token": "km"
+ "name": "unit",
+ "type": "oneof",
+ "arguments": [
+ {
+ "name": "m",
+ "type": "pure-token",
+ "token": "m"
+ },
+ {
+ "name": "km",
+ "type": "pure-token",
+ "token": "km"
+ },
+ {
+ "name": "ft",
+ "type": "pure-token",
+ "token": "ft"
+ },
+ {
+ "name": "mi",
+ "type": "pure-token",
+ "token": "mi"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "box",
+ "type": "block",
+ "arguments": [
+ {
+ "token": "BYBOX",
+ "name": "width",
+ "type": "double"
},
{
- "name": "ft",
- "type": "pure-token",
- "token": "ft"
+ "name": "height",
+ "type": "double"
},
{
- "name": "mi",
- "type": "pure-token",
- "token": "mi"
+ "name": "unit",
+ "type": "oneof",
+ "arguments": [
+ {
+ "name": "m",
+ "type": "pure-token",
+ "token": "m"
+ },
+ {
+ "name": "km",
+ "type": "pure-token",
+ "token": "km"
+ },
+ {
+ "name": "ft",
+ "type": "pure-token",
+ "token": "ft"
+ },
+ {
+ "name": "mi",
+ "type": "pure-token",
+ "token": "mi"
+ }
+ ]
}
]
}
@@ -195,4 +203,4 @@
}
]
}
-}
+} \ No newline at end of file
diff --git a/src/commands/geosearchstore.json b/src/commands/geosearchstore.json
index a44ebfe86..16db5d37e 100644
--- a/src/commands/geosearchstore.json
+++ b/src/commands/geosearchstore.json
@@ -63,102 +63,110 @@
"key_spec_index": 1
},
{
- "token": "FROMMEMBER",
- "name": "member",
- "type": "string",
- "optional": true
- },
- {
- "token": "FROMLONLAT",
- "name": "longitude_latitude",
- "type": "block",
- "optional": true,
- "arguments": [
- {
- "name": "longitude",
- "type": "double"
- },
- {
- "name": "latitude",
- "type": "double"
- }
- ]
- },
- {
- "name": "circle",
- "type": "block",
- "optional": true,
+ "name": "from",
+ "type": "oneof",
"arguments": [
{
- "token": "BYRADIUS",
- "name": "radius",
- "type": "double"
+ "token": "FROMMEMBER",
+ "name": "member",
+ "type": "string"
},
{
- "name": "unit",
- "type": "oneof",
+ "token": "FROMLONLAT",
+ "name": "longitude_latitude",
+ "type": "block",
"arguments": [
{
- "name": "m",
- "type": "pure-token",
- "token": "m"
+ "name": "longitude",
+ "type": "double"
},
{
- "name": "km",
- "type": "pure-token",
- "token": "km"
- },
- {
- "name": "ft",
- "type": "pure-token",
- "token": "ft"
- },
- {
- "name": "mi",
- "type": "pure-token",
- "token": "mi"
+ "name": "latitude",
+ "type": "double"
}
]
}
]
},
{
- "name": "box",
- "type": "block",
- "optional": true,
+ "name": "by",
+ "type": "oneof",
"arguments": [
{
- "token": "BYBOX",
- "name": "width",
- "type": "double"
- },
- {
- "name": "height",
- "type": "double"
- },
- {
- "name": "unit",
- "type": "oneof",
+ "name": "circle",
+ "type": "block",
"arguments": [
{
- "name": "m",
- "type": "pure-token",
- "token": "m"
+ "token": "BYRADIUS",
+ "name": "radius",
+ "type": "double"
},
{
- "name": "km",
- "type": "pure-token",
- "token": "km"
+ "name": "unit",
+ "type": "oneof",
+ "arguments": [
+ {
+ "name": "m",
+ "type": "pure-token",
+ "token": "m"
+ },
+ {
+ "name": "km",
+ "type": "pure-token",
+ "token": "km"
+ },
+ {
+ "name": "ft",
+ "type": "pure-token",
+ "token": "ft"
+ },
+ {
+ "name": "mi",
+ "type": "pure-token",
+ "token": "mi"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "box",
+ "type": "block",
+ "arguments": [
+ {
+ "token": "BYBOX",
+ "name": "width",
+ "type": "double"
},
{
- "name": "ft",
- "type": "pure-token",
- "token": "ft"
+ "name": "height",
+ "type": "double"
},
{
- "name": "mi",
- "type": "pure-token",
- "token": "mi"
+ "name": "unit",
+ "type": "oneof",
+ "arguments": [
+ {
+ "name": "m",
+ "type": "pure-token",
+ "token": "m"
+ },
+ {
+ "name": "km",
+ "type": "pure-token",
+ "token": "km"
+ },
+ {
+ "name": "ft",
+ "type": "pure-token",
+ "token": "ft"
+ },
+ {
+ "name": "mi",
+ "type": "pure-token",
+ "token": "mi"
+ }
+ ]
}
]
}
@@ -207,4 +215,4 @@
}
]
}
-}
+} \ No newline at end of file
diff --git a/src/commands/migrate.json b/src/commands/migrate.json
index d07fe4b15..b9a52aa69 100644
--- a/src/commands/migrate.json
+++ b/src/commands/migrate.json
@@ -125,27 +125,33 @@
"since": "3.0.0"
},
{
- "token": "AUTH",
- "name": "password",
- "type": "string",
- "optional": true,
- "since": "4.0.7"
-
- },
- {
- "token": "AUTH2",
- "name": "username_password",
- "type": "block",
+ "name": "authentication",
+ "type": "oneof",
"optional": true,
- "since": "6.0.0",
"arguments": [
{
- "name": "username",
- "type": "string"
+ "token": "AUTH",
+ "name": "password",
+ "type": "string",
+ "optional": true,
+ "since": "4.0.7"
},
{
- "name": "password",
- "type": "string"
+ "token": "AUTH2",
+ "name": "username_password",
+ "type": "block",
+ "optional": true,
+ "since": "6.0.0",
+ "arguments": [
+ {
+ "name": "username",
+ "type": "string"
+ },
+ {
+ "name": "password",
+ "type": "string"
+ }
+ ]
}
]
},
@@ -160,4 +166,4 @@
}
]
}
-}
+} \ No newline at end of file
diff --git a/src/commands/module-loadex.json b/src/commands/module-loadex.json
index e772cbfe4..97e8f2b58 100644
--- a/src/commands/module-loadex.json
+++ b/src/commands/module-loadex.json
@@ -23,6 +23,7 @@
"token": "CONFIG",
"type": "block",
"multiple": true,
+ "multiple_token": true,
"optional": true,
"arguments": [
{
diff --git a/src/commands/set.json b/src/commands/set.json
index 267ab311a..688d534d7 100644
--- a/src/commands/set.json
+++ b/src/commands/set.json
@@ -66,6 +66,31 @@
"type": "string"
},
{
+ "name": "condition",
+ "type": "oneof",
+ "optional": true,
+ "since": "2.6.12",
+ "arguments": [
+ {
+ "name": "nx",
+ "type": "pure-token",
+ "token": "NX"
+ },
+ {
+ "name": "xx",
+ "type": "pure-token",
+ "token": "XX"
+ }
+ ]
+ },
+ {
+ "name": "get",
+ "token": "GET",
+ "type": "pure-token",
+ "optional": true,
+ "since": "6.2.0"
+ },
+ {
"name": "expiration",
"type": "oneof",
"optional": true,
@@ -101,31 +126,6 @@
"since": "6.0.0"
}
]
- },
- {
- "name": "condition",
- "type": "oneof",
- "optional": true,
- "since": "2.6.12",
- "arguments": [
- {
- "name": "nx",
- "type": "pure-token",
- "token": "NX"
- },
- {
- "name": "xx",
- "type": "pure-token",
- "token": "XX"
- }
- ]
- },
- {
- "name": "get",
- "token": "GET",
- "type": "pure-token",
- "optional": true,
- "since": "6.2.0"
}
]
}
diff --git a/src/commands/zrangebylex.json b/src/commands/zrangebylex.json
index a8aee82cc..75e82bce6 100644
--- a/src/commands/zrangebylex.json
+++ b/src/commands/zrangebylex.json
@@ -7,7 +7,7 @@
"arity": -4,
"function": "zrangebylexCommand",
"deprecated_since": "6.2.0",
- "replaced_by": "`ZRANGE` with the `BYSCORE` argument",
+ "replaced_by": "`ZRANGE` with the `BYLEX` argument",
"doc_flags": [
"DEPRECATED"
],
diff --git a/src/config.c b/src/config.c
index 3b5d4d349..0d435fecb 100644
--- a/src/config.c
+++ b/src/config.c
@@ -94,6 +94,15 @@ configEnum aof_fsync_enum[] = {
{NULL, 0}
};
+configEnum shutdown_on_sig_enum[] = {
+ {"default", 0},
+ {"save", SHUTDOWN_SAVE},
+ {"nosave", SHUTDOWN_NOSAVE},
+ {"now", SHUTDOWN_NOW},
+ {"force", SHUTDOWN_FORCE},
+ {NULL, 0}
+};
+
configEnum repl_diskless_load_enum[] = {
{"disabled", REPL_DISKLESS_LOAD_DISABLED},
{"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY},
@@ -143,6 +152,13 @@ configEnum cluster_preferred_endpoint_type_enum[] = {
{NULL, 0}
};
+configEnum propagation_error_behavior_enum[] = {
+ {"ignore", PROPAGATION_ERR_BEHAVIOR_IGNORE},
+ {"panic", PROPAGATION_ERR_BEHAVIOR_PANIC},
+ {"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS},
+ {NULL, 0}
+};
+
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@@ -276,33 +292,50 @@ static standardConfig *lookupConfig(sds name) {
*----------------------------------------------------------------------------*/
/* Get enum value from name. If there is no match INT_MIN is returned. */
-int configEnumGetValue(configEnum *ce, char *name) {
- while(ce->name != NULL) {
- if (!strcasecmp(ce->name,name)) return ce->val;
- ce++;
+int configEnumGetValue(configEnum *ce, sds *argv, int argc, int bitflags) {
+ if (argc == 0 || (!bitflags && argc != 1)) return INT_MIN;
+ int values = 0;
+ for (int i = 0; i < argc; i++) {
+ int matched = 0;
+ for (configEnum *ceItem = ce; ceItem->name != NULL; ceItem++) {
+ if (!strcasecmp(argv[i],ceItem->name)) {
+ values |= ceItem->val;
+ matched = 1;
+ }
+ }
+ if (!matched) return INT_MIN;
}
- return INT_MIN;
+ return values;
}
-/* Get enum name from value. If no match is found NULL is returned. */
-const char *configEnumGetName(configEnum *ce, int val) {
- while(ce->name != NULL) {
- if (ce->val == val) return ce->name;
- ce++;
+/* Get enum name/s from value. If no matches are found "unknown" is returned. */
+static sds configEnumGetName(configEnum *ce, int values, int bitflags) {
+ sds names = NULL;
+ int matches = 0;
+ for( ; ce->name != NULL; ce++) {
+ if (values == ce->val) { /* Short path for perfect match */
+ sdsfree(names);
+ return sdsnew(ce->name);
+ }
+ if (bitflags && (values & ce->val)) {
+ names = names ? sdscatfmt(names, " %s", ce->name) : sdsnew(ce->name);
+ matches |= ce->val;
+ }
}
- return NULL;
-}
-
-/* Wrapper for configEnumGetName() returning "unknown" instead of NULL if
- * there is no match. */
-const char *configEnumGetNameOrUnknown(configEnum *ce, int val) {
- const char *name = configEnumGetName(ce,val);
- return name ? name : "unknown";
+ if (!names || values != matches) {
+ sdsfree(names);
+ return sdsnew("unknown");
+ }
+ return names;
}
/* Used for INFO generation. */
const char *evictPolicyToString(void) {
- return configEnumGetNameOrUnknown(maxmemory_policy_enum,server.maxmemory_policy);
+ for (configEnum *ce = maxmemory_policy_enum; ce->name != NULL; ce++) {
+ if (server.maxmemory_policy == ce->val)
+ return ce->name;
+ }
+ serverPanic("unknown eviction policy");
}
/*-----------------------------------------------------------------------------
@@ -514,12 +547,15 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"loadmodule") && argc >= 2) {
queueLoadModule(argv[1],&argv[2],argc-2);
} else if (strchr(argv[0], '.')) {
- if (argc != 2) {
+ if (argc < 2) {
err = "Module config specified without value";
goto loaderr;
}
sds name = sdsdup(argv[0]);
- if (!dictReplace(server.module_configs_queue, name, sdsdup(argv[1]))) sdsfree(name);
+ sds val = sdsdup(argv[1]);
+ for (int i = 2; i < argc; i++)
+ val = sdscatfmt(val, " %S", argv[i]);
+ if (!dictReplace(server.module_configs_queue, name, val)) sdsfree(name);
} else if (!strcasecmp(argv[0],"sentinel")) {
/* argc == 1 is handled by main() as we need to enter the sentinel
* mode ASAP. */
@@ -1297,12 +1333,13 @@ void rewriteConfigOctalOption(struct rewriteConfigState *state, const char *opti
/* Rewrite an enumeration option. It takes as usually state and option name,
* and in addition the enumeration array and the default value for the
* option. */
-void rewriteConfigEnumOption(struct rewriteConfigState *state, const char *option, int value, configEnum *ce, int defval) {
- sds line;
- const char *name = configEnumGetNameOrUnknown(ce,value);
- int force = value != defval;
+void rewriteConfigEnumOption(struct rewriteConfigState *state, const char *option, int value, standardConfig *config) {
+ int multiarg = config->flags & MULTI_ARG_CONFIG;
+ sds names = configEnumGetName(config->data.enumd.enum_value,value,multiarg);
+ sds line = sdscatfmt(sdsempty(),"%s %s",option,names);
+ sdsfree(names);
+ int force = value != config->data.enumd.default_value;
- line = sdscatprintf(sdsempty(),"%s %s",option,name);
rewriteConfigRewriteLine(state,option,line,force);
}
@@ -1821,10 +1858,16 @@ static int sdsConfigSet(standardConfig *config, sds *argv, int argc, const char
UNUSED(argc);
if (config->data.sds.is_valid_fn && !config->data.sds.is_valid_fn(argv[0], err))
return 0;
+
sds prev = config->flags & MODULE_CONFIG ? getModuleStringConfig(config->privdata) : *config->data.sds.config;
sds new = (config->data.string.convert_empty_to_null && (sdslen(argv[0]) == 0)) ? NULL : argv[0];
+
+ /* if prev and new configuration are not equal, set the new one */
if (new != prev && (new == NULL || prev == NULL || sdscmp(prev, new))) {
+ /* If MODULE_CONFIG flag is set, then free temporary prev getModuleStringConfig returned.
+ * Otherwise, free the actual previous config value Redis held (Same action, different reasons) */
sdsfree(prev);
+
if (config->flags & MODULE_CONFIG) {
return setModuleStringConfig(config->privdata, new, err);
}
@@ -1848,7 +1891,7 @@ static sds sdsConfigGet(standardConfig *config) {
static void sdsConfigRewrite(standardConfig *config, const char *name, struct rewriteConfigState *state) {
sds val = config->flags & MODULE_CONFIG ? getModuleStringConfig(config->privdata) : *config->data.sds.config;
rewriteConfigSdsOption(state, name, val, config->data.sds.default_value);
- if (val) sdsfree(val);
+ if ((val) && (config->flags & MODULE_CONFIG)) sdsfree(val);
}
@@ -1885,10 +1928,12 @@ static void enumConfigInit(standardConfig *config) {
}
static int enumConfigSet(standardConfig *config, sds *argv, int argc, const char **err) {
- UNUSED(argc);
- int enumval = configEnumGetValue(config->data.enumd.enum_value, argv[0]);
+ int enumval;
+ int bitflags = !!(config->flags & MULTI_ARG_CONFIG);
+ enumval = configEnumGetValue(config->data.enumd.enum_value, argv, argc, bitflags);
+
if (enumval == INT_MIN) {
- sds enumerr = sdsnew("argument must be one of the following: ");
+ sds enumerr = sdsnew("argument(s) must be one of the following: ");
configEnum *enumNode = config->data.enumd.enum_value;
while(enumNode->name != NULL) {
enumerr = sdscatlen(enumerr, enumNode->name,
@@ -1919,12 +1964,13 @@ static int enumConfigSet(standardConfig *config, sds *argv, int argc, const char
static sds enumConfigGet(standardConfig *config) {
int val = config->flags & MODULE_CONFIG ? getModuleEnumConfig(config->privdata) : *(config->data.enumd.config);
- return sdsnew(configEnumGetNameOrUnknown(config->data.enumd.enum_value,val));
+ int bitflags = !!(config->flags & MULTI_ARG_CONFIG);
+ return configEnumGetName(config->data.enumd.enum_value,val,bitflags);
}
static void enumConfigRewrite(standardConfig *config, const char *name, struct rewriteConfigState *state) {
int val = config->flags & MODULE_CONFIG ? getModuleEnumConfig(config->privdata) : *(config->data.enumd.config);
- rewriteConfigEnumOption(state, name, val, config->data.enumd.enum_value, config->data.enumd.default_value);
+ rewriteConfigEnumOption(state, name, val, config);
}
#define createEnumConfig(name, alias, flags, enum, config_addr, default, is_valid, apply) { \
@@ -2284,6 +2330,16 @@ static int isValidAOFdirname(char *val, const char **err) {
return 1;
}
+static int isValidShutdownOnSigFlags(int val, const char **err) {
+ /* Individual arguments are validated by createEnumConfig logic.
+ * We just need to ensure valid combinations here. */
+ if (val & SHUTDOWN_NOSAVE && val & SHUTDOWN_SAVE) {
+ *err = "shutdown options SAVE and NOSAVE can't be used simultaneously";
+ return 0;
+ }
+ return 1;
+}
+
static int isValidAnnouncedHostname(char *val, const char **err) {
if (strlen(val) >= NET_HOST_STR_LEN) {
*err = "Hostnames must be less than "
@@ -2641,7 +2697,7 @@ static int setConfigOOMScoreAdjValuesOption(standardConfig *config, sds *argv, i
if (*eptr != '\0' || val < -2000 || val > 2000) {
if (err) *err = "Invalid oom-score-adj-values, elements must be between -2000 and 2000.";
- return -1;
+ return 0;
}
values[i] = val;
@@ -2691,7 +2747,7 @@ static int setConfigNotifyKeyspaceEventsOption(standardConfig *config, sds *argv
}
int flags = keyspaceEventsStringToFlags(argv[0]);
if (flags == -1) {
- *err = "Invalid event class character. Use 'Ag$lshzxeKEtmd'.";
+ *err = "Invalid event class character. Use 'Ag$lshzxeKEtmdn'.";
return 0;
}
server.notify_keyspace_events = flags;
@@ -2887,7 +2943,8 @@ standardConfig static_configs[] = {
createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, server.replica_announced, 1, NULL, NULL),
createBoolConfig("latency-tracking", NULL, MODIFIABLE_CONFIG, server.latency_tracking_enabled, 1, NULL, NULL),
createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled),
-
+ createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
+
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
createStringConfig("unixsocket", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.unixsocket, NULL, NULL, NULL),
@@ -2928,6 +2985,9 @@ standardConfig static_configs[] = {
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
+ createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
+ createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
+ createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
@@ -2971,6 +3031,7 @@ standardConfig static_configs[] = {
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
createUIntConfig("unixsocketperm", NULL, IMMUTABLE_CONFIG, 0, 0777, server.unixsocketperm, 0, OCTAL_CONFIG, NULL, NULL),
+ createUIntConfig("socket-mark-id", NULL, IMMUTABLE_CONFIG, 0, UINT_MAX, server.socket_mark_id, 0, INTEGER_CONFIG, NULL, NULL),
/* Unsigned Long configs */
createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */
diff --git a/src/config.h b/src/config.h
index 210e55a87..6baa8bd0f 100644
--- a/src/config.h
+++ b/src/config.h
@@ -80,6 +80,10 @@
/* MSG_NOSIGNAL. */
#ifdef __linux__
#define HAVE_MSG_NOSIGNAL 1
+#if defined(SO_MARK)
+#define HAVE_SOCKOPTMARKID 1
+#define SOCKOPTMARKID SO_MARK
+#endif
#endif
/* Test for polling API */
@@ -113,6 +117,20 @@
#define redis_fsync(fd) fsync(fd)
#endif
+#if defined(__FreeBSD__)
+#if defined(SO_USER_COOKIE)
+#define HAVE_SOCKOPTMARKID 1
+#define SOCKOPTMARKID SO_USER_COOKIE
+#endif
+#endif
+
+#if defined(__OpenBSD__)
+#if defined(SO_RTABLE)
+#define HAVE_SOCKOPTMARKID 1
+#define SOCKOPTMARKID SO_RTABLE
+#endif
+#endif
+
#if __GNUC__ >= 4
#define redis_unreachable __builtin_unreachable
#else
diff --git a/src/db.c b/src/db.c
index d4da756f1..10da1e923 100644
--- a/src/db.c
+++ b/src/db.c
@@ -182,6 +182,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
dictSetVal(db->dict, de, val);
signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled) slotToKeyAddEntry(de, db);
+ notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
}
/* This is a special version of dbAdd() that is used only when loading
diff --git a/src/debug.c b/src/debug.c
index 4f0e37777..081cb862f 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -416,6 +416,8 @@ void debugCommand(client *c) {
" Like HTSTATS but for the hash table stored at <key>'s value.",
"LOADAOF",
" Flush the AOF buffers on disk and reload the AOF in memory.",
+"REPLICATE <string>",
+" Replicates the provided string to replicas, allowing data divergence.",
#ifdef USE_JEMALLOC
"MALLCTL <key> [<val>]",
" Get or set a malloc tuning integer.",
@@ -849,6 +851,10 @@ NULL
{
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
+ replicationFeedSlaves(server.slaves, server.slaveseldb,
+ c->argv + 2, c->argc - 2);
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
sds errstr = sdsnewlen("-",1);
diff --git a/src/eval.c b/src/eval.c
index 22bcbdb73..332aec9ce 100644
--- a/src/eval.c
+++ b/src/eval.c
@@ -218,31 +218,20 @@ void scriptingInit(int setup) {
lua_setglobal(lua,"redis");
- /* Add a helper function that we use to sort the multi bulk output of non
- * deterministic commands, when containing 'false' elements. */
- {
- char *compare_func = "function __redis__compare_helper(a,b)\n"
- " if a == false then a = '' end\n"
- " if b == false then b = '' end\n"
- " return a<b\n"
- "end\n";
- luaL_loadbuffer(lua,compare_func,strlen(compare_func),"@cmp_func_def");
- lua_pcall(lua,0,0,0);
- }
-
/* Add a helper function we use for pcall error reporting.
* Note that when the error is in the C function we want to report the
* information about the caller, that's what makes sense from the point
* of view of the user debugging a script. */
{
char *errh_func = "local dbg = debug\n"
+ "debug = nil\n"
"function __redis__err__handler(err)\n"
" local i = dbg.getinfo(2,'nSl')\n"
" if i and i.what == 'C' then\n"
" i = dbg.getinfo(3,'nSl')\n"
" end\n"
" if type(err) ~= 'table' then\n"
- " err = {err='ERR' .. tostring(err)}"
+ " err = {err='ERR ' .. tostring(err)}"
" end"
" if i then\n"
" err['source'] = i.source\n"
@@ -266,10 +255,12 @@ void scriptingInit(int setup) {
lctx.lua_client->flags |= CLIENT_DENY_BLOCKING;
}
- /* Lua beginners often don't use "local", this is likely to introduce
- * subtle bugs in their code. To prevent problems we protect accesses
- * to global variables. */
- luaEnableGlobalsProtection(lua, 1);
+ /* Lock the global table from any changes */
+ lua_pushvalue(lua, LUA_GLOBALSINDEX);
+ luaSetErrorMetatable(lua);
+ /* Recursively lock all tables that can be reached from the global table */
+ luaSetTableProtectionRecursively(lua);
+ lua_pop(lua, 1);
lctx.lua = lua;
}
@@ -378,35 +369,20 @@ sds luaCreateFunction(client *c, robj *body) {
sdsfreesplitres(parts, numparts);
}
- /* Build the lua function to be loaded */
- sds funcdef = sdsempty();
- funcdef = sdscat(funcdef,"function ");
- funcdef = sdscatlen(funcdef,funcname,42);
- funcdef = sdscatlen(funcdef,"() ",3);
/* Note that in case of a shebang line we skip it but keep the line feed to conserve the user's line numbers */
- funcdef = sdscatlen(funcdef,(char*)body->ptr + shebang_len,sdslen(body->ptr) - shebang_len);
- funcdef = sdscatlen(funcdef,"\nend",4);
-
- if (luaL_loadbuffer(lctx.lua,funcdef,sdslen(funcdef),"@user_script")) {
+ if (luaL_loadbuffer(lctx.lua,(char*)body->ptr + shebang_len,sdslen(body->ptr) - shebang_len,"@user_script")) {
if (c != NULL) {
addReplyErrorFormat(c,
"Error compiling script (new function): %s",
lua_tostring(lctx.lua,-1));
}
lua_pop(lctx.lua,1);
- sdsfree(funcdef);
return NULL;
}
- sdsfree(funcdef);
- if (lua_pcall(lctx.lua,0,0,0)) {
- if (c != NULL) {
- addReplyErrorFormat(c,"Error running script (new function): %s",
- lua_tostring(lctx.lua,-1));
- }
- lua_pop(lctx.lua,1);
- return NULL;
- }
+ serverAssert(lua_isfunction(lctx.lua, -1));
+
+ lua_setfield(lctx.lua, LUA_REGISTRYINDEX, funcname);
/* We also save a SHA1 -> Original script map in a dictionary
* so that we can replicate / write in the AOF all the
@@ -479,7 +455,7 @@ void evalGenericCommand(client *c, int evalsha) {
lua_getglobal(lua, "__redis__err__handler");
/* Try to lookup the Lua function */
- lua_getglobal(lua, funcname);
+ lua_getfield(lua, LUA_REGISTRYINDEX, funcname);
if (lua_isnil(lua,-1)) {
lua_pop(lua,1); /* remove the nil from the stack */
/* Function not defined... let's define it if we have the
@@ -497,7 +473,7 @@ void evalGenericCommand(client *c, int evalsha) {
return;
}
/* Now the following is guaranteed to return non nil */
- lua_getglobal(lua, funcname);
+ lua_getfield(lua, LUA_REGISTRYINDEX, funcname);
serverAssert(!lua_isnil(lua,-1));
}
diff --git a/src/evict.c b/src/evict.c
index 933141638..a5821a463 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -492,10 +492,6 @@ static int isSafeToPerformEvictions(void) {
* expires and evictions of keys not being performed. */
if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;
- /* We cannot evict if we already have stuff to propagate (for example,
- * CONFIG SET maxmemory inside a MULTI/EXEC) */
- if (server.also_propagate.numops != 0) return 0;
-
return 1;
}
diff --git a/src/function_lua.c b/src/function_lua.c
index 8f21a1721..2e0250ea2 100644
--- a/src/function_lua.c
+++ b/src/function_lua.c
@@ -50,6 +50,7 @@
#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__"
#define REGISTRY_LOAD_CTX_NAME "__LIBRARY_CTX__"
#define LIBRARY_API_NAME "__LIBRARY_API__"
+#define GLOBALS_API_NAME "__GLOBALS_API__"
#define LOAD_TIMEOUT_MS 500
/* Lua engine ctx */
@@ -99,42 +100,23 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) {
* Return NULL on compilation error and set the error to the err variable
*/
static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, sds *err) {
+ int ret = C_ERR;
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
- /* Each library will have its own global distinct table.
- * We will create a new fresh Lua table and use
- * lua_setfenv to set the table as the library globals
- * (https://www.lua.org/manual/5.1/manual.html#lua_setfenv)
- *
- * At first, populate this new table with only the 'library' API
- * to make sure only 'library' API is available at start. After the
- * initial run is finished and all functions are registered, add
- * all the default globals to the library global table and delete
- * the library API.
- *
- * There are 2 ways to achieve the last part (add default
- * globals to the new table):
- *
- * 1. Initialize the new table with all the default globals
- * 2. Inheritance using metatable (https://www.lua.org/pil/14.3.html)
- *
- * For now we are choosing the second, we can change it in the future to
- * achieve a better isolation between functions. */
- lua_newtable(lua); /* Global table for the library */
- lua_pushstring(lua, REDIS_API_NAME);
- lua_pushstring(lua, LIBRARY_API_NAME);
- lua_gettable(lua, LUA_REGISTRYINDEX); /* get library function from registry */
- lua_settable(lua, -3); /* push the library table to the new global table */
-
- /* Set global protection on the new global table */
- luaSetGlobalProtection(lua_engine_ctx->lua);
+ /* set load library globals */
+ lua_getmetatable(lua, LUA_GLOBALSINDEX);
+ lua_enablereadonlytable(lua, -1, 0); /* disable global protection */
+ lua_getfield(lua, LUA_REGISTRYINDEX, LIBRARY_API_NAME);
+ lua_setfield(lua, -2, "__index");
+ lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1); /* enable global protection */
+ lua_pop(lua, 1); /* pop the metatable */
/* compile the code */
if (luaL_loadbuffer(lua, blob, sdslen(blob), "@user_function")) {
*err = sdscatprintf(sdsempty(), "Error compiling function: %s", lua_tostring(lua, -1));
- lua_pop(lua, 2); /* pops the error and globals table */
- return C_ERR;
+ lua_pop(lua, 1); /* pops the error */
+ goto done;
}
serverAssert(lua_isfunction(lua, -1));
@@ -144,45 +126,31 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, sds
};
luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, &load_ctx);
- /* set the function environment so only 'library' API can be accessed. */
- lua_pushvalue(lua, -2); /* push global table to the front */
- lua_setfenv(lua, -2);
-
lua_sethook(lua,luaEngineLoadHook,LUA_MASKCOUNT,100000);
/* Run the compiled code to allow it to register functions */
if (lua_pcall(lua,0,0,0)) {
errorInfo err_info = {0};
luaExtractErrorInformation(lua, &err_info);
*err = sdscatprintf(sdsempty(), "Error registering functions: %s", err_info.msg);
- lua_pop(lua, 2); /* pops the error and globals table */
- lua_sethook(lua,NULL,0,0); /* Disable hook */
- luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, NULL);
+ lua_pop(lua, 1); /* pops the error */
luaErrorInformationDiscard(&err_info);
- return C_ERR;
+ goto done;
}
- lua_sethook(lua,NULL,0,0); /* Disable hook */
- luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, NULL);
- /* stack contains the global table, lets rearrange it to contains the entire API. */
- /* delete 'redis' API */
- lua_pushstring(lua, REDIS_API_NAME);
- lua_pushnil(lua);
- lua_settable(lua, -3);
-
- /* create metatable */
- lua_newtable(lua);
- lua_pushstring(lua, "__index");
- lua_pushvalue(lua, LUA_GLOBALSINDEX); /* push original globals */
- lua_settable(lua, -3);
- lua_pushstring(lua, "__newindex");
- lua_pushvalue(lua, LUA_GLOBALSINDEX); /* push original globals */
- lua_settable(lua, -3);
-
- lua_setmetatable(lua, -2);
+ ret = C_OK;
- lua_pop(lua, 1); /* pops the global table */
+done:
+ /* restore original globals */
+ lua_getmetatable(lua, LUA_GLOBALSINDEX);
+ lua_enablereadonlytable(lua, -1, 0); /* disable global protection */
+ lua_getfield(lua, LUA_REGISTRYINDEX, GLOBALS_API_NAME);
+ lua_setfield(lua, -2, "__index");
+ lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1); /* enable global protection */
+ lua_pop(lua, 1); /* pop the metatable */
- return C_OK;
+ lua_sethook(lua,NULL,0,0); /* Disable hook */
+ luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, NULL);
+ return ret;
}
/*
@@ -458,8 +426,8 @@ int luaEngineInitEngine() {
luaRegisterRedisAPI(lua_engine_ctx->lua);
/* Register the library commands table and fields and store it to registry */
- lua_pushstring(lua_engine_ctx->lua, LIBRARY_API_NAME);
- lua_newtable(lua_engine_ctx->lua);
+ lua_newtable(lua_engine_ctx->lua); /* load library globals */
+ lua_newtable(lua_engine_ctx->lua); /* load library `redis` table */
lua_pushstring(lua_engine_ctx->lua, "register_function");
lua_pushcfunction(lua_engine_ctx->lua, luaRegisterFunction);
@@ -468,18 +436,24 @@ int luaEngineInitEngine() {
luaRegisterLogFunction(lua_engine_ctx->lua);
luaRegisterVersion(lua_engine_ctx->lua);
- lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX);
+ luaSetErrorMetatable(lua_engine_ctx->lua);
+ lua_setfield(lua_engine_ctx->lua, -2, REDIS_API_NAME);
+
+ luaSetErrorMetatable(lua_engine_ctx->lua);
+ luaSetTableProtectionRecursively(lua_engine_ctx->lua); /* protect load library globals */
+ lua_setfield(lua_engine_ctx->lua, LUA_REGISTRYINDEX, LIBRARY_API_NAME);
/* Save error handler to registry */
lua_pushstring(lua_engine_ctx->lua, REGISTRY_ERROR_HANDLER_NAME);
char *errh_func = "local dbg = debug\n"
+ "debug = nil\n"
"local error_handler = function (err)\n"
" local i = dbg.getinfo(2,'nSl')\n"
" if i and i.what == 'C' then\n"
" i = dbg.getinfo(3,'nSl')\n"
" end\n"
" if type(err) ~= 'table' then\n"
- " err = {err='ERR' .. tostring(err)}"
+ " err = {err='ERR ' .. tostring(err)}"
" end"
" if i then\n"
" err['source'] = i.source\n"
@@ -492,17 +466,30 @@ int luaEngineInitEngine() {
lua_pcall(lua_engine_ctx->lua,0,1,0);
lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX);
- /* Save global protection to registry */
- luaRegisterGlobalProtectionFunction(lua_engine_ctx->lua);
-
- /* Set global protection on globals */
lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX);
- luaSetGlobalProtection(lua_engine_ctx->lua);
+ luaSetErrorMetatable(lua_engine_ctx->lua);
+ luaSetTableProtectionRecursively(lua_engine_ctx->lua); /* protect globals */
lua_pop(lua_engine_ctx->lua, 1);
+ /* Save default globals to registry */
+ lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX);
+ lua_setfield(lua_engine_ctx->lua, LUA_REGISTRYINDEX, GLOBALS_API_NAME);
+
/* save the engine_ctx on the registry so we can get it from the Lua interpreter */
luaSaveOnRegistry(lua_engine_ctx->lua, REGISTRY_ENGINE_CTX_NAME, lua_engine_ctx);
+ /* Create new empty table to be the new globals, we will be able to control the real globals
+ * using metatable */
+ lua_newtable(lua_engine_ctx->lua); /* new globals */
+ lua_newtable(lua_engine_ctx->lua); /* new globals metatable */
+ lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX);
+ lua_setfield(lua_engine_ctx->lua, -2, "__index");
+ lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the metatable */
+ lua_setmetatable(lua_engine_ctx->lua, -2);
+ lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the new global table */
+ lua_replace(lua_engine_ctx->lua, LUA_GLOBALSINDEX); /* set new global table as the new globals */
+
+
engine *lua_engine = zmalloc(sizeof(*lua_engine));
*lua_engine = (engine) {
.engine_ctx = lua_engine_ctx,
diff --git a/src/help.h b/src/help.h
index e25ca3fa3..dfecf5981 100644
--- a/src/help.h
+++ b/src/help.h
@@ -1,4 +1,4 @@
-/* Automatically generated by utils/generate-command-help.rb, do not edit. */
+/* Automatically generated by ./generate-command-help.rb, do not edit. */
#ifndef __REDIS_HELP_H
#define __REDIS_HELP_H
@@ -130,12 +130,12 @@ struct commandHelp {
15,
"2.6.0" },
{ "BITFIELD",
- "key [GET encoding offset] [SET encoding offset value] [INCRBY encoding offset increment] [OVERFLOW WRAP|SAT|FAIL]",
+ "key GET encoding offset|[OVERFLOW WRAP|SAT|FAIL] SET encoding offset value|INCRBY encoding offset increment [GET encoding offset|[OVERFLOW WRAP|SAT|FAIL] SET encoding offset value|INCRBY encoding offset increment ...]",
"Perform arbitrary bitfield integer operations on strings",
15,
"3.2.0" },
{ "BITFIELD_RO",
- "key GET encoding offset",
+ "key GET encoding offset [encoding offset ...]",
"Perform arbitrary bitfield integer operations on strings. Read-only variant of BITFIELD",
15,
"6.2.0" },
@@ -690,12 +690,12 @@ struct commandHelp {
13,
"3.2.10" },
{ "GEOSEARCH",
- "key [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius M|KM|FT|MI] [BYBOX width height M|KM|FT|MI] [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]",
+ "key FROMMEMBER member|FROMLONLAT longitude latitude BYRADIUS radius M|KM|FT|MI|BYBOX width height M|KM|FT|MI [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]",
"Query a sorted set representing a geospatial index to fetch members inside an area of a box or a circle.",
13,
"6.2.0" },
{ "GEOSEARCHSTORE",
- "destination source [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius M|KM|FT|MI] [BYBOX width height M|KM|FT|MI] [ASC|DESC] [COUNT count [ANY]] [STOREDIST]",
+ "destination source FROMMEMBER member|FROMLONLAT longitude latitude BYRADIUS radius M|KM|FT|MI|BYBOX width height M|KM|FT|MI [ASC|DESC] [COUNT count [ANY]] [STOREDIST]",
"Query a sorted set representing a geospatial index to fetch members inside an area of a box or a circle, and store the result in another key.",
13,
"6.2.0" },
@@ -1000,7 +1000,7 @@ struct commandHelp {
1,
"1.0.0" },
{ "MIGRATE",
- "host port key| destination-db timeout [COPY] [REPLACE] [AUTH password] [AUTH2 username password] [KEYS key [key ...]]",
+ "host port key| destination-db timeout [COPY] [REPLACE] [[AUTH password]|[AUTH2 username password]] [KEYS key [key ...]]",
"Atomically transfer a key from a Redis instance to another one.",
0,
"2.6.0" },
@@ -1355,7 +1355,7 @@ struct commandHelp {
8,
"1.0.0" },
{ "SET",
- "key value [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL] [NX|XX] [GET]",
+ "key value [NX|XX] [GET] [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL]",
"Set the string value of a key",
1,
"1.0.0" },
diff --git a/src/listpack.c b/src/listpack.c
index e651e4960..75189f55f 100644
--- a/src/listpack.c
+++ b/src/listpack.c
@@ -180,7 +180,8 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
int negative = 0;
uint64_t v;
- if (plen == slen)
+ /* Abort if length indicates this cannot possibly be an int */
+ if (slen == 0 || slen >= LONG_STR_SIZE)
return 0;
/* Special case: first and only digit is 0. */
diff --git a/src/listpack.h b/src/listpack.h
index 6c4d6bdd6..3e750af5b 100644
--- a/src/listpack.h
+++ b/src/listpack.h
@@ -59,6 +59,8 @@ void lpFree(unsigned char *lp);
unsigned char* lpShrinkToFit(unsigned char *lp);
unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen,
unsigned char *p, int where, unsigned char **newp);
+unsigned char *lpInsertInteger(unsigned char *lp, long long lval,
+ unsigned char *p, int where, unsigned char **newp);
unsigned char *lpPrepend(unsigned char *lp, unsigned char *s, uint32_t slen);
unsigned char *lpPrependInteger(unsigned char *lp, long long lval);
unsigned char *lpAppend(unsigned char *lp, unsigned char *s, uint32_t slen);
diff --git a/src/module.c b/src/module.c
index 3fc6a5499..99d2adcd4 100644
--- a/src/module.c
+++ b/src/module.c
@@ -464,11 +464,18 @@ static int moduleConvertArgFlags(int flags);
/* Use like malloc(). Memory allocated with this function is reported in
* Redis INFO memory, used for keys eviction according to maxmemory settings
* and in general is taken into account as memory allocated by Redis.
- * You should avoid using malloc(). */
+ * You should avoid using malloc().
+ * This function panics if unable to allocate enough memory. */
void *RM_Alloc(size_t bytes) {
return zmalloc(bytes);
}
+/* Similar to RM_Alloc, but returns NULL in case of allocation failure, instead
+ * of panicking. */
+void *RM_TryAlloc(size_t bytes) {
+ return ztrymalloc(bytes);
+}
+
/* Use like calloc(). Memory allocated with this function is reported in
* Redis INFO memory, used for keys eviction according to maxmemory settings
* and in general is taken into account as memory allocated by Redis.
@@ -710,6 +717,8 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
if (server.busy_module_yield_flags) {
blockingOperationEnds();
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
+ if (server.current_client)
+ unprotectClient(server.current_client);
unblockPostponedClients();
}
}
@@ -1040,9 +1049,9 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, sds dec
* serve stale data. Don't use if you don't know what
* this means.
* * **"no-monitor"**: Don't propagate the command on monitor. Use this if
- * the command has sensible data among the arguments.
+ * the command has sensitive data among the arguments.
* * **"no-slowlog"**: Don't log this command in the slowlog. Use this if
- * the command has sensible data among the arguments.
+ * the command has sensitive data among the arguments.
* * **"fast"**: The command time complexity is not greater
* than O(log(N)) where N is the size of the collection or
* anything else representing the normal scalability
@@ -1924,6 +1933,7 @@ static struct redisCommandArg *moduleCopyCommandArgs(RedisModuleCommandArg *args
if (arg->token) realargs[j].token = zstrdup(arg->token);
if (arg->summary) realargs[j].summary = zstrdup(arg->summary);
if (arg->since) realargs[j].since = zstrdup(arg->since);
+ if (arg->deprecated_since) realargs[j].deprecated_since = zstrdup(arg->deprecated_since);
realargs[j].flags = moduleConvertArgFlags(arg->flags);
if (arg->subargs) realargs[j].subargs = moduleCopyCommandArgs(arg->subargs, version);
}
@@ -2079,6 +2089,12 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
* the -LOADING error)
*/
void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
+ static int yield_nesting = 0;
+ /* Avoid nested calls to RM_Yield */
+ if (yield_nesting)
+ return;
+ yield_nesting++;
+
long long now = getMonotonicUs();
if (now >= ctx->next_yield_time) {
/* In loading mode, there's no need to handle busy_module_yield_reply,
@@ -2092,10 +2108,13 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
server.busy_module_yield_reply = busy_reply;
/* start the blocking operation if not already started. */
if (!server.busy_module_yield_flags) {
- server.busy_module_yield_flags = flags & REDISMODULE_YIELD_FLAG_CLIENTS ?
- BUSY_MODULE_YIELD_CLIENTS : BUSY_MODULE_YIELD_EVENTS;
+ server.busy_module_yield_flags = BUSY_MODULE_YIELD_EVENTS;
blockingOperationStarts();
+ if (server.current_client)
+ protectClient(server.current_client);
}
+ if (flags & REDISMODULE_YIELD_FLAG_CLIENTS)
+ server.busy_module_yield_flags |= BUSY_MODULE_YIELD_CLIENTS;
/* Let redis process events */
processEventsWhileBlocked();
@@ -2110,6 +2129,7 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
/* decide when the next event should fire. */
ctx->next_yield_time = now + 1000000 / server.hz;
}
+ yield_nesting--;
}
/* Set flags defining capabilities or behavior bit flags.
@@ -2639,9 +2659,7 @@ void RM_TrimStringAllocation(RedisModuleString *str) {
* if (argc != 3) return RedisModule_WrongArity(ctx);
*/
int RM_WrongArity(RedisModuleCtx *ctx) {
- addReplyErrorFormat(ctx->client,
- "wrong number of arguments for '%s' command",
- (char*)ctx->client->argv[0]->ptr);
+ addReplyErrorArity(ctx->client);
return REDISMODULE_OK;
}
@@ -3365,10 +3383,13 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
/* Publish a message to subscribers (see PUBLISH command). */
int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
UNUSED(ctx);
- int receivers = pubsubPublishMessage(channel, message);
- if (server.cluster_enabled)
- clusterPropagatePublish(channel, message);
- return receivers;
+ return pubsubPublishMessageAndPropagateToCluster(channel, message, 0);
+}
+
+/* Publish a message to shard-subscribers (see SPUBLISH command). */
+int RM_PublishMessageShard(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
+ UNUSED(ctx);
+ return pubsubPublishMessageAndPropagateToCluster(channel, message, 1);
}
/* Return the currently selected DB. */
@@ -3615,7 +3636,7 @@ static void moduleInitKeyTypeSpecific(RedisModuleKey *key) {
* key does not exist, NULL is returned. However it is still safe to
* call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL
* value. */
-void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
+RedisModuleKey *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
RedisModuleKey *kp;
robj *value;
int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0;
@@ -3633,7 +3654,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
kp = zmalloc(sizeof(*kp));
moduleInitKey(kp, ctx, keyname, value, mode);
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
- return (void*)kp;
+ return kp;
}
/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */
@@ -5736,26 +5757,18 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* Lookup command now, after filters had a chance to make modifications
* if necessary.
*/
- cmd = lookupCommand(c->argv,c->argc);
- if (!cmd) {
+ cmd = c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
+ sds err;
+ if (!commandCheckExistence(c, error_as_call_replies? &err : NULL)) {
errno = ENOENT;
- if (error_as_call_replies) {
- sds msg = sdscatfmt(sdsempty(),"Unknown Redis "
- "command '%S'.",c->argv[0]->ptr);
- reply = callReplyCreateError(msg, ctx);
- }
+ if (error_as_call_replies)
+ reply = callReplyCreateError(err, ctx);
goto cleanup;
}
- c->cmd = c->lastcmd = c->realcmd = cmd;
-
- /* Basic arity checks. */
- if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
+ if (!commandCheckArity(c, error_as_call_replies? &err : NULL)) {
errno = EINVAL;
- if (error_as_call_replies) {
- sds msg = sdscatfmt(sdsempty(), "Wrong number of "
- "args calling Redis command '%S'.", c->cmd->fullname);
- reply = callReplyCreateError(msg, ctx);
- }
+ if (error_as_call_replies)
+ reply = callReplyCreateError(err, ctx);
goto cleanup;
}
@@ -5798,8 +5811,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
}
int deny_write_type = writeCommandsDeniedByDiskError();
+ int obey_client = mustObeyClient(server.current_client);
- if (deny_write_type != DISK_ERROR_TYPE_NONE) {
+ if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
errno = ENOSPC;
if (error_as_call_replies) {
sds msg = writeCommandsGetDiskErrorMessage(deny_write_type);
@@ -5841,7 +5855,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* If this is a Redis Cluster node, we need to make sure the module is not
* trying to access non-local keys, with the exception of commands
* received from our master. */
- if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
+ if (server.cluster_enabled && !mustObeyClient(ctx->client)) {
int error_code;
/* Duplicate relevant flags in the module client. */
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
@@ -5890,11 +5904,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
- /* Set server.current_client */
- client *old_client = server.current_client;
- server.current_client = c;
call(c,call_flags);
- server.current_client = old_client;
server.replication_allowed = prev_replication_allowed;
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
@@ -6074,6 +6084,14 @@ const char *moduleTypeModuleName(moduleType *mt) {
return mt->module->name;
}
+/* Return the module name from a module command */
+const char *moduleNameFromCommand(struct redisCommand *cmd) {
+ serverAssert(cmd->proc == RedisModuleCommandDispatcher);
+
+ RedisModuleCommand *cp = (void*)(unsigned long)cmd->getkeys_proc;
+ return cp->module->name;
+}
+
/* Create a copy of a module type value using the copy callback. If failed
* or not supported, produce an error reply and return NULL.
*/
@@ -7623,6 +7641,8 @@ void moduleGILBeforeUnlock() {
if (server.busy_module_yield_flags) {
blockingOperationEnds();
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
+ if (server.current_client)
+ unprotectClient(server.current_client);
unblockPostponedClients();
}
}
@@ -8676,8 +8696,18 @@ int RM_ACLCheckChannelPermissions(RedisModuleUser *user, RedisModuleString *ch,
* Returns REDISMODULE_OK on success and REDISMODULE_ERR on error.
*
* For more information about ACL log, please refer to https://redis.io/commands/acl-log */
-void RM_ACLAddLogEntry(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object) {
- addACLLogEntry(ctx->client, 0, ACL_LOG_CTX_MODULE, -1, user->user->name, sdsdup(object->ptr));
+int RM_ACLAddLogEntry(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object, RedisModuleACLLogEntryReason reason) {
+ int acl_reason;
+ switch (reason) {
+ case REDISMODULE_ACL_LOG_AUTH: acl_reason = ACL_DENIED_AUTH; break;
+ case REDISMODULE_ACL_LOG_KEY: acl_reason = ACL_DENIED_KEY; break;
+ case REDISMODULE_ACL_LOG_CHANNEL: acl_reason = ACL_DENIED_CHANNEL; break;
+ case REDISMODULE_ACL_LOG_CMD: acl_reason = ACL_DENIED_CMD; break;
+ default: return REDISMODULE_ERR;
+ }
+
+ addACLLogEntry(ctx->client, acl_reason, ACL_LOG_CTX_MODULE, -1, user->user->name, sdsdup(object->ptr));
+ return REDISMODULE_OK;
}
/* Authenticate the client associated with the context with
@@ -9730,10 +9760,29 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
* with the allocation calls, since sometimes the underlying allocator
* will allocate more memory.
*/
-size_t RM_MallocSize(void* ptr){
+size_t RM_MallocSize(void* ptr) {
return zmalloc_size(ptr);
}
+/* Same as RM_MallocSize, except it works on RedisModuleString pointers.
+ */
+size_t RM_MallocSizeString(RedisModuleString* str) {
+ serverAssert(str->type == OBJ_STRING);
+ return sizeof(*str) + getStringObjectSdsUsedMemory(str);
+}
+
+/* Same as RM_MallocSize, except it works on RedisModuleDict pointers.
+ * Note that the returned value is only the overhead of the underlying structures,
+ * it does not include the allocation size of the keys and values.
+ */
+size_t RM_MallocSizeDict(RedisModuleDict* dict) {
+ size_t size = sizeof(RedisModuleDict) + sizeof(rax);
+ size += dict->rax->numnodes * sizeof(raxNode);
+ /* For more info about this weird line, see streamRadixTreeMemoryUsage */
+ size += dict->rax->numnodes * sizeof(long)*30;
+ return size;
+}
+
/* Return the a number between 0 to 1 indicating the amount of memory
* currently used, relative to the Redis "maxmemory" configuration.
*
@@ -10908,6 +10957,7 @@ int moduleFreeCommand(struct RedisModule *module, struct redisCommand *cmd) {
}
zfree((char *)cmd->summary);
zfree((char *)cmd->since);
+ zfree((char *)cmd->deprecated_since);
zfree((char *)cmd->complexity);
if (cmd->latency_histogram) {
hdr_close(cmd->latency_histogram);
@@ -11275,6 +11325,7 @@ int moduleVerifyConfigFlags(unsigned int flags, configType type) {
| REDISMODULE_CONFIG_HIDDEN
| REDISMODULE_CONFIG_PROTECTED
| REDISMODULE_CONFIG_DENY_LOADING
+ | REDISMODULE_CONFIG_BITFLAGS
| REDISMODULE_CONFIG_MEMORY))) {
serverLogRaw(LL_WARNING, "Invalid flag(s) for configuration");
return REDISMODULE_ERR;
@@ -11283,6 +11334,10 @@ int moduleVerifyConfigFlags(unsigned int flags, configType type) {
serverLogRaw(LL_WARNING, "Numeric flag provided for non-numeric configuration.");
return REDISMODULE_ERR;
}
+ if (type != ENUM_CONFIG && flags & REDISMODULE_CONFIG_BITFLAGS) {
+ serverLogRaw(LL_WARNING, "Enum flag provided for non-enum configuration.");
+ return REDISMODULE_ERR;
+ }
return REDISMODULE_OK;
}
@@ -11484,6 +11539,12 @@ unsigned int maskModuleNumericConfigFlags(unsigned int flags) {
return new_flags;
}
+unsigned int maskModuleEnumConfigFlags(unsigned int flags) {
+ unsigned int new_flags = 0;
+ if (flags & REDISMODULE_CONFIG_BITFLAGS) new_flags |= MULTI_ARG_CONFIG;
+ return new_flags;
+}
+
/* Create a string config that Redis users can interact with via the Redis config file,
* `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands.
*
@@ -11523,6 +11584,7 @@ unsigned int maskModuleNumericConfigFlags(unsigned int flags) {
* * REDISMODULE_CONFIG_PROTECTED: This config will be only be modifiable based off the value of enable-protected-configs.
* * REDISMODULE_CONFIG_DENY_LOADING: This config is not modifiable while the server is loading data.
* * REDISMODULE_CONFIG_MEMORY: For numeric configs, this config will convert data unit notations into their byte equivalent.
+ * * REDISMODULE_CONFIG_BITFLAGS: For enum configs, this config will allow multiple entries to be combined as bit flags.
*
* Default values are used on startup to set the value if it is not provided via the config file
* or command line. Default values are also used to compare to on a config rewrite.
@@ -11638,7 +11700,7 @@ int RM_RegisterEnumConfig(RedisModuleCtx *ctx, const char *name, int default_val
enum_vals[num_enum_vals].name = NULL;
enum_vals[num_enum_vals].val = 0;
listAddNodeTail(module->module_configs, new_config);
- flags = maskModuleConfigFlags(flags);
+ flags = maskModuleConfigFlags(flags) | maskModuleEnumConfigFlags(flags);
addModuleEnumConfig(module->name, name, flags, new_config, default_val, enum_vals);
return REDISMODULE_OK;
}
@@ -12225,6 +12287,7 @@ void moduleRegisterCoreAPI(void) {
server.moduleapi = dictCreate(&moduleAPIDictType);
server.sharedapi = dictCreate(&moduleAPIDictType);
REGISTER_API(Alloc);
+ REGISTER_API(TryAlloc);
REGISTER_API(Calloc);
REGISTER_API(Realloc);
REGISTER_API(Free);
@@ -12490,6 +12553,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ServerInfoGetFieldDouble);
REGISTER_API(GetClientInfoById);
REGISTER_API(PublishMessage);
+ REGISTER_API(PublishMessageShard);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(SetLRU);
REGISTER_API(GetLRU);
@@ -12500,6 +12564,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetBlockedClientReadyKey);
REGISTER_API(GetUsedMemoryRatio);
REGISTER_API(MallocSize);
+ REGISTER_API(MallocSizeString);
+ REGISTER_API(MallocSizeDict);
REGISTER_API(ScanCursorCreate);
REGISTER_API(ScanCursorDestroy);
REGISTER_API(ScanCursorRestart);
diff --git a/src/modules/Makefile b/src/modules/Makefile
index c4bc7eb1a..b9ef5786d 100644
--- a/src/modules/Makefile
+++ b/src/modules/Makefile
@@ -28,42 +28,42 @@ all: helloworld.so hellotype.so helloblock.so hellocluster.so hellotimer.so hell
helloworld.xo: ../redismodule.h
helloworld.so: helloworld.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
hellotype.xo: ../redismodule.h
hellotype.so: hellotype.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
helloblock.xo: ../redismodule.h
helloblock.so: helloblock.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
hellocluster.xo: ../redismodule.h
hellocluster.so: hellocluster.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
hellotimer.xo: ../redismodule.h
hellotimer.so: hellotimer.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
hellodict.xo: ../redismodule.h
hellodict.so: hellodict.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
hellohook.xo: ../redismodule.h
hellohook.so: hellohook.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
helloacl.xo: ../redismodule.h
helloacl.so: helloacl.xo
- $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+ $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc
clean:
rm -rf *.xo *.so
diff --git a/src/monotonic.c b/src/monotonic.c
index 5bb4f03bf..608fa351c 100644
--- a/src/monotonic.c
+++ b/src/monotonic.c
@@ -168,3 +168,13 @@ const char * monotonicInit() {
return monotonic_info_string;
}
+
+const char *monotonicInfoString() {
+ return monotonic_info_string;
+}
+
+monotonic_clock_type monotonicGetType() {
+ if (getMonotonicUs == getMonotonicUs_posix)
+ return MONOTONIC_CLOCK_POSIX;
+ return MONOTONIC_CLOCK_HW;
+}
diff --git a/src/monotonic.h b/src/monotonic.h
index 4e82f9d53..32cf70638 100644
--- a/src/monotonic.h
+++ b/src/monotonic.h
@@ -24,13 +24,22 @@ typedef uint64_t monotime;
/* Retrieve counter of micro-seconds relative to an arbitrary point in time. */
extern monotime (*getMonotonicUs)(void);
+typedef enum monotonic_clock_type {
+ MONOTONIC_CLOCK_POSIX,
+ MONOTONIC_CLOCK_HW,
+} monotonic_clock_type;
/* Call once at startup to initialize the monotonic clock. Though this only
* needs to be called once, it may be called additional times without impact.
* Returns a printable string indicating the type of clock initialized.
* (The returned string is static and doesn't need to be freed.) */
-const char * monotonicInit();
+const char *monotonicInit();
+/* Return a string indicating the type of monotonic clock being used. */
+const char *monotonicInfoString();
+
+/* Return the type of monotonic clock being used. */
+monotonic_clock_type monotonicGetType();
/* Functions to measure elapsed time. Example:
* monotime myTimer;
diff --git a/src/networking.c b/src/networking.c
index 767d871d8..0664e2bf0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -160,6 +160,7 @@ client *createClient(connection *conn) {
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
+ c->slot = -1;
c->ctime = c->lastinteraction = server.unixtime;
clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE;
@@ -215,6 +216,23 @@ client *createClient(connection *conn) {
return c;
}
+void installClientWriteHandler(client *c) {
+ int ae_barrier = 0;
+ /* For the fsync=always policy, we want that a given FD is never
+ * served for reading and writing in the same event loop iteration,
+ * so that in the middle of receiving the query, and serving it
+ * to the client, we'll call beforeSleep() that will do the
+ * actual fsync of AOF to disk. the write barrier ensures that. */
+ if (server.aof_state == AOF_ON &&
+ server.aof_fsync == AOF_FSYNC_ALWAYS)
+ {
+ ae_barrier = 1;
+ }
+ if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
+ freeClientAsync(c);
+ }
+}
+
/* This function puts the client in the queue of clients that should write
* their output buffers to the socket. Note that it does not *yet* install
* the write handler, to start clients are put in a queue of clients that need
@@ -222,7 +240,7 @@ client *createClient(connection *conn) {
* handleClientsWithPendingWrites() function).
* If we fail and there is more data to write, compared to what the socket
* buffers can hold, then we'll really install the handler. */
-void clientInstallWriteHandler(client *c) {
+void putClientInPendingWriteQueue(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
@@ -285,11 +303,11 @@ int prepareClientToWrite(client *c) {
* it should already be setup to do so (it has already pending data).
*
* If CLIENT_PENDING_READ is set, we're in an IO thread and should
- * not install a write handler. Instead, it will be done by
- * handleClientsWithPendingReadsUsingThreads() upon return.
+ * not put the client in pending write queue. Instead, it will be
+ * done by handleClientsWithPendingReadsUsingThreads() upon return.
*/
if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
- clientInstallWriteHandler(c);
+ putClientInPendingWriteQueue(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
@@ -521,6 +539,21 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) {
showLatestBacklog();
}
server.stat_unexpected_error_replies++;
+
+ /* Based off the propagation error behavior, check if we need to panic here. There
+ * are currently two checked cases:
+ * * If this command was from our master and we are not a writable replica.
+ * * We are reading from an AOF file. */
+ int panic_in_replicas = (ctype == CLIENT_TYPE_MASTER && server.repl_slave_ro)
+ && (server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC ||
+ server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS);
+ int panic_in_aof = c->id == CLIENT_ID_AOF
+ && server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC;
+ if (panic_in_replicas || panic_in_aof) {
+ serverPanic("This %s panicked sending an error to its %s"
+ " after processing the command '%s'",
+ from, to, cmdname ? cmdname : "<unknown>");
+ }
}
}
@@ -1061,7 +1094,7 @@ void addReplySubcommandSyntaxError(client *c) {
sds cmd = sdsnew((char*) c->argv[0]->ptr);
sdstoupper(cmd);
addReplyErrorFormat(c,
- "Unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.",
+ "unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.",
(char*)c->argv[1]->ptr,cmd);
sdsfree(cmd);
}
@@ -1995,20 +2028,7 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
- int ae_barrier = 0;
- /* For the fsync=always policy, we want that a given FD is never
- * served for reading and writing in the same event loop iteration,
- * so that in the middle of receiving the query, and serving it
- * to the client, we'll call beforeSleep() that will do the
- * actual fsync of AOF to disk. the write barrier ensures that. */
- if (server.aof_state == AOF_ON &&
- server.aof_fsync == AOF_FSYNC_ALWAYS)
- {
- ae_barrier = 1;
- }
- if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
- freeClientAsync(c);
- }
+ installClientWriteHandler(c);
}
}
return processed;
@@ -2022,6 +2042,7 @@ void resetClient(client *c) {
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
+ c->slot = -1;
if (c->deferred_reply_errors)
listRelease(c->deferred_reply_errors);
@@ -2075,7 +2096,7 @@ void unprotectClient(client *c) {
c->flags &= ~CLIENT_PROTECTED;
if (c->conn) {
connSetReadHandler(c->conn,readQueryFromClient);
- if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
+ if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
}
}
}
@@ -2649,7 +2670,7 @@ void readQueryFromClient(connection *conn) {
/* There is more data in the client input buffer, continue parsing it
* and check if there is a full command to execute. */
- if (processInputBuffer(c) == C_ERR)
+ if (processInputBuffer(c) == C_ERR)
c = NULL;
done:
@@ -3808,7 +3829,7 @@ void flushSlavesOutputBuffers(void) {
}
}
-/* Compute current most restictive pause type and its end time, aggregated for
+/* Compute current most restrictive pause type and its end time, aggregated for
* all pause purposes. */
static void updateClientPauseTypeAndEndTime(void) {
pause_type old_type = server.client_pause_type;
@@ -4212,10 +4233,8 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Install the write handler if there are pending writes in some
* of the clients. */
- if (clientHasPendingReplies(c) &&
- connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
- {
- freeClientAsync(c);
+ if (clientHasPendingReplies(c)) {
+ installClientWriteHandler(c);
}
}
listEmpty(server.clients_pending_write);
@@ -4327,10 +4346,10 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}
/* We may have pending replies if a thread readQueryFromClient() produced
- * replies and did not install a write handler (it can't).
+ * replies and did not put the client in pending write queue (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
- clientInstallWriteHandler(c);
+ putClientInPendingWriteQueue(c);
}
/* Update processed count on server */
diff --git a/src/notify.c b/src/notify.c
index 28c0048cb..2881a48db 100644
--- a/src/notify.c
+++ b/src/notify.c
@@ -57,6 +57,7 @@ int keyspaceEventsStringToFlags(char *classes) {
case 't': flags |= NOTIFY_STREAM; break;
case 'm': flags |= NOTIFY_KEY_MISS; break;
case 'd': flags |= NOTIFY_MODULE; break;
+ case 'n': flags |= NOTIFY_NEW; break;
default: return -1;
}
}
@@ -84,6 +85,7 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
if (flags & NOTIFY_MODULE) res = sdscatlen(res,"d",1);
+ if (flags & NOTIFY_NEW) res = sdscatlen(res,"n",1);
}
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
@@ -124,7 +126,7 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
- pubsubPublishMessage(chanobj, eventobj);
+ pubsubPublishMessage(chanobj, eventobj, 0);
decrRefCount(chanobj);
}
@@ -136,7 +138,7 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
- pubsubPublishMessage(chanobj, key);
+ pubsubPublishMessage(chanobj, key, 0);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
diff --git a/src/object.c b/src/object.c
index a60a27e90..093e2619e 100644
--- a/src/object.c
+++ b/src/object.c
@@ -958,7 +958,7 @@ char *strEncoding(int encoding) {
* on the insertion speed and thus the ability of the radix tree
* to compress prefixes. */
size_t streamRadixTreeMemoryUsage(rax *rax) {
- size_t size;
+ size_t size = sizeof(*rax);
size = rax->numele * sizeof(streamID);
size += rax->numnodes * sizeof(raxNode);
/* Add a fixed overhead due to the aux data pointer, children, ... */
diff --git a/src/pubsub.c b/src/pubsub.c
index e805b16ef..a630afc8f 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -499,16 +499,10 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
}
/* Publish a message to all the subscribers. */
-int pubsubPublishMessage(robj *channel, robj *message) {
- return pubsubPublishMessageInternal(channel,message,pubSubType);
+int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
+ return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType);
}
-/* Publish a shard message to all the subscribers. */
-int pubsubPublishMessageShard(robj *channel, robj *message) {
- return pubsubPublishMessageInternal(channel, message, pubSubShardType);
-}
-
-
/*-----------------------------------------------------------------------------
* Pubsub commands implementation
*----------------------------------------------------------------------------*/
@@ -578,6 +572,15 @@ void punsubscribeCommand(client *c) {
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
+/* This function wraps pubsubPublishMessage and also propagates the message to cluster.
+ * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/
+int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) {
+ int receivers = pubsubPublishMessage(channel, message, sharded);
+ if (server.cluster_enabled)
+ clusterPropagatePublish(channel, message, sharded);
+ return receivers;
+}
+
/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
if (server.sentinel_mode) {
@@ -585,10 +588,8 @@ void publishCommand(client *c) {
return;
}
- int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
- if (server.cluster_enabled)
- clusterPropagatePublish(c->argv[1],c->argv[2]);
- else
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0);
+ if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
@@ -677,12 +678,9 @@ void channelList(client *c, sds pat, dict *pubsub_channels) {
/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
- int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
- if (server.cluster_enabled) {
- clusterPropagatePublishShard(c->argv[1], c->argv[2]);
- } else {
+ int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
+ if (!server.cluster_enabled)
forceCommandPropagation(c,PROPAGATE_REPL);
- }
addReplyLongLong(c,receivers);
}
diff --git a/src/quicklist.h b/src/quicklist.h
index dc30cafd9..8d6951b62 100644
--- a/src/quicklist.h
+++ b/src/quicklist.h
@@ -116,7 +116,7 @@ typedef struct quicklist {
typedef struct quicklistIter {
quicklist *quicklist;
quicklistNode *current;
- unsigned char *zi;
+ unsigned char *zi; /* points to the current element */
long offset; /* offset in current listpack */
int direction;
} quicklistIter;
@@ -141,7 +141,7 @@ typedef struct quicklistEntry {
/* quicklist compression disable */
#define QUICKLIST_NOCOMPRESS 0
-/* quicklist container formats */
+/* quicklist node container formats */
#define QUICKLIST_NODE_CONTAINER_PLAIN 1
#define QUICKLIST_NODE_CONTAINER_PACKED 2
diff --git a/src/rdb.c b/src/rdb.c
index 0283630f7..62ec5bbb2 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -588,22 +588,11 @@ int rdbSaveDoubleValue(rio *rdb, double val) {
len = 1;
buf[0] = (val < 0) ? 255 : 254;
} else {
-#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
- /* Check if the float is in a safe range to be casted into a
- * long long. We are assuming that long long is 64 bit here.
- * Also we are assuming that there are no implementations around where
- * double has precision < 52 bit.
- *
- * Under this assumptions we test if a double is inside an interval
- * where casting to long long is safe. Then using two castings we
- * make sure the decimal part is zero. If all this is true we use
- * integer printing function that is much faster. */
- double min = -4503599627370495; /* (2^52)-1 */
- double max = 4503599627370496; /* -(2^52) */
- if (val > min && val < max && val == ((double)((long long)val)))
- ll2string((char*)buf+1,sizeof(buf)-1,(long long)val);
+ long long lvalue;
+ /* Integer printing function is much faster, check if we can safely use it. */
+ if (double2ll(val, &lvalue))
+ ll2string((char*)buf+1,sizeof(buf)-1,lvalue);
else
-#endif
snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
buf[0] = strlen((char*)buf+1);
len = buf[0]+1;
@@ -2433,6 +2422,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
return NULL;
}
+ if (s->length && !raxSize(s->rax)) {
+ rdbReportCorruptRDB("Stream length inconsistent with rax entries");
+ decrRefCount(o);
+ return NULL;
+ }
+
/* Consumer groups loading */
uint64_t cgroups_count = rdbLoadLen(rdb,NULL);
if (cgroups_count == RDB_LENERR) {
diff --git a/src/redismodule.h b/src/redismodule.h
index f27c06b0e..cd389dd00 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -88,6 +88,7 @@
#define REDISMODULE_CONFIG_DENY_LOADING (1ULL<<6) /* This config is forbidden during loading. */
#define REDISMODULE_CONFIG_MEMORY (1ULL<<7) /* Indicates if this value can be set as a memory value */
+#define REDISMODULE_CONFIG_BITFLAGS (1ULL<<8) /* Indicates if this value can be set as a multiple enum values */
/* StreamID type. */
typedef struct RedisModuleStreamID {
@@ -322,6 +323,7 @@ typedef struct RedisModuleCommandArg {
const char *summary;
const char *since;
int flags; /* The REDISMODULE_CMD_ARG_* macros. */
+ const char *deprecated_since;
struct RedisModuleCommandArg *subargs;
} RedisModuleCommandArg;
@@ -735,6 +737,13 @@ typedef struct RedisModuleSwapDbInfo {
#define RedisModuleSwapDbInfo RedisModuleSwapDbInfoV1
+typedef enum {
+ REDISMODULE_ACL_LOG_AUTH = 0, /* Authentication failure */
+ REDISMODULE_ACL_LOG_CMD, /* Command authorization failure */
+ REDISMODULE_ACL_LOG_KEY, /* Key authorization failure */
+ REDISMODULE_ACL_LOG_CHANNEL /* Channel authorization failure */
+} RedisModuleACLLogEntryReason;
+
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@@ -861,6 +870,7 @@ typedef struct RedisModuleTypeMethods {
#endif
REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;
+REDISMODULE_API void * (*RedisModule_TryAlloc)(size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_Free)(void *ptr) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Calloc)(size_t nmemb, size_t size) REDISMODULE_ATTR;
@@ -877,7 +887,7 @@ REDISMODULE_API int (*RedisModule_ReplyWithLongLong)(RedisModuleCtx *ctx, long l
REDISMODULE_API int (*RedisModule_GetSelectedDb)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KeyExists)(RedisModuleCtx *ctx, RedisModuleString *keyname) REDISMODULE_ATTR;
-REDISMODULE_API void * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR;
+REDISMODULE_API RedisModuleKey * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_CloseKey)(RedisModuleKey *kp) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KeyType)(RedisModuleKey *kp) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_ValueLength)(RedisModuleKey *kp) REDISMODULE_ATTR;
@@ -990,6 +1000,7 @@ REDISMODULE_API unsigned long long (*RedisModule_GetClientId)(RedisModuleCtx *ct
REDISMODULE_API RedisModuleString * (*RedisModule_GetClientUserNameById)(RedisModuleCtx *ctx, uint64_t id) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetClientInfoById)(void *ci, uint64_t id) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR;
+REDISMODULE_API int (*RedisModule_PublishMessageShard)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlags)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)() REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes) REDISMODULE_ATTR;
@@ -1149,6 +1160,8 @@ REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_MallocSize)(void* ptr) REDISMODULE_ATTR;
+REDISMODULE_API size_t (*RedisModule_MallocSizeString)(RedisModuleString* str) REDISMODULE_ATTR;
+REDISMODULE_API size_t (*RedisModule_MallocSizeDict)(RedisModuleDict* dict) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleUser * (*RedisModule_CreateModuleUser)(const char *name) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_FreeModuleUser)(RedisModuleUser *user) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SetModuleUserACL)(RedisModuleUser *user, const char* acl) REDISMODULE_ATTR;
@@ -1157,7 +1170,7 @@ REDISMODULE_API RedisModuleUser * (*RedisModule_GetModuleUserFromUserName)(Redis
REDISMODULE_API int (*RedisModule_ACLCheckCommandPermissions)(RedisModuleUser *user, RedisModuleString **argv, int argc) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ACLCheckKeyPermissions)(RedisModuleUser *user, RedisModuleString *key, int flags) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ACLCheckChannelPermissions)(RedisModuleUser *user, RedisModuleString *ch, int literal) REDISMODULE_ATTR;
-REDISMODULE_API void (*RedisModule_ACLAddLogEntry)(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object) REDISMODULE_ATTR;
+REDISMODULE_API void (*RedisModule_ACLAddLogEntry)(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object, RedisModuleACLLogEntryReason reason) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AuthenticateClientWithACLUser)(RedisModuleCtx *ctx, const char *name, size_t len, RedisModuleUserChangedFunc callback, void *privdata, uint64_t *client_id) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AuthenticateClientWithUser)(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleUserChangedFunc callback, void *privdata, uint64_t *client_id) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id) REDISMODULE_ATTR;
@@ -1191,6 +1204,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
void *getapifuncptr = ((void**)ctx)[0];
RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
REDISMODULE_GET_API(Alloc);
+ REDISMODULE_GET_API(TryAlloc);
REDISMODULE_GET_API(Calloc);
REDISMODULE_GET_API(Free);
REDISMODULE_GET_API(Realloc);
@@ -1411,6 +1425,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ServerInfoGetFieldDouble);
REDISMODULE_GET_API(GetClientInfoById);
REDISMODULE_GET_API(PublishMessage);
+ REDISMODULE_GET_API(PublishMessageShard);
REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(SetLRU);
REDISMODULE_GET_API(GetLRU);
@@ -1478,6 +1493,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(KillForkChild);
REDISMODULE_GET_API(GetUsedMemoryRatio);
REDISMODULE_GET_API(MallocSize);
+ REDISMODULE_GET_API(MallocSizeString);
+ REDISMODULE_GET_API(MallocSizeDict);
REDISMODULE_GET_API(CreateModuleUser);
REDISMODULE_GET_API(FreeModuleUser);
REDISMODULE_GET_API(SetModuleUserACL);
diff --git a/src/replication.c b/src/replication.c
index e9a754ab4..2a0404f87 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -327,9 +327,6 @@ void feedReplicationBuffer(char *s, size_t len) {
server.master_repl_offset += len;
server.repl_backlog->histlen += len;
- /* Install write handler for all replicas. */
- prepareReplicasToWrite();
-
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
int add_new_block = 0; /* Create new block if current block is total used. */
@@ -440,6 +437,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
+ /* Must install write handler for all replicas first before feeding
+ * replication stream. */
+ prepareReplicasToWrite();
+
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
@@ -539,7 +540,12 @@ void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) {
/* There must be replication backlog if having attached slaves. */
if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL);
- if (server.repl_backlog) feedReplicationBuffer(buf,buflen);
+ if (server.repl_backlog) {
+ /* Must install write handler for all replicas first before feeding
+ * replication stream. */
+ prepareReplicasToWrite();
+ feedReplicationBuffer(buf,buflen);
+ }
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@@ -1285,7 +1291,7 @@ void replicaStartCommandStream(client *slave) {
return;
}
- clientInstallWriteHandler(slave);
+ putClientInPendingWriteQueue(slave);
}
/* We call this function periodically to remove an RDB file that was
@@ -1969,6 +1975,20 @@ void readSyncBulkPayload(connection *conn) {
/* We need to stop any AOF rewriting child before flushing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
+ /* Also try to stop save RDB child before flushing and parsing the RDB:
+ * 1. Ensure background save doesn't overwrite synced data after being loaded.
+ * 2. Avoid copy-on-write disaster. */
+ if (server.child_type == CHILD_TYPE_RDB) {
+ if (!use_diskless_load) {
+ serverLog(LL_NOTICE,
+ "Replica is about to load the RDB file received from the "
+ "master, but there is a pending RDB child running. "
+ "Killing process %ld and removing its temp file to avoid "
+ "any race",
+ (long) server.child_pid);
+ }
+ killRDBChild();
+ }
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Initialize empty tempDb dictionaries. */
@@ -2100,16 +2120,6 @@ void readSyncBulkPayload(connection *conn) {
connNonBlock(conn);
connRecvTimeout(conn,0);
} else {
- /* Ensure background save doesn't overwrite synced data */
- if (server.child_type == CHILD_TYPE_RDB) {
- serverLog(LL_NOTICE,
- "Replica is about to load the RDB file received from the "
- "master, but there is a pending RDB child running. "
- "Killing process %ld and removing its temp file to avoid "
- "any race",
- (long) server.child_pid);
- killRDBChild();
- }
/* Make sure the new file (also used for persistence) is fully synced
* (not covered by earlier calls to rdb_fsync_range). */
diff --git a/src/resp_parser.h b/src/resp_parser.h
index 4597efee3..0b5c8e22c 100644
--- a/src/resp_parser.h
+++ b/src/resp_parser.h
@@ -68,10 +68,10 @@ typedef struct ReplyParserCallbacks {
/* Called when the parser reaches a double (','), which is passed as an argument 'val' */
void (*double_callback)(void *ctx, double val, const char *proto, size_t proto_len);
- /* Called when the parser reaches a big number (','), which is passed as 'str' along with its length 'len' */
+ /* Called when the parser reaches a big number ('('), which is passed as 'str' along with its length 'len' */
void (*big_number_callback)(void *ctx, const char *str, size_t len, const char *proto, size_t proto_len);
- /* Called when the parser reaches a string, which is passed as 'str' along with its 'format' and length 'len' */
+ /* Called when the parser reaches a string ('='), which is passed as 'str' along with its 'format' and length 'len' */
void (*verbatim_string_callback)(void *ctx, const char *format, const char *str, size_t len, const char *proto, size_t proto_len);
/* Called when the parser reaches an attribute ('|'). The attribute length is passed as an argument 'len' */
diff --git a/src/script.c b/src/script.c
index 990248c45..8216b47f5 100644
--- a/src/script.c
+++ b/src/script.c
@@ -36,6 +36,7 @@ scriptFlag scripts_flags_def[] = {
{.flag = SCRIPT_FLAG_ALLOW_OOM, .str = "allow-oom"},
{.flag = SCRIPT_FLAG_ALLOW_STALE, .str = "allow-stale"},
{.flag = SCRIPT_FLAG_NO_CLUSTER, .str = "no-cluster"},
+ {.flag = SCRIPT_FLAG_ALLOW_CROSS_SLOT, .str = "allow-cross-slot-keys"},
{.flag = 0, .str = NULL}, /* flags array end */
};
@@ -114,6 +115,7 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
int running_stale = server.masterhost &&
server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0;
+ int obey_client = mustObeyClient(caller);
if (!(script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE)) {
if ((script_flags & SCRIPT_FLAG_NO_CLUSTER) && server.cluster_enabled) {
@@ -139,16 +141,14 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
* 1. we are not a readonly replica
* 2. no disk error detected
* 3. command is not `fcall_ro`/`eval[sha]_ro` */
- if (server.masterhost && server.repl_slave_ro && caller->id != CLIENT_ID_AOF
- && !(caller->flags & CLIENT_MASTER))
- {
+ if (server.masterhost && server.repl_slave_ro && !obey_client) {
addReplyError(caller, "Can not run script with write flag on readonly replica");
return C_ERR;
}
/* Deny writes if we're unale to persist. */
int deny_write_type = writeCommandsDeniedByDiskError();
- if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL) {
+ if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
if (deny_write_type == DISK_ERROR_TYPE_RDB)
addReplyError(caller, "-MISCONF Redis is configured to save RDB snapshots, "
"but it's currently unable to persist to disk. "
@@ -219,6 +219,10 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
run_ctx->flags |= SCRIPT_ALLOW_OOM;
}
+ if ((script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE) || (script_flags & SCRIPT_FLAG_ALLOW_CROSS_SLOT)) {
+ run_ctx->flags |= SCRIPT_ALLOW_CROSS_SLOT;
+ }
+
/* set the curr_run_ctx so we can use it to kill the script if needed */
curr_run_ctx = run_ctx;
@@ -269,7 +273,7 @@ void scriptKill(client *c, int is_eval) {
addReplyError(c, "-NOTBUSY No scripts in execution right now.");
return;
}
- if (curr_run_ctx->original_client->flags & CLIENT_MASTER) {
+ if (mustObeyClient(curr_run_ctx->original_client)) {
addReplyError(c,
"-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed.");
}
@@ -334,8 +338,8 @@ static int scriptVerifyWriteCommandAllow(scriptRunCtx *run_ctx, char **err) {
* of this script. */
int deny_write_type = writeCommandsDeniedByDiskError();
- if (server.masterhost && server.repl_slave_ro && run_ctx->original_client->id != CLIENT_ID_AOF
- && !(run_ctx->original_client->flags & CLIENT_MASTER))
+ if (server.masterhost && server.repl_slave_ro &&
+ !mustObeyClient(run_ctx->original_client))
{
*err = sdsdup(shared.roslaveerr->ptr);
return C_ERR;
@@ -380,8 +384,7 @@ static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) {
* in the middle. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
- run_ctx->original_client->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */
- !server.masterhost && /* Slave must execute the script. */
+ !mustObeyClient(run_ctx->original_client) && /* Don't care about mem for replicas or AOF. */
!(run_ctx->flags & SCRIPT_WRITE_DIRTY) && /* Script had no side effects so far. */
server.script_oom && /* Detected OOM when script start. */
(run_ctx->c->cmd->flags & CMD_DENYOOM))
@@ -393,8 +396,8 @@ static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) {
return C_OK;
}
-static int scriptVerifyClusterState(client *c, client *original_c, sds *err) {
- if (!server.cluster_enabled || original_c->id == CLIENT_ID_AOF || (original_c->flags & CLIENT_MASTER)) {
+static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *original_c, sds *err) {
+ if (!server.cluster_enabled || mustObeyClient(original_c)) {
return C_OK;
}
/* If this is a Redis Cluster node, we need to make sure the script is not
@@ -404,7 +407,8 @@ static int scriptVerifyClusterState(client *c, client *original_c, sds *err) {
/* Duplicate relevant flags in the script client. */
c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
- if (getNodeByQuery(c, c->cmd, c->argv, c->argc, NULL, &error_code) != server.cluster->myself) {
+ int hashslot = -1;
+ if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != server.cluster->myself) {
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
*err = sdsnew(
"Script attempted to execute a write command while the "
@@ -418,6 +422,19 @@ static int scriptVerifyClusterState(client *c, client *original_c, sds *err) {
}
return C_ERR;
}
+
+ /* If the script declared keys in advanced, the cross slot error would have
+ * already been thrown. This is only checking for cross slot keys being accessed
+ * that weren't pre-declared. */
+ if (hashslot != -1 && !(run_ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) {
+ if (original_c->slot == -1) {
+ original_c->slot = hashslot;
+ } else if (original_c->slot != hashslot) {
+ *err = sdsnew("Script attempted to access keys that do not hash to "
+ "the same slot");
+ return C_ERR;
+ }
+ }
return C_OK;
}
@@ -522,7 +539,7 @@ void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) {
run_ctx->flags |= SCRIPT_WRITE_DIRTY;
}
- if (scriptVerifyClusterState(c, run_ctx->original_client, err) != C_OK) {
+ if (scriptVerifyClusterState(run_ctx, c, run_ctx->original_client, err) != C_OK) {
goto error;
}
diff --git a/src/script.h b/src/script.h
index 9785af095..d855c80e2 100644
--- a/src/script.h
+++ b/src/script.h
@@ -64,6 +64,7 @@
#define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */
#define SCRIPT_ALLOW_OOM (1ULL<<6) /* indicate to allow any command even if OOM reached */
#define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */
+#define SCRIPT_ALLOW_CROSS_SLOT (1ULL<<8) /* Indicate that the current script may access keys from multiple slots */
typedef struct scriptRunCtx scriptRunCtx;
struct scriptRunCtx {
@@ -82,6 +83,7 @@ struct scriptRunCtx {
#define SCRIPT_FLAG_ALLOW_STALE (1ULL<<2)
#define SCRIPT_FLAG_NO_CLUSTER (1ULL<<3)
#define SCRIPT_FLAG_EVAL_COMPAT_MODE (1ULL<<4) /* EVAL Script backwards compatible behavior, no shebang provided */
+#define SCRIPT_FLAG_ALLOW_CROSS_SLOT (1ULL<<5)
/* Defines a script flags */
typedef struct scriptFlag {
diff --git a/src/script_lua.c b/src/script_lua.c
index 9a08a7e47..36868ec2b 100644
--- a/src/script_lua.c
+++ b/src/script_lua.c
@@ -41,6 +41,97 @@
#include <ctype.h>
#include <math.h>
+/* Globals that are added by the Lua libraries */
+static char *libraries_allow_list[] = {
+ "string",
+ "cjson",
+ "bit",
+ "cmsgpack",
+ "math",
+ "table",
+ "struct",
+ NULL,
+};
+
+/* Redis Lua API globals */
+static char *redis_api_allow_list[] = {
+ "redis",
+ "__redis__err__handler", /* error handler for eval, currently located on globals.
+ Should move to registry. */
+ NULL,
+};
+
+/* Lua builtins */
+static char *lua_builtins_allow_list[] = {
+ "xpcall",
+ "tostring",
+ "getfenv",
+ "setmetatable",
+ "next",
+ "assert",
+ "tonumber",
+ "rawequal",
+ "collectgarbage",
+ "getmetatable",
+ "rawset",
+ "pcall",
+ "coroutine",
+ "type",
+ "_G",
+ "select",
+ "unpack",
+ "gcinfo",
+ "pairs",
+ "rawget",
+ "loadstring",
+ "ipairs",
+ "_VERSION",
+ "setfenv",
+ "load",
+ "error",
+ NULL,
+};
+
+/* Lua builtins which are not documented on the Lua documentation */
+static char *lua_builtins_not_documented_allow_list[] = {
+ "newproxy",
+ NULL,
+};
+
+/* Lua builtins which are allowed on initialization but will be removed right after */
+static char *lua_builtins_removed_after_initialization_allow_list[] = {
+ "debug", /* debug will be set to nil after the error handler will be created */
+ NULL,
+};
+
+/* Those allow lists was created from the globals that was
+ * available to the user when the allow lists was first introduce.
+ * Because we do not want to break backward compatibility we keep
+ * all the globals. The allow lists will prevent us from accidentally
+ * creating unwanted globals in the future.
+ *
+ * Also notice that the allow list is only checked on start time,
+ * after that the global table is locked so not need to check anything.*/
+static char **allow_lists[] = {
+ libraries_allow_list,
+ redis_api_allow_list,
+ lua_builtins_allow_list,
+ lua_builtins_not_documented_allow_list,
+ lua_builtins_removed_after_initialization_allow_list,
+ NULL,
+};
+
+/* Deny list contains elements which we know we do not want to add to globals
+ * and there is no need to print a warning message form them. We will print a
+ * log message only if an element was added to the globals and the element is
+ * not on the allow list nor on the back list. */
+static char *deny_list[] = {
+ "dofile",
+ "loadfile",
+ "print",
+ NULL,
+};
+
static int redis_math_random (lua_State *L);
static int redis_math_randomseed (lua_State *L);
static void redisProtocolToLuaType_Int(void *ctx, long long val, const char *proto, size_t proto_len);
@@ -1113,15 +1204,6 @@ static void luaLoadLibraries(lua_State *lua) {
#endif
}
-/* Remove a functions that we don't want to expose to the Redis scripting
- * environment. */
-static void luaRemoveUnsupportedFunctions(lua_State *lua) {
- lua_pushnil(lua);
- lua_setglobal(lua,"loadfile");
- lua_pushnil(lua);
- lua_setglobal(lua,"dofile");
-}
-
/* Return sds of the string value located on stack at the given index.
* Return NULL if the value is not a string. */
sds luaGetStringSds(lua_State *lua, int index) {
@@ -1135,107 +1217,120 @@ sds luaGetStringSds(lua_State *lua, int index) {
return str_sds;
}
-/* This function installs metamethods in the global table _G that prevent
- * the creation of globals accidentally.
- *
- * It should be the last to be called in the scripting engine initialization
- * sequence, because it may interact with creation of globals.
- *
- * On Legacy Lua (eval) we need to check 'w ~= \"main\"' otherwise we will not be able
- * to create the global 'function <sha> ()' variable. On Functions Lua engine we do not use
- * this trick so it's not needed. */
-void luaEnableGlobalsProtection(lua_State *lua, int is_eval) {
- char *s[32];
- sds code = sdsempty();
- int j = 0;
-
- /* strict.lua from: http://metalua.luaforge.net/src/lib/strict.lua.html.
- * Modified to be adapted to Redis. */
- s[j++]="local dbg=debug\n";
- s[j++]="local mt = {}\n";
- s[j++]="setmetatable(_G, mt)\n";
- s[j++]="mt.__newindex = function (t, n, v)\n";
- s[j++]=" if dbg.getinfo(2) then\n";
- s[j++]=" local w = dbg.getinfo(2, \"S\").what\n";
- s[j++]= is_eval ? " if w ~= \"main\" and w ~= \"C\" then\n" : " if w ~= \"C\" then\n";
- s[j++]=" error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n";
- s[j++]=" end\n";
- s[j++]=" end\n";
- s[j++]=" rawset(t, n, v)\n";
- s[j++]="end\n";
- s[j++]="mt.__index = function (t, n)\n";
- s[j++]=" if dbg.getinfo(2) and dbg.getinfo(2, \"S\").what ~= \"C\" then\n";
- s[j++]=" error(\"Script attempted to access nonexistent global variable '\"..tostring(n)..\"'\", 2)\n";
- s[j++]=" end\n";
- s[j++]=" return rawget(t, n)\n";
- s[j++]="end\n";
- s[j++]="debug = nil\n";
- s[j++]=NULL;
-
- for (j = 0; s[j] != NULL; j++) code = sdscatlen(code,s[j],strlen(s[j]));
- luaL_loadbuffer(lua,code,sdslen(code),"@enable_strict_lua");
- lua_pcall(lua,0,0,0);
- sdsfree(code);
+static int luaProtectedTableError(lua_State *lua) {
+ int argc = lua_gettop(lua);
+ if (argc != 2) {
+ serverLog(LL_WARNING, "malicious code trying to call luaProtectedTableError with wrong arguments");
+ luaL_error(lua, "Wrong number of arguments to luaProtectedTableError");
+ }
+ if (!lua_isstring(lua, -1) && !lua_isnumber(lua, -1)) {
+ luaL_error(lua, "Second argument to luaProtectedTableError must be a string or number");
+ }
+ const char *variable_name = lua_tostring(lua, -1);
+ luaL_error(lua, "Script attempted to access nonexistent global variable '%s'", variable_name);
+ return 0;
}
-/* Create a global protection function and put it to registry.
- * This need to be called once in the lua_State lifetime.
- * After called it is possible to use luaSetGlobalProtection
- * to set global protection on a give table.
+/* Set a special metatable on the table on the top of the stack.
+ * The metatable will raise an error if the user tries to fetch
+ * an un-existing value.
*
* The function assumes the Lua stack have a least enough
* space to push 2 element, its up to the caller to verify
- * this before calling this function.
- *
- * Notice, the difference between this and luaEnableGlobalsProtection
- * is that luaEnableGlobalsProtection is enabling global protection
- * on the current Lua globals. This registering a global protection
- * function that later can be applied on any table. */
-void luaRegisterGlobalProtectionFunction(lua_State *lua) {
- lua_pushstring(lua, REGISTRY_SET_GLOBALS_PROTECTION_NAME);
- char *global_protection_func = "local dbg = debug\n"
- "local globals_protection = function (t)\n"
- " local mt = {}\n"
- " setmetatable(t, mt)\n"
- " mt.__newindex = function (t, n, v)\n"
- " if dbg.getinfo(2) then\n"
- " local w = dbg.getinfo(2, \"S\").what\n"
- " if w ~= \"C\" then\n"
- " error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n"
- " end"
- " end"
- " rawset(t, n, v)\n"
- " end\n"
- " mt.__index = function (t, n)\n"
- " if dbg.getinfo(2) and dbg.getinfo(2, \"S\").what ~= \"C\" then\n"
- " error(\"Script attempted to access nonexistent global variable '\"..tostring(n)..\"'\", 2)\n"
- " end\n"
- " return rawget(t, n)\n"
- " end\n"
- "end\n"
- "return globals_protection";
- int res = luaL_loadbuffer(lua, global_protection_func, strlen(global_protection_func), "@global_protection_def");
- serverAssert(res == 0);
- res = lua_pcall(lua,0,1,0);
- serverAssert(res == 0);
- lua_settable(lua, LUA_REGISTRYINDEX);
+ * this before calling this function. */
+void luaSetErrorMetatable(lua_State *lua) {
+ lua_newtable(lua); /* push metatable */
+ lua_pushcfunction(lua, luaProtectedTableError); /* push get error handler */
+ lua_setfield(lua, -2, "__index");
+ lua_setmetatable(lua, -2);
}
-/* Set global protection on a given table.
- * The table need to be located on the top of the lua stack.
- * After called, it will no longer be possible to set
- * new items on the table. The function is not removing
- * the table from the top of the stack!
+static int luaNewIndexAllowList(lua_State *lua) {
+ int argc = lua_gettop(lua);
+ if (argc != 3) {
+ serverLog(LL_WARNING, "malicious code trying to call luaProtectedTableError with wrong arguments");
+ luaL_error(lua, "Wrong number of arguments to luaNewIndexAllowList");
+ }
+ if (!lua_istable(lua, -3)) {
+ luaL_error(lua, "first argument to luaNewIndexAllowList must be a table");
+ }
+ if (!lua_isstring(lua, -2) && !lua_isnumber(lua, -2)) {
+ luaL_error(lua, "Second argument to luaNewIndexAllowList must be a string or number");
+ }
+ const char *variable_name = lua_tostring(lua, -2);
+ /* check if the key is in our allow list */
+
+ char ***allow_l = allow_lists;
+ for (; *allow_l ; ++allow_l){
+ char **c = *allow_l;
+ for (; *c ; ++c) {
+ if (strcmp(*c, variable_name) == 0) {
+ break;
+ }
+ }
+ if (*c) {
+ break;
+ }
+ }
+ if (!*allow_l) {
+ /* Search the value on the back list, if its there we know that it was removed
+ * on purpose and there is no need to print a warning. */
+ char **c = deny_list;
+ for ( ; *c ; ++c) {
+ if (strcmp(*c, variable_name) == 0) {
+ break;
+ }
+ }
+ if (!*c) {
+ serverLog(LL_WARNING, "A key '%s' was added to Lua globals which is not on the globals allow list nor listed on the deny list.", variable_name);
+ }
+ } else {
+ lua_rawset(lua, -3);
+ }
+ return 0;
+}
+
+/* Set a metatable with '__newindex' function that verify that
+ * the new index appears on our globals while list.
*
- * The function assumes the Lua stack have a least enough
- * space to push 2 element, its up to the caller to verify
- * this before calling this function. */
-void luaSetGlobalProtection(lua_State *lua) {
- lua_pushstring(lua, REGISTRY_SET_GLOBALS_PROTECTION_NAME);
- lua_gettable(lua, LUA_REGISTRYINDEX);
- lua_pushvalue(lua, -2);
- int res = lua_pcall(lua, 1, 0, 0);
- serverAssert(res == 0);
+ * The metatable is set on the table which located on the top
+ * of the stack.
+ */
+void luaSetAllowListProtection(lua_State *lua) {
+ lua_newtable(lua); /* push metatable */
+ lua_pushcfunction(lua, luaNewIndexAllowList); /* push get error handler */
+ lua_setfield(lua, -2, "__newindex");
+ lua_setmetatable(lua, -2);
+}
+
+/* Set the readonly flag on the table located on the top of the stack
+ * and recursively call this function on each table located on the original
+ * table. Also, recursively call this function on the metatables.*/
+void luaSetTableProtectionRecursively(lua_State *lua) {
+ /* This protect us from a loop in case we already visited the table
+ * For example, globals has '_G' key which is pointing back to globals. */
+ if (lua_isreadonlytable(lua, -1)) {
+ return;
+ }
+
+ /* protect the current table */
+ lua_enablereadonlytable(lua, -1, 1);
+
+ lua_checkstack(lua, 2);
+ lua_pushnil(lua); /* Use nil to start iteration. */
+ while (lua_next(lua,-2)) {
+ /* Stack now: table, key, value */
+ if (lua_istable(lua, -1)) {
+ luaSetTableProtectionRecursively(lua);
+ }
+ lua_pop(lua, 1);
+ }
+
+ /* protect the metatable if exists */
+ if (lua_getmetatable(lua, -1)) {
+ luaSetTableProtectionRecursively(lua);
+ lua_pop(lua, 1); /* pop the metatable */
+ }
}
void luaRegisterVersion(lua_State* lua) {
@@ -1272,8 +1367,11 @@ void luaRegisterLogFunction(lua_State* lua) {
}
void luaRegisterRedisAPI(lua_State* lua) {
+ lua_pushvalue(lua, LUA_GLOBALSINDEX);
+ luaSetAllowListProtection(lua);
+ lua_pop(lua, 1);
+
luaLoadLibraries(lua);
- luaRemoveUnsupportedFunctions(lua);
lua_pushcfunction(lua,luaRedisPcall);
lua_setglobal(lua, "pcall");
@@ -1504,9 +1602,19 @@ void luaCallFunction(scriptRunCtx* run_ctx, lua_State *lua, robj** keys, size_t
* EVAL received. */
luaCreateArray(lua,keys,nkeys);
/* On eval, keys and arguments are globals. */
- if (run_ctx->flags & SCRIPT_EVAL_MODE) lua_setglobal(lua,"KEYS");
+ if (run_ctx->flags & SCRIPT_EVAL_MODE){
+ /* open global protection to set KEYS */
+ lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 0);
+ lua_setglobal(lua,"KEYS");
+ lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1);
+ }
luaCreateArray(lua,args,nargs);
- if (run_ctx->flags & SCRIPT_EVAL_MODE) lua_setglobal(lua,"ARGV");
+ if (run_ctx->flags & SCRIPT_EVAL_MODE){
+ /* open global protection to set ARGV */
+ lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 0);
+ lua_setglobal(lua,"ARGV");
+ lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1);
+ }
/* At this point whether this script was never seen before or if it was
* already defined, we can call it.
diff --git a/src/script_lua.h b/src/script_lua.h
index 5a4533784..4c2b34804 100644
--- a/src/script_lua.h
+++ b/src/script_lua.h
@@ -67,9 +67,10 @@ typedef struct errorInfo {
void luaRegisterRedisAPI(lua_State* lua);
sds luaGetStringSds(lua_State *lua, int index);
-void luaEnableGlobalsProtection(lua_State *lua, int is_eval);
void luaRegisterGlobalProtectionFunction(lua_State *lua);
-void luaSetGlobalProtection(lua_State *lua);
+void luaSetErrorMetatable(lua_State *lua);
+void luaSetAllowListProtection(lua_State *lua);
+void luaSetTableProtectionRecursively(lua_State *lua);
void luaRegisterLogFunction(lua_State* lua);
void luaRegisterVersion(lua_State* lua);
void luaPushErrorBuff(lua_State *lua, sds err_buff);
diff --git a/src/sentinel.c b/src/sentinel.c
index 3ad8f902b..9ea78aae5 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -705,7 +705,7 @@ void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
if (level != LL_DEBUG) {
channel = createStringObject(type,strlen(type));
payload = createStringObject(msg,strlen(msg));
- pubsubPublishMessage(channel,payload);
+ pubsubPublishMessage(channel,payload,0);
decrRefCount(channel);
decrRefCount(payload);
}
diff --git a/src/server.c b/src/server.c
index 84f21fed3..298834eab 100644
--- a/src/server.c
+++ b/src/server.c
@@ -36,7 +36,6 @@
#include "atomicvar.h"
#include "mt19937-64.h"
#include "functions.h"
-#include "hdr_alloc.h"
#include <time.h>
#include <signal.h>
@@ -1016,18 +1015,8 @@ void databasesCron(void) {
}
}
-/* We take a cached value of the unix time in the global state because with
- * virtual memory and aging there is to store the current time in objects at
- * every object access, and accuracy is not needed. To access a global var is
- * a lot faster than calling time(NULL).
- *
- * This function should be fast because it is called at every command execution
- * in call(), so it is possible to decide if to update the daylight saving
- * info or not using the 'update_daylight_info' argument. Normally we update
- * such info only when calling this function from serverCron() but not when
- * calling it from call(). */
-void updateCachedTime(int update_daylight_info) {
- server.ustime = ustime();
+static inline void updateCachedTimeWithUs(int update_daylight_info, const long long ustime) {
+ server.ustime = ustime;
server.mstime = server.ustime / 1000;
time_t unixtime = server.mstime / 1000;
atomicSet(server.unixtime, unixtime);
@@ -1045,6 +1034,21 @@ void updateCachedTime(int update_daylight_info) {
}
}
+/* We take a cached value of the unix time in the global state because with
+ * virtual memory and aging there is to store the current time in objects at
+ * every object access, and accuracy is not needed. To access a global var is
+ * a lot faster than calling time(NULL).
+ *
+ * This function should be fast because it is called at every command execution
+ * in call(), so it is possible to decide if to update the daylight saving
+ * info or not using the 'update_daylight_info' argument. Normally we update
+ * such info only when calling this function from serverCron() but not when
+ * calling it from call(). */
+void updateCachedTime(int update_daylight_info) {
+ const long long us = ustime();
+ updateCachedTimeWithUs(update_daylight_info, us);
+}
+
void checkChildrenDone(void) {
int statloc = 0;
pid_t pid;
@@ -1209,10 +1213,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
cronUpdateMemoryStats();
- /* We received a SIGTERM, shutting down here in a safe way, as it is
+ /* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap && !isShutdownInitiated()) {
- if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
+ int shutdownFlags = SHUTDOWN_NOFLAGS;
+ if (server.last_sig_received == SIGINT && server.shutdown_on_sigint)
+ shutdownFlags = server.shutdown_on_sigint;
+ else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm)
+ shutdownFlags = server.shutdown_on_sigterm;
+
+ if (prepareForShutdown(shutdownFlags) == C_OK) exit(0);
} else if (isShutdownInitiated()) {
if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
if (finishShutdown() == C_OK) exit(0);
@@ -1296,13 +1306,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
- server.aof_current_size > server.aof_rewrite_min_size &&
- !aofRewriteLimited())
+ server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
- if (growth >= server.aof_rewrite_perc) {
+ if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
@@ -1326,8 +1335,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* however to try every second is enough in case of 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
- if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
- flushAppendOnlyFile(0);
+ if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
+ server.aof_last_write_status == C_ERR)
+ {
+ flushAppendOnlyFile(0);
+ }
}
/* Clear the paused clients state if needed. */
@@ -1466,6 +1478,7 @@ void whileBlockedCron() {
if (prepareForShutdown(SHUTDOWN_NOSAVE) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
+ server.last_sig_received = 0;
}
}
@@ -1509,6 +1522,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += tlsProcessPendingData();
+ if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
+ flushAppendOnlyFile(0);
processed += handleClientsWithPendingWrites();
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
@@ -1584,15 +1599,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
- /* Write the AOF buffer on disk */
+ /* Try to process blocked clients every once in while.
+ *
+ * Example: A module calls RM_SignalKeyAsReady from within a timer callback
+ * (So we don't visit processCommand() at all).
+ *
+ * must be done before flushAppendOnlyFile, in case of appendfsync=always,
+ * since the unblocked clients may write data. */
+ handleClientsBlockedOnKeys();
+
+ /* Write the AOF buffer on disk,
+ * must be done before handleClientsWithPendingWritesUsingThreads,
+ * in case of appendfsync=always. */
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
- /* Try to process blocked clients every once in while. Example: A module
- * calls RM_SignalKeyAsReady from within a timer callback (So we don't
- * visit processCommand() at all). */
- handleClientsBlockedOnKeys();
-
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
@@ -1878,15 +1899,6 @@ void initServerConfig(void) {
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
- /* Specify the allocation function for the hdr histogram */
- hdrAllocFuncs hdrallocfn = {
- .mallocFn = zmalloc,
- .callocFn = zcalloc_num,
- .reallocFn = zrealloc,
- .freeFn = zfree,
- };
- hdrSetAllocators(&hdrallocfn);
-
/* Replication related */
server.masterhost = NULL;
server.masterport = 6379;
@@ -2286,6 +2298,7 @@ int listenToPort(int port, socketFds *sfd) {
closeSocketListeners(sfd);
return C_ERR;
}
+ if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id);
anetNonBlock(NULL,sfd->fd[sfd->count]);
anetCloexec(sfd->fd[sfd->count]);
sfd->count++;
@@ -2338,6 +2351,7 @@ void resetServerStats(void) {
}
server.stat_aof_rewrites = 0;
server.stat_rdb_saves = 0;
+ server.stat_aofrw_consecutive_failures = 0;
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
@@ -2539,6 +2553,7 @@ void initServer(void) {
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
+ server.last_sig_received = 0;
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
@@ -2978,6 +2993,11 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) {
return cmd;
}
+/* Commands arriving from the master client or AOF client, should never be rejected. */
+int mustObeyClient(client *c) {
+ return c->id == CLIENT_ID_AOF || c->flags & CLIENT_MASTER;
+}
+
static int shouldPropagate(int target) {
if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading)
return 0;
@@ -3205,7 +3225,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags) {
*/
void call(client *c, int flags) {
long long dirty;
- monotime call_timer;
uint64_t client_old_flags = c->flags;
struct redisCommand *real_cmd = c->realcmd;
@@ -3230,22 +3249,34 @@ void call(client *c, int flags) {
dirty = server.dirty;
incrCommandStatsOnError(NULL, 0);
+ const long long call_timer = ustime();
+
/* Update cache time, in case we have nested calls we want to
* update only on the first call*/
if (server.fixed_time_expire++ == 0) {
- updateCachedTime(0);
+ updateCachedTimeWithUs(0,call_timer);
}
- server.in_nested_call++;
- elapsedStart(&call_timer);
+ monotime monotonic_start = 0;
+ if (monotonicGetType() == MONOTONIC_CLOCK_HW)
+ monotonic_start = getMonotonicUs();
+
+ server.in_nested_call++;
c->cmd->proc(c);
- const long duration = elapsedUs(call_timer);
+ server.in_nested_call--;
+
+ /* In order to avoid performance implication due to querying the clock using a system call 3 times,
+ * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */
+ ustime_t duration;
+ if (monotonicGetType() == MONOTONIC_CLOCK_HW)
+ duration = getMonotonicUs() - monotonic_start;
+ else
+ duration = ustime() - call_timer;
+
c->duration = duration;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
- server.in_nested_call--;
-
/* Update failed command calls if required. */
if (!incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED) && c->deferred_reply_errors) {
@@ -3410,6 +3441,8 @@ void rejectCommand(client *c, robj *reply) {
}
void rejectCommandSds(client *c, sds s) {
+ flagTransaction(c);
+ if (c->cmd) c->cmd->rejected_calls++;
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, s);
sdsfree(s);
@@ -3420,8 +3453,6 @@ void rejectCommandSds(client *c, sds s) {
}
void rejectCommandFormat(client *c, const char *fmt, ...) {
- if (c->cmd) c->cmd->rejected_calls++;
- flagTransaction(c);
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
@@ -3475,6 +3506,54 @@ void populateCommandMovableKeys(struct redisCommand *cmd) {
cmd->flags |= CMD_MOVABLE_KEYS;
}
+/* Check if c->cmd exists, fills `err` with details in case it doesn't.
+ * Return 1 if exists. */
+int commandCheckExistence(client *c, sds *err) {
+ if (c->cmd)
+ return 1;
+ if (!err)
+ return 0;
+ if (isContainerCommandBySds(c->argv[0]->ptr)) {
+ /* If we can't find the command but argv[0] by itself is a command
+ * it means we're dealing with an invalid subcommand. Print Help. */
+ sds cmd = sdsnew((char *)c->argv[0]->ptr);
+ sdstoupper(cmd);
+ *err = sdsnew(NULL);
+ *err = sdscatprintf(*err, "unknown subcommand '%.128s'. Try %s HELP.",
+ (char *)c->argv[1]->ptr, cmd);
+ sdsfree(cmd);
+ } else {
+ sds args = sdsempty();
+ int i;
+ for (i=1; i < c->argc && sdslen(args) < 128; i++)
+ args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
+ *err = sdsnew(NULL);
+ *err = sdscatprintf(*err, "unknown command '%.128s', with args beginning with: %s",
+ (char*)c->argv[0]->ptr, args);
+ sdsfree(args);
+ }
+ /* Make sure there are no newlines in the string, otherwise invalid protocol
+ * is emitted (The args come from the user, they may contain any character). */
+ sdsmapchars(*err, "\r\n", " ", 2);
+ return 0;
+}
+
+/* Check if c->argc is valid for c->cmd, fills `err` with details in case it isn't.
+ * Return 1 if valid. */
+int commandCheckArity(client *c, sds *err) {
+ if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
+ (c->argc < -c->cmd->arity))
+ {
+ if (err) {
+ *err = sdsnew(NULL);
+ *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", c->cmd->fullname);
+ }
+ return 0;
+ }
+
+ return 1;
+}
+
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
@@ -3514,29 +3593,13 @@ int processCommand(client *c) {
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
- if (!c->cmd) {
- if (isContainerCommandBySds(c->argv[0]->ptr)) {
- /* If we can't find the command but argv[0] by itself is a command
- * it means we're dealing with an invalid subcommand. Print Help. */
- sds cmd = sdsnew((char *)c->argv[0]->ptr);
- sdstoupper(cmd);
- rejectCommandFormat(c, "Unknown subcommand '%.128s'. Try %s HELP.",
- (char *)c->argv[1]->ptr, cmd);
- sdsfree(cmd);
- return C_OK;
- }
- sds args = sdsempty();
- int i;
- for (i=1; i < c->argc && sdslen(args) < 128; i++)
- args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
- rejectCommandFormat(c,"unknown command '%s', with args beginning with: %s",
- (char*)c->argv[0]->ptr, args);
- sdsfree(args);
+ sds err;
+ if (!commandCheckExistence(c, &err)) {
+ rejectCommandSds(c, err);
return C_OK;
- } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
- (c->argc < -c->cmd->arity))
- {
- rejectCommandFormat(c,"wrong number of arguments for '%s' command", c->cmd->fullname);
+ }
+ if (!commandCheckArity(c, &err)) {
+ rejectCommandSds(c, err);
return C_OK;
}
@@ -3569,6 +3632,7 @@ int processCommand(client *c) {
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
int is_deny_async_loading_command = (c->cmd->flags & CMD_NO_ASYNC_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING));
+ int obey_client = mustObeyClient(c);
if (authRequired(c)) {
/* AUTH and HELLO and no auth commands are valid even in
@@ -3620,23 +3684,20 @@ int processCommand(client *c) {
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
- !(c->flags & CLIENT_MASTER) &&
- !(c->flags & CLIENT_SCRIPT &&
- server.script_caller->flags & CLIENT_MASTER) &&
+ !mustObeyClient(c) &&
!(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 &&
c->cmd->proc != execCommand))
{
- int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
- &hashslot,&error_code);
+ &c->slot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
- clusterRedirectClient(c,n,hashslot,error_code);
+ clusterRedirectClient(c,n,c->slot,error_code);
c->cmd->rejected_calls++;
return C_OK;
}
@@ -3706,15 +3767,29 @@ int processCommand(client *c) {
if (server.tracking_clients) trackingLimitUsedSlots();
/* Don't accept write commands if there are problems persisting on disk
- * and if this is a master instance. */
+ * unless coming from our master, in which case check the replica ignore
+ * disk write error config to either log or crash. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
- server.masterhost == NULL &&
- (is_write_command ||c->cmd->proc == pingCommand))
+ (is_write_command || c->cmd->proc == pingCommand))
{
- sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
- rejectCommandSds(c, err);
- return C_OK;
+ if (obey_client) {
+ if (!server.repl_ignore_disk_write_error && c->cmd->proc != pingCommand) {
+ serverPanic("Replica was unable to write command to disk.");
+ } else {
+ static mstime_t last_log_time_ms = 0;
+ const mstime_t log_interval_ms = 10000;
+ if (server.mstime > last_log_time_ms + log_interval_ms) {
+ last_log_time_ms = server.mstime;
+ serverLog(LL_WARNING, "Replica is applying a command even though "
+ "it is unable to write to disk.");
+ }
+ }
+ } else {
+ sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
+ rejectCommandSds(c, err);
+ return C_OK;
+ }
}
/* Don't accept write commands if there are not enough good slaves and
@@ -3727,7 +3802,7 @@ int processCommand(client *c) {
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
- !(c->flags & CLIENT_MASTER) &&
+ !obey_client &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
@@ -3949,6 +4024,7 @@ static void cancelShutdown(void) {
server.shutdown_asap = 0;
server.shutdown_flags = 0;
server.shutdown_mstime = 0;
+ server.last_sig_received = 0;
replyToClientsBlockedOnShutdown();
unpauseClients(PAUSE_DURING_SHUTDOWN);
}
@@ -4309,6 +4385,7 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg
if (args[j].token) maplen++;
if (args[j].summary) maplen++;
if (args[j].since) maplen++;
+ if (args[j].deprecated_since) maplen++;
if (args[j].flags) maplen++;
if (args[j].type == ARG_TYPE_ONEOF || args[j].type == ARG_TYPE_BLOCK)
maplen++;
@@ -4336,6 +4413,10 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg
addReplyBulkCString(c, "since");
addReplyBulkCString(c, args[j].since);
}
+ if (args[j].deprecated_since) {
+ addReplyBulkCString(c, "deprecated_since");
+ addReplyBulkCString(c, args[j].deprecated_since);
+ }
if (args[j].flags) {
addReplyBulkCString(c, "flags");
addReplyFlagsForArg(c, args[j].flags);
@@ -4562,6 +4643,7 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) {
long maplen = 1;
if (cmd->summary) maplen++;
if (cmd->since) maplen++;
+ if (cmd->flags & CMD_MODULE) maplen++;
if (cmd->complexity) maplen++;
if (cmd->doc_flags) maplen++;
if (cmd->deprecated_since) maplen++;
@@ -4588,6 +4670,10 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) {
addReplyBulkCString(c, "complexity");
addReplyBulkCString(c, cmd->complexity);
}
+ if (cmd->flags & CMD_MODULE) {
+ addReplyBulkCString(c, "module");
+ addReplyBulkCString(c, moduleNameFromCommand(cmd));
+ }
if (cmd->doc_flags) {
addReplyBulkCString(c, "doc_flags");
addReplyDocFlagsForCommand(c, cmd);
@@ -5123,6 +5209,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"redis_mode:%s\r\n"
"os:%s %s %s\r\n"
"arch_bits:%i\r\n"
+ "monotonic_clock:%s\r\n"
"multiplexing_api:%s\r\n"
"atomicvar_api:%s\r\n"
"gcc_version:%i.%i.%i\r\n"
@@ -5146,6 +5233,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
mode,
name.sysname, name.release, name.machine,
server.arch_bits,
+ monotonicInfoString(),
aeGetApiName(),
REDIS_ATOMIC_API,
#ifdef __GNUC__
@@ -5383,6 +5471,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"aof_current_rewrite_time_sec:%jd\r\n"
"aof_last_bgrewrite_status:%s\r\n"
"aof_rewrites:%lld\r\n"
+ "aof_rewrites_consecutive_failures:%lld\r\n"
"aof_last_write_status:%s\r\n"
"aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n"
@@ -5414,6 +5503,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
-1 : time(NULL)-server.aof_rewrite_time_start),
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
server.stat_aof_rewrites,
+ server.stat_aofrw_consecutive_failures,
(server.aof_last_write_status == C_OK &&
aof_bio_fsync_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes,
@@ -6227,6 +6317,7 @@ static void sigShutdownHandler(int sig) {
serverLogFromHandler(LL_WARNING, msg);
server.shutdown_asap = 1;
+ server.last_sig_received = sig;
}
void setupSignalHandlers(void) {
diff --git a/src/server.h b/src/server.h
index edfea2226..61cff37a5 100644
--- a/src/server.h
+++ b/src/server.h
@@ -603,6 +603,7 @@ typedef enum {
#define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */
#define NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */
#define NOTIFY_MODULE (1<<13) /* d, module key space notification */
+#define NOTIFY_NEW (1<<14) /* n, new key notification */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE) /* A flag */
/* Using the following macro you can run code inside serverCron() with the
@@ -1061,7 +1062,7 @@ typedef struct replBacklog {
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
* see the definition of replBufBlock. */
size_t unindexed_count; /* The count from last creating index block. */
- rax *blocks_index; /* The index of reocrded blocks of replication
+ rax *blocks_index; /* The index of recorded blocks of replication
* buffer for quickly searching replication
* offset on partial resynchronization. */
long long histlen; /* Backlog actual data length */
@@ -1106,6 +1107,7 @@ typedef struct client {
buffer or object being sent. */
time_t ctime; /* Client creation time. */
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
+ int slot; /* The slot the client is executing against. Set to -1 if no slot is being used */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
uint64_t flags; /* Client flags: CLIENT_* macros. */
@@ -1323,6 +1325,15 @@ struct redisMemOverhead {
} *db;
};
+/* Replication error behavior determines the replica behavior
+ * when it receives an error over the replication stream. In
+ * either case the error is logged. */
+typedef enum {
+ PROPAGATION_ERR_BEHAVIOR_IGNORE = 0,
+ PROPAGATION_ERR_BEHAVIOR_PANIC,
+ PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS
+} replicationErrorBehavior;
+
/* This structure can be optionally passed to RDB save/load functions in
* order to implement additional functionalities, by storing and loading
* metadata to the RDB file.
@@ -1451,6 +1462,7 @@ struct redisServer {
redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */
mstime_t shutdown_mstime; /* Timestamp to limit graceful shutdown. */
+ int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@@ -1493,6 +1505,7 @@ struct redisServer {
socketFds ipfd; /* TCP socket file descriptors */
socketFds tlsfd; /* TLS socket file descriptors */
int sofd; /* Unix socket file descriptor */
+ uint32_t socket_mark_id; /* ID for listen socket marking */
socketFds cfd; /* Cluster bus listening socket */
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
@@ -1555,6 +1568,7 @@ struct redisServer {
monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */
size_t stat_peak_memory; /* Max used memory record */
long long stat_aof_rewrites; /* number of aof file rewrites performed */
+ long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */
long long stat_rdb_saves; /* number of rdb saves performed */
long long stat_fork_time; /* Time needed to perform latest fork() */
double stat_fork_rate; /* Fork rate in GB/sec. */
@@ -1717,6 +1731,8 @@ struct redisServer {
* abort(). useful for Valgrind. */
/* Shutdown */
int shutdown_timeout; /* Graceful shutdown time limit in seconds. */
+ int shutdown_on_sigint; /* Shutdown flags configured for SIGINT. */
+ int shutdown_on_sigterm; /* Shutdown flags configured for SIGTERM. */
/* Replication (master) */
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
@@ -1769,6 +1785,10 @@ struct redisServer {
int replica_announced; /* If true, replica is announced by Sentinel */
int slave_announce_port; /* Give the master this listening port. */
char *slave_announce_ip; /* Give the master this ip address. */
+ int propagation_error_behavior; /* Configures the behavior of the replica
+ * when it receives an error on the replication stream */
+ int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
+ * persist writes to AOF. */
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
@@ -2042,6 +2062,7 @@ typedef struct redisCommandArg {
const char *summary;
const char *since;
int flags;
+ const char *deprecated_since;
struct redisCommandArg *subargs;
/* runtime populated data */
int num_args;
@@ -2349,6 +2370,7 @@ int moduleGetCommandChannelsViaAPI(struct redisCommand *cmd, robj **argv, int ar
moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
const char *moduleTypeModuleName(moduleType *mt);
+const char *moduleNameFromCommand(struct redisCommand *cmd);
void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
@@ -2509,7 +2531,7 @@ void unprotectClient(client *c);
void initThreadedIO(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
-void clientInstallWriteHandler(client *c);
+void putClientInPendingWriteQueue(client *c);
#ifdef __GNUC__
void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...)
@@ -2860,6 +2882,8 @@ struct redisCommand *lookupCommandBySds(sds s);
struct redisCommand *lookupCommandByCStringLogic(dict *commands, const char *s);
struct redisCommand *lookupCommandByCString(const char *s);
struct redisCommand *lookupCommandOrOriginal(robj **argv, int argc);
+int commandCheckExistence(client *c, sds *err);
+int commandCheckArity(client *c, sds *err);
void startCommandExecution();
int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
@@ -2877,7 +2901,7 @@ int prepareForShutdown(int flags);
void replyToClientsBlockedOnShutdown(void);
int abortShutdown(void);
void afterCommand(client *c);
-int inNestedCall(void);
+int mustObeyClient(client *c);
#ifdef __GNUC__
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
@@ -2962,8 +2986,8 @@ int pubsubUnsubscribeAllChannels(client *c, int notify);
int pubsubUnsubscribeShardAllChannels(client *c, int notify);
void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count);
int pubsubUnsubscribeAllPatterns(client *c, int notify);
-int pubsubPublishMessage(robj *channel, robj *message);
-int pubsubPublishMessageShard(robj *channel, robj *message);
+int pubsubPublishMessage(robj *channel, robj *message, int sharded);
+int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
int serverPubsubSubscriptionCount();
int serverPubsubShardSubscriptionCount();
diff --git a/src/t_hash.c b/src/t_hash.c
index 92f5cb2b0..7aec270aa 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -146,39 +146,24 @@ robj *hashTypeGetValueObject(robj *o, sds field) {
* exist. */
size_t hashTypeGetValueLength(robj *o, sds field) {
size_t len = 0;
- if (o->encoding == OBJ_ENCODING_LISTPACK) {
- unsigned char *vstr = NULL;
- unsigned int vlen = UINT_MAX;
- long long vll = LLONG_MAX;
+ unsigned char *vstr = NULL;
+ unsigned int vlen = UINT_MAX;
+ long long vll = LLONG_MAX;
- if (hashTypeGetFromListpack(o, field, &vstr, &vlen, &vll) == 0)
- len = vstr ? vlen : sdigits10(vll);
- } else if (o->encoding == OBJ_ENCODING_HT) {
- sds aux;
+ if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK)
+ len = vstr ? vlen : sdigits10(vll);
- if ((aux = hashTypeGetFromHashTable(o, field)) != NULL)
- len = sdslen(aux);
- } else {
- serverPanic("Unknown hash encoding");
- }
return len;
}
/* Test if the specified field exists in the given hash. Returns 1 if the field
* exists, and 0 when it doesn't. */
int hashTypeExists(robj *o, sds field) {
- if (o->encoding == OBJ_ENCODING_LISTPACK) {
- unsigned char *vstr = NULL;
- unsigned int vlen = UINT_MAX;
- long long vll = LLONG_MAX;
+ unsigned char *vstr = NULL;
+ unsigned int vlen = UINT_MAX;
+ long long vll = LLONG_MAX;
- if (hashTypeGetFromListpack(o, field, &vstr, &vlen, &vll) == 0) return 1;
- } else if (o->encoding == OBJ_ENCODING_HT) {
- if (hashTypeGetFromHashTable(o, field) != NULL) return 1;
- } else {
- serverPanic("Unknown hash encoding");
- }
- return 0;
+ return hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK;
}
/* Add a new field, overwrite the old with the new value if it already exists.
@@ -205,6 +190,14 @@ int hashTypeExists(robj *o, sds field) {
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
+ /* Check if the field is too long for listpack, and convert before adding the item.
+ * This is needed for HINCRBY* case since in other commands this is handled early by
+ * hashTypeTryConversion, so this check will be a NOP. */
+ if (o->encoding == OBJ_ENCODING_LISTPACK) {
+ if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value)
+ hashTypeConvert(o, OBJ_ENCODING_HT);
+ }
+
if (o->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *zl, *fptr, *vptr;
@@ -717,37 +710,23 @@ void hincrbyfloatCommand(client *c) {
}
static void addHashFieldToReply(client *c, robj *o, sds field) {
- int ret;
-
if (o == NULL) {
addReplyNull(c);
return;
}
- if (o->encoding == OBJ_ENCODING_LISTPACK) {
- unsigned char *vstr = NULL;
- unsigned int vlen = UINT_MAX;
- long long vll = LLONG_MAX;
+ unsigned char *vstr = NULL;
+ unsigned int vlen = UINT_MAX;
+ long long vll = LLONG_MAX;
- ret = hashTypeGetFromListpack(o, field, &vstr, &vlen, &vll);
- if (ret < 0) {
- addReplyNull(c);
+ if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) {
+ if (vstr) {
+ addReplyBulkCBuffer(c, vstr, vlen);
} else {
- if (vstr) {
- addReplyBulkCBuffer(c, vstr, vlen);
- } else {
- addReplyBulkLongLong(c, vll);
- }
+ addReplyBulkLongLong(c, vll);
}
-
- } else if (o->encoding == OBJ_ENCODING_HT) {
- sds value = hashTypeGetFromHashTable(o, field);
- if (value == NULL)
- addReplyNull(c);
- else
- addReplyBulkCBuffer(c, value, sdslen(value));
} else {
- serverPanic("Unknown hash encoding");
+ addReplyNull(c);
}
}
@@ -907,7 +886,7 @@ void hscanCommand(client *c) {
scanGenericCommand(c,o,cursor);
}
-static void harndfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
+static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
for (unsigned long i = 0; i < count; i++) {
if (vals && c->resp > 2)
addReplyArrayLen(c,2);
@@ -990,7 +969,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
sample_count = count > limit ? limit : count;
count -= sample_count;
lpRandomPairs(hash->ptr, sample_count, keys, vals);
- harndfieldReplyWithListpack(c, sample_count, keys, vals);
+ hrandfieldReplyWithListpack(c, sample_count, keys, vals);
}
zfree(keys);
zfree(vals);
@@ -1092,7 +1071,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
if (withvalues)
vals = zmalloc(sizeof(listpackEntry)*count);
serverAssert(lpRandomPairsUnique(hash->ptr, count, keys, vals) == count);
- harndfieldReplyWithListpack(c, count, keys, vals);
+ hrandfieldReplyWithListpack(c, count, keys, vals);
zfree(keys);
zfree(vals);
return;
diff --git a/src/t_stream.c b/src/t_stream.c
index cd7d9723e..e6e5da731 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -1000,7 +1000,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
return -1;
}
- if (c == server.master || c->id == CLIENT_ID_AOF) {
+ if (mustObeyClient(c)) {
/* If command came from master or from AOF we must not enforce maxnodes
* (The maxlen/minid argument was re-written to make sure there's no
* inconsistency). */
@@ -1370,24 +1370,35 @@ void streamLastValidID(stream *s, streamID *maxid)
streamIteratorStop(&si);
}
+/* Maximum size for a stream ID string. In theory 20*2+1 should be enough,
+ * But to avoid chance for off by one issues and null-term, in case this will
+ * be used as parsing buffer, we use a slightly larger buffer. On the other
+ * hand considering sds header is gonna add 4 bytes, we wanna keep below the
+ * allocator's 48 bytes bin. */
+#define STREAM_ID_STR_LEN 44
+
+sds createStreamIDString(streamID *id) {
+ /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */
+ sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN);
+ sdssetlen(str, 0);
+ return sdscatfmt(str,"%U-%U", id->ms,id->seq);
+}
+
/* Emit a reply in the client output buffer by formatting a Stream ID
* in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */
void addReplyStreamID(client *c, streamID *id) {
- sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
- addReplyBulkSds(c,replyid);
+ addReplyBulkSds(c,createStreamIDString(id));
}
void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
- sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
- setDeferredReplyBulkSds(c, dr, replyid);
+ setDeferredReplyBulkSds(c, dr, createStreamIDString(id));
}
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
robj *createObjectFromStreamID(streamID *id) {
- return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
- id->ms,id->seq));
+ return createObject(OBJ_STRING, createStreamIDString(id));
}
/* Returns non-zero if the ID is 0-0. */
@@ -2025,7 +2036,8 @@ void xaddCommand(client *c) {
addReplyError(c,"Elements are too large to be stored");
return;
}
- addReplyStreamID(c,&id);
+ sds replyid = createStreamIDString(&id);
+ addReplyBulkCBuffer(c, replyid, sdslen(replyid));
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
@@ -2050,9 +2062,11 @@ void xaddCommand(client *c) {
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
if (!parsed_args.id_given || !parsed_args.seq_given) {
- robj *idarg = createObjectFromStreamID(&id);
+ robj *idarg = createObject(OBJ_STRING, replyid);
rewriteClientCommandArgument(c, idpos, idarg);
decrRefCount(idarg);
+ } else {
+ sdsfree(replyid);
}
/* We need to signal to blocked clients that there is new data on this
diff --git a/src/t_string.c b/src/t_string.c
index 2b43a5700..7b67b78ce 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -38,7 +38,7 @@ int getGenericCommand(client *c);
*----------------------------------------------------------------------------*/
static int checkStringLength(client *c, long long size) {
- if (!(c->flags & CLIENT_MASTER) && size > server.proto_max_bulk_len) {
+ if (!mustObeyClient(c) && size > server.proto_max_bulk_len) {
addReplyError(c,"string exceeds maximum allowed size (proto-max-bulk-len)");
return C_ERR;
}
@@ -792,7 +792,7 @@ void lcsCommand(client *c) {
/* Setup an uint32_t array to store at LCS[i,j] the length of the
* LCS A0..i-1, B0..j-1. Note that we have a linear array here, so
- * we index it as LCS[j+(blen+1)*j] */
+ * we index it as LCS[j+(blen+1)*i] */
#define LCS(A,B) lcs[(B)+((A)*(blen+1))]
/* Try to allocate the LCS table, and abort on overflow or insufficient memory. */
diff --git a/src/t_zset.c b/src/t_zset.c
index 77ca7c83a..7796a6dec 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1029,17 +1029,25 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, doub
unsigned char *sptr;
char scorebuf[MAX_D2STRING_CHARS];
int scorelen;
-
- scorelen = d2string(scorebuf,sizeof(scorebuf),score);
+ long long lscore;
+ int score_is_long = double2ll(score, &lscore);
+ if (!score_is_long)
+ scorelen = d2string(scorebuf,sizeof(scorebuf),score);
if (eptr == NULL) {
zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele));
- zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen);
+ if (score_is_long)
+ zl = lpAppendInteger(zl,lscore);
+ else
+ zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr);
/* Insert score after the member. */
- zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL);
+ if (score_is_long)
+ zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL);
+ else
+ zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL);
}
return zl;
}
@@ -3964,7 +3972,7 @@ void zpopminCommand(client *c) {
zpopMinMaxCommand(c, ZSET_MIN);
}
-/* ZMAXPOP key [<count>] */
+/* ZPOPMAX key [<count>] */
void zpopmaxCommand(client *c) {
zpopMinMaxCommand(c, ZSET_MAX);
}
@@ -4351,12 +4359,12 @@ void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
}
}
-/* ZMPOP numkeys [<key> ...] MIN|MAX [COUNT count] */
+/* ZMPOP numkeys key [<key> ...] MIN|MAX [COUNT count] */
void zmpopCommand(client *c) {
zmpopGenericCommand(c, 1, 0);
}
-/* BZMPOP timeout numkeys [<key> ...] MIN|MAX [COUNT count] */
+/* BZMPOP timeout numkeys key [<key> ...] MIN|MAX [COUNT count] */
void bzmpopCommand(client *c) {
zmpopGenericCommand(c, 2, 1);
}
diff --git a/src/timeout.c b/src/timeout.c
index d4c4690e5..36cea7c26 100644
--- a/src/timeout.c
+++ b/src/timeout.c
@@ -58,7 +58,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
if (server.maxidletime &&
/* This handles the idle clients connection timeout if set. */
!(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
- !(c->flags & CLIENT_MASTER) && /* No timeout for masters */
+ !mustObeyClient(c) && /* No timeout for masters and AOF */
!(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
(now - c->lastinteraction > server.maxidletime))
diff --git a/src/util.c b/src/util.c
index 45591d9f2..85b631f19 100644
--- a/src/util.c
+++ b/src/util.c
@@ -552,6 +552,36 @@ int string2d(const char *s, size_t slen, double *dp) {
return 1;
}
+/* Returns 1 if the double value can safely be represented in long long without
+ * precision loss, in which case the corresponding long long is stored in the out variable. */
+int double2ll(double d, long long *out) {
+#if (DBL_MANT_DIG >= 52) && (DBL_MANT_DIG <= 63) && (LLONG_MAX == 0x7fffffffffffffffLL)
+ /* Check if the float is in a safe range to be casted into a
+ * long long. We are assuming that long long is 64 bit here.
+ * Also we are assuming that there are no implementations around where
+ * double has precision < 52 bit.
+ *
+ * Under this assumptions we test if a double is inside a range
+ * where casting to long long is safe. Then using two castings we
+ * make sure the decimal part is zero. If all this is true we can use
+ * integer without precision loss.
+ *
+ * Note that numbers above 2^52 and below 2^63 use all the fraction bits as real part,
+ * and the exponent bits are positive, which means the "decimal" part must be 0.
+ * i.e. all double values in that range are representable as a long without precision loss,
+ * but not all long values in that range can be represented as a double.
+ * we only care about the first part here. */
+ if (d < (double)(-LLONG_MAX/2) || d > (double)(LLONG_MAX/2))
+ return 0;
+ long long ll = d;
+ if (ll == d) {
+ *out = ll;
+ return 1;
+ }
+#endif
+ return 0;
+}
+
/* Convert a double to a string representation. Returns the number of bytes
* required. The representation should always be parsable by strtod(3).
* This function does not support human-friendly formatting like ld2string
@@ -572,22 +602,11 @@ int d2string(char *buf, size_t len, double value) {
else
len = snprintf(buf,len,"0");
} else {
-#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
- /* Check if the float is in a safe range to be casted into a
- * long long. We are assuming that long long is 64 bit here.
- * Also we are assuming that there are no implementations around where
- * double has precision < 52 bit.
- *
- * Under this assumptions we test if a double is inside an interval
- * where casting to long long is safe. Then using two castings we
- * make sure the decimal part is zero. If all this is true we use
- * integer printing function that is much faster. */
- double min = -4503599627370495; /* (2^52)-1 */
- double max = 4503599627370496; /* -(2^52) */
- if (value > min && value < max && value == ((double)((long long)value)))
- len = ll2string(buf,len,(long long)value);
+ long long lvalue;
+ /* Integer printing function is much faster, check if we can safely use it. */
+ if (double2ll(value, &lvalue))
+ len = ll2string(buf,len,lvalue);
else
-#endif
len = snprintf(buf,len,"%.17g",value);
}
diff --git a/src/util.h b/src/util.h
index 5ea71fecd..0515f1a83 100644
--- a/src/util.h
+++ b/src/util.h
@@ -75,6 +75,7 @@ int string2d(const char *s, size_t slen, double *dp);
int trimDoubleString(char *buf, size_t len);
int d2string(char *buf, size_t len, double value);
int ld2string(char *buf, size_t len, long double value, ld2string_mode mode);
+int double2ll(double d, long long *out);
int yesnotoi(char *s);
sds getAbsolutePath(char *filename);
long getTimeZone(void);
diff --git a/src/version.h b/src/version.h
index b9dc9258e..4e5e62b08 100644
--- a/src/version.h
+++ b/src/version.h
@@ -1,2 +1,2 @@
-#define REDIS_VERSION "6.9.242"
-#define REDIS_VERSION_NUM 0x000609f2
+#define REDIS_VERSION "7.0.0"
+#define REDIS_VERSION_NUM 0x00070000