diff options
-rw-r--r-- | .github/workflows/ci.yml | 2 | ||||
-rw-r--r-- | src/cluster.c | 2 | ||||
-rw-r--r-- | src/db.c | 6 | ||||
-rw-r--r-- | src/debug.c | 84 | ||||
-rw-r--r-- | src/expire.c | 1 | ||||
-rw-r--r-- | src/module.c | 17 | ||||
-rw-r--r-- | src/networking.c | 19 | ||||
-rw-r--r-- | src/quicklist.c | 4 | ||||
-rw-r--r-- | src/rdb.c | 4 | ||||
-rw-r--r-- | src/rdb.h | 2 | ||||
-rw-r--r-- | src/redismodule.h | 9 | ||||
-rw-r--r-- | src/scripting.c | 7 | ||||
-rw-r--r-- | src/server.c | 17 | ||||
-rw-r--r-- | src/server.h | 3 | ||||
-rw-r--r-- | src/t_stream.c | 20 | ||||
-rw-r--r-- | src/t_string.c | 2 | ||||
-rw-r--r-- | src/util.c | 4 | ||||
-rw-r--r-- | tests/modules/blockonkeys.c | 122 | ||||
-rw-r--r-- | tests/support/server.tcl | 7 | ||||
-rw-r--r-- | tests/test_helper.tcl | 19 | ||||
-rw-r--r-- | tests/unit/moduleapi/blockonkeys.tcl | 108 | ||||
-rw-r--r-- | tests/unit/scripting.tcl | 11 | ||||
-rw-r--r-- | tests/unit/type/incr.tcl | 7 | ||||
-rw-r--r-- | tests/unit/type/stream-cgroups.tcl | 12 |
24 files changed, 376 insertions, 113 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc4991606..3a81d1a08 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,8 @@ jobs: run: | sudo apt-get install tcl8.5 ./runtest --clients 2 --verbose + - name: module api test + run: ./runtest-moduleapi --clients 2 --verbose build-ubuntu-old: runs-on: ubuntu-16.04 diff --git a/src/cluster.c b/src/cluster.c index a2e9ff5b6..385ff5763 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -933,7 +933,7 @@ int clusterAddNode(clusterNode *node) { return (retval == DICT_OK) ? C_OK : C_ERR; } -/* Remove a node from the cluster. The functio performs the high level +/* Remove a node from the cluster. The function performs the high level * cleanup, calling freeClusterNode() for the low level cleanup. * Here we do the following: * @@ -221,7 +221,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * unless 'keepttl' is true. * * All the new keys in the database should be created via this interface. */ -void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl) { +void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) { if (lookupKeyWrite(db,key) == NULL) { dbAdd(db,key,val); } else { @@ -229,12 +229,12 @@ void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl) { } incrRefCount(val); if (!keepttl) removeExpire(db,key); - signalModifiedKey(db,key); + if (signal) signalModifiedKey(db,key); } /* Common case for genericSetKey() where the TTL is not retained. */ void setKey(redisDb *db, robj *key, robj *val) { - genericSetKey(db,key,val,0); + genericSetKey(db,key,val,0,1); } /* Return true if the specified key exists in the specified database. diff --git a/src/debug.c b/src/debug.c index 83e5b6197..baaaa2424 100644 --- a/src/debug.c +++ b/src/debug.c @@ -490,7 +490,7 @@ NULL "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", (void*)val, val->refcount, - strenc, rdbSavedObjectLen(val), + strenc, rdbSavedObjectLen(val, c->argv[2]), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) { dictEntry *de; @@ -1047,6 +1047,61 @@ void logRegisters(ucontext_t *uc) { (unsigned long) uc->uc_mcontext.gregs[18] ); logStackContent((void**)uc->uc_mcontext.gregs[15]); + #elif defined(__aarch64__) /* Linux AArch64 */ + serverLog(LL_WARNING, + "\n" + "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n" + "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n" + "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n" + "X30:%016lx\n" + "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n", + (unsigned long) uc->uc_mcontext.regs[18], + (unsigned long) uc->uc_mcontext.regs[19], + (unsigned long) uc->uc_mcontext.regs[20], + (unsigned long) uc->uc_mcontext.regs[21], + (unsigned long) uc->uc_mcontext.regs[22], + (unsigned long) uc->uc_mcontext.regs[23], + (unsigned long) uc->uc_mcontext.regs[24], + (unsigned long) uc->uc_mcontext.regs[25], + (unsigned long) uc->uc_mcontext.regs[26], + (unsigned long) uc->uc_mcontext.regs[27], + (unsigned long) uc->uc_mcontext.regs[28], + (unsigned long) uc->uc_mcontext.regs[29], + (unsigned long) uc->uc_mcontext.regs[30], + (unsigned long) uc->uc_mcontext.pc, + (unsigned long) uc->uc_mcontext.sp, + (unsigned long) uc->uc_mcontext.pstate, + (unsigned long) uc->uc_mcontext.fault_address + ); + logStackContent((void**)uc->uc_mcontext.sp); + #elif defined(__arm__) /* Linux ARM */ + serverLog(LL_WARNING, + "\n" + "R10:%016lx R9 :%016lx\nR8 :%016lx R7 :%016lx\n" + "R6 :%016lx R5 :%016lx\nR4 :%016lx R3 :%016lx\n" + "R2 :%016lx R1 :%016lx\nR0 :%016lx EC :%016lx\n" + "fp: %016lx ip:%016lx\n", + "pc:%016lx sp:%016lx\ncpsr:%016lx fault_address:%016lx\n", + (unsigned long) uc->uc_mcontext.arm_r10, + (unsigned long) uc->uc_mcontext.arm_r9, + (unsigned long) uc->uc_mcontext.arm_r8, + (unsigned long) uc->uc_mcontext.arm_r7, + (unsigned long) uc->uc_mcontext.arm_r6, + (unsigned long) uc->uc_mcontext.arm_r5, + (unsigned long) uc->uc_mcontext.arm_r4, + (unsigned long) uc->uc_mcontext.arm_r3, + (unsigned long) uc->uc_mcontext.arm_r2, + (unsigned long) uc->uc_mcontext.arm_r1, + (unsigned long) uc->uc_mcontext.arm_r0, + (unsigned long) uc->uc_mcontext.error_code, + (unsigned long) uc->uc_mcontext.arm_fp, + (unsigned long) uc->uc_mcontext.arm_ip, + (unsigned long) uc->uc_mcontext.arm_pc, + (unsigned long) uc->uc_mcontext.arm_sp, + (unsigned long) uc->uc_mcontext.arm_cpsr, + (unsigned long) uc->uc_mcontext.fault_address + ); + logStackContent((void**)uc->uc_mcontext.arm_sp); #endif #elif defined(__FreeBSD__) #if defined(__x86_64__) @@ -1187,33 +1242,6 @@ void logRegisters(ucontext_t *uc) { (unsigned long) uc->uc_mcontext.mc_cs ); logStackContent((void**)uc->uc_mcontext.mc_rsp); -#elif defined(__aarch64__) /* Linux AArch64 */ - serverLog(LL_WARNING, - "\n" - "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n" - "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n" - "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n" - "X30:%016lx\n" - "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n", - (unsigned long) uc->uc_mcontext.regs[18], - (unsigned long) uc->uc_mcontext.regs[19], - (unsigned long) uc->uc_mcontext.regs[20], - (unsigned long) uc->uc_mcontext.regs[21], - (unsigned long) uc->uc_mcontext.regs[22], - (unsigned long) uc->uc_mcontext.regs[23], - (unsigned long) uc->uc_mcontext.regs[24], - (unsigned long) uc->uc_mcontext.regs[25], - (unsigned long) uc->uc_mcontext.regs[26], - (unsigned long) uc->uc_mcontext.regs[27], - (unsigned long) uc->uc_mcontext.regs[28], - (unsigned long) uc->uc_mcontext.regs[29], - (unsigned long) uc->uc_mcontext.regs[30], - (unsigned long) uc->uc_mcontext.pc, - (unsigned long) uc->uc_mcontext.sp, - (unsigned long) uc->uc_mcontext.pstate, - (unsigned long) uc->uc_mcontext.fault_address - ); - logStackContent((void**)uc->uc_mcontext.sp); #else serverLog(LL_WARNING, " Dumping of registers not supported for this OS/arch"); diff --git a/src/expire.c b/src/expire.c index 5aff72ee0..c102a01ff 100644 --- a/src/expire.c +++ b/src/expire.c @@ -590,6 +590,7 @@ void pttlCommand(client *c) { void persistCommand(client *c) { if (lookupKeyWrite(c->db,c->argv[1])) { if (removeExpire(c->db,c->argv[1])) { + notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id); addReply(c,shared.cone); server.dirty++; } else { diff --git a/src/module.c b/src/module.c index 61dc25169..4d3d9e1af 100644 --- a/src/module.c +++ b/src/module.c @@ -2157,7 +2157,7 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) { if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; RM_DeleteKey(key); - setKey(key->db,key->key,str); + genericSetKey(key->db,key->key,str,0,0); key->value = str; return REDISMODULE_OK; } @@ -2237,7 +2237,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) { if (key->value == NULL) { /* Empty key: create it with the new size. */ robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen)); - setKey(key->db,key->key,o); + genericSetKey(key->db,key->key,o,0,0); key->value = o; decrRefCount(o); } else { @@ -3625,7 +3625,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) { if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR; RM_DeleteKey(key); robj *o = createModuleObject(mt,value); - setKey(key->db,key->key,o); + genericSetKey(key->db,key->key,o,0,0); decrRefCount(o); key->value = o; return REDISMODULE_OK; @@ -4393,14 +4393,17 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF * can really be unblocked, since the module was able to serve the client. * If the callback returns REDISMODULE_OK, then the client can be unblocked, * otherwise the client remains blocked and we'll retry again when one of - * the keys it blocked for becomes "ready" again. */ + * the keys it blocked for becomes "ready" again. + * This function returns 1 if client was served (and should be unblocked) */ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() - * explicit call). */ - if (bc->unblocked) return REDISMODULE_ERR; + * explicit call). See #6798. */ + if (bc->unblocked) return 0; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_ready_key = key; @@ -5975,7 +5978,7 @@ sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int struct RedisModule *module = dictGetVal(de); if (!module->info_cb) continue; - RedisModuleInfoCtx info_ctx = {module, section, info, sections, 0}; + RedisModuleInfoCtx info_ctx = {module, section, info, sections, 0, 0}; module->info_cb(&info_ctx, for_crash_report); /* Implicitly end dicts (no way to handle errors, and we must add the newline). */ if (info_ctx.in_dict_field) diff --git a/src/networking.c b/src/networking.c index 3c754c376..85c640e34 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2326,6 +2326,25 @@ NULL return; } + if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT) + { + addReplyError(c, + "You can't specify both OPTIN mode and OPTOUT mode"); + zfree(prefix); + return; + } + + if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) || + (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN)) + { + addReplyError(c, + "You can't switch OPTIN/OPTOUT mode before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + enableTracking(c,redir,options,prefix,numprefix); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { disableTracking(c); diff --git a/src/quicklist.c b/src/quicklist.c index ae183ffd8..52e3988f5 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -110,7 +110,7 @@ quicklist *quicklistCreate(void) { return quicklist; } -#define COMPRESS_MAX (1 << QL_COMP_BITS) +#define COMPRESS_MAX ((1 << QL_COMP_BITS)-1) void quicklistSetCompressDepth(quicklist *quicklist, int compress) { if (compress > COMPRESS_MAX) { compress = COMPRESS_MAX; @@ -120,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) { quicklist->compress = compress; } -#define FILL_MAX (1 << (QL_FILL_BITS-1)) +#define FILL_MAX ((1 << (QL_FILL_BITS-1))-1) void quicklistSetFill(quicklist *quicklist, int fill) { if (fill > FILL_MAX) { fill = FILL_MAX; @@ -1002,8 +1002,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { * the rdbSaveObject() function. Currently we use a trick to get * this length with very little changes to the code. In the future * we could switch to a faster solution. */ -size_t rdbSavedObjectLen(robj *o) { - ssize_t len = rdbSaveObject(NULL,o,NULL); +size_t rdbSavedObjectLen(robj *o, robj *key) { + ssize_t len = rdbSaveObject(NULL,o,key); serverAssertWithInfo(NULL,o,len != -1); return len; } @@ -143,7 +143,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); int rdbSave(char *filename, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key); -size_t rdbSavedObjectLen(robj *o); +size_t rdbSavedObjectLen(robj *o, robj *key); robj *rdbLoadObject(int type, rio *rdb, robj *key); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); diff --git a/src/redismodule.h b/src/redismodule.h index d26c41456..23b4d26e0 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -470,7 +470,11 @@ RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(Re RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromDouble)(RedisModuleCtx *ctx, double d); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str); +#ifdef __GNUC__ +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...) __attribute__ ((format (printf, 2, 3))); +#else RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...); +#endif void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str); const char *REDISMODULE_API_FUNC(RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len); int REDISMODULE_API_FUNC(RedisModule_ReplyWithError)(RedisModuleCtx *ctx, const char *err); @@ -554,8 +558,13 @@ void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long do long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io); void *REDISMODULE_API_FUNC(RedisModule_LoadDataTypeFromString)(const RedisModuleString *str, const RedisModuleType *mt); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_SaveDataTypeToString)(RedisModuleCtx *ctx, void *data, const RedisModuleType *mt); +#ifdef __GNUC__ +void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...) __attribute__ ((format (printf, 3, 4))); +void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) __attribute__ ((format (printf, 3, 4))); +#else void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); +#endif void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); void REDISMODULE_API_FUNC(RedisModule_LatencyAddSample)(const char *event, mstime_t latency); int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len); diff --git a/src/scripting.c b/src/scripting.c index 7f64e06db..32a511e13 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -657,12 +657,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { !server.loading && /* Don't care about mem if loading. */ !server.masterhost && /* Slave must execute the script. */ server.lua_write_dirty == 0 && /* Script had no side effects so far. */ + server.lua_oom && /* Detected OOM when script start. */ (cmd->flags & CMD_DENYOOM)) { - if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK) { - luaPushError(lua, shared.oomerr->ptr); - goto cleanup; - } + luaPushError(lua, shared.oomerr->ptr); + goto cleanup; } if (cmd->flags & CMD_RANDOM) server.lua_random_dirty = 1; diff --git a/src/server.c b/src/server.c index d06a9e29a..56feb09a3 100644 --- a/src/server.c +++ b/src/server.c @@ -672,15 +672,15 @@ struct redisCommand redisCommandTable[] = { 0,NULL,1,1,1,0,0,0}, {"multi",multiCommand,1, - "no-script fast @transaction", + "no-script fast ok-loading ok-stale @transaction", 0,NULL,0,0,0,0,0,0}, {"exec",execCommand,1, - "no-script no-monitor no-slowlog @transaction", + "no-script no-monitor no-slowlog ok-loading ok-stale @transaction", 0,NULL,0,0,0,0,0,0}, {"discard",discardCommand,1, - "no-script fast @transaction", + "no-script fast ok-loading ok-stale @transaction", 0,NULL,0,0,0,0,0,0}, {"sync",syncCommand,1, @@ -947,11 +947,11 @@ struct redisCommand redisCommandTable[] = { 0,NULL,1,1,1,0,0,0}, {"xread",xreadCommand,-4, - "read-only no-script @stream @blocking", + "read-only @stream @blocking", 0,xreadGetKeys,1,1,1,0,0,0}, {"xreadgroup",xreadCommand,-7, - "write no-script @stream @blocking", + "write @stream @blocking", 0,xreadGetKeys,1,1,1,0,0,0}, {"xgroup",xgroupCommand,-2, @@ -3445,6 +3445,13 @@ int processCommand(client *c) { addReply(c, shared.oomerr); return C_OK; } + + /* Save out_of_memory result at script start, otherwise if we check OOM + * untill first write within script, memory used by lua stack and + * arguments might interfere. */ + if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) { + server.lua_oom = out_of_memory; + } } /* Make sure to use a reasonable amount of memory for client side diff --git a/src/server.h b/src/server.h index 1472bcee7..b17995948 100644 --- a/src/server.h +++ b/src/server.h @@ -1392,6 +1392,7 @@ struct redisServer { execution. */ int lua_kill; /* Kill the script if true. */ int lua_always_replicate_commands; /* Default replication type. */ + int lua_oom; /* OOM detected when script start? */ /* Lazy free */ int lazyfree_lazy_eviction; int lazyfree_lazy_expire; @@ -2057,7 +2058,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, #define LOOKUP_NOTOUCH (1<<0) void dbAdd(redisDb *db, robj *key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val); -void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl); +void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal); void setKey(redisDb *db, robj *key, robj *val); int dbExists(redisDb *db, robj *key); robj *dbRandomKey(redisDb *db); diff --git a/src/t_stream.c b/src/t_stream.c index 557d1d642..e0af87f97 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -935,7 +935,6 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamIterator si; int64_t numfields; streamID id; - int propagate_last_id = 0; /* If the client is asking for some history, we serve it using a * different function, so that we return entries *solely* from its @@ -951,6 +950,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end arraylen_ptr = addReplyDeferredLen(c); streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { + int propagate_last_id = 0; + /* Update the group last_id if needed. */ if (group && streamCompareID(&id,&group->last_id) > 0) { group->last_id = id; @@ -1373,6 +1374,11 @@ void xreadCommand(client *c) { int moreargs = c->argc-i-1; char *o = c->argv[i]->ptr; if (!strcasecmp(o,"BLOCK") && moreargs) { + if (c->flags & CLIENT_LUA) { + /* There is no sense to use BLOCK option within LUA */ + addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr); + return; + } i++; if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, UNIT_MILLISECONDS) != C_OK) return; @@ -1921,11 +1927,21 @@ void xackCommand(client *c) { return; } + /* Start parsing the IDs, so that we abort ASAP if there is a syntax + * error: the return value of this command cannot be an error in case + * the client successfully acknowledged some messages, so it should be + * executed in a "all or nothing" fashion. */ + for (int j = 3; j < c->argc; j++) { + streamID id; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + } + int acknowledged = 0; for (int j = 3; j < c->argc; j++) { streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) + serverPanic("StreamID invalid after check. Should not be possible."); streamEncodeID(buf,&id); /* Lookup the ID in the group PEL: it will have a reference to the diff --git a/src/t_string.c b/src/t_string.c index 4e693cefa..335bda404 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -84,7 +84,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); return; } - genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL); + genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL,1); server.dirty++; if (expire) setExpire(c,c->db,key,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); diff --git a/src/util.c b/src/util.c index 2be42a0df..bd8f0fb98 100644 --- a/src/util.c +++ b/src/util.c @@ -602,6 +602,10 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) { } if (*p == '.') l--; } + if (l == 2 && buf[0] == '-' && buf[1] == '0') { + buf[0] = '0'; + l = 1; + } break; default: return 0; /* Invalid mode. */ } diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 10dc65b1a..94f31d455 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -109,41 +109,33 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); fsl->list[fsl->length++] = ele; - - if (fsl->length >= 2) - RedisModule_SignalKeyAsReady(ctx, argv[1]); + RedisModule_SignalKeyAsReady(ctx, argv[1]); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } -int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); fsl_t *fsl; - if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) - return REDISMODULE_ERR; - - if (!fsl || fsl->length < 2) + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) return REDISMODULE_ERR; - RedisModule_ReplyWithArray(ctx, 2); - RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); return REDISMODULE_OK; } -int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); } - -/* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements. +/* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements. * When that happens, unblock client and pop the last two elements (from the right). */ -int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 3) return RedisModule_WrongArity(ctx); @@ -155,13 +147,10 @@ int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; - if (!fsl || fsl->length < 2) { - /* Key is empty or has <2 elements, we must block */ - RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + if (!fsl) { + RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback, NULL, timeout, &argv[1], 1, NULL); } else { - RedisModule_ReplyWithArray(ctx, 2); - RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } @@ -175,10 +164,10 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx); fsl_t *fsl; - if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) return REDISMODULE_ERR; - if (!fsl || fsl->list[fsl->length-1] <= *pgt) + if (fsl->list[fsl->length-1] <= *pgt) return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); @@ -218,7 +207,6 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */ long long *pgt = RedisModule_Alloc(sizeof(long long)); *pgt = gt; - /* Key is empty or has <2 elements, we must block */ RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, bpopgt_free_privdata, timeout, &argv[1], 1, pgt); } else { @@ -228,6 +216,88 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } +int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx); + RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *src; + if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src) + return REDISMODULE_ERR; + + fsl_t *dst; + if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst) + return REDISMODULE_ERR; + + long long ele = src->list[--src->length]; + dst->list[dst->length++] = ele; + RedisModule_SignalKeyAsReady(ctx, dst_keyname); + return RedisModule_ReplyWithLongLong(ctx, ele); +} + +int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) { + RedisModule_FreeString(ctx, privdata); +} + +/* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element. + * When that happens, unblock client, pop the last element from <src> and push it to <dst> + * (from the right). */ +int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *src; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1)) + return REDISMODULE_OK; + + if (!src) { + /* Retain string for reply callback */ + RedisModule_RetainString(ctx, argv[2]); + /* Key is empty, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback, + bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]); + } else { + fsl_t *dst; + if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1)) + return REDISMODULE_OK; + long long ele = src->list[--src->length]; + dst->list[dst->length++] = ele; + RedisModule_SignalKeyAsReady(ctx, argv[2]); + RedisModule_ReplyWithLongLong(ctx, ele); + } + + return REDISMODULE_OK; +} + +/* FSL.GETALL <key> - Reply with an array containing all elements. */ +int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl) + return RedisModule_ReplyWithArray(ctx, 0); + + RedisModule_ReplyWithArray(ctx, fsl->length); + for (int i = 0; i < fsl->length; i++) + RedisModule_ReplyWithLongLong(ctx, fsl->list[i]); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -252,11 +322,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 400017c5f..d086366dc 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -159,12 +159,9 @@ proc start_server {options {code undefined}} { if {$::external} { if {[llength $::servers] == 0} { set srv {} - # In test_server_main(tests/test_helper.tcl:215~218), increase the value of start_port - # and assign it to ::port through the `--port` option, so we need to reduce it. - set baseport [expr {$::port-100}] dict set srv "host" $::host - dict set srv "port" $baseport - set client [redis $::host $baseport 0 $::tls] + dict set srv "port" $::port + set client [redis $::host $::port 0 $::tls] dict set srv "client" $client $client select 9 diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 5cb43104b..d80cb6907 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -212,13 +212,19 @@ proc test_server_main {} { # Start the client instances set ::clients_pids {} - set start_port [expr {$::port+100}] - for {set j 0} {$j < $::numclients} {incr j} { - set start_port [find_available_port $start_port] + if {$::external} { set p [exec $tclsh [info script] {*}$::argv \ - --client $port --port $start_port &] + --client $port --port $::port &] lappend ::clients_pids $p - incr start_port 10 + } else { + set start_port [expr {$::port+100}] + for {set j 0} {$j < $::numclients} {incr j} { + set start_port [find_available_port $start_port] + set p [exec $tclsh [info script] {*}$::argv \ + --client $port --port $start_port &] + lappend ::clients_pids $p + incr start_port 10 + } } # Setup global state for the test server @@ -506,9 +512,6 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--host}} { set ::external 1 set ::host $arg - # If we use an external server, we can only set numclients to 1, - # otherwise the port will be miscalculated. - set ::numclients 1 incr j } elseif {$opt eq {--port}} { set ::port $arg diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index b380227e0..5e5d93da3 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -3,37 +3,70 @@ set testmodule [file normalize tests/modules/blockonkeys.so] start_server {tags {"modules"}} { r module load $testmodule + test "Module client blocked on keys: Circular BPOPPUSH" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + r del src dst + + $rd1 fsl.bpoppush src dst 0 + $rd2 fsl.bpoppush dst src 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {2} + } else { + fail "Clients are not blocked" + } + + r fsl.push src 42 + + assert_equal {42} [r fsl.getall src] + assert_equal {} [r fsl.getall dst] + } + + test "Module client blocked on keys: Self-referential BPOPPUSH" { + set rd1 [redis_deferring_client] + + r del src + + $rd1 fsl.bpoppush src src 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r fsl.push src 42 + + assert_equal {42} [r fsl.getall src] + } + test {Module client blocked on keys (no metadata): No block} { r del k r fsl.push k 33 r fsl.push k 34 - r fsl.bpop2 k 0 - } {34 33} + r fsl.bpop k 0 + } {34} test {Module client blocked on keys (no metadata): Timeout} { r del k set rd [redis_deferring_client] - r fsl.push k 33 - $rd fsl.bpop2 k 1 + $rd fsl.bpop k 1 assert_equal {Request timedout} [$rd read] } - test {Module client blocked on keys (no metadata): Blocked, case 1} { + test {Module client blocked on keys (no metadata): Blocked} { r del k set rd [redis_deferring_client] - r fsl.push k 33 - $rd fsl.bpop2 k 0 + $rd fsl.bpop k 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r fsl.push k 34 - assert_equal {34 33} [$rd read] - } - - test {Module client blocked on keys (no metadata): Blocked, case 2} { - r del k - set rd [redis_deferring_client] - r fsl.push k 33 - r fsl.push k 34 - $rd fsl.bpop2 k 0 - assert_equal {34 33} [$rd read] + assert_equal {34} [$rd read] } test {Module client blocked on keys (with metadata): No block} { @@ -60,6 +93,12 @@ start_server {tags {"modules"}} { set cid [$rd read] r fsl.push k 33 $rd fsl.bpopgt k 33 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r fsl.push k 34 assert_equal {34} [$rd read] r client kill id $cid ;# try to smoke-out client-related memory leak @@ -69,6 +108,12 @@ start_server {tags {"modules"}} { r del k set rd [redis_deferring_client] $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r fsl.push k 33 r fsl.push k 34 r fsl.push k 35 @@ -82,6 +127,12 @@ start_server {tags {"modules"}} { $rd client id set cid [$rd read] $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r client kill id $cid ;# try to smoke-out client-related memory leak } @@ -91,6 +142,12 @@ start_server {tags {"modules"}} { $rd client id set cid [$rd read] $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r client unblock $cid timeout ;# try to smoke-out client-related memory leak assert_equal {Request timedout} [$rd read] } @@ -101,6 +158,12 @@ start_server {tags {"modules"}} { $rd client id set cid [$rd read] $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r client unblock $cid error ;# try to smoke-out client-related memory leak assert_error "*unblocked*" {$rd read} } @@ -108,13 +171,18 @@ start_server {tags {"modules"}} { test {Module client blocked on keys does not wake up on wrong type} { r del k set rd [redis_deferring_client] - $rd fsl.bpop2 k 0 + $rd fsl.bpop k 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } r lpush k 12 r lpush k 13 r lpush k 14 r del k - r fsl.push k 33 r fsl.push k 34 - assert_equal {34 33} [$rd read] + assert_equal {34} [$rd read] } } diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index fb36d0b80..8b364b287 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -146,6 +146,17 @@ start_server {tags {"scripting"}} { set e } {*not allowed*} + test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} { + r del s + r xgroup create s g $ MKSTREAM + set res [r eval {return redis.pcall('xread','STREAMS','s','$')} 1 s] + assert {$res eq {}} + assert_error "*xread command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s} + set res [r eval {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s] + assert {$res eq {}} + assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s} + } + test {EVAL - Scripts can't run certain commands} { set e {} r debug lua-always-replicate-commands 0 diff --git a/tests/unit/type/incr.tcl b/tests/unit/type/incr.tcl index a58710d39..b7a135203 100644 --- a/tests/unit/type/incr.tcl +++ b/tests/unit/type/incr.tcl @@ -151,4 +151,11 @@ start_server {tags {"incr"}} { catch {r incrbyfloat foo 1} err format $err } {ERR*valid*} + + test {No negative zero} { + r del foo + r incrbyfloat foo [expr double(1)/41] + r incrbyfloat foo [expr double(-1)/41] + r get foo + } {0} } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a27e1f582..04661707b 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -93,6 +93,18 @@ start_server { assert {[r XACK mystream mygroup $id1 $id2] eq 1} } + test {XACK should fail if got at least one invalid ID} { + r del mystream + r xgroup create s g $ MKSTREAM + r xadd s * f1 v1 + set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]] + assert {$c == 1} + set pending [r xpending s g - + 10 c] + set id1 [lindex $pending 0 0] + assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id} + assert {[r xack s g $id1] eq 1} + } + test {PEL NACK reassignment after XGROUP SETID event} { r del events r xadd events * f1 v1 |