summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/ci.yml2
-rw-r--r--src/cluster.c2
-rw-r--r--src/db.c6
-rw-r--r--src/debug.c84
-rw-r--r--src/expire.c1
-rw-r--r--src/module.c17
-rw-r--r--src/networking.c19
-rw-r--r--src/quicklist.c4
-rw-r--r--src/rdb.c4
-rw-r--r--src/rdb.h2
-rw-r--r--src/redismodule.h9
-rw-r--r--src/scripting.c7
-rw-r--r--src/server.c17
-rw-r--r--src/server.h3
-rw-r--r--src/t_stream.c20
-rw-r--r--src/t_string.c2
-rw-r--r--src/util.c4
-rw-r--r--tests/modules/blockonkeys.c122
-rw-r--r--tests/support/server.tcl7
-rw-r--r--tests/test_helper.tcl19
-rw-r--r--tests/unit/moduleapi/blockonkeys.tcl108
-rw-r--r--tests/unit/scripting.tcl11
-rw-r--r--tests/unit/type/incr.tcl7
-rw-r--r--tests/unit/type/stream-cgroups.tcl12
24 files changed, 376 insertions, 113 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cc4991606..3a81d1a08 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -13,6 +13,8 @@ jobs:
run: |
sudo apt-get install tcl8.5
./runtest --clients 2 --verbose
+ - name: module api test
+ run: ./runtest-moduleapi --clients 2 --verbose
build-ubuntu-old:
runs-on: ubuntu-16.04
diff --git a/src/cluster.c b/src/cluster.c
index a2e9ff5b6..385ff5763 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -933,7 +933,7 @@ int clusterAddNode(clusterNode *node) {
return (retval == DICT_OK) ? C_OK : C_ERR;
}
-/* Remove a node from the cluster. The functio performs the high level
+/* Remove a node from the cluster. The function performs the high level
* cleanup, calling freeClusterNode() for the low level cleanup.
* Here we do the following:
*
diff --git a/src/db.c b/src/db.c
index e9e10aeeb..6e5a8bf3a 100644
--- a/src/db.c
+++ b/src/db.c
@@ -221,7 +221,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* unless 'keepttl' is true.
*
* All the new keys in the database should be created via this interface. */
-void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl) {
+void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
@@ -229,12 +229,12 @@ void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl) {
}
incrRefCount(val);
if (!keepttl) removeExpire(db,key);
- signalModifiedKey(db,key);
+ if (signal) signalModifiedKey(db,key);
}
/* Common case for genericSetKey() where the TTL is not retained. */
void setKey(redisDb *db, robj *key, robj *val) {
- genericSetKey(db,key,val,0);
+ genericSetKey(db,key,val,0,1);
}
/* Return true if the specified key exists in the specified database.
diff --git a/src/debug.c b/src/debug.c
index 83e5b6197..baaaa2424 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -490,7 +490,7 @@ NULL
"encoding:%s serializedlength:%zu "
"lru:%d lru_seconds_idle:%llu%s",
(void*)val, val->refcount,
- strenc, rdbSavedObjectLen(val),
+ strenc, rdbSavedObjectLen(val, c->argv[2]),
val->lru, estimateObjectIdleTime(val)/1000, extra);
} else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) {
dictEntry *de;
@@ -1047,6 +1047,61 @@ void logRegisters(ucontext_t *uc) {
(unsigned long) uc->uc_mcontext.gregs[18]
);
logStackContent((void**)uc->uc_mcontext.gregs[15]);
+ #elif defined(__aarch64__) /* Linux AArch64 */
+ serverLog(LL_WARNING,
+ "\n"
+ "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n"
+ "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n"
+ "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n"
+ "X30:%016lx\n"
+ "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n",
+ (unsigned long) uc->uc_mcontext.regs[18],
+ (unsigned long) uc->uc_mcontext.regs[19],
+ (unsigned long) uc->uc_mcontext.regs[20],
+ (unsigned long) uc->uc_mcontext.regs[21],
+ (unsigned long) uc->uc_mcontext.regs[22],
+ (unsigned long) uc->uc_mcontext.regs[23],
+ (unsigned long) uc->uc_mcontext.regs[24],
+ (unsigned long) uc->uc_mcontext.regs[25],
+ (unsigned long) uc->uc_mcontext.regs[26],
+ (unsigned long) uc->uc_mcontext.regs[27],
+ (unsigned long) uc->uc_mcontext.regs[28],
+ (unsigned long) uc->uc_mcontext.regs[29],
+ (unsigned long) uc->uc_mcontext.regs[30],
+ (unsigned long) uc->uc_mcontext.pc,
+ (unsigned long) uc->uc_mcontext.sp,
+ (unsigned long) uc->uc_mcontext.pstate,
+ (unsigned long) uc->uc_mcontext.fault_address
+ );
+ logStackContent((void**)uc->uc_mcontext.sp);
+ #elif defined(__arm__) /* Linux ARM */
+ serverLog(LL_WARNING,
+ "\n"
+ "R10:%016lx R9 :%016lx\nR8 :%016lx R7 :%016lx\n"
+ "R6 :%016lx R5 :%016lx\nR4 :%016lx R3 :%016lx\n"
+ "R2 :%016lx R1 :%016lx\nR0 :%016lx EC :%016lx\n"
+ "fp: %016lx ip:%016lx\n",
+ "pc:%016lx sp:%016lx\ncpsr:%016lx fault_address:%016lx\n",
+ (unsigned long) uc->uc_mcontext.arm_r10,
+ (unsigned long) uc->uc_mcontext.arm_r9,
+ (unsigned long) uc->uc_mcontext.arm_r8,
+ (unsigned long) uc->uc_mcontext.arm_r7,
+ (unsigned long) uc->uc_mcontext.arm_r6,
+ (unsigned long) uc->uc_mcontext.arm_r5,
+ (unsigned long) uc->uc_mcontext.arm_r4,
+ (unsigned long) uc->uc_mcontext.arm_r3,
+ (unsigned long) uc->uc_mcontext.arm_r2,
+ (unsigned long) uc->uc_mcontext.arm_r1,
+ (unsigned long) uc->uc_mcontext.arm_r0,
+ (unsigned long) uc->uc_mcontext.error_code,
+ (unsigned long) uc->uc_mcontext.arm_fp,
+ (unsigned long) uc->uc_mcontext.arm_ip,
+ (unsigned long) uc->uc_mcontext.arm_pc,
+ (unsigned long) uc->uc_mcontext.arm_sp,
+ (unsigned long) uc->uc_mcontext.arm_cpsr,
+ (unsigned long) uc->uc_mcontext.fault_address
+ );
+ logStackContent((void**)uc->uc_mcontext.arm_sp);
#endif
#elif defined(__FreeBSD__)
#if defined(__x86_64__)
@@ -1187,33 +1242,6 @@ void logRegisters(ucontext_t *uc) {
(unsigned long) uc->uc_mcontext.mc_cs
);
logStackContent((void**)uc->uc_mcontext.mc_rsp);
-#elif defined(__aarch64__) /* Linux AArch64 */
- serverLog(LL_WARNING,
- "\n"
- "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n"
- "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n"
- "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n"
- "X30:%016lx\n"
- "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n",
- (unsigned long) uc->uc_mcontext.regs[18],
- (unsigned long) uc->uc_mcontext.regs[19],
- (unsigned long) uc->uc_mcontext.regs[20],
- (unsigned long) uc->uc_mcontext.regs[21],
- (unsigned long) uc->uc_mcontext.regs[22],
- (unsigned long) uc->uc_mcontext.regs[23],
- (unsigned long) uc->uc_mcontext.regs[24],
- (unsigned long) uc->uc_mcontext.regs[25],
- (unsigned long) uc->uc_mcontext.regs[26],
- (unsigned long) uc->uc_mcontext.regs[27],
- (unsigned long) uc->uc_mcontext.regs[28],
- (unsigned long) uc->uc_mcontext.regs[29],
- (unsigned long) uc->uc_mcontext.regs[30],
- (unsigned long) uc->uc_mcontext.pc,
- (unsigned long) uc->uc_mcontext.sp,
- (unsigned long) uc->uc_mcontext.pstate,
- (unsigned long) uc->uc_mcontext.fault_address
- );
- logStackContent((void**)uc->uc_mcontext.sp);
#else
serverLog(LL_WARNING,
" Dumping of registers not supported for this OS/arch");
diff --git a/src/expire.c b/src/expire.c
index 5aff72ee0..c102a01ff 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -590,6 +590,7 @@ void pttlCommand(client *c) {
void persistCommand(client *c) {
if (lookupKeyWrite(c->db,c->argv[1])) {
if (removeExpire(c->db,c->argv[1])) {
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id);
addReply(c,shared.cone);
server.dirty++;
} else {
diff --git a/src/module.c b/src/module.c
index 61dc25169..4d3d9e1af 100644
--- a/src/module.c
+++ b/src/module.c
@@ -2157,7 +2157,7 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key);
- setKey(key->db,key->key,str);
+ genericSetKey(key->db,key->key,str,0,0);
key->value = str;
return REDISMODULE_OK;
}
@@ -2237,7 +2237,7 @@ int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
if (key->value == NULL) {
/* Empty key: create it with the new size. */
robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
- setKey(key->db,key->key,o);
+ genericSetKey(key->db,key->key,o,0,0);
key->value = o;
decrRefCount(o);
} else {
@@ -3625,7 +3625,7 @@ int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
RM_DeleteKey(key);
robj *o = createModuleObject(mt,value);
- setKey(key->db,key->key,o);
+ genericSetKey(key->db,key->key,o,0,0);
decrRefCount(o);
key->value = o;
return REDISMODULE_OK;
@@ -4393,14 +4393,17 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
* can really be unblocked, since the module was able to serve the client.
* If the callback returns REDISMODULE_OK, then the client can be unblocked,
* otherwise the client remains blocked and we'll retry again when one of
- * the keys it blocked for becomes "ready" again. */
+ * the keys it blocked for becomes "ready" again.
+ * This function returns 1 if client was served (and should be unblocked) */
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
int served = 0;
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+
/* Protect against re-processing: don't serve clients that are already
* in the unblocking list for any reason (including RM_UnblockClient()
- * explicit call). */
- if (bc->unblocked) return REDISMODULE_ERR;
+ * explicit call). See #6798. */
+ if (bc->unblocked) return 0;
+
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_ready_key = key;
@@ -5975,7 +5978,7 @@ sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int
struct RedisModule *module = dictGetVal(de);
if (!module->info_cb)
continue;
- RedisModuleInfoCtx info_ctx = {module, section, info, sections, 0};
+ RedisModuleInfoCtx info_ctx = {module, section, info, sections, 0, 0};
module->info_cb(&info_ctx, for_crash_report);
/* Implicitly end dicts (no way to handle errors, and we must add the newline). */
if (info_ctx.in_dict_field)
diff --git a/src/networking.c b/src/networking.c
index 3c754c376..85c640e34 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -2326,6 +2326,25 @@ NULL
return;
}
+ if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT)
+ {
+ addReplyError(c,
+ "You can't specify both OPTIN mode and OPTOUT mode");
+ zfree(prefix);
+ return;
+ }
+
+ if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) ||
+ (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN))
+ {
+ addReplyError(c,
+ "You can't switch OPTIN/OPTOUT mode before disabling "
+ "tracking for this client, and then re-enabling it with "
+ "a different mode.");
+ zfree(prefix);
+ return;
+ }
+
enableTracking(c,redir,options,prefix,numprefix);
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
disableTracking(c);
diff --git a/src/quicklist.c b/src/quicklist.c
index ae183ffd8..52e3988f5 100644
--- a/src/quicklist.c
+++ b/src/quicklist.c
@@ -110,7 +110,7 @@ quicklist *quicklistCreate(void) {
return quicklist;
}
-#define COMPRESS_MAX (1 << QL_COMP_BITS)
+#define COMPRESS_MAX ((1 << QL_COMP_BITS)-1)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
if (compress > COMPRESS_MAX) {
compress = COMPRESS_MAX;
@@ -120,7 +120,7 @@ void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
quicklist->compress = compress;
}
-#define FILL_MAX (1 << (QL_FILL_BITS-1))
+#define FILL_MAX ((1 << (QL_FILL_BITS-1))-1)
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
diff --git a/src/rdb.c b/src/rdb.c
index 5d34f5a32..fc8911979 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1002,8 +1002,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
* the rdbSaveObject() function. Currently we use a trick to get
* this length with very little changes to the code. In the future
* we could switch to a faster solution. */
-size_t rdbSavedObjectLen(robj *o) {
- ssize_t len = rdbSaveObject(NULL,o,NULL);
+size_t rdbSavedObjectLen(robj *o, robj *key) {
+ ssize_t len = rdbSaveObject(NULL,o,key);
serverAssertWithInfo(NULL,o,len != -1);
return len;
}
diff --git a/src/rdb.h b/src/rdb.h
index 4229beea8..b276a978b 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -143,7 +143,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key);
-size_t rdbSavedObjectLen(robj *o);
+size_t rdbSavedObjectLen(robj *o, robj *key);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
diff --git a/src/redismodule.h b/src/redismodule.h
index d26c41456..23b4d26e0 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -470,7 +470,11 @@ RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(Re
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromDouble)(RedisModuleCtx *ctx, double d);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
+#ifdef __GNUC__
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));
+#else
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
+#endif
void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str);
const char *REDISMODULE_API_FUNC(RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithError)(RedisModuleCtx *ctx, const char *err);
@@ -554,8 +558,13 @@ void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long do
long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io);
void *REDISMODULE_API_FUNC(RedisModule_LoadDataTypeFromString)(const RedisModuleString *str, const RedisModuleType *mt);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_SaveDataTypeToString)(RedisModuleCtx *ctx, void *data, const RedisModuleType *mt);
+#ifdef __GNUC__
+void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...) __attribute__ ((format (printf, 3, 4)));
+void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) __attribute__ ((format (printf, 3, 4)));
+#else
void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
+#endif
void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line);
void REDISMODULE_API_FUNC(RedisModule_LatencyAddSample)(const char *event, mstime_t latency);
int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len);
diff --git a/src/scripting.c b/src/scripting.c
index 7f64e06db..32a511e13 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -657,12 +657,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
!server.loading && /* Don't care about mem if loading. */
!server.masterhost && /* Slave must execute the script. */
server.lua_write_dirty == 0 && /* Script had no side effects so far. */
+ server.lua_oom && /* Detected OOM when script start. */
(cmd->flags & CMD_DENYOOM))
{
- if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK) {
- luaPushError(lua, shared.oomerr->ptr);
- goto cleanup;
- }
+ luaPushError(lua, shared.oomerr->ptr);
+ goto cleanup;
}
if (cmd->flags & CMD_RANDOM) server.lua_random_dirty = 1;
diff --git a/src/server.c b/src/server.c
index d06a9e29a..56feb09a3 100644
--- a/src/server.c
+++ b/src/server.c
@@ -672,15 +672,15 @@ struct redisCommand redisCommandTable[] = {
0,NULL,1,1,1,0,0,0},
{"multi",multiCommand,1,
- "no-script fast @transaction",
+ "no-script fast ok-loading ok-stale @transaction",
0,NULL,0,0,0,0,0,0},
{"exec",execCommand,1,
- "no-script no-monitor no-slowlog @transaction",
+ "no-script no-monitor no-slowlog ok-loading ok-stale @transaction",
0,NULL,0,0,0,0,0,0},
{"discard",discardCommand,1,
- "no-script fast @transaction",
+ "no-script fast ok-loading ok-stale @transaction",
0,NULL,0,0,0,0,0,0},
{"sync",syncCommand,1,
@@ -947,11 +947,11 @@ struct redisCommand redisCommandTable[] = {
0,NULL,1,1,1,0,0,0},
{"xread",xreadCommand,-4,
- "read-only no-script @stream @blocking",
+ "read-only @stream @blocking",
0,xreadGetKeys,1,1,1,0,0,0},
{"xreadgroup",xreadCommand,-7,
- "write no-script @stream @blocking",
+ "write @stream @blocking",
0,xreadGetKeys,1,1,1,0,0,0},
{"xgroup",xgroupCommand,-2,
@@ -3445,6 +3445,13 @@ int processCommand(client *c) {
addReply(c, shared.oomerr);
return C_OK;
}
+
+ /* Save out_of_memory result at script start, otherwise if we check OOM
+ * untill first write within script, memory used by lua stack and
+ * arguments might interfere. */
+ if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
+ server.lua_oom = out_of_memory;
+ }
}
/* Make sure to use a reasonable amount of memory for client side
diff --git a/src/server.h b/src/server.h
index 1472bcee7..b17995948 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1392,6 +1392,7 @@ struct redisServer {
execution. */
int lua_kill; /* Kill the script if true. */
int lua_always_replicate_commands; /* Default replication type. */
+ int lua_oom; /* OOM detected when script start? */
/* Lazy free */
int lazyfree_lazy_eviction;
int lazyfree_lazy_expire;
@@ -2057,7 +2058,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
#define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val);
-void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl);
+void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal);
void setKey(redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db);
diff --git a/src/t_stream.c b/src/t_stream.c
index 557d1d642..e0af87f97 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -935,7 +935,6 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIterator si;
int64_t numfields;
streamID id;
- int propagate_last_id = 0;
/* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its
@@ -951,6 +950,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
arraylen_ptr = addReplyDeferredLen(c);
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
+ int propagate_last_id = 0;
+
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id;
@@ -1373,6 +1374,11 @@ void xreadCommand(client *c) {
int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) {
+ if (c->flags & CLIENT_LUA) {
+ /* There is no sense to use BLOCK option within LUA */
+ addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
+ return;
+ }
i++;
if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
UNIT_MILLISECONDS) != C_OK) return;
@@ -1921,11 +1927,21 @@ void xackCommand(client *c) {
return;
}
+ /* Start parsing the IDs, so that we abort ASAP if there is a syntax
+ * error: the return value of this command cannot be an error in case
+ * the client successfully acknowledged some messages, so it should be
+ * executed in a "all or nothing" fashion. */
+ for (int j = 3; j < c->argc; j++) {
+ streamID id;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
+ }
+
int acknowledged = 0;
for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
- if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
+ if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
+ serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL: it will have a reference to the
diff --git a/src/t_string.c b/src/t_string.c
index 4e693cefa..335bda404 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -84,7 +84,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
- genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL);
+ genericSetKey(c->db,key,val,flags & OBJ_SET_KEEPTTL,1);
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
diff --git a/src/util.c b/src/util.c
index 2be42a0df..bd8f0fb98 100644
--- a/src/util.c
+++ b/src/util.c
@@ -602,6 +602,10 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
}
if (*p == '.') l--;
}
+ if (l == 2 && buf[0] == '-' && buf[1] == '0') {
+ buf[0] = '0';
+ l = 1;
+ }
break;
default: return 0; /* Invalid mode. */
}
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c
index 10dc65b1a..94f31d455 100644
--- a/tests/modules/blockonkeys.c
+++ b/tests/modules/blockonkeys.c
@@ -109,41 +109,33 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
fsl->list[fsl->length++] = ele;
-
- if (fsl->length >= 2)
- RedisModule_SignalKeyAsReady(ctx, argv[1]);
+ RedisModule_SignalKeyAsReady(ctx, argv[1]);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
-int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
fsl_t *fsl;
- if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
- return REDISMODULE_ERR;
-
- if (!fsl || fsl->length < 2)
+ if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
return REDISMODULE_ERR;
- RedisModule_ReplyWithArray(ctx, 2);
- RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
return REDISMODULE_OK;
}
-int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
}
-
-/* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements.
+/* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
* When that happens, unblock client and pop the last two elements (from the right). */
-int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3)
return RedisModule_WrongArity(ctx);
@@ -155,13 +147,10 @@ int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
return REDISMODULE_OK;
- if (!fsl || fsl->length < 2) {
- /* Key is empty or has <2 elements, we must block */
- RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback,
+ if (!fsl) {
+ RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
NULL, timeout, &argv[1], 1, NULL);
} else {
- RedisModule_ReplyWithArray(ctx, 2);
- RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
}
@@ -175,10 +164,10 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
fsl_t *fsl;
- if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
+ if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
return REDISMODULE_ERR;
- if (!fsl || fsl->list[fsl->length-1] <= *pgt)
+ if (fsl->list[fsl->length-1] <= *pgt)
return REDISMODULE_ERR;
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
@@ -218,7 +207,6 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
long long *pgt = RedisModule_Alloc(sizeof(long long));
*pgt = gt;
- /* Key is empty or has <2 elements, we must block */
RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
} else {
@@ -228,6 +216,88 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK;
}
+int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
+ RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
+
+ fsl_t *src;
+ if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
+ return REDISMODULE_ERR;
+
+ fsl_t *dst;
+ if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
+ return REDISMODULE_ERR;
+
+ long long ele = src->list[--src->length];
+ dst->list[dst->length++] = ele;
+ RedisModule_SignalKeyAsReady(ctx, dst_keyname);
+ return RedisModule_ReplyWithLongLong(ctx, ele);
+}
+
+int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
+}
+
+void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
+ RedisModule_FreeString(ctx, privdata);
+}
+
+/* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
+ * When that happens, unblock client, pop the last element from <src> and push it to <dst>
+ * (from the right). */
+int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 4)
+ return RedisModule_WrongArity(ctx);
+
+ long long timeout;
+ if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
+ return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
+
+ fsl_t *src;
+ if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
+ return REDISMODULE_OK;
+
+ if (!src) {
+ /* Retain string for reply callback */
+ RedisModule_RetainString(ctx, argv[2]);
+ /* Key is empty, we must block */
+ RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
+ bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
+ } else {
+ fsl_t *dst;
+ if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
+ return REDISMODULE_OK;
+ long long ele = src->list[--src->length];
+ dst->list[dst->length++] = ele;
+ RedisModule_SignalKeyAsReady(ctx, argv[2]);
+ RedisModule_ReplyWithLongLong(ctx, ele);
+ }
+
+ return REDISMODULE_OK;
+}
+
+/* FSL.GETALL <key> - Reply with an array containing all elements. */
+int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2)
+ return RedisModule_WrongArity(ctx);
+
+ fsl_t *fsl;
+ if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
+ return REDISMODULE_OK;
+
+ if (!fsl)
+ return RedisModule_ReplyWithArray(ctx, 0);
+
+ RedisModule_ReplyWithArray(ctx, fsl->length);
+ for (int i = 0; i < fsl->length; i++)
+ RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -252,11 +322,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR)
+ if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index 400017c5f..d086366dc 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -159,12 +159,9 @@ proc start_server {options {code undefined}} {
if {$::external} {
if {[llength $::servers] == 0} {
set srv {}
- # In test_server_main(tests/test_helper.tcl:215~218), increase the value of start_port
- # and assign it to ::port through the `--port` option, so we need to reduce it.
- set baseport [expr {$::port-100}]
dict set srv "host" $::host
- dict set srv "port" $baseport
- set client [redis $::host $baseport 0 $::tls]
+ dict set srv "port" $::port
+ set client [redis $::host $::port 0 $::tls]
dict set srv "client" $client
$client select 9
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 5cb43104b..d80cb6907 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -212,13 +212,19 @@ proc test_server_main {} {
# Start the client instances
set ::clients_pids {}
- set start_port [expr {$::port+100}]
- for {set j 0} {$j < $::numclients} {incr j} {
- set start_port [find_available_port $start_port]
+ if {$::external} {
set p [exec $tclsh [info script] {*}$::argv \
- --client $port --port $start_port &]
+ --client $port --port $::port &]
lappend ::clients_pids $p
- incr start_port 10
+ } else {
+ set start_port [expr {$::port+100}]
+ for {set j 0} {$j < $::numclients} {incr j} {
+ set start_port [find_available_port $start_port]
+ set p [exec $tclsh [info script] {*}$::argv \
+ --client $port --port $start_port &]
+ lappend ::clients_pids $p
+ incr start_port 10
+ }
}
# Setup global state for the test server
@@ -506,9 +512,6 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--host}} {
set ::external 1
set ::host $arg
- # If we use an external server, we can only set numclients to 1,
- # otherwise the port will be miscalculated.
- set ::numclients 1
incr j
} elseif {$opt eq {--port}} {
set ::port $arg
diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl
index b380227e0..5e5d93da3 100644
--- a/tests/unit/moduleapi/blockonkeys.tcl
+++ b/tests/unit/moduleapi/blockonkeys.tcl
@@ -3,37 +3,70 @@ set testmodule [file normalize tests/modules/blockonkeys.so]
start_server {tags {"modules"}} {
r module load $testmodule
+ test "Module client blocked on keys: Circular BPOPPUSH" {
+ set rd1 [redis_deferring_client]
+ set rd2 [redis_deferring_client]
+
+ r del src dst
+
+ $rd1 fsl.bpoppush src dst 0
+ $rd2 fsl.bpoppush dst src 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {2}
+ } else {
+ fail "Clients are not blocked"
+ }
+
+ r fsl.push src 42
+
+ assert_equal {42} [r fsl.getall src]
+ assert_equal {} [r fsl.getall dst]
+ }
+
+ test "Module client blocked on keys: Self-referential BPOPPUSH" {
+ set rd1 [redis_deferring_client]
+
+ r del src
+
+ $rd1 fsl.bpoppush src src 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
+ r fsl.push src 42
+
+ assert_equal {42} [r fsl.getall src]
+ }
+
test {Module client blocked on keys (no metadata): No block} {
r del k
r fsl.push k 33
r fsl.push k 34
- r fsl.bpop2 k 0
- } {34 33}
+ r fsl.bpop k 0
+ } {34}
test {Module client blocked on keys (no metadata): Timeout} {
r del k
set rd [redis_deferring_client]
- r fsl.push k 33
- $rd fsl.bpop2 k 1
+ $rd fsl.bpop k 1
assert_equal {Request timedout} [$rd read]
}
- test {Module client blocked on keys (no metadata): Blocked, case 1} {
+ test {Module client blocked on keys (no metadata): Blocked} {
r del k
set rd [redis_deferring_client]
- r fsl.push k 33
- $rd fsl.bpop2 k 0
+ $rd fsl.bpop k 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r fsl.push k 34
- assert_equal {34 33} [$rd read]
- }
-
- test {Module client blocked on keys (no metadata): Blocked, case 2} {
- r del k
- set rd [redis_deferring_client]
- r fsl.push k 33
- r fsl.push k 34
- $rd fsl.bpop2 k 0
- assert_equal {34 33} [$rd read]
+ assert_equal {34} [$rd read]
}
test {Module client blocked on keys (with metadata): No block} {
@@ -60,6 +93,12 @@ start_server {tags {"modules"}} {
set cid [$rd read]
r fsl.push k 33
$rd fsl.bpopgt k 33 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r fsl.push k 34
assert_equal {34} [$rd read]
r client kill id $cid ;# try to smoke-out client-related memory leak
@@ -69,6 +108,12 @@ start_server {tags {"modules"}} {
r del k
set rd [redis_deferring_client]
$rd fsl.bpopgt k 35 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r fsl.push k 33
r fsl.push k 34
r fsl.push k 35
@@ -82,6 +127,12 @@ start_server {tags {"modules"}} {
$rd client id
set cid [$rd read]
$rd fsl.bpopgt k 35 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r client kill id $cid ;# try to smoke-out client-related memory leak
}
@@ -91,6 +142,12 @@ start_server {tags {"modules"}} {
$rd client id
set cid [$rd read]
$rd fsl.bpopgt k 35 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r client unblock $cid timeout ;# try to smoke-out client-related memory leak
assert_equal {Request timedout} [$rd read]
}
@@ -101,6 +158,12 @@ start_server {tags {"modules"}} {
$rd client id
set cid [$rd read]
$rd fsl.bpopgt k 35 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r client unblock $cid error ;# try to smoke-out client-related memory leak
assert_error "*unblocked*" {$rd read}
}
@@ -108,13 +171,18 @@ start_server {tags {"modules"}} {
test {Module client blocked on keys does not wake up on wrong type} {
r del k
set rd [redis_deferring_client]
- $rd fsl.bpop2 k 0
+ $rd fsl.bpop k 0
+ ;# wait until clients are actually blocked
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Clients are not blocked"
+ }
r lpush k 12
r lpush k 13
r lpush k 14
r del k
- r fsl.push k 33
r fsl.push k 34
- assert_equal {34 33} [$rd read]
+ assert_equal {34} [$rd read]
}
}
diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl
index fb36d0b80..8b364b287 100644
--- a/tests/unit/scripting.tcl
+++ b/tests/unit/scripting.tcl
@@ -146,6 +146,17 @@ start_server {tags {"scripting"}} {
set e
} {*not allowed*}
+ test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} {
+ r del s
+ r xgroup create s g $ MKSTREAM
+ set res [r eval {return redis.pcall('xread','STREAMS','s','$')} 1 s]
+ assert {$res eq {}}
+ assert_error "*xread command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xread','BLOCK',0,'STREAMS','s','$')} 1 s}
+ set res [r eval {return redis.pcall('xreadgroup','group','g','c','STREAMS','s','>')} 1 s]
+ assert {$res eq {}}
+ assert_error "*xreadgroup command is not allowed with BLOCK option from scripts" {r eval {return redis.pcall('xreadgroup','group','g','c','BLOCK',0,'STREAMS','s','>')} 1 s}
+ }
+
test {EVAL - Scripts can't run certain commands} {
set e {}
r debug lua-always-replicate-commands 0
diff --git a/tests/unit/type/incr.tcl b/tests/unit/type/incr.tcl
index a58710d39..b7a135203 100644
--- a/tests/unit/type/incr.tcl
+++ b/tests/unit/type/incr.tcl
@@ -151,4 +151,11 @@ start_server {tags {"incr"}} {
catch {r incrbyfloat foo 1} err
format $err
} {ERR*valid*}
+
+ test {No negative zero} {
+ r del foo
+ r incrbyfloat foo [expr double(1)/41]
+ r incrbyfloat foo [expr double(-1)/41]
+ r get foo
+ } {0}
}
diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl
index a27e1f582..04661707b 100644
--- a/tests/unit/type/stream-cgroups.tcl
+++ b/tests/unit/type/stream-cgroups.tcl
@@ -93,6 +93,18 @@ start_server {
assert {[r XACK mystream mygroup $id1 $id2] eq 1}
}
+ test {XACK should fail if got at least one invalid ID} {
+ r del mystream
+ r xgroup create s g $ MKSTREAM
+ r xadd s * f1 v1
+ set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]]
+ assert {$c == 1}
+ set pending [r xpending s g - + 10 c]
+ set id1 [lindex $pending 0 0]
+ assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id}
+ assert {[r xack s g $id1] eq 1}
+ }
+
test {PEL NACK reassignment after XGROUP SETID event} {
r del events
r xadd events * f1 v1