diff options
author | perryitay <85821686+perryitay@users.noreply.github.com> | 2022-01-20 09:05:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-20 09:05:53 +0200 |
commit | c4b788230ca034761a0e9f6ca35b4aee4b15d340 (patch) | |
tree | 50b89484548307841d799786a75e6cc79a5d39d8 /tests/modules | |
parent | 22172a4aa648374d7076b179dab18de09f72fd52 (diff) | |
download | redis-c4b788230ca034761a0e9f6ca35b4aee4b15d340.tar.gz |
Adding module api for processing commands during busy jobs and allow flagging the commands that should be handled at this status (#9963)
Some modules might perform a long-running logic in different stages of Redis lifetime, for example:
* command execution
* RDB loading
* thread safe context
During this long-running logic Redis is not responsive.
This PR offers
1. An API to process events while a busy command is running (`RM_Yield`)
2. A new flag (`ALLOW_BUSY`) to mark the commands that should be handled during busy
jobs which can also be used by modules (`allow-busy`)
3. In slow commands and thread safe contexts, this flag will start rejecting commands with -BUSY only
after `busy-reply-threshold`
4. During loading (`rdb_load` callback), it'll process events right away (not wait for `busy-reply-threshold`),
but either way, the processing is throttled to the server hz rate.
5. Allow modules to Yield to redis background tasks, but not to client commands
* rename `script-time-limit` to `busy-reply-threshold` (an alias to the pre-7.0 `lua-time-limit`)
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'tests/modules')
-rw-r--r-- | tests/modules/blockedclient.c | 98 | ||||
-rw-r--r-- | tests/modules/datatype.c | 57 |
2 files changed, 155 insertions, 0 deletions
diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 3e97d6779..a2d7c6d00 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -1,3 +1,8 @@ +/* define macros for having usleep */ +#define _BSD_SOURCE +#define _DEFAULT_SOURCE +#include <unistd.h> + #include "redismodule.h" #include <assert.h> #include <stdio.h> @@ -5,6 +10,10 @@ #define UNUSED(V) ((void) V) +/* used to test processing events during slow bg operation */ +static volatile int g_slow_bg_operation = 0; +static volatile int g_is_in_slow_bg_operation = 0; + void *sub_worker(void *arg) { // Get Redis module context RedisModuleCtx *ctx = (RedisModuleCtx *)arg; @@ -99,6 +108,16 @@ void *bg_call_worker(void *arg) { // Acquire GIL RedisModule_ThreadSafeContextLock(ctx); + // Test slow operation yielding + if (g_slow_bg_operation) { + g_is_in_slow_bg_operation = 1; + while (g_slow_bg_operation) { + RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation"); + usleep(1000); + } + g_is_in_slow_bg_operation = 0; + } + // Call the command const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL); RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2); @@ -203,6 +222,73 @@ int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } + +/* this flag is used to work with busy commands, that might take a while + * and ability to stop the busy work with a different command*/ +static volatile int abort_flag = 0; + +int slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + long long block_time = 0; + if (RedisModule_StringToLongLong(argv[1], &block_time) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid integer value"); + return REDISMODULE_OK; + } + + uint64_t start_time = RedisModule_MonotonicMicroseconds(); + /* when not blocking indefinitely, we don't process client commands in this test. */ + int yield_flags = block_time? REDISMODULE_YIELD_FLAG_NONE: REDISMODULE_YIELD_FLAG_CLIENTS; + while (!abort_flag) { + RedisModule_Yield(ctx, yield_flags, "Slow module operation"); + usleep(1000); + if (block_time && RedisModule_MonotonicMicroseconds() - start_time > (uint64_t)block_time) + break; + } + + abort_flag = 0; + RedisModule_ReplyWithLongLong(ctx, 1); + return REDISMODULE_OK; +} + +int stop_slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + abort_flag = 1; + RedisModule_ReplyWithLongLong(ctx, 1); + return REDISMODULE_OK; +} + +/* used to enable or disable slow operation in do_bg_rm_call */ +static int set_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + long long ll; + if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid integer value"); + return REDISMODULE_OK; + } + g_slow_bg_operation = ll; + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/* used to test if we reached the slow operation in do_bg_rm_call */ +static int is_in_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + if (argc != 1) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithLongLong(ctx, g_is_in_slow_bg_operation); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -223,5 +309,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "slow_fg_command", slow_fg_command,"", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "stop_slow_fg_command", stop_slow_fg_command,"allow-busy", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "set_slow_bg_operation", set_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c index d763a2412..45a356e52 100644 --- a/tests/modules/datatype.c +++ b/tests/modules/datatype.c @@ -2,11 +2,20 @@ * for general ModuleDataType coverage. */ +/* define macros for having usleep */ +#define _BSD_SOURCE +#define _DEFAULT_SOURCE +#include <unistd.h> + #include "redismodule.h" static RedisModuleType *datatype = NULL; static int load_encver = 0; +/* used to test processing events during slow loading */ +static volatile int slow_loading = 0; +static volatile int is_in_slow_loading = 0; + #define DATATYPE_ENC_VER 1 typedef struct { @@ -25,6 +34,17 @@ static void *datatype_load(RedisModuleIO *io, int encver) { DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType)); dt->intval = intval; dt->strval = strval; + + if (slow_loading) { + RedisModuleCtx *ctx = RedisModule_GetContextFromIO(io); + is_in_slow_loading = 1; + while (slow_loading) { + RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation"); + usleep(1000); + } + is_in_slow_loading = 0; + } + return dt; } @@ -185,6 +205,35 @@ static int datatype_swap(RedisModuleCtx *ctx, RedisModuleString **argv, int argc return REDISMODULE_OK; } +/* used to enable or disable slow loading */ +static int datatype_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + long long ll; + if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid integer value"); + return REDISMODULE_OK; + } + slow_loading = ll; + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/* used to test if we reached the slow loading code */ +static int datatype_is_in_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + if (argc != 1) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithLongLong(ctx, is_in_slow_loading); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -224,5 +273,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "write", 1, 1, 1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "datatype.slow_loading", datatype_slow_loading, + "allow-loading", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "datatype.is_in_slow_loading", datatype_is_in_slow_loading, + "allow-loading", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } |