summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorOzan Tezcan <ozantezcan@gmail.com>2022-01-18 14:10:07 +0300
committerGitHub <noreply@github.com>2022-01-18 13:10:07 +0200
commit99ab4236afb210dc118fad892210b9fbb369aa8e (patch)
tree2a3bb0d6279f066261ba7660e3bd44262cd183ec /tests
parent25e6d4d4597d6ca06503d5fa76af0e4e1b57302e (diff)
downloadredis-99ab4236afb210dc118fad892210b9fbb369aa8e.tar.gz
Add event loop support to the module API (#10001)
Modules can now register sockets/pipe to the Redis main thread event loop and do network operations asynchronously. Previously, modules had to maintain an event loop and another thread for asynchronous network operations. Also, if a module is calling API functions after doing some network operations, it had to synchronize its event loop thread's access with Redis main thread by locking the GIL, causing contention on the lock. After this commit, no synchronization is needed as module can operate in Redis main thread context. So, this commit may improve the performance for some use cases. Added three functions to the module API: * RedisModule_EventLoopAdd(int fd, int mask, RedisModuleEventLoopFunc func, void *user_data) * RedisModule_EventLoopDel(int fd, int mask) * RedisModule_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data) - This function can be called from other threads to trigger callback on Redis main thread. Callback will be triggered only once. If Redis main thread is sleeping, this call will wake up the Redis main thread. Event loop callbacks are called by Redis main thread after locking the GIL. Inside callbacks, modules can operate as if they are holding the GIL. Added REDISMODULE_EVENT_EVENTLOOP event with two subevents: * REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP * REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP These events are for modules that want to participate in the before and after sleep action. e.g It might be useful to implement batching : Read data from the network, write all to a file in one go on BEFORE_SLEEP event.
Diffstat (limited to 'tests')
-rw-r--r--tests/modules/Makefile3
-rw-r--r--tests/modules/eventloop.c277
-rw-r--r--tests/unit/moduleapi/eventloop.tcl25
3 files changed, 304 insertions, 1 deletions
diff --git a/tests/modules/Makefile b/tests/modules/Makefile
index 5855a8970..ec8e6de89 100644
--- a/tests/modules/Makefile
+++ b/tests/modules/Makefile
@@ -50,7 +50,8 @@ TEST_MODULES = \
aclcheck.so \
list.so \
subcommands.so \
- reply.so
+ reply.so \
+ eventloop.so
.PHONY: all
diff --git a/tests/modules/eventloop.c b/tests/modules/eventloop.c
new file mode 100644
index 000000000..50d3bc052
--- /dev/null
+++ b/tests/modules/eventloop.c
@@ -0,0 +1,277 @@
+/* This module contains four tests :
+ * 1- test.sanity : Basic tests for argument validation mostly.
+ * 2- test.sendbytes : Creates a pipe and registers its fds to the event loop,
+ * one end of the pipe for read events and the other end for
+ * the write events. On writable event, data is written. On
+ * readable event data is read. Repeated until all data is
+ * received.
+ * 3- test.iteration : A test for BEFORE_SLEEP and AFTER_SLEEP callbacks.
+ * Counters are incremented each time these events are
+ * fired. They should be equal and increment monotonically.
+ * 4- test.oneshot : Test for oneshot API
+ */
+
+#define REDISMODULE_EXPERIMENTAL_API
+#include "redismodule.h"
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <memory.h>
+#include <errno.h>
+
+int fds[2];
+long long buf_size;
+char *src;
+long long src_offset;
+char *dst;
+long long dst_offset;
+
+RedisModuleBlockedClient *bc;
+RedisModuleCtx *reply_ctx;
+
+void onReadable(int fd, void *user_data, int mask) {
+ REDISMODULE_NOT_USED(mask);
+
+ RedisModule_Assert(strcmp(user_data, "userdataread") == 0);
+
+ while (1) {
+ int rd = read(fd, dst + dst_offset, buf_size - dst_offset);
+ if (rd <= 0)
+ return;
+ dst_offset += rd;
+
+ /* Received all bytes */
+ if (dst_offset == buf_size) {
+ if (memcmp(src, dst, buf_size) == 0)
+ RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
+ else
+ RedisModule_ReplyWithError(reply_ctx, "ERR bytes mismatch");
+
+ RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE);
+ RedisModule_EventLoopDel(fds[1], REDISMODULE_EVENTLOOP_WRITABLE);
+ RedisModule_Free(src);
+ RedisModule_Free(dst);
+ close(fds[0]);
+ close(fds[1]);
+
+ RedisModule_FreeThreadSafeContext(reply_ctx);
+ RedisModule_UnblockClient(bc, NULL);
+ return;
+ }
+ };
+}
+
+void onWritable(int fd, void *user_data, int mask) {
+ REDISMODULE_NOT_USED(user_data);
+ REDISMODULE_NOT_USED(mask);
+
+ RedisModule_Assert(strcmp(user_data, "userdatawrite") == 0);
+
+ while (1) {
+ /* Check if we sent all data */
+ if (src_offset >= buf_size)
+ return;
+ int written = write(fd, src + src_offset, buf_size - src_offset);
+ if (written <= 0) {
+ return;
+ }
+
+ src_offset += written;
+ };
+}
+
+/* Create a pipe(), register pipe fds to the event loop and send/receive data
+ * using them. */
+int sendbytes(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+
+ if (RedisModule_StringToLongLong(argv[1], &buf_size) != REDISMODULE_OK ||
+ buf_size == 0) {
+ RedisModule_ReplyWithError(ctx, "Invalid integer value");
+ return REDISMODULE_OK;
+ }
+
+ bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
+ reply_ctx = RedisModule_GetThreadSafeContext(bc);
+
+ /* Allocate source buffer and write some random data */
+ src = RedisModule_Calloc(1,buf_size);
+ src_offset = 0;
+ memset(src, rand() % 0xFF, buf_size);
+ memcpy(src, "randomtestdata", strlen("randomtestdata"));
+
+ dst = RedisModule_Calloc(1,buf_size);
+ dst_offset = 0;
+
+ /* Create a pipe and register it to the event loop. */
+ if (pipe(fds) < 0) return REDISMODULE_ERR;
+ if (fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0) return REDISMODULE_ERR;
+ if (fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0) return REDISMODULE_ERR;
+
+ if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE,
+ onReadable, "userdataread") != REDISMODULE_OK) return REDISMODULE_ERR;
+ if (RedisModule_EventLoopAdd(fds[1], REDISMODULE_EVENTLOOP_WRITABLE,
+ onWritable, "userdatawrite") != REDISMODULE_OK) return REDISMODULE_ERR;
+ return REDISMODULE_OK;
+}
+
+int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (pipe(fds) < 0) return REDISMODULE_ERR;
+
+ if (RedisModule_EventLoopAdd(fds[0], 9999999, onReadable, NULL)
+ == REDISMODULE_OK || errno != EINVAL) {
+ RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopAdd(-1, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
+ == REDISMODULE_OK || errno != ERANGE) {
+ RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopAdd(99999999, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
+ == REDISMODULE_OK || errno != ERANGE) {
+ RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, NULL, NULL)
+ == REDISMODULE_OK || errno != EINVAL) {
+ RedisModule_ReplyWithError(ctx, "ERR null callback should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopAdd(fds[0], 9999999, onReadable, NULL)
+ == REDISMODULE_OK || errno != EINVAL) {
+ RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE)
+ != REDISMODULE_OK || errno != 0) {
+ RedisModule_ReplyWithError(ctx, "ERR del on non-registered fd should not fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopDel(fds[0], 9999999) == REDISMODULE_OK ||
+ errno != EINVAL) {
+ RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopDel(-1, REDISMODULE_EVENTLOOP_READABLE)
+ == REDISMODULE_OK || errno != ERANGE) {
+ RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopDel(99999999, REDISMODULE_EVENTLOOP_READABLE)
+ == REDISMODULE_OK || errno != ERANGE) {
+ RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
+ goto out;
+ }
+ if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
+ != REDISMODULE_OK || errno != 0) {
+ RedisModule_ReplyWithError(ctx, "ERR Add failed");
+ goto out;
+ }
+ if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
+ != REDISMODULE_OK || errno != 0) {
+ RedisModule_ReplyWithError(ctx, "ERR Adding same fd twice failed");
+ goto out;
+ }
+ if (RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE)
+ != REDISMODULE_OK || errno != 0) {
+ RedisModule_ReplyWithError(ctx, "ERR Del failed");
+ goto out;
+ }
+ if (RedisModule_EventLoopAddOneShot(NULL, NULL) == REDISMODULE_OK || errno != EINVAL) {
+ RedisModule_ReplyWithError(ctx, "ERR null callback should fail");
+ goto out;
+ }
+
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+out:
+ close(fds[0]);
+ close(fds[1]);
+ return REDISMODULE_OK;
+}
+
+static long long beforeSleepCount;
+static long long afterSleepCount;
+
+int iteration(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ /* On each event loop iteration, eventloopCallback() is called. We increment
+ * beforeSleepCount and afterSleepCount, so these two should be equal.
+ * We reply with iteration count, caller can test if iteration count
+ * increments monotonically */
+ RedisModule_Assert(beforeSleepCount == afterSleepCount);
+ RedisModule_ReplyWithLongLong(ctx, beforeSleepCount);
+ return REDISMODULE_OK;
+}
+
+void oneshotCallback(void* arg)
+{
+ RedisModule_Assert(strcmp(arg, "userdata") == 0);
+ RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
+ RedisModule_FreeThreadSafeContext(reply_ctx);
+ RedisModule_UnblockClient(bc, NULL);
+}
+
+int oneshot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
+ reply_ctx = RedisModule_GetThreadSafeContext(bc);
+
+ if (RedisModule_EventLoopAddOneShot(oneshotCallback, "userdata") != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "ERR oneshot failed");
+ RedisModule_FreeThreadSafeContext(reply_ctx);
+ RedisModule_UnblockClient(bc, NULL);
+ }
+ return REDISMODULE_OK;
+}
+
+void eventloopCallback(struct RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(eid);
+ REDISMODULE_NOT_USED(subevent);
+ REDISMODULE_NOT_USED(data);
+
+ RedisModule_Assert(eid.id == REDISMODULE_EVENT_EVENTLOOP);
+ if (subevent == REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP)
+ beforeSleepCount++;
+ else if (subevent == REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP)
+ afterSleepCount++;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx,"eventloop",1,REDISMODULE_APIVER_1)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ /* Test basics. */
+ if (RedisModule_CreateCommand(ctx, "test.sanity", sanity, "", 0, 0, 0)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ /* Register a command to create a pipe() and send data through it by using
+ * event loop API. */
+ if (RedisModule_CreateCommand(ctx, "test.sendbytes", sendbytes, "", 0, 0, 0)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ /* Register a command to return event loop iteration count. */
+ if (RedisModule_CreateCommand(ctx, "test.iteration", iteration, "", 0, 0, 0)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "test.oneshot", oneshot, "", 0, 0, 0)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_EventLoop,
+ eventloopCallback) != REDISMODULE_OK) return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
diff --git a/tests/unit/moduleapi/eventloop.tcl b/tests/unit/moduleapi/eventloop.tcl
new file mode 100644
index 000000000..8a9f194e3
--- /dev/null
+++ b/tests/unit/moduleapi/eventloop.tcl
@@ -0,0 +1,25 @@
+set testmodule [file normalize tests/modules/eventloop.so]
+
+start_server {tags {"modules"}} {
+ r module load $testmodule
+
+ test "Module eventloop sendbytes" {
+ assert_match "OK" [r test.sendbytes 10000000]
+ assert_match "OK" [r test.sendbytes 2000000]
+ assert_match "OK" [r test.sendbytes 800000000]
+ }
+
+ test "Module eventloop iteration" {
+ set iteration [r test.iteration]
+ set next_iteration [r test.iteration]
+ assert {$next_iteration > $iteration}
+ }
+
+ test "Module eventloop sanity" {
+ r test.sanity
+ }
+
+ test "Module eventloop oneshot" {
+ r test.oneshot
+ }
+}