summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-05-02 12:53:10 +0200
committerantirez <antirez@gmail.com>2017-05-02 12:53:10 +0200
commit9c500b89fb4969dd54512d507549a782b02f6886 (patch)
tree7fb886781f7f00e7092e5f26161113138bf2e5a1
parent59b06b14c9801d1f65cab55559c75f8146b8b8b1 (diff)
downloadredis-9c500b89fb4969dd54512d507549a782b02f6886.tar.gz
Modules TSC: Basic TS context creeation and handling.
-rw-r--r--src/module.c61
1 files changed, 60 insertions, 1 deletions
diff --git a/src/module.c b/src/module.c
index cbc6ff627..171887ff3 100644
--- a/src/module.c
+++ b/src/module.c
@@ -95,10 +95,15 @@ typedef struct RedisModulePoolAllocBlock {
*
* Note that not all the context structure is always filled with actual values
* but only the fields needed in a given context. */
+
+struct RedisModuleBlockedClient;
+
struct RedisModuleCtx {
void *getapifuncptr; /* NOTE: Must be the first field. */
struct RedisModule *module; /* Module reference. */
client *client; /* Client calling a command. */
+ struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
+ thread safe context. */
struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
int amqueue_len; /* Number of slots in amqueue. */
int amqueue_used; /* Number of used slots in amqueue. */
@@ -115,12 +120,13 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
+#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@@ -198,6 +204,8 @@ typedef struct RedisModuleBlockedClient {
void *privdata; /* Module private data that may be used by the reply
or timeout callback. It is set via the
RedisModule_UnblockClient() API. */
+ client *reply_client; /* Fake client used to accumulate replies
+ in thread safe contexts. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@@ -3165,6 +3173,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
bc->timeout_callback = timeout_callback;
bc->free_privdata = free_privdata;
bc->privdata = NULL;
+ bc->reply_client = NULL;
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
blockClient(c,BLOCKED_MODULE);
@@ -3300,6 +3309,56 @@ void moduleCooperativeMultiTaskingCycle(void) {
pthread_mutex_lock(&moduleGIL);
}
+/* Return a context which can be used inside threads to make Redis context
+ * calls with certain modules APIs. If 'bc' is not NULL then the module will
+ * be bound to a blocked client, and it will be possible to use the
+ * `RedisModule_Reply*` family of functions to accumulate a reply for when the
+ * client will be unblocked. Otherwise the thread safe context will be
+ * detached by a specific client.
+ *
+ * To call non-reply APIs, the thread safe context must be prepared with:
+ *
+ * RedisModule_ThreadSafeCallStart(ctx);
+ * ... make your call here ...
+ * RedisModule_ThreadSafeCallStop(ctx);
+ *
+ * This is not needed when using `RedisModule_Reply*` functions, assuming
+ * that a blocked client was used when the context was created, otherwise
+ * no RedisModule_Reply* call should be made at all.
+ */
+RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
+ RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
+ RedisModuleCtx empty = REDISMODULE_CTX_INIT;
+ memcpy(ctx,&empty,sizeof(empty));
+ if (bc) {
+ ctx->blocked_client = bc;
+ if (bc->reply_client == NULL)
+ bc->reply_client = createClient(-1);
+ }
+ ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
+ return ctx;
+}
+
+/* Release a thread safe context. */
+void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
+ moduleFreeContext(ctx);
+ zfree(ctx);
+}
+
+/* Acquire the server lock before executing a thread safe API call.
+ * This is not needed for `RedisModule_Reply*` calls when there is
+ * a blocked client connected to the thread safe context. */
+void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
+ DICT_NOTUSED(ctx);
+ pthread_mutex_lock(&moduleGIL);
+}
+
+/* Release the server lock after a thread safe API call was executed. */
+void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
+ DICT_NOTUSED(ctx);
+ pthread_mutex_unlock(&moduleGIL);
+}
+
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */