diff options
author | antirez <antirez@gmail.com> | 2020-06-05 10:23:26 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2020-06-10 10:40:18 +0200 |
commit | 7a5762d34715506dc5aba3e010043c26c92834e2 (patch) | |
tree | ecb939c8d0dbb114e55bb756a846697c2a3f9afb | |
parent | e19c5c8cf3c8aa6cfd01cb9b1a9553ea2cd06621 (diff) | |
download | redis-7a5762d34715506dc5aba3e010043c26c92834e2.tar.gz |
TCC: threaded SINTER / SMEMBERS.
-rw-r--r-- | src/t_set.c | 114 |
1 files changed, 85 insertions, 29 deletions
diff --git a/src/t_set.c b/src/t_set.c index eb0c22c06..39085789c 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -793,9 +793,19 @@ int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) { return 0; } -void sinterGenericCommand(client *c, robj **setkeys, - unsigned long setnum, robj *dstkey) { - robj **sets = zmalloc(sizeof(robj*)*setnum); +struct sinterThreadOptions { + unsigned long setnum; + int threaded; + robj **sets; + robj *dstkey; /* Only used if threaded is false. */ +}; + +void sinterThreadedHalf(client *c, void *options) { + struct sinterThreadOptions *opt = options; + robj **sets = opt->sets; + unsigned long setnum = opt->setnum; + robj *dstkey = opt->dstkey; + setTypeIterator *si; robj *dstset = NULL; sds elesds; @@ -804,29 +814,6 @@ void sinterGenericCommand(client *c, robj **setkeys, unsigned long j, cardinality = 0; int encoding; - for (j = 0; j < setnum; j++) { - robj *setobj = dstkey ? - lookupKeyWrite(c->db,setkeys[j]) : - lookupKeyRead(c->db,setkeys[j]); - if (!setobj) { - zfree(sets); - if (dstkey) { - if (dbDelete(c->db,dstkey)) { - signalModifiedKey(c,c->db,dstkey); - server.dirty++; - } - addReply(c,shared.czero); - } else { - addReply(c,shared.emptyset[c->resp]); - } - return; - } - if (checkType(c,setobj,OBJ_SET)) { - zfree(sets); - return; - } - sets[j] = setobj; - } /* Sort sets from the smallest to largest, this will improve our * algorithm's performance */ qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); @@ -898,7 +885,11 @@ void sinterGenericCommand(client *c, robj **setkeys, if (dstkey) { /* Store the resulting set into the target, if the intersection - * is not an empty set. */ + * is not an empty set. + * + * Note that this part is never called in the context of a thread + * and contains many API calls that are forbidden in the threaded + * half: we get here only in the case of synchronous execution. */ int deleted = dbDelete(c->db,dstkey); if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); @@ -918,6 +909,67 @@ void sinterGenericCommand(client *c, robj **setkeys, setDeferredSetLen(c,replylen,cardinality); } zfree(sets); + zfree(options); +} + +void sinterGenericCommand(client *c, robj **setkeys, + unsigned long setnum, robj *dstkey) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + int threaded_call_allowed = (dstkey == NULL); + int threaded = 0; + unsigned long j; + + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); + if (!setobj) { + zfree(sets); + if (dstkey) { + if (dbDelete(c->db,dstkey)) { + signalModifiedKey(c,c->db,dstkey); + server.dirty++; + } + addReply(c,shared.czero); + } else { + addReply(c,shared.emptyset[c->resp]); + } + return; + } + if (checkType(c,setobj,OBJ_SET)) { + zfree(sets); + return; + } + sets[j] = setobj; + if (threaded_call_allowed && + setTypeSize(setobj) >= THREADED_SET_SIZE_THRESHOLD) + { + threaded = 1; + } + } + + /* Try to lock the keys only if this is a threaded execution. If we + * fail we just revert to non threaded execution. */ + if (threaded) { + for (j = 0; j < setnum; j++) { + if (lockKey(c,setkeys[j],LOCKEDKEY_READ,NULL) == C_ERR) { + unlockAllKeys(c); + threaded = 0; + } + } + } + + struct sinterThreadOptions *opt = zmalloc(sizeof(*opt)); + opt->setnum = setnum; + opt->sets = sets; + opt->dstkey = dstkey; + opt->threaded = threaded; + + if (threaded) { + executeThreadedCommand(c,sinterThreadedHalf,opt); + } else { + sinterThreadedHalf(c,opt); + } } void sinterCommand(client *c) { @@ -946,8 +998,8 @@ void sunionDiffThreadedHalf(client *c, void *options) { int setnum = opt->setnum; int op = opt->op; int threaded = opt->threaded; - robj *dstkey = opt->dstkey; + setTypeIterator *si; robj *dstset = NULL; sds ele; @@ -1072,7 +1124,11 @@ void sunionDiffThreadedHalf(client *c, void *options) { decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set - * create this key with the result set inside */ + * create this key with the result set inside. + * + * Note that this part is never called in the context of a thread + * and contains many API calls that are forbidden in the threaded + * half: we get here only in the case of synchronous execution. */ int deleted = dbDelete(c->db,dstkey); if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); |