summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-05-03 11:26:21 +0200
committerantirez <antirez@gmail.com>2017-05-03 11:26:21 +0200
commit3fcf959e609e850a114d4016843e4c991066ebac (patch)
tree6ec7dc7bd6ecb0d8b36b247018ab999c5a75290e
parentba4a5a3255d9a28c6aa9ead36f640705704249a7 (diff)
downloadredis-3fcf959e609e850a114d4016843e4c991066ebac.tar.gz
Modules TSC: Release the GIL for all the time we are blocked.
Instead of giving the module background operations just a small time to run in the beforeSleep() function, we can have the lock released for all the time we are blocked in the multiplexing syscall.
-rw-r--r--src/ae.c12
-rw-r--r--src/ae.h2
-rw-r--r--src/module.c45
-rw-r--r--src/modules/helloblock.c42
-rw-r--r--src/server.c17
-rw-r--r--src/server.h4
6 files changed, 100 insertions, 22 deletions
diff --git a/src/ae.c b/src/ae.c
index e66808a81..ecbaa94f3 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -75,6 +75,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
+ eventLoop->aftersleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
@@ -397,7 +398,14 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
}
}
+ /* Call the multiplexing API, will return only on timeout or when
+ * some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
+
+ /* After sleep callback. */
+ if (eventLoop->aftersleep != NULL)
+ eventLoop->aftersleep(eventLoop);
+
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
@@ -463,3 +471,7 @@ char *aeGetApiName(void) {
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}
+
+void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
+ eventLoop->aftersleep = aftersleep;
+}
diff --git a/src/ae.h b/src/ae.h
index 827c4c9e4..e3617759b 100644
--- a/src/ae.h
+++ b/src/ae.h
@@ -98,6 +98,7 @@ typedef struct aeEventLoop {
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
+ aeBeforeSleepProc *aftersleep;
} aeEventLoop;
/* Prototypes */
@@ -117,6 +118,7 @@ int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
+void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
diff --git a/src/module.c b/src/module.c
index e9b95f974..9b78a4a56 100644
--- a/src/module.c
+++ b/src/module.c
@@ -433,6 +433,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
"calls.",
ctx->module->name);
}
+ if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
}
/* Helper function for when a command callback is called, in order to handle
@@ -967,8 +968,10 @@ int RM_WrongArity(RedisModuleCtx *ctx) {
* context of a thread safe context that was not initialized with a blocked
* client object. */
client *moduleGetReplyClient(RedisModuleCtx *ctx) {
- if (ctx->client) return ctx->client;
- if (ctx->blocked_client) return ctx->blocked_client->reply_client;
+ if (!(ctx->flags & REDISMODULE_CTX_THREAD_SAFE) && ctx->client)
+ return ctx->client;
+ if (ctx->blocked_client)
+ return ctx->blocked_client->reply_client;
return NULL;
}
@@ -3351,20 +3354,6 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
* Thread Safe Contexts
* -------------------------------------------------------------------------- */
-/* Operations executed in thread safe contexts use a global lock in order to
- * be ran at a safe time. This function unlocks and re-acquire the locks:
- * hopefully with *any* sane implementation of pthreads, this will allow the
- * modules to make progresses.
- *
- * This function is called in beforeSleep(). */
-void moduleCooperativeMultiTaskingCycle(void) {
- if (dictSize(modules) == 0) return; /* No modules, no async ops. */
- pthread_mutex_unlock(&moduleGIL);
- /* Here hopefully thread modules waiting to be executed at a safe time
- * should be able to acquire the lock. */
- pthread_mutex_lock(&moduleGIL);
-}
-
/* Return a context which can be used inside threads to make Redis context
* calls with certain modules APIs. If 'bc' is not NULL then the module will
* be bound to a blocked client, and it will be possible to use the
@@ -3381,7 +3370,9 @@ void moduleCooperativeMultiTaskingCycle(void) {
* This is not needed when using `RedisModule_Reply*` functions, assuming
* that a blocked client was used when the context was created, otherwise
* no RedisModule_Reply* call should be made at all.
- */
+ *
+ * TODO: thread safe contexts do not inherit the blocked client
+ * selected database. */
RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
RedisModuleCtx empty = REDISMODULE_CTX_INIT;
@@ -3391,6 +3382,11 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
ctx->module = bc->module;
}
ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
+ /* Even when the context is associated with a blocked client, we can't
+ * access it safely from another thread, so we create a fake client here
+ * in order to keep things like the currently selected database and similar
+ * things. */
+ ctx->client = createClient(-1);
return ctx;
}
@@ -3405,12 +3401,20 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
* a blocked client connected to the thread safe context. */
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
DICT_NOTUSED(ctx);
- pthread_mutex_lock(&moduleGIL);
+ moduleAcquireGIL();
}
/* Release the server lock after a thread safe API call was executed. */
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
DICT_NOTUSED(ctx);
+ moduleReleaseGIL();
+}
+
+void moduleAcquireGIL(void) {
+ pthread_mutex_lock(&moduleGIL);
+}
+
+void moduleReleaseGIL(void) {
pthread_mutex_unlock(&moduleGIL);
}
@@ -3655,6 +3659,11 @@ void moduleCommand(client *c) {
}
}
+/* Return the number of registered modules. */
+size_t moduleCount(void) {
+ return dictSize(modules);
+}
+
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
diff --git a/src/modules/helloblock.c b/src/modules/helloblock.c
index 71ec9b121..e760e33fb 100644
--- a/src/modules/helloblock.c
+++ b/src/modules/helloblock.c
@@ -105,6 +105,45 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
return REDISMODULE_OK;
}
+/* The thread entry point that actually executes the blocking part
+ * of the command HELLO.KEYS. */
+void *HelloKeys_ThreadMain(void *arg) {
+ RedisModuleBlockedClient *bc = arg;
+ RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
+
+ RedisModule_ThreadSafeContextLock(ctx);
+ RedisModule_ReplyWithLongLong(ctx,1234);
+ RedisModule_ThreadSafeContextUnlock(ctx);
+
+ RedisModule_UnblockClient(bc,NULL);
+ return NULL;
+}
+
+/* HELLO.KEYS -- Return all the keys in the current database without blocking
+ * the server. The keys do not represent a point-in-time state so only the keys
+ * that were in the database from the start to the end are guaranteed to be
+ * there. */
+int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1) return RedisModule_WrongArity(ctx);
+
+ pthread_t tid;
+
+ /* Note that when blocking the client we do not set any callback: no
+ * timeout is possible since we passed '0', nor we need a reply callback
+ * because we'll use the thread safe context to accumulate a reply. */
+ RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
+
+ /* Now that we setup a blocking client, we need to pass the control
+ * to the thread. However we need to pass arguments to the thread:
+ * the reference to the blocked client handle. */
+ if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) {
+ RedisModule_AbortBlock(bc);
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ }
+ return REDISMODULE_OK;
+}
+
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
@@ -117,6 +156,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx,"hello.block",
HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"hello.keys",
+ HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
return REDISMODULE_OK;
}
diff --git a/src/server.c b/src/server.c
index e9013bf60..6be12cffe 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1172,9 +1172,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
- /* Give some run time to modules threads using thread safe contexts. */
- moduleCooperativeMultiTaskingCycle();
-
/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
@@ -1219,6 +1216,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
+
+ /* Before we are going to sleep, let the threads access the dataset by
+ * releasing the GIL. Redis main thread will not touch anything at this
+ * time. */
+ if (moduleCount()) moduleReleaseGIL();
+}
+
+/* This function is called immadiately after the event loop multiplexing
+ * API returned, and the control is going to soon return to Redis by invoking
+ * the different events callbacks. */
+void afterSleep(struct aeEventLoop *eventLoop) {
+ UNUSED(eventLoop);
+ if (moduleCount()) moduleAcquireGIL();
}
/* =========================== Server initialization ======================== */
@@ -3808,6 +3818,7 @@ int main(int argc, char **argv) {
}
aeSetBeforeSleepProc(server.el,beforeSleep);
+ aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
diff --git a/src/server.h b/src/server.h
index 956370296..2bc49299b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1294,7 +1294,9 @@ void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
-void moduleCooperativeMultiTaskingCycle(void);
+size_t moduleCount(void);
+void moduleAcquireGIL(void);
+void moduleReleaseGIL(void);
/* Utils */
long long ustime(void);