summaryrefslogtreecommitdiff
path: root/kafka/partitioner/default.py
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2019-10-11 12:03:22 -0700
committerGitHub <noreply@github.com>2019-10-11 12:03:22 -0700
commit3631bfa009a28767a2057c9beee470acaa6597d5 (patch)
treee10b73861a33d83a95b6496ef3074ee3caeaae41 /kafka/partitioner/default.py
parent6d3800ca9f45fd953689a1787fc90a5e566e34ea (diff)
downloadkafka-python-3631bfa009a28767a2057c9beee470acaa6597d5.tar.gz
Remove SimpleClient, Producer, Consumer, Unittest (#1196)
In the 2.0 release, we're removing: * `SimpleClient` * `SimpleConsumer` * `SimpleProducer` * Old partitioners used by `SimpleProducer`; these are superceded by the `DefaultPartitioner` These have been deprecated for several years in favor of `KafkaClient` / `KafkaConsumer` / `KafkaProducer`. Since 2.0 allows breaking changes, we are removing the deprecated classes. Additionally, since the only usage of `unittest` was in tests for these old Simple* clients, this also drops `unittest` from the library. All tests now run under `pytest`.
Diffstat (limited to 'kafka/partitioner/default.py')
-rw-r--r--kafka/partitioner/default.py72
1 files changed, 71 insertions, 1 deletions
diff --git a/kafka/partitioner/default.py b/kafka/partitioner/default.py
index e4d9df5..d0914c6 100644
--- a/kafka/partitioner/default.py
+++ b/kafka/partitioner/default.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
import random
-from kafka.partitioner.hashed import murmur2
+from kafka.vendor import six
class DefaultPartitioner(object):
@@ -30,3 +30,73 @@ class DefaultPartitioner(object):
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]
+
+
+# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
+def murmur2(data):
+ """Pure-python Murmur2 implementation.
+
+ Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
+
+ Args:
+ data (bytes): opaque bytes
+
+ Returns: MurmurHash2 of data
+ """
+ # Python2 bytes is really a str, causing the bitwise operations below to fail
+ # so convert to bytearray.
+ if six.PY2:
+ data = bytearray(bytes(data))
+
+ length = len(data)
+ seed = 0x9747b28c
+ # 'm' and 'r' are mixing constants generated offline.
+ # They're not really 'magic', they just happen to work well.
+ m = 0x5bd1e995
+ r = 24
+
+ # Initialize the hash to a random value
+ h = seed ^ length
+ length4 = length // 4
+
+ for i in range(length4):
+ i4 = i * 4
+ k = ((data[i4 + 0] & 0xff) +
+ ((data[i4 + 1] & 0xff) << 8) +
+ ((data[i4 + 2] & 0xff) << 16) +
+ ((data[i4 + 3] & 0xff) << 24))
+ k &= 0xffffffff
+ k *= m
+ k &= 0xffffffff
+ k ^= (k % 0x100000000) >> r # k ^= k >>> r
+ k &= 0xffffffff
+ k *= m
+ k &= 0xffffffff
+
+ h *= m
+ h &= 0xffffffff
+ h ^= k
+ h &= 0xffffffff
+
+ # Handle the last few bytes of the input array
+ extra_bytes = length % 4
+ if extra_bytes >= 3:
+ h ^= (data[(length & ~3) + 2] & 0xff) << 16
+ h &= 0xffffffff
+ if extra_bytes >= 2:
+ h ^= (data[(length & ~3) + 1] & 0xff) << 8
+ h &= 0xffffffff
+ if extra_bytes >= 1:
+ h ^= (data[length & ~3] & 0xff)
+ h &= 0xffffffff
+ h *= m
+ h &= 0xffffffff
+
+ h ^= (h % 0x100000000) >> 13 # h >>> 13;
+ h &= 0xffffffff
+ h *= m
+ h &= 0xffffffff
+ h ^= (h % 0x100000000) >> 15 # h >>> 15;
+ h &= 0xffffffff
+
+ return h