summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-04-14 16:09:32 +0200
committerantirez <antirez@gmail.com>2014-04-14 16:09:32 +0200
commit81ceef7d22bcc1a30bce029ffae1a53f4752b7ed (patch)
treeab801ee6dc6c4cca116b551941878d71b3f80b1f
parent9df77fc0c4586a7b49605a85a6e68fd1a5273a20 (diff)
downloadredis-81ceef7d22bcc1a30bce029ffae1a53f4752b7ed.tar.gz
PFMERGE fixed to work with sparse encoding.
-rw-r--r--src/hyperloglog.c53
-rw-r--r--tests/unit/hyperloglog.tcl4
2 files changed, 47 insertions, 10 deletions
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index fe0a5d7a0..9b8968c1e 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -1130,7 +1130,7 @@ void pfcountCommand(redisClient *c) {
/* Recompute it and update the cached value. */
card = hllCount(hdr,&invalid);
if (invalid) {
- addReplyError(c,"Invalid HLL object");
+ addReplyError(c,"Invalid HLL object detected");
return;
}
hdr->card[0] = card & 0xff;
@@ -1163,8 +1163,6 @@ void pfmergeCommand(redisClient *c) {
* it to the target variable later. */
memset(max,0,sizeof(max));
for (j = 1; j < c->argc; j++) {
- uint8_t val;
-
/* Check type and size. */
robj *o = lookupKeyRead(c->db,c->argv[j]);
if (o == NULL) continue; /* Assume empty HLL for non existing var. */
@@ -1173,14 +1171,47 @@ void pfmergeCommand(redisClient *c) {
/* Merge with this HLL with our 'max' HHL by setting max[i]
* to MAX(max[i],hll[i]). */
hdr = o->ptr;
- for (i = 0; i < HLL_REGISTERS; i++) {
- HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
- if (val > max[i]) max[i] = val;
+ if (hdr->encoding == HLL_DENSE) {
+ uint8_t val;
+
+ for (i = 0; i < HLL_REGISTERS; i++) {
+ HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
+ if (val > max[i]) max[i] = val;
+ }
+ } else {
+ uint8_t *p = o->ptr, *end = p + sdslen(o->ptr);
+ long runlen, regval;
+
+ p += HLL_HDR_SIZE;
+ i = 0;
+ while(p < end) {
+ if (HLL_SPARSE_IS_ZERO(p)) {
+ runlen = HLL_SPARSE_ZERO_LEN(p);
+ i += runlen;
+ p++;
+ } else if (HLL_SPARSE_IS_XZERO(p)) {
+ runlen = HLL_SPARSE_XZERO_LEN(p);
+ i += runlen;
+ p += 2;
+ } else {
+ runlen = HLL_SPARSE_VAL_LEN(p);
+ regval = HLL_SPARSE_VAL_VALUE(p);
+ while(runlen--) {
+ if (regval > max[i]) max[i] = regval;
+ i++;
+ }
+ p++;
+ }
+ }
+ if (i != HLL_REGISTERS) {
+ addReplyError(c,"Invalid HLL object detected");
+ return;
+ }
}
}
/* Create / unshare the destination key's value if needed. */
- robj *o = lookupKeyRead(c->db,c->argv[1]);
+ robj *o = lookupKeyWrite(c->db,c->argv[1]);
if (o == NULL) {
/* Create the key with a string value of the exact length to
* hold our HLL data structure. sdsnewlen() when NULL is passed
@@ -1194,6 +1225,12 @@ void pfmergeCommand(redisClient *c) {
o = dbUnshareStringValue(c->db,c->argv[1],o);
}
+ /* Only support dense objects as destination. */
+ if (hllSparseToDense(o) == REDIS_ERR) {
+ addReplyError(c,"Invalid HLL object detected");
+ return;
+ }
+
/* Write the resulting HLL to the destination HLL registers and
* invalidate the cached value. */
hdr = o->ptr;
@@ -1308,7 +1345,7 @@ void pfdebugCommand(redisClient *c) {
if (hdr->encoding == HLL_SPARSE) {
if (hllSparseToDense(o) == REDIS_ERR) {
- addReplyError(c,"HLL sparse encoding is corrupted");
+ addReplyError(c,"Invalid HLL object detected");
return;
}
server.dirty++; /* Force propagation on encoding change. */
diff --git a/tests/unit/hyperloglog.tcl b/tests/unit/hyperloglog.tcl
index e241288dd..07f8b7772 100644
--- a/tests/unit/hyperloglog.tcl
+++ b/tests/unit/hyperloglog.tcl
@@ -60,9 +60,9 @@ start_server {tags {"hll"}} {
r pfcount hll
} {5}
- test {PFGETREG returns the HyperLogLog raw registers} {
+ test {PFDEBUG GETREG returns the HyperLogLog raw registers} {
r del hll
r pfadd hll 1 2 3
- llength [r pfgetreg hll]
+ llength [r pfdebug getreg hll]
} {16384}
}