diff options
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5f60aa3..22dffb4 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -190,7 +190,7 @@ class BaseCoordinator(object): return True if self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead() + self.coordinator_dead('Node Disconnected') return True return False @@ -311,7 +311,7 @@ class BaseCoordinator(object): # unless the error is caused by internal client pipelining if not isinstance(error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests)): - self.coordinator_dead() + self.coordinator_dead(error) future.failure(error) def _handle_join_group_response(self, future, send_time, response): @@ -348,7 +348,7 @@ class BaseCoordinator(object): elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): # re-discover the coordinator and retry with backoff - self.coordinator_dead() + self.coordinator_dead(error_type()) log.debug("Attempt to join group %s failed due to obsolete " "coordinator information: %s", self.group_id, error_type.__name__) @@ -448,7 +448,7 @@ class BaseCoordinator(object): Errors.NotCoordinatorForGroupError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.coordinator_dead() + self.coordinator_dead(error) future.failure(error) else: error = error_type() @@ -513,7 +513,7 @@ class BaseCoordinator(object): error) future.failure(error) - def coordinator_dead(self, error=None): + def coordinator_dead(self, error): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: log.warning("Marking the coordinator dead (node %s) for group %s: %s.", @@ -571,7 +571,7 @@ class BaseCoordinator(object): log.warning("Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", self.group_id, self.coordinator_id) - self.coordinator_dead() + self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: log.warning("Heartbeat failed for group %s because it is" @@ -640,7 +640,7 @@ class HeartbeatTask(object): # we haven't received a successful heartbeat in one session interval # so mark the coordinator dead log.error("Heartbeat session expired - marking coordinator dead") - self._coordinator.coordinator_dead() + self._coordinator.coordinator_dead('Heartbeat session expired') return if not self._heartbeat.should_heartbeat(): |