summaryrefslogtreecommitdiff
path: root/tests/modules
diff options
context:
space:
mode:
authorViktor Söderqvist <viktor.soderqvist@est.tech>2021-01-22 15:19:37 +0100
committerGitHub <noreply@github.com>2021-01-22 16:19:37 +0200
commit9c1483100a1d0a9160951852d70e632fced179fa (patch)
treee72e7ce05fa8fc6921a3402bd62d8d1b928ba48c /tests/modules
parent8449a5df87017361000919d372b520a897c2de07 (diff)
downloadredis-9c1483100a1d0a9160951852d70e632fced179fa.tar.gz
Test that module can wake up module blocked on non-empty list key (#8382)
BLPOP and other blocking list commands can only block on empty keys and LPUSH only wakes up clients when the list is created. Using the module API, it's possible to block on a non-empty key. Unblocking a client blocked on a non-empty list (or zset) can only be done using RedisModule_SignalKeyAsReady(). This commit tests it.
Diffstat (limited to 'tests/modules')
-rw-r--r--tests/modules/blockonkeys.c89
1 files changed, 88 insertions, 1 deletions
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c
index 6e4b5b79c..2b7b614a9 100644
--- a/tests/modules/blockonkeys.c
+++ b/tests/modules/blockonkeys.c
@@ -2,6 +2,7 @@
#include "redismodule.h"
#include <string.h>
+#include <strings.h>
#include <assert.h>
#include <unistd.h>
@@ -347,7 +348,11 @@ int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
-/* A module equivalent of LPUSH */
+/* BLOCKONKEYS.LPUSH key val [val ..]
+ * BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..]
+ *
+ * A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used,
+ * RM_SignalKeyAsReady() is also called. */
int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 3)
return RedisModule_WrongArity(ctx);
@@ -366,9 +371,83 @@ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
}
}
RedisModule_CloseKey(key);
+
+ /* signal key as ready if the command is lpush_unblock */
+ size_t len;
+ const char *str = RedisModule_StringPtrLen(argv[0], &len);
+ if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) {
+ RedisModule_SignalKeyAsReady(ctx, argv[1]);
+ }
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
+/* Callback for the BLOCKONKEYS.BLPOPN command */
+int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argc);
+ long long n;
+ RedisModule_StringToLongLong(argv[2], &n);
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ int result;
+ if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST &&
+ RedisModule_ValueLength(key) >= (size_t)n) {
+ RedisModule_ReplyWithArray(ctx, n);
+ for (long i = 0; i < n; i++) {
+ RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
+ RedisModule_ReplyWithString(ctx, elem);
+ RedisModule_FreeString(ctx, elem);
+ }
+ result = REDISMODULE_OK;
+ } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
+ RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
+ /* continue blocking */
+ result = REDISMODULE_ERR;
+ } else {
+ result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ }
+ RedisModule_CloseKey(key);
+ return result;
+}
+
+int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithError(ctx, "ERR Timeout");
+}
+
+/* BLOCKONKEYS.BLPOPN key N
+ *
+ * Blocks until key has N elements and then pops them or fails after 3 seconds.
+ */
+int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc < 3) return RedisModule_WrongArity(ctx);
+
+ long long n;
+ if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) {
+ return RedisModule_ReplyWithError(ctx, "ERR Invalid N");
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ int keytype = RedisModule_KeyType(key);
+ if (keytype != REDISMODULE_KEYTYPE_EMPTY &&
+ keytype != REDISMODULE_KEYTYPE_LIST) {
+ RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ } else if (keytype == REDISMODULE_KEYTYPE_LIST &&
+ RedisModule_ValueLength(key) >= (size_t)n) {
+ RedisModule_ReplyWithArray(ctx, n);
+ for (long i = 0; i < n; i++) {
+ RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
+ RedisModule_ReplyWithString(ctx, elem);
+ RedisModule_FreeString(ctx, elem);
+ }
+ } else {
+ RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
+ blockonkeys_blpopn_timeout_callback,
+ NULL, 3000, &argv[1], 1, NULL);
+ }
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -413,5 +492,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush,
+ "", 1, 1, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn,
+ "", 1, 1, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}