summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-06-06 18:04:04 +0200
committerantirez <antirez@gmail.com>2014-06-06 18:05:22 +0200
commit932cc3ebe5fc19a37b94fad1d47a027d6a17f3b0 (patch)
tree7426dd655f35ed01e0dd5585c291f1d387b9b656
parentfe85d0600aa3e2368dacd4f12311ffba16ae0209 (diff)
downloadredis-932cc3ebe5fc19a37b94fad1d47a027d6a17f3b0.tar.gz
ZUNIONSTORE reimplemented for speed.
The user @kjmph provided excellent ideas to improve speed of ZUNIONSTORE (in certain cases by many order of magnitude), together with an implementation of the ideas. While the ideas were sounding, the implementation could be improved both in terms of speed and clearness, so that's my attempt at reimplementing the speedup proposed, trying to improve by directly using just a dictionary with an embedded score inside, and reusing the single-pass aggregate + order-later approach. Note that you can't apply this commit without applying the previous commit in this branch that adds a double in the dictEntry value union. Issue #1786.
-rw-r--r--src/t_zset.c93
1 files changed, 60 insertions, 33 deletions
diff --git a/src/t_zset.c b/src/t_zset.c
index 8bb5bde6d..e567e3bb0 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1981,51 +1981,78 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
zuiClearIterator(&src[0]);
}
} else if (op == REDIS_OP_UNION) {
+ dict *accumulator = dictCreate(&setDictType,NULL);
+ dictIterator *di;
+ dictEntry *de;
+ double score;
+
+ if (setnum) {
+ /* Our union is at least as large as the largest set.
+ * Resize the dictionary ASAP to avoid useless rehashing. */
+ int minlen = setnum ? zuiLength(&src[setnum-1]) : 0;
+ dictExpand(accumulator,minlen);
+ }
+
+ /* Step 1: Create a dictionary of elements -> aggregated-scores
+ * by iterating one sorted set after the other. */
for (i = 0; i < setnum; i++) {
- if (zuiLength(&src[i]) == 0)
- continue;
+ if (zuiLength(&src[i]) == 0) continue;
zuiInitIterator(&src[i]);
while (zuiNext(&src[i],&zval)) {
- double score, value;
-
- /* Skip an element that when already processed */
- if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
- continue;
-
- /* Initialize score */
+ /* Initialize value */
score = src[i].weight * zval.score;
if (isnan(score)) score = 0;
- /* We need to check only next sets to see if this element
- * exists, since we process every element just one time so
- * it can't exist in a previous set (otherwise it would be
- * already processed). */
- for (j = (i+1); j < setnum; j++) {
- /* It is not safe to access the zset we are
- * iterating, so explicitly check for equal object. */
- if(src[j].subject == src[i].subject) {
- value = zval.score*src[j].weight;
- zunionInterAggregate(&score,value,aggregate);
- } else if (zuiFind(&src[j],&zval,&value)) {
- value *= src[j].weight;
- zunionInterAggregate(&score,value,aggregate);
+ /* Search for this element in the accumulating dictionary. */
+ de = dictFind(accumulator,zuiObjectFromValue(&zval));
+ /* If we don't have it, we need to create a new entry. */
+ if (de == NULL) {
+ tmp = zuiObjectFromValue(&zval);
+ /* Remember the longest single element encountered,
+ * to understand if it's possible to convert to ziplist
+ * at the end. */
+ if (sdsEncodedObject(tmp)) {
+ if (sdslen(tmp->ptr) > maxelelen)
+ maxelelen = sdslen(tmp->ptr);
}
- }
-
- tmp = zuiObjectFromValue(&zval);
- znode = zslInsert(dstzset->zsl,score,tmp);
- incrRefCount(zval.ele); /* added to skiplist */
- dictAdd(dstzset->dict,tmp,&znode->score);
- incrRefCount(zval.ele); /* added to dictionary */
-
- if (sdsEncodedObject(tmp)) {
- if (sdslen(tmp->ptr) > maxelelen)
- maxelelen = sdslen(tmp->ptr);
+ /* Add the element with its initial score. */
+ de = dictAddRaw(accumulator,tmp);
+ incrRefCount(tmp);
+ dictSetDoubleVal(de,score);
+ } else {
+ /* Update the score with the score of the new instance
+ * of the element found in the current sorted set.
+ *
+ * Here we access directly the dictEntry double
+ * value inside the union as it is a big speedup
+ * compared to using the getDouble/setDouble API. */
+ zunionInterAggregate(&de->v.d,score,aggregate);
}
}
zuiClearIterator(&src[i]);
}
+
+ /* Step 2: convert the dictionary into the final sorted set. */
+ di = dictGetIterator(accumulator);
+
+ /* We now are aware of the final size of the resulting sorted set,
+ * let's resize the dictionary embedded inside the sorted set to the
+ * right size, in order to save rehashing time. */
+ dictExpand(dstzset->dict,dictSize(accumulator));
+
+ while((de = dictNext(di)) != NULL) {
+ robj *ele = dictGetKey(de);
+ score = dictGetDoubleVal(de);
+ znode = zslInsert(dstzset->zsl,score,ele);
+ incrRefCount(ele); /* added to skiplist */
+ dictAdd(dstzset->dict,ele,&znode->score);
+ incrRefCount(ele); /* added to dictionary */
+ }
+ dictReleaseIterator(di);
+
+ /* We can free the accumulator dictionary now. */
+ dictRelease(accumulator);
} else {
redisPanic("Unknown operator");
}