Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r--  kafka/coordinator/base.py  674
1 file changed, 436 insertions(+), 238 deletions(-)
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index a3055da..b16c1e1 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import, division
import abc
import copy
import logging
+import sys
+import threading
import time
import weakref
@@ -20,6 +22,28 @@ from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
log = logging.getLogger('kafka.coordinator')
+class MemberState(object):
+ UNJOINED = '<unjoined>' # the client is not part of a group
+ REBALANCING = '<rebalancing>' # the client has begun rebalancing
+ STABLE = '<stable>' # the client has joined and is sending heartbeats
+
+
+class Generation(object):
+ def __init__(self, generation_id, member_id, protocol):
+ self.generation_id = generation_id
+ self.member_id = member_id
+ self.protocol = protocol
+
+Generation.NO_GENERATION = Generation(
+ OffsetCommitRequest[2].DEFAULT_GENERATION_ID,
+ JoinGroupRequest[0].UNKNOWN_MEMBER_ID,
+ None)
+
+
+class UnjoinedGroupException(Errors.KafkaError):
+ retriable = True
+
+
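
Taken together, the three member states form a small state machine: UNJOINED moves to REBALANCING when a JoinGroup request is sent, REBALANCING moves to STABLE once the join/sync round trip completes, and any reset returns to UNJOINED. Note that NO_GENERATION is compared by identity (`is`), so a reset always swaps back to the shared sentinel. A self-contained toy sketch of that pattern (the literal -1 and '' stand in for the protocol's DEFAULT_GENERATION_ID and UNKNOWN_MEMBER_ID):

    # Sketch, not library code: the shared-sentinel idiom used above.
    class Generation(object):
        def __init__(self, generation_id, member_id, protocol):
            self.generation_id = generation_id
            self.member_id = member_id
            self.protocol = protocol

    Generation.NO_GENERATION = Generation(-1, '', None)

    current = Generation.NO_GENERATION
    # identity comparison, as in maybe_leave_group() further down
    assert current is Generation.NO_GENERATION
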
class BaseCoordinator(object):
"""
BaseCoordinator implements group management for a single group member
@@ -47,14 +71,23 @@ class BaseCoordinator(object):
:meth:`.group_protocols` and the format of the state assignment provided by
the leader in :meth:`._perform_assignment` and which becomes available to
members in :meth:`._on_join_complete`.
+
+ Note on locking: this class shares state between the caller and a background
+ thread which is used for sending heartbeats after the client has joined the
+ group. All mutable state as well as state transitions are protected with the
+ class's monitor. Generally this means acquiring the lock before reading or
+ writing the state of the group (e.g. generation, member_id) and holding the
+ lock when sending a request that affects the state of the group
+ (e.g. JoinGroup, LeaveGroup).
"""
DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
- 'session_timeout_ms': 30000,
+ 'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
+ 'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
- 'api_version': (0, 9),
+ 'api_version': (0, 10, 1),
'metric_group_prefix': '',
}
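
The locking note above boils down to one idiom: `self._lock` is a `threading.Condition`, so it serves both as the mutex guarding group state and as the wait/notify channel between the caller and the heartbeat thread. A minimal self-contained sketch of that monitor pattern (toy class, not library code):

    import threading

    class Monitor(object):
        """Toy illustration of the monitor pattern described above."""
        def __init__(self):
            self._lock = threading.Condition()
            self.state = '<unjoined>'

        def transition(self, new_state):
            with self._lock:           # hold the lock for every state write
                self.state = new_state
                self._lock.notify()    # wake any thread parked in wait()

        def await_stable(self):
            with self._lock:
                while self.state != '<stable>':
                    self._lock.wait()  # lock released while sleeping
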
@@ -83,27 +116,31 @@ class BaseCoordinator(object):
if key in configs:
self.config[key] = configs[key]
+ if self.config['api_version'] < (0, 10, 1):
+ if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
+                raise Errors.KafkaConfigurationError("Broker version %s does not support "
+                                                     "different values for max_poll_interval_ms "
+                                                     "and session_timeout_ms"
+                                                     % (self.config['api_version'],))
+
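
This check exists because JoinGroup v0 (pre-0.10.1 brokers) carries only a single session timeout, so a separate poll interval cannot be enforced server-side. Extracted as a standalone sketch for clarity (hypothetical free function, mirroring the branch above):

    # Sketch of the validation above, outside the class for easy testing.
    def validate_poll_timeouts(api_version, max_poll_interval_ms, session_timeout_ms):
        if api_version < (0, 10, 1):
            if max_poll_interval_ms != session_timeout_ms:
                raise ValueError("Broker version %s does not support different "
                                 "values for max_poll_interval_ms and "
                                 "session_timeout_ms" % (api_version,))

    validate_poll_timeouts((0, 10, 1), 300000, 10000)  # ok: broker enforces both
    validate_poll_timeouts((0, 9), 10000, 10000)       # ok: values match
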
self._client = client
- self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
+ self.heartbeat = Heartbeat(**self.config)
+ self._heartbeat_thread = None
+ self._lock = threading.Condition()
+ self.rejoin_needed = True
+        self.rejoining = False  # renamed; the complement of the Java client's needsJoinPrepare
+ self.state = MemberState.UNJOINED
+ self.join_future = None
self.coordinator_id = None
self._find_coordinator_future = None
- self.rejoin_needed = True
- self.rejoining = False
- self.heartbeat = Heartbeat(**self.config)
- self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
+ self._generation = Generation.NO_GENERATION
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
self.config['metric_group_prefix'])
- def __del__(self):
- if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
- self.heartbeat_task.disable()
-
@abc.abstractmethod
def protocol_type(self):
"""
- Unique identifier for the class of protocols implements
+ Unique identifier for the class of supported protocols
(e.g. "consumer" or "connect").
Returns:
@@ -187,42 +224,51 @@ class BaseCoordinator(object):
Returns:
bool: True if the coordinator is unknown
"""
- if self.coordinator_id is None:
- return True
+ return self.coordinator() is None
- if self._client.is_disconnected(self.coordinator_id):
- self.coordinator_dead('Node Disconnected')
- return True
+ def coordinator(self):
+ """Get the current coordinator
- return False
+ Returns: the current coordinator id or None if it is unknown
+ """
+ with self._lock:
+ if self.coordinator_id is None:
+ return None
+ elif self._client.is_disconnected(self.coordinator_id):
+ self.coordinator_dead('Node Disconnected')
+ return None
+ else:
+ return self.coordinator_id
def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
        (and we have an active connection -- the Java client uses an unsent request queue).
"""
- while self.coordinator_unknown():
- # Prior to 0.8.2 there was no group coordinator
- # so we will just pick a node at random and treat
- # it as the "coordinator"
- if self.config['api_version'] < (0, 8, 2):
- self.coordinator_id = self._client.least_loaded_node()
- if self.coordinator_id is not None:
- self._client.ready(self.coordinator_id)
- continue
-
- future = self.lookup_coordinator()
- self._client.poll(future=future)
-
- if future.failed():
- if future.retriable():
- if getattr(future.exception, 'invalid_metadata', False):
- log.debug('Requesting metadata for group coordinator request: %s', future.exception)
- metadata_update = self._client.cluster.request_update()
- self._client.poll(future=metadata_update)
+ with self._lock:
+ while self.coordinator_unknown():
+
+ # Prior to 0.8.2 there was no group coordinator
+ # so we will just pick a node at random and treat
+ # it as the "coordinator"
+ if self.config['api_version'] < (0, 8, 2):
+ self.coordinator_id = self._client.least_loaded_node()
+ if self.coordinator_id is not None:
+ self._client.ready(self.coordinator_id)
+ continue
+
+ future = self.lookup_coordinator()
+ self._client.poll(future=future)
+
+ if future.failed():
+ if future.retriable():
+ if getattr(future.exception, 'invalid_metadata', False):
+ log.debug('Requesting metadata for group coordinator request: %s', future.exception)
+ metadata_update = self._client.cluster.request_update()
+ self._client.poll(future=metadata_update)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
- time.sleep(self.config['retry_backoff_ms'] / 1000)
- else:
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
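
The loop above has three arms: pre-0.8.2 brokers have no coordinator protocol, so any live node is treated as the coordinator; invalid-metadata failures trigger a metadata refresh; all other retriable failures sleep for retry_backoff_ms. The retry shape, reduced to a self-contained sketch (toy helper, not library code):

    import time

    def retry_with_backoff(attempt, is_retriable, backoff_ms=100):
        """Sketch of the loop shape above: retry retriable failures with a
        fixed backoff and re-raise anything else."""
        while True:
            try:
                return attempt()
            except Exception as e:
                if not is_retriable(e):
                    raise
                time.sleep(backoff_ms / 1000.0)
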
def _reset_find_coordinator_future(self, result):
self._find_coordinator_future = None
@@ -248,52 +294,116 @@ class BaseCoordinator(object):
"""
return self.rejoin_needed
+ def poll_heartbeat(self):
+ """
+ Check the status of the heartbeat thread (if it is active) and indicate
+ the liveness of the client. This must be called periodically after
+ joining with :meth:`.ensure_active_group` to ensure that the member stays
+ in the group. If an interval of time longer than the provided rebalance
+ timeout (max_poll_interval_ms) expires without calling this method, then
+ the client will proactively leave the group.
+
+ Raises: RuntimeError for unexpected errors raised from the heartbeat thread
+ """
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ if self._heartbeat_thread.failed:
+ # set the heartbeat thread to None and raise an exception.
+ # If the user catches it, the next call to ensure_active_group()
+ # will spawn a new heartbeat thread.
+ cause = self._heartbeat_thread.failed
+ self._heartbeat_thread = None
+ raise cause # pylint: disable-msg=raising-bad-type
+ self.heartbeat.poll()
+
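
poll_heartbeat() is the foreground half of the liveness contract: each call stamps the poll timer, and the heartbeat thread (see the bottom of this diff) leaves the group if the stamp grows older than max_poll_interval_ms. A self-contained toy model of that timer:

    import time

    class PollTimer(object):
        """Toy model of the heartbeat.poll()/poll_timeout_expired() pair."""
        def __init__(self, max_poll_interval_ms=300000):
            self.max_poll_ms = max_poll_interval_ms
            self.last_poll = time.time()

        def poll(self):        # called by the foreground thread each iteration
            self.last_poll = time.time()

        def expired(self):     # checked periodically by the heartbeat thread
            return (time.time() - self.last_poll) * 1000 > self.max_poll_ms
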
+ def time_to_next_heartbeat(self):
+ with self._lock:
+ # if we have not joined the group, we don't need to send heartbeats
+ if self.state is MemberState.UNJOINED:
+ return sys.maxsize
+ return self.heartbeat.time_to_next_heartbeat()
+
+ def _handle_join_success(self, member_assignment_bytes):
+ with self._lock:
+ log.info("Successfully joined group %s with generation %s",
+ self.group_id, self._generation.generation_id)
+ self.join_future = None
+ self.state = MemberState.STABLE
+ self.rejoining = False
+ self._heartbeat_thread.enable()
+ self._on_join_complete(self._generation.generation_id,
+ self._generation.member_id,
+ self._generation.protocol,
+ member_assignment_bytes)
+
+ def _handle_join_failure(self, _):
+ with self._lock:
+ self.join_future = None
+ self.state = MemberState.UNJOINED
+
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
- # always ensure that the coordinator is ready because we may have been
- # disconnected when sending heartbeats and does not necessarily require
- # us to rejoin the group.
- self.ensure_coordinator_ready()
-
- if not self.need_rejoin():
- return
-
- if not self.rejoining:
- self._on_join_prepare(self.generation, self.member_id)
- self.rejoining = True
-
- while self.need_rejoin():
- self.ensure_coordinator_ready()
-
- # ensure that there are no pending requests to the coordinator.
- # This is important in particular to avoid resending a pending
- # JoinGroup request.
- while not self.coordinator_unknown():
- if not self._client.in_flight_request_count(self.coordinator_id):
- break
- self._client.poll(delayed_tasks=False)
- else:
- continue
-
- future = self._send_join_group_request()
- self._client.poll(future=future)
+ with self._lock:
+ if not self.need_rejoin():
+ return
- if future.succeeded():
- member_assignment_bytes = future.value
- self._on_join_complete(self.generation, self.member_id,
- self.protocol, member_assignment_bytes)
- self.rejoining = False
- self.heartbeat_task.reset()
- else:
- assert future.failed()
- exception = future.exception
- if isinstance(exception, (Errors.UnknownMemberIdError,
- Errors.RebalanceInProgressError,
- Errors.IllegalGenerationError)):
+ # call on_join_prepare if needed. We set a flag to make sure that
+ # we do not call it a second time if the client is woken up before
+ # a pending rebalance completes.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
+ if self._heartbeat_thread is None:
+ log.debug('Starting new heartbeat thread')
+ self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+
+ while self.need_rejoin():
+ self.ensure_coordinator_ready()
+
+ # ensure that there are no pending requests to the coordinator.
+ # This is important in particular to avoid resending a pending
+ # JoinGroup request.
+ while not self.coordinator_unknown():
+ if not self._client.in_flight_request_count(self.coordinator_id):
+ break
+ self._client.poll()
+ else:
continue
- elif not future.retriable():
- raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000)
+
+ # we store the join future in case we are woken up by the user
+ # after beginning the rebalance in the call to poll below.
+ # This ensures that we do not mistakenly attempt to rejoin
+ # before the pending rebalance has completed.
+ if self.join_future is None:
+ self.state = MemberState.REBALANCING
+ self.join_future = self._send_join_group_request()
+
+ # handle join completion in the callback so that the
+ # callback will be invoked even if the consumer is woken up
+ # before finishing the rebalance
+ self.join_future.add_callback(self._handle_join_success)
+
+ # we handle failures below after the request finishes.
+ # If the join completes after having been woken up, the
+ # exception is ignored and we will rejoin
+ self.join_future.add_errback(self._handle_join_failure)
+
+ future = self.join_future
+ self._client.poll(future=future)
+
+ if future.failed():
+ exception = future.exception
+ if isinstance(exception, (Errors.UnknownMemberIdError,
+ Errors.RebalanceInProgressError,
+ Errors.IllegalGenerationError)):
+ continue
+ elif not future.retriable():
+ raise exception # pylint: disable-msg=raising-bad-type
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
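
The join_future caching above is the key wakeup-safety idiom: if the caller is interrupted after the JoinGroup request goes out, the next call to ensure_active_group() reuses the stored future instead of issuing a second JoinGroup, and the success/failure callbacks fire exactly once either way. Reduced to its essentials (ToyFuture is a stand-in for kafka.future.Future):

    # Sketch of the cached-future idiom, not library code.
    class ToyFuture(object):
        def __init__(self):
            self.is_done = False

    class Joiner(object):
        def __init__(self):
            self.join_future = None

        def ensure_joined(self):
            if self.join_future is None:   # at most one JoinGroup in flight
                self.join_future = self._send_join_request()
            return self.join_future

        def _send_join_request(self):
            return ToyFuture()
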
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -315,14 +425,35 @@ class BaseCoordinator(object):
# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
- request = JoinGroupRequest[0](
- self.group_id,
- self.config['session_timeout_ms'],
- self.member_id,
- self.protocol_type(),
- [(protocol,
- metadata if isinstance(metadata, bytes) else metadata.encode())
- for protocol, metadata in self.group_protocols()])
+ member_metadata = [
+ (protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
+ for protocol, metadata in self.group_protocols()
+ ]
+ if self.config['api_version'] < (0, 9):
+ raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
+ elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
+ request = JoinGroupRequest[0](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
+ elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
+ request = JoinGroupRequest[1](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self.config['max_poll_interval_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
+ else:
+ request = JoinGroupRequest[2](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self.config['max_poll_interval_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
@@ -348,19 +479,25 @@ class BaseCoordinator(object):
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
- self.member_id = response.member_id
- self.generation = response.generation_id
- self.rejoin_needed = False
- self.protocol = response.group_protocol
- log.info("Joined group '%s' (generation %s) with member_id %s",
- self.group_id, self.generation, self.member_id)
self.sensors.join_latency.record((time.time() - send_time) * 1000)
- if response.leader_id == response.member_id:
- log.info("Elected group leader -- performing partition"
- " assignments using %s", self.protocol)
- self._on_join_leader(response).chain(future)
- else:
- self._on_join_follower().chain(future)
+ with self._lock:
+ if self.state is not MemberState.REBALANCING:
+ # if the consumer was woken up before a rebalance completes,
+ # we may have already left the group. In this case, we do
+ # not want to continue with the sync group.
+ future.failure(UnjoinedGroupException())
+ else:
+ self._generation = Generation(response.generation_id,
+ response.member_id,
+ response.group_protocol)
+ self.rejoin_needed = False
+
+ if response.leader_id == response.member_id:
+ log.info("Elected group leader -- performing partition"
+ " assignments using %s", self._generation.protocol)
+ self._on_join_leader(response).chain(future)
+ else:
+ self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator %s"
@@ -369,8 +506,8 @@ class BaseCoordinator(object):
future.failure(error_type(response))
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
- error = error_type(self.member_id)
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+ error = error_type(self._generation.member_id)
+ self.reset_generation()
log.debug("Attempt to join group %s failed due to unknown member id",
self.group_id)
future.failure(error)
@@ -400,10 +537,11 @@ class BaseCoordinator(object):
def _on_join_follower(self):
# send follower's sync group with an empty assignment
- request = SyncGroupRequest[0](
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = SyncGroupRequest[version](
self.group_id,
- self.generation,
- self.member_id,
+ self._generation.generation_id,
+ self._generation.member_id,
{})
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
@@ -427,10 +565,11 @@ class BaseCoordinator(object):
except Exception as e:
return Future().failure(e)
- request = SyncGroupRequest[0](
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = SyncGroupRequest[version](
self.group_id,
- self.generation,
- self.member_id,
+ self._generation.generation_id,
+ self._generation.member_id,
[(member_id,
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in six.iteritems(group_assignment)])
@@ -460,14 +599,12 @@ class BaseCoordinator(object):
def _handle_sync_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("Successfully joined group %s with generation %s",
- self.group_id, self.generation)
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return
# Always rejoin on error
- self.rejoin_needed = True
+ self.request_rejoin()
if error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.RebalanceInProgressError:
@@ -478,7 +615,7 @@ class BaseCoordinator(object):
Errors.IllegalGenerationError):
error = error_type()
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+ self.reset_generation()
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
@@ -516,30 +653,24 @@ class BaseCoordinator(object):
def _handle_group_coordinator_response(self, future, response):
log.debug("Received group coordinator response %s", response)
- if not self.coordinator_unknown():
- # We already found the coordinator, so ignore the request
- log.debug("Coordinator already known -- ignoring metadata response")
- future.success(self.coordinator_id)
- return
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- ok = self._client.cluster.add_group_coordinator(self.group_id, response)
- if not ok:
- # This could happen if coordinator metadata is different
- # than broker metadata
- future.failure(Errors.IllegalStateError())
- return
-
- self.coordinator_id = response.coordinator_id
- log.info("Discovered coordinator %s for group %s",
- self.coordinator_id, self.group_id)
- self._client.ready(self.coordinator_id)
-
- # start sending heartbeats only if we have a valid generation
- if self.generation > 0:
- self.heartbeat_task.reset()
+ with self._lock:
+ ok = self._client.cluster.add_group_coordinator(self.group_id, response)
+ if not ok:
+ # This could happen if coordinator metadata is different
+ # than broker metadata
+ future.failure(Errors.IllegalStateError())
+ return
+
+ self.coordinator_id = response.coordinator_id
+ log.info("Discovered coordinator %s for group %s",
+ self.coordinator_id, self.group_id)
+ self._client.ready(self.coordinator_id)
+ self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)
+
elif error_type is Errors.GroupCoordinatorNotAvailableError:
log.debug("Group Coordinator Not Available; retry")
future.failure(error_type())
@@ -549,45 +680,74 @@ class BaseCoordinator(object):
future.failure(error)
else:
error = error_type()
- log.error("Unrecognized failure in Group Coordinator Request: %s",
- error)
+ log.error("Group coordinator lookup for group %s failed: %s",
+ self.group_id, error)
future.failure(error)
def coordinator_dead(self, error):
"""Mark the current coordinator as dead."""
- if self.coordinator_id is not None:
- log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
- self.coordinator_id, self.group_id, error)
- self.coordinator_id = None
+ with self._lock:
+ if self.coordinator_id is not None:
+ log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+ self.coordinator_id, self.group_id, error)
+ self.coordinator_id = None
+
+ def generation(self):
+ """Get the current generation state if the group is stable.
+
+ Returns: the current generation or None if the group is unjoined/rebalancing
+ """
+ with self._lock:
+ if self.state is not MemberState.STABLE:
+ return None
+ return self._generation
+
+ def reset_generation(self):
+ """Reset the generation and memberId because we have fallen out of the group."""
+ with self._lock:
+ self._generation = Generation.NO_GENERATION
+ self.rejoin_needed = True
+ self.state = MemberState.UNJOINED
+
+ def request_rejoin(self):
+ self.rejoin_needed = True
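
Because generation() returns either a consistent Generation snapshot or None, callers can read generation_id and member_id as a pair without racing a concurrent reset_generation() from the heartbeat thread. The intended calling pattern, sketched with toy stand-in types:

    import threading

    class Generation(object):
        def __init__(self, generation_id, member_id):
            self.generation_id = generation_id
            self.member_id = member_id

    class Coordinator(object):
        """Toy stand-in showing the snapshot-or-None contract."""
        def __init__(self):
            self._lock = threading.Condition()
            self._generation = Generation(5, 'member-1')
            self._stable = True

        def generation(self):
            with self._lock:                   # one atomic snapshot
                return self._generation if self._stable else None

    gen = Coordinator().generation()
    if gen is None:
        raise RuntimeError('rebalancing; retry later')
    print(gen.generation_id, gen.member_id)    # a consistent pair
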
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
- try:
- self._client.unschedule(self.heartbeat_task)
- except KeyError:
- pass
-
- if not self.coordinator_unknown() and self.generation > 0:
- # this is a minimal effort attempt to leave the group. we do not
- # attempt any resending if the request fails or times out.
- log.info('Leaving consumer group (%s).', self.group_id)
- request = LeaveGroupRequest[0](self.group_id, self.member_id)
- future = self._client.send(self.coordinator_id, request)
- future.add_callback(self._handle_leave_group_response)
- future.add_errback(log.error, "LeaveGroup request failed: %s")
- self._client.poll(future=future)
-
- self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
- self.rejoin_needed = True
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ self._heartbeat_thread.close()
+ self._heartbeat_thread = None
+ self.maybe_leave_group()
+
+ def maybe_leave_group(self):
+ """Leave the current group and reset local generation/memberId."""
+ with self._lock:
+ if (not self.coordinator_unknown()
+ and self.state is not MemberState.UNJOINED
+ and self._generation is not Generation.NO_GENERATION):
+
+ # this is a minimal effort attempt to leave the group. we do not
+ # attempt any resending if the request fails or times out.
+ log.info('Leaving consumer group (%s).', self.group_id)
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
+ future = self._client.send(self.coordinator_id, request)
+ future.add_callback(self._handle_leave_group_response)
+ future.add_errback(log.error, "LeaveGroup request failed: %s")
+ self._client.poll(future=future)
+
+ self.reset_generation()
def _handle_leave_group_response(self, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("LeaveGroup request succeeded")
+ log.debug("LeaveGroup request for group %s returned successfully",
+ self.group_id)
else:
- log.error("LeaveGroup request failed: %s", error_type())
+ log.error("LeaveGroup request for group %s failed with error: %s",
+ self.group_id, error_type())
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
@@ -599,7 +759,10 @@ class BaseCoordinator(object):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)
- request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = HeartbeatRequest[version](self.group_id,
+ self._generation.generation_id,
+ self._generation.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
@@ -619,24 +782,23 @@ class BaseCoordinator(object):
Errors.NotCoordinatorForGroupError):
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
- self.coordinator_id)
+ self.coordinator())
self.coordinator_dead(error_type())
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
- self.rejoin_needed = True
+ self.request_rejoin()
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
- self.rejoin_needed = True
+ self.reset_generation()
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
- self.rejoin_needed = True
+ self.reset_generation()
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
@@ -648,76 +810,6 @@ class BaseCoordinator(object):
future.failure(error)
-class HeartbeatTask(object):
- def __init__(self, coordinator):
- self._coordinator = coordinator
- self._heartbeat = coordinator.heartbeat
- self._client = coordinator._client
- self._request_in_flight = False
-
- def disable(self):
- try:
- self._client.unschedule(self)
- except KeyError:
- pass
-
- def reset(self):
- # start or restart the heartbeat task to be executed at the next chance
- self._heartbeat.reset_session_timeout()
- try:
- self._client.unschedule(self)
- except KeyError:
- pass
- if not self._request_in_flight:
- self._client.schedule(self, time.time())
-
- def __call__(self):
- if (self._coordinator.generation < 0 or
- self._coordinator.need_rejoin()):
- # no need to send the heartbeat we're not using auto-assignment
- # or if we are awaiting a rebalance
- log.info("Skipping heartbeat: no auto-assignment"
- " or waiting on rebalance")
- return
-
- if self._coordinator.coordinator_unknown():
- log.warning("Coordinator unknown during heartbeat -- will retry")
- self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
- return
-
- if self._heartbeat.session_expired():
- # we haven't received a successful heartbeat in one session interval
- # so mark the coordinator dead
- log.error("Heartbeat session expired - marking coordinator dead")
- self._coordinator.coordinator_dead('Heartbeat session expired')
- return
-
- if not self._heartbeat.should_heartbeat():
- # we don't need to heartbeat now, so reschedule for when we do
- ttl = self._heartbeat.ttl()
- log.debug("Heartbeat task unneeded now, retrying in %s", ttl)
- self._client.schedule(self, time.time() + ttl)
- else:
- self._heartbeat.sent_heartbeat()
- self._request_in_flight = True
- future = self._coordinator._send_heartbeat_request()
- future.add_callback(self._handle_heartbeat_success)
- future.add_errback(self._handle_heartbeat_failure)
-
- def _handle_heartbeat_success(self, v):
- log.debug("Received successful heartbeat")
- self._request_in_flight = False
- self._heartbeat.received_heartbeat()
- ttl = self._heartbeat.ttl()
- self._client.schedule(self, time.time() + ttl)
-
- def _handle_heartbeat_failure(self, e):
- log.warning("Heartbeat failed (%s); retrying", e)
- self._request_in_flight = False
- etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
- self._client.schedule(self, etd)
-
-
class GroupCoordinatorMetrics(object):
def __init__(self, heartbeat, metrics, prefix, tags=None):
self.heartbeat = heartbeat
@@ -764,6 +856,112 @@ class GroupCoordinatorMetrics(object):
metrics.add_metric(metrics.metric_name(
'last-heartbeat-seconds-ago', self.metric_group_name,
- 'The number of seconds since the last controller heartbeat',
+            'The number of seconds since the last coordinator heartbeat was sent',
tags), AnonMeasurable(
lambda _, now: (now / 1000) - self.heartbeat.last_send))
+
+
+class HeartbeatThread(threading.Thread):
+ def __init__(self, coordinator):
+ super(HeartbeatThread, self).__init__()
+ self.name = threading.current_thread().name + '-heartbeat'
+ self.coordinator = coordinator
+ self.enabled = False
+ self.closed = False
+ self.failed = None
+
+ def enable(self):
+ with self.coordinator._lock:
+ self.enabled = True
+ self.coordinator.heartbeat.reset_timeouts()
+ self.coordinator._lock.notify()
+
+ def disable(self):
+ with self.coordinator._lock:
+ self.enabled = False
+
+ def close(self):
+ with self.coordinator._lock:
+ self.closed = True
+ self.coordinator._lock.notify()
+
+ def run(self):
+ try:
+ while not self.closed:
+ self._run_once()
+
+ log.debug('Heartbeat closed!')
+
+ except RuntimeError as e:
+ log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
+ self.coordinator.group_id, e)
+ self.failed = e
+
+ def _run_once(self):
+ with self.coordinator._lock:
+ if not self.enabled:
+ log.debug('Heartbeat disabled. Waiting')
+ self.coordinator._lock.wait()
+ log.debug('Heartbeat re-enabled.')
+ return
+
+ if self.coordinator.state is not MemberState.STABLE:
+ # the group is not stable (perhaps because we left the
+ # group or because the coordinator kicked us out), so
+ # disable heartbeats and wait for the main thread to rejoin.
+ log.debug('Group state is not stable, disabling heartbeats')
+ self.disable()
+ return
+
+ # TODO: When consumer.wakeup() is implemented, we need to
+ # disable here to prevent propagating an exception to this
+ # heartbeat thread
+ self.coordinator._client.poll(timeout_ms=0)
+
+ if self.coordinator.coordinator_unknown():
+ if not self.coordinator.lookup_coordinator().is_done:
+ self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+ elif self.coordinator.heartbeat.session_timeout_expired():
+ # the session timeout has expired without seeing a
+ # successful heartbeat, so we should probably make sure
+ # the coordinator is still healthy.
+ log.debug('Heartbeat session expired, marking coordinator dead')
+ self.coordinator.coordinator_dead('Heartbeat session expired')
+
+ elif self.coordinator.heartbeat.poll_timeout_expired():
+ # the poll timeout has expired, which means that the
+ # foreground thread has stalled in between calls to
+ # poll(), so we explicitly leave the group.
+ log.debug('Heartbeat poll expired, leaving group')
+ self.coordinator.maybe_leave_group()
+
+ elif not self.coordinator.heartbeat.should_heartbeat():
+ # poll again after waiting for the retry backoff in case
+ # the heartbeat failed or the coordinator disconnected
+ log.debug('Not ready to heartbeat, waiting')
+ self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+ else:
+ self.coordinator.heartbeat.sent_heartbeat()
+ future = self.coordinator._send_heartbeat_request()
+ future.add_callback(self._handle_heartbeat_success)
+ future.add_errback(self._handle_heartbeat_failure)
+
+ def _handle_heartbeat_success(self, result):
+ with self.coordinator._lock:
+ self.coordinator.heartbeat.received_heartbeat()
+
+ def _handle_heartbeat_failure(self, exception):
+ with self.coordinator._lock:
+ if isinstance(exception, Errors.RebalanceInProgressError):
+ # it is valid to continue heartbeating while the group is
+ # rebalancing. This ensures that the coordinator keeps the
+ # member in the group for as long as the duration of the
+ # rebalance timeout. If we stop sending heartbeats, however,
+ # then the session timeout may expire before we can rejoin.
+ self.coordinator.heartbeat.received_heartbeat()
+ else:
+ self.coordinator.heartbeat.fail_heartbeat()
+ # wake up the thread if it's sleeping to reschedule the heartbeat
+ self.coordinator._lock.notify()
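
The thread's whole lifecycle rides on the coordinator's condition variable: enable() and close() flip a flag and notify(), while _run_once() parks in wait() whenever it is disabled or backing off, releasing the lock so the foreground thread can make progress. A runnable toy miniature of that handshake (MiniHeartbeat is illustrative, not library code):

    import threading
    import time

    class MiniHeartbeat(threading.Thread):
        """Toy version of the enable/wait/notify lifecycle above."""
        def __init__(self):
            super(MiniHeartbeat, self).__init__()
            self.daemon = True
            self._lock = threading.Condition()
            self.enabled = False
            self.closed = False

        def enable(self):
            with self._lock:
                self.enabled = True
                self._lock.notify()        # wake the parked run loop

        def close(self):
            with self._lock:
                self.closed = True
                self._lock.notify()

        def run(self):
            with self._lock:
                while not self.closed:
                    if not self.enabled:
                        self._lock.wait()  # lock released while sleeping
                        continue
                    # a real implementation would send a heartbeat here
                    self._lock.wait(0.05)  # stand-in for the heartbeat interval

    hb = MiniHeartbeat()
    hb.start()
    hb.enable()
    time.sleep(0.1)
    hb.close()
    hb.join()
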