summaryrefslogtreecommitdiff
path: root/tests/modules/blockonkeys.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/modules/blockonkeys.c')
-rw-r--r--tests/modules/blockonkeys.c167
1 files changed, 167 insertions, 0 deletions
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c
index 94f31d455..b7ab977e9 100644
--- a/tests/modules/blockonkeys.c
+++ b/tests/modules/blockonkeys.c
@@ -2,6 +2,7 @@
#include "redismodule.h"
#include <string.h>
+#include <strings.h>
#include <assert.h>
#include <unistd.h>
@@ -65,6 +66,8 @@ int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int creat
RedisModule_CloseKey(key);
if (reply_on_failure)
RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ RedisModuleCallReply *reply = RedisModule_Call(ctx, "INCR", "c", "fsl_wrong_type");
+ RedisModule_FreeCallReply(reply);
return 0;
}
@@ -298,6 +301,154 @@ int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK;
}
+/* Callback for blockonkeys_popall */
+int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argc);
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST) {
+ RedisModuleString *elem;
+ long len = 0;
+ RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
+ while ((elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD)) != NULL) {
+ len++;
+ RedisModule_ReplyWithString(ctx, elem);
+ RedisModule_FreeString(ctx, elem);
+ }
+ RedisModule_ReplySetArrayLength(ctx, len);
+ } else {
+ RedisModule_ReplyWithError(ctx, "ERR Not a list");
+ }
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+int blockonkeys_popall_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithError(ctx, "ERR Timeout");
+}
+
+/* BLOCKONKEYS.POPALL key
+ *
+ * Blocks on an empty key for up to 3 seconds. When unblocked by a list
+ * operation like LPUSH, all the elements are popped and returned. Fails with an
+ * error on timeout. */
+int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
+ if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
+ RedisModule_BlockClientOnKeys(ctx, blockonkeys_popall_reply_callback,
+ blockonkeys_popall_timeout_callback,
+ NULL, 3000, &argv[1], 1, NULL);
+ } else {
+ RedisModule_ReplyWithError(ctx, "ERR Key not empty");
+ }
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+/* BLOCKONKEYS.LPUSH key val [val ..]
+ * BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..]
+ *
+ * A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used,
+ * RM_SignalKeyAsReady() is also called. */
+int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc < 3)
+ return RedisModule_WrongArity(ctx);
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY &&
+ RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_LIST) {
+ RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ } else {
+ for (int i = 2; i < argc; i++) {
+ if (RedisModule_ListPush(key, REDISMODULE_LIST_HEAD,
+ argv[i]) != REDISMODULE_OK) {
+ RedisModule_CloseKey(key);
+ return RedisModule_ReplyWithError(ctx, "ERR Push failed");
+ }
+ }
+ }
+ RedisModule_CloseKey(key);
+
+ /* signal key as ready if the command is lpush_unblock */
+ size_t len;
+ const char *str = RedisModule_StringPtrLen(argv[0], &len);
+ if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) {
+ RedisModule_SignalKeyAsReady(ctx, argv[1]);
+ }
+ return RedisModule_ReplyWithSimpleString(ctx, "OK");
+}
+
+/* Callback for the BLOCKONKEYS.BLPOPN command */
+int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argc);
+ long long n;
+ RedisModule_StringToLongLong(argv[2], &n);
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ int result;
+ if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST &&
+ RedisModule_ValueLength(key) >= (size_t)n) {
+ RedisModule_ReplyWithArray(ctx, n);
+ for (long i = 0; i < n; i++) {
+ RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
+ RedisModule_ReplyWithString(ctx, elem);
+ RedisModule_FreeString(ctx, elem);
+ }
+ result = REDISMODULE_OK;
+ } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
+ RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
+ /* continue blocking */
+ result = REDISMODULE_ERR;
+ } else {
+ result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ }
+ RedisModule_CloseKey(key);
+ return result;
+}
+
+int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithError(ctx, "ERR Timeout");
+}
+
+/* BLOCKONKEYS.BLPOPN key N
+ *
+ * Blocks until key has N elements and then pops them or fails after 3 seconds.
+ */
+int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc < 3) return RedisModule_WrongArity(ctx);
+
+ long long n;
+ if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) {
+ return RedisModule_ReplyWithError(ctx, "ERR Invalid N");
+ }
+
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+ int keytype = RedisModule_KeyType(key);
+ if (keytype != REDISMODULE_KEYTYPE_EMPTY &&
+ keytype != REDISMODULE_KEYTYPE_LIST) {
+ RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
+ } else if (keytype == REDISMODULE_KEYTYPE_LIST &&
+ RedisModule_ValueLength(key) >= (size_t)n) {
+ RedisModule_ReplyWithArray(ctx, n);
+ for (long i = 0; i < n; i++) {
+ RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
+ RedisModule_ReplyWithString(ctx, elem);
+ RedisModule_FreeString(ctx, elem);
+ }
+ } else {
+ RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
+ blockonkeys_blpopn_timeout_callback,
+ NULL, 3000, &argv[1], 1, NULL);
+ }
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -334,5 +485,21 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall,
+ "", 1, 1, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush", blockonkeys_lpush,
+ "", 1, 1, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush,
+ "", 1, 1, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn,
+ "", 1, 1, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}