summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2017-05-02 15:05:39 +0200
committerantirez <antirez@gmail.com>2017-05-02 15:05:39 +0200
commit275905b3282581208e13ed61c7aca98fbbfc40c4 (patch)
tree8c2119a2079b00604d7a2b56804bf47b46165f6e
parent9c500b89fb4969dd54512d507549a782b02f6886 (diff)
downloadredis-275905b3282581208e13ed61c7aca98fbbfc40c4.tar.gz
Modules TSC: Handling of RM_Reply* functions.
-rw-r--r--src/adlist.c12
-rw-r--r--src/adlist.h1
-rw-r--r--src/module.c83
3 files changed, 82 insertions, 14 deletions
diff --git a/src/adlist.c b/src/adlist.c
index 2fb61a6f1..96575c72e 100644
--- a/src/adlist.c
+++ b/src/adlist.c
@@ -341,3 +341,15 @@ void listRotate(list *list) {
tail->next = list->head;
list->head = tail;
}
+
+/* Add all the elements of the list 'o' at the end of the
+ * list 'l'. The list 'other' remains empty but otherwise valid. */
+void listJoin(list *l, list *o) {
+ l->tail->next = o->head;
+ o->head->prev = l->tail;
+ l->tail = o->tail;
+
+ /* Setup other as an empty list. */
+ o->head = l->tail = NULL;
+ o->len = 0;
+}
diff --git a/src/adlist.h b/src/adlist.h
index e457a979e..c954fac87 100644
--- a/src/adlist.h
+++ b/src/adlist.h
@@ -86,6 +86,7 @@ listNode *listIndex(list *list, long index);
void listRewind(list *list, listIter *li);
void listRewindTail(list *list, listIter *li);
void listRotate(list *list);
+void listJoin(list *l, list *o);
/* Directions for iterators */
#define AL_START_HEAD 0
diff --git a/src/module.c b/src/module.c
index 171887ff3..5a6189ebc 100644
--- a/src/module.c
+++ b/src/module.c
@@ -953,10 +953,31 @@ int RM_WrongArity(RedisModuleCtx *ctx) {
return REDISMODULE_OK;
}
+/* Return the client object the `RM_Reply*` functions should target.
+ * Normally this is just `ctx->client`, that is the client that called
+ * the module command, however in the case of thread safe contexts there
+ * is no directly associated client (since it would not be safe to access
+ * the client from a thread), so instead the blocked client object referenced
+ * in the thread safe context, has a fake client that we just use to accumulate
+ * the replies. Later, when the client is unblocked, the accumulated replies
+ * are appended to the actual client.
+ *
+ * The function returns the client pointer depending on the context, or
+ * NULL if there is no potential client. This happens when we are in the
+ * context of a thread safe context that was not initialized with a blocked
+ * client object. */
+client *moduleGetReplyClient(RedisModuleCtx *ctx) {
+ if (ctx->client) return ctx->client;
+ if (ctx->blocked_client) return ctx->blocked_client->reply_client;
+ return NULL;
+}
+
/* Send an integer reply to the client, with the specified long long value.
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
- addReplyLongLong(ctx->client,ll);
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyLongLong(c,ll);
return REDISMODULE_OK;
}
@@ -964,10 +985,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
* ReplyWithSimpleString() and ReplyWithError().
* The function always returns REDISMODULE_OK. */
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
sds strmsg = sdsnewlen(prefix,1);
strmsg = sdscat(strmsg,msg);
strmsg = sdscatlen(strmsg,"\r\n",2);
- addReplySds(ctx->client,strmsg);
+ addReplySds(c,strmsg);
return REDISMODULE_OK;
}
@@ -1010,14 +1033,16 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
ctx->postponed_arrays = zrealloc(ctx->postponed_arrays,sizeof(void*)*
(ctx->postponed_arrays_count+1));
ctx->postponed_arrays[ctx->postponed_arrays_count] =
- addDeferredMultiBulkLength(ctx->client);
+ addDeferredMultiBulkLength(c);
ctx->postponed_arrays_count++;
} else {
- addReplyMultiBulkLen(ctx->client,len);
+ addReplyMultiBulkLen(c,len);
}
return REDISMODULE_OK;
}
@@ -1049,6 +1074,8 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
* that is not easy to calculate in advance the number of elements.
*/
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return;
if (ctx->postponed_arrays_count == 0) {
serverLog(LL_WARNING,
"API misuse detected in module %s: "
@@ -1058,7 +1085,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
return;
}
ctx->postponed_arrays_count--;
- setDeferredMultiBulkLength(ctx->client,
+ setDeferredMultiBulkLength(c,
ctx->postponed_arrays[ctx->postponed_arrays_count],
len);
if (ctx->postponed_arrays_count == 0) {
@@ -1071,7 +1098,9 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
- addReplyBulkCBuffer(ctx->client,(char*)buf,len);
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyBulkCBuffer(c,(char*)buf,len);
return REDISMODULE_OK;
}
@@ -1079,7 +1108,9 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
- addReplyBulk(ctx->client,str);
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyBulk(c,str);
return REDISMODULE_OK;
}
@@ -1088,7 +1119,9 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
- addReply(ctx->client,shared.nullbulk);
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReply(c,shared.nullbulk);
return REDISMODULE_OK;
}
@@ -1099,8 +1132,10 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) {
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
sds proto = sdsnewlen(reply->proto, reply->protolen);
- addReplySds(ctx->client,proto);
+ addReplySds(c,proto);
return REDISMODULE_OK;
}
@@ -1111,7 +1146,9 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
*
* The function always returns REDISMODULE_OK. */
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
- addReplyDouble(ctx->client,d);
+ client *c = moduleGetReplyClient(ctx);
+ if (c == NULL) return REDISMODULE_OK;
+ addReplyDouble(c,d);
return REDISMODULE_OK;
}
@@ -3173,7 +3210,8 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
bc->timeout_callback = timeout_callback;
bc->free_privdata = free_privdata;
bc->privdata = NULL;
- bc->reply_client = NULL;
+ bc->reply_client = createClient(-1);
+ bc->reply_client->flags |= CLIENT_MODULE;
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
blockClient(c,BLOCKED_MODULE);
@@ -3236,7 +3274,9 @@ void moduleHandleBlockedClients(void) {
/* Release the lock during the loop, as long as we don't
* touch the shared list. */
- if (c != NULL && bc->reply_callback != NULL) {
+ /* Call the reply callback if the client is valid and we have
+ * any callback. */
+ if (c && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
@@ -3246,8 +3286,24 @@ void moduleHandleBlockedClients(void) {
moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
+
+ /* Free privdata if any. */
if (bc->privdata && bc->free_privdata)
bc->free_privdata(bc->privdata);
+
+ /* It is possible that this blocked client object accumulated
+ * replies to send to the client in a thread safe context.
+ * We need to glue such replies to the client output buffer and
+ * free the temporary client we just used for the replies. */
+ if (c) {
+ if (bc->reply_client->bufpos)
+ addReplyString(c,bc->reply_client->buf,
+ bc->reply_client->bufpos);
+ if (listLength(bc->reply_client->reply))
+ listJoin(c->reply,bc->reply_client->reply);
+ }
+ freeClient(bc->reply_client);
+
if (c != NULL) unblockClient(c);
/* Free 'bc' only after unblocking the client, since it is
* referenced in the client blocking context, and must be valid
@@ -3332,8 +3388,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
memcpy(ctx,&empty,sizeof(empty));
if (bc) {
ctx->blocked_client = bc;
- if (bc->reply_client == NULL)
- bc->reply_client = createClient(-1);
+ ctx->module = bc->module;
}
ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
return ctx;