summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c22
1 files changed, 8 insertions, 14 deletions
diff --git a/src/networking.c b/src/networking.c
index 054bca6a0..ed4f88581 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -165,6 +165,7 @@ client *createClient(connection *conn) {
c->flags = 0;
c->slot = -1;
c->ctime = c->lastinteraction = server.unixtime;
+ c->duration = 0;
clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE;
c->repl_start_cmd_stream_on_ack = 0;
@@ -184,15 +185,7 @@ client *createClient(connection *conn) {
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->btype = BLOCKED_NONE;
- c->bpop.timeout = 0;
- c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
- c->bpop.target = NULL;
- c->bpop.xread_group = NULL;
- c->bpop.xread_consumer = NULL;
- c->bpop.xread_group_noack = 0;
- c->bpop.numreplicas = 0;
- c->bpop.reploffset = 0;
+ initClientBlockingState(c);
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
@@ -1588,7 +1581,7 @@ void freeClient(client *c) {
/* Deallocate structures used to block on blocking ops. */
if (c->flags & CLIENT_BLOCKED) unblockClient(c);
- dictRelease(c->bpop.keys);
+ dictRelease(c->bstate.keys);
/* UNWATCH all the keys */
unwatchAllKeys(c);
@@ -2033,6 +2026,8 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
+ c->duration = 0;
+ c->flags &= ~CLIENT_EXECUTING_COMMAND;
if (c->deferred_reply_errors)
listRelease(c->deferred_reply_errors);
@@ -3142,12 +3137,11 @@ NULL
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
if (unblock_error)
- addReplyError(target,
+ unblockClientOnError(target,
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
else
- replyToBlockedClientTimedOut(target);
- unblockClient(target);
- updateStatsOnUnblock(target, 0, 0, 1);
+ unblockClientOnTimeout(target);
+
addReply(c,shared.cone);
} else {
addReply(c,shared.czero);