Diffstat (limited to 'kafka/coordinator/base.py')
 -rw-r--r--  kafka/coordinator/base.py  674
 1 file changed, 436 insertions, 238 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index a3055da..b16c1e1 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import, division
 import abc
 import copy
 import logging
+import sys
+import threading
 import time
 import weakref
 
@@ -20,6 +22,28 @@ from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
 log = logging.getLogger('kafka.coordinator')
 
 
+class MemberState(object):
+    UNJOINED = '<unjoined>'  # the client is not part of a group
+    REBALANCING = '<rebalancing>'  # the client has begun rebalancing
+    STABLE = '<stable>'  # the client has joined and is sending heartbeats
+
+
+class Generation(object):
+    def __init__(self, generation_id, member_id, protocol):
+        self.generation_id = generation_id
+        self.member_id = member_id
+        self.protocol = protocol
+
+Generation.NO_GENERATION = Generation(
+    OffsetCommitRequest[2].DEFAULT_GENERATION_ID,
+    JoinGroupRequest[0].UNKNOWN_MEMBER_ID,
+    None)
+
+
+class UnjoinedGroupException(Errors.KafkaError):
+    retriable = True
+
+
 class BaseCoordinator(object):
     """
     BaseCoordinator implements group management for a single group member
@@ -47,14 +71,23 @@
     :meth:`.group_protocols` and the format of the state assignment provided
     by the leader in :meth:`._perform_assignment` and which becomes available
     to members in :meth:`._on_join_complete`.
+
+    Note on locking: this class shares state between the caller and a background
+    thread which is used for sending heartbeats after the client has joined the
+    group. All mutable state as well as state transitions are protected with the
+    class's monitor. Generally this means acquiring the lock before reading or
+    writing the state of the group (e.g. generation, member_id) and holding the
+    lock when sending a request that affects the state of the group
+    (e.g. JoinGroup, LeaveGroup).
     """
 
     DEFAULT_CONFIG = {
         'group_id': 'kafka-python-default-group',
-        'session_timeout_ms': 30000,
+        'session_timeout_ms': 10000,
         'heartbeat_interval_ms': 3000,
+        'max_poll_interval_ms': 300000,
         'retry_backoff_ms': 100,
-        'api_version': (0, 9),
+        'api_version': (0, 10, 1),
         'metric_group_prefix': '',
     }
 
@@ -83,27 +116,31 @@
             if key in configs:
                 self.config[key] = configs[key]
 
+        if self.config['api_version'] < (0, 10, 1):
+            if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
+                raise Errors.KafkaConfigurationError("Broker version %s does not support "
+                                                     "different values for max_poll_interval_ms "
+                                                     "and session_timeout_ms")
+
         self._client = client
-        self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
-        self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
         self.group_id = self.config['group_id']
+        self.heartbeat = Heartbeat(**self.config)
+        self._heartbeat_thread = None
+        self._lock = threading.Condition()
+        self.rejoin_needed = True
+        self.rejoining = False  # renamed / complement of java needsJoinPrepare
+        self.state = MemberState.UNJOINED
+        self.join_future = None
         self.coordinator_id = None
         self._find_coordinator_future = None
-        self.rejoin_needed = True
-        self.rejoining = False
-        self.heartbeat = Heartbeat(**self.config)
-        self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
+        self._generation = Generation.NO_GENERATION
         self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
                                                self.config['metric_group_prefix'])
 
-    def __del__(self):
-        if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
-            self.heartbeat_task.disable()
-
     @abc.abstractmethod
     def protocol_type(self):
         """
-        Unique identifier for the class of protocols implements
+        Unique identifier for the class of supported protocols
         (e.g. "consumer" or "connect").
 
         Returns:
@@ -187,42 +224,51 @@
         Returns:
             bool: True if the coordinator is unknown
         """
-        if self.coordinator_id is None:
-            return True
+        return self.coordinator() is None
 
-        if self._client.is_disconnected(self.coordinator_id):
-            self.coordinator_dead('Node Disconnected')
-            return True
+    def coordinator(self):
+        """Get the current coordinator
 
-        return False
+        Returns: the current coordinator id or None if it is unknown
+        """
+        with self._lock:
+            if self.coordinator_id is None:
+                return None
+            elif self._client.is_disconnected(self.coordinator_id):
+                self.coordinator_dead('Node Disconnected')
+                return None
+            else:
+                return self.coordinator_id
 
     def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        while self.coordinator_unknown():
-            # Prior to 0.8.2 there was no group coordinator
-            # so we will just pick a node at random and treat
-            # it as the "coordinator"
-            if self.config['api_version'] < (0, 8, 2):
-                self.coordinator_id = self._client.least_loaded_node()
-                if self.coordinator_id is not None:
-                    self._client.ready(self.coordinator_id)
-                continue
-
-            future = self.lookup_coordinator()
-            self._client.poll(future=future)
-
-            if future.failed():
-                if future.retriable():
-                    if getattr(future.exception, 'invalid_metadata', False):
-                        log.debug('Requesting metadata for group coordinator request: %s', future.exception)
-                        metadata_update = self._client.cluster.request_update()
-                        self._client.poll(future=metadata_update)
+        with self._lock:
+            while self.coordinator_unknown():
+
+                # Prior to 0.8.2 there was no group coordinator
+                # so we will just pick a node at random and treat
+                # it as the "coordinator"
+                if self.config['api_version'] < (0, 8, 2):
+                    self.coordinator_id = self._client.least_loaded_node()
+                    if self.coordinator_id is not None:
+                        self._client.ready(self.coordinator_id)
+                    continue
+
+                future = self.lookup_coordinator()
+                self._client.poll(future=future)
+
+                if future.failed():
+                    if future.retriable():
+                        if getattr(future.exception, 'invalid_metadata', False):
+                            log.debug('Requesting metadata for group coordinator request: %s', future.exception)
+                            metadata_update = self._client.cluster.request_update()
+                            self._client.poll(future=metadata_update)
+                        else:
+                            time.sleep(self.config['retry_backoff_ms'] / 1000)
                     else:
-                        time.sleep(self.config['retry_backoff_ms'] / 1000)
-                else:
-                    raise future.exception  # pylint: disable-msg=raising-bad-type
+                        raise future.exception  # pylint: disable-msg=raising-bad-type
 
     def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
@@ -248,52 +294,116 @@
         """
         return self.rejoin_needed
 
+    def poll_heartbeat(self):
+        """
+        Check the status of the heartbeat thread (if it is active) and indicate
+        the liveness of the client. This must be called periodically after
+        joining with :meth:`.ensure_active_group` to ensure that the member stays
+        in the group. If an interval of time longer than the provided rebalance
+        timeout (max_poll_interval_ms) expires without calling this method, then
+        the client will proactively leave the group.
+
+        Raises: RuntimeError for unexpected errors raised from the heartbeat thread
+        """
+        with self._lock:
+            if self._heartbeat_thread is not None:
+                if self._heartbeat_thread.failed:
+                    # set the heartbeat thread to None and raise an exception.
+                    # If the user catches it, the next call to ensure_active_group()
+                    # will spawn a new heartbeat thread.
+                    cause = self._heartbeat_thread.failed
+                    self._heartbeat_thread = None
+                    raise cause  # pylint: disable-msg=raising-bad-type
+            self.heartbeat.poll()
+
+    def time_to_next_heartbeat(self):
+        with self._lock:
+            # if we have not joined the group, we don't need to send heartbeats
+            if self.state is MemberState.UNJOINED:
+                return sys.maxsize
+            return self.heartbeat.time_to_next_heartbeat()
+
+    def _handle_join_success(self, member_assignment_bytes):
+        with self._lock:
+            log.info("Successfully joined group %s with generation %s",
+                     self.group_id, self._generation.generation_id)
+            self.join_future = None
+            self.state = MemberState.STABLE
+            self.rejoining = False
+            self._heartbeat_thread.enable()
+            self._on_join_complete(self._generation.generation_id,
+                                   self._generation.member_id,
+                                   self._generation.protocol,
+                                   member_assignment_bytes)
+
+    def _handle_join_failure(self, _):
+        with self._lock:
+            self.join_future = None
+            self.state = MemberState.UNJOINED
+
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
-        # always ensure that the coordinator is ready because we may have been
-        # disconnected when sending heartbeats and does not necessarily require
-        # us to rejoin the group.
-        self.ensure_coordinator_ready()
-
-        if not self.need_rejoin():
-            return
-
-        if not self.rejoining:
-            self._on_join_prepare(self.generation, self.member_id)
-            self.rejoining = True
-
-        while self.need_rejoin():
-            self.ensure_coordinator_ready()
-
-            # ensure that there are no pending requests to the coordinator.
-            # This is important in particular to avoid resending a pending
-            # JoinGroup request.
-            while not self.coordinator_unknown():
-                if not self._client.in_flight_request_count(self.coordinator_id):
-                    break
-                self._client.poll(delayed_tasks=False)
-            else:
-                continue
-
-            future = self._send_join_group_request()
-            self._client.poll(future=future)
+        with self._lock:
+            if not self.need_rejoin():
+                return
 
-            if future.succeeded():
-                member_assignment_bytes = future.value
-                self._on_join_complete(self.generation, self.member_id,
-                                       self.protocol, member_assignment_bytes)
-                self.rejoining = False
-                self.heartbeat_task.reset()
-            else:
-                assert future.failed()
-                exception = future.exception
-                if isinstance(exception, (Errors.UnknownMemberIdError,
-                                          Errors.RebalanceInProgressError,
-                                          Errors.IllegalGenerationError)):
+            # call on_join_prepare if needed. We set a flag to make sure that
+            # we do not call it a second time if the client is woken up before
+            # a pending rebalance completes.
+            if not self.rejoining:
+                self._on_join_prepare(self._generation.generation_id,
+                                      self._generation.member_id)
+                self.rejoining = True
+
+            if self._heartbeat_thread is None:
+                log.debug('Starting new heartbeat thread')
+                self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+                self._heartbeat_thread.daemon = True
+                self._heartbeat_thread.start()
+
+            while self.need_rejoin():
+                self.ensure_coordinator_ready()
+
+                # ensure that there are no pending requests to the coordinator.
+                # This is important in particular to avoid resending a pending
+                # JoinGroup request.
+                while not self.coordinator_unknown():
+                    if not self._client.in_flight_request_count(self.coordinator_id):
+                        break
+                    self._client.poll()
+                else:
                     continue
-                elif not future.retriable():
-                    raise exception  # pylint: disable-msg=raising-bad-type
-                time.sleep(self.config['retry_backoff_ms'] / 1000)
+
+                # we store the join future in case we are woken up by the user
+                # after beginning the rebalance in the call to poll below.
+                # This ensures that we do not mistakenly attempt to rejoin
+                # before the pending rebalance has completed.
+                if self.join_future is None:
+                    self.state = MemberState.REBALANCING
+                    self.join_future = self._send_join_group_request()
+
+                    # handle join completion in the callback so that the
+                    # callback will be invoked even if the consumer is woken up
+                    # before finishing the rebalance
+                    self.join_future.add_callback(self._handle_join_success)
+
+                    # we handle failures below after the request finishes.
+                    # If the join completes after having been woken up, the
+                    # exception is ignored and we will rejoin
+                    self.join_future.add_errback(self._handle_join_failure)
+
+                future = self.join_future
+                self._client.poll(future=future)
+
+                if future.failed():
+                    exception = future.exception
+                    if isinstance(exception, (Errors.UnknownMemberIdError,
+                                              Errors.RebalanceInProgressError,
+                                              Errors.IllegalGenerationError)):
+                        continue
+                    elif not future.retriable():
+                        raise exception  # pylint: disable-msg=raising-bad-type
+                    time.sleep(self.config['retry_backoff_ms'] / 1000)
 
     def _send_join_group_request(self):
         """Join the group and return the assignment for the next generation.
@@ -315,14 +425,35 @@
 
         # send a join group request to the coordinator
         log.info("(Re-)joining group %s", self.group_id)
-        request = JoinGroupRequest[0](
-            self.group_id,
-            self.config['session_timeout_ms'],
-            self.member_id,
-            self.protocol_type(),
-            [(protocol,
-              metadata if isinstance(metadata, bytes) else metadata.encode())
-             for protocol, metadata in self.group_protocols()])
+        member_metadata = [
+            (protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
+            for protocol, metadata in self.group_protocols()
+        ]
+        if self.config['api_version'] < (0, 9):
+            raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
+        elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
+            request = JoinGroupRequest[0](
+                self.group_id,
+                self.config['session_timeout_ms'],
+                self._generation.member_id,
+                self.protocol_type(),
+                member_metadata)
+        elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
+            request = JoinGroupRequest[1](
+                self.group_id,
+                self.config['session_timeout_ms'],
+                self.config['max_poll_interval_ms'],
+                self._generation.member_id,
+                self.protocol_type(),
+                member_metadata)
+        else:
+            request = JoinGroupRequest[2](
+                self.group_id,
+                self.config['session_timeout_ms'],
+                self.config['max_poll_interval_ms'],
+                self._generation.member_id,
+                self.protocol_type(),
+                member_metadata)
 
         # create the request for the coordinator
         log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
@@ -348,19 +479,25 @@
         if error_type is Errors.NoError:
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
-            self.member_id = response.member_id
-            self.generation = response.generation_id
-            self.rejoin_needed = False
-            self.protocol = response.group_protocol
-            log.info("Joined group '%s' (generation %s) with member_id %s",
-                     self.group_id, self.generation, self.member_id)
             self.sensors.join_latency.record((time.time() - send_time) * 1000)
-            if response.leader_id == response.member_id:
-                log.info("Elected group leader -- performing partition"
-                         " assignments using %s", self.protocol)
-                self._on_join_leader(response).chain(future)
-            else:
-                self._on_join_follower().chain(future)
+            with self._lock:
+                if self.state is not MemberState.REBALANCING:
+                    # if the consumer was woken up before a rebalance completes,
+                    # we may have already left the group. In this case, we do
+                    # not want to continue with the sync group.
+                    future.failure(UnjoinedGroupException())
+                else:
+                    self._generation = Generation(response.generation_id,
+                                                  response.member_id,
+                                                  response.group_protocol)
+                    self.rejoin_needed = False
+
+                    if response.leader_id == response.member_id:
+                        log.info("Elected group leader -- performing partition"
+                                 " assignments using %s", self._generation.protocol)
+                        self._on_join_leader(response).chain(future)
+                    else:
+                        self._on_join_follower().chain(future)
 
         elif error_type is Errors.GroupLoadInProgressError:
             log.debug("Attempt to join group %s rejected since coordinator %s"
@@ -369,8 +506,8 @@
             future.failure(error_type(response))
         elif error_type is Errors.UnknownMemberIdError:
             # reset the member id and retry immediately
-            error = error_type(self.member_id)
-            self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+            error = error_type(self._generation.member_id)
+            self.reset_generation()
             log.debug("Attempt to join group %s failed due to unknown member id",
                       self.group_id)
             future.failure(error)
@@ -400,10 +537,11 @@
 
     def _on_join_follower(self):
         # send follower's sync group with an empty assignment
-        request = SyncGroupRequest[0](
+        version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+        request = SyncGroupRequest[version](
             self.group_id,
-            self.generation,
-            self.member_id,
+            self._generation.generation_id,
+            self._generation.member_id,
             {})
         log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
                   self.group_id, self.coordinator_id, request)
@@ -427,10 +565,11 @@
         except Exception as e:
             return Future().failure(e)
 
-        request = SyncGroupRequest[0](
+        version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+        request = SyncGroupRequest[version](
             self.group_id,
-            self.generation,
-            self.member_id,
+            self._generation.generation_id,
+            self._generation.member_id,
             [(member_id,
               assignment if isinstance(assignment, bytes) else assignment.encode())
              for member_id, assignment in six.iteritems(group_assignment)])
@@ -460,14 +599,12 @@
     def _handle_sync_group_response(self, future, send_time, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.info("Successfully joined group %s with generation %s",
-                     self.group_id, self.generation)
             self.sensors.sync_latency.record((time.time() - send_time) * 1000)
             future.success(response.member_assignment)
             return
 
         # Always rejoin on error
-        self.rejoin_needed = True
+        self.request_rejoin()
         if error_type is Errors.GroupAuthorizationFailedError:
             future.failure(error_type(self.group_id))
         elif error_type is Errors.RebalanceInProgressError:
@@ -478,7 +615,7 @@
                             Errors.IllegalGenerationError):
             error = error_type()
             log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
-            self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+            self.reset_generation()
             future.failure(error)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
@@ -516,30 +653,24 @@
 
     def _handle_group_coordinator_response(self, future, response):
         log.debug("Received group coordinator response %s", response)
-        if not self.coordinator_unknown():
-            # We already found the coordinator, so ignore the request
-            log.debug("Coordinator already known -- ignoring metadata response")
-            future.success(self.coordinator_id)
-            return
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            ok = self._client.cluster.add_group_coordinator(self.group_id, response)
-            if not ok:
-                # This could happen if coordinator metadata is different
-                # than broker metadata
-                future.failure(Errors.IllegalStateError())
-                return
-
-            self.coordinator_id = response.coordinator_id
-            log.info("Discovered coordinator %s for group %s",
-                     self.coordinator_id, self.group_id)
-            self._client.ready(self.coordinator_id)
-
-            # start sending heartbeats only if we have a valid generation
-            if self.generation > 0:
-                self.heartbeat_task.reset()
+            with self._lock:
+                ok = self._client.cluster.add_group_coordinator(self.group_id, response)
+                if not ok:
+                    # This could happen if coordinator metadata is different
+                    # than broker metadata
+                    future.failure(Errors.IllegalStateError())
+                    return
+
+                self.coordinator_id = response.coordinator_id
+                log.info("Discovered coordinator %s for group %s",
+                         self.coordinator_id, self.group_id)
+                self._client.ready(self.coordinator_id)
+                self.heartbeat.reset_timeouts()
             future.success(self.coordinator_id)
+
         elif error_type is Errors.GroupCoordinatorNotAvailableError:
             log.debug("Group Coordinator Not Available; retry")
             future.failure(error_type())
@@ -549,45 +680,74 @@
             future.failure(error)
         else:
             error = error_type()
-            log.error("Unrecognized failure in Group Coordinator Request: %s",
-                      error)
+            log.error("Group coordinator lookup for group %s failed: %s",
+                      self.group_id, error)
             future.failure(error)
 
     def coordinator_dead(self, error):
         """Mark the current coordinator as dead."""
-        if self.coordinator_id is not None:
-            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
-                        self.coordinator_id, self.group_id, error)
-            self.coordinator_id = None
+        with self._lock:
+            if self.coordinator_id is not None:
+                log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+                            self.coordinator_id, self.group_id, error)
+                self.coordinator_id = None
+
+    def generation(self):
+        """Get the current generation state if the group is stable.
+
+        Returns: the current generation or None if the group is unjoined/rebalancing
+        """
+        with self._lock:
+            if self.state is not MemberState.STABLE:
+                return None
+            return self._generation
+
+    def reset_generation(self):
+        """Reset the generation and memberId because we have fallen out of the group."""
+        with self._lock:
+            self._generation = Generation.NO_GENERATION
+            self.rejoin_needed = True
+            self.state = MemberState.UNJOINED
+
+    def request_rejoin(self):
+        self.rejoin_needed = True
 
     def close(self):
         """Close the coordinator, leave the current group,
         and reset local generation / member_id"""
-        try:
-            self._client.unschedule(self.heartbeat_task)
-        except KeyError:
-            pass
-
-        if not self.coordinator_unknown() and self.generation > 0:
-            # this is a minimal effort attempt to leave the group. we do not
-            # attempt any resending if the request fails or times out.
-            log.info('Leaving consumer group (%s).', self.group_id)
-            request = LeaveGroupRequest[0](self.group_id, self.member_id)
-            future = self._client.send(self.coordinator_id, request)
-            future.add_callback(self._handle_leave_group_response)
-            future.add_errback(log.error, "LeaveGroup request failed: %s")
-            self._client.poll(future=future)
-
-        self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
-        self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
-        self.rejoin_needed = True
+        with self._lock:
+            if self._heartbeat_thread is not None:
+                self._heartbeat_thread.close()
+                self._heartbeat_thread = None
+            self.maybe_leave_group()
+
+    def maybe_leave_group(self):
+        """Leave the current group and reset local generation/memberId."""
+        with self._lock:
+            if (not self.coordinator_unknown()
+                and self.state is not MemberState.UNJOINED
+                and self._generation is not Generation.NO_GENERATION):
+
+                # this is a minimal effort attempt to leave the group. we do not
+                # attempt any resending if the request fails or times out.
+                log.info('Leaving consumer group (%s).', self.group_id)
+                version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+                request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
+                future = self._client.send(self.coordinator_id, request)
+                future.add_callback(self._handle_leave_group_response)
+                future.add_errback(log.error, "LeaveGroup request failed: %s")
+                self._client.poll(future=future)
+
+            self.reset_generation()
 
     def _handle_leave_group_response(self, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.info("LeaveGroup request succeeded")
+            log.debug("LeaveGroup request for group %s returned successfully",
+                      self.group_id)
         else:
-            log.error("LeaveGroup request failed: %s", error_type())
+            log.error("LeaveGroup request for group %s failed with error: %s",
+                      self.group_id, error_type())
 
     def _send_heartbeat_request(self):
         """Send a heartbeat request"""
@@ -599,7 +759,10 @@
             e = Errors.NodeNotReadyError(self.coordinator_id)
             return Future().failure(e)
 
-        request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
+        version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+        request = HeartbeatRequest[version](self.group_id,
+                                            self._generation.generation_id,
+                                            self._generation.member_id)
         log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id)  # pylint: disable-msg=no-member
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
@@ -619,24 +782,23 @@
                             Errors.NotCoordinatorForGroupError):
             log.warning("Heartbeat failed for group %s: coordinator (node %s)"
                         " is either not started or not valid", self.group_id,
-                        self.coordinator_id)
+                        self.coordinator())
             self.coordinator_dead(error_type())
             future.failure(error_type())
         elif error_type is Errors.RebalanceInProgressError:
             log.warning("Heartbeat failed for group %s because it is"
                         " rebalancing", self.group_id)
-            self.rejoin_needed = True
+            self.request_rejoin()
            future.failure(error_type())
         elif error_type is Errors.IllegalGenerationError:
             log.warning("Heartbeat failed for group %s: generation id is not "
                         " current.", self.group_id)
-            self.rejoin_needed = True
+            self.reset_generation()
             future.failure(error_type())
         elif error_type is Errors.UnknownMemberIdError:
             log.warning("Heartbeat: local member_id was not recognized;"
                         " this consumer needs to re-join")
-            self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
-            self.rejoin_needed = True
+            self.reset_generation()
             future.failure(error_type)
         elif error_type is Errors.GroupAuthorizationFailedError:
             error = error_type(self.group_id)
@@ -648,76 +810,6 @@
             future.failure(error)
 
 
-class HeartbeatTask(object):
-    def __init__(self, coordinator):
-        self._coordinator = coordinator
-        self._heartbeat = coordinator.heartbeat
-        self._client = coordinator._client
-        self._request_in_flight = False
-
-    def disable(self):
-        try:
-            self._client.unschedule(self)
-        except KeyError:
-            pass
-
-    def reset(self):
-        # start or restart the heartbeat task to be executed at the next chance
-        self._heartbeat.reset_session_timeout()
-        try:
-            self._client.unschedule(self)
-        except KeyError:
-            pass
-        if not self._request_in_flight:
-            self._client.schedule(self, time.time())
-
-    def __call__(self):
-        if (self._coordinator.generation < 0 or
-            self._coordinator.need_rejoin()):
-            # no need to send the heartbeat we're not using auto-assignment
-            # or if we are awaiting a rebalance
-            log.info("Skipping heartbeat: no auto-assignment"
-                     " or waiting on rebalance")
-            return
-
-        if self._coordinator.coordinator_unknown():
-            log.warning("Coordinator unknown during heartbeat -- will retry")
-            self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
-            return
-
-        if self._heartbeat.session_expired():
-            # we haven't received a successful heartbeat in one session interval
-            # so mark the coordinator dead
-            log.error("Heartbeat session expired - marking coordinator dead")
-            self._coordinator.coordinator_dead('Heartbeat session expired')
-            return
-
-        if not self._heartbeat.should_heartbeat():
-            # we don't need to heartbeat now, so reschedule for when we do
-            ttl = self._heartbeat.ttl()
-            log.debug("Heartbeat task unneeded now, retrying in %s", ttl)
-            self._client.schedule(self, time.time() + ttl)
-        else:
-            self._heartbeat.sent_heartbeat()
-            self._request_in_flight = True
-            future = self._coordinator._send_heartbeat_request()
-            future.add_callback(self._handle_heartbeat_success)
-            future.add_errback(self._handle_heartbeat_failure)
-
-    def _handle_heartbeat_success(self, v):
-        log.debug("Received successful heartbeat")
-        self._request_in_flight = False
-        self._heartbeat.received_heartbeat()
-        ttl = self._heartbeat.ttl()
-        self._client.schedule(self, time.time() + ttl)
-
-    def _handle_heartbeat_failure(self, e):
-        log.warning("Heartbeat failed (%s); retrying", e)
-        self._request_in_flight = False
-        etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
-        self._client.schedule(self, etd)
-
-
 class GroupCoordinatorMetrics(object):
     def __init__(self, heartbeat, metrics, prefix, tags=None):
         self.heartbeat = heartbeat
@@ -764,6 +856,112 @@
         metrics.add_metric(metrics.metric_name(
             'last-heartbeat-seconds-ago', self.metric_group_name,
-            'The number of seconds since the last controller heartbeat',
+            'The number of seconds since the last controller heartbeat was sent',
             tags), AnonMeasurable(
                 lambda _, now: (now / 1000) - self.heartbeat.last_send))
+
+
+class HeartbeatThread(threading.Thread):
+    def __init__(self, coordinator):
+        super(HeartbeatThread, self).__init__()
+        self.name = threading.current_thread().name + '-heartbeat'
+        self.coordinator = coordinator
+        self.enabled = False
+        self.closed = False
+        self.failed = None
+
+    def enable(self):
+        with self.coordinator._lock:
+            self.enabled = True
+            self.coordinator.heartbeat.reset_timeouts()
+            self.coordinator._lock.notify()
+
+    def disable(self):
+        with self.coordinator._lock:
+            self.enabled = False
+
+    def close(self):
+        with self.coordinator._lock:
+            self.closed = True
+            self.coordinator._lock.notify()
+
+    def run(self):
+        try:
+            while not self.closed:
+                self._run_once()
+
+            log.debug('Heartbeat closed!')
+
+        except RuntimeError as e:
+            log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
+                      self.coordinator.group_id, e)
+            self.failed = e
+
+    def _run_once(self):
+        with self.coordinator._lock:
+            if not self.enabled:
+                log.debug('Heartbeat disabled. Waiting')
+                self.coordinator._lock.wait()
+                log.debug('Heartbeat re-enabled.')
+                return
+
+            if self.coordinator.state is not MemberState.STABLE:
+                # the group is not stable (perhaps because we left the
+                # group or because the coordinator kicked us out), so
+                # disable heartbeats and wait for the main thread to rejoin.
+                log.debug('Group state is not stable, disabling heartbeats')
+                self.disable()
+                return
+
+            # TODO: When consumer.wakeup() is implemented, we need to
+            # disable here to prevent propagating an exception to this
+            # heartbeat thread
+            self.coordinator._client.poll(timeout_ms=0)
+
+            if self.coordinator.coordinator_unknown():
+                if not self.coordinator.lookup_coordinator().is_done:
+                    self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+            elif self.coordinator.heartbeat.session_timeout_expired():
+                # the session timeout has expired without seeing a
+                # successful heartbeat, so we should probably make sure
+                # the coordinator is still healthy.
+                log.debug('Heartbeat session expired, marking coordinator dead')
+                self.coordinator.coordinator_dead('Heartbeat session expired')
+
+            elif self.coordinator.heartbeat.poll_timeout_expired():
+                # the poll timeout has expired, which means that the
+                # foreground thread has stalled in between calls to
+                # poll(), so we explicitly leave the group.
+                log.debug('Heartbeat poll expired, leaving group')
+                self.coordinator.maybe_leave_group()
+
+            elif not self.coordinator.heartbeat.should_heartbeat():
+                # poll again after waiting for the retry backoff in case
+                # the heartbeat failed or the coordinator disconnected
+                log.debug('Not ready to heartbeat, waiting')
+                self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+            else:
+                self.coordinator.heartbeat.sent_heartbeat()
+                future = self.coordinator._send_heartbeat_request()
+                future.add_callback(self._handle_heartbeat_success)
+                future.add_errback(self._handle_heartbeat_failure)
+
+    def _handle_heartbeat_success(self, result):
+        with self.coordinator._lock:
+            self.coordinator.heartbeat.received_heartbeat()
+
+    def _handle_heartbeat_failure(self, exception):
+        with self.coordinator._lock:
+            if isinstance(exception, Errors.RebalanceInProgressError):
+                # it is valid to continue heartbeating while the group is
+                # rebalancing. This ensures that the coordinator keeps the
+                # member in the group for as long as the duration of the
+                # rebalance timeout. If we stop sending heartbeats, however,
+                # then the session timeout may expire before we can rejoin.
+                self.coordinator.heartbeat.received_heartbeat()
+            else:
+                self.coordinator.heartbeat.fail_heartbeat()
+                # wake up the thread if it's sleeping to reschedule the heartbeat
+                self.coordinator._lock.notify()
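The rework above follows the KIP-62 model: session_timeout_ms now only bounds liveness of the background heartbeat thread, while max_poll_interval_ms bounds foreground progress, so the caller must invoke poll_heartbeat() periodically between calls to ensure_active_group(). A minimal sketch of that calling contract, using an invented StubCoordinator in place of a concrete BaseCoordinator subclass (none of this is library code):

import time

class StubCoordinator(object):
    """Invented stand-in for a concrete BaseCoordinator subclass,
    just enough to show the calling pattern."""
    def __init__(self):
        self._joined = False

    def ensure_active_group(self):
        # the real method joins/rejoins and starts the heartbeat thread
        if not self._joined:
            print('joining group')
            self._joined = True

    def poll_heartbeat(self):
        # the real method re-raises any failure from the heartbeat thread
        print('liveness check')

coordinator = StubCoordinator()
for _ in range(3):
    coordinator.ensure_active_group()
    coordinator.poll_heartbeat()  # must run at least once per max_poll_interval_ms
    time.sleep(0.1)               # stands in for record processing

If processing a batch takes longer than max_poll_interval_ms, the poll_timeout_expired() branch in _run_once() fires and the member proactively leaves the group.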
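The new HeartbeatThread replaces the old client-scheduled HeartbeatTask with a condition-variable pattern: the thread parks in wait() while disabled and is woken by notify() from enable() or close(). A self-contained toy version of just that synchronization skeleton (ToyHeartbeatThread is invented for illustration; note that threading.Condition() wraps an RLock by default, which is why the coordinator can re-acquire its own lock in nested calls):

import threading
import time

class ToyHeartbeatThread(threading.Thread):
    def __init__(self, lock):
        super(ToyHeartbeatThread, self).__init__()
        self.daemon = True
        self.lock = lock        # shared Condition, like coordinator._lock
        self.enabled = False
        self.closed = False

    def enable(self):
        with self.lock:
            self.enabled = True
            self.lock.notify()  # wake the thread if it is parked in wait()

    def close(self):
        with self.lock:
            self.closed = True
            self.lock.notify()

    def run(self):
        while True:
            with self.lock:
                if self.closed:
                    return
                if not self.enabled:
                    self.lock.wait()  # parked until enable()/close()
                    continue
                print('heartbeat')
                self.lock.wait(0.5)   # timed wait between beats

lock = threading.Condition()
thread = ToyHeartbeatThread(lock)
thread.start()
thread.enable()   # as _handle_join_success() does after a successful join
time.sleep(1.6)   # foreground work; expect roughly three beats
thread.close()
thread.join()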
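The JoinGroup construction in _send_join_group_request() gates the request version on the broker API version: v0 for 0.9 brokers, v1 once the rebalance timeout (max_poll_interval_ms) became a separate request field in 0.10.1, and v2 for 0.11+. A small illustrative helper capturing the same ladder (pick_join_group_version is invented, not part of the library):

def pick_join_group_version(api_version):
    # mirrors the elif ladder in _send_join_group_request()
    if api_version < (0, 9):
        raise ValueError('JoinGroupRequest api requires 0.9+ brokers')
    elif api_version < (0, 10, 1):
        return 0
    elif api_version < (0, 11, 0):
        return 1
    else:
        return 2

assert pick_join_group_version((0, 9)) == 0
assert pick_join_group_version((0, 10, 1)) == 1
assert pick_join_group_version((1, 0)) == 2

The same two-way gate keyed on (0, 11, 0) is applied to SyncGroupRequest, LeaveGroupRequest, and HeartbeatRequest elsewhere in the diff.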