diff options
-rw-r--r-- | src/call_reply.c | 15 | ||||
-rw-r--r-- | src/call_reply.h | 3 | ||||
-rw-r--r-- | src/module.c | 12 | ||||
-rw-r--r-- | src/networking.c | 37 | ||||
-rw-r--r-- | src/server.h | 2 | ||||
-rw-r--r-- | tests/unit/moduleapi/blockedclient.tcl | 20 |
6 files changed, 85 insertions, 4 deletions
diff --git a/src/call_reply.c b/src/call_reply.c index 7aa79d089..3694db55e 100644 --- a/src/call_reply.c +++ b/src/call_reply.c @@ -60,7 +60,7 @@ struct CallReply { double d; /* Reply value for double reply. */ struct CallReply *array; /* Array of sub-reply elements. used for set, array, map, and attribute */ } val; - + list *deferred_error_list; /* list of errors in sds form or NULL */ struct CallReply *attribute; /* attribute reply, NULL if not exists */ }; @@ -237,6 +237,8 @@ void freeCallReply(CallReply *rep) { freeCallReplyInternal(rep); } sdsfree(rep->original_proto); + if (rep->deferred_error_list) + listRelease(rep->deferred_error_list); zfree(rep); } @@ -488,6 +490,11 @@ int callReplyIsResp3(CallReply *rep) { return rep->flags & REPLY_FLAG_RESP3; } +/* Returns a list of errors in sds form, or NULL. */ +list *callReplyDeferredErrorList(CallReply *rep) { + return rep->deferred_error_list; +} + /* Create a new CallReply struct from the reply blob. * * The function will own the reply blob, so it must not be used or freed by @@ -496,6 +503,9 @@ int callReplyIsResp3(CallReply *rep) { * The reply blob will be freed when the returned CallReply struct is later * freed using freeCallReply(). * + * The deferred_error_list is an optional list of errors that are present + * in the reply blob, if given, this function will take ownership on it. + * * The private_data is optional and can later be accessed using * callReplyGetPrivateData(). * @@ -504,7 +514,7 @@ int callReplyIsResp3(CallReply *rep) { * DESIGNED TO HANDLE USER INPUT and using it to parse invalid replies is * unsafe. */ -CallReply *callReplyCreate(sds reply, void *private_data) { +CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data) { CallReply *res = zmalloc(sizeof(*res)); res->flags = REPLY_FLAG_ROOT; res->original_proto = reply; @@ -512,5 +522,6 @@ CallReply *callReplyCreate(sds reply, void *private_data) { res->proto_len = sdslen(reply); res->private_data = private_data; res->attribute = NULL; + res->deferred_error_list = deferred_error_list; return res; } diff --git a/src/call_reply.h b/src/call_reply.h index 5b07dc437..ff98f7f5a 100644 --- a/src/call_reply.h +++ b/src/call_reply.h @@ -34,7 +34,7 @@ typedef struct CallReply CallReply; -CallReply *callReplyCreate(sds reply, void *private_data); +CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data); int callReplyType(CallReply *rep); const char *callReplyGetString(CallReply *rep, size_t *len); long long callReplyGetLongLong(CallReply *rep); @@ -51,6 +51,7 @@ const char *callReplyGetVerbatim(CallReply *rep, size_t *len, const char **forma const char *callReplyGetProto(CallReply *rep, size_t *len); void *callReplyGetPrivateData(CallReply *rep); int callReplyIsResp3(CallReply *rep); +list *callReplyDeferredErrorList(CallReply *rep); void freeCallReply(CallReply *rep); #endif /* SRC_CALL_REPLY_H_ */ diff --git a/src/module.c b/src/module.c index 8fca2572d..c6e816c6e 100644 --- a/src/module.c +++ b/src/module.c @@ -2918,6 +2918,15 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { size_t proto_len; const char *proto = callReplyGetProto(reply, &proto_len); addReplyProto(c, proto, proto_len); + /* Propagate the error list from that reply to the other client, to do some + * post error reply handling, like statistics. + * Note that if the original reply had an array with errors, and the module + * replied with just a portion of the original reply, and not the entire + * reply, the errors are currently not propagated and the errors stats + * will not get propagated. */ + list *errors = callReplyDeferredErrorList(reply); + if (errors) + deferredAfterErrorReply(c, errors); return REDISMODULE_OK; } @@ -5654,7 +5663,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch proto = sdscatlen(proto,o->buf,o->used); listDelNode(c->reply,listFirst(c->reply)); } - reply = callReplyCreate(proto, ctx); + reply = callReplyCreate(proto, c->deferred_reply_errors, ctx); + c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */ autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply); cleanup: diff --git a/src/networking.c b/src/networking.c index 05001c564..0ae0f6eb9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -173,6 +173,7 @@ client *createClient(connection *conn) { c->slave_capa = SLAVE_CAPA_NONE; c->slave_req = SLAVE_REQ_NONE; c->reply = listCreate(); + c->deferred_reply_errors = NULL; c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,freeClientReplyValue); @@ -439,6 +440,18 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */ void afterErrorReply(client *c, const char *s, size_t len) { + /* Module clients fall into two categories: + * Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted. + * Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */ + if (c->flags & CLIENT_MODULE) { + if (!c->deferred_reply_errors) { + c->deferred_reply_errors = listCreate(); + listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree); + } + listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len)); + return; + } + /* Increment the global error counter */ server.stat_total_error_replies++; /* Increment the error stats @@ -1024,10 +1037,28 @@ void AddReplyFromClient(client *dst, client *src) { src->reply_bytes = 0; src->bufpos = 0; + if (src->deferred_reply_errors) { + deferredAfterErrorReply(dst, src->deferred_reply_errors); + listRelease(src->deferred_reply_errors); + src->deferred_reply_errors = NULL; + } + /* Check output buffer limits */ closeClientOnOutputBufferLimitReached(dst, 1); } +/* Append the listed errors to the server error statistics. the input + * list is not modified and remains the responsibility of the caller. */ +void deferredAfterErrorReply(client *c, list *errors) { + listIter li; + listNode *ln; + listRewind(errors,&li); + while((ln = listNext(&li))) { + sds err = ln->value; + afterErrorReply(c, err, sdslen(err)); + } +} + /* Logically copy 'src' replica client buffers info to 'dst' replica. * Basically increase referenced buffer block node reference count. */ void copyReplicaOutputBuffer(client *dst, client *src) { @@ -1497,6 +1528,8 @@ void freeClient(client *c) { freeReplicaReferencedReplBuffer(c); freeClientArgv(c); freeClientOriginalArgv(c); + if (c->deferred_reply_errors) + listRelease(c->deferred_reply_errors); /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different @@ -1863,6 +1896,10 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; + if (c->deferred_reply_errors) + listRelease(c->deferred_reply_errors); + c->deferred_reply_errors = NULL; + /* We clear the ASKING flag as well if we are not inside a MULTI, and * if what we just executed is not the ASKING command itself. */ if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) diff --git a/src/server.h b/src/server.h index 994e98c32..853e9f5fa 100644 --- a/src/server.h +++ b/src/server.h @@ -1087,6 +1087,7 @@ typedef struct client { long bulklen; /* Length of bulk argument in multi bulk request. */ list *reply; /* List of reply objects to send to the client. */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ + list *deferred_reply_errors; /* Used for module thread safe contexts. */ size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ time_t ctime; /* Client creation time. */ @@ -2428,6 +2429,7 @@ void addReplySubcommandSyntaxError(client *c); void addReplyLoadedModules(client *c); void copyReplicaOutputBuffer(client *dst, client *src); void addListRangeReply(client *c, robj *o, long start, long end, int reverse); +void deferredAfterErrorReply(client *c, list *errors); size_t sdsZmallocSize(sds s); size_t getStringObjectSdsUsedMemory(robj *o); void freeClientReplyValue(void *o); diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index 523d7ba69..d782fab36 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -180,6 +180,26 @@ start_server {tags {"modules"}} { assert_no_match "*name=myclient*" $clients } + test {module client error stats} { + r config resetstat + + assert_error "NULL reply returned" {r do_rm_call hgetalllll} + assert_equal [errorrstat NULL r] {count=1} + + assert_error "NULL reply returned" {r do_bg_rm_call hgetalllll} + assert_equal [errorrstat NULL r] {count=2} + + r do_rm_call set x x + assert_error "ERR wrong number of arguments for 'do_rm_call' command" {r do_rm_call} + assert_equal [errorrstat ERR r] {count=1} + + assert_error "WRONGTYPE*" {r do_rm_call hgetall x} + assert_equal [errorrstat WRONGTYPE r] {count=1} + + assert_error "WRONGTYPE*" {r do_bg_rm_call hgetall x} + assert_equal [errorrstat WRONGTYPE r] {count=2} + } + test "Unload the module - blockedclient" { assert_equal {OK} [r module unload blockedclient] } |