diff options
author | Valeria Chernenko <aynroot@users.noreply.github.com> | 2020-09-30 06:03:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-29 21:03:54 -0700 |
commit | c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93 (patch) | |
tree | 40b412379666620a8a5173932652e94bdc9439b2 /kafka/coordinator/consumer.py | |
parent | cb96a1a6c79c17ac9b3399b7a33bbaea7ad8886f (diff) | |
download | kafka-python-c536dd28bc3c2db85d9b62a1e73d23a3eeaebd93.tar.gz |
KIP-54: Implement sticky partition assignment strategy (#2057)
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index fda80aa..971f5e8 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -11,6 +11,7 @@ from kafka.vendor import six from kafka.coordinator.base import BaseCoordinator, Generation from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.protocol import ConsumerProtocol import kafka.errors as Errors from kafka.future import Future @@ -31,7 +32,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, - 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), + 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor), 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -234,6 +235,8 @@ class ConsumerCoordinator(BaseCoordinator): # give the assignor a chance to update internal state # based on the received assignment assignor.on_assignment(assignment) + if assignor.name == 'sticky': + assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now self.next_auto_commit_deadline = time.time() + self.auto_commit_interval |