summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/call_reply.c15
-rw-r--r--src/call_reply.h3
-rw-r--r--src/module.c12
-rw-r--r--src/networking.c37
-rw-r--r--src/server.h2
-rw-r--r--tests/unit/moduleapi/blockedclient.tcl20
6 files changed, 85 insertions, 4 deletions
diff --git a/src/call_reply.c b/src/call_reply.c
index 7aa79d089..3694db55e 100644
--- a/src/call_reply.c
+++ b/src/call_reply.c
@@ -60,7 +60,7 @@ struct CallReply {
double d; /* Reply value for double reply. */
struct CallReply *array; /* Array of sub-reply elements. used for set, array, map, and attribute */
} val;
-
+ list *deferred_error_list; /* list of errors in sds form or NULL */
struct CallReply *attribute; /* attribute reply, NULL if not exists */
};
@@ -237,6 +237,8 @@ void freeCallReply(CallReply *rep) {
freeCallReplyInternal(rep);
}
sdsfree(rep->original_proto);
+ if (rep->deferred_error_list)
+ listRelease(rep->deferred_error_list);
zfree(rep);
}
@@ -488,6 +490,11 @@ int callReplyIsResp3(CallReply *rep) {
return rep->flags & REPLY_FLAG_RESP3;
}
+/* Returns a list of errors in sds form, or NULL. */
+list *callReplyDeferredErrorList(CallReply *rep) {
+ return rep->deferred_error_list;
+}
+
/* Create a new CallReply struct from the reply blob.
*
* The function will own the reply blob, so it must not be used or freed by
@@ -496,6 +503,9 @@ int callReplyIsResp3(CallReply *rep) {
* The reply blob will be freed when the returned CallReply struct is later
* freed using freeCallReply().
*
+ * The deferred_error_list is an optional list of errors that are present
+ * in the reply blob, if given, this function will take ownership on it.
+ *
* The private_data is optional and can later be accessed using
* callReplyGetPrivateData().
*
@@ -504,7 +514,7 @@ int callReplyIsResp3(CallReply *rep) {
* DESIGNED TO HANDLE USER INPUT and using it to parse invalid replies is
* unsafe.
*/
-CallReply *callReplyCreate(sds reply, void *private_data) {
+CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data) {
CallReply *res = zmalloc(sizeof(*res));
res->flags = REPLY_FLAG_ROOT;
res->original_proto = reply;
@@ -512,5 +522,6 @@ CallReply *callReplyCreate(sds reply, void *private_data) {
res->proto_len = sdslen(reply);
res->private_data = private_data;
res->attribute = NULL;
+ res->deferred_error_list = deferred_error_list;
return res;
}
diff --git a/src/call_reply.h b/src/call_reply.h
index 5b07dc437..ff98f7f5a 100644
--- a/src/call_reply.h
+++ b/src/call_reply.h
@@ -34,7 +34,7 @@
typedef struct CallReply CallReply;
-CallReply *callReplyCreate(sds reply, void *private_data);
+CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data);
int callReplyType(CallReply *rep);
const char *callReplyGetString(CallReply *rep, size_t *len);
long long callReplyGetLongLong(CallReply *rep);
@@ -51,6 +51,7 @@ const char *callReplyGetVerbatim(CallReply *rep, size_t *len, const char **forma
const char *callReplyGetProto(CallReply *rep, size_t *len);
void *callReplyGetPrivateData(CallReply *rep);
int callReplyIsResp3(CallReply *rep);
+list *callReplyDeferredErrorList(CallReply *rep);
void freeCallReply(CallReply *rep);
#endif /* SRC_CALL_REPLY_H_ */
diff --git a/src/module.c b/src/module.c
index 8fca2572d..c6e816c6e 100644
--- a/src/module.c
+++ b/src/module.c
@@ -2918,6 +2918,15 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
size_t proto_len;
const char *proto = callReplyGetProto(reply, &proto_len);
addReplyProto(c, proto, proto_len);
+ /* Propagate the error list from that reply to the other client, to do some
+ * post error reply handling, like statistics.
+ * Note that if the original reply had an array with errors, and the module
+ * replied with just a portion of the original reply, and not the entire
+ * reply, the errors are currently not propagated and the errors stats
+ * will not get propagated. */
+ list *errors = callReplyDeferredErrorList(reply);
+ if (errors)
+ deferredAfterErrorReply(c, errors);
return REDISMODULE_OK;
}
@@ -5654,7 +5663,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
proto = sdscatlen(proto,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply));
}
- reply = callReplyCreate(proto, ctx);
+ reply = callReplyCreate(proto, c->deferred_reply_errors, ctx);
+ c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */
autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
cleanup:
diff --git a/src/networking.c b/src/networking.c
index 05001c564..0ae0f6eb9 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -173,6 +173,7 @@ client *createClient(connection *conn) {
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->reply = listCreate();
+ c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
@@ -439,6 +440,18 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
void afterErrorReply(client *c, const char *s, size_t len) {
+ /* Module clients fall into two categories:
+ * Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted.
+ * Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */
+ if (c->flags & CLIENT_MODULE) {
+ if (!c->deferred_reply_errors) {
+ c->deferred_reply_errors = listCreate();
+ listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree);
+ }
+ listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len));
+ return;
+ }
+
/* Increment the global error counter */
server.stat_total_error_replies++;
/* Increment the error stats
@@ -1024,10 +1037,28 @@ void AddReplyFromClient(client *dst, client *src) {
src->reply_bytes = 0;
src->bufpos = 0;
+ if (src->deferred_reply_errors) {
+ deferredAfterErrorReply(dst, src->deferred_reply_errors);
+ listRelease(src->deferred_reply_errors);
+ src->deferred_reply_errors = NULL;
+ }
+
/* Check output buffer limits */
closeClientOnOutputBufferLimitReached(dst, 1);
}
+/* Append the listed errors to the server error statistics. the input
+ * list is not modified and remains the responsibility of the caller. */
+void deferredAfterErrorReply(client *c, list *errors) {
+ listIter li;
+ listNode *ln;
+ listRewind(errors,&li);
+ while((ln = listNext(&li))) {
+ sds err = ln->value;
+ afterErrorReply(c, err, sdslen(err));
+ }
+}
+
/* Logically copy 'src' replica client buffers info to 'dst' replica.
* Basically increase referenced buffer block node reference count. */
void copyReplicaOutputBuffer(client *dst, client *src) {
@@ -1497,6 +1528,8 @@ void freeClient(client *c) {
freeReplicaReferencedReplBuffer(c);
freeClientArgv(c);
freeClientOriginalArgv(c);
+ if (c->deferred_reply_errors)
+ listRelease(c->deferred_reply_errors);
/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
@@ -1863,6 +1896,10 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
+ if (c->deferred_reply_errors)
+ listRelease(c->deferred_reply_errors);
+ c->deferred_reply_errors = NULL;
+
/* We clear the ASKING flag as well if we are not inside a MULTI, and
* if what we just executed is not the ASKING command itself. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
diff --git a/src/server.h b/src/server.h
index 994e98c32..853e9f5fa 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1087,6 +1087,7 @@ typedef struct client {
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
+ list *deferred_reply_errors; /* Used for module thread safe contexts. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
@@ -2428,6 +2429,7 @@ void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
void copyReplicaOutputBuffer(client *dst, client *src);
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
+void deferredAfterErrorReply(client *c, list *errors);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);
diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl
index 523d7ba69..d782fab36 100644
--- a/tests/unit/moduleapi/blockedclient.tcl
+++ b/tests/unit/moduleapi/blockedclient.tcl
@@ -180,6 +180,26 @@ start_server {tags {"modules"}} {
assert_no_match "*name=myclient*" $clients
}
+ test {module client error stats} {
+ r config resetstat
+
+ assert_error "NULL reply returned" {r do_rm_call hgetalllll}
+ assert_equal [errorrstat NULL r] {count=1}
+
+ assert_error "NULL reply returned" {r do_bg_rm_call hgetalllll}
+ assert_equal [errorrstat NULL r] {count=2}
+
+ r do_rm_call set x x
+ assert_error "ERR wrong number of arguments for 'do_rm_call' command" {r do_rm_call}
+ assert_equal [errorrstat ERR r] {count=1}
+
+ assert_error "WRONGTYPE*" {r do_rm_call hgetall x}
+ assert_equal [errorrstat WRONGTYPE r] {count=1}
+
+ assert_error "WRONGTYPE*" {r do_bg_rm_call hgetall x}
+ assert_equal [errorrstat WRONGTYPE r] {count=2}
+ }
+
test "Unload the module - blockedclient" {
assert_equal {OK} [r module unload blockedclient]
}