diff options
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 201 |
1 files changed, 108 insertions, 93 deletions
diff --git a/src/blocked.c b/src/blocked.c index c4aded0c5..497ffe4ce 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -398,14 +398,26 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { /* Helper function for handleClientsBlockedOnKeys(). This function is called * when there may be clients blocked on a stream key, and there may be new - * data to fetch (the key is ready). */ + * data to fetch (the key is ready). + * This function also handles the case where there may be clients blocked, + * via XREADGROUP, on an existing stream which was deleted. + * We need to unblock the clients in that case. + * The idea is that a client that is blocked via XREADGROUP is different from + * any other blocking type in the sense that it depends on the existence of both + * the key and the group. Even if the key is deleted and then revived with XADD + * it won't help any clients blocked on XREADGROUP because the group no longer + * exist, so they would fail with -NOGROUP anyway. + * The conclusion is that it's better to unblock these client (with error) upon + * the deletion of the key, rather than waiting for the first XADD.*/ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { /* Optimization: If no clients are in type BLOCKED_STREAM, * we can skip this loop. */ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - stream *s = o->ptr; + /* This function may be called with o=NULL (in order to unblock + * XREADGROUP clients whose key was deleted) */ + stream *s = o? o->ptr : NULL; /* We need to provide the new data arrived on the stream * to all the clients that are waiting for an offset smaller @@ -422,6 +434,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); streamID *gt = &bki->stream_id; + if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) { + /* If it's a blocking XREAD and the stream was either deleted + * or replaced with another key, we don't do anything (it's ok + * the the client blocks on a non-existing key). */ + continue; + } + long long prev_error_replies = server.stat_total_error_replies; client *old_client = server.current_client; server.current_client = receiver; @@ -439,6 +458,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { * otherwise. */ streamCG *group = NULL; if (receiver->bpop.xread_group) { + /* If it's a blocking XREADGROUP and the stream was either deleted + * or replaced with another key, we unblock the client */ + if (!o || o->type != OBJ_STREAM) { + addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); + goto unblock_receiver; + } group = streamLookupCG(s, receiver->bpop.xread_group->ptr); /* If the group was not found, send an error @@ -545,52 +570,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); - if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - moduleUnblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked, via XREADGROUP, on an existing stream which - * was deleted. We need to unblock the clients in that case. - * The idea is that a client that is blocked via XREADGROUP is different from - * any other blocking type in the sense that it depends on the existence of both - * the key and the group. Even if the key is deleted and then revived with XADD - * it won't help any clients blocked on XREADGROUP because the group no longer - * exist, so they would fail with -NOGROUP anyway. - * The conclusion is that it's better to unblock these client (with error) upon - * the deletion of the key, rather than waiting for the first XADD. */ -void unblockDeletedStreamReadgroupClients(readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_STREAM, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group) - continue; - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); + if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) { + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); + moduleUnblockClient(receiver); + } + /* We need to call afterCommand even if the client was not unblocked + * in order to propagate any changes that could have been done inside + * moduleTryServeClientBlockedOnKey */ afterCommand(receiver); server.current_client = old_client; } @@ -646,33 +632,35 @@ void handleClientsBlockedOnKeys(void) { /* Serve clients blocked on the key. */ robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS); - if (o != NULL) { - int objtype = o->type; - if (objtype == OBJ_LIST) - serveClientsBlockedOnListKey(o,rl); - else if (objtype == OBJ_ZSET) - serveClientsBlockedOnSortedSetKey(o,rl); - else if (objtype == OBJ_STREAM) - serveClientsBlockedOnStreamKey(o,rl); - /* We want to serve clients blocked on module keys - * regardless of the object type: we don't know what the - * module is trying to accomplish right now. */ - serveClientsBlockedOnKeyByModule(rl); - /* If we have XREADGROUP clients blocked on this key, and - * the key is not a stream, it must mean that the key was - * overwritten by either SET or something like - * (MULTI, DEL key, SADD key e, EXEC). - * In this case we need to unblock all these clients. */ - if (objtype != OBJ_STREAM) - unblockDeletedStreamReadgroupClients(rl); - } else { - /* Unblock all XREADGROUP clients of this deleted key */ - unblockDeletedStreamReadgroupClients(rl); + if (!o) { /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to * take care of the propagation here, because afterCommand wasn't called */ - if (server.also_propagate.numops > 0) - propagatePendingCommands(); + propagatePendingCommands(); + } else { + if (o->type == OBJ_LIST) + serveClientsBlockedOnListKey(o,rl); + else if (o->type == OBJ_ZSET) + serveClientsBlockedOnSortedSetKey(o,rl); } + /* We need to try to serve stream clients even if the key no longer exists because + * XREADGROUP clients need to be unblocked in case the key is missing, either deleted + * or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}. + * In this case we need to unblock all these clients. */ + serveClientsBlockedOnStreamKey(o,rl); + /* We want to serve clients blocked on module keys regardless of the object type, or + * whether the object exists or not: we don't know what the module is trying to + * accomplish right now. + * Please note that this function must be called only after handling non-module + * clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o` + * to be stale. + * The scenario is that we have one client blocked on BLPOP while another module + * client is blocked by MODULE.SAME-AS-BLPOP on the same key. + * Of course we can call lookupKeyReadWithFlags again, but: + * 1) It takes CPU + * 2) It makes more sense to give priority to "native" blocking clients rather + * than module blocking clients + * */ + serveClientsBlockedOnKeyByModule(rl); /* Free this item. */ decrRefCount(rl->key); @@ -717,8 +705,7 @@ void handleClientsBlockedOnKeys(void) { * * 'count' for those commands that support the optional count argument. * Otherwise the value is 0. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) { - dictEntry *de; +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) { list *l; int j; @@ -745,20 +732,27 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blocking_keys,keys[j]); - if (de == NULL) { - int retval; - + dictEntry *de, *existing; + de = dictAddRaw(c->db->blocking_keys, keys[j], &existing); + if (de) { + incrRefCount(keys[j]); /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blocking_keys,keys[j],l); - incrRefCount(keys[j]); - serverAssertWithInfo(c,keys[j],retval == DICT_OK); + dictSetVal(c->db->blocking_keys, de, l); } else { - l = dictGetVal(de); + l = dictGetVal(existing); } listAddNodeTail(l,c); bki->listnode = listLast(l); + + /* We need to add the key to blocking_keys_unblock_on_nokey, if the client + * wants to be awakened if key is deleted (like XREADGROUP) */ + if (unblock_on_nokey) { + de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL); + if (de) { + incrRefCount(keys[j]); + } + } } blockClient(c,btype); } @@ -782,8 +776,10 @@ void unblockClientWaitingData(client *c) { serverAssertWithInfo(c,key,l != NULL); listDelNode(l,bki->listnode); /* If the list is empty we need to remove it to avoid wasting memory */ - if (listLength(l) == 0) + if (listLength(l) == 0) { dictDelete(c->db->blocking_keys,key); + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } } dictReleaseIterator(di); @@ -818,7 +814,7 @@ static int getBlockedTypeByType(int type) { * made by a script or in the context of MULTI/EXEC. * * The list will be finally processed by handleClientsBlockedOnKeys() */ -void signalKeyAsReady(redisDb *db, robj *key, int type) { +static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) { readyList *rl; /* Quick returns. */ @@ -836,11 +832,28 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { return; } - /* No clients blocking for this key? No need to queue it. */ - if (dictFind(db->blocking_keys,key) == NULL) return; + if (deleted) { + /* Key deleted and no clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL) + return; + /* Note: if we made it here it means the key is also present in db->blocking_keys */ + } else { + /* No clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys,key) == NULL) + return; + } - /* Key was already signaled? No need to queue it again. */ - if (dictFind(db->ready_keys,key) != NULL) return; + dictEntry *de, *existing; + de = dictAddRaw(db->ready_keys, key, &existing); + if (de) { + /* We add the key in the db->ready_keys dictionary in order + * to avoid adding it multiple times into a list with a simple O(1) + * check. */ + incrRefCount(key); + } else { + /* Key was already signaled? No need to queue it again. */ + return; + } /* Ok, we need to queue this key into server.ready_keys. */ rl = zmalloc(sizeof(*rl)); @@ -848,10 +861,12 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { rl->db = db; incrRefCount(key); listAddNodeTail(server.ready_keys,rl); +} - /* We also add the key in the db->ready_keys dictionary in order - * to avoid adding it multiple times into a list with a simple O(1) - * check. */ - incrRefCount(key); - serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); +void signalKeyAsReady(redisDb *db, robj *key, int type) { + signalKeyAsReadyLogic(db, key, type, 0); +} + +void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) { + signalKeyAsReadyLogic(db, key, type, 1); } |