summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2020-06-05 10:23:26 +0200
committerantirez <antirez@gmail.com>2020-06-10 10:40:18 +0200
commit7a5762d34715506dc5aba3e010043c26c92834e2 (patch)
treeecb939c8d0dbb114e55bb756a846697c2a3f9afb
parente19c5c8cf3c8aa6cfd01cb9b1a9553ea2cd06621 (diff)
downloadredis-7a5762d34715506dc5aba3e010043c26c92834e2.tar.gz
TCC: threaded SINTER / SMEMBERS.
-rw-r--r--src/t_set.c114
1 files changed, 85 insertions, 29 deletions
diff --git a/src/t_set.c b/src/t_set.c
index eb0c22c06..39085789c 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -793,9 +793,19 @@ int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
return 0;
}
-void sinterGenericCommand(client *c, robj **setkeys,
- unsigned long setnum, robj *dstkey) {
- robj **sets = zmalloc(sizeof(robj*)*setnum);
+struct sinterThreadOptions {
+ unsigned long setnum;
+ int threaded;
+ robj **sets;
+ robj *dstkey; /* Only used if threaded is false. */
+};
+
+void sinterThreadedHalf(client *c, void *options) {
+ struct sinterThreadOptions *opt = options;
+ robj **sets = opt->sets;
+ unsigned long setnum = opt->setnum;
+ robj *dstkey = opt->dstkey;
+
setTypeIterator *si;
robj *dstset = NULL;
sds elesds;
@@ -804,29 +814,6 @@ void sinterGenericCommand(client *c, robj **setkeys,
unsigned long j, cardinality = 0;
int encoding;
- for (j = 0; j < setnum; j++) {
- robj *setobj = dstkey ?
- lookupKeyWrite(c->db,setkeys[j]) :
- lookupKeyRead(c->db,setkeys[j]);
- if (!setobj) {
- zfree(sets);
- if (dstkey) {
- if (dbDelete(c->db,dstkey)) {
- signalModifiedKey(c,c->db,dstkey);
- server.dirty++;
- }
- addReply(c,shared.czero);
- } else {
- addReply(c,shared.emptyset[c->resp]);
- }
- return;
- }
- if (checkType(c,setobj,OBJ_SET)) {
- zfree(sets);
- return;
- }
- sets[j] = setobj;
- }
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
@@ -898,7 +885,11 @@ void sinterGenericCommand(client *c, robj **setkeys,
if (dstkey) {
/* Store the resulting set into the target, if the intersection
- * is not an empty set. */
+ * is not an empty set.
+ *
+ * Note that this part is never called in the context of a thread
+ * and contains many API calls that are forbidden in the threaded
+ * half: we get here only in the case of synchronous execution. */
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
@@ -918,6 +909,67 @@ void sinterGenericCommand(client *c, robj **setkeys,
setDeferredSetLen(c,replylen,cardinality);
}
zfree(sets);
+ zfree(options);
+}
+
+void sinterGenericCommand(client *c, robj **setkeys,
+ unsigned long setnum, robj *dstkey) {
+ robj **sets = zmalloc(sizeof(robj*)*setnum);
+ int threaded_call_allowed = (dstkey == NULL);
+ int threaded = 0;
+ unsigned long j;
+
+ for (j = 0; j < setnum; j++) {
+ robj *setobj = dstkey ?
+ lookupKeyWrite(c->db,setkeys[j]) :
+ lookupKeyRead(c->db,setkeys[j]);
+ if (!setobj) {
+ zfree(sets);
+ if (dstkey) {
+ if (dbDelete(c->db,dstkey)) {
+ signalModifiedKey(c,c->db,dstkey);
+ server.dirty++;
+ }
+ addReply(c,shared.czero);
+ } else {
+ addReply(c,shared.emptyset[c->resp]);
+ }
+ return;
+ }
+ if (checkType(c,setobj,OBJ_SET)) {
+ zfree(sets);
+ return;
+ }
+ sets[j] = setobj;
+ if (threaded_call_allowed &&
+ setTypeSize(setobj) >= THREADED_SET_SIZE_THRESHOLD)
+ {
+ threaded = 1;
+ }
+ }
+
+ /* Try to lock the keys only if this is a threaded execution. If we
+ * fail we just revert to non threaded execution. */
+ if (threaded) {
+ for (j = 0; j < setnum; j++) {
+ if (lockKey(c,setkeys[j],LOCKEDKEY_READ,NULL) == C_ERR) {
+ unlockAllKeys(c);
+ threaded = 0;
+ }
+ }
+ }
+
+ struct sinterThreadOptions *opt = zmalloc(sizeof(*opt));
+ opt->setnum = setnum;
+ opt->sets = sets;
+ opt->dstkey = dstkey;
+ opt->threaded = threaded;
+
+ if (threaded) {
+ executeThreadedCommand(c,sinterThreadedHalf,opt);
+ } else {
+ sinterThreadedHalf(c,opt);
+ }
}
void sinterCommand(client *c) {
@@ -946,8 +998,8 @@ void sunionDiffThreadedHalf(client *c, void *options) {
int setnum = opt->setnum;
int op = opt->op;
int threaded = opt->threaded;
-
robj *dstkey = opt->dstkey;
+
setTypeIterator *si;
robj *dstset = NULL;
sds ele;
@@ -1072,7 +1124,11 @@ void sunionDiffThreadedHalf(client *c, void *options) {
decrRefCount(dstset);
} else {
/* If we have a target key where to store the resulting set
- * create this key with the result set inside */
+ * create this key with the result set inside.
+ *
+ * Note that this part is never called in the context of a thread
+ * and contains many API calls that are forbidden in the threaded
+ * half: we get here only in the case of synchronous execution. */
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);