summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2014-04-17 17:08:43 +0200
committerantirez <antirez@gmail.com>2014-04-17 17:08:43 +0200
commitfcd2155b6f6d0beb88ba3b008e23a9c366af5c9f (patch)
tree93a15c4827eaa2b6b6f1718271739eb91826a12d
parent78954ca3a2a3b4fdb731bd43b7cbab2d3a619871 (diff)
downloadredis-fcd2155b6f6d0beb88ba3b008e23a9c366af5c9f.tar.gz
HyperLogLog low level merge extracted from PFMERGE.
-rw-r--r--src/hyperloglog.c93
1 files changed, 54 insertions, 39 deletions
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index 838c921db..1b984c15f 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -991,6 +991,55 @@ int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
}
}
+/* Merge by computing MAX(registers[i],hll[i]) the HyperLogLog 'hll'
+ * with an array of uint8_t HLL_REGISTERS registers pointed by 'max'.
+ *
+ * The hll object must be already validated via isHLLObjectOrReply()
+ * or in some other way.
+ *
+ * If the HyperLogLog is sparse and is found to be invalid, REDIS_ERR
+ * is returned, otherwise the function always succeeds. */
+int hllMerge(uint8_t *max, robj *hll) {
+ struct hllhdr *hdr = hll->ptr;
+ int i;
+
+ if (hdr->encoding == HLL_DENSE) {
+ uint8_t val;
+
+ for (i = 0; i < HLL_REGISTERS; i++) {
+ HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
+ if (val > max[i]) max[i] = val;
+ }
+ } else {
+ uint8_t *p = hll->ptr, *end = p + sdslen(hll->ptr);
+ long runlen, regval;
+
+ p += HLL_HDR_SIZE;
+ i = 0;
+ while(p < end) {
+ if (HLL_SPARSE_IS_ZERO(p)) {
+ runlen = HLL_SPARSE_ZERO_LEN(p);
+ i += runlen;
+ p++;
+ } else if (HLL_SPARSE_IS_XZERO(p)) {
+ runlen = HLL_SPARSE_XZERO_LEN(p);
+ i += runlen;
+ p += 2;
+ } else {
+ runlen = HLL_SPARSE_VAL_LEN(p);
+ regval = HLL_SPARSE_VAL_VALUE(p);
+ while(runlen--) {
+ if (regval > max[i]) max[i] = regval;
+ i++;
+ }
+ p++;
+ }
+ }
+ if (i != HLL_REGISTERS) return REDIS_ERR;
+ }
+ return REDIS_OK;
+}
+
/* ========================== HyperLogLog commands ========================== */
/* Create an HLL object. We always create the HLL using sparse encoding.
@@ -1157,7 +1206,7 @@ void pfcountCommand(redisClient *c) {
void pfmergeCommand(redisClient *c) {
uint8_t max[HLL_REGISTERS];
struct hllhdr *hdr;
- int j, i;
+ int j;
/* Compute an HLL with M[i] = MAX(M[i]_j).
* We we the maximum into the max array of registers. We'll write
@@ -1171,43 +1220,9 @@ void pfmergeCommand(redisClient *c) {
/* Merge with this HLL with our 'max' HHL by setting max[i]
* to MAX(max[i],hll[i]). */
- hdr = o->ptr;
- if (hdr->encoding == HLL_DENSE) {
- uint8_t val;
-
- for (i = 0; i < HLL_REGISTERS; i++) {
- HLL_DENSE_GET_REGISTER(val,hdr->registers,i);
- if (val > max[i]) max[i] = val;
- }
- } else {
- uint8_t *p = o->ptr, *end = p + sdslen(o->ptr);
- long runlen, regval;
-
- p += HLL_HDR_SIZE;
- i = 0;
- while(p < end) {
- if (HLL_SPARSE_IS_ZERO(p)) {
- runlen = HLL_SPARSE_ZERO_LEN(p);
- i += runlen;
- p++;
- } else if (HLL_SPARSE_IS_XZERO(p)) {
- runlen = HLL_SPARSE_XZERO_LEN(p);
- i += runlen;
- p += 2;
- } else {
- runlen = HLL_SPARSE_VAL_LEN(p);
- regval = HLL_SPARSE_VAL_VALUE(p);
- while(runlen--) {
- if (regval > max[i]) max[i] = regval;
- i++;
- }
- p++;
- }
- }
- if (i != HLL_REGISTERS) {
- addReplySds(c,sdsnew(invalid_hll_err));
- return;
- }
+ if (hllMerge(max,o) == REDIS_ERR) {
+ addReplySds(c,sdsnew(invalid_hll_err));
+ return;
}
}
@@ -1241,7 +1256,7 @@ void pfmergeCommand(redisClient *c) {
HLL_INVALIDATE_CACHE(hdr);
signalModifiedKey(c->db,c->argv[1]);
- /* We generate an HLLADD event for HLLMERGE for semantical simplicity
+ /* We generate an PFADD event for PFMERGE for semantical simplicity
* since in theory this is a mass-add of elements. */
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
server.dirty++;