summaryrefslogtreecommitdiff
path: root/kafka/coordinator/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r--kafka/coordinator/base.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index d6ea6c0..e147b6f 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -89,7 +89,7 @@ class BaseCoordinator(object):
self.group_id = self.config['group_id']
self.coordinator_id = None
self.rejoin_needed = True
- self.needs_join_prepare = True
+ self.rejoining = False
self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
@@ -235,9 +235,9 @@ class BaseCoordinator(object):
if not self.need_rejoin():
return
- if self.needs_join_prepare:
+ if not self.rejoining:
self._on_join_prepare(self.generation, self.member_id)
- self.needs_join_prepare = False
+ self.rejoining = True
while self.need_rejoin():
self.ensure_coordinator_known()
@@ -249,7 +249,7 @@ class BaseCoordinator(object):
member_assignment_bytes = future.value
self._on_join_complete(self.generation, self.member_id,
self.protocol, member_assignment_bytes)
- self.needs_join_prepare = True
+ self.rejoining = False
self.heartbeat_task.reset()
else:
assert future.failed()