summaryrefslogtreecommitdiff
path: root/kafka/coordinator
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-12-21 14:46:10 -0800
committerGitHub <noreply@github.com>2017-12-21 14:46:10 -0800
commitad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch)
treef1993351b2c6487e8e623cefabf42ddf7477f666 /kafka/coordinator
parent995664c7d407009a0a1030c7541848eb5ad51c97 (diff)
downloadkafka-python-ad024d1e897dbf16bd629fa63895bd7af4a8d959.tar.gz
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--kafka/coordinator/base.py674
-rw-r--r--kafka/coordinator/consumer.py242
-rw-r--r--kafka/coordinator/heartbeat.py49
3 files changed, 615 insertions, 350 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index a3055da..b16c1e1 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -3,6 +3,8 @@ from __future__ import absolute_import, division
import abc
import copy
import logging
+import sys
+import threading
import time
import weakref
@@ -20,6 +22,28 @@ from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
log = logging.getLogger('kafka.coordinator')
+class MemberState(object):
+ UNJOINED = '<unjoined>' # the client is not part of a group
+ REBALANCING = '<rebalancing>' # the client has begun rebalancing
+ STABLE = '<stable>' # the client has joined and is sending heartbeats
+
+
+class Generation(object):
+ def __init__(self, generation_id, member_id, protocol):
+ self.generation_id = generation_id
+ self.member_id = member_id
+ self.protocol = protocol
+
+Generation.NO_GENERATION = Generation(
+ OffsetCommitRequest[2].DEFAULT_GENERATION_ID,
+ JoinGroupRequest[0].UNKNOWN_MEMBER_ID,
+ None)
+
+
+class UnjoinedGroupException(Errors.KafkaError):
+ retriable = True
+
+
class BaseCoordinator(object):
"""
BaseCoordinator implements group management for a single group member
@@ -47,14 +71,23 @@ class BaseCoordinator(object):
:meth:`.group_protocols` and the format of the state assignment provided by
the leader in :meth:`._perform_assignment` and which becomes available to
members in :meth:`._on_join_complete`.
+
+ Note on locking: this class shares state between the caller and a background
+ thread which is used for sending heartbeats after the client has joined the
+ group. All mutable state as well as state transitions are protected with the
+ class's monitor. Generally this means acquiring the lock before reading or
+ writing the state of the group (e.g. generation, member_id) and holding the
+ lock when sending a request that affects the state of the group
+ (e.g. JoinGroup, LeaveGroup).
"""
DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
- 'session_timeout_ms': 30000,
+ 'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
+ 'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
- 'api_version': (0, 9),
+ 'api_version': (0, 10, 1),
'metric_group_prefix': '',
}
@@ -83,27 +116,31 @@ class BaseCoordinator(object):
if key in configs:
self.config[key] = configs[key]
+ if self.config['api_version'] < (0, 10, 1):
+ if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
+ raise Errors.KafkaConfigurationError("Broker version %s does not support "
+ "different values for max_poll_interval_ms "
+ "and session_timeout_ms")
+
self._client = client
- self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
+ self.heartbeat = Heartbeat(**self.config)
+ self._heartbeat_thread = None
+ self._lock = threading.Condition()
+ self.rejoin_needed = True
+ self.rejoining = False # renamed / complement of java needsJoinPrepare
+ self.state = MemberState.UNJOINED
+ self.join_future = None
self.coordinator_id = None
self._find_coordinator_future = None
- self.rejoin_needed = True
- self.rejoining = False
- self.heartbeat = Heartbeat(**self.config)
- self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
+ self._generation = Generation.NO_GENERATION
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
self.config['metric_group_prefix'])
- def __del__(self):
- if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
- self.heartbeat_task.disable()
-
@abc.abstractmethod
def protocol_type(self):
"""
- Unique identifier for the class of protocols implements
+ Unique identifier for the class of supported protocols
(e.g. "consumer" or "connect").
Returns:
@@ -187,42 +224,51 @@ class BaseCoordinator(object):
Returns:
bool: True if the coordinator is unknown
"""
- if self.coordinator_id is None:
- return True
+ return self.coordinator() is None
- if self._client.is_disconnected(self.coordinator_id):
- self.coordinator_dead('Node Disconnected')
- return True
+ def coordinator(self):
+ """Get the current coordinator
- return False
+ Returns: the current coordinator id or None if it is unknown
+ """
+ with self._lock:
+ if self.coordinator_id is None:
+ return None
+ elif self._client.is_disconnected(self.coordinator_id):
+ self.coordinator_dead('Node Disconnected')
+ return None
+ else:
+ return self.coordinator_id
def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
- while self.coordinator_unknown():
- # Prior to 0.8.2 there was no group coordinator
- # so we will just pick a node at random and treat
- # it as the "coordinator"
- if self.config['api_version'] < (0, 8, 2):
- self.coordinator_id = self._client.least_loaded_node()
- if self.coordinator_id is not None:
- self._client.ready(self.coordinator_id)
- continue
-
- future = self.lookup_coordinator()
- self._client.poll(future=future)
-
- if future.failed():
- if future.retriable():
- if getattr(future.exception, 'invalid_metadata', False):
- log.debug('Requesting metadata for group coordinator request: %s', future.exception)
- metadata_update = self._client.cluster.request_update()
- self._client.poll(future=metadata_update)
+ with self._lock:
+ while self.coordinator_unknown():
+
+ # Prior to 0.8.2 there was no group coordinator
+ # so we will just pick a node at random and treat
+ # it as the "coordinator"
+ if self.config['api_version'] < (0, 8, 2):
+ self.coordinator_id = self._client.least_loaded_node()
+ if self.coordinator_id is not None:
+ self._client.ready(self.coordinator_id)
+ continue
+
+ future = self.lookup_coordinator()
+ self._client.poll(future=future)
+
+ if future.failed():
+ if future.retriable():
+ if getattr(future.exception, 'invalid_metadata', False):
+ log.debug('Requesting metadata for group coordinator request: %s', future.exception)
+ metadata_update = self._client.cluster.request_update()
+ self._client.poll(future=metadata_update)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
else:
- time.sleep(self.config['retry_backoff_ms'] / 1000)
- else:
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
def _reset_find_coordinator_future(self, result):
self._find_coordinator_future = None
@@ -248,52 +294,116 @@ class BaseCoordinator(object):
"""
return self.rejoin_needed
+ def poll_heartbeat(self):
+ """
+ Check the status of the heartbeat thread (if it is active) and indicate
+ the liveness of the client. This must be called periodically after
+ joining with :meth:`.ensure_active_group` to ensure that the member stays
+ in the group. If an interval of time longer than the provided rebalance
+ timeout (max_poll_interval_ms) expires without calling this method, then
+ the client will proactively leave the group.
+
+ Raises: RuntimeError for unexpected errors raised from the heartbeat thread
+ """
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ if self._heartbeat_thread.failed:
+ # set the heartbeat thread to None and raise an exception.
+ # If the user catches it, the next call to ensure_active_group()
+ # will spawn a new heartbeat thread.
+ cause = self._heartbeat_thread.failed
+ self._heartbeat_thread = None
+ raise cause # pylint: disable-msg=raising-bad-type
+ self.heartbeat.poll()
+
+ def time_to_next_heartbeat(self):
+ with self._lock:
+ # if we have not joined the group, we don't need to send heartbeats
+ if self.state is MemberState.UNJOINED:
+ return sys.maxsize
+ return self.heartbeat.time_to_next_heartbeat()
+
+ def _handle_join_success(self, member_assignment_bytes):
+ with self._lock:
+ log.info("Successfully joined group %s with generation %s",
+ self.group_id, self._generation.generation_id)
+ self.join_future = None
+ self.state = MemberState.STABLE
+ self.rejoining = False
+ self._heartbeat_thread.enable()
+ self._on_join_complete(self._generation.generation_id,
+ self._generation.member_id,
+ self._generation.protocol,
+ member_assignment_bytes)
+
+ def _handle_join_failure(self, _):
+ with self._lock:
+ self.join_future = None
+ self.state = MemberState.UNJOINED
+
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
- # always ensure that the coordinator is ready because we may have been
- # disconnected when sending heartbeats and does not necessarily require
- # us to rejoin the group.
- self.ensure_coordinator_ready()
-
- if not self.need_rejoin():
- return
-
- if not self.rejoining:
- self._on_join_prepare(self.generation, self.member_id)
- self.rejoining = True
-
- while self.need_rejoin():
- self.ensure_coordinator_ready()
-
- # ensure that there are no pending requests to the coordinator.
- # This is important in particular to avoid resending a pending
- # JoinGroup request.
- while not self.coordinator_unknown():
- if not self._client.in_flight_request_count(self.coordinator_id):
- break
- self._client.poll(delayed_tasks=False)
- else:
- continue
-
- future = self._send_join_group_request()
- self._client.poll(future=future)
+ with self._lock:
+ if not self.need_rejoin():
+ return
- if future.succeeded():
- member_assignment_bytes = future.value
- self._on_join_complete(self.generation, self.member_id,
- self.protocol, member_assignment_bytes)
- self.rejoining = False
- self.heartbeat_task.reset()
- else:
- assert future.failed()
- exception = future.exception
- if isinstance(exception, (Errors.UnknownMemberIdError,
- Errors.RebalanceInProgressError,
- Errors.IllegalGenerationError)):
+ # call on_join_prepare if needed. We set a flag to make sure that
+ # we do not call it a second time if the client is woken up before
+ # a pending rebalance completes.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
+ if self._heartbeat_thread is None:
+ log.debug('Starting new heartbeat thread')
+ self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+
+ while self.need_rejoin():
+ self.ensure_coordinator_ready()
+
+ # ensure that there are no pending requests to the coordinator.
+ # This is important in particular to avoid resending a pending
+ # JoinGroup request.
+ while not self.coordinator_unknown():
+ if not self._client.in_flight_request_count(self.coordinator_id):
+ break
+ self._client.poll()
+ else:
continue
- elif not future.retriable():
- raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000)
+
+ # we store the join future in case we are woken up by the user
+ # after beginning the rebalance in the call to poll below.
+ # This ensures that we do not mistakenly attempt to rejoin
+ # before the pending rebalance has completed.
+ if self.join_future is None:
+ self.state = MemberState.REBALANCING
+ self.join_future = self._send_join_group_request()
+
+ # handle join completion in the callback so that the
+ # callback will be invoked even if the consumer is woken up
+ # before finishing the rebalance
+ self.join_future.add_callback(self._handle_join_success)
+
+ # we handle failures below after the request finishes.
+ # If the join completes after having been woken up, the
+ # exception is ignored and we will rejoin
+ self.join_future.add_errback(self._handle_join_failure)
+
+ future = self.join_future
+ self._client.poll(future=future)
+
+ if future.failed():
+ exception = future.exception
+ if isinstance(exception, (Errors.UnknownMemberIdError,
+ Errors.RebalanceInProgressError,
+ Errors.IllegalGenerationError)):
+ continue
+ elif not future.retriable():
+ raise exception # pylint: disable-msg=raising-bad-type
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -315,14 +425,35 @@ class BaseCoordinator(object):
# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
- request = JoinGroupRequest[0](
- self.group_id,
- self.config['session_timeout_ms'],
- self.member_id,
- self.protocol_type(),
- [(protocol,
- metadata if isinstance(metadata, bytes) else metadata.encode())
- for protocol, metadata in self.group_protocols()])
+ member_metadata = [
+ (protocol, metadata if isinstance(metadata, bytes) else metadata.encode())
+ for protocol, metadata in self.group_protocols()
+ ]
+ if self.config['api_version'] < (0, 9):
+ raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
+ elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
+ request = JoinGroupRequest[0](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
+ elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
+ request = JoinGroupRequest[1](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self.config['max_poll_interval_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
+ else:
+ request = JoinGroupRequest[2](
+ self.group_id,
+ self.config['session_timeout_ms'],
+ self.config['max_poll_interval_ms'],
+ self._generation.member_id,
+ self.protocol_type(),
+ member_metadata)
# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
@@ -348,19 +479,25 @@ class BaseCoordinator(object):
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
- self.member_id = response.member_id
- self.generation = response.generation_id
- self.rejoin_needed = False
- self.protocol = response.group_protocol
- log.info("Joined group '%s' (generation %s) with member_id %s",
- self.group_id, self.generation, self.member_id)
self.sensors.join_latency.record((time.time() - send_time) * 1000)
- if response.leader_id == response.member_id:
- log.info("Elected group leader -- performing partition"
- " assignments using %s", self.protocol)
- self._on_join_leader(response).chain(future)
- else:
- self._on_join_follower().chain(future)
+ with self._lock:
+ if self.state is not MemberState.REBALANCING:
+ # if the consumer was woken up before a rebalance completes,
+ # we may have already left the group. In this case, we do
+ # not want to continue with the sync group.
+ future.failure(UnjoinedGroupException())
+ else:
+ self._generation = Generation(response.generation_id,
+ response.member_id,
+ response.group_protocol)
+ self.rejoin_needed = False
+
+ if response.leader_id == response.member_id:
+ log.info("Elected group leader -- performing partition"
+ " assignments using %s", self._generation.protocol)
+ self._on_join_leader(response).chain(future)
+ else:
+ self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator %s"
@@ -369,8 +506,8 @@ class BaseCoordinator(object):
future.failure(error_type(response))
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
- error = error_type(self.member_id)
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+ error = error_type(self._generation.member_id)
+ self.reset_generation()
log.debug("Attempt to join group %s failed due to unknown member id",
self.group_id)
future.failure(error)
@@ -400,10 +537,11 @@ class BaseCoordinator(object):
def _on_join_follower(self):
# send follower's sync group with an empty assignment
- request = SyncGroupRequest[0](
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = SyncGroupRequest[version](
self.group_id,
- self.generation,
- self.member_id,
+ self._generation.generation_id,
+ self._generation.member_id,
{})
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
@@ -427,10 +565,11 @@ class BaseCoordinator(object):
except Exception as e:
return Future().failure(e)
- request = SyncGroupRequest[0](
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = SyncGroupRequest[version](
self.group_id,
- self.generation,
- self.member_id,
+ self._generation.generation_id,
+ self._generation.member_id,
[(member_id,
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in six.iteritems(group_assignment)])
@@ -460,14 +599,12 @@ class BaseCoordinator(object):
def _handle_sync_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("Successfully joined group %s with generation %s",
- self.group_id, self.generation)
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return
# Always rejoin on error
- self.rejoin_needed = True
+ self.request_rejoin()
if error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.RebalanceInProgressError:
@@ -478,7 +615,7 @@ class BaseCoordinator(object):
Errors.IllegalGenerationError):
error = error_type()
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
+ self.reset_generation()
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
@@ -516,30 +653,24 @@ class BaseCoordinator(object):
def _handle_group_coordinator_response(self, future, response):
log.debug("Received group coordinator response %s", response)
- if not self.coordinator_unknown():
- # We already found the coordinator, so ignore the request
- log.debug("Coordinator already known -- ignoring metadata response")
- future.success(self.coordinator_id)
- return
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- ok = self._client.cluster.add_group_coordinator(self.group_id, response)
- if not ok:
- # This could happen if coordinator metadata is different
- # than broker metadata
- future.failure(Errors.IllegalStateError())
- return
-
- self.coordinator_id = response.coordinator_id
- log.info("Discovered coordinator %s for group %s",
- self.coordinator_id, self.group_id)
- self._client.ready(self.coordinator_id)
-
- # start sending heartbeats only if we have a valid generation
- if self.generation > 0:
- self.heartbeat_task.reset()
+ with self._lock:
+ ok = self._client.cluster.add_group_coordinator(self.group_id, response)
+ if not ok:
+ # This could happen if coordinator metadata is different
+ # than broker metadata
+ future.failure(Errors.IllegalStateError())
+ return
+
+ self.coordinator_id = response.coordinator_id
+ log.info("Discovered coordinator %s for group %s",
+ self.coordinator_id, self.group_id)
+ self._client.ready(self.coordinator_id)
+ self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)
+
elif error_type is Errors.GroupCoordinatorNotAvailableError:
log.debug("Group Coordinator Not Available; retry")
future.failure(error_type())
@@ -549,45 +680,74 @@ class BaseCoordinator(object):
future.failure(error)
else:
error = error_type()
- log.error("Unrecognized failure in Group Coordinator Request: %s",
- error)
+ log.error("Group coordinator lookup for group %s failed: %s",
+ self.group_id, error)
future.failure(error)
def coordinator_dead(self, error):
"""Mark the current coordinator as dead."""
- if self.coordinator_id is not None:
- log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
- self.coordinator_id, self.group_id, error)
- self.coordinator_id = None
+ with self._lock:
+ if self.coordinator_id is not None:
+ log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+ self.coordinator_id, self.group_id, error)
+ self.coordinator_id = None
+
+ def generation(self):
+ """Get the current generation state if the group is stable.
+
+ Returns: the current generation or None if the group is unjoined/rebalancing
+ """
+ with self._lock:
+ if self.state is not MemberState.STABLE:
+ return None
+ return self._generation
+
+ def reset_generation(self):
+ """Reset the generation and memberId because we have fallen out of the group."""
+ with self._lock:
+ self._generation = Generation.NO_GENERATION
+ self.rejoin_needed = True
+ self.state = MemberState.UNJOINED
+
+ def request_rejoin(self):
+ self.rejoin_needed = True
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
- try:
- self._client.unschedule(self.heartbeat_task)
- except KeyError:
- pass
-
- if not self.coordinator_unknown() and self.generation > 0:
- # this is a minimal effort attempt to leave the group. we do not
- # attempt any resending if the request fails or times out.
- log.info('Leaving consumer group (%s).', self.group_id)
- request = LeaveGroupRequest[0](self.group_id, self.member_id)
- future = self._client.send(self.coordinator_id, request)
- future.add_callback(self._handle_leave_group_response)
- future.add_errback(log.error, "LeaveGroup request failed: %s")
- self._client.poll(future=future)
-
- self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
- self.rejoin_needed = True
+ with self._lock:
+ if self._heartbeat_thread is not None:
+ self._heartbeat_thread.close()
+ self._heartbeat_thread = None
+ self.maybe_leave_group()
+
+ def maybe_leave_group(self):
+ """Leave the current group and reset local generation/memberId."""
+ with self._lock:
+ if (not self.coordinator_unknown()
+ and self.state is not MemberState.UNJOINED
+ and self._generation is not Generation.NO_GENERATION):
+
+ # this is a minimal effort attempt to leave the group. we do not
+ # attempt any resending if the request fails or times out.
+ log.info('Leaving consumer group (%s).', self.group_id)
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
+ future = self._client.send(self.coordinator_id, request)
+ future.add_callback(self._handle_leave_group_response)
+ future.add_errback(log.error, "LeaveGroup request failed: %s")
+ self._client.poll(future=future)
+
+ self.reset_generation()
def _handle_leave_group_response(self, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("LeaveGroup request succeeded")
+ log.debug("LeaveGroup request for group %s returned successfully",
+ self.group_id)
else:
- log.error("LeaveGroup request failed: %s", error_type())
+ log.error("LeaveGroup request for group %s failed with error: %s",
+ self.group_id, error_type())
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
@@ -599,7 +759,10 @@ class BaseCoordinator(object):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)
- request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
+ version = 0 if self.config['api_version'] < (0, 11, 0) else 1
+ request = HeartbeatRequest[version](self.group_id,
+ self._generation.generation_id,
+ self._generation.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
@@ -619,24 +782,23 @@ class BaseCoordinator(object):
Errors.NotCoordinatorForGroupError):
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
- self.coordinator_id)
+ self.coordinator())
self.coordinator_dead(error_type())
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
log.warning("Heartbeat failed for group %s because it is"
" rebalancing", self.group_id)
- self.rejoin_needed = True
+ self.request_rejoin()
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
log.warning("Heartbeat failed for group %s: generation id is not "
" current.", self.group_id)
- self.rejoin_needed = True
+ self.reset_generation()
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
- self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
- self.rejoin_needed = True
+ self.reset_generation()
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
error = error_type(self.group_id)
@@ -648,76 +810,6 @@ class BaseCoordinator(object):
future.failure(error)
-class HeartbeatTask(object):
- def __init__(self, coordinator):
- self._coordinator = coordinator
- self._heartbeat = coordinator.heartbeat
- self._client = coordinator._client
- self._request_in_flight = False
-
- def disable(self):
- try:
- self._client.unschedule(self)
- except KeyError:
- pass
-
- def reset(self):
- # start or restart the heartbeat task to be executed at the next chance
- self._heartbeat.reset_session_timeout()
- try:
- self._client.unschedule(self)
- except KeyError:
- pass
- if not self._request_in_flight:
- self._client.schedule(self, time.time())
-
- def __call__(self):
- if (self._coordinator.generation < 0 or
- self._coordinator.need_rejoin()):
- # no need to send the heartbeat we're not using auto-assignment
- # or if we are awaiting a rebalance
- log.info("Skipping heartbeat: no auto-assignment"
- " or waiting on rebalance")
- return
-
- if self._coordinator.coordinator_unknown():
- log.warning("Coordinator unknown during heartbeat -- will retry")
- self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
- return
-
- if self._heartbeat.session_expired():
- # we haven't received a successful heartbeat in one session interval
- # so mark the coordinator dead
- log.error("Heartbeat session expired - marking coordinator dead")
- self._coordinator.coordinator_dead('Heartbeat session expired')
- return
-
- if not self._heartbeat.should_heartbeat():
- # we don't need to heartbeat now, so reschedule for when we do
- ttl = self._heartbeat.ttl()
- log.debug("Heartbeat task unneeded now, retrying in %s", ttl)
- self._client.schedule(self, time.time() + ttl)
- else:
- self._heartbeat.sent_heartbeat()
- self._request_in_flight = True
- future = self._coordinator._send_heartbeat_request()
- future.add_callback(self._handle_heartbeat_success)
- future.add_errback(self._handle_heartbeat_failure)
-
- def _handle_heartbeat_success(self, v):
- log.debug("Received successful heartbeat")
- self._request_in_flight = False
- self._heartbeat.received_heartbeat()
- ttl = self._heartbeat.ttl()
- self._client.schedule(self, time.time() + ttl)
-
- def _handle_heartbeat_failure(self, e):
- log.warning("Heartbeat failed (%s); retrying", e)
- self._request_in_flight = False
- etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
- self._client.schedule(self, etd)
-
-
class GroupCoordinatorMetrics(object):
def __init__(self, heartbeat, metrics, prefix, tags=None):
self.heartbeat = heartbeat
@@ -764,6 +856,112 @@ class GroupCoordinatorMetrics(object):
metrics.add_metric(metrics.metric_name(
'last-heartbeat-seconds-ago', self.metric_group_name,
- 'The number of seconds since the last controller heartbeat',
+ 'The number of seconds since the last controller heartbeat was sent',
tags), AnonMeasurable(
lambda _, now: (now / 1000) - self.heartbeat.last_send))
+
+
+class HeartbeatThread(threading.Thread):
+ def __init__(self, coordinator):
+ super(HeartbeatThread, self).__init__()
+ self.name = threading.current_thread().name + '-heartbeat'
+ self.coordinator = coordinator
+ self.enabled = False
+ self.closed = False
+ self.failed = None
+
+ def enable(self):
+ with self.coordinator._lock:
+ self.enabled = True
+ self.coordinator.heartbeat.reset_timeouts()
+ self.coordinator._lock.notify()
+
+ def disable(self):
+ with self.coordinator._lock:
+ self.enabled = False
+
+ def close(self):
+ with self.coordinator._lock:
+ self.closed = True
+ self.coordinator._lock.notify()
+
+ def run(self):
+ try:
+ while not self.closed:
+ self._run_once()
+
+ log.debug('Heartbeat closed!')
+
+ except RuntimeError as e:
+ log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
+ self.coordinator.group_id, e)
+ self.failed = e
+
+ def _run_once(self):
+ with self.coordinator._lock:
+ if not self.enabled:
+ log.debug('Heartbeat disabled. Waiting')
+ self.coordinator._lock.wait()
+ log.debug('Heartbeat re-enabled.')
+ return
+
+ if self.coordinator.state is not MemberState.STABLE:
+ # the group is not stable (perhaps because we left the
+ # group or because the coordinator kicked us out), so
+ # disable heartbeats and wait for the main thread to rejoin.
+ log.debug('Group state is not stable, disabling heartbeats')
+ self.disable()
+ return
+
+ # TODO: When consumer.wakeup() is implemented, we need to
+ # disable here to prevent propagating an exception to this
+ # heartbeat thread
+ self.coordinator._client.poll(timeout_ms=0)
+
+ if self.coordinator.coordinator_unknown():
+ if not self.coordinator.lookup_coordinator().is_done:
+ self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+ elif self.coordinator.heartbeat.session_timeout_expired():
+ # the session timeout has expired without seeing a
+ # successful heartbeat, so we should probably make sure
+ # the coordinator is still healthy.
+ log.debug('Heartbeat session expired, marking coordinator dead')
+ self.coordinator.coordinator_dead('Heartbeat session expired')
+
+ elif self.coordinator.heartbeat.poll_timeout_expired():
+ # the poll timeout has expired, which means that the
+ # foreground thread has stalled in between calls to
+ # poll(), so we explicitly leave the group.
+ log.debug('Heartbeat poll expired, leaving group')
+ self.coordinator.maybe_leave_group()
+
+ elif not self.coordinator.heartbeat.should_heartbeat():
+ # poll again after waiting for the retry backoff in case
+ # the heartbeat failed or the coordinator disconnected
+ log.debug('Not ready to heartbeat, waiting')
+ self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
+
+ else:
+ self.coordinator.heartbeat.sent_heartbeat()
+ future = self.coordinator._send_heartbeat_request()
+ future.add_callback(self._handle_heartbeat_success)
+ future.add_errback(self._handle_heartbeat_failure)
+
+ def _handle_heartbeat_success(self, result):
+ with self.coordinator._lock:
+ self.coordinator.heartbeat.received_heartbeat()
+
+ def _handle_heartbeat_failure(self, exception):
+ with self.coordinator._lock:
+ if isinstance(exception, Errors.RebalanceInProgressError):
+ # it is valid to continue heartbeating while the group is
+ # rebalancing. This ensures that the coordinator keeps the
+ # member in the group for as long as the duration of the
+ # rebalance timeout. If we stop sending heartbeats, however,
+ # then the session timeout may expire before we can rejoin.
+ self.coordinator.heartbeat.received_heartbeat()
+ else:
+ self.coordinator.heartbeat.fail_heartbeat()
+ # wake up the thread if it's sleeping to reschedule the heartbeat
+ self.coordinator._lock.notify()
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index dee70f0..48dcad4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,14 +1,13 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
-import copy
import collections
+import copy
import logging
import time
-import weakref
from kafka.vendor import six
-from .base import BaseCoordinator
+from .base import BaseCoordinator, Generation
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
@@ -30,12 +29,13 @@ class ConsumerCoordinator(BaseCoordinator):
'group_id': 'kafka-python-default-group',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
- 'default_offset_commit_callback': lambda offsets, response: True,
+ 'default_offset_commit_callback': None,
'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
- 'session_timeout_ms': 30000,
+ 'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
+ 'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
- 'api_version': (0, 9),
+ 'api_version': (0, 10, 1),
'exclude_internal_topics': True,
'metric_group_prefix': 'consumer'
}
@@ -52,9 +52,9 @@ class ConsumerCoordinator(BaseCoordinator):
auto_commit_interval_ms (int): milliseconds between automatic
offset commits, if enable_auto_commit is True. Default: 5000.
default_offset_commit_callback (callable): called as
- callback(offsets, response) response will be either an Exception
- or a OffsetCommitResponse struct. This callback can be used to
- trigger custom actions when a commit request completes.
+ callback(offsets, exception) response will be either an Exception
+ or None. This callback can be used to trigger custom actions when
+ a commit request completes.
assignors (list): List of objects to use to distribute partition
ownership amongst consumer instances when group management is
used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
@@ -83,17 +83,27 @@ class ConsumerCoordinator(BaseCoordinator):
if key in configs:
self.config[key] = configs[key]
- if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None:
- assert self.config['assignors'], 'Coordinator requires assignors'
-
self._subscription = subscription
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
- self._cluster.request_update()
- self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
+ self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000
+ self.next_auto_commit_deadline = None
+ self.completed_offset_commits = collections.deque()
+
+ if self.config['default_offset_commit_callback'] is None:
+ self.config['default_offset_commit_callback'] = self._default_offset_commit_callback
+
+ if self.config['group_id'] is not None:
+ if self.config['api_version'] >= (0, 9):
+ if not self.config['assignors']:
+ raise Errors.KafkaConfigurationError('Coordinator requires assignors')
+ if self.config['api_version'] < (0, 10, 1):
+ if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
+ raise Errors.KafkaConfigurationError("Broker version %s does not support "
+ "different values for max_poll_interval_ms "
+ "and session_timeout_ms")
- self._auto_commit_task = None
if self.config['enable_auto_commit']:
if self.config['api_version'] < (0, 8, 1):
log.warning('Broker version (%s) does not support offset'
@@ -104,13 +114,14 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('group_id is None: disabling auto-commit.')
self.config['enable_auto_commit'] = False
else:
- interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
- self._auto_commit_task.reschedule()
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
self.consumer_sensors = ConsumerCoordinatorMetrics(
metrics, self.config['metric_group_prefix'], self._subscription)
+ self._cluster.request_update()
+ self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
+
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
@@ -210,8 +221,7 @@ class ConsumerCoordinator(BaseCoordinator):
assignor.on_assignment(assignment)
# reschedule the auto commit starting from now
- if self._auto_commit_task:
- self._auto_commit_task.reschedule()
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
assigned = set(self._subscription.assigned_partitions())
log.info("Setting newly assigned partitions %s for group %s",
@@ -227,6 +237,54 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.listener, self.group_id,
assigned)
+ def poll(self):
+ """
+ Poll for coordinator events. Only applicable if group_id is set, and
+ broker version supports GroupCoordinators. This ensures that the
+ coordinator is known, and if using automatic partition assignment,
+ ensures that the consumer has joined the group. This also handles
+ periodic offset commits if they are enabled.
+ """
+ if self.group_id is None or self.config['api_version'] < (0, 8, 2):
+ return
+
+ self._invoke_completed_offset_commit_callbacks()
+ self.ensure_coordinator_ready()
+
+ if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
+ if self.need_rejoin():
+ # due to a race condition between the initial metadata fetch and the
+ # initial rebalance, we need to ensure that the metadata is fresh
+ # before joining initially, and then request the metadata update. If
+ # metadata update arrives while the rebalance is still pending (for
+ # example, when the join group is still inflight), then we will lose
+ # track of the fact that we need to rebalance again to reflect the
+ # change to the topic subscription. Without ensuring that the
+ # metadata is fresh, any metadata update that changes the topic
+ # subscriptions and arrives while a rebalance is in progress will
+ # essentially be ignored. See KAFKA-3949 for the complete
+ # description of the problem.
+ if self._subscription.subscribed_pattern:
+ metadata_update = self._client.cluster.request_update()
+ self._client.poll(future=metadata_update)
+
+ self.ensure_active_group()
+
+ self.poll_heartbeat()
+
+ self._maybe_auto_commit_offsets_async()
+
+ def time_to_next_poll(self):
+ """Return seconds (float) remaining until :meth:`.poll` should be called again"""
+ if not self.config['enable_auto_commit']:
+ return self.time_to_next_heartbeat()
+
+ if time.time() > self.next_auto_commit_deadline:
+ return 0
+
+ return min(self.next_auto_commit_deadline - time.time(),
+ self.time_to_next_heartbeat())
+
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
@@ -327,7 +385,7 @@ class ConsumerCoordinator(BaseCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def close(self, autocommit=True):
"""Close the coordinator, leave the current group,
@@ -344,6 +402,11 @@ class ConsumerCoordinator(BaseCoordinator):
finally:
super(ConsumerCoordinator, self).close()
+ def _invoke_completed_offset_commit_callbacks(self):
+ while self.completed_offset_commits:
+ callback, offsets, exception = self.completed_offset_commits.popleft()
+ callback(offsets, exception)
+
def commit_offsets_async(self, offsets, callback=None):
"""Commit specific offsets asynchronously.
@@ -354,6 +417,7 @@ class ConsumerCoordinator(BaseCoordinator):
struct. This callback can be used to trigger custom actions when
a commit request completes.
"""
+ self._invoke_completed_offset_commit_callbacks()
if not self.coordinator_unknown():
self._do_commit_offsets_async(offsets, callback)
else:
@@ -367,7 +431,7 @@ class ConsumerCoordinator(BaseCoordinator):
future = self.lookup_coordinator()
future.add_callback(self._do_commit_offsets_async, offsets, callback)
if callback:
- future.add_errback(callback)
+ future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))
# ensure the commit has a chance to be transmitted (without blocking on
# its completion). Note that commits are treated as heartbeats by the
@@ -384,7 +448,7 @@ class ConsumerCoordinator(BaseCoordinator):
callback = self.config['default_offset_commit_callback']
self._subscription.needs_fetch_committed_offsets = True
future = self._send_offset_commit_request(offsets)
- future.add_both(callback, offsets)
+ future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res)))
return future
def commit_offsets_sync(self, offsets):
@@ -402,6 +466,7 @@ class ConsumerCoordinator(BaseCoordinator):
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
offsets.values()))
+ self._invoke_completed_offset_commit_callbacks()
if not offsets:
return
@@ -417,26 +482,24 @@ class ConsumerCoordinator(BaseCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _maybe_auto_commit_offsets_sync(self):
- if self._auto_commit_task is None:
- return
-
- try:
- self.commit_offsets_sync(self._subscription.all_consumed_offsets())
-
- # The three main group membership errors are known and should not
- # require a stacktrace -- just a warning
- except (Errors.UnknownMemberIdError,
- Errors.IllegalGenerationError,
- Errors.RebalanceInProgressError):
- log.warning("Offset commit failed: group membership out of date"
- " This is likely to cause duplicate message"
- " delivery.")
- except Exception:
- log.exception("Offset commit failed: This is likely to cause"
- " duplicate message delivery")
+ if self.config['enable_auto_commit']:
+ try:
+ self.commit_offsets_sync(self._subscription.all_consumed_offsets())
+
+ # The three main group membership errors are known and should not
+ # require a stacktrace -- just a warning
+ except (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError,
+ Errors.RebalanceInProgressError):
+ log.warning("Offset commit failed: group membership out of date"
+ " This is likely to cause duplicate message"
+ " delivery.")
+ except Exception:
+ log.exception("Offset commit failed: This is likely to cause"
+ " duplicate message delivery")
def _send_offset_commit_request(self, offsets):
"""Commit offsets for the specified list of topics and partitions.
@@ -458,23 +521,34 @@ class ConsumerCoordinator(BaseCoordinator):
offsets.values()))
if not offsets:
log.debug('No offsets to commit')
- return Future().success(True)
+ return Future().success(None)
- elif self.coordinator_unknown():
+ node_id = self.coordinator()
+ if node_id is None:
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
- node_id = self.coordinator_id
# create the offset commit request
offset_data = collections.defaultdict(dict)
for tp, offset in six.iteritems(offsets):
offset_data[tp.topic][tp.partition] = offset
+ if self._subscription.partitions_auto_assigned():
+ generation = self.generation()
+ else:
+ generation = Generation.NO_GENERATION
+
+ # if the generation is None, we are not part of an active group
+ # (and we expect to be). The only thing we can do is fail the commit
+ # and let the user rejoin the group in poll()
+ if self.config['api_version'] >= (0, 9) and generation is None:
+ return Future().failure(Errors.CommitFailedError())
+
if self.config['api_version'] >= (0, 9):
request = OffsetCommitRequest[2](
self.group_id,
- self.generation,
- self.member_id,
+ generation.generation_id,
+ generation.member_id,
OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
[(
topic, [(
@@ -568,7 +642,7 @@ class ConsumerCoordinator(BaseCoordinator):
error = error_type(self.group_id)
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error)
- self._subscription.mark_for_reassignment()
+ self.reset_generation()
future.failure(Errors.CommitFailedError(
"Commit cannot be completed since the group has"
" already rebalanced and assigned the partitions to"
@@ -593,7 +667,7 @@ class ConsumerCoordinator(BaseCoordinator):
unauthorized_topics, self.group_id)
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
else:
- future.success(True)
+ future.success(None)
def _send_offset_fetch_request(self, partitions):
"""Fetch the committed offsets for a set of partitions.
@@ -612,11 +686,10 @@ class ConsumerCoordinator(BaseCoordinator):
if not partitions:
return Future().success({})
- elif self.coordinator_unknown():
+ node_id = self.coordinator()
+ if node_id is None:
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
- node_id = self.coordinator_id
-
# Verify node is ready
if not self._client.ready(node_id):
log.debug("Node %s not ready -- failing offset fetch request",
@@ -665,11 +738,6 @@ class ConsumerCoordinator(BaseCoordinator):
# re-discover the coordinator and retry
self.coordinator_dead(error_type())
future.failure(error)
- elif error_type in (Errors.UnknownMemberIdError,
- Errors.IllegalGenerationError):
- # need to re-join group
- self._subscription.mark_for_reassignment()
- future.failure(error)
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warning("OffsetFetchRequest -- unknown topic %s"
" (have you committed any offsets yet?)",
@@ -689,50 +757,28 @@ class ConsumerCoordinator(BaseCoordinator):
" %s", self.group_id, tp)
future.success(offsets)
+ def _default_offset_commit_callback(self, offsets, exception):
+ if exception is not None:
+ log.error("Offset commit failed: %s", exception)
-class AutoCommitTask(object):
- def __init__(self, coordinator, interval):
- self._coordinator = coordinator
- self._client = coordinator._client
- self._interval = interval
-
- def reschedule(self, at=None):
- if at is None:
- at = time.time() + self._interval
- self._client.schedule(self, at)
-
- def __call__(self):
- if self._coordinator.coordinator_unknown():
- log.debug("Cannot auto-commit offsets for group %s because the"
- " coordinator is unknown", self._coordinator.group_id)
- backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
- self.reschedule(time.time() + backoff)
- return
-
- self._coordinator.commit_offsets_async(
- self._coordinator._subscription.all_consumed_offsets(),
- self._handle_commit_response)
-
- def _handle_commit_response(self, offsets, result):
- if result is True:
- log.debug("Successfully auto-committed offsets for group %s",
- self._coordinator.group_id)
- next_at = time.time() + self._interval
- elif not isinstance(result, BaseException):
- raise Errors.IllegalStateError(
- 'Unrecognized result in _handle_commit_response: %s'
- % result)
- elif hasattr(result, 'retriable') and result.retriable:
- log.debug("Failed to auto-commit offsets for group %s: %s,"
- " will retry immediately", self._coordinator.group_id,
- result)
- next_at = time.time()
- else:
+ def _commit_offsets_async_on_complete(self, offsets, exception):
+ if exception is not None:
log.warning("Auto offset commit failed for group %s: %s",
- self._coordinator.group_id, result)
- next_at = time.time() + self._interval
+ self.group_id, exception)
+ if getattr(exception, 'retriable', False):
+ self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
+ else:
+ log.debug("Completed autocommit of offsets %s for group %s",
+ offsets, self.group_id)
- self.reschedule(next_at)
+ def _maybe_auto_commit_offsets_async(self):
+ if self.config['enable_auto_commit']:
+ if self.coordinator_unknown():
+ self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
+ elif time.time() > self.next_auto_commit_deadline:
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
+ self.commit_offsets_async(self._subscription.all_consumed_offsets(),
+ self._commit_offsets_async_on_complete)
class ConsumerCoordinatorMetrics(object):
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index fddf298..2f5930b 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import copy
import time
@@ -6,8 +6,11 @@ import time
class Heartbeat(object):
DEFAULT_CONFIG = {
+ 'group_id': None,
'heartbeat_interval_ms': 3000,
- 'session_timeout_ms': 30000,
+ 'session_timeout_ms': 10000,
+ 'max_poll_interval_ms': 300000,
+ 'retry_backoff_ms': 100,
}
def __init__(self, **configs):
@@ -16,32 +19,50 @@ class Heartbeat(object):
if key in configs:
self.config[key] = configs[key]
- assert (self.config['heartbeat_interval_ms']
- <= self.config['session_timeout_ms']), (
- 'Heartbeat interval must be lower than the session timeout')
+ if self.config['group_id'] is not None:
+ assert (self.config['heartbeat_interval_ms']
+ <= self.config['session_timeout_ms']), (
+ 'Heartbeat interval must be lower than the session timeout')
- self.interval = self.config['heartbeat_interval_ms'] / 1000.0
- self.timeout = self.config['session_timeout_ms'] / 1000.0
self.last_send = -1 * float('inf')
self.last_receive = -1 * float('inf')
+ self.last_poll = -1 * float('inf')
self.last_reset = time.time()
+ self.heartbeat_failed = None
+
+ def poll(self):
+ self.last_poll = time.time()
def sent_heartbeat(self):
self.last_send = time.time()
+ self.heartbeat_failed = False
+
+ def fail_heartbeat(self):
+ self.heartbeat_failed = True
def received_heartbeat(self):
self.last_receive = time.time()
- def ttl(self):
- last_beat = max(self.last_send, self.last_reset)
- return max(0, last_beat + self.interval - time.time())
+ def time_to_next_heartbeat(self):
+ """Returns seconds (float) remaining before next heartbeat should be sent"""
+ time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset)
+ if self.heartbeat_failed:
+ delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000
+ else:
+ delay_to_next_heartbeat = self.config['heartbeat_interval_ms'] / 1000
+ return max(0, delay_to_next_heartbeat - time_since_last_heartbeat)
def should_heartbeat(self):
- return self.ttl() == 0
+ return self.time_to_next_heartbeat() == 0
- def session_expired(self):
+ def session_timeout_expired(self):
last_recv = max(self.last_receive, self.last_reset)
- return (time.time() - last_recv) > self.timeout
+ return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000)
- def reset_session_timeout(self):
+ def reset_timeouts(self):
self.last_reset = time.time()
+ self.last_poll = time.time()
+ self.heartbeat_failed = False
+
+ def poll_timeout_expired(self):
+ return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000)