diff options
Diffstat (limited to 'src')
54 files changed, 1593 insertions, 976 deletions
diff --git a/src/anet.c b/src/anet.c index bde460fc8..3ded135b0 100644 --- a/src/anet.c +++ b/src/anet.c @@ -49,6 +49,8 @@ #include "anet.h" #include "config.h" +#define UNUSED(x) (void)(x) + static void anetSetError(char *err, const char *fmt, ...) { va_list ap; @@ -680,3 +682,18 @@ error: close(fds[1]); return -1; } + +int anetSetSockMarkId(char *err, int fd, uint32_t id) { +#ifdef HAVE_SOCKOPTMARKID + if (setsockopt(fd, SOL_SOCKET, SOCKOPTMARKID, (void *)&id, sizeof(id)) == -1) { + anetSetError(err, "setsockopt: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +#else + UNUSED(fd); + UNUSED(id); + anetSetError(err,"anetSetSockMarkid unsupported on this platform"); + return ANET_OK; +#endif +} diff --git a/src/anet.h b/src/anet.h index 96238aaf4..ff86e2029 100644 --- a/src/anet.h +++ b/src/anet.h @@ -73,5 +73,6 @@ int anetKeepAlive(char *err, int fd, int interval); int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port); int anetFormatFdAddr(int fd, char *buf, size_t buf_len, int fd_to_str_type); int anetPipe(int fds[2], int read_flags, int write_flags); +int anetSetSockMarkId(char *err, int fd, uint32_t id); #endif @@ -87,7 +87,7 @@ void aofManifestFreeAndUpdate(aofManifest *am); #define RDB_FORMAT_SUFFIX ".rdb" #define AOF_FORMAT_SUFFIX ".aof" #define MANIFEST_NAME_SUFFIX ".manifest" -#define MANIFEST_TEMP_NAME_PREFIX "temp_" +#define TEMP_FILE_NAME_PREFIX "temp-" /* AOF manifest key. */ #define AOF_MANIFEST_KEY_FILE_NAME "file" @@ -169,7 +169,7 @@ sds getAofManifestFileName() { } sds getTempAofManifestFileName() { - return sdscatprintf(sdsempty(), "%s%s%s", MANIFEST_TEMP_NAME_PREFIX, + return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename, MANIFEST_NAME_SUFFIX); } @@ -462,6 +462,12 @@ sds getNewIncrAofName(aofManifest *am) { return ai->file_name; } +/* Get temp INCR type AOF name. */ +sds getTempIncrAofName() { + return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename, + INCR_FILE_SUFFIX); +} + /* Get the last INCR AOF name or create a new one. */ sds getLastIncrAofName(aofManifest *am) { serverAssert(am != NULL); @@ -674,6 +680,17 @@ int aofDelHistoryFiles(void) { return persistAofManifest(server.aof_manifest); } +/* Used to clean up temp INCR AOF when AOFRW fails. */ +void aofDelTempIncrAofFile() { + sds aof_filename = getTempIncrAofName(); + sds aof_filepath = makePath(server.aof_dirname, aof_filename); + serverLog(LL_NOTICE, "Removing the temp incr aof file %s in the background", aof_filename); + bg_unlink(aof_filepath); + sdsfree(aof_filepath); + sdsfree(aof_filename); + return; +} + /* Called after `loadDataFromDisk` when redis start. If `server.aof_state` is * 'AOF_ON', It will do three things: * 1. Force create a BASE file when redis starts with an empty dataset @@ -739,44 +756,52 @@ int aofFileExist(char *filename) { } /* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state` - * is 'AOF_ON' or 'AOF_WAIT_REWRITE', It will do two things: + * is 'AOF_ON', It will do two things: * 1. Open a new INCR type AOF for writing * 2. Synchronously update the manifest file to the disk * * The above two steps of modification are atomic, that is, if * any step fails, the entire operation will rollback and returns * C_ERR, and if all succeeds, it returns C_OK. + * + * If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF + * file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be + * renamed in the `backgroundRewriteDoneHandler` and written to the manifest file. * */ int openNewIncrAofForAppend(void) { serverAssert(server.aof_manifest != NULL); - int newfd; + int newfd = -1; + aofManifest *temp_am = NULL; + sds new_aof_name = NULL; /* Only open new INCR AOF when AOF enabled. */ if (server.aof_state == AOF_OFF) return C_OK; - /* Dup a temp aof_manifest to modify. */ - aofManifest *temp_am = aofManifestDup(server.aof_manifest); - /* Open new AOF. */ - sds new_aof_name = getNewIncrAofName(temp_am); + if (server.aof_state == AOF_WAIT_REWRITE) { + /* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */ + new_aof_name = getTempIncrAofName(); + } else { + /* Dup a temp aof_manifest to modify. */ + temp_am = aofManifestDup(server.aof_manifest); + new_aof_name = sdsdup(getNewIncrAofName(temp_am)); + } sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name); newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644); sdsfree(new_aof_filepath); if (newfd == -1) { serverLog(LL_WARNING, "Can't open the append-only file %s: %s", new_aof_name, strerror(errno)); - - aofManifestFree(temp_am); - return C_ERR; + goto cleanup; } - /* Persist AOF Manifest. */ - int ret = persistAofManifest(temp_am); - if (ret == C_ERR) { - close(newfd); - aofManifestFree(temp_am); - return C_ERR; + if (temp_am) { + /* Persist AOF Manifest. */ + if (persistAofManifest(temp_am) == C_ERR) { + goto cleanup; + } } + sdsfree(new_aof_name); /* If reaches here, we can safely modify the `server.aof_manifest` * and `server.aof_fd`. */ @@ -788,8 +813,14 @@ int openNewIncrAofForAppend(void) { /* Reset the aof_last_incr_size. */ server.aof_last_incr_size = 0; /* Update `server.aof_manifest`. */ - aofManifestFreeAndUpdate(temp_am); + if (temp_am) aofManifestFreeAndUpdate(temp_am); return C_OK; + +cleanup: + if (new_aof_name) sdsfree(new_aof_name); + if (newfd != -1) close(newfd); + if (temp_am) aofManifestFree(temp_am); + return C_ERR; } /* Whether to limit the execution of Background AOF rewrite. @@ -815,38 +846,35 @@ int openNewIncrAofForAppend(void) { #define AOF_REWRITE_LIMITE_THRESHOLD 3 #define AOF_REWRITE_LIMITE_MAX_MINUTES 60 /* 1 hour */ int aofRewriteLimited(void) { - int limit = 0; - static int limit_delay_minutes = 0; + static int next_delay_minutes = 0; static time_t next_rewrite_time = 0; - unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list); - if (incr_aof_num >= AOF_REWRITE_LIMITE_THRESHOLD) { + if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) { + /* We may be recovering from limited state, so reset all states. */ + next_delay_minutes = 0; + next_rewrite_time = 0; + return 0; + } + + /* if it is in the limiting state, then check if the next_rewrite_time is reached */ + if (next_rewrite_time != 0) { if (server.unixtime < next_rewrite_time) { - limit = 1; + return 1; } else { - if (limit_delay_minutes == 0) { - limit = 1; - limit_delay_minutes = 1; - } else { - limit_delay_minutes *= 2; - } - - if (limit_delay_minutes > AOF_REWRITE_LIMITE_MAX_MINUTES) { - limit_delay_minutes = AOF_REWRITE_LIMITE_MAX_MINUTES; - } - - next_rewrite_time = server.unixtime + limit_delay_minutes * 60; - - serverLog(LL_WARNING, - "Background AOF rewrite has repeatedly failed %ld times and triggered the limit, will retry in %d minutes", - incr_aof_num, limit_delay_minutes); + next_rewrite_time = 0; + return 0; } - } else { - limit_delay_minutes = 0; - next_rewrite_time = 0; } - return limit; + next_delay_minutes = (next_delay_minutes == 0) ? 1 : (next_delay_minutes * 2); + if (next_delay_minutes > AOF_REWRITE_LIMITE_MAX_MINUTES) { + next_delay_minutes = AOF_REWRITE_LIMITE_MAX_MINUTES; + } + + next_rewrite_time = server.unixtime + next_delay_minutes * 60; + serverLog(LL_WARNING, + "Background AOF rewrite has repeatedly failed and triggered the limit, will retry in %d minutes", next_delay_minutes); + return 1; } /* ---------------------------------------------------------------------------- @@ -1265,7 +1293,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ if (server.aof_state == AOF_ON || - server.child_type == CHILD_TYPE_AOF) + (server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF)) { server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf)); } @@ -1558,7 +1586,7 @@ int loadAppendOnlyFiles(aofManifest *am) { serverAssert(am != NULL); int status, ret = C_OK; long long start; - off_t total_size = 0; + off_t total_size = 0, base_size = 0; sds aof_name; int total_num, aof_num = 0, last_file; @@ -1607,6 +1635,7 @@ int loadAppendOnlyFiles(aofManifest *am) { serverAssert(am->base_aof_info->file_type == AOF_FILE_TYPE_BASE); aof_name = (char*)am->base_aof_info->file_name; updateLoadingFileName(aof_name); + base_size = getAppendOnlyFileSize(aof_name, NULL); last_file = ++aof_num == total_num; start = ustime(); ret = loadSingleAppendOnlyFile(aof_name); @@ -1659,7 +1688,16 @@ int loadAppendOnlyFiles(aofManifest *am) { } server.aof_current_size = total_size; - server.aof_rewrite_base_size = server.aof_current_size; + /* Ideally, the aof_rewrite_base_size variable should hold the size of the + * AOF when the last rewrite ended, this should include the size of the + * incremental file that was created during the rewrite since otherwise we + * risk the next automatic rewrite to happen too soon (or immediately if + * auto-aof-rewrite-percentage is low). However, since we do not persist + * aof_rewrite_base_size information anywhere, we initialize it on restart + * to the size of BASE AOF file. This might cause the first AOFRW to be + * executed early, but that shouldn't be a problem since everything will be + * fine after the first AOFRW. */ + server.aof_rewrite_base_size = base_size; server.aof_fsync_offset = server.aof_current_size; cleanup: @@ -2393,6 +2431,9 @@ void bgrewriteaofCommand(client *c) { addReplyError(c,"Background append only file rewriting already in progress"); } else if (hasActiveChildProcess() || server.in_exec) { server.aof_rewrite_scheduled = 1; + /* When manually triggering AOFRW we reset the count + * so that it can be executed immediately. */ + server.stat_aofrw_consecutive_failures = 0; addReplyStatus(c,"Background append only file rewriting scheduled"); } else if (rewriteAppendOnlyFileBackground() == C_OK) { addReplyStatus(c,"Background append only file rewriting started"); @@ -2476,7 +2517,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { char tmpfile[256]; long long now = ustime(); - sds new_base_filename; + sds new_base_filepath = NULL; + sds new_incr_filepath = NULL; aofManifest *temp_am; mstime_t latency; @@ -2493,9 +2535,9 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* Get a new BASE file name and mark the previous (if we have) * as the HISTORY type. */ - new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am); + sds new_base_filename = getNewBaseFileNameAndMarkPreAsHistory(temp_am); serverAssert(new_base_filename != NULL); - sds new_base_filepath = makePath(server.aof_dirname, new_base_filename); + new_base_filepath = makePath(server.aof_dirname, new_base_filename); /* Rename the temporary aof file to 'new_base_filename'. */ latencyStartMonitor(latency); @@ -2503,7 +2545,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { serverLog(LL_WARNING, "Error trying to rename the temporary AOF file %s into %s: %s", tmpfile, - new_base_filename, + new_base_filepath, strerror(errno)); aofManifestFree(temp_am); sdsfree(new_base_filepath); @@ -2512,6 +2554,34 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-rename", latency); + /* Rename the temporary incr aof file to 'new_incr_filename'. */ + if (server.aof_state == AOF_WAIT_REWRITE) { + /* Get temporary incr aof name. */ + sds temp_incr_aof_name = getTempIncrAofName(); + sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name); + sdsfree(temp_incr_aof_name); + /* Get next new incr aof name. */ + sds new_incr_filename = getNewIncrAofName(temp_am); + new_incr_filepath = makePath(server.aof_dirname, new_incr_filename); + latencyStartMonitor(latency); + if (rename(temp_incr_filepath, new_incr_filepath) == -1) { + serverLog(LL_WARNING, + "Error trying to rename the temporary incr AOF file %s into %s: %s", + temp_incr_filepath, + new_incr_filepath, + strerror(errno)); + bg_unlink(new_base_filepath); + sdsfree(new_base_filepath); + aofManifestFree(temp_am); + sdsfree(temp_incr_filepath); + sdsfree(new_incr_filepath); + goto cleanup; + } + latencyEndMonitor(latency); + latencyAddSampleIfNeeded("aof-rename", latency); + sdsfree(temp_incr_filepath); + } + /* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR * to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */ markRewrittenIncrAofAsHistory(temp_am); @@ -2521,9 +2591,14 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { bg_unlink(new_base_filepath); aofManifestFree(temp_am); sdsfree(new_base_filepath); + if (new_incr_filepath) { + bg_unlink(new_incr_filepath); + sdsfree(new_incr_filepath); + } goto cleanup; } sdsfree(new_base_filepath); + if (new_incr_filepath) sdsfree(new_incr_filepath); /* We can safely let `server.aof_manifest` point to 'temp_am' and free the previous one. */ aofManifestFreeAndUpdate(temp_am); @@ -2542,6 +2617,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { aofDelHistoryFiles(); server.aof_lastbgrewrite_status = C_OK; + server.stat_aofrw_consecutive_failures = 0; serverLog(LL_NOTICE, "Background AOF rewrite finished successfully"); /* Change state from WAIT_REWRITE to ON if needed */ @@ -2552,14 +2628,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { server.aof_lastbgrewrite_status = C_ERR; + server.stat_aofrw_consecutive_failures++; serverLog(LL_WARNING, "Background AOF rewrite terminated with error"); } else { /* SIGUSR1 is whitelisted, so we have a way to kill a child without * triggering an error condition. */ - if (bysignal != SIGUSR1) + if (bysignal != SIGUSR1) { server.aof_lastbgrewrite_status = C_ERR; + server.stat_aofrw_consecutive_failures++; + } serverLog(LL_WARNING, "Background AOF rewrite terminated by signal %d", bysignal); @@ -2567,6 +2646,12 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { cleanup: aofRemoveTempFile(server.child_pid); + /* Clear AOF buffer and delete temp incr aof for next rewrite. */ + if (server.aof_state == AOF_WAIT_REWRITE) { + sdsfree(server.aof_buf); + server.aof_buf = sdsempty(); + aofDelTempIncrAofFile(); + } server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start; server.aof_rewrite_time_start = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ diff --git a/src/bitops.c b/src/bitops.c index 8a6dee44d..e35001937 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -430,7 +430,7 @@ int getBitOffsetFromArgument(client *c, robj *o, uint64_t *offset, int hash, int if (usehash) loffset *= bits; /* Limit offset to server.proto_max_bulk_len (512MB in bytes by default) */ - if (loffset < 0 || (!(c->flags & CLIENT_MASTER) && (loffset >> 3) >= server.proto_max_bulk_len)) + if (loffset < 0 || (!mustObeyClient(c) && (loffset >> 3) >= server.proto_max_bulk_len)) { addReplyError(c,err); return C_ERR; @@ -1002,7 +1002,7 @@ void bitposCommand(client *c) { } } -/* BITFIELD key subcommmand-1 arg ... subcommand-2 arg ... subcommand-N ... +/* BITFIELD key subcommand-1 arg ... subcommand-2 arg ... subcommand-N ... * * Supported subcommands: * diff --git a/src/cli_common.c b/src/cli_common.c index 33069017b..7b4775cde 100644 --- a/src/cli_common.c +++ b/src/cli_common.c @@ -390,7 +390,7 @@ sds escapeJsonString(sds s, const char *p, size_t len) { case '\t': s = sdscatlen(s,"\\t",2); break; case '\b': s = sdscatlen(s,"\\b",2); break; default: - s = sdscatprintf(s,(*p >= 0 && *p <= 0x1f) ? "\\u%04x" : "%c",*p); + s = sdscatprintf(s,*(unsigned char *)p <= 0x1f ? "\\u%04x" : "%c",*p); } p++; } diff --git a/src/cluster.c b/src/cluster.c index 701871b36..adad07e19 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -960,7 +960,6 @@ clusterNode *createClusterNode(char *nodename, int flags) { memset(node->slots,0,sizeof(node->slots)); node->slot_info_pairs = NULL; node->slot_info_pairs_count = 0; - node->slot_info_pairs_alloc = 0; node->numslots = 0; node->numslaves = 0; node->slaves = NULL; @@ -2507,11 +2506,7 @@ int clusterProcessPacket(clusterLink *link) { message = createStringObject( (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); - if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) { - pubsubPublishMessageShard(channel, message); - } else { - pubsubPublishMessage(channel,message); - } + pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); decrRefCount(channel); decrRefCount(message); } @@ -3200,20 +3195,19 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin /* ----------------------------------------------------------------------------- * CLUSTER Pub/Sub support * - * For now we do very little, just propagating PUBLISH messages across the whole + * If `sharded` is 0: + * For now we do very little, just propagating [S]PUBLISH messages across the whole * cluster. In the future we'll try to get smarter and avoiding propagating those * messages to hosts without receives for a given channel. - * -------------------------------------------------------------------------- */ -void clusterPropagatePublish(robj *channel, robj *message) { - clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH); -} - -/* ----------------------------------------------------------------------------- - * CLUSTER Pub/Sub shard support - * + * Otherwise: * Publish this message across the slot (primary/replica). * -------------------------------------------------------------------------- */ -void clusterPropagatePublishShard(robj *channel, robj *message) { +void clusterPropagatePublish(robj *channel, robj *message, int sharded) { + if (!sharded) { + clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH); + return; + } + list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself); if (listLength(nodes_for_slot) != 0) { listIter li; @@ -4726,13 +4720,10 @@ void clusterGenNodesSlotsInfo(int filter) { * or end of slot. */ if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) { if (!(n->flags & filter)) { - if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) { - if (n->slot_info_pairs_alloc == 0) - n->slot_info_pairs_alloc = 8; - else - n->slot_info_pairs_alloc *= 2; - n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t)); + if (!n->slot_info_pairs) { + n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t)); } + serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots)); n->slot_info_pairs[n->slot_info_pairs_count++] = start; n->slot_info_pairs[n->slot_info_pairs_count++] = i-1; } @@ -4747,7 +4738,6 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) { zfree(n->slot_info_pairs); n->slot_info_pairs = NULL; n->slot_info_pairs_count = 0; - n->slot_info_pairs_alloc = 0; } /* Generate a csv-alike representation of the nodes we are aware of, diff --git a/src/cluster.h b/src/cluster.h index 27e9e7770..1349a7a92 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -120,7 +120,6 @@ typedef struct clusterNode { unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */ int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */ - int slot_info_pairs_alloc; /* Allocated number of slots in slot_info_pairs */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ struct clusterNode **slaves; /* pointers to slave nodes */ @@ -385,8 +384,7 @@ void migrateCloseTimedoutSockets(void); int verifyClusterConfigWithData(void); unsigned long getClusterConnectionsCount(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len); -void clusterPropagatePublish(robj *channel, robj *message); -void clusterPropagatePublishShard(robj *channel, robj *message); +void clusterPropagatePublish(robj *channel, robj *message, int sharded); unsigned int keyHashSlot(char *key, int keylen); void slotToKeyAddEntry(dictEntry *entry, redisDb *db); void slotToKeyDelEntry(dictEntry *entry, redisDb *db); diff --git a/src/commands.c b/src/commands.c index efc159ad8..dfec3cdea 100644 --- a/src/commands.c +++ b/src/commands.c @@ -47,44 +47,62 @@ struct redisCommandArg BITCOUNT_Args[] = { /* BITFIELD tips */ #define BITFIELD_tips NULL -/* BITFIELD encoding_offset argument table */ -struct redisCommandArg BITFIELD_encoding_offset_Subargs[] = { +/* BITFIELD operation encoding_offset argument table */ +struct redisCommandArg BITFIELD_operation_encoding_offset_Subargs[] = { {"encoding",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"offset",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {0} }; -/* BITFIELD encoding_offset_value argument table */ -struct redisCommandArg BITFIELD_encoding_offset_value_Subargs[] = { +/* BITFIELD operation write wrap_sat_fail argument table */ +struct redisCommandArg BITFIELD_operation_write_wrap_sat_fail_Subargs[] = { +{"wrap",ARG_TYPE_PURE_TOKEN,-1,"WRAP",NULL,NULL,CMD_ARG_NONE}, +{"sat",ARG_TYPE_PURE_TOKEN,-1,"SAT",NULL,NULL,CMD_ARG_NONE}, +{"fail",ARG_TYPE_PURE_TOKEN,-1,"FAIL",NULL,NULL,CMD_ARG_NONE}, +{0} +}; + +/* BITFIELD operation write write_operation encoding_offset_value argument table */ +struct redisCommandArg BITFIELD_operation_write_write_operation_encoding_offset_value_Subargs[] = { {"encoding",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"offset",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"value",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {0} }; -/* BITFIELD encoding_offset_increment argument table */ -struct redisCommandArg BITFIELD_encoding_offset_increment_Subargs[] = { +/* BITFIELD operation write write_operation encoding_offset_increment argument table */ +struct redisCommandArg BITFIELD_operation_write_write_operation_encoding_offset_increment_Subargs[] = { {"encoding",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"offset",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"increment",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {0} }; -/* BITFIELD wrap_sat_fail argument table */ -struct redisCommandArg BITFIELD_wrap_sat_fail_Subargs[] = { -{"wrap",ARG_TYPE_PURE_TOKEN,-1,"WRAP",NULL,NULL,CMD_ARG_NONE}, -{"sat",ARG_TYPE_PURE_TOKEN,-1,"SAT",NULL,NULL,CMD_ARG_NONE}, -{"fail",ARG_TYPE_PURE_TOKEN,-1,"FAIL",NULL,NULL,CMD_ARG_NONE}, +/* BITFIELD operation write write_operation argument table */ +struct redisCommandArg BITFIELD_operation_write_write_operation_Subargs[] = { +{"encoding_offset_value",ARG_TYPE_BLOCK,-1,"SET",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_write_operation_encoding_offset_value_Subargs}, +{"encoding_offset_increment",ARG_TYPE_BLOCK,-1,"INCRBY",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_write_operation_encoding_offset_increment_Subargs}, +{0} +}; + +/* BITFIELD operation write argument table */ +struct redisCommandArg BITFIELD_operation_write_Subargs[] = { +{"wrap_sat_fail",ARG_TYPE_ONEOF,-1,"OVERFLOW",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_operation_write_wrap_sat_fail_Subargs}, +{"write_operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_write_operation_Subargs}, +{0} +}; + +/* BITFIELD operation argument table */ +struct redisCommandArg BITFIELD_operation_Subargs[] = { +{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_encoding_offset_Subargs}, +{"write",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_operation_write_Subargs}, {0} }; /* BITFIELD argument table */ struct redisCommandArg BITFIELD_Args[] = { {"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, -{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_encoding_offset_Subargs}, -{"encoding_offset_value",ARG_TYPE_BLOCK,-1,"SET",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_encoding_offset_value_Subargs}, -{"encoding_offset_increment",ARG_TYPE_BLOCK,-1,"INCRBY",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_encoding_offset_increment_Subargs}, -{"wrap_sat_fail",ARG_TYPE_ONEOF,-1,"OVERFLOW",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=BITFIELD_wrap_sat_fail_Subargs}, +{"operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,.subargs=BITFIELD_operation_Subargs}, {0} }; @@ -106,7 +124,7 @@ struct redisCommandArg BITFIELD_RO_encoding_offset_Subargs[] = { /* BITFIELD_RO argument table */ struct redisCommandArg BITFIELD_RO_Args[] = { {"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, -{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_NONE,.subargs=BITFIELD_RO_encoding_offset_Subargs}, +{"encoding_offset",ARG_TYPE_BLOCK,-1,"GET",NULL,NULL,CMD_ARG_MULTIPLE,.subargs=BITFIELD_RO_encoding_offset_Subargs}, {0} }; @@ -1313,13 +1331,20 @@ struct redisCommandArg MIGRATE_key_or_empty_string_Subargs[] = { {0} }; -/* MIGRATE username_password argument table */ -struct redisCommandArg MIGRATE_username_password_Subargs[] = { +/* MIGRATE authentication username_password argument table */ +struct redisCommandArg MIGRATE_authentication_username_password_Subargs[] = { {"username",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"password",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {0} }; +/* MIGRATE authentication argument table */ +struct redisCommandArg MIGRATE_authentication_Subargs[] = { +{"password",ARG_TYPE_STRING,-1,"AUTH",NULL,"4.0.7",CMD_ARG_OPTIONAL}, +{"username_password",ARG_TYPE_BLOCK,-1,"AUTH2",NULL,"6.0.0",CMD_ARG_OPTIONAL,.subargs=MIGRATE_authentication_username_password_Subargs}, +{0} +}; + /* MIGRATE argument table */ struct redisCommandArg MIGRATE_Args[] = { {"host",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, @@ -1329,8 +1354,7 @@ struct redisCommandArg MIGRATE_Args[] = { {"timeout",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"copy",ARG_TYPE_PURE_TOKEN,-1,"COPY",NULL,"3.0.0",CMD_ARG_OPTIONAL}, {"replace",ARG_TYPE_PURE_TOKEN,-1,"REPLACE",NULL,"3.0.0",CMD_ARG_OPTIONAL}, -{"password",ARG_TYPE_STRING,-1,"AUTH",NULL,"4.0.7",CMD_ARG_OPTIONAL}, -{"username_password",ARG_TYPE_BLOCK,-1,"AUTH2",NULL,"6.0.0",CMD_ARG_OPTIONAL,.subargs=MIGRATE_username_password_Subargs}, +{"authentication",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=MIGRATE_authentication_Subargs}, {"key",ARG_TYPE_KEY,1,"KEYS",NULL,"3.0.6",CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE}, {0} }; @@ -2089,15 +2113,22 @@ struct redisCommandArg GEORADIUS_RO_Args[] = { /* GEOSEARCH tips */ #define GEOSEARCH_tips NULL -/* GEOSEARCH longitude_latitude argument table */ -struct redisCommandArg GEOSEARCH_longitude_latitude_Subargs[] = { +/* GEOSEARCH from longitude_latitude argument table */ +struct redisCommandArg GEOSEARCH_from_longitude_latitude_Subargs[] = { {"longitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"latitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {0} }; -/* GEOSEARCH circle unit argument table */ -struct redisCommandArg GEOSEARCH_circle_unit_Subargs[] = { +/* GEOSEARCH from argument table */ +struct redisCommandArg GEOSEARCH_from_Subargs[] = { +{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_NONE}, +{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_from_longitude_latitude_Subargs}, +{0} +}; + +/* GEOSEARCH by circle unit argument table */ +struct redisCommandArg GEOSEARCH_by_circle_unit_Subargs[] = { {"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE}, {"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE}, {"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE}, @@ -2105,15 +2136,15 @@ struct redisCommandArg GEOSEARCH_circle_unit_Subargs[] = { {0} }; -/* GEOSEARCH circle argument table */ -struct redisCommandArg GEOSEARCH_circle_Subargs[] = { +/* GEOSEARCH by circle argument table */ +struct redisCommandArg GEOSEARCH_by_circle_Subargs[] = { {"radius",ARG_TYPE_DOUBLE,-1,"BYRADIUS",NULL,NULL,CMD_ARG_NONE}, -{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_circle_unit_Subargs}, +{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_circle_unit_Subargs}, {0} }; -/* GEOSEARCH box unit argument table */ -struct redisCommandArg GEOSEARCH_box_unit_Subargs[] = { +/* GEOSEARCH by box unit argument table */ +struct redisCommandArg GEOSEARCH_by_box_unit_Subargs[] = { {"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE}, {"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE}, {"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE}, @@ -2121,11 +2152,18 @@ struct redisCommandArg GEOSEARCH_box_unit_Subargs[] = { {0} }; -/* GEOSEARCH box argument table */ -struct redisCommandArg GEOSEARCH_box_Subargs[] = { +/* GEOSEARCH by box argument table */ +struct redisCommandArg GEOSEARCH_by_box_Subargs[] = { {"width",ARG_TYPE_DOUBLE,-1,"BYBOX",NULL,NULL,CMD_ARG_NONE}, {"height",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE}, -{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_box_unit_Subargs}, +{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_box_unit_Subargs}, +{0} +}; + +/* GEOSEARCH by argument table */ +struct redisCommandArg GEOSEARCH_by_Subargs[] = { +{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_circle_Subargs}, +{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_box_Subargs}, {0} }; @@ -2146,10 +2184,8 @@ struct redisCommandArg GEOSEARCH_count_Subargs[] = { /* GEOSEARCH argument table */ struct redisCommandArg GEOSEARCH_Args[] = { {"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, -{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_OPTIONAL}, -{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_longitude_latitude_Subargs}, -{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_circle_Subargs}, -{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_box_Subargs}, +{"from",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_from_Subargs}, +{"by",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCH_by_Subargs}, {"order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_order_Subargs}, {"count",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCH_count_Subargs}, {"withcoord",ARG_TYPE_PURE_TOKEN,-1,"WITHCOORD",NULL,NULL,CMD_ARG_OPTIONAL}, @@ -2166,15 +2202,22 @@ struct redisCommandArg GEOSEARCH_Args[] = { /* GEOSEARCHSTORE tips */ #define GEOSEARCHSTORE_tips NULL -/* GEOSEARCHSTORE longitude_latitude argument table */ -struct redisCommandArg GEOSEARCHSTORE_longitude_latitude_Subargs[] = { +/* GEOSEARCHSTORE from longitude_latitude argument table */ +struct redisCommandArg GEOSEARCHSTORE_from_longitude_latitude_Subargs[] = { {"longitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {"latitude",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE}, {0} }; -/* GEOSEARCHSTORE circle unit argument table */ -struct redisCommandArg GEOSEARCHSTORE_circle_unit_Subargs[] = { +/* GEOSEARCHSTORE from argument table */ +struct redisCommandArg GEOSEARCHSTORE_from_Subargs[] = { +{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_NONE}, +{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_from_longitude_latitude_Subargs}, +{0} +}; + +/* GEOSEARCHSTORE by circle unit argument table */ +struct redisCommandArg GEOSEARCHSTORE_by_circle_unit_Subargs[] = { {"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE}, {"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE}, {"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE}, @@ -2182,15 +2225,15 @@ struct redisCommandArg GEOSEARCHSTORE_circle_unit_Subargs[] = { {0} }; -/* GEOSEARCHSTORE circle argument table */ -struct redisCommandArg GEOSEARCHSTORE_circle_Subargs[] = { +/* GEOSEARCHSTORE by circle argument table */ +struct redisCommandArg GEOSEARCHSTORE_by_circle_Subargs[] = { {"radius",ARG_TYPE_DOUBLE,-1,"BYRADIUS",NULL,NULL,CMD_ARG_NONE}, -{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_circle_unit_Subargs}, +{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_circle_unit_Subargs}, {0} }; -/* GEOSEARCHSTORE box unit argument table */ -struct redisCommandArg GEOSEARCHSTORE_box_unit_Subargs[] = { +/* GEOSEARCHSTORE by box unit argument table */ +struct redisCommandArg GEOSEARCHSTORE_by_box_unit_Subargs[] = { {"m",ARG_TYPE_PURE_TOKEN,-1,"M",NULL,NULL,CMD_ARG_NONE}, {"km",ARG_TYPE_PURE_TOKEN,-1,"KM",NULL,NULL,CMD_ARG_NONE}, {"ft",ARG_TYPE_PURE_TOKEN,-1,"FT",NULL,NULL,CMD_ARG_NONE}, @@ -2198,11 +2241,18 @@ struct redisCommandArg GEOSEARCHSTORE_box_unit_Subargs[] = { {0} }; -/* GEOSEARCHSTORE box argument table */ -struct redisCommandArg GEOSEARCHSTORE_box_Subargs[] = { +/* GEOSEARCHSTORE by box argument table */ +struct redisCommandArg GEOSEARCHSTORE_by_box_Subargs[] = { {"width",ARG_TYPE_DOUBLE,-1,"BYBOX",NULL,NULL,CMD_ARG_NONE}, {"height",ARG_TYPE_DOUBLE,-1,NULL,NULL,NULL,CMD_ARG_NONE}, -{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_box_unit_Subargs}, +{"unit",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_box_unit_Subargs}, +{0} +}; + +/* GEOSEARCHSTORE by argument table */ +struct redisCommandArg GEOSEARCHSTORE_by_Subargs[] = { +{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_circle_Subargs}, +{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_box_Subargs}, {0} }; @@ -2224,10 +2274,8 @@ struct redisCommandArg GEOSEARCHSTORE_count_Subargs[] = { struct redisCommandArg GEOSEARCHSTORE_Args[] = { {"destination",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, {"source",ARG_TYPE_KEY,1,NULL,NULL,NULL,CMD_ARG_NONE}, -{"member",ARG_TYPE_STRING,-1,"FROMMEMBER",NULL,NULL,CMD_ARG_OPTIONAL}, -{"longitude_latitude",ARG_TYPE_BLOCK,-1,"FROMLONLAT",NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_longitude_latitude_Subargs}, -{"circle",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_circle_Subargs}, -{"box",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_box_Subargs}, +{"from",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_from_Subargs}, +{"by",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=GEOSEARCHSTORE_by_Subargs}, {"order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_order_Subargs}, {"count",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=GEOSEARCHSTORE_count_Subargs}, {"storedist",ARG_TYPE_PURE_TOKEN,-1,"STOREDIST",NULL,NULL,CMD_ARG_OPTIONAL}, @@ -4790,7 +4838,7 @@ struct redisCommandArg MODULE_LOADEX_args_Subargs[] = { /* MODULE LOADEX argument table */ struct redisCommandArg MODULE_LOADEX_Args[] = { {"path",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, -{"configs",ARG_TYPE_BLOCK,-1,"CONFIG",NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE,.subargs=MODULE_LOADEX_configs_Subargs}, +{"configs",ARG_TYPE_BLOCK,-1,"CONFIG",NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE|CMD_ARG_MULTIPLE_TOKEN,.subargs=MODULE_LOADEX_configs_Subargs}, {"args",ARG_TYPE_BLOCK,-1,"ARGS",NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE,.subargs=MODULE_LOADEX_args_Subargs}, {0} }; @@ -6919,6 +6967,13 @@ commandHistory SET_History[] = { /* SET tips */ #define SET_tips NULL +/* SET condition argument table */ +struct redisCommandArg SET_condition_Subargs[] = { +{"nx",ARG_TYPE_PURE_TOKEN,-1,"NX",NULL,NULL,CMD_ARG_NONE}, +{"xx",ARG_TYPE_PURE_TOKEN,-1,"XX",NULL,NULL,CMD_ARG_NONE}, +{0} +}; + /* SET expiration argument table */ struct redisCommandArg SET_expiration_Subargs[] = { {"seconds",ARG_TYPE_INTEGER,-1,"EX",NULL,"2.6.12",CMD_ARG_NONE}, @@ -6929,20 +6984,13 @@ struct redisCommandArg SET_expiration_Subargs[] = { {0} }; -/* SET condition argument table */ -struct redisCommandArg SET_condition_Subargs[] = { -{"nx",ARG_TYPE_PURE_TOKEN,-1,"NX",NULL,NULL,CMD_ARG_NONE}, -{"xx",ARG_TYPE_PURE_TOKEN,-1,"XX",NULL,NULL,CMD_ARG_NONE}, -{0} -}; - /* SET argument table */ struct redisCommandArg SET_Args[] = { {"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE}, {"value",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, -{"expiration",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SET_expiration_Subargs}, {"condition",ARG_TYPE_ONEOF,-1,NULL,NULL,"2.6.12",CMD_ARG_OPTIONAL,.subargs=SET_condition_Subargs}, {"get",ARG_TYPE_PURE_TOKEN,-1,"GET",NULL,"6.2.0",CMD_ARG_OPTIONAL}, +{"expiration",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SET_expiration_Subargs}, {0} }; @@ -7268,7 +7316,7 @@ struct redisCommand redisCommandTable[] = { {"zpopmin","Remove and return members with the lowest scores in a sorted set","O(log(N)*M) with N being the number of elements in the sorted set, and M being the number of elements popped.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZPOPMIN_History,ZPOPMIN_tips,zpopminCommand,-2,CMD_WRITE|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZPOPMIN_Args}, {"zrandmember","Get one or multiple random elements from a sorted set","O(N) where N is the number of elements returned","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANDMEMBER_History,ZRANDMEMBER_tips,zrandmemberCommand,-2,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANDMEMBER_Args}, {"zrange","Return a range of members in a sorted set","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements returned.","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANGE_History,ZRANGE_tips,zrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGE_Args}, -{"zrangebylex","Return a range of members in a sorted set, by lexicographical range","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).","2.8.9",CMD_DOC_DEPRECATED,"`ZRANGE` with the `BYSCORE` argument","6.2.0",COMMAND_GROUP_SORTED_SET,ZRANGEBYLEX_History,ZRANGEBYLEX_tips,zrangebylexCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGEBYLEX_Args}, +{"zrangebylex","Return a range of members in a sorted set, by lexicographical range","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).","2.8.9",CMD_DOC_DEPRECATED,"`ZRANGE` with the `BYLEX` argument","6.2.0",COMMAND_GROUP_SORTED_SET,ZRANGEBYLEX_History,ZRANGEBYLEX_tips,zrangebylexCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGEBYLEX_Args}, {"zrangebyscore","Return a range of members in a sorted set, by score","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements being returned. If M is constant (e.g. always asking for the first 10 elements with LIMIT), you can consider it O(log(N)).","1.0.5",CMD_DOC_DEPRECATED,"`ZRANGE` with the `BYSCORE` argument","6.2.0",COMMAND_GROUP_SORTED_SET,ZRANGEBYSCORE_History,ZRANGEBYSCORE_tips,zrangebyscoreCommand,-4,CMD_READONLY,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGEBYSCORE_Args}, {"zrangestore","Store a range of members from sorted set into another key","O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements stored into the destination key.","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANGESTORE_History,ZRANGESTORE_tips,zrangestoreCommand,-5,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANGESTORE_Args}, {"zrank","Determine the index of a member in a sorted set","O(log(N))","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZRANK_History,ZRANK_tips,zrankCommand,3,CMD_READONLY|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZRANK_Args}, diff --git a/src/commands/bitfield.json b/src/commands/bitfield.json index 1f667ce05..5c6fc6105 100644 --- a/src/commands/bitfield.json +++ b/src/commands/bitfield.json @@ -44,84 +44,100 @@ "key_spec_index": 0 }, { - "token": "GET", - "name": "encoding_offset", - "type": "block", - "optional": true, - "arguments": [ - { - "name": "encoding", - "type": "string" - }, - { - "name": "offset", - "type": "integer" - } - ] - }, - { - "token": "SET", - "name": "encoding_offset_value", - "type": "block", - "optional": true, - "arguments": [ - { - "name": "encoding", - "type": "string" - }, - { - "name": "offset", - "type": "integer" - }, - { - "name": "value", - "type": "integer" - } - ] - }, - { - "token": "INCRBY", - "name": "encoding_offset_increment", - "type": "block", - "optional": true, - "arguments": [ - { - "name": "encoding", - "type": "string" - }, - { - "name": "offset", - "type": "integer" - }, - { - "name": "increment", - "type": "integer" - } - ] - }, - { - "token": "OVERFLOW", - "name": "wrap_sat_fail", + "name": "operation", "type": "oneof", - "optional": true, + "multiple": "true", "arguments": [ { - "name": "wrap", - "type": "pure-token", - "token": "WRAP" - }, - { - "name": "sat", - "type": "pure-token", - "token": "SAT" + "token": "GET", + "name": "encoding_offset", + "type": "block", + "arguments": [ + { + "name": "encoding", + "type": "string" + }, + { + "name": "offset", + "type": "integer" + } + ] }, { - "name": "fail", - "type": "pure-token", - "token": "FAIL" + "name": "write", + "type": "block", + "arguments": [ + { + "token": "OVERFLOW", + "name": "wrap_sat_fail", + "type": "oneof", + "optional": true, + "arguments": [ + { + "name": "wrap", + "type": "pure-token", + "token": "WRAP" + }, + { + "name": "sat", + "type": "pure-token", + "token": "SAT" + }, + { + "name": "fail", + "type": "pure-token", + "token": "FAIL" + } + ] + }, + { + "name": "write_operation", + "type": "oneof", + "arguments": [ + { + "token": "SET", + "name": "encoding_offset_value", + "type": "block", + "arguments": [ + { + "name": "encoding", + "type": "string" + }, + { + "name": "offset", + "type": "integer" + }, + { + "name": "value", + "type": "integer" + } + ] + }, + { + "token": "INCRBY", + "name": "encoding_offset_increment", + "type": "block", + "arguments": [ + { + "name": "encoding", + "type": "string" + }, + { + "name": "offset", + "type": "integer" + }, + { + "name": "increment", + "type": "integer" + } + ] + } + ] + } + ] } ] } ] } -} +}
\ No newline at end of file diff --git a/src/commands/bitfield_ro.json b/src/commands/bitfield_ro.json index a8ec85c29..951fde0f5 100644 --- a/src/commands/bitfield_ro.json +++ b/src/commands/bitfield_ro.json @@ -43,6 +43,7 @@ "token": "GET", "name": "encoding_offset", "type": "block", + "multiple": "true", "arguments": [ { "name": "encoding", diff --git a/src/commands/geosearch.json b/src/commands/geosearch.json index 9730d214e..a83dcaadb 100644 --- a/src/commands/geosearch.json +++ b/src/commands/geosearch.json @@ -39,102 +39,110 @@ "key_spec_index": 0 }, { - "token": "FROMMEMBER", - "name": "member", - "type": "string", - "optional": true - }, - { - "token": "FROMLONLAT", - "name": "longitude_latitude", - "type": "block", - "optional": true, - "arguments": [ - { - "name": "longitude", - "type": "double" - }, - { - "name": "latitude", - "type": "double" - } - ] - }, - { - "name": "circle", - "type": "block", - "optional": true, + "name": "from", + "type": "oneof", "arguments": [ { - "token": "BYRADIUS", - "name": "radius", - "type": "double" + "token": "FROMMEMBER", + "name": "member", + "type": "string" }, { - "name": "unit", - "type": "oneof", + "token": "FROMLONLAT", + "name": "longitude_latitude", + "type": "block", "arguments": [ { - "name": "m", - "type": "pure-token", - "token": "m" + "name": "longitude", + "type": "double" }, { - "name": "km", - "type": "pure-token", - "token": "km" - }, - { - "name": "ft", - "type": "pure-token", - "token": "ft" - }, - { - "name": "mi", - "type": "pure-token", - "token": "mi" + "name": "latitude", + "type": "double" } ] } ] }, { - "name": "box", - "type": "block", - "optional": true, + "name": "by", + "type": "oneof", "arguments": [ { - "token": "BYBOX", - "name": "width", - "type": "double" - }, - { - "name": "height", - "type": "double" - }, - { - "name": "unit", - "type": "oneof", + "name": "circle", + "type": "block", "arguments": [ { - "name": "m", - "type": "pure-token", - "token": "m" + "token": "BYRADIUS", + "name": "radius", + "type": "double" }, { - "name": "km", - "type": "pure-token", - "token": "km" + "name": "unit", + "type": "oneof", + "arguments": [ + { + "name": "m", + "type": "pure-token", + "token": "m" + }, + { + "name": "km", + "type": "pure-token", + "token": "km" + }, + { + "name": "ft", + "type": "pure-token", + "token": "ft" + }, + { + "name": "mi", + "type": "pure-token", + "token": "mi" + } + ] + } + ] + }, + { + "name": "box", + "type": "block", + "arguments": [ + { + "token": "BYBOX", + "name": "width", + "type": "double" }, { - "name": "ft", - "type": "pure-token", - "token": "ft" + "name": "height", + "type": "double" }, { - "name": "mi", - "type": "pure-token", - "token": "mi" + "name": "unit", + "type": "oneof", + "arguments": [ + { + "name": "m", + "type": "pure-token", + "token": "m" + }, + { + "name": "km", + "type": "pure-token", + "token": "km" + }, + { + "name": "ft", + "type": "pure-token", + "token": "ft" + }, + { + "name": "mi", + "type": "pure-token", + "token": "mi" + } + ] } ] } @@ -195,4 +203,4 @@ } ] } -} +}
\ No newline at end of file diff --git a/src/commands/geosearchstore.json b/src/commands/geosearchstore.json index a44ebfe86..16db5d37e 100644 --- a/src/commands/geosearchstore.json +++ b/src/commands/geosearchstore.json @@ -63,102 +63,110 @@ "key_spec_index": 1 }, { - "token": "FROMMEMBER", - "name": "member", - "type": "string", - "optional": true - }, - { - "token": "FROMLONLAT", - "name": "longitude_latitude", - "type": "block", - "optional": true, - "arguments": [ - { - "name": "longitude", - "type": "double" - }, - { - "name": "latitude", - "type": "double" - } - ] - }, - { - "name": "circle", - "type": "block", - "optional": true, + "name": "from", + "type": "oneof", "arguments": [ { - "token": "BYRADIUS", - "name": "radius", - "type": "double" + "token": "FROMMEMBER", + "name": "member", + "type": "string" }, { - "name": "unit", - "type": "oneof", + "token": "FROMLONLAT", + "name": "longitude_latitude", + "type": "block", "arguments": [ { - "name": "m", - "type": "pure-token", - "token": "m" + "name": "longitude", + "type": "double" }, { - "name": "km", - "type": "pure-token", - "token": "km" - }, - { - "name": "ft", - "type": "pure-token", - "token": "ft" - }, - { - "name": "mi", - "type": "pure-token", - "token": "mi" + "name": "latitude", + "type": "double" } ] } ] }, { - "name": "box", - "type": "block", - "optional": true, + "name": "by", + "type": "oneof", "arguments": [ { - "token": "BYBOX", - "name": "width", - "type": "double" - }, - { - "name": "height", - "type": "double" - }, - { - "name": "unit", - "type": "oneof", + "name": "circle", + "type": "block", "arguments": [ { - "name": "m", - "type": "pure-token", - "token": "m" + "token": "BYRADIUS", + "name": "radius", + "type": "double" }, { - "name": "km", - "type": "pure-token", - "token": "km" + "name": "unit", + "type": "oneof", + "arguments": [ + { + "name": "m", + "type": "pure-token", + "token": "m" + }, + { + "name": "km", + "type": "pure-token", + "token": "km" + }, + { + "name": "ft", + "type": "pure-token", + "token": "ft" + }, + { + "name": "mi", + "type": "pure-token", + "token": "mi" + } + ] + } + ] + }, + { + "name": "box", + "type": "block", + "arguments": [ + { + "token": "BYBOX", + "name": "width", + "type": "double" }, { - "name": "ft", - "type": "pure-token", - "token": "ft" + "name": "height", + "type": "double" }, { - "name": "mi", - "type": "pure-token", - "token": "mi" + "name": "unit", + "type": "oneof", + "arguments": [ + { + "name": "m", + "type": "pure-token", + "token": "m" + }, + { + "name": "km", + "type": "pure-token", + "token": "km" + }, + { + "name": "ft", + "type": "pure-token", + "token": "ft" + }, + { + "name": "mi", + "type": "pure-token", + "token": "mi" + } + ] } ] } @@ -207,4 +215,4 @@ } ] } -} +}
\ No newline at end of file diff --git a/src/commands/migrate.json b/src/commands/migrate.json index d07fe4b15..b9a52aa69 100644 --- a/src/commands/migrate.json +++ b/src/commands/migrate.json @@ -125,27 +125,33 @@ "since": "3.0.0" }, { - "token": "AUTH", - "name": "password", - "type": "string", - "optional": true, - "since": "4.0.7" - - }, - { - "token": "AUTH2", - "name": "username_password", - "type": "block", + "name": "authentication", + "type": "oneof", "optional": true, - "since": "6.0.0", "arguments": [ { - "name": "username", - "type": "string" + "token": "AUTH", + "name": "password", + "type": "string", + "optional": true, + "since": "4.0.7" }, { - "name": "password", - "type": "string" + "token": "AUTH2", + "name": "username_password", + "type": "block", + "optional": true, + "since": "6.0.0", + "arguments": [ + { + "name": "username", + "type": "string" + }, + { + "name": "password", + "type": "string" + } + ] } ] }, @@ -160,4 +166,4 @@ } ] } -} +}
\ No newline at end of file diff --git a/src/commands/module-loadex.json b/src/commands/module-loadex.json index e772cbfe4..97e8f2b58 100644 --- a/src/commands/module-loadex.json +++ b/src/commands/module-loadex.json @@ -23,6 +23,7 @@ "token": "CONFIG", "type": "block", "multiple": true, + "multiple_token": true, "optional": true, "arguments": [ { diff --git a/src/commands/set.json b/src/commands/set.json index 267ab311a..688d534d7 100644 --- a/src/commands/set.json +++ b/src/commands/set.json @@ -66,6 +66,31 @@ "type": "string" }, { + "name": "condition", + "type": "oneof", + "optional": true, + "since": "2.6.12", + "arguments": [ + { + "name": "nx", + "type": "pure-token", + "token": "NX" + }, + { + "name": "xx", + "type": "pure-token", + "token": "XX" + } + ] + }, + { + "name": "get", + "token": "GET", + "type": "pure-token", + "optional": true, + "since": "6.2.0" + }, + { "name": "expiration", "type": "oneof", "optional": true, @@ -101,31 +126,6 @@ "since": "6.0.0" } ] - }, - { - "name": "condition", - "type": "oneof", - "optional": true, - "since": "2.6.12", - "arguments": [ - { - "name": "nx", - "type": "pure-token", - "token": "NX" - }, - { - "name": "xx", - "type": "pure-token", - "token": "XX" - } - ] - }, - { - "name": "get", - "token": "GET", - "type": "pure-token", - "optional": true, - "since": "6.2.0" } ] } diff --git a/src/commands/zrangebylex.json b/src/commands/zrangebylex.json index a8aee82cc..75e82bce6 100644 --- a/src/commands/zrangebylex.json +++ b/src/commands/zrangebylex.json @@ -7,7 +7,7 @@ "arity": -4, "function": "zrangebylexCommand", "deprecated_since": "6.2.0", - "replaced_by": "`ZRANGE` with the `BYSCORE` argument", + "replaced_by": "`ZRANGE` with the `BYLEX` argument", "doc_flags": [ "DEPRECATED" ], diff --git a/src/config.c b/src/config.c index 3b5d4d349..0d435fecb 100644 --- a/src/config.c +++ b/src/config.c @@ -94,6 +94,15 @@ configEnum aof_fsync_enum[] = { {NULL, 0} }; +configEnum shutdown_on_sig_enum[] = { + {"default", 0}, + {"save", SHUTDOWN_SAVE}, + {"nosave", SHUTDOWN_NOSAVE}, + {"now", SHUTDOWN_NOW}, + {"force", SHUTDOWN_FORCE}, + {NULL, 0} +}; + configEnum repl_diskless_load_enum[] = { {"disabled", REPL_DISKLESS_LOAD_DISABLED}, {"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY}, @@ -143,6 +152,13 @@ configEnum cluster_preferred_endpoint_type_enum[] = { {NULL, 0} }; +configEnum propagation_error_behavior_enum[] = { + {"ignore", PROPAGATION_ERR_BEHAVIOR_IGNORE}, + {"panic", PROPAGATION_ERR_BEHAVIOR_PANIC}, + {"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS}, + {NULL, 0} +}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -276,33 +292,50 @@ static standardConfig *lookupConfig(sds name) { *----------------------------------------------------------------------------*/ /* Get enum value from name. If there is no match INT_MIN is returned. */ -int configEnumGetValue(configEnum *ce, char *name) { - while(ce->name != NULL) { - if (!strcasecmp(ce->name,name)) return ce->val; - ce++; +int configEnumGetValue(configEnum *ce, sds *argv, int argc, int bitflags) { + if (argc == 0 || (!bitflags && argc != 1)) return INT_MIN; + int values = 0; + for (int i = 0; i < argc; i++) { + int matched = 0; + for (configEnum *ceItem = ce; ceItem->name != NULL; ceItem++) { + if (!strcasecmp(argv[i],ceItem->name)) { + values |= ceItem->val; + matched = 1; + } + } + if (!matched) return INT_MIN; } - return INT_MIN; + return values; } -/* Get enum name from value. If no match is found NULL is returned. */ -const char *configEnumGetName(configEnum *ce, int val) { - while(ce->name != NULL) { - if (ce->val == val) return ce->name; - ce++; +/* Get enum name/s from value. If no matches are found "unknown" is returned. */ +static sds configEnumGetName(configEnum *ce, int values, int bitflags) { + sds names = NULL; + int matches = 0; + for( ; ce->name != NULL; ce++) { + if (values == ce->val) { /* Short path for perfect match */ + sdsfree(names); + return sdsnew(ce->name); + } + if (bitflags && (values & ce->val)) { + names = names ? sdscatfmt(names, " %s", ce->name) : sdsnew(ce->name); + matches |= ce->val; + } } - return NULL; -} - -/* Wrapper for configEnumGetName() returning "unknown" instead of NULL if - * there is no match. */ -const char *configEnumGetNameOrUnknown(configEnum *ce, int val) { - const char *name = configEnumGetName(ce,val); - return name ? name : "unknown"; + if (!names || values != matches) { + sdsfree(names); + return sdsnew("unknown"); + } + return names; } /* Used for INFO generation. */ const char *evictPolicyToString(void) { - return configEnumGetNameOrUnknown(maxmemory_policy_enum,server.maxmemory_policy); + for (configEnum *ce = maxmemory_policy_enum; ce->name != NULL; ce++) { + if (server.maxmemory_policy == ce->val) + return ce->name; + } + serverPanic("unknown eviction policy"); } /*----------------------------------------------------------------------------- @@ -514,12 +547,15 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"loadmodule") && argc >= 2) { queueLoadModule(argv[1],&argv[2],argc-2); } else if (strchr(argv[0], '.')) { - if (argc != 2) { + if (argc < 2) { err = "Module config specified without value"; goto loaderr; } sds name = sdsdup(argv[0]); - if (!dictReplace(server.module_configs_queue, name, sdsdup(argv[1]))) sdsfree(name); + sds val = sdsdup(argv[1]); + for (int i = 2; i < argc; i++) + val = sdscatfmt(val, " %S", argv[i]); + if (!dictReplace(server.module_configs_queue, name, val)) sdsfree(name); } else if (!strcasecmp(argv[0],"sentinel")) { /* argc == 1 is handled by main() as we need to enter the sentinel * mode ASAP. */ @@ -1297,12 +1333,13 @@ void rewriteConfigOctalOption(struct rewriteConfigState *state, const char *opti /* Rewrite an enumeration option. It takes as usually state and option name, * and in addition the enumeration array and the default value for the * option. */ -void rewriteConfigEnumOption(struct rewriteConfigState *state, const char *option, int value, configEnum *ce, int defval) { - sds line; - const char *name = configEnumGetNameOrUnknown(ce,value); - int force = value != defval; +void rewriteConfigEnumOption(struct rewriteConfigState *state, const char *option, int value, standardConfig *config) { + int multiarg = config->flags & MULTI_ARG_CONFIG; + sds names = configEnumGetName(config->data.enumd.enum_value,value,multiarg); + sds line = sdscatfmt(sdsempty(),"%s %s",option,names); + sdsfree(names); + int force = value != config->data.enumd.default_value; - line = sdscatprintf(sdsempty(),"%s %s",option,name); rewriteConfigRewriteLine(state,option,line,force); } @@ -1821,10 +1858,16 @@ static int sdsConfigSet(standardConfig *config, sds *argv, int argc, const char UNUSED(argc); if (config->data.sds.is_valid_fn && !config->data.sds.is_valid_fn(argv[0], err)) return 0; + sds prev = config->flags & MODULE_CONFIG ? getModuleStringConfig(config->privdata) : *config->data.sds.config; sds new = (config->data.string.convert_empty_to_null && (sdslen(argv[0]) == 0)) ? NULL : argv[0]; + + /* if prev and new configuration are not equal, set the new one */ if (new != prev && (new == NULL || prev == NULL || sdscmp(prev, new))) { + /* If MODULE_CONFIG flag is set, then free temporary prev getModuleStringConfig returned. + * Otherwise, free the actual previous config value Redis held (Same action, different reasons) */ sdsfree(prev); + if (config->flags & MODULE_CONFIG) { return setModuleStringConfig(config->privdata, new, err); } @@ -1848,7 +1891,7 @@ static sds sdsConfigGet(standardConfig *config) { static void sdsConfigRewrite(standardConfig *config, const char *name, struct rewriteConfigState *state) { sds val = config->flags & MODULE_CONFIG ? getModuleStringConfig(config->privdata) : *config->data.sds.config; rewriteConfigSdsOption(state, name, val, config->data.sds.default_value); - if (val) sdsfree(val); + if ((val) && (config->flags & MODULE_CONFIG)) sdsfree(val); } @@ -1885,10 +1928,12 @@ static void enumConfigInit(standardConfig *config) { } static int enumConfigSet(standardConfig *config, sds *argv, int argc, const char **err) { - UNUSED(argc); - int enumval = configEnumGetValue(config->data.enumd.enum_value, argv[0]); + int enumval; + int bitflags = !!(config->flags & MULTI_ARG_CONFIG); + enumval = configEnumGetValue(config->data.enumd.enum_value, argv, argc, bitflags); + if (enumval == INT_MIN) { - sds enumerr = sdsnew("argument must be one of the following: "); + sds enumerr = sdsnew("argument(s) must be one of the following: "); configEnum *enumNode = config->data.enumd.enum_value; while(enumNode->name != NULL) { enumerr = sdscatlen(enumerr, enumNode->name, @@ -1919,12 +1964,13 @@ static int enumConfigSet(standardConfig *config, sds *argv, int argc, const char static sds enumConfigGet(standardConfig *config) { int val = config->flags & MODULE_CONFIG ? getModuleEnumConfig(config->privdata) : *(config->data.enumd.config); - return sdsnew(configEnumGetNameOrUnknown(config->data.enumd.enum_value,val)); + int bitflags = !!(config->flags & MULTI_ARG_CONFIG); + return configEnumGetName(config->data.enumd.enum_value,val,bitflags); } static void enumConfigRewrite(standardConfig *config, const char *name, struct rewriteConfigState *state) { int val = config->flags & MODULE_CONFIG ? getModuleEnumConfig(config->privdata) : *(config->data.enumd.config); - rewriteConfigEnumOption(state, name, val, config->data.enumd.enum_value, config->data.enumd.default_value); + rewriteConfigEnumOption(state, name, val, config); } #define createEnumConfig(name, alias, flags, enum, config_addr, default, is_valid, apply) { \ @@ -2284,6 +2330,16 @@ static int isValidAOFdirname(char *val, const char **err) { return 1; } +static int isValidShutdownOnSigFlags(int val, const char **err) { + /* Individual arguments are validated by createEnumConfig logic. + * We just need to ensure valid combinations here. */ + if (val & SHUTDOWN_NOSAVE && val & SHUTDOWN_SAVE) { + *err = "shutdown options SAVE and NOSAVE can't be used simultaneously"; + return 0; + } + return 1; +} + static int isValidAnnouncedHostname(char *val, const char **err) { if (strlen(val) >= NET_HOST_STR_LEN) { *err = "Hostnames must be less than " @@ -2641,7 +2697,7 @@ static int setConfigOOMScoreAdjValuesOption(standardConfig *config, sds *argv, i if (*eptr != '\0' || val < -2000 || val > 2000) { if (err) *err = "Invalid oom-score-adj-values, elements must be between -2000 and 2000."; - return -1; + return 0; } values[i] = val; @@ -2691,7 +2747,7 @@ static int setConfigNotifyKeyspaceEventsOption(standardConfig *config, sds *argv } int flags = keyspaceEventsStringToFlags(argv[0]); if (flags == -1) { - *err = "Invalid event class character. Use 'Ag$lshzxeKEtmd'."; + *err = "Invalid event class character. Use 'Ag$lshzxeKEtmdn'."; return 0; } server.notify_keyspace_events = flags; @@ -2887,7 +2943,8 @@ standardConfig static_configs[] = { createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, server.replica_announced, 1, NULL, NULL), createBoolConfig("latency-tracking", NULL, MODIFIABLE_CONFIG, server.latency_tracking_enabled, 1, NULL, NULL), createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled), - + createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL), + /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), createStringConfig("unixsocket", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.unixsocket, NULL, NULL, NULL), @@ -2928,6 +2985,9 @@ standardConfig static_configs[] = { createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL), + createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL), + createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL), + createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL), /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), @@ -2971,6 +3031,7 @@ standardConfig static_configs[] = { /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), createUIntConfig("unixsocketperm", NULL, IMMUTABLE_CONFIG, 0, 0777, server.unixsocketperm, 0, OCTAL_CONFIG, NULL, NULL), + createUIntConfig("socket-mark-id", NULL, IMMUTABLE_CONFIG, 0, UINT_MAX, server.socket_mark_id, 0, INTEGER_CONFIG, NULL, NULL), /* Unsigned Long configs */ createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */ diff --git a/src/config.h b/src/config.h index 210e55a87..6baa8bd0f 100644 --- a/src/config.h +++ b/src/config.h @@ -80,6 +80,10 @@ /* MSG_NOSIGNAL. */ #ifdef __linux__ #define HAVE_MSG_NOSIGNAL 1 +#if defined(SO_MARK) +#define HAVE_SOCKOPTMARKID 1 +#define SOCKOPTMARKID SO_MARK +#endif #endif /* Test for polling API */ @@ -113,6 +117,20 @@ #define redis_fsync(fd) fsync(fd) #endif +#if defined(__FreeBSD__) +#if defined(SO_USER_COOKIE) +#define HAVE_SOCKOPTMARKID 1 +#define SOCKOPTMARKID SO_USER_COOKIE +#endif +#endif + +#if defined(__OpenBSD__) +#if defined(SO_RTABLE) +#define HAVE_SOCKOPTMARKID 1 +#define SOCKOPTMARKID SO_RTABLE +#endif +#endif + #if __GNUC__ >= 4 #define redis_unreachable __builtin_unreachable #else @@ -182,6 +182,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) { dictSetVal(db->dict, de, val); signalKeyAsReady(db, key, val->type); if (server.cluster_enabled) slotToKeyAddEntry(de, db); + notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); } /* This is a special version of dbAdd() that is used only when loading diff --git a/src/debug.c b/src/debug.c index 4f0e37777..081cb862f 100644 --- a/src/debug.c +++ b/src/debug.c @@ -416,6 +416,8 @@ void debugCommand(client *c) { " Like HTSTATS but for the hash table stored at <key>'s value.", "LOADAOF", " Flush the AOF buffers on disk and reload the AOF in memory.", +"REPLICATE <string>", +" Replicates the provided string to replicas, allowing data divergence.", #ifdef USE_JEMALLOC "MALLCTL <key> [<val>]", " Get or set a malloc tuning integer.", @@ -849,6 +851,10 @@ NULL { server.aof_flush_sleep = atoi(c->argv[2]->ptr); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) { + replicationFeedSlaves(server.slaves, server.slaveseldb, + c->argv + 2, c->argc - 2); + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) { sds errstr = sdsnewlen("-",1); diff --git a/src/eval.c b/src/eval.c index 22bcbdb73..332aec9ce 100644 --- a/src/eval.c +++ b/src/eval.c @@ -218,31 +218,20 @@ void scriptingInit(int setup) { lua_setglobal(lua,"redis"); - /* Add a helper function that we use to sort the multi bulk output of non - * deterministic commands, when containing 'false' elements. */ - { - char *compare_func = "function __redis__compare_helper(a,b)\n" - " if a == false then a = '' end\n" - " if b == false then b = '' end\n" - " return a<b\n" - "end\n"; - luaL_loadbuffer(lua,compare_func,strlen(compare_func),"@cmp_func_def"); - lua_pcall(lua,0,0,0); - } - /* Add a helper function we use for pcall error reporting. * Note that when the error is in the C function we want to report the * information about the caller, that's what makes sense from the point * of view of the user debugging a script. */ { char *errh_func = "local dbg = debug\n" + "debug = nil\n" "function __redis__err__handler(err)\n" " local i = dbg.getinfo(2,'nSl')\n" " if i and i.what == 'C' then\n" " i = dbg.getinfo(3,'nSl')\n" " end\n" " if type(err) ~= 'table' then\n" - " err = {err='ERR' .. tostring(err)}" + " err = {err='ERR ' .. tostring(err)}" " end" " if i then\n" " err['source'] = i.source\n" @@ -266,10 +255,12 @@ void scriptingInit(int setup) { lctx.lua_client->flags |= CLIENT_DENY_BLOCKING; } - /* Lua beginners often don't use "local", this is likely to introduce - * subtle bugs in their code. To prevent problems we protect accesses - * to global variables. */ - luaEnableGlobalsProtection(lua, 1); + /* Lock the global table from any changes */ + lua_pushvalue(lua, LUA_GLOBALSINDEX); + luaSetErrorMetatable(lua); + /* Recursively lock all tables that can be reached from the global table */ + luaSetTableProtectionRecursively(lua); + lua_pop(lua, 1); lctx.lua = lua; } @@ -378,35 +369,20 @@ sds luaCreateFunction(client *c, robj *body) { sdsfreesplitres(parts, numparts); } - /* Build the lua function to be loaded */ - sds funcdef = sdsempty(); - funcdef = sdscat(funcdef,"function "); - funcdef = sdscatlen(funcdef,funcname,42); - funcdef = sdscatlen(funcdef,"() ",3); /* Note that in case of a shebang line we skip it but keep the line feed to conserve the user's line numbers */ - funcdef = sdscatlen(funcdef,(char*)body->ptr + shebang_len,sdslen(body->ptr) - shebang_len); - funcdef = sdscatlen(funcdef,"\nend",4); - - if (luaL_loadbuffer(lctx.lua,funcdef,sdslen(funcdef),"@user_script")) { + if (luaL_loadbuffer(lctx.lua,(char*)body->ptr + shebang_len,sdslen(body->ptr) - shebang_len,"@user_script")) { if (c != NULL) { addReplyErrorFormat(c, "Error compiling script (new function): %s", lua_tostring(lctx.lua,-1)); } lua_pop(lctx.lua,1); - sdsfree(funcdef); return NULL; } - sdsfree(funcdef); - if (lua_pcall(lctx.lua,0,0,0)) { - if (c != NULL) { - addReplyErrorFormat(c,"Error running script (new function): %s", - lua_tostring(lctx.lua,-1)); - } - lua_pop(lctx.lua,1); - return NULL; - } + serverAssert(lua_isfunction(lctx.lua, -1)); + + lua_setfield(lctx.lua, LUA_REGISTRYINDEX, funcname); /* We also save a SHA1 -> Original script map in a dictionary * so that we can replicate / write in the AOF all the @@ -479,7 +455,7 @@ void evalGenericCommand(client *c, int evalsha) { lua_getglobal(lua, "__redis__err__handler"); /* Try to lookup the Lua function */ - lua_getglobal(lua, funcname); + lua_getfield(lua, LUA_REGISTRYINDEX, funcname); if (lua_isnil(lua,-1)) { lua_pop(lua,1); /* remove the nil from the stack */ /* Function not defined... let's define it if we have the @@ -497,7 +473,7 @@ void evalGenericCommand(client *c, int evalsha) { return; } /* Now the following is guaranteed to return non nil */ - lua_getglobal(lua, funcname); + lua_getfield(lua, LUA_REGISTRYINDEX, funcname); serverAssert(!lua_isnil(lua,-1)); } diff --git a/src/evict.c b/src/evict.c index 933141638..a5821a463 100644 --- a/src/evict.c +++ b/src/evict.c @@ -492,10 +492,6 @@ static int isSafeToPerformEvictions(void) { * expires and evictions of keys not being performed. */ if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; - /* We cannot evict if we already have stuff to propagate (for example, - * CONFIG SET maxmemory inside a MULTI/EXEC) */ - if (server.also_propagate.numops != 0) return 0; - return 1; } diff --git a/src/function_lua.c b/src/function_lua.c index 8f21a1721..2e0250ea2 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -50,6 +50,7 @@ #define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__" #define REGISTRY_LOAD_CTX_NAME "__LIBRARY_CTX__" #define LIBRARY_API_NAME "__LIBRARY_API__" +#define GLOBALS_API_NAME "__GLOBALS_API__" #define LOAD_TIMEOUT_MS 500 /* Lua engine ctx */ @@ -99,42 +100,23 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) { * Return NULL on compilation error and set the error to the err variable */ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, sds *err) { + int ret = C_ERR; luaEngineCtx *lua_engine_ctx = engine_ctx; lua_State *lua = lua_engine_ctx->lua; - /* Each library will have its own global distinct table. - * We will create a new fresh Lua table and use - * lua_setfenv to set the table as the library globals - * (https://www.lua.org/manual/5.1/manual.html#lua_setfenv) - * - * At first, populate this new table with only the 'library' API - * to make sure only 'library' API is available at start. After the - * initial run is finished and all functions are registered, add - * all the default globals to the library global table and delete - * the library API. - * - * There are 2 ways to achieve the last part (add default - * globals to the new table): - * - * 1. Initialize the new table with all the default globals - * 2. Inheritance using metatable (https://www.lua.org/pil/14.3.html) - * - * For now we are choosing the second, we can change it in the future to - * achieve a better isolation between functions. */ - lua_newtable(lua); /* Global table for the library */ - lua_pushstring(lua, REDIS_API_NAME); - lua_pushstring(lua, LIBRARY_API_NAME); - lua_gettable(lua, LUA_REGISTRYINDEX); /* get library function from registry */ - lua_settable(lua, -3); /* push the library table to the new global table */ - - /* Set global protection on the new global table */ - luaSetGlobalProtection(lua_engine_ctx->lua); + /* set load library globals */ + lua_getmetatable(lua, LUA_GLOBALSINDEX); + lua_enablereadonlytable(lua, -1, 0); /* disable global protection */ + lua_getfield(lua, LUA_REGISTRYINDEX, LIBRARY_API_NAME); + lua_setfield(lua, -2, "__index"); + lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1); /* enable global protection */ + lua_pop(lua, 1); /* pop the metatable */ /* compile the code */ if (luaL_loadbuffer(lua, blob, sdslen(blob), "@user_function")) { *err = sdscatprintf(sdsempty(), "Error compiling function: %s", lua_tostring(lua, -1)); - lua_pop(lua, 2); /* pops the error and globals table */ - return C_ERR; + lua_pop(lua, 1); /* pops the error */ + goto done; } serverAssert(lua_isfunction(lua, -1)); @@ -144,45 +126,31 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, sds }; luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, &load_ctx); - /* set the function environment so only 'library' API can be accessed. */ - lua_pushvalue(lua, -2); /* push global table to the front */ - lua_setfenv(lua, -2); - lua_sethook(lua,luaEngineLoadHook,LUA_MASKCOUNT,100000); /* Run the compiled code to allow it to register functions */ if (lua_pcall(lua,0,0,0)) { errorInfo err_info = {0}; luaExtractErrorInformation(lua, &err_info); *err = sdscatprintf(sdsempty(), "Error registering functions: %s", err_info.msg); - lua_pop(lua, 2); /* pops the error and globals table */ - lua_sethook(lua,NULL,0,0); /* Disable hook */ - luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, NULL); + lua_pop(lua, 1); /* pops the error */ luaErrorInformationDiscard(&err_info); - return C_ERR; + goto done; } - lua_sethook(lua,NULL,0,0); /* Disable hook */ - luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, NULL); - /* stack contains the global table, lets rearrange it to contains the entire API. */ - /* delete 'redis' API */ - lua_pushstring(lua, REDIS_API_NAME); - lua_pushnil(lua); - lua_settable(lua, -3); - - /* create metatable */ - lua_newtable(lua); - lua_pushstring(lua, "__index"); - lua_pushvalue(lua, LUA_GLOBALSINDEX); /* push original globals */ - lua_settable(lua, -3); - lua_pushstring(lua, "__newindex"); - lua_pushvalue(lua, LUA_GLOBALSINDEX); /* push original globals */ - lua_settable(lua, -3); - - lua_setmetatable(lua, -2); + ret = C_OK; - lua_pop(lua, 1); /* pops the global table */ +done: + /* restore original globals */ + lua_getmetatable(lua, LUA_GLOBALSINDEX); + lua_enablereadonlytable(lua, -1, 0); /* disable global protection */ + lua_getfield(lua, LUA_REGISTRYINDEX, GLOBALS_API_NAME); + lua_setfield(lua, -2, "__index"); + lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1); /* enable global protection */ + lua_pop(lua, 1); /* pop the metatable */ - return C_OK; + lua_sethook(lua,NULL,0,0); /* Disable hook */ + luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, NULL); + return ret; } /* @@ -458,8 +426,8 @@ int luaEngineInitEngine() { luaRegisterRedisAPI(lua_engine_ctx->lua); /* Register the library commands table and fields and store it to registry */ - lua_pushstring(lua_engine_ctx->lua, LIBRARY_API_NAME); - lua_newtable(lua_engine_ctx->lua); + lua_newtable(lua_engine_ctx->lua); /* load library globals */ + lua_newtable(lua_engine_ctx->lua); /* load library `redis` table */ lua_pushstring(lua_engine_ctx->lua, "register_function"); lua_pushcfunction(lua_engine_ctx->lua, luaRegisterFunction); @@ -468,18 +436,24 @@ int luaEngineInitEngine() { luaRegisterLogFunction(lua_engine_ctx->lua); luaRegisterVersion(lua_engine_ctx->lua); - lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX); + luaSetErrorMetatable(lua_engine_ctx->lua); + lua_setfield(lua_engine_ctx->lua, -2, REDIS_API_NAME); + + luaSetErrorMetatable(lua_engine_ctx->lua); + luaSetTableProtectionRecursively(lua_engine_ctx->lua); /* protect load library globals */ + lua_setfield(lua_engine_ctx->lua, LUA_REGISTRYINDEX, LIBRARY_API_NAME); /* Save error handler to registry */ lua_pushstring(lua_engine_ctx->lua, REGISTRY_ERROR_HANDLER_NAME); char *errh_func = "local dbg = debug\n" + "debug = nil\n" "local error_handler = function (err)\n" " local i = dbg.getinfo(2,'nSl')\n" " if i and i.what == 'C' then\n" " i = dbg.getinfo(3,'nSl')\n" " end\n" " if type(err) ~= 'table' then\n" - " err = {err='ERR' .. tostring(err)}" + " err = {err='ERR ' .. tostring(err)}" " end" " if i then\n" " err['source'] = i.source\n" @@ -492,17 +466,30 @@ int luaEngineInitEngine() { lua_pcall(lua_engine_ctx->lua,0,1,0); lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX); - /* Save global protection to registry */ - luaRegisterGlobalProtectionFunction(lua_engine_ctx->lua); - - /* Set global protection on globals */ lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX); - luaSetGlobalProtection(lua_engine_ctx->lua); + luaSetErrorMetatable(lua_engine_ctx->lua); + luaSetTableProtectionRecursively(lua_engine_ctx->lua); /* protect globals */ lua_pop(lua_engine_ctx->lua, 1); + /* Save default globals to registry */ + lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX); + lua_setfield(lua_engine_ctx->lua, LUA_REGISTRYINDEX, GLOBALS_API_NAME); + /* save the engine_ctx on the registry so we can get it from the Lua interpreter */ luaSaveOnRegistry(lua_engine_ctx->lua, REGISTRY_ENGINE_CTX_NAME, lua_engine_ctx); + /* Create new empty table to be the new globals, we will be able to control the real globals + * using metatable */ + lua_newtable(lua_engine_ctx->lua); /* new globals */ + lua_newtable(lua_engine_ctx->lua); /* new globals metatable */ + lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX); + lua_setfield(lua_engine_ctx->lua, -2, "__index"); + lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the metatable */ + lua_setmetatable(lua_engine_ctx->lua, -2); + lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the new global table */ + lua_replace(lua_engine_ctx->lua, LUA_GLOBALSINDEX); /* set new global table as the new globals */ + + engine *lua_engine = zmalloc(sizeof(*lua_engine)); *lua_engine = (engine) { .engine_ctx = lua_engine_ctx, diff --git a/src/help.h b/src/help.h index e25ca3fa3..dfecf5981 100644 --- a/src/help.h +++ b/src/help.h @@ -1,4 +1,4 @@ -/* Automatically generated by utils/generate-command-help.rb, do not edit. */ +/* Automatically generated by ./generate-command-help.rb, do not edit. */ #ifndef __REDIS_HELP_H #define __REDIS_HELP_H @@ -130,12 +130,12 @@ struct commandHelp { 15, "2.6.0" }, { "BITFIELD", - "key [GET encoding offset] [SET encoding offset value] [INCRBY encoding offset increment] [OVERFLOW WRAP|SAT|FAIL]", + "key GET encoding offset|[OVERFLOW WRAP|SAT|FAIL] SET encoding offset value|INCRBY encoding offset increment [GET encoding offset|[OVERFLOW WRAP|SAT|FAIL] SET encoding offset value|INCRBY encoding offset increment ...]", "Perform arbitrary bitfield integer operations on strings", 15, "3.2.0" }, { "BITFIELD_RO", - "key GET encoding offset", + "key GET encoding offset [encoding offset ...]", "Perform arbitrary bitfield integer operations on strings. Read-only variant of BITFIELD", 15, "6.2.0" }, @@ -690,12 +690,12 @@ struct commandHelp { 13, "3.2.10" }, { "GEOSEARCH", - "key [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius M|KM|FT|MI] [BYBOX width height M|KM|FT|MI] [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]", + "key FROMMEMBER member|FROMLONLAT longitude latitude BYRADIUS radius M|KM|FT|MI|BYBOX width height M|KM|FT|MI [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]", "Query a sorted set representing a geospatial index to fetch members inside an area of a box or a circle.", 13, "6.2.0" }, { "GEOSEARCHSTORE", - "destination source [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius M|KM|FT|MI] [BYBOX width height M|KM|FT|MI] [ASC|DESC] [COUNT count [ANY]] [STOREDIST]", + "destination source FROMMEMBER member|FROMLONLAT longitude latitude BYRADIUS radius M|KM|FT|MI|BYBOX width height M|KM|FT|MI [ASC|DESC] [COUNT count [ANY]] [STOREDIST]", "Query a sorted set representing a geospatial index to fetch members inside an area of a box or a circle, and store the result in another key.", 13, "6.2.0" }, @@ -1000,7 +1000,7 @@ struct commandHelp { 1, "1.0.0" }, { "MIGRATE", - "host port key| destination-db timeout [COPY] [REPLACE] [AUTH password] [AUTH2 username password] [KEYS key [key ...]]", + "host port key| destination-db timeout [COPY] [REPLACE] [[AUTH password]|[AUTH2 username password]] [KEYS key [key ...]]", "Atomically transfer a key from a Redis instance to another one.", 0, "2.6.0" }, @@ -1355,7 +1355,7 @@ struct commandHelp { 8, "1.0.0" }, { "SET", - "key value [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL] [NX|XX] [GET]", + "key value [NX|XX] [GET] [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL]", "Set the string value of a key", 1, "1.0.0" }, diff --git a/src/listpack.c b/src/listpack.c index e651e4960..75189f55f 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -180,7 +180,8 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) { int negative = 0; uint64_t v; - if (plen == slen) + /* Abort if length indicates this cannot possibly be an int */ + if (slen == 0 || slen >= LONG_STR_SIZE) return 0; /* Special case: first and only digit is 0. */ diff --git a/src/listpack.h b/src/listpack.h index 6c4d6bdd6..3e750af5b 100644 --- a/src/listpack.h +++ b/src/listpack.h @@ -59,6 +59,8 @@ void lpFree(unsigned char *lp); unsigned char* lpShrinkToFit(unsigned char *lp); unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen, unsigned char *p, int where, unsigned char **newp); +unsigned char *lpInsertInteger(unsigned char *lp, long long lval, + unsigned char *p, int where, unsigned char **newp); unsigned char *lpPrepend(unsigned char *lp, unsigned char *s, uint32_t slen); unsigned char *lpPrependInteger(unsigned char *lp, long long lval); unsigned char *lpAppend(unsigned char *lp, unsigned char *s, uint32_t slen); diff --git a/src/module.c b/src/module.c index 3fc6a5499..99d2adcd4 100644 --- a/src/module.c +++ b/src/module.c @@ -464,11 +464,18 @@ static int moduleConvertArgFlags(int flags); /* Use like malloc(). Memory allocated with this function is reported in * Redis INFO memory, used for keys eviction according to maxmemory settings * and in general is taken into account as memory allocated by Redis. - * You should avoid using malloc(). */ + * You should avoid using malloc(). + * This function panics if unable to allocate enough memory. */ void *RM_Alloc(size_t bytes) { return zmalloc(bytes); } +/* Similar to RM_Alloc, but returns NULL in case of allocation failure, instead + * of panicking. */ +void *RM_TryAlloc(size_t bytes) { + return ztrymalloc(bytes); +} + /* Use like calloc(). Memory allocated with this function is reported in * Redis INFO memory, used for keys eviction according to maxmemory settings * and in general is taken into account as memory allocated by Redis. @@ -710,6 +717,8 @@ void moduleFreeContext(RedisModuleCtx *ctx) { if (server.busy_module_yield_flags) { blockingOperationEnds(); server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; + if (server.current_client) + unprotectClient(server.current_client); unblockPostponedClients(); } } @@ -1040,9 +1049,9 @@ RedisModuleCommand *moduleCreateCommandProxy(struct RedisModule *module, sds dec * serve stale data. Don't use if you don't know what * this means. * * **"no-monitor"**: Don't propagate the command on monitor. Use this if - * the command has sensible data among the arguments. + * the command has sensitive data among the arguments. * * **"no-slowlog"**: Don't log this command in the slowlog. Use this if - * the command has sensible data among the arguments. + * the command has sensitive data among the arguments. * * **"fast"**: The command time complexity is not greater * than O(log(N)) where N is the size of the collection or * anything else representing the normal scalability @@ -1924,6 +1933,7 @@ static struct redisCommandArg *moduleCopyCommandArgs(RedisModuleCommandArg *args if (arg->token) realargs[j].token = zstrdup(arg->token); if (arg->summary) realargs[j].summary = zstrdup(arg->summary); if (arg->since) realargs[j].since = zstrdup(arg->since); + if (arg->deprecated_since) realargs[j].deprecated_since = zstrdup(arg->deprecated_since); realargs[j].flags = moduleConvertArgFlags(arg->flags); if (arg->subargs) realargs[j].subargs = moduleCopyCommandArgs(arg->subargs, version); } @@ -2079,6 +2089,12 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { * the -LOADING error) */ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { + static int yield_nesting = 0; + /* Avoid nested calls to RM_Yield */ + if (yield_nesting) + return; + yield_nesting++; + long long now = getMonotonicUs(); if (now >= ctx->next_yield_time) { /* In loading mode, there's no need to handle busy_module_yield_reply, @@ -2092,10 +2108,13 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { server.busy_module_yield_reply = busy_reply; /* start the blocking operation if not already started. */ if (!server.busy_module_yield_flags) { - server.busy_module_yield_flags = flags & REDISMODULE_YIELD_FLAG_CLIENTS ? - BUSY_MODULE_YIELD_CLIENTS : BUSY_MODULE_YIELD_EVENTS; + server.busy_module_yield_flags = BUSY_MODULE_YIELD_EVENTS; blockingOperationStarts(); + if (server.current_client) + protectClient(server.current_client); } + if (flags & REDISMODULE_YIELD_FLAG_CLIENTS) + server.busy_module_yield_flags |= BUSY_MODULE_YIELD_CLIENTS; /* Let redis process events */ processEventsWhileBlocked(); @@ -2110,6 +2129,7 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { /* decide when the next event should fire. */ ctx->next_yield_time = now + 1000000 / server.hz; } + yield_nesting--; } /* Set flags defining capabilities or behavior bit flags. @@ -2639,9 +2659,7 @@ void RM_TrimStringAllocation(RedisModuleString *str) { * if (argc != 3) return RedisModule_WrongArity(ctx); */ int RM_WrongArity(RedisModuleCtx *ctx) { - addReplyErrorFormat(ctx->client, - "wrong number of arguments for '%s' command", - (char*)ctx->client->argv[0]->ptr); + addReplyErrorArity(ctx->client); return REDISMODULE_OK; } @@ -3365,10 +3383,13 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { /* Publish a message to subscribers (see PUBLISH command). */ int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { UNUSED(ctx); - int receivers = pubsubPublishMessage(channel, message); - if (server.cluster_enabled) - clusterPropagatePublish(channel, message); - return receivers; + return pubsubPublishMessageAndPropagateToCluster(channel, message, 0); +} + +/* Publish a message to shard-subscribers (see SPUBLISH command). */ +int RM_PublishMessageShard(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { + UNUSED(ctx); + return pubsubPublishMessageAndPropagateToCluster(channel, message, 1); } /* Return the currently selected DB. */ @@ -3615,7 +3636,7 @@ static void moduleInitKeyTypeSpecific(RedisModuleKey *key) { * key does not exist, NULL is returned. However it is still safe to * call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL * value. */ -void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { +RedisModuleKey *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { RedisModuleKey *kp; robj *value; int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0; @@ -3633,7 +3654,7 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { kp = zmalloc(sizeof(*kp)); moduleInitKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); - return (void*)kp; + return kp; } /* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */ @@ -5736,26 +5757,18 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* Lookup command now, after filters had a chance to make modifications * if necessary. */ - cmd = lookupCommand(c->argv,c->argc); - if (!cmd) { + cmd = c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); + sds err; + if (!commandCheckExistence(c, error_as_call_replies? &err : NULL)) { errno = ENOENT; - if (error_as_call_replies) { - sds msg = sdscatfmt(sdsempty(),"Unknown Redis " - "command '%S'.",c->argv[0]->ptr); - reply = callReplyCreateError(msg, ctx); - } + if (error_as_call_replies) + reply = callReplyCreateError(err, ctx); goto cleanup; } - c->cmd = c->lastcmd = c->realcmd = cmd; - - /* Basic arity checks. */ - if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { + if (!commandCheckArity(c, error_as_call_replies? &err : NULL)) { errno = EINVAL; - if (error_as_call_replies) { - sds msg = sdscatfmt(sdsempty(), "Wrong number of " - "args calling Redis command '%S'.", c->cmd->fullname); - reply = callReplyCreateError(msg, ctx); - } + if (error_as_call_replies) + reply = callReplyCreateError(err, ctx); goto cleanup; } @@ -5798,8 +5811,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch } int deny_write_type = writeCommandsDeniedByDiskError(); + int obey_client = mustObeyClient(server.current_client); - if (deny_write_type != DISK_ERROR_TYPE_NONE) { + if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) { errno = ENOSPC; if (error_as_call_replies) { sds msg = writeCommandsGetDiskErrorMessage(deny_write_type); @@ -5841,7 +5855,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* If this is a Redis Cluster node, we need to make sure the module is not * trying to access non-local keys, with the exception of commands * received from our master. */ - if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) { + if (server.cluster_enabled && !mustObeyClient(ctx->client)) { int error_code; /* Duplicate relevant flags in the module client. */ c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING); @@ -5890,11 +5904,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) call_flags |= CMD_CALL_PROPAGATE_REPL; } - /* Set server.current_client */ - client *old_client = server.current_client; - server.current_client = c; call(c,call_flags); - server.current_client = old_client; server.replication_allowed = prev_replication_allowed; serverAssert((c->flags & CLIENT_BLOCKED) == 0); @@ -6074,6 +6084,14 @@ const char *moduleTypeModuleName(moduleType *mt) { return mt->module->name; } +/* Return the module name from a module command */ +const char *moduleNameFromCommand(struct redisCommand *cmd) { + serverAssert(cmd->proc == RedisModuleCommandDispatcher); + + RedisModuleCommand *cp = (void*)(unsigned long)cmd->getkeys_proc; + return cp->module->name; +} + /* Create a copy of a module type value using the copy callback. If failed * or not supported, produce an error reply and return NULL. */ @@ -7623,6 +7641,8 @@ void moduleGILBeforeUnlock() { if (server.busy_module_yield_flags) { blockingOperationEnds(); server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; + if (server.current_client) + unprotectClient(server.current_client); unblockPostponedClients(); } } @@ -8676,8 +8696,18 @@ int RM_ACLCheckChannelPermissions(RedisModuleUser *user, RedisModuleString *ch, * Returns REDISMODULE_OK on success and REDISMODULE_ERR on error. * * For more information about ACL log, please refer to https://redis.io/commands/acl-log */ -void RM_ACLAddLogEntry(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object) { - addACLLogEntry(ctx->client, 0, ACL_LOG_CTX_MODULE, -1, user->user->name, sdsdup(object->ptr)); +int RM_ACLAddLogEntry(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object, RedisModuleACLLogEntryReason reason) { + int acl_reason; + switch (reason) { + case REDISMODULE_ACL_LOG_AUTH: acl_reason = ACL_DENIED_AUTH; break; + case REDISMODULE_ACL_LOG_KEY: acl_reason = ACL_DENIED_KEY; break; + case REDISMODULE_ACL_LOG_CHANNEL: acl_reason = ACL_DENIED_CHANNEL; break; + case REDISMODULE_ACL_LOG_CMD: acl_reason = ACL_DENIED_CMD; break; + default: return REDISMODULE_ERR; + } + + addACLLogEntry(ctx->client, acl_reason, ACL_LOG_CTX_MODULE, -1, user->user->name, sdsdup(object->ptr)); + return REDISMODULE_OK; } /* Authenticate the client associated with the context with @@ -9730,10 +9760,29 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) * with the allocation calls, since sometimes the underlying allocator * will allocate more memory. */ -size_t RM_MallocSize(void* ptr){ +size_t RM_MallocSize(void* ptr) { return zmalloc_size(ptr); } +/* Same as RM_MallocSize, except it works on RedisModuleString pointers. + */ +size_t RM_MallocSizeString(RedisModuleString* str) { + serverAssert(str->type == OBJ_STRING); + return sizeof(*str) + getStringObjectSdsUsedMemory(str); +} + +/* Same as RM_MallocSize, except it works on RedisModuleDict pointers. + * Note that the returned value is only the overhead of the underlying structures, + * it does not include the allocation size of the keys and values. + */ +size_t RM_MallocSizeDict(RedisModuleDict* dict) { + size_t size = sizeof(RedisModuleDict) + sizeof(rax); + size += dict->rax->numnodes * sizeof(raxNode); + /* For more info about this weird line, see streamRadixTreeMemoryUsage */ + size += dict->rax->numnodes * sizeof(long)*30; + return size; +} + /* Return the a number between 0 to 1 indicating the amount of memory * currently used, relative to the Redis "maxmemory" configuration. * @@ -10908,6 +10957,7 @@ int moduleFreeCommand(struct RedisModule *module, struct redisCommand *cmd) { } zfree((char *)cmd->summary); zfree((char *)cmd->since); + zfree((char *)cmd->deprecated_since); zfree((char *)cmd->complexity); if (cmd->latency_histogram) { hdr_close(cmd->latency_histogram); @@ -11275,6 +11325,7 @@ int moduleVerifyConfigFlags(unsigned int flags, configType type) { | REDISMODULE_CONFIG_HIDDEN | REDISMODULE_CONFIG_PROTECTED | REDISMODULE_CONFIG_DENY_LOADING + | REDISMODULE_CONFIG_BITFLAGS | REDISMODULE_CONFIG_MEMORY))) { serverLogRaw(LL_WARNING, "Invalid flag(s) for configuration"); return REDISMODULE_ERR; @@ -11283,6 +11334,10 @@ int moduleVerifyConfigFlags(unsigned int flags, configType type) { serverLogRaw(LL_WARNING, "Numeric flag provided for non-numeric configuration."); return REDISMODULE_ERR; } + if (type != ENUM_CONFIG && flags & REDISMODULE_CONFIG_BITFLAGS) { + serverLogRaw(LL_WARNING, "Enum flag provided for non-enum configuration."); + return REDISMODULE_ERR; + } return REDISMODULE_OK; } @@ -11484,6 +11539,12 @@ unsigned int maskModuleNumericConfigFlags(unsigned int flags) { return new_flags; } +unsigned int maskModuleEnumConfigFlags(unsigned int flags) { + unsigned int new_flags = 0; + if (flags & REDISMODULE_CONFIG_BITFLAGS) new_flags |= MULTI_ARG_CONFIG; + return new_flags; +} + /* Create a string config that Redis users can interact with via the Redis config file, * `CONFIG SET`, `CONFIG GET`, and `CONFIG REWRITE` commands. * @@ -11523,6 +11584,7 @@ unsigned int maskModuleNumericConfigFlags(unsigned int flags) { * * REDISMODULE_CONFIG_PROTECTED: This config will be only be modifiable based off the value of enable-protected-configs. * * REDISMODULE_CONFIG_DENY_LOADING: This config is not modifiable while the server is loading data. * * REDISMODULE_CONFIG_MEMORY: For numeric configs, this config will convert data unit notations into their byte equivalent. + * * REDISMODULE_CONFIG_BITFLAGS: For enum configs, this config will allow multiple entries to be combined as bit flags. * * Default values are used on startup to set the value if it is not provided via the config file * or command line. Default values are also used to compare to on a config rewrite. @@ -11638,7 +11700,7 @@ int RM_RegisterEnumConfig(RedisModuleCtx *ctx, const char *name, int default_val enum_vals[num_enum_vals].name = NULL; enum_vals[num_enum_vals].val = 0; listAddNodeTail(module->module_configs, new_config); - flags = maskModuleConfigFlags(flags); + flags = maskModuleConfigFlags(flags) | maskModuleEnumConfigFlags(flags); addModuleEnumConfig(module->name, name, flags, new_config, default_val, enum_vals); return REDISMODULE_OK; } @@ -12225,6 +12287,7 @@ void moduleRegisterCoreAPI(void) { server.moduleapi = dictCreate(&moduleAPIDictType); server.sharedapi = dictCreate(&moduleAPIDictType); REGISTER_API(Alloc); + REGISTER_API(TryAlloc); REGISTER_API(Calloc); REGISTER_API(Realloc); REGISTER_API(Free); @@ -12490,6 +12553,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ServerInfoGetFieldDouble); REGISTER_API(GetClientInfoById); REGISTER_API(PublishMessage); + REGISTER_API(PublishMessageShard); REGISTER_API(SubscribeToServerEvent); REGISTER_API(SetLRU); REGISTER_API(GetLRU); @@ -12500,6 +12564,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetBlockedClientReadyKey); REGISTER_API(GetUsedMemoryRatio); REGISTER_API(MallocSize); + REGISTER_API(MallocSizeString); + REGISTER_API(MallocSizeDict); REGISTER_API(ScanCursorCreate); REGISTER_API(ScanCursorDestroy); REGISTER_API(ScanCursorRestart); diff --git a/src/modules/Makefile b/src/modules/Makefile index c4bc7eb1a..b9ef5786d 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -28,42 +28,42 @@ all: helloworld.so hellotype.so helloblock.so hellocluster.so hellotimer.so hell helloworld.xo: ../redismodule.h helloworld.so: helloworld.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc hellotype.xo: ../redismodule.h hellotype.so: hellotype.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc helloblock.xo: ../redismodule.h helloblock.so: helloblock.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc hellocluster.xo: ../redismodule.h hellocluster.so: hellocluster.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc hellotimer.xo: ../redismodule.h hellotimer.so: hellotimer.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc hellodict.xo: ../redismodule.h hellodict.so: hellodict.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc hellohook.xo: ../redismodule.h hellohook.so: hellohook.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc helloacl.xo: ../redismodule.h helloacl.so: helloacl.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $^ $(SHOBJ_LDFLAGS) $(LIBS) -lc clean: rm -rf *.xo *.so diff --git a/src/monotonic.c b/src/monotonic.c index 5bb4f03bf..608fa351c 100644 --- a/src/monotonic.c +++ b/src/monotonic.c @@ -168,3 +168,13 @@ const char * monotonicInit() { return monotonic_info_string; } + +const char *monotonicInfoString() { + return monotonic_info_string; +} + +monotonic_clock_type monotonicGetType() { + if (getMonotonicUs == getMonotonicUs_posix) + return MONOTONIC_CLOCK_POSIX; + return MONOTONIC_CLOCK_HW; +} diff --git a/src/monotonic.h b/src/monotonic.h index 4e82f9d53..32cf70638 100644 --- a/src/monotonic.h +++ b/src/monotonic.h @@ -24,13 +24,22 @@ typedef uint64_t monotime; /* Retrieve counter of micro-seconds relative to an arbitrary point in time. */ extern monotime (*getMonotonicUs)(void); +typedef enum monotonic_clock_type { + MONOTONIC_CLOCK_POSIX, + MONOTONIC_CLOCK_HW, +} monotonic_clock_type; /* Call once at startup to initialize the monotonic clock. Though this only * needs to be called once, it may be called additional times without impact. * Returns a printable string indicating the type of clock initialized. * (The returned string is static and doesn't need to be freed.) */ -const char * monotonicInit(); +const char *monotonicInit(); +/* Return a string indicating the type of monotonic clock being used. */ +const char *monotonicInfoString(); + +/* Return the type of monotonic clock being used. */ +monotonic_clock_type monotonicGetType(); /* Functions to measure elapsed time. Example: * monotime myTimer; diff --git a/src/networking.c b/src/networking.c index 767d871d8..0664e2bf0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -160,6 +160,7 @@ client *createClient(connection *conn) { c->bulklen = -1; c->sentlen = 0; c->flags = 0; + c->slot = -1; c->ctime = c->lastinteraction = server.unixtime; clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; @@ -215,6 +216,23 @@ client *createClient(connection *conn) { return c; } +void installClientWriteHandler(client *c) { + int ae_barrier = 0; + /* For the fsync=always policy, we want that a given FD is never + * served for reading and writing in the same event loop iteration, + * so that in the middle of receiving the query, and serving it + * to the client, we'll call beforeSleep() that will do the + * actual fsync of AOF to disk. the write barrier ensures that. */ + if (server.aof_state == AOF_ON && + server.aof_fsync == AOF_FSYNC_ALWAYS) + { + ae_barrier = 1; + } + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { + freeClientAsync(c); + } +} + /* This function puts the client in the queue of clients that should write * their output buffers to the socket. Note that it does not *yet* install * the write handler, to start clients are put in a queue of clients that need @@ -222,7 +240,7 @@ client *createClient(connection *conn) { * handleClientsWithPendingWrites() function). * If we fail and there is more data to write, compared to what the socket * buffers can hold, then we'll really install the handler. */ -void clientInstallWriteHandler(client *c) { +void putClientInPendingWriteQueue(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the slave can actually receive * writes at this stage. */ @@ -285,11 +303,11 @@ int prepareClientToWrite(client *c) { * it should already be setup to do so (it has already pending data). * * If CLIENT_PENDING_READ is set, we're in an IO thread and should - * not install a write handler. Instead, it will be done by - * handleClientsWithPendingReadsUsingThreads() upon return. + * not put the client in pending write queue. Instead, it will be + * done by handleClientsWithPendingReadsUsingThreads() upon return. */ if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) - clientInstallWriteHandler(c); + putClientInPendingWriteQueue(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -521,6 +539,21 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) { showLatestBacklog(); } server.stat_unexpected_error_replies++; + + /* Based off the propagation error behavior, check if we need to panic here. There + * are currently two checked cases: + * * If this command was from our master and we are not a writable replica. + * * We are reading from an AOF file. */ + int panic_in_replicas = (ctype == CLIENT_TYPE_MASTER && server.repl_slave_ro) + && (server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC || + server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS); + int panic_in_aof = c->id == CLIENT_ID_AOF + && server.propagation_error_behavior == PROPAGATION_ERR_BEHAVIOR_PANIC; + if (panic_in_replicas || panic_in_aof) { + serverPanic("This %s panicked sending an error to its %s" + " after processing the command '%s'", + from, to, cmdname ? cmdname : "<unknown>"); + } } } @@ -1061,7 +1094,7 @@ void addReplySubcommandSyntaxError(client *c) { sds cmd = sdsnew((char*) c->argv[0]->ptr); sdstoupper(cmd); addReplyErrorFormat(c, - "Unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.", + "unknown subcommand or wrong number of arguments for '%.128s'. Try %s HELP.", (char*)c->argv[1]->ptr,cmd); sdsfree(cmd); } @@ -1995,20 +2028,7 @@ int handleClientsWithPendingWrites(void) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { - int ae_barrier = 0; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. the write barrier ensures that. */ - if (server.aof_state == AOF_ON && - server.aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_barrier = 1; - } - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { - freeClientAsync(c); - } + installClientWriteHandler(c); } } return processed; @@ -2022,6 +2042,7 @@ void resetClient(client *c) { c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; + c->slot = -1; if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); @@ -2075,7 +2096,7 @@ void unprotectClient(client *c) { c->flags &= ~CLIENT_PROTECTED; if (c->conn) { connSetReadHandler(c->conn,readQueryFromClient); - if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c); } } } @@ -2649,7 +2670,7 @@ void readQueryFromClient(connection *conn) { /* There is more data in the client input buffer, continue parsing it * and check if there is a full command to execute. */ - if (processInputBuffer(c) == C_ERR) + if (processInputBuffer(c) == C_ERR) c = NULL; done: @@ -3808,7 +3829,7 @@ void flushSlavesOutputBuffers(void) { } } -/* Compute current most restictive pause type and its end time, aggregated for +/* Compute current most restrictive pause type and its end time, aggregated for * all pause purposes. */ static void updateClientPauseTypeAndEndTime(void) { pause_type old_type = server.client_pause_type; @@ -4212,10 +4233,8 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Install the write handler if there are pending writes in some * of the clients. */ - if (clientHasPendingReplies(c) && - connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) - { - freeClientAsync(c); + if (clientHasPendingReplies(c)) { + installClientWriteHandler(c); } } listEmpty(server.clients_pending_write); @@ -4327,10 +4346,10 @@ int handleClientsWithPendingReadsUsingThreads(void) { } /* We may have pending replies if a thread readQueryFromClient() produced - * replies and did not install a write handler (it can't). + * replies and did not put the client in pending write queue (it can't). */ if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) - clientInstallWriteHandler(c); + putClientInPendingWriteQueue(c); } /* Update processed count on server */ diff --git a/src/notify.c b/src/notify.c index 28c0048cb..2881a48db 100644 --- a/src/notify.c +++ b/src/notify.c @@ -57,6 +57,7 @@ int keyspaceEventsStringToFlags(char *classes) { case 't': flags |= NOTIFY_STREAM; break; case 'm': flags |= NOTIFY_KEY_MISS; break; case 'd': flags |= NOTIFY_MODULE; break; + case 'n': flags |= NOTIFY_NEW; break; default: return -1; } } @@ -84,6 +85,7 @@ sds keyspaceEventsFlagsToString(int flags) { if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); if (flags & NOTIFY_MODULE) res = sdscatlen(res,"d",1); + if (flags & NOTIFY_NEW) res = sdscatlen(res,"n",1); } if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); @@ -124,7 +126,7 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { chan = sdscatlen(chan, "__:", 3); chan = sdscatsds(chan, key->ptr); chanobj = createObject(OBJ_STRING, chan); - pubsubPublishMessage(chanobj, eventobj); + pubsubPublishMessage(chanobj, eventobj, 0); decrRefCount(chanobj); } @@ -136,7 +138,7 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { chan = sdscatlen(chan, "__:", 3); chan = sdscatsds(chan, eventobj->ptr); chanobj = createObject(OBJ_STRING, chan); - pubsubPublishMessage(chanobj, key); + pubsubPublishMessage(chanobj, key, 0); decrRefCount(chanobj); } decrRefCount(eventobj); diff --git a/src/object.c b/src/object.c index a60a27e90..093e2619e 100644 --- a/src/object.c +++ b/src/object.c @@ -958,7 +958,7 @@ char *strEncoding(int encoding) { * on the insertion speed and thus the ability of the radix tree * to compress prefixes. */ size_t streamRadixTreeMemoryUsage(rax *rax) { - size_t size; + size_t size = sizeof(*rax); size = rax->numele * sizeof(streamID); size += rax->numnodes * sizeof(raxNode); /* Add a fixed overhead due to the aux data pointer, children, ... */ diff --git a/src/pubsub.c b/src/pubsub.c index e805b16ef..a630afc8f 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -499,16 +499,10 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) } /* Publish a message to all the subscribers. */ -int pubsubPublishMessage(robj *channel, robj *message) { - return pubsubPublishMessageInternal(channel,message,pubSubType); +int pubsubPublishMessage(robj *channel, robj *message, int sharded) { + return pubsubPublishMessageInternal(channel, message, sharded? pubSubShardType : pubSubType); } -/* Publish a shard message to all the subscribers. */ -int pubsubPublishMessageShard(robj *channel, robj *message) { - return pubsubPublishMessageInternal(channel, message, pubSubShardType); -} - - /*----------------------------------------------------------------------------- * Pubsub commands implementation *----------------------------------------------------------------------------*/ @@ -578,6 +572,15 @@ void punsubscribeCommand(client *c) { if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } +/* This function wraps pubsubPublishMessage and also propagates the message to cluster. + * Used by the commands PUBLISH/SPUBLISH and their respective module APIs.*/ +int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded) { + int receivers = pubsubPublishMessage(channel, message, sharded); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message, sharded); + return receivers; +} + /* PUBLISH <channel> <message> */ void publishCommand(client *c) { if (server.sentinel_mode) { @@ -585,10 +588,8 @@ void publishCommand(client *c) { return; } - int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); - if (server.cluster_enabled) - clusterPropagatePublish(c->argv[1],c->argv[2]); - else + int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],0); + if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); } @@ -677,12 +678,9 @@ void channelList(client *c, sds pat, dict *pubsub_channels) { /* SPUBLISH <channel> <message> */ void spublishCommand(client *c) { - int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType); - if (server.cluster_enabled) { - clusterPropagatePublishShard(c->argv[1], c->argv[2]); - } else { + int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1); + if (!server.cluster_enabled) forceCommandPropagation(c,PROPAGATE_REPL); - } addReplyLongLong(c,receivers); } diff --git a/src/quicklist.h b/src/quicklist.h index dc30cafd9..8d6951b62 100644 --- a/src/quicklist.h +++ b/src/quicklist.h @@ -116,7 +116,7 @@ typedef struct quicklist { typedef struct quicklistIter { quicklist *quicklist; quicklistNode *current; - unsigned char *zi; + unsigned char *zi; /* points to the current element */ long offset; /* offset in current listpack */ int direction; } quicklistIter; @@ -141,7 +141,7 @@ typedef struct quicklistEntry { /* quicklist compression disable */ #define QUICKLIST_NOCOMPRESS 0 -/* quicklist container formats */ +/* quicklist node container formats */ #define QUICKLIST_NODE_CONTAINER_PLAIN 1 #define QUICKLIST_NODE_CONTAINER_PACKED 2 @@ -588,22 +588,11 @@ int rdbSaveDoubleValue(rio *rdb, double val) { len = 1; buf[0] = (val < 0) ? 255 : 254; } else { -#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) - /* Check if the float is in a safe range to be casted into a - * long long. We are assuming that long long is 64 bit here. - * Also we are assuming that there are no implementations around where - * double has precision < 52 bit. - * - * Under this assumptions we test if a double is inside an interval - * where casting to long long is safe. Then using two castings we - * make sure the decimal part is zero. If all this is true we use - * integer printing function that is much faster. */ - double min = -4503599627370495; /* (2^52)-1 */ - double max = 4503599627370496; /* -(2^52) */ - if (val > min && val < max && val == ((double)((long long)val))) - ll2string((char*)buf+1,sizeof(buf)-1,(long long)val); + long long lvalue; + /* Integer printing function is much faster, check if we can safely use it. */ + if (double2ll(val, &lvalue)) + ll2string((char*)buf+1,sizeof(buf)-1,lvalue); else -#endif snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); buf[0] = strlen((char*)buf+1); len = buf[0]+1; @@ -2433,6 +2422,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { return NULL; } + if (s->length && !raxSize(s->rax)) { + rdbReportCorruptRDB("Stream length inconsistent with rax entries"); + decrRefCount(o); + return NULL; + } + /* Consumer groups loading */ uint64_t cgroups_count = rdbLoadLen(rdb,NULL); if (cgroups_count == RDB_LENERR) { diff --git a/src/redismodule.h b/src/redismodule.h index f27c06b0e..cd389dd00 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -88,6 +88,7 @@ #define REDISMODULE_CONFIG_DENY_LOADING (1ULL<<6) /* This config is forbidden during loading. */ #define REDISMODULE_CONFIG_MEMORY (1ULL<<7) /* Indicates if this value can be set as a memory value */ +#define REDISMODULE_CONFIG_BITFLAGS (1ULL<<8) /* Indicates if this value can be set as a multiple enum values */ /* StreamID type. */ typedef struct RedisModuleStreamID { @@ -322,6 +323,7 @@ typedef struct RedisModuleCommandArg { const char *summary; const char *since; int flags; /* The REDISMODULE_CMD_ARG_* macros. */ + const char *deprecated_since; struct RedisModuleCommandArg *subargs; } RedisModuleCommandArg; @@ -735,6 +737,13 @@ typedef struct RedisModuleSwapDbInfo { #define RedisModuleSwapDbInfo RedisModuleSwapDbInfoV1 +typedef enum { + REDISMODULE_ACL_LOG_AUTH = 0, /* Authentication failure */ + REDISMODULE_ACL_LOG_CMD, /* Command authorization failure */ + REDISMODULE_ACL_LOG_KEY, /* Key authorization failure */ + REDISMODULE_ACL_LOG_CHANNEL /* Channel authorization failure */ +} RedisModuleACLLogEntryReason; + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -861,6 +870,7 @@ typedef struct RedisModuleTypeMethods { #endif REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR; +REDISMODULE_API void * (*RedisModule_TryAlloc)(size_t bytes) REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_Free)(void *ptr) REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_Calloc)(size_t nmemb, size_t size) REDISMODULE_ATTR; @@ -877,7 +887,7 @@ REDISMODULE_API int (*RedisModule_ReplyWithLongLong)(RedisModuleCtx *ctx, long l REDISMODULE_API int (*RedisModule_GetSelectedDb)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KeyExists)(RedisModuleCtx *ctx, RedisModuleString *keyname) REDISMODULE_ATTR; -REDISMODULE_API void * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR; +REDISMODULE_API RedisModuleKey * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_CloseKey)(RedisModuleKey *kp) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KeyType)(RedisModuleKey *kp) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_ValueLength)(RedisModuleKey *kp) REDISMODULE_ATTR; @@ -990,6 +1000,7 @@ REDISMODULE_API unsigned long long (*RedisModule_GetClientId)(RedisModuleCtx *ct REDISMODULE_API RedisModuleString * (*RedisModule_GetClientUserNameById)(RedisModuleCtx *ctx, uint64_t id) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetClientInfoById)(void *ci, uint64_t id) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_PublishMessageShard)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetContextFlags)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)() REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes) REDISMODULE_ATTR; @@ -1149,6 +1160,8 @@ REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR; REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocSize)(void* ptr) REDISMODULE_ATTR; +REDISMODULE_API size_t (*RedisModule_MallocSizeString)(RedisModuleString* str) REDISMODULE_ATTR; +REDISMODULE_API size_t (*RedisModule_MallocSizeDict)(RedisModuleDict* dict) REDISMODULE_ATTR; REDISMODULE_API RedisModuleUser * (*RedisModule_CreateModuleUser)(const char *name) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_FreeModuleUser)(RedisModuleUser *user) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SetModuleUserACL)(RedisModuleUser *user, const char* acl) REDISMODULE_ATTR; @@ -1157,7 +1170,7 @@ REDISMODULE_API RedisModuleUser * (*RedisModule_GetModuleUserFromUserName)(Redis REDISMODULE_API int (*RedisModule_ACLCheckCommandPermissions)(RedisModuleUser *user, RedisModuleString **argv, int argc) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ACLCheckKeyPermissions)(RedisModuleUser *user, RedisModuleString *key, int flags) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ACLCheckChannelPermissions)(RedisModuleUser *user, RedisModuleString *ch, int literal) REDISMODULE_ATTR; -REDISMODULE_API void (*RedisModule_ACLAddLogEntry)(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_ACLAddLogEntry)(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleString *object, RedisModuleACLLogEntryReason reason) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AuthenticateClientWithACLUser)(RedisModuleCtx *ctx, const char *name, size_t len, RedisModuleUserChangedFunc callback, void *privdata, uint64_t *client_id) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AuthenticateClientWithUser)(RedisModuleCtx *ctx, RedisModuleUser *user, RedisModuleUserChangedFunc callback, void *privdata, uint64_t *client_id) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id) REDISMODULE_ATTR; @@ -1191,6 +1204,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int void *getapifuncptr = ((void**)ctx)[0]; RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr; REDISMODULE_GET_API(Alloc); + REDISMODULE_GET_API(TryAlloc); REDISMODULE_GET_API(Calloc); REDISMODULE_GET_API(Free); REDISMODULE_GET_API(Realloc); @@ -1411,6 +1425,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ServerInfoGetFieldDouble); REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(PublishMessage); + REDISMODULE_GET_API(PublishMessageShard); REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(SetLRU); REDISMODULE_GET_API(GetLRU); @@ -1478,6 +1493,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(KillForkChild); REDISMODULE_GET_API(GetUsedMemoryRatio); REDISMODULE_GET_API(MallocSize); + REDISMODULE_GET_API(MallocSizeString); + REDISMODULE_GET_API(MallocSizeDict); REDISMODULE_GET_API(CreateModuleUser); REDISMODULE_GET_API(FreeModuleUser); REDISMODULE_GET_API(SetModuleUserACL); diff --git a/src/replication.c b/src/replication.c index e9a754ab4..2a0404f87 100644 --- a/src/replication.c +++ b/src/replication.c @@ -327,9 +327,6 @@ void feedReplicationBuffer(char *s, size_t len) { server.master_repl_offset += len; server.repl_backlog->histlen += len; - /* Install write handler for all replicas. */ - prepareReplicasToWrite(); - size_t start_pos = 0; /* The position of referenced block to start sending. */ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ int add_new_block = 0; /* Create new block if current block is total used. */ @@ -440,6 +437,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); + /* Must install write handler for all replicas first before feeding + * replication stream. */ + prepareReplicasToWrite(); + /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { robj *selectcmd; @@ -539,7 +540,12 @@ void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { /* There must be replication backlog if having attached slaves. */ if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); - if (server.repl_backlog) feedReplicationBuffer(buf,buflen); + if (server.repl_backlog) { + /* Must install write handler for all replicas first before feeding + * replication stream. */ + prepareReplicasToWrite(); + feedReplicationBuffer(buf,buflen); + } } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -1285,7 +1291,7 @@ void replicaStartCommandStream(client *slave) { return; } - clientInstallWriteHandler(slave); + putClientInPendingWriteQueue(slave); } /* We call this function periodically to remove an RDB file that was @@ -1969,6 +1975,20 @@ void readSyncBulkPayload(connection *conn) { /* We need to stop any AOF rewriting child before flushing and parsing * the RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); + /* Also try to stop save RDB child before flushing and parsing the RDB: + * 1. Ensure background save doesn't overwrite synced data after being loaded. + * 2. Avoid copy-on-write disaster. */ + if (server.child_type == CHILD_TYPE_RDB) { + if (!use_diskless_load) { + serverLog(LL_NOTICE, + "Replica is about to load the RDB file received from the " + "master, but there is a pending RDB child running. " + "Killing process %ld and removing its temp file to avoid " + "any race", + (long) server.child_pid); + } + killRDBChild(); + } if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { /* Initialize empty tempDb dictionaries. */ @@ -2100,16 +2120,6 @@ void readSyncBulkPayload(connection *conn) { connNonBlock(conn); connRecvTimeout(conn,0); } else { - /* Ensure background save doesn't overwrite synced data */ - if (server.child_type == CHILD_TYPE_RDB) { - serverLog(LL_NOTICE, - "Replica is about to load the RDB file received from the " - "master, but there is a pending RDB child running. " - "Killing process %ld and removing its temp file to avoid " - "any race", - (long) server.child_pid); - killRDBChild(); - } /* Make sure the new file (also used for persistence) is fully synced * (not covered by earlier calls to rdb_fsync_range). */ diff --git a/src/resp_parser.h b/src/resp_parser.h index 4597efee3..0b5c8e22c 100644 --- a/src/resp_parser.h +++ b/src/resp_parser.h @@ -68,10 +68,10 @@ typedef struct ReplyParserCallbacks { /* Called when the parser reaches a double (','), which is passed as an argument 'val' */ void (*double_callback)(void *ctx, double val, const char *proto, size_t proto_len); - /* Called when the parser reaches a big number (','), which is passed as 'str' along with its length 'len' */ + /* Called when the parser reaches a big number ('('), which is passed as 'str' along with its length 'len' */ void (*big_number_callback)(void *ctx, const char *str, size_t len, const char *proto, size_t proto_len); - /* Called when the parser reaches a string, which is passed as 'str' along with its 'format' and length 'len' */ + /* Called when the parser reaches a string ('='), which is passed as 'str' along with its 'format' and length 'len' */ void (*verbatim_string_callback)(void *ctx, const char *format, const char *str, size_t len, const char *proto, size_t proto_len); /* Called when the parser reaches an attribute ('|'). The attribute length is passed as an argument 'len' */ diff --git a/src/script.c b/src/script.c index 990248c45..8216b47f5 100644 --- a/src/script.c +++ b/src/script.c @@ -36,6 +36,7 @@ scriptFlag scripts_flags_def[] = { {.flag = SCRIPT_FLAG_ALLOW_OOM, .str = "allow-oom"}, {.flag = SCRIPT_FLAG_ALLOW_STALE, .str = "allow-stale"}, {.flag = SCRIPT_FLAG_NO_CLUSTER, .str = "no-cluster"}, + {.flag = SCRIPT_FLAG_ALLOW_CROSS_SLOT, .str = "allow-cross-slot-keys"}, {.flag = 0, .str = NULL}, /* flags array end */ }; @@ -114,6 +115,7 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca int running_stale = server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0; + int obey_client = mustObeyClient(caller); if (!(script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE)) { if ((script_flags & SCRIPT_FLAG_NO_CLUSTER) && server.cluster_enabled) { @@ -139,16 +141,14 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca * 1. we are not a readonly replica * 2. no disk error detected * 3. command is not `fcall_ro`/`eval[sha]_ro` */ - if (server.masterhost && server.repl_slave_ro && caller->id != CLIENT_ID_AOF - && !(caller->flags & CLIENT_MASTER)) - { + if (server.masterhost && server.repl_slave_ro && !obey_client) { addReplyError(caller, "Can not run script with write flag on readonly replica"); return C_ERR; } /* Deny writes if we're unale to persist. */ int deny_write_type = writeCommandsDeniedByDiskError(); - if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL) { + if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) { if (deny_write_type == DISK_ERROR_TYPE_RDB) addReplyError(caller, "-MISCONF Redis is configured to save RDB snapshots, " "but it's currently unable to persist to disk. " @@ -219,6 +219,10 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca run_ctx->flags |= SCRIPT_ALLOW_OOM; } + if ((script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE) || (script_flags & SCRIPT_FLAG_ALLOW_CROSS_SLOT)) { + run_ctx->flags |= SCRIPT_ALLOW_CROSS_SLOT; + } + /* set the curr_run_ctx so we can use it to kill the script if needed */ curr_run_ctx = run_ctx; @@ -269,7 +273,7 @@ void scriptKill(client *c, int is_eval) { addReplyError(c, "-NOTBUSY No scripts in execution right now."); return; } - if (curr_run_ctx->original_client->flags & CLIENT_MASTER) { + if (mustObeyClient(curr_run_ctx->original_client)) { addReplyError(c, "-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed."); } @@ -334,8 +338,8 @@ static int scriptVerifyWriteCommandAllow(scriptRunCtx *run_ctx, char **err) { * of this script. */ int deny_write_type = writeCommandsDeniedByDiskError(); - if (server.masterhost && server.repl_slave_ro && run_ctx->original_client->id != CLIENT_ID_AOF - && !(run_ctx->original_client->flags & CLIENT_MASTER)) + if (server.masterhost && server.repl_slave_ro && + !mustObeyClient(run_ctx->original_client)) { *err = sdsdup(shared.roslaveerr->ptr); return C_ERR; @@ -380,8 +384,7 @@ static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) { * in the middle. */ if (server.maxmemory && /* Maxmemory is actually enabled. */ - run_ctx->original_client->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */ - !server.masterhost && /* Slave must execute the script. */ + !mustObeyClient(run_ctx->original_client) && /* Don't care about mem for replicas or AOF. */ !(run_ctx->flags & SCRIPT_WRITE_DIRTY) && /* Script had no side effects so far. */ server.script_oom && /* Detected OOM when script start. */ (run_ctx->c->cmd->flags & CMD_DENYOOM)) @@ -393,8 +396,8 @@ static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) { return C_OK; } -static int scriptVerifyClusterState(client *c, client *original_c, sds *err) { - if (!server.cluster_enabled || original_c->id == CLIENT_ID_AOF || (original_c->flags & CLIENT_MASTER)) { +static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *original_c, sds *err) { + if (!server.cluster_enabled || mustObeyClient(original_c)) { return C_OK; } /* If this is a Redis Cluster node, we need to make sure the script is not @@ -404,7 +407,8 @@ static int scriptVerifyClusterState(client *c, client *original_c, sds *err) { /* Duplicate relevant flags in the script client. */ c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING); c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING); - if (getNodeByQuery(c, c->cmd, c->argv, c->argc, NULL, &error_code) != server.cluster->myself) { + int hashslot = -1; + if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != server.cluster->myself) { if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { *err = sdsnew( "Script attempted to execute a write command while the " @@ -418,6 +422,19 @@ static int scriptVerifyClusterState(client *c, client *original_c, sds *err) { } return C_ERR; } + + /* If the script declared keys in advanced, the cross slot error would have + * already been thrown. This is only checking for cross slot keys being accessed + * that weren't pre-declared. */ + if (hashslot != -1 && !(run_ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) { + if (original_c->slot == -1) { + original_c->slot = hashslot; + } else if (original_c->slot != hashslot) { + *err = sdsnew("Script attempted to access keys that do not hash to " + "the same slot"); + return C_ERR; + } + } return C_OK; } @@ -522,7 +539,7 @@ void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) { run_ctx->flags |= SCRIPT_WRITE_DIRTY; } - if (scriptVerifyClusterState(c, run_ctx->original_client, err) != C_OK) { + if (scriptVerifyClusterState(run_ctx, c, run_ctx->original_client, err) != C_OK) { goto error; } diff --git a/src/script.h b/src/script.h index 9785af095..d855c80e2 100644 --- a/src/script.h +++ b/src/script.h @@ -64,6 +64,7 @@ #define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */ #define SCRIPT_ALLOW_OOM (1ULL<<6) /* indicate to allow any command even if OOM reached */ #define SCRIPT_EVAL_MODE (1ULL<<7) /* Indicate that the current script called from legacy Lua */ +#define SCRIPT_ALLOW_CROSS_SLOT (1ULL<<8) /* Indicate that the current script may access keys from multiple slots */ typedef struct scriptRunCtx scriptRunCtx; struct scriptRunCtx { @@ -82,6 +83,7 @@ struct scriptRunCtx { #define SCRIPT_FLAG_ALLOW_STALE (1ULL<<2) #define SCRIPT_FLAG_NO_CLUSTER (1ULL<<3) #define SCRIPT_FLAG_EVAL_COMPAT_MODE (1ULL<<4) /* EVAL Script backwards compatible behavior, no shebang provided */ +#define SCRIPT_FLAG_ALLOW_CROSS_SLOT (1ULL<<5) /* Defines a script flags */ typedef struct scriptFlag { diff --git a/src/script_lua.c b/src/script_lua.c index 9a08a7e47..36868ec2b 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -41,6 +41,97 @@ #include <ctype.h> #include <math.h> +/* Globals that are added by the Lua libraries */ +static char *libraries_allow_list[] = { + "string", + "cjson", + "bit", + "cmsgpack", + "math", + "table", + "struct", + NULL, +}; + +/* Redis Lua API globals */ +static char *redis_api_allow_list[] = { + "redis", + "__redis__err__handler", /* error handler for eval, currently located on globals. + Should move to registry. */ + NULL, +}; + +/* Lua builtins */ +static char *lua_builtins_allow_list[] = { + "xpcall", + "tostring", + "getfenv", + "setmetatable", + "next", + "assert", + "tonumber", + "rawequal", + "collectgarbage", + "getmetatable", + "rawset", + "pcall", + "coroutine", + "type", + "_G", + "select", + "unpack", + "gcinfo", + "pairs", + "rawget", + "loadstring", + "ipairs", + "_VERSION", + "setfenv", + "load", + "error", + NULL, +}; + +/* Lua builtins which are not documented on the Lua documentation */ +static char *lua_builtins_not_documented_allow_list[] = { + "newproxy", + NULL, +}; + +/* Lua builtins which are allowed on initialization but will be removed right after */ +static char *lua_builtins_removed_after_initialization_allow_list[] = { + "debug", /* debug will be set to nil after the error handler will be created */ + NULL, +}; + +/* Those allow lists was created from the globals that was + * available to the user when the allow lists was first introduce. + * Because we do not want to break backward compatibility we keep + * all the globals. The allow lists will prevent us from accidentally + * creating unwanted globals in the future. + * + * Also notice that the allow list is only checked on start time, + * after that the global table is locked so not need to check anything.*/ +static char **allow_lists[] = { + libraries_allow_list, + redis_api_allow_list, + lua_builtins_allow_list, + lua_builtins_not_documented_allow_list, + lua_builtins_removed_after_initialization_allow_list, + NULL, +}; + +/* Deny list contains elements which we know we do not want to add to globals + * and there is no need to print a warning message form them. We will print a + * log message only if an element was added to the globals and the element is + * not on the allow list nor on the back list. */ +static char *deny_list[] = { + "dofile", + "loadfile", + "print", + NULL, +}; + static int redis_math_random (lua_State *L); static int redis_math_randomseed (lua_State *L); static void redisProtocolToLuaType_Int(void *ctx, long long val, const char *proto, size_t proto_len); @@ -1113,15 +1204,6 @@ static void luaLoadLibraries(lua_State *lua) { #endif } -/* Remove a functions that we don't want to expose to the Redis scripting - * environment. */ -static void luaRemoveUnsupportedFunctions(lua_State *lua) { - lua_pushnil(lua); - lua_setglobal(lua,"loadfile"); - lua_pushnil(lua); - lua_setglobal(lua,"dofile"); -} - /* Return sds of the string value located on stack at the given index. * Return NULL if the value is not a string. */ sds luaGetStringSds(lua_State *lua, int index) { @@ -1135,107 +1217,120 @@ sds luaGetStringSds(lua_State *lua, int index) { return str_sds; } -/* This function installs metamethods in the global table _G that prevent - * the creation of globals accidentally. - * - * It should be the last to be called in the scripting engine initialization - * sequence, because it may interact with creation of globals. - * - * On Legacy Lua (eval) we need to check 'w ~= \"main\"' otherwise we will not be able - * to create the global 'function <sha> ()' variable. On Functions Lua engine we do not use - * this trick so it's not needed. */ -void luaEnableGlobalsProtection(lua_State *lua, int is_eval) { - char *s[32]; - sds code = sdsempty(); - int j = 0; - - /* strict.lua from: http://metalua.luaforge.net/src/lib/strict.lua.html. - * Modified to be adapted to Redis. */ - s[j++]="local dbg=debug\n"; - s[j++]="local mt = {}\n"; - s[j++]="setmetatable(_G, mt)\n"; - s[j++]="mt.__newindex = function (t, n, v)\n"; - s[j++]=" if dbg.getinfo(2) then\n"; - s[j++]=" local w = dbg.getinfo(2, \"S\").what\n"; - s[j++]= is_eval ? " if w ~= \"main\" and w ~= \"C\" then\n" : " if w ~= \"C\" then\n"; - s[j++]=" error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n"; - s[j++]=" end\n"; - s[j++]=" end\n"; - s[j++]=" rawset(t, n, v)\n"; - s[j++]="end\n"; - s[j++]="mt.__index = function (t, n)\n"; - s[j++]=" if dbg.getinfo(2) and dbg.getinfo(2, \"S\").what ~= \"C\" then\n"; - s[j++]=" error(\"Script attempted to access nonexistent global variable '\"..tostring(n)..\"'\", 2)\n"; - s[j++]=" end\n"; - s[j++]=" return rawget(t, n)\n"; - s[j++]="end\n"; - s[j++]="debug = nil\n"; - s[j++]=NULL; - - for (j = 0; s[j] != NULL; j++) code = sdscatlen(code,s[j],strlen(s[j])); - luaL_loadbuffer(lua,code,sdslen(code),"@enable_strict_lua"); - lua_pcall(lua,0,0,0); - sdsfree(code); +static int luaProtectedTableError(lua_State *lua) { + int argc = lua_gettop(lua); + if (argc != 2) { + serverLog(LL_WARNING, "malicious code trying to call luaProtectedTableError with wrong arguments"); + luaL_error(lua, "Wrong number of arguments to luaProtectedTableError"); + } + if (!lua_isstring(lua, -1) && !lua_isnumber(lua, -1)) { + luaL_error(lua, "Second argument to luaProtectedTableError must be a string or number"); + } + const char *variable_name = lua_tostring(lua, -1); + luaL_error(lua, "Script attempted to access nonexistent global variable '%s'", variable_name); + return 0; } -/* Create a global protection function and put it to registry. - * This need to be called once in the lua_State lifetime. - * After called it is possible to use luaSetGlobalProtection - * to set global protection on a give table. +/* Set a special metatable on the table on the top of the stack. + * The metatable will raise an error if the user tries to fetch + * an un-existing value. * * The function assumes the Lua stack have a least enough * space to push 2 element, its up to the caller to verify - * this before calling this function. - * - * Notice, the difference between this and luaEnableGlobalsProtection - * is that luaEnableGlobalsProtection is enabling global protection - * on the current Lua globals. This registering a global protection - * function that later can be applied on any table. */ -void luaRegisterGlobalProtectionFunction(lua_State *lua) { - lua_pushstring(lua, REGISTRY_SET_GLOBALS_PROTECTION_NAME); - char *global_protection_func = "local dbg = debug\n" - "local globals_protection = function (t)\n" - " local mt = {}\n" - " setmetatable(t, mt)\n" - " mt.__newindex = function (t, n, v)\n" - " if dbg.getinfo(2) then\n" - " local w = dbg.getinfo(2, \"S\").what\n" - " if w ~= \"C\" then\n" - " error(\"Script attempted to create global variable '\"..tostring(n)..\"'\", 2)\n" - " end" - " end" - " rawset(t, n, v)\n" - " end\n" - " mt.__index = function (t, n)\n" - " if dbg.getinfo(2) and dbg.getinfo(2, \"S\").what ~= \"C\" then\n" - " error(\"Script attempted to access nonexistent global variable '\"..tostring(n)..\"'\", 2)\n" - " end\n" - " return rawget(t, n)\n" - " end\n" - "end\n" - "return globals_protection"; - int res = luaL_loadbuffer(lua, global_protection_func, strlen(global_protection_func), "@global_protection_def"); - serverAssert(res == 0); - res = lua_pcall(lua,0,1,0); - serverAssert(res == 0); - lua_settable(lua, LUA_REGISTRYINDEX); + * this before calling this function. */ +void luaSetErrorMetatable(lua_State *lua) { + lua_newtable(lua); /* push metatable */ + lua_pushcfunction(lua, luaProtectedTableError); /* push get error handler */ + lua_setfield(lua, -2, "__index"); + lua_setmetatable(lua, -2); } -/* Set global protection on a given table. - * The table need to be located on the top of the lua stack. - * After called, it will no longer be possible to set - * new items on the table. The function is not removing - * the table from the top of the stack! +static int luaNewIndexAllowList(lua_State *lua) { + int argc = lua_gettop(lua); + if (argc != 3) { + serverLog(LL_WARNING, "malicious code trying to call luaProtectedTableError with wrong arguments"); + luaL_error(lua, "Wrong number of arguments to luaNewIndexAllowList"); + } + if (!lua_istable(lua, -3)) { + luaL_error(lua, "first argument to luaNewIndexAllowList must be a table"); + } + if (!lua_isstring(lua, -2) && !lua_isnumber(lua, -2)) { + luaL_error(lua, "Second argument to luaNewIndexAllowList must be a string or number"); + } + const char *variable_name = lua_tostring(lua, -2); + /* check if the key is in our allow list */ + + char ***allow_l = allow_lists; + for (; *allow_l ; ++allow_l){ + char **c = *allow_l; + for (; *c ; ++c) { + if (strcmp(*c, variable_name) == 0) { + break; + } + } + if (*c) { + break; + } + } + if (!*allow_l) { + /* Search the value on the back list, if its there we know that it was removed + * on purpose and there is no need to print a warning. */ + char **c = deny_list; + for ( ; *c ; ++c) { + if (strcmp(*c, variable_name) == 0) { + break; + } + } + if (!*c) { + serverLog(LL_WARNING, "A key '%s' was added to Lua globals which is not on the globals allow list nor listed on the deny list.", variable_name); + } + } else { + lua_rawset(lua, -3); + } + return 0; +} + +/* Set a metatable with '__newindex' function that verify that + * the new index appears on our globals while list. * - * The function assumes the Lua stack have a least enough - * space to push 2 element, its up to the caller to verify - * this before calling this function. */ -void luaSetGlobalProtection(lua_State *lua) { - lua_pushstring(lua, REGISTRY_SET_GLOBALS_PROTECTION_NAME); - lua_gettable(lua, LUA_REGISTRYINDEX); - lua_pushvalue(lua, -2); - int res = lua_pcall(lua, 1, 0, 0); - serverAssert(res == 0); + * The metatable is set on the table which located on the top + * of the stack. + */ +void luaSetAllowListProtection(lua_State *lua) { + lua_newtable(lua); /* push metatable */ + lua_pushcfunction(lua, luaNewIndexAllowList); /* push get error handler */ + lua_setfield(lua, -2, "__newindex"); + lua_setmetatable(lua, -2); +} + +/* Set the readonly flag on the table located on the top of the stack + * and recursively call this function on each table located on the original + * table. Also, recursively call this function on the metatables.*/ +void luaSetTableProtectionRecursively(lua_State *lua) { + /* This protect us from a loop in case we already visited the table + * For example, globals has '_G' key which is pointing back to globals. */ + if (lua_isreadonlytable(lua, -1)) { + return; + } + + /* protect the current table */ + lua_enablereadonlytable(lua, -1, 1); + + lua_checkstack(lua, 2); + lua_pushnil(lua); /* Use nil to start iteration. */ + while (lua_next(lua,-2)) { + /* Stack now: table, key, value */ + if (lua_istable(lua, -1)) { + luaSetTableProtectionRecursively(lua); + } + lua_pop(lua, 1); + } + + /* protect the metatable if exists */ + if (lua_getmetatable(lua, -1)) { + luaSetTableProtectionRecursively(lua); + lua_pop(lua, 1); /* pop the metatable */ + } } void luaRegisterVersion(lua_State* lua) { @@ -1272,8 +1367,11 @@ void luaRegisterLogFunction(lua_State* lua) { } void luaRegisterRedisAPI(lua_State* lua) { + lua_pushvalue(lua, LUA_GLOBALSINDEX); + luaSetAllowListProtection(lua); + lua_pop(lua, 1); + luaLoadLibraries(lua); - luaRemoveUnsupportedFunctions(lua); lua_pushcfunction(lua,luaRedisPcall); lua_setglobal(lua, "pcall"); @@ -1504,9 +1602,19 @@ void luaCallFunction(scriptRunCtx* run_ctx, lua_State *lua, robj** keys, size_t * EVAL received. */ luaCreateArray(lua,keys,nkeys); /* On eval, keys and arguments are globals. */ - if (run_ctx->flags & SCRIPT_EVAL_MODE) lua_setglobal(lua,"KEYS"); + if (run_ctx->flags & SCRIPT_EVAL_MODE){ + /* open global protection to set KEYS */ + lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 0); + lua_setglobal(lua,"KEYS"); + lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1); + } luaCreateArray(lua,args,nargs); - if (run_ctx->flags & SCRIPT_EVAL_MODE) lua_setglobal(lua,"ARGV"); + if (run_ctx->flags & SCRIPT_EVAL_MODE){ + /* open global protection to set ARGV */ + lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 0); + lua_setglobal(lua,"ARGV"); + lua_enablereadonlytable(lua, LUA_GLOBALSINDEX, 1); + } /* At this point whether this script was never seen before or if it was * already defined, we can call it. diff --git a/src/script_lua.h b/src/script_lua.h index 5a4533784..4c2b34804 100644 --- a/src/script_lua.h +++ b/src/script_lua.h @@ -67,9 +67,10 @@ typedef struct errorInfo { void luaRegisterRedisAPI(lua_State* lua); sds luaGetStringSds(lua_State *lua, int index); -void luaEnableGlobalsProtection(lua_State *lua, int is_eval); void luaRegisterGlobalProtectionFunction(lua_State *lua); -void luaSetGlobalProtection(lua_State *lua); +void luaSetErrorMetatable(lua_State *lua); +void luaSetAllowListProtection(lua_State *lua); +void luaSetTableProtectionRecursively(lua_State *lua); void luaRegisterLogFunction(lua_State* lua); void luaRegisterVersion(lua_State* lua); void luaPushErrorBuff(lua_State *lua, sds err_buff); diff --git a/src/sentinel.c b/src/sentinel.c index 3ad8f902b..9ea78aae5 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -705,7 +705,7 @@ void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, if (level != LL_DEBUG) { channel = createStringObject(type,strlen(type)); payload = createStringObject(msg,strlen(msg)); - pubsubPublishMessage(channel,payload); + pubsubPublishMessage(channel,payload,0); decrRefCount(channel); decrRefCount(payload); } diff --git a/src/server.c b/src/server.c index 84f21fed3..298834eab 100644 --- a/src/server.c +++ b/src/server.c @@ -36,7 +36,6 @@ #include "atomicvar.h" #include "mt19937-64.h" #include "functions.h" -#include "hdr_alloc.h" #include <time.h> #include <signal.h> @@ -1016,18 +1015,8 @@ void databasesCron(void) { } } -/* We take a cached value of the unix time in the global state because with - * virtual memory and aging there is to store the current time in objects at - * every object access, and accuracy is not needed. To access a global var is - * a lot faster than calling time(NULL). - * - * This function should be fast because it is called at every command execution - * in call(), so it is possible to decide if to update the daylight saving - * info or not using the 'update_daylight_info' argument. Normally we update - * such info only when calling this function from serverCron() but not when - * calling it from call(). */ -void updateCachedTime(int update_daylight_info) { - server.ustime = ustime(); +static inline void updateCachedTimeWithUs(int update_daylight_info, const long long ustime) { + server.ustime = ustime; server.mstime = server.ustime / 1000; time_t unixtime = server.mstime / 1000; atomicSet(server.unixtime, unixtime); @@ -1045,6 +1034,21 @@ void updateCachedTime(int update_daylight_info) { } } +/* We take a cached value of the unix time in the global state because with + * virtual memory and aging there is to store the current time in objects at + * every object access, and accuracy is not needed. To access a global var is + * a lot faster than calling time(NULL). + * + * This function should be fast because it is called at every command execution + * in call(), so it is possible to decide if to update the daylight saving + * info or not using the 'update_daylight_info' argument. Normally we update + * such info only when calling this function from serverCron() but not when + * calling it from call(). */ +void updateCachedTime(int update_daylight_info) { + const long long us = ustime(); + updateCachedTimeWithUs(update_daylight_info, us); +} + void checkChildrenDone(void) { int statloc = 0; pid_t pid; @@ -1209,10 +1213,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { cronUpdateMemoryStats(); - /* We received a SIGTERM, shutting down here in a safe way, as it is + /* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ if (server.shutdown_asap && !isShutdownInitiated()) { - if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); + int shutdownFlags = SHUTDOWN_NOFLAGS; + if (server.last_sig_received == SIGINT && server.shutdown_on_sigint) + shutdownFlags = server.shutdown_on_sigint; + else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm) + shutdownFlags = server.shutdown_on_sigterm; + + if (prepareForShutdown(shutdownFlags) == C_OK) exit(0); } else if (isShutdownInitiated()) { if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) { if (finishShutdown() == C_OK) exit(0); @@ -1296,13 +1306,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.aof_state == AOF_ON && !hasActiveChildProcess() && server.aof_rewrite_perc && - server.aof_current_size > server.aof_rewrite_min_size && - !aofRewriteLimited()) + server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; - if (growth >= server.aof_rewrite_perc) { + if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) { serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } @@ -1326,8 +1335,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * however to try every second is enough in case of 'hz' is set to * a higher frequency. */ run_with_period(1000) { - if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR) - flushAppendOnlyFile(0); + if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) && + server.aof_last_write_status == C_ERR) + { + flushAppendOnlyFile(0); + } } /* Clear the paused clients state if needed. */ @@ -1466,6 +1478,7 @@ void whileBlockedCron() { if (prepareForShutdown(SHUTDOWN_NOSAVE) == C_OK) exit(0); serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); server.shutdown_asap = 0; + server.last_sig_received = 0; } } @@ -1509,6 +1522,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { uint64_t processed = 0; processed += handleClientsWithPendingReadsUsingThreads(); processed += tlsProcessPendingData(); + if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) + flushAppendOnlyFile(0); processed += handleClientsWithPendingWrites(); processed += freeClientsInAsyncFreeQueue(); server.events_processed_while_blocked += processed; @@ -1584,15 +1599,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * client side caching protocol in broadcasting (BCAST) mode. */ trackingBroadcastInvalidationMessages(); - /* Write the AOF buffer on disk */ + /* Try to process blocked clients every once in while. + * + * Example: A module calls RM_SignalKeyAsReady from within a timer callback + * (So we don't visit processCommand() at all). + * + * must be done before flushAppendOnlyFile, in case of appendfsync=always, + * since the unblocked clients may write data. */ + handleClientsBlockedOnKeys(); + + /* Write the AOF buffer on disk, + * must be done before handleClientsWithPendingWritesUsingThreads, + * in case of appendfsync=always. */ if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0); - /* Try to process blocked clients every once in while. Example: A module - * calls RM_SignalKeyAsReady from within a timer callback (So we don't - * visit processCommand() at all). */ - handleClientsBlockedOnKeys(); - /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); @@ -1878,15 +1899,6 @@ void initServerConfig(void) { appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ - /* Specify the allocation function for the hdr histogram */ - hdrAllocFuncs hdrallocfn = { - .mallocFn = zmalloc, - .callocFn = zcalloc_num, - .reallocFn = zrealloc, - .freeFn = zfree, - }; - hdrSetAllocators(&hdrallocfn); - /* Replication related */ server.masterhost = NULL; server.masterport = 6379; @@ -2286,6 +2298,7 @@ int listenToPort(int port, socketFds *sfd) { closeSocketListeners(sfd); return C_ERR; } + if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id); anetNonBlock(NULL,sfd->fd[sfd->count]); anetCloexec(sfd->fd[sfd->count]); sfd->count++; @@ -2338,6 +2351,7 @@ void resetServerStats(void) { } server.stat_aof_rewrites = 0; server.stat_rdb_saves = 0; + server.stat_aofrw_consecutive_failures = 0; atomicSet(server.stat_net_input_bytes, 0); atomicSet(server.stat_net_output_bytes, 0); server.stat_unexpected_error_replies = 0; @@ -2539,6 +2553,7 @@ void initServer(void) { server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; + server.last_sig_received = 0; /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed @@ -2978,6 +2993,11 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) { return cmd; } +/* Commands arriving from the master client or AOF client, should never be rejected. */ +int mustObeyClient(client *c) { + return c->id == CLIENT_ID_AOF || c->flags & CLIENT_MASTER; +} + static int shouldPropagate(int target) { if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading) return 0; @@ -3205,7 +3225,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags) { */ void call(client *c, int flags) { long long dirty; - monotime call_timer; uint64_t client_old_flags = c->flags; struct redisCommand *real_cmd = c->realcmd; @@ -3230,22 +3249,34 @@ void call(client *c, int flags) { dirty = server.dirty; incrCommandStatsOnError(NULL, 0); + const long long call_timer = ustime(); + /* Update cache time, in case we have nested calls we want to * update only on the first call*/ if (server.fixed_time_expire++ == 0) { - updateCachedTime(0); + updateCachedTimeWithUs(0,call_timer); } - server.in_nested_call++; - elapsedStart(&call_timer); + monotime monotonic_start = 0; + if (monotonicGetType() == MONOTONIC_CLOCK_HW) + monotonic_start = getMonotonicUs(); + + server.in_nested_call++; c->cmd->proc(c); - const long duration = elapsedUs(call_timer); + server.in_nested_call--; + + /* In order to avoid performance implication due to querying the clock using a system call 3 times, + * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */ + ustime_t duration; + if (monotonicGetType() == MONOTONIC_CLOCK_HW) + duration = getMonotonicUs() - monotonic_start; + else + duration = ustime() - call_timer; + c->duration = duration; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; - server.in_nested_call--; - /* Update failed command calls if required. */ if (!incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED) && c->deferred_reply_errors) { @@ -3410,6 +3441,8 @@ void rejectCommand(client *c, robj *reply) { } void rejectCommandSds(client *c, sds s) { + flagTransaction(c); + if (c->cmd) c->cmd->rejected_calls++; if (c->cmd && c->cmd->proc == execCommand) { execCommandAbort(c, s); sdsfree(s); @@ -3420,8 +3453,6 @@ void rejectCommandSds(client *c, sds s) { } void rejectCommandFormat(client *c, const char *fmt, ...) { - if (c->cmd) c->cmd->rejected_calls++; - flagTransaction(c); va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); @@ -3475,6 +3506,54 @@ void populateCommandMovableKeys(struct redisCommand *cmd) { cmd->flags |= CMD_MOVABLE_KEYS; } +/* Check if c->cmd exists, fills `err` with details in case it doesn't. + * Return 1 if exists. */ +int commandCheckExistence(client *c, sds *err) { + if (c->cmd) + return 1; + if (!err) + return 0; + if (isContainerCommandBySds(c->argv[0]->ptr)) { + /* If we can't find the command but argv[0] by itself is a command + * it means we're dealing with an invalid subcommand. Print Help. */ + sds cmd = sdsnew((char *)c->argv[0]->ptr); + sdstoupper(cmd); + *err = sdsnew(NULL); + *err = sdscatprintf(*err, "unknown subcommand '%.128s'. Try %s HELP.", + (char *)c->argv[1]->ptr, cmd); + sdsfree(cmd); + } else { + sds args = sdsempty(); + int i; + for (i=1; i < c->argc && sdslen(args) < 128; i++) + args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); + *err = sdsnew(NULL); + *err = sdscatprintf(*err, "unknown command '%.128s', with args beginning with: %s", + (char*)c->argv[0]->ptr, args); + sdsfree(args); + } + /* Make sure there are no newlines in the string, otherwise invalid protocol + * is emitted (The args come from the user, they may contain any character). */ + sdsmapchars(*err, "\r\n", " ", 2); + return 0; +} + +/* Check if c->argc is valid for c->cmd, fills `err` with details in case it isn't. + * Return 1 if valid. */ +int commandCheckArity(client *c, sds *err) { + if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || + (c->argc < -c->cmd->arity)) + { + if (err) { + *err = sdsnew(NULL); + *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", c->cmd->fullname); + } + return 0; + } + + return 1; +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -3514,29 +3593,13 @@ int processCommand(client *c) { /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); - if (!c->cmd) { - if (isContainerCommandBySds(c->argv[0]->ptr)) { - /* If we can't find the command but argv[0] by itself is a command - * it means we're dealing with an invalid subcommand. Print Help. */ - sds cmd = sdsnew((char *)c->argv[0]->ptr); - sdstoupper(cmd); - rejectCommandFormat(c, "Unknown subcommand '%.128s'. Try %s HELP.", - (char *)c->argv[1]->ptr, cmd); - sdsfree(cmd); - return C_OK; - } - sds args = sdsempty(); - int i; - for (i=1; i < c->argc && sdslen(args) < 128; i++) - args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); - rejectCommandFormat(c,"unknown command '%s', with args beginning with: %s", - (char*)c->argv[0]->ptr, args); - sdsfree(args); + sds err; + if (!commandCheckExistence(c, &err)) { + rejectCommandSds(c, err); return C_OK; - } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || - (c->argc < -c->cmd->arity)) - { - rejectCommandFormat(c,"wrong number of arguments for '%s' command", c->cmd->fullname); + } + if (!commandCheckArity(c, &err)) { + rejectCommandSds(c, err); return C_OK; } @@ -3569,6 +3632,7 @@ int processCommand(client *c) { (c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE))); int is_deny_async_loading_command = (c->cmd->flags & CMD_NO_ASYNC_LOADING) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING)); + int obey_client = mustObeyClient(c); if (authRequired(c)) { /* AUTH and HELLO and no auth commands are valid even in @@ -3620,23 +3684,20 @@ int processCommand(client *c) { * 1) The sender of this command is our master. * 2) The command has no key arguments. */ if (server.cluster_enabled && - !(c->flags & CLIENT_MASTER) && - !(c->flags & CLIENT_SCRIPT && - server.script_caller->flags & CLIENT_MASTER) && + !mustObeyClient(c) && !(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 && c->cmd->proc != execCommand)) { - int hashslot; int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, - &hashslot,&error_code); + &c->slot,&error_code); if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } - clusterRedirectClient(c,n,hashslot,error_code); + clusterRedirectClient(c,n,c->slot,error_code); c->cmd->rejected_calls++; return C_OK; } @@ -3706,15 +3767,29 @@ int processCommand(client *c) { if (server.tracking_clients) trackingLimitUsedSlots(); /* Don't accept write commands if there are problems persisting on disk - * and if this is a master instance. */ + * unless coming from our master, in which case check the replica ignore + * disk write error config to either log or crash. */ int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && - server.masterhost == NULL && - (is_write_command ||c->cmd->proc == pingCommand)) + (is_write_command || c->cmd->proc == pingCommand)) { - sds err = writeCommandsGetDiskErrorMessage(deny_write_type); - rejectCommandSds(c, err); - return C_OK; + if (obey_client) { + if (!server.repl_ignore_disk_write_error && c->cmd->proc != pingCommand) { + serverPanic("Replica was unable to write command to disk."); + } else { + static mstime_t last_log_time_ms = 0; + const mstime_t log_interval_ms = 10000; + if (server.mstime > last_log_time_ms + log_interval_ms) { + last_log_time_ms = server.mstime; + serverLog(LL_WARNING, "Replica is applying a command even though " + "it is unable to write to disk."); + } + } + } else { + sds err = writeCommandsGetDiskErrorMessage(deny_write_type); + rejectCommandSds(c, err); + return C_OK; + } } /* Don't accept write commands if there are not enough good slaves and @@ -3727,7 +3802,7 @@ int processCommand(client *c) { /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && - !(c->flags & CLIENT_MASTER) && + !obey_client && is_write_command) { rejectCommand(c, shared.roslaveerr); @@ -3949,6 +4024,7 @@ static void cancelShutdown(void) { server.shutdown_asap = 0; server.shutdown_flags = 0; server.shutdown_mstime = 0; + server.last_sig_received = 0; replyToClientsBlockedOnShutdown(); unpauseClients(PAUSE_DURING_SHUTDOWN); } @@ -4309,6 +4385,7 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg if (args[j].token) maplen++; if (args[j].summary) maplen++; if (args[j].since) maplen++; + if (args[j].deprecated_since) maplen++; if (args[j].flags) maplen++; if (args[j].type == ARG_TYPE_ONEOF || args[j].type == ARG_TYPE_BLOCK) maplen++; @@ -4336,6 +4413,10 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg addReplyBulkCString(c, "since"); addReplyBulkCString(c, args[j].since); } + if (args[j].deprecated_since) { + addReplyBulkCString(c, "deprecated_since"); + addReplyBulkCString(c, args[j].deprecated_since); + } if (args[j].flags) { addReplyBulkCString(c, "flags"); addReplyFlagsForArg(c, args[j].flags); @@ -4562,6 +4643,7 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) { long maplen = 1; if (cmd->summary) maplen++; if (cmd->since) maplen++; + if (cmd->flags & CMD_MODULE) maplen++; if (cmd->complexity) maplen++; if (cmd->doc_flags) maplen++; if (cmd->deprecated_since) maplen++; @@ -4588,6 +4670,10 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) { addReplyBulkCString(c, "complexity"); addReplyBulkCString(c, cmd->complexity); } + if (cmd->flags & CMD_MODULE) { + addReplyBulkCString(c, "module"); + addReplyBulkCString(c, moduleNameFromCommand(cmd)); + } if (cmd->doc_flags) { addReplyBulkCString(c, "doc_flags"); addReplyDocFlagsForCommand(c, cmd); @@ -5123,6 +5209,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "redis_mode:%s\r\n" "os:%s %s %s\r\n" "arch_bits:%i\r\n" + "monotonic_clock:%s\r\n" "multiplexing_api:%s\r\n" "atomicvar_api:%s\r\n" "gcc_version:%i.%i.%i\r\n" @@ -5146,6 +5233,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { mode, name.sysname, name.release, name.machine, server.arch_bits, + monotonicInfoString(), aeGetApiName(), REDIS_ATOMIC_API, #ifdef __GNUC__ @@ -5383,6 +5471,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "aof_current_rewrite_time_sec:%jd\r\n" "aof_last_bgrewrite_status:%s\r\n" "aof_rewrites:%lld\r\n" + "aof_rewrites_consecutive_failures:%lld\r\n" "aof_last_write_status:%s\r\n" "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" @@ -5414,6 +5503,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { -1 : time(NULL)-server.aof_rewrite_time_start), (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", server.stat_aof_rewrites, + server.stat_aofrw_consecutive_failures, (server.aof_last_write_status == C_OK && aof_bio_fsync_status == C_OK) ? "ok" : "err", server.stat_aof_cow_bytes, @@ -6227,6 +6317,7 @@ static void sigShutdownHandler(int sig) { serverLogFromHandler(LL_WARNING, msg); server.shutdown_asap = 1; + server.last_sig_received = sig; } void setupSignalHandlers(void) { diff --git a/src/server.h b/src/server.h index edfea2226..61cff37a5 100644 --- a/src/server.h +++ b/src/server.h @@ -603,6 +603,7 @@ typedef enum { #define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */ #define NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */ #define NOTIFY_MODULE (1<<13) /* d, module key space notification */ +#define NOTIFY_NEW (1<<14) /* n, new key notification */ #define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE) /* A flag */ /* Using the following macro you can run code inside serverCron() with the @@ -1061,7 +1062,7 @@ typedef struct replBacklog { listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, * see the definition of replBufBlock. */ size_t unindexed_count; /* The count from last creating index block. */ - rax *blocks_index; /* The index of reocrded blocks of replication + rax *blocks_index; /* The index of recorded blocks of replication * buffer for quickly searching replication * offset on partial resynchronization. */ long long histlen; /* Backlog actual data length */ @@ -1106,6 +1107,7 @@ typedef struct client { buffer or object being sent. */ time_t ctime; /* Client creation time. */ long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */ + int slot; /* The slot the client is executing against. Set to -1 if no slot is being used */ time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; uint64_t flags; /* Client flags: CLIENT_* macros. */ @@ -1323,6 +1325,15 @@ struct redisMemOverhead { } *db; }; +/* Replication error behavior determines the replica behavior + * when it receives an error over the replication stream. In + * either case the error is logged. */ +typedef enum { + PROPAGATION_ERR_BEHAVIOR_IGNORE = 0, + PROPAGATION_ERR_BEHAVIOR_PANIC, + PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS +} replicationErrorBehavior; + /* This structure can be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1451,6 +1462,7 @@ struct redisServer { redisAtomic unsigned int lruclock; /* Clock for LRU eviction */ volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */ mstime_t shutdown_mstime; /* Timestamp to limit graceful shutdown. */ + int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */ int shutdown_flags; /* Flags passed to prepareForShutdown(). */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ @@ -1493,6 +1505,7 @@ struct redisServer { socketFds ipfd; /* TCP socket file descriptors */ socketFds tlsfd; /* TLS socket file descriptors */ int sofd; /* Unix socket file descriptor */ + uint32_t socket_mark_id; /* ID for listen socket marking */ socketFds cfd; /* Cluster bus listening socket */ list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ @@ -1555,6 +1568,7 @@ struct redisServer { monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */ size_t stat_peak_memory; /* Max used memory record */ long long stat_aof_rewrites; /* number of aof file rewrites performed */ + long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */ long long stat_rdb_saves; /* number of rdb saves performed */ long long stat_fork_time; /* Time needed to perform latest fork() */ double stat_fork_rate; /* Fork rate in GB/sec. */ @@ -1717,6 +1731,8 @@ struct redisServer { * abort(). useful for Valgrind. */ /* Shutdown */ int shutdown_timeout; /* Graceful shutdown time limit in seconds. */ + int shutdown_on_sigint; /* Shutdown flags configured for SIGINT. */ + int shutdown_on_sigterm; /* Shutdown flags configured for SIGTERM. */ /* Replication (master) */ char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ @@ -1769,6 +1785,10 @@ struct redisServer { int replica_announced; /* If true, replica is announced by Sentinel */ int slave_announce_port; /* Give the master this listening port. */ char *slave_announce_ip; /* Give the master this ip address. */ + int propagation_error_behavior; /* Configures the behavior of the replica + * when it receives an error on the replication stream */ + int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to + * persist writes to AOF. */ /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ @@ -2042,6 +2062,7 @@ typedef struct redisCommandArg { const char *summary; const char *since; int flags; + const char *deprecated_since; struct redisCommandArg *subargs; /* runtime populated data */ int num_args; @@ -2349,6 +2370,7 @@ int moduleGetCommandChannelsViaAPI(struct redisCommand *cmd, robj **argv, int ar moduleType *moduleTypeLookupModuleByID(uint64_t id); void moduleTypeNameByID(char *name, uint64_t moduleid); const char *moduleTypeModuleName(moduleType *mt); +const char *moduleNameFromCommand(struct redisCommand *cmd); void moduleFreeContext(struct RedisModuleCtx *ctx); void unblockClientFromModule(client *c); void moduleHandleBlockedClients(void); @@ -2509,7 +2531,7 @@ void unprotectClient(client *c); void initThreadedIO(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); -void clientInstallWriteHandler(client *c); +void putClientInPendingWriteQueue(client *c); #ifdef __GNUC__ void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...) @@ -2860,6 +2882,8 @@ struct redisCommand *lookupCommandBySds(sds s); struct redisCommand *lookupCommandByCStringLogic(dict *commands, const char *s); struct redisCommand *lookupCommandByCString(const char *s); struct redisCommand *lookupCommandOrOriginal(robj **argv, int argc); +int commandCheckExistence(client *c, sds *err); +int commandCheckArity(client *c, sds *err); void startCommandExecution(); int incrCommandStatsOnError(struct redisCommand *cmd, int flags); void call(client *c, int flags); @@ -2877,7 +2901,7 @@ int prepareForShutdown(int flags); void replyToClientsBlockedOnShutdown(void); int abortShutdown(void); void afterCommand(client *c); -int inNestedCall(void); +int mustObeyClient(client *c); #ifdef __GNUC__ void _serverLog(int level, const char *fmt, ...) __attribute__((format(printf, 2, 3))); @@ -2962,8 +2986,8 @@ int pubsubUnsubscribeAllChannels(client *c, int notify); int pubsubUnsubscribeShardAllChannels(client *c, int notify); void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count); int pubsubUnsubscribeAllPatterns(client *c, int notify); -int pubsubPublishMessage(robj *channel, robj *message); -int pubsubPublishMessageShard(robj *channel, robj *message); +int pubsubPublishMessage(robj *channel, robj *message, int sharded); +int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded); void addReplyPubsubMessage(client *c, robj *channel, robj *msg); int serverPubsubSubscriptionCount(); int serverPubsubShardSubscriptionCount(); diff --git a/src/t_hash.c b/src/t_hash.c index 92f5cb2b0..7aec270aa 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -146,39 +146,24 @@ robj *hashTypeGetValueObject(robj *o, sds field) { * exist. */ size_t hashTypeGetValueLength(robj *o, sds field) { size_t len = 0; - if (o->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *vstr = NULL; - unsigned int vlen = UINT_MAX; - long long vll = LLONG_MAX; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; - if (hashTypeGetFromListpack(o, field, &vstr, &vlen, &vll) == 0) - len = vstr ? vlen : sdigits10(vll); - } else if (o->encoding == OBJ_ENCODING_HT) { - sds aux; + if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) + len = vstr ? vlen : sdigits10(vll); - if ((aux = hashTypeGetFromHashTable(o, field)) != NULL) - len = sdslen(aux); - } else { - serverPanic("Unknown hash encoding"); - } return len; } /* Test if the specified field exists in the given hash. Returns 1 if the field * exists, and 0 when it doesn't. */ int hashTypeExists(robj *o, sds field) { - if (o->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *vstr = NULL; - unsigned int vlen = UINT_MAX; - long long vll = LLONG_MAX; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; - if (hashTypeGetFromListpack(o, field, &vstr, &vlen, &vll) == 0) return 1; - } else if (o->encoding == OBJ_ENCODING_HT) { - if (hashTypeGetFromHashTable(o, field) != NULL) return 1; - } else { - serverPanic("Unknown hash encoding"); - } - return 0; + return hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK; } /* Add a new field, overwrite the old with the new value if it already exists. @@ -205,6 +190,14 @@ int hashTypeExists(robj *o, sds field) { int hashTypeSet(robj *o, sds field, sds value, int flags) { int update = 0; + /* Check if the field is too long for listpack, and convert before adding the item. + * This is needed for HINCRBY* case since in other commands this is handled early by + * hashTypeTryConversion, so this check will be a NOP. */ + if (o->encoding == OBJ_ENCODING_LISTPACK) { + if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value) + hashTypeConvert(o, OBJ_ENCODING_HT); + } + if (o->encoding == OBJ_ENCODING_LISTPACK) { unsigned char *zl, *fptr, *vptr; @@ -717,37 +710,23 @@ void hincrbyfloatCommand(client *c) { } static void addHashFieldToReply(client *c, robj *o, sds field) { - int ret; - if (o == NULL) { addReplyNull(c); return; } - if (o->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *vstr = NULL; - unsigned int vlen = UINT_MAX; - long long vll = LLONG_MAX; + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; - ret = hashTypeGetFromListpack(o, field, &vstr, &vlen, &vll); - if (ret < 0) { - addReplyNull(c); + if (hashTypeGetValue(o, field, &vstr, &vlen, &vll) == C_OK) { + if (vstr) { + addReplyBulkCBuffer(c, vstr, vlen); } else { - if (vstr) { - addReplyBulkCBuffer(c, vstr, vlen); - } else { - addReplyBulkLongLong(c, vll); - } + addReplyBulkLongLong(c, vll); } - - } else if (o->encoding == OBJ_ENCODING_HT) { - sds value = hashTypeGetFromHashTable(o, field); - if (value == NULL) - addReplyNull(c); - else - addReplyBulkCBuffer(c, value, sdslen(value)); } else { - serverPanic("Unknown hash encoding"); + addReplyNull(c); } } @@ -907,7 +886,7 @@ void hscanCommand(client *c) { scanGenericCommand(c,o,cursor); } -static void harndfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { +static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) { for (unsigned long i = 0; i < count; i++) { if (vals && c->resp > 2) addReplyArrayLen(c,2); @@ -990,7 +969,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { sample_count = count > limit ? limit : count; count -= sample_count; lpRandomPairs(hash->ptr, sample_count, keys, vals); - harndfieldReplyWithListpack(c, sample_count, keys, vals); + hrandfieldReplyWithListpack(c, sample_count, keys, vals); } zfree(keys); zfree(vals); @@ -1092,7 +1071,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { if (withvalues) vals = zmalloc(sizeof(listpackEntry)*count); serverAssert(lpRandomPairsUnique(hash->ptr, count, keys, vals) == count); - harndfieldReplyWithListpack(c, count, keys, vals); + hrandfieldReplyWithListpack(c, count, keys, vals); zfree(keys); zfree(vals); return; diff --git a/src/t_stream.c b/src/t_stream.c index cd7d9723e..e6e5da731 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1000,7 +1000,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i return -1; } - if (c == server.master || c->id == CLIENT_ID_AOF) { + if (mustObeyClient(c)) { /* If command came from master or from AOF we must not enforce maxnodes * (The maxlen/minid argument was re-written to make sure there's no * inconsistency). */ @@ -1370,24 +1370,35 @@ void streamLastValidID(stream *s, streamID *maxid) streamIteratorStop(&si); } +/* Maximum size for a stream ID string. In theory 20*2+1 should be enough, + * But to avoid chance for off by one issues and null-term, in case this will + * be used as parsing buffer, we use a slightly larger buffer. On the other + * hand considering sds header is gonna add 4 bytes, we wanna keep below the + * allocator's 48 bytes bin. */ +#define STREAM_ID_STR_LEN 44 + +sds createStreamIDString(streamID *id) { + /* Optimization: pre-allocate a big enough buffer to avoid reallocs. */ + sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN); + sdssetlen(str, 0); + return sdscatfmt(str,"%U-%U", id->ms,id->seq); +} + /* Emit a reply in the client output buffer by formatting a Stream ID * in the standard <ms>-<seq> format, using the simple string protocol * of REPL. */ void addReplyStreamID(client *c, streamID *id) { - sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); - addReplyBulkSds(c,replyid); + addReplyBulkSds(c,createStreamIDString(id)); } void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { - sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); - setDeferredReplyBulkSds(c, dr, replyid); + setDeferredReplyBulkSds(c, dr, createStreamIDString(id)); } /* Similar to the above function, but just creates an object, usually useful * for replication purposes to create arguments. */ robj *createObjectFromStreamID(streamID *id) { - return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U", - id->ms,id->seq)); + return createObject(OBJ_STRING, createStreamIDString(id)); } /* Returns non-zero if the ID is 0-0. */ @@ -2025,7 +2036,8 @@ void xaddCommand(client *c) { addReplyError(c,"Elements are too large to be stored"); return; } - addReplyStreamID(c,&id); + sds replyid = createStreamIDString(&id); + addReplyBulkCBuffer(c, replyid, sdslen(replyid)); signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); @@ -2050,9 +2062,11 @@ void xaddCommand(client *c) { /* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ if (!parsed_args.id_given || !parsed_args.seq_given) { - robj *idarg = createObjectFromStreamID(&id); + robj *idarg = createObject(OBJ_STRING, replyid); rewriteClientCommandArgument(c, idpos, idarg); decrRefCount(idarg); + } else { + sdsfree(replyid); } /* We need to signal to blocked clients that there is new data on this diff --git a/src/t_string.c b/src/t_string.c index 2b43a5700..7b67b78ce 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -38,7 +38,7 @@ int getGenericCommand(client *c); *----------------------------------------------------------------------------*/ static int checkStringLength(client *c, long long size) { - if (!(c->flags & CLIENT_MASTER) && size > server.proto_max_bulk_len) { + if (!mustObeyClient(c) && size > server.proto_max_bulk_len) { addReplyError(c,"string exceeds maximum allowed size (proto-max-bulk-len)"); return C_ERR; } @@ -792,7 +792,7 @@ void lcsCommand(client *c) { /* Setup an uint32_t array to store at LCS[i,j] the length of the * LCS A0..i-1, B0..j-1. Note that we have a linear array here, so - * we index it as LCS[j+(blen+1)*j] */ + * we index it as LCS[j+(blen+1)*i] */ #define LCS(A,B) lcs[(B)+((A)*(blen+1))] /* Try to allocate the LCS table, and abort on overflow or insufficient memory. */ diff --git a/src/t_zset.c b/src/t_zset.c index 77ca7c83a..7796a6dec 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1029,17 +1029,25 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, doub unsigned char *sptr; char scorebuf[MAX_D2STRING_CHARS]; int scorelen; - - scorelen = d2string(scorebuf,sizeof(scorebuf),score); + long long lscore; + int score_is_long = double2ll(score, &lscore); + if (!score_is_long) + scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele)); - zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen); + if (score_is_long) + zl = lpAppendInteger(zl,lscore); + else + zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen); } else { /* Insert member before the element 'eptr'. */ zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr); /* Insert score after the member. */ - zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL); + if (score_is_long) + zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL); + else + zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL); } return zl; } @@ -3964,7 +3972,7 @@ void zpopminCommand(client *c) { zpopMinMaxCommand(c, ZSET_MIN); } -/* ZMAXPOP key [<count>] */ +/* ZPOPMAX key [<count>] */ void zpopmaxCommand(client *c) { zpopMinMaxCommand(c, ZSET_MAX); } @@ -4351,12 +4359,12 @@ void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) { } } -/* ZMPOP numkeys [<key> ...] MIN|MAX [COUNT count] */ +/* ZMPOP numkeys key [<key> ...] MIN|MAX [COUNT count] */ void zmpopCommand(client *c) { zmpopGenericCommand(c, 1, 0); } -/* BZMPOP timeout numkeys [<key> ...] MIN|MAX [COUNT count] */ +/* BZMPOP timeout numkeys key [<key> ...] MIN|MAX [COUNT count] */ void bzmpopCommand(client *c) { zmpopGenericCommand(c, 2, 1); } diff --git a/src/timeout.c b/src/timeout.c index d4c4690e5..36cea7c26 100644 --- a/src/timeout.c +++ b/src/timeout.c @@ -58,7 +58,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { if (server.maxidletime && /* This handles the idle clients connection timeout if set. */ !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ + !mustObeyClient(c) && /* No timeout for masters and AOF */ !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ (now - c->lastinteraction > server.maxidletime)) diff --git a/src/util.c b/src/util.c index 45591d9f2..85b631f19 100644 --- a/src/util.c +++ b/src/util.c @@ -552,6 +552,36 @@ int string2d(const char *s, size_t slen, double *dp) { return 1; } +/* Returns 1 if the double value can safely be represented in long long without + * precision loss, in which case the corresponding long long is stored in the out variable. */ +int double2ll(double d, long long *out) { +#if (DBL_MANT_DIG >= 52) && (DBL_MANT_DIG <= 63) && (LLONG_MAX == 0x7fffffffffffffffLL) + /* Check if the float is in a safe range to be casted into a + * long long. We are assuming that long long is 64 bit here. + * Also we are assuming that there are no implementations around where + * double has precision < 52 bit. + * + * Under this assumptions we test if a double is inside a range + * where casting to long long is safe. Then using two castings we + * make sure the decimal part is zero. If all this is true we can use + * integer without precision loss. + * + * Note that numbers above 2^52 and below 2^63 use all the fraction bits as real part, + * and the exponent bits are positive, which means the "decimal" part must be 0. + * i.e. all double values in that range are representable as a long without precision loss, + * but not all long values in that range can be represented as a double. + * we only care about the first part here. */ + if (d < (double)(-LLONG_MAX/2) || d > (double)(LLONG_MAX/2)) + return 0; + long long ll = d; + if (ll == d) { + *out = ll; + return 1; + } +#endif + return 0; +} + /* Convert a double to a string representation. Returns the number of bytes * required. The representation should always be parsable by strtod(3). * This function does not support human-friendly formatting like ld2string @@ -572,22 +602,11 @@ int d2string(char *buf, size_t len, double value) { else len = snprintf(buf,len,"0"); } else { -#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) - /* Check if the float is in a safe range to be casted into a - * long long. We are assuming that long long is 64 bit here. - * Also we are assuming that there are no implementations around where - * double has precision < 52 bit. - * - * Under this assumptions we test if a double is inside an interval - * where casting to long long is safe. Then using two castings we - * make sure the decimal part is zero. If all this is true we use - * integer printing function that is much faster. */ - double min = -4503599627370495; /* (2^52)-1 */ - double max = 4503599627370496; /* -(2^52) */ - if (value > min && value < max && value == ((double)((long long)value))) - len = ll2string(buf,len,(long long)value); + long long lvalue; + /* Integer printing function is much faster, check if we can safely use it. */ + if (double2ll(value, &lvalue)) + len = ll2string(buf,len,lvalue); else -#endif len = snprintf(buf,len,"%.17g",value); } diff --git a/src/util.h b/src/util.h index 5ea71fecd..0515f1a83 100644 --- a/src/util.h +++ b/src/util.h @@ -75,6 +75,7 @@ int string2d(const char *s, size_t slen, double *dp); int trimDoubleString(char *buf, size_t len); int d2string(char *buf, size_t len, double value); int ld2string(char *buf, size_t len, long double value, ld2string_mode mode); +int double2ll(double d, long long *out); int yesnotoi(char *s); sds getAbsolutePath(char *filename); long getTimeZone(void); diff --git a/src/version.h b/src/version.h index b9dc9258e..4e5e62b08 100644 --- a/src/version.h +++ b/src/version.h @@ -1,2 +1,2 @@ -#define REDIS_VERSION "6.9.242" -#define REDIS_VERSION_NUM 0x000609f2 +#define REDIS_VERSION "7.0.0" +#define REDIS_VERSION_NUM 0x00070000 |