summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-01-11 14:48:38 -0800
committerGitHub <noreply@github.com>2018-01-11 14:48:38 -0800
commitda65a562bdd9ce20290d4375acc36b4977ef7026 (patch)
tree4db46fe723673a3cde6bb96b6da6c569c1eac2e4
parenta69320b8e3199fa9d7cfa3947a242e699a045c3b (diff)
downloadkafka-python-da65a562bdd9ce20290d4375acc36b4977ef7026.tar.gz
Fix coordinator join_future race condition (#1338)
* Fix race condition in coordinator join_future handling
-rw-r--r--kafka/coordinator/base.py12
-rw-r--r--test/test_coordinator.py13
2 files changed, 21 insertions, 4 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 30b9c40..24412c9 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -377,19 +377,23 @@ class BaseCoordinator(object):
# before the pending rebalance has completed.
if self.join_future is None:
self.state = MemberState.REBALANCING
- self.join_future = self._send_join_group_request()
+ future = self._send_join_group_request()
+
+ self.join_future = future # this should happen before adding callbacks
# handle join completion in the callback so that the
# callback will be invoked even if the consumer is woken up
# before finishing the rebalance
- self.join_future.add_callback(self._handle_join_success)
+ future.add_callback(self._handle_join_success)
# we handle failures below after the request finishes.
# If the join completes after having been woken up, the
# exception is ignored and we will rejoin
- self.join_future.add_errback(self._handle_join_failure)
+ future.add_errback(self._handle_join_failure)
+
+ else:
+ future = self.join_future
- future = self.join_future
self._client.poll(future=future)
if future.failed():
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 7dc0e04..f567369 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -620,3 +620,16 @@ def test_lookup_coordinator_failure(mocker, coordinator):
return_value=Future().failure(Exception('foobar')))
future = coordinator.lookup_coordinator()
assert future.failed()
+
+
+def test_ensure_active_group(mocker, coordinator):
+ coordinator._subscription.subscribe(topics=['foobar'])
+ mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
+ mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True))
+ mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, True, False])
+ mocker.patch.object(coordinator, '_on_join_complete')
+ mocker.patch.object(coordinator, '_heartbeat_thread')
+
+ coordinator.ensure_active_group()
+
+ coordinator._send_join_group_request.assert_called_once_with()