diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-12-21 14:46:10 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-21 14:46:10 -0800 |
commit | ad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch) | |
tree | f1993351b2c6487e8e623cefabf42ddf7477f666 /kafka/coordinator/consumer.py | |
parent | 995664c7d407009a0a1030c7541848eb5ad51c97 (diff) | |
download | kafka-python-ad024d1e897dbf16bd629fa63895bd7af4a8d959.tar.gz |
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 242 |
1 files changed, 144 insertions, 98 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index dee70f0..48dcad4 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -1,14 +1,13 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division -import copy import collections +import copy import logging import time -import weakref from kafka.vendor import six -from .base import BaseCoordinator +from .base import BaseCoordinator, Generation from .assignors.range import RangePartitionAssignor from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ConsumerProtocol @@ -30,12 +29,13 @@ class ConsumerCoordinator(BaseCoordinator): 'group_id': 'kafka-python-default-group', 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, - 'default_offset_commit_callback': lambda offsets, response: True, + 'default_offset_commit_callback': None, 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), - 'session_timeout_ms': 30000, + 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, + 'max_poll_interval_ms': 300000, 'retry_backoff_ms': 100, - 'api_version': (0, 9), + 'api_version': (0, 10, 1), 'exclude_internal_topics': True, 'metric_group_prefix': 'consumer' } @@ -52,9 +52,9 @@ class ConsumerCoordinator(BaseCoordinator): auto_commit_interval_ms (int): milliseconds between automatic offset commits, if enable_auto_commit is True. Default: 5000. default_offset_commit_callback (callable): called as - callback(offsets, response) response will be either an Exception - or a OffsetCommitResponse struct. This callback can be used to - trigger custom actions when a commit request completes. + callback(offsets, exception) response will be either an Exception + or None. This callback can be used to trigger custom actions when + a commit request completes. assignors (list): List of objects to use to distribute partition ownership amongst consumer instances when group management is used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] @@ -83,17 +83,27 @@ class ConsumerCoordinator(BaseCoordinator): if key in configs: self.config[key] = configs[key] - if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None: - assert self.config['assignors'], 'Coordinator requires assignors' - self._subscription = subscription self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) self._assignment_snapshot = None self._cluster = client.cluster - self._cluster.request_update() - self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000 + self.next_auto_commit_deadline = None + self.completed_offset_commits = collections.deque() + + if self.config['default_offset_commit_callback'] is None: + self.config['default_offset_commit_callback'] = self._default_offset_commit_callback + + if self.config['group_id'] is not None: + if self.config['api_version'] >= (0, 9): + if not self.config['assignors']: + raise Errors.KafkaConfigurationError('Coordinator requires assignors') + if self.config['api_version'] < (0, 10, 1): + if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']: + raise Errors.KafkaConfigurationError("Broker version %s does not support " + "different values for max_poll_interval_ms " + "and session_timeout_ms") - self._auto_commit_task = None if self.config['enable_auto_commit']: if self.config['api_version'] < (0, 8, 1): log.warning('Broker version (%s) does not support offset' @@ -104,13 +114,14 @@ class ConsumerCoordinator(BaseCoordinator): log.warning('group_id is None: disabling auto-commit.') self.config['enable_auto_commit'] = False else: - interval = self.config['auto_commit_interval_ms'] / 1000.0 - self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) - self._auto_commit_task.reschedule() + self.next_auto_commit_deadline = time.time() + self.auto_commit_interval self.consumer_sensors = ConsumerCoordinatorMetrics( metrics, self.config['metric_group_prefix'], self._subscription) + self._cluster.request_update() + self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + def __del__(self): if hasattr(self, '_cluster') and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) @@ -210,8 +221,7 @@ class ConsumerCoordinator(BaseCoordinator): assignor.on_assignment(assignment) # reschedule the auto commit starting from now - if self._auto_commit_task: - self._auto_commit_task.reschedule() + self.next_auto_commit_deadline = time.time() + self.auto_commit_interval assigned = set(self._subscription.assigned_partitions()) log.info("Setting newly assigned partitions %s for group %s", @@ -227,6 +237,54 @@ class ConsumerCoordinator(BaseCoordinator): self._subscription.listener, self.group_id, assigned) + def poll(self): + """ + Poll for coordinator events. Only applicable if group_id is set, and + broker version supports GroupCoordinators. This ensures that the + coordinator is known, and if using automatic partition assignment, + ensures that the consumer has joined the group. This also handles + periodic offset commits if they are enabled. + """ + if self.group_id is None or self.config['api_version'] < (0, 8, 2): + return + + self._invoke_completed_offset_commit_callbacks() + self.ensure_coordinator_ready() + + if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): + if self.need_rejoin(): + # due to a race condition between the initial metadata fetch and the + # initial rebalance, we need to ensure that the metadata is fresh + # before joining initially, and then request the metadata update. If + # metadata update arrives while the rebalance is still pending (for + # example, when the join group is still inflight), then we will lose + # track of the fact that we need to rebalance again to reflect the + # change to the topic subscription. Without ensuring that the + # metadata is fresh, any metadata update that changes the topic + # subscriptions and arrives while a rebalance is in progress will + # essentially be ignored. See KAFKA-3949 for the complete + # description of the problem. + if self._subscription.subscribed_pattern: + metadata_update = self._client.cluster.request_update() + self._client.poll(future=metadata_update) + + self.ensure_active_group() + + self.poll_heartbeat() + + self._maybe_auto_commit_offsets_async() + + def time_to_next_poll(self): + """Return seconds (float) remaining until :meth:`.poll` should be called again""" + if not self.config['enable_auto_commit']: + return self.time_to_next_heartbeat() + + if time.time() > self.next_auto_commit_deadline: + return 0 + + return min(self.next_auto_commit_deadline - time.time(), + self.time_to_next_heartbeat()) + def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy @@ -327,7 +385,7 @@ class ConsumerCoordinator(BaseCoordinator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000) def close(self, autocommit=True): """Close the coordinator, leave the current group, @@ -344,6 +402,11 @@ class ConsumerCoordinator(BaseCoordinator): finally: super(ConsumerCoordinator, self).close() + def _invoke_completed_offset_commit_callbacks(self): + while self.completed_offset_commits: + callback, offsets, exception = self.completed_offset_commits.popleft() + callback(offsets, exception) + def commit_offsets_async(self, offsets, callback=None): """Commit specific offsets asynchronously. @@ -354,6 +417,7 @@ class ConsumerCoordinator(BaseCoordinator): struct. This callback can be used to trigger custom actions when a commit request completes. """ + self._invoke_completed_offset_commit_callbacks() if not self.coordinator_unknown(): self._do_commit_offsets_async(offsets, callback) else: @@ -367,7 +431,7 @@ class ConsumerCoordinator(BaseCoordinator): future = self.lookup_coordinator() future.add_callback(self._do_commit_offsets_async, offsets, callback) if callback: - future.add_errback(callback) + future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e))) # ensure the commit has a chance to be transmitted (without blocking on # its completion). Note that commits are treated as heartbeats by the @@ -384,7 +448,7 @@ class ConsumerCoordinator(BaseCoordinator): callback = self.config['default_offset_commit_callback'] self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) - future.add_both(callback, offsets) + future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) return future def commit_offsets_sync(self, offsets): @@ -402,6 +466,7 @@ class ConsumerCoordinator(BaseCoordinator): assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) + self._invoke_completed_offset_commit_callbacks() if not offsets: return @@ -417,26 +482,24 @@ class ConsumerCoordinator(BaseCoordinator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000) def _maybe_auto_commit_offsets_sync(self): - if self._auto_commit_task is None: - return - - try: - self.commit_offsets_sync(self._subscription.all_consumed_offsets()) - - # The three main group membership errors are known and should not - # require a stacktrace -- just a warning - except (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): - log.warning("Offset commit failed: group membership out of date" - " This is likely to cause duplicate message" - " delivery.") - except Exception: - log.exception("Offset commit failed: This is likely to cause" - " duplicate message delivery") + if self.config['enable_auto_commit']: + try: + self.commit_offsets_sync(self._subscription.all_consumed_offsets()) + + # The three main group membership errors are known and should not + # require a stacktrace -- just a warning + except (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError): + log.warning("Offset commit failed: group membership out of date" + " This is likely to cause duplicate message" + " delivery.") + except Exception: + log.exception("Offset commit failed: This is likely to cause" + " duplicate message delivery") def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. @@ -458,23 +521,34 @@ class ConsumerCoordinator(BaseCoordinator): offsets.values())) if not offsets: log.debug('No offsets to commit') - return Future().success(True) + return Future().success(None) - elif self.coordinator_unknown(): + node_id = self.coordinator() + if node_id is None: return Future().failure(Errors.GroupCoordinatorNotAvailableError) - node_id = self.coordinator_id # create the offset commit request offset_data = collections.defaultdict(dict) for tp, offset in six.iteritems(offsets): offset_data[tp.topic][tp.partition] = offset + if self._subscription.partitions_auto_assigned(): + generation = self.generation() + else: + generation = Generation.NO_GENERATION + + # if the generation is None, we are not part of an active group + # (and we expect to be). The only thing we can do is fail the commit + # and let the user rejoin the group in poll() + if self.config['api_version'] >= (0, 9) and generation is None: + return Future().failure(Errors.CommitFailedError()) + if self.config['api_version'] >= (0, 9): request = OffsetCommitRequest[2]( self.group_id, - self.generation, - self.member_id, + generation.generation_id, + generation.member_id, OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, [( topic, [( @@ -568,7 +642,7 @@ class ConsumerCoordinator(BaseCoordinator): error = error_type(self.group_id) log.debug("OffsetCommit for group %s failed: %s", self.group_id, error) - self._subscription.mark_for_reassignment() + self.reset_generation() future.failure(Errors.CommitFailedError( "Commit cannot be completed since the group has" " already rebalanced and assigned the partitions to" @@ -593,7 +667,7 @@ class ConsumerCoordinator(BaseCoordinator): unauthorized_topics, self.group_id) future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) else: - future.success(True) + future.success(None) def _send_offset_fetch_request(self, partitions): """Fetch the committed offsets for a set of partitions. @@ -612,11 +686,10 @@ class ConsumerCoordinator(BaseCoordinator): if not partitions: return Future().success({}) - elif self.coordinator_unknown(): + node_id = self.coordinator() + if node_id is None: return Future().failure(Errors.GroupCoordinatorNotAvailableError) - node_id = self.coordinator_id - # Verify node is ready if not self._client.ready(node_id): log.debug("Node %s not ready -- failing offset fetch request", @@ -665,11 +738,6 @@ class ConsumerCoordinator(BaseCoordinator): # re-discover the coordinator and retry self.coordinator_dead(error_type()) future.failure(error) - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError): - # need to re-join group - self._subscription.mark_for_reassignment() - future.failure(error) elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("OffsetFetchRequest -- unknown topic %s" " (have you committed any offsets yet?)", @@ -689,50 +757,28 @@ class ConsumerCoordinator(BaseCoordinator): " %s", self.group_id, tp) future.success(offsets) + def _default_offset_commit_callback(self, offsets, exception): + if exception is not None: + log.error("Offset commit failed: %s", exception) -class AutoCommitTask(object): - def __init__(self, coordinator, interval): - self._coordinator = coordinator - self._client = coordinator._client - self._interval = interval - - def reschedule(self, at=None): - if at is None: - at = time.time() + self._interval - self._client.schedule(self, at) - - def __call__(self): - if self._coordinator.coordinator_unknown(): - log.debug("Cannot auto-commit offsets for group %s because the" - " coordinator is unknown", self._coordinator.group_id) - backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0 - self.reschedule(time.time() + backoff) - return - - self._coordinator.commit_offsets_async( - self._coordinator._subscription.all_consumed_offsets(), - self._handle_commit_response) - - def _handle_commit_response(self, offsets, result): - if result is True: - log.debug("Successfully auto-committed offsets for group %s", - self._coordinator.group_id) - next_at = time.time() + self._interval - elif not isinstance(result, BaseException): - raise Errors.IllegalStateError( - 'Unrecognized result in _handle_commit_response: %s' - % result) - elif hasattr(result, 'retriable') and result.retriable: - log.debug("Failed to auto-commit offsets for group %s: %s," - " will retry immediately", self._coordinator.group_id, - result) - next_at = time.time() - else: + def _commit_offsets_async_on_complete(self, offsets, exception): + if exception is not None: log.warning("Auto offset commit failed for group %s: %s", - self._coordinator.group_id, result) - next_at = time.time() + self._interval + self.group_id, exception) + if getattr(exception, 'retriable', False): + self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) + else: + log.debug("Completed autocommit of offsets %s for group %s", + offsets, self.group_id) - self.reschedule(next_at) + def _maybe_auto_commit_offsets_async(self): + if self.config['enable_auto_commit']: + if self.coordinator_unknown(): + self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000 + elif time.time() > self.next_auto_commit_deadline: + self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.commit_offsets_async(self._subscription.all_consumed_offsets(), + self._commit_offsets_async_on_complete) class ConsumerCoordinatorMetrics(object): |