summaryrefslogtreecommitdiff
path: root/test/test_assignors.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-16 14:28:40 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-16 14:28:40 -0800
commit9f0db5d38b444f5a93da7bed4a19114aff8701e8 (patch)
tree4ae6f29b5934995fc4d678d6461fd347eb17285d /test/test_assignors.py
parentd5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff)
parentc8be93b44bb0939dd512a72be578d42a4d7426b7 (diff)
downloadkafka-python-9f0db5d38b444f5a93da7bed4a19114aff8701e8.tar.gz
Merge pull request #550 from dpkp/range_assignor
Add RangePartitionAssignor (and use as default)
Diffstat (limited to 'test/test_assignors.py')
-rw-r--r--test/test_assignors.py58
1 files changed, 58 insertions, 0 deletions
diff --git a/test/test_assignors.py b/test/test_assignors.py
new file mode 100644
index 0000000..e2a1d4f
--- /dev/null
+++ b/test/test_assignors.py
@@ -0,0 +1,58 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.coordinator.assignors.range import RangePartitionAssignor
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.protocol import (
+ ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
+
+
+@pytest.fixture
+def cluster(mocker):
+ cluster = mocker.MagicMock()
+ cluster.partitions_for_topic.return_value = set([0, 1, 2])
+ return cluster
+
+
+def test_assignor_roundrobin(cluster):
+ assignor = RoundRobinPartitionAssignor
+
+ member_metadata = {
+ 'C0': assignor.metadata(set(['t0', 't1'])),
+ 'C1': assignor.metadata(set(['t0', 't1'])),
+ }
+
+ ret = assignor.assign(cluster, member_metadata)
+ expected = {
+ 'C0': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [0, 2]), ('t1', [1])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [1]), ('t1', [0, 2])], b'')
+ }
+ assert ret == expected
+ assert set(ret) == set(expected)
+ for member in ret:
+ assert ret[member].encode() == expected[member].encode()
+
+
+def test_assignor_range(cluster):
+ assignor = RangePartitionAssignor
+
+ member_metadata = {
+ 'C0': assignor.metadata(set(['t0', 't1'])),
+ 'C1': assignor.metadata(set(['t0', 't1'])),
+ }
+
+ ret = assignor.assign(cluster, member_metadata)
+ expected = {
+ 'C0': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''),
+ 'C1': ConsumerProtocolMemberAssignment(
+ assignor.version, [('t0', [2]), ('t1', [2])], b'')
+ }
+ assert ret == expected
+ assert set(ret) == set(expected)
+ for member in ret:
+ assert ret[member].encode() == expected[member].encode()