summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2020-12-08 16:41:20 +0200
committerOran Agra <oran@redislabs.com>2021-01-12 16:25:37 +0200
commitec74ae7ec1e758aa98c452296c2fa75b6ad9219f (patch)
tree4d63809a94dedec86896bd5de09c1d4c84df07eb
parent785851a7365945e0b05278625f1f1d9a43c13bb4 (diff)
downloadredis-ec74ae7ec1e758aa98c452296c2fa75b6ad9219f.tar.gz
Handle output buffer limits for Module blocked clients (#8141)
Module blocked clients cache the response in a temporary client, the reply list in this client would be affected by the recent fix in #7202, but when the reply is later copied into the real client, it would have bypassed all the checks for output buffer limit, which would have resulted in both: responding with a partial response to the client, and also not disconnecting it at all. (cherry picked from commit 48efc25f749c3620f9245786582ac76cb40e9bf4)
-rw-r--r--src/networking.c33
-rw-r--r--tests/modules/blockedclient.c105
-rw-r--r--tests/unit/moduleapi/blockedclient.tcl13
3 files changed, 150 insertions, 1 deletions
diff --git a/src/networking.c b/src/networking.c
index 54de8ac54..9150426a5 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -260,6 +260,9 @@ int prepareClientToWrite(client *c) {
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
+/* Attempts to add the reply to the static buffer in the client struct.
+ * Returns C_ERR if the buffer is full, or the reply list is not empty,
+ * in which case the reply must be added to the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
@@ -277,6 +280,8 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
return C_OK;
}
+/* Adds the reply to the reply linked list.
+ * Note: some edits to this function need to be relayed to AddReplyFromClient. */
void _addReplyProtoToList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
@@ -837,14 +842,40 @@ void addReplySubcommandSyntaxError(client *c) {
/* Append 'src' client output buffers into 'dst' client output buffers.
* This function clears the output buffers of 'src' */
void AddReplyFromClient(client *dst, client *src) {
- if (prepareClientToWrite(dst) != C_OK)
+ /* If the source client contains a partial response due to client output
+ * buffer limits, propagate that to the dest rather than copy a partial
+ * reply. We don't wanna run the risk of copying partial response in case
+ * for some reason the output limits don't reach the same decision (maybe
+ * they changed) */
+ if (src->flags & CLIENT_CLOSE_ASAP) {
+ sds client = catClientInfoString(sdsempty(),dst);
+ freeClientAsync(dst);
+ serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
+ sdsfree(client);
return;
+ }
+
+ /* First add the static buffer (either into the static buffer or reply list) */
addReplyProto(dst,src->buf, src->bufpos);
+
+ /* We need to check with prepareClientToWrite again (after addReplyProto)
+ * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
+ if (prepareClientToWrite(dst) != C_OK)
+ return;
+
+ /* We're bypassing _addReplyProtoToList, so we need to add the pre/post
+ * checks in it. */
+ if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;
+
+ /* Concatenate the reply list into the dest */
if (listLength(src->reply))
listJoin(dst->reply,src->reply);
dst->reply_bytes += src->reply_bytes;
src->reply_bytes = 0;
src->bufpos = 0;
+
+ /* Check output buffer limits */
+ asyncCloseClientOnOutputBufferLimitReached(dst);
}
/* Copy 'src' client output buffers into 'dst' client output buffers.
diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c
index 558e06502..1c485a44a 100644
--- a/tests/modules/blockedclient.c
+++ b/tests/modules/blockedclient.c
@@ -79,6 +79,105 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
+typedef struct {
+ RedisModuleString **argv;
+ int argc;
+ RedisModuleBlockedClient *bc;
+} bg_call_data;
+
+void *bg_call_worker(void *arg) {
+ bg_call_data *bg = arg;
+
+ // Get Redis module context
+ RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
+
+ // Acquire GIL
+ RedisModule_ThreadSafeContextLock(ctx);
+
+ // Call the command
+ const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
+
+ // Release GIL
+ RedisModule_ThreadSafeContextUnlock(ctx);
+
+ // Reply to client
+ if (!rep) {
+ RedisModule_ReplyWithError(ctx, "NULL reply returned");
+ } else {
+ RedisModule_ReplyWithCallReply(ctx, rep);
+ RedisModule_FreeCallReply(rep);
+ }
+
+ // Unblock client
+ RedisModule_UnblockClient(bg->bc, NULL);
+
+ /* Free the arguments */
+ for (int i=0; i<bg->argc; i++)
+ RedisModule_FreeString(ctx, bg->argv[i]);
+ RedisModule_Free(bg->argv);
+ RedisModule_Free(bg);
+
+ // Free the Redis module context
+ RedisModule_FreeThreadSafeContext(ctx);
+
+ return NULL;
+}
+
+int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ UNUSED(argv);
+ UNUSED(argc);
+
+ /* Make sure we're not trying to block a client when we shouldn't */
+ int flags = RedisModule_GetContextFlags(ctx);
+ int allFlags = RedisModule_GetContextFlagsAll();
+ if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
+ (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
+ RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
+ return REDISMODULE_OK;
+ }
+
+ /* Make a copy of the arguments and pass them to the thread. */
+ bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
+ bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
+ bg->argc = argc;
+ for (int i=0; i<argc; i++)
+ bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
+
+ /* Block the client */
+ bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
+
+ /* Start a thread to handle the request */
+ pthread_t tid;
+ int res = pthread_create(&tid, NULL, bg_call_worker, bg);
+ assert(res == 0);
+
+ return REDISMODULE_OK;
+}
+
+int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
+ UNUSED(argv);
+ UNUSED(argc);
+
+ if(argc < 2){
+ return RedisModule_WrongArity(ctx);
+ }
+
+ const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
+
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);
+ if(!rep){
+ RedisModule_ReplyWithError(ctx, "NULL reply returned");
+ }else{
+ RedisModule_ReplyWithCallReply(ctx, rep);
+ RedisModule_FreeCallReply(rep);
+ }
+
+ return REDISMODULE_OK;
+}
+
+
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@@ -89,5 +188,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
return REDISMODULE_OK;
}
diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl
index 5a541d138..77d94c7bf 100644
--- a/tests/unit/moduleapi/blockedclient.tcl
+++ b/tests/unit/moduleapi/blockedclient.tcl
@@ -14,4 +14,17 @@ start_server {tags {"modules"}} {
r acquire_gil
assert_equal {{Blocked client is not supported inside multi}} [r exec]
}
+
+ test {blocked client reaches client output buffer limit} {
+ r hset hash big [string repeat x 50000]
+ r hset hash bada [string repeat x 50000]
+ r hset hash boom [string repeat x 50000]
+ r config set client-output-buffer-limit {normal 100000 0 0}
+ r client setname myclient
+ catch {r do_bg_rm_call hgetall hash} e
+ assert_match "*I/O error*" $e
+ reconnect
+ set clients [r client list]
+ assert_no_match "*name=myclient*" $clients
+ }
}