diff options
Diffstat (limited to 'deps/hiredis/async.c')
-rw-r--r-- | deps/hiredis/async.c | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/deps/hiredis/async.c b/deps/hiredis/async.c index d955203f8..0cecd30d9 100644 --- a/deps/hiredis/async.c +++ b/deps/hiredis/async.c @@ -336,7 +336,8 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) { if (ac->err == 0) { /* For clean disconnects, there should be no pending callbacks. */ - assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR); + int ret = __redisShiftCallback(&ac->replies,NULL); + assert(ret == REDIS_ERR); } else { /* Disconnection is caused by an error, make sure that pending * callbacks cannot call new commands. */ @@ -364,6 +365,7 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { redisContext *c = &(ac->c); dict *callbacks; + redisCallback *cb; dictEntry *de; int pvariant; char *stype; @@ -387,16 +389,28 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); de = dictFind(callbacks,sname); if (de != NULL) { - memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); + cb = dictGetEntryVal(de); + + /* If this is an subscribe reply decrease pending counter. */ + if (strcasecmp(stype+pvariant,"subscribe") == 0) { + cb->pending_subs -= 1; + } + + memcpy(dstcb,cb,sizeof(*dstcb)); /* If this is an unsubscribe message, remove it. */ if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - dictDelete(callbacks,sname); + if (cb->pending_subs == 0) + dictDelete(callbacks,sname); /* If this was the last unsubscribe message, revert to * non-subscribe mode. */ assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - if (reply->element[2]->integer == 0) + + /* Unset subscribed flag only when no pipelined pending subscribe. */ + if (reply->element[2]->integer == 0 + && dictSize(ac->sub.channels) == 0 + && dictSize(ac->sub.patterns) == 0) c->flags &= ~REDIS_SUBSCRIBED; } } @@ -410,7 +424,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, NULL}; + redisCallback cb = {NULL, NULL, 0, NULL}; void *reply = NULL; int status; @@ -492,22 +506,22 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * write event fires. When connecting was not successful, the connect callback * is called with a REDIS_ERR status and the context is free'd. */ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { + int completed = 0; redisContext *c = &(ac->c); - - if (redisCheckSocketError(c) == REDIS_ERR) { - /* Try again later when connect(2) is still in progress. */ - if (errno == EINPROGRESS) - return REDIS_OK; - - if (ac->onConnect) ac->onConnect(ac,REDIS_ERR); + if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { + /* Error! */ + redisCheckSocketError(c); + if (ac->onConnect) ac->onConnect(ac, REDIS_ERR); __redisAsyncDisconnect(ac); return REDIS_ERR; + } else if (completed == 1) { + /* connected! */ + if (ac->onConnect) ac->onConnect(ac, REDIS_OK); + c->flags |= REDIS_CONNECTED; + return REDIS_OK; + } else { + return REDIS_OK; } - - /* Mark context as connected. */ - c->flags |= REDIS_CONNECTED; - if (ac->onConnect) ac->onConnect(ac,REDIS_OK); - return REDIS_OK; } /* This function should be called when the socket is readable. @@ -583,6 +597,9 @@ static const char *nextArgument(const char *start, const char **str, size_t *len static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; + struct dict *cbdict; + dictEntry *de; + redisCallback *existcb; int pvariant, hasnext; const char *cstr, *astr; size_t clen, alen; @@ -596,6 +613,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* Setup callback */ cb.fn = fn; cb.privdata = privdata; + cb.pending_subs = 1; /* Find out which command will be appended. */ p = nextArgument(cmd,&cstr,&clen); @@ -612,9 +630,18 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void while ((p = nextArgument(p,&astr,&alen)) != NULL) { sname = sdsnewlen(astr,alen); if (pvariant) - ret = dictReplace(ac->sub.patterns,sname,&cb); + cbdict = ac->sub.patterns; else - ret = dictReplace(ac->sub.channels,sname,&cb); + cbdict = ac->sub.channels; + + de = dictFind(cbdict,sname); + + if (de != NULL) { + existcb = dictGetEntryVal(de); + cb.pending_subs = existcb->pending_subs + 1; + } + + ret = dictReplace(cbdict,sname,&cb); if (ret == 0) sdsfree(sname); } @@ -676,6 +703,8 @@ int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *priv int len; int status; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); + if (len < 0) + return REDIS_ERR; status = __redisAsyncCommand(ac,fn,privdata,cmd,len); sdsfree(cmd); return status; |