diff options
Diffstat (limited to 'tests/modules/blockonkeys.c')
-rw-r--r-- | tests/modules/blockonkeys.c | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 94f31d455..b7ab977e9 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -2,6 +2,7 @@ #include "redismodule.h" #include <string.h> +#include <strings.h> #include <assert.h> #include <unistd.h> @@ -65,6 +66,8 @@ int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int creat RedisModule_CloseKey(key); if (reply_on_failure) RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + RedisModuleCallReply *reply = RedisModule_Call(ctx, "INCR", "c", "fsl_wrong_type"); + RedisModule_FreeCallReply(reply); return 0; } @@ -298,6 +301,154 @@ int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } +/* Callback for blockonkeys_popall */ +int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argc); + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST) { + RedisModuleString *elem; + long len = 0; + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + while ((elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD)) != NULL) { + len++; + RedisModule_ReplyWithString(ctx, elem); + RedisModule_FreeString(ctx, elem); + } + RedisModule_ReplySetArrayLength(ctx, len); + } else { + RedisModule_ReplyWithError(ctx, "ERR Not a list"); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int blockonkeys_popall_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithError(ctx, "ERR Timeout"); +} + +/* BLOCKONKEYS.POPALL key + * + * Blocks on an empty key for up to 3 seconds. When unblocked by a list + * operation like LPUSH, all the elements are popped and returned. Fails with an + * error on timeout. */ +int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_BlockClientOnKeys(ctx, blockonkeys_popall_reply_callback, + blockonkeys_popall_timeout_callback, + NULL, 3000, &argv[1], 1, NULL); + } else { + RedisModule_ReplyWithError(ctx, "ERR Key not empty"); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +/* BLOCKONKEYS.LPUSH key val [val ..] + * BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..] + * + * A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used, + * RM_SignalKeyAsReady() is also called. */ +int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 3) + return RedisModule_WrongArity(ctx); + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_LIST) { + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } else { + for (int i = 2; i < argc; i++) { + if (RedisModule_ListPush(key, REDISMODULE_LIST_HEAD, + argv[i]) != REDISMODULE_OK) { + RedisModule_CloseKey(key); + return RedisModule_ReplyWithError(ctx, "ERR Push failed"); + } + } + } + RedisModule_CloseKey(key); + + /* signal key as ready if the command is lpush_unblock */ + size_t len; + const char *str = RedisModule_StringPtrLen(argv[0], &len); + if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) { + RedisModule_SignalKeyAsReady(ctx, argv[1]); + } + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +/* Callback for the BLOCKONKEYS.BLPOPN command */ +int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argc); + long long n; + RedisModule_StringToLongLong(argv[2], &n); + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + int result; + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST && + RedisModule_ValueLength(key) >= (size_t)n) { + RedisModule_ReplyWithArray(ctx, n); + for (long i = 0; i < n; i++) { + RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD); + RedisModule_ReplyWithString(ctx, elem); + RedisModule_FreeString(ctx, elem); + } + result = REDISMODULE_OK; + } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST || + RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { + /* continue blocking */ + result = REDISMODULE_ERR; + } else { + result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + RedisModule_CloseKey(key); + return result; +} + +int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithError(ctx, "ERR Timeout"); +} + +/* BLOCKONKEYS.BLPOPN key N + * + * Blocks until key has N elements and then pops them or fails after 3 seconds. + */ +int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 3) return RedisModule_WrongArity(ctx); + + long long n; + if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR Invalid N"); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + int keytype = RedisModule_KeyType(key); + if (keytype != REDISMODULE_KEYTYPE_EMPTY && + keytype != REDISMODULE_KEYTYPE_LIST) { + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } else if (keytype == REDISMODULE_KEYTYPE_LIST && + RedisModule_ValueLength(key) >= (size_t)n) { + RedisModule_ReplyWithArray(ctx, n); + for (long i = 0; i < n; i++) { + RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD); + RedisModule_ReplyWithString(ctx, elem); + RedisModule_FreeString(ctx, elem); + } + } else { + RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback, + blockonkeys_blpopn_timeout_callback, + NULL, 3000, &argv[1], 1, NULL); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -334,5 +485,21 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush", blockonkeys_lpush, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } |