summaryrefslogtreecommitdiff
path: root/tests/modules
diff options
context:
space:
mode:
authorperryitay <85821686+perryitay@users.noreply.github.com>2022-01-20 09:05:53 +0200
committerGitHub <noreply@github.com>2022-01-20 09:05:53 +0200
commitc4b788230ca034761a0e9f6ca35b4aee4b15d340 (patch)
tree50b89484548307841d799786a75e6cc79a5d39d8 /tests/modules
parent22172a4aa648374d7076b179dab18de09f72fd52 (diff)
downloadredis-c4b788230ca034761a0e9f6ca35b4aee4b15d340.tar.gz
Adding module api for processing commands during busy jobs and allow flagging the commands that should be handled at this status (#9963)
Some modules might perform a long-running logic in different stages of Redis lifetime, for example: * command execution * RDB loading * thread safe context During this long-running logic Redis is not responsive. This PR offers 1. An API to process events while a busy command is running (`RM_Yield`) 2. A new flag (`ALLOW_BUSY`) to mark the commands that should be handled during busy jobs which can also be used by modules (`allow-busy`) 3. In slow commands and thread safe contexts, this flag will start rejecting commands with -BUSY only after `busy-reply-threshold` 4. During loading (`rdb_load` callback), it'll process events right away (not wait for `busy-reply-threshold`), but either way, the processing is throttled to the server hz rate. 5. Allow modules to Yield to redis background tasks, but not to client commands * rename `script-time-limit` to `busy-reply-threshold` (an alias to the pre-7.0 `lua-time-limit`) Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'tests/modules')
-rw-r--r--tests/modules/blockedclient.c98
-rw-r--r--tests/modules/datatype.c57
2 files changed, 155 insertions, 0 deletions
diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c
index 3e97d6779..a2d7c6d00 100644
--- a/tests/modules/blockedclient.c
+++ b/tests/modules/blockedclient.c
@@ -1,3 +1,8 @@
+/* define macros for having usleep */
+#define _BSD_SOURCE
+#define _DEFAULT_SOURCE
+#include <unistd.h>
+
#include "redismodule.h"
#include <assert.h>
#include <stdio.h>
@@ -5,6 +10,10 @@
#define UNUSED(V) ((void) V)
+/* used to test processing events during slow bg operation */
+static volatile int g_slow_bg_operation = 0;
+static volatile int g_is_in_slow_bg_operation = 0;
+
void *sub_worker(void *arg) {
// Get Redis module context
RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
@@ -99,6 +108,16 @@ void *bg_call_worker(void *arg) {
// Acquire GIL
RedisModule_ThreadSafeContextLock(ctx);
+ // Test slow operation yielding
+ if (g_slow_bg_operation) {
+ g_is_in_slow_bg_operation = 1;
+ while (g_slow_bg_operation) {
+ RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
+ usleep(1000);
+ }
+ g_is_in_slow_bg_operation = 0;
+ }
+
// Call the command
const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
@@ -203,6 +222,73 @@ int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK;
}
+
+/* this flag is used to work with busy commands, that might take a while
+ * and ability to stop the busy work with a different command*/
+static volatile int abort_flag = 0;
+
+int slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ long long block_time = 0;
+ if (RedisModule_StringToLongLong(argv[1], &block_time) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+
+ uint64_t start_time = RedisModule_MonotonicMicroseconds();
+ /* when not blocking indefinitely, we don't process client commands in this test. */
+ int yield_flags = block_time? REDISMODULE_YIELD_FLAG_NONE: REDISMODULE_YIELD_FLAG_CLIENTS;
+ while (!abort_flag) {
+ RedisModule_Yield(ctx, yield_flags, "Slow module operation");
+ usleep(1000);
+ if (block_time && RedisModule_MonotonicMicroseconds() - start_time > (uint64_t)block_time)
+ break;
+ }
+
+ abort_flag = 0;
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+int stop_slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ abort_flag = 1;
+ RedisModule_ReplyWithLongLong(ctx, 1);
+ return REDISMODULE_OK;
+}
+
+/* used to enable or disable slow operation in do_bg_rm_call */
+static int set_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ long long ll;
+ if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+ g_slow_bg_operation = ll;
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+/* used to test if we reached the slow operation in do_bg_rm_call */
+static int is_in_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ UNUSED(argv);
+ if (argc != 1) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_ReplyWithLongLong(ctx, g_is_in_slow_bg_operation);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -223,5 +309,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "slow_fg_command", slow_fg_command,"", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "stop_slow_fg_command", stop_slow_fg_command,"allow-busy", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "set_slow_bg_operation", set_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c
index d763a2412..45a356e52 100644
--- a/tests/modules/datatype.c
+++ b/tests/modules/datatype.c
@@ -2,11 +2,20 @@
* for general ModuleDataType coverage.
*/
+/* define macros for having usleep */
+#define _BSD_SOURCE
+#define _DEFAULT_SOURCE
+#include <unistd.h>
+
#include "redismodule.h"
static RedisModuleType *datatype = NULL;
static int load_encver = 0;
+/* used to test processing events during slow loading */
+static volatile int slow_loading = 0;
+static volatile int is_in_slow_loading = 0;
+
#define DATATYPE_ENC_VER 1
typedef struct {
@@ -25,6 +34,17 @@ static void *datatype_load(RedisModuleIO *io, int encver) {
DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType));
dt->intval = intval;
dt->strval = strval;
+
+ if (slow_loading) {
+ RedisModuleCtx *ctx = RedisModule_GetContextFromIO(io);
+ is_in_slow_loading = 1;
+ while (slow_loading) {
+ RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
+ usleep(1000);
+ }
+ is_in_slow_loading = 0;
+ }
+
return dt;
}
@@ -185,6 +205,35 @@ static int datatype_swap(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
return REDISMODULE_OK;
}
+/* used to enable or disable slow loading */
+static int datatype_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ long long ll;
+ if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+ slow_loading = ll;
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+/* used to test if we reached the slow loading code */
+static int datatype_is_in_slow_loading(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_ReplyWithLongLong(ctx, is_in_slow_loading);
+ return REDISMODULE_OK;
+}
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -224,5 +273,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"write", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "datatype.slow_loading", datatype_slow_loading,
+ "allow-loading", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "datatype.is_in_slow_loading", datatype_is_in_slow_loading,
+ "allow-loading", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}