summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c46
1 files changed, 30 insertions, 16 deletions
diff --git a/src/blocked.c b/src/blocked.c
index b7506173d..d8413b80c 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -67,6 +67,21 @@
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
+/* This structure represents the blocked key information that we store
+ * in the client structure. Each client blocked on keys, has a
+ * client->bpop.keys hash table. The keys of the hash table are Redis
+ * keys pointers to 'robj' structures. The value is this structure.
+ * The structure has two goals: firstly we store the list node that this
+ * client uses to be listed in the database "blocked clients for this key"
+ * list, so we can later unblock in O(1) without a list scan.
+ * Secondly for certain blocking types, we have additional info. Right now
+ * the only use for additional info we have is when clients are blocked
+ * on streams, as we have to remember the ID it blocked for. */
+typedef struct bkinfo {
+ listNode *listnode; /* List node for db->blocking_keys[key] list. */
+ streamID stream_id; /* Stream ID if we blocked in a stream. */
+} bkinfo;
+
/* Get a timeout value from an object and store it into 'timeout'.
* The final timeout is always stored as milliseconds as a time where the
* timeout will expire, however the parsing is performed according to
@@ -291,8 +306,7 @@ void handleClientsBlockedOnKeys(void) {
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
- listDelNode(clients,clientnode);
- listAddNodeTail(clients,receiver);
+ listRotateHeadToTail(clients);
continue;
}
@@ -353,8 +367,7 @@ void handleClientsBlockedOnKeys(void) {
if (receiver->btype != BLOCKED_ZSET) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
- listDelNode(clients,clientnode);
- listAddNodeTail(clients,receiver);
+ listRotateHeadToTail(clients);
continue;
}
@@ -398,8 +411,9 @@ void handleClientsBlockedOnKeys(void) {
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_STREAM) continue;
- streamID *gt = dictFetchValue(receiver->bpop.keys,
- rl->key);
+ bkinfo *bki = dictFetchValue(receiver->bpop.keys,
+ rl->key);
+ streamID *gt = &bki->stream_id;
/* If we blocked in the context of a consumer
* group, we need to resolve the group and update the
@@ -516,17 +530,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
- /* The value associated with the key name in the bpop.keys dictionary
- * is NULL for lists and sorted sets, or the stream ID for streams. */
- void *key_data = NULL;
- if (btype == BLOCKED_STREAM) {
- key_data = zmalloc(sizeof(streamID));
- memcpy(key_data,ids+j,sizeof(streamID));
- }
+ /* Allocate our bkinfo structure, associated to each key the client
+ * is blocked for. */
+ bkinfo *bki = zmalloc(sizeof(*bki));
+ if (btype == BLOCKED_STREAM)
+ bki->stream_id = ids[j];
/* If the key already exists in the dictionary ignore it. */
- if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
- zfree(key_data);
+ if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
+ zfree(bki);
continue;
}
incrRefCount(keys[j]);
@@ -545,6 +557,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
l = dictGetVal(de);
}
listAddNodeTail(l,c);
+ bki->listnode = listLast(l);
}
blockClient(c,btype);
}
@@ -561,11 +574,12 @@ void unblockClientWaitingData(client *c) {
/* The client may wait for multiple keys, so unblock it for every key. */
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
+ bkinfo *bki = dictGetVal(de);
/* Remove this client from the list of clients waiting for this key. */
l = dictFetchValue(c->db->blocking_keys,key);
serverAssertWithInfo(c,key,l != NULL);
- listDelNode(l,listSearchKey(l,c));
+ listDelNode(l,bki->listnode);
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
dictDelete(c->db->blocking_keys,key);