summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-12-21 14:46:10 -0800
committerGitHub <noreply@github.com>2017-12-21 14:46:10 -0800
commitad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch)
treef1993351b2c6487e8e623cefabf42ddf7477f666 /kafka/coordinator/consumer.py
parent995664c7d407009a0a1030c7541848eb5ad51c97 (diff)
downloadkafka-python-ad024d1e897dbf16bd629fa63895bd7af4a8d959.tar.gz
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py242
1 files changed, 144 insertions, 98 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index dee70f0..48dcad4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,14 +1,13 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
-import copy
import collections
+import copy
import logging
import time
-import weakref
from kafka.vendor import six
-from .base import BaseCoordinator
+from .base import BaseCoordinator, Generation
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
@@ -30,12 +29,13 @@ class ConsumerCoordinator(BaseCoordinator):
'group_id': 'kafka-python-default-group',
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
- 'default_offset_commit_callback': lambda offsets, response: True,
+ 'default_offset_commit_callback': None,
'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
- 'session_timeout_ms': 30000,
+ 'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
+ 'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
- 'api_version': (0, 9),
+ 'api_version': (0, 10, 1),
'exclude_internal_topics': True,
'metric_group_prefix': 'consumer'
}
@@ -52,9 +52,9 @@ class ConsumerCoordinator(BaseCoordinator):
auto_commit_interval_ms (int): milliseconds between automatic
offset commits, if enable_auto_commit is True. Default: 5000.
default_offset_commit_callback (callable): called as
- callback(offsets, response) response will be either an Exception
- or a OffsetCommitResponse struct. This callback can be used to
- trigger custom actions when a commit request completes.
+ callback(offsets, exception) response will be either an Exception
+ or None. This callback can be used to trigger custom actions when
+ a commit request completes.
assignors (list): List of objects to use to distribute partition
ownership amongst consumer instances when group management is
used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
@@ -83,17 +83,27 @@ class ConsumerCoordinator(BaseCoordinator):
if key in configs:
self.config[key] = configs[key]
- if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None:
- assert self.config['assignors'], 'Coordinator requires assignors'
-
self._subscription = subscription
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
- self._cluster.request_update()
- self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
+ self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000
+ self.next_auto_commit_deadline = None
+ self.completed_offset_commits = collections.deque()
+
+ if self.config['default_offset_commit_callback'] is None:
+ self.config['default_offset_commit_callback'] = self._default_offset_commit_callback
+
+ if self.config['group_id'] is not None:
+ if self.config['api_version'] >= (0, 9):
+ if not self.config['assignors']:
+ raise Errors.KafkaConfigurationError('Coordinator requires assignors')
+ if self.config['api_version'] < (0, 10, 1):
+ if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']:
+ raise Errors.KafkaConfigurationError("Broker version %s does not support "
+ "different values for max_poll_interval_ms "
+ "and session_timeout_ms")
- self._auto_commit_task = None
if self.config['enable_auto_commit']:
if self.config['api_version'] < (0, 8, 1):
log.warning('Broker version (%s) does not support offset'
@@ -104,13 +114,14 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('group_id is None: disabling auto-commit.')
self.config['enable_auto_commit'] = False
else:
- interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
- self._auto_commit_task.reschedule()
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
self.consumer_sensors = ConsumerCoordinatorMetrics(
metrics, self.config['metric_group_prefix'], self._subscription)
+ self._cluster.request_update()
+ self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
+
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
@@ -210,8 +221,7 @@ class ConsumerCoordinator(BaseCoordinator):
assignor.on_assignment(assignment)
# reschedule the auto commit starting from now
- if self._auto_commit_task:
- self._auto_commit_task.reschedule()
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
assigned = set(self._subscription.assigned_partitions())
log.info("Setting newly assigned partitions %s for group %s",
@@ -227,6 +237,54 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.listener, self.group_id,
assigned)
+ def poll(self):
+ """
+ Poll for coordinator events. Only applicable if group_id is set, and
+ broker version supports GroupCoordinators. This ensures that the
+ coordinator is known, and if using automatic partition assignment,
+ ensures that the consumer has joined the group. This also handles
+ periodic offset commits if they are enabled.
+ """
+ if self.group_id is None or self.config['api_version'] < (0, 8, 2):
+ return
+
+ self._invoke_completed_offset_commit_callbacks()
+ self.ensure_coordinator_ready()
+
+ if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
+ if self.need_rejoin():
+ # due to a race condition between the initial metadata fetch and the
+ # initial rebalance, we need to ensure that the metadata is fresh
+ # before joining initially, and then request the metadata update. If
+ # metadata update arrives while the rebalance is still pending (for
+ # example, when the join group is still inflight), then we will lose
+ # track of the fact that we need to rebalance again to reflect the
+ # change to the topic subscription. Without ensuring that the
+ # metadata is fresh, any metadata update that changes the topic
+ # subscriptions and arrives while a rebalance is in progress will
+ # essentially be ignored. See KAFKA-3949 for the complete
+ # description of the problem.
+ if self._subscription.subscribed_pattern:
+ metadata_update = self._client.cluster.request_update()
+ self._client.poll(future=metadata_update)
+
+ self.ensure_active_group()
+
+ self.poll_heartbeat()
+
+ self._maybe_auto_commit_offsets_async()
+
+ def time_to_next_poll(self):
+ """Return seconds (float) remaining until :meth:`.poll` should be called again"""
+ if not self.config['enable_auto_commit']:
+ return self.time_to_next_heartbeat()
+
+ if time.time() > self.next_auto_commit_deadline:
+ return 0
+
+ return min(self.next_auto_commit_deadline - time.time(),
+ self.time_to_next_heartbeat())
+
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
@@ -327,7 +385,7 @@ class ConsumerCoordinator(BaseCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def close(self, autocommit=True):
"""Close the coordinator, leave the current group,
@@ -344,6 +402,11 @@ class ConsumerCoordinator(BaseCoordinator):
finally:
super(ConsumerCoordinator, self).close()
+ def _invoke_completed_offset_commit_callbacks(self):
+ while self.completed_offset_commits:
+ callback, offsets, exception = self.completed_offset_commits.popleft()
+ callback(offsets, exception)
+
def commit_offsets_async(self, offsets, callback=None):
"""Commit specific offsets asynchronously.
@@ -354,6 +417,7 @@ class ConsumerCoordinator(BaseCoordinator):
struct. This callback can be used to trigger custom actions when
a commit request completes.
"""
+ self._invoke_completed_offset_commit_callbacks()
if not self.coordinator_unknown():
self._do_commit_offsets_async(offsets, callback)
else:
@@ -367,7 +431,7 @@ class ConsumerCoordinator(BaseCoordinator):
future = self.lookup_coordinator()
future.add_callback(self._do_commit_offsets_async, offsets, callback)
if callback:
- future.add_errback(callback)
+ future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))
# ensure the commit has a chance to be transmitted (without blocking on
# its completion). Note that commits are treated as heartbeats by the
@@ -384,7 +448,7 @@ class ConsumerCoordinator(BaseCoordinator):
callback = self.config['default_offset_commit_callback']
self._subscription.needs_fetch_committed_offsets = True
future = self._send_offset_commit_request(offsets)
- future.add_both(callback, offsets)
+ future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res)))
return future
def commit_offsets_sync(self, offsets):
@@ -402,6 +466,7 @@ class ConsumerCoordinator(BaseCoordinator):
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
offsets.values()))
+ self._invoke_completed_offset_commit_callbacks()
if not offsets:
return
@@ -417,26 +482,24 @@ class ConsumerCoordinator(BaseCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _maybe_auto_commit_offsets_sync(self):
- if self._auto_commit_task is None:
- return
-
- try:
- self.commit_offsets_sync(self._subscription.all_consumed_offsets())
-
- # The three main group membership errors are known and should not
- # require a stacktrace -- just a warning
- except (Errors.UnknownMemberIdError,
- Errors.IllegalGenerationError,
- Errors.RebalanceInProgressError):
- log.warning("Offset commit failed: group membership out of date"
- " This is likely to cause duplicate message"
- " delivery.")
- except Exception:
- log.exception("Offset commit failed: This is likely to cause"
- " duplicate message delivery")
+ if self.config['enable_auto_commit']:
+ try:
+ self.commit_offsets_sync(self._subscription.all_consumed_offsets())
+
+ # The three main group membership errors are known and should not
+ # require a stacktrace -- just a warning
+ except (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError,
+ Errors.RebalanceInProgressError):
+ log.warning("Offset commit failed: group membership out of date"
+ " This is likely to cause duplicate message"
+ " delivery.")
+ except Exception:
+ log.exception("Offset commit failed: This is likely to cause"
+ " duplicate message delivery")
def _send_offset_commit_request(self, offsets):
"""Commit offsets for the specified list of topics and partitions.
@@ -458,23 +521,34 @@ class ConsumerCoordinator(BaseCoordinator):
offsets.values()))
if not offsets:
log.debug('No offsets to commit')
- return Future().success(True)
+ return Future().success(None)
- elif self.coordinator_unknown():
+ node_id = self.coordinator()
+ if node_id is None:
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
- node_id = self.coordinator_id
# create the offset commit request
offset_data = collections.defaultdict(dict)
for tp, offset in six.iteritems(offsets):
offset_data[tp.topic][tp.partition] = offset
+ if self._subscription.partitions_auto_assigned():
+ generation = self.generation()
+ else:
+ generation = Generation.NO_GENERATION
+
+ # if the generation is None, we are not part of an active group
+ # (and we expect to be). The only thing we can do is fail the commit
+ # and let the user rejoin the group in poll()
+ if self.config['api_version'] >= (0, 9) and generation is None:
+ return Future().failure(Errors.CommitFailedError())
+
if self.config['api_version'] >= (0, 9):
request = OffsetCommitRequest[2](
self.group_id,
- self.generation,
- self.member_id,
+ generation.generation_id,
+ generation.member_id,
OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
[(
topic, [(
@@ -568,7 +642,7 @@ class ConsumerCoordinator(BaseCoordinator):
error = error_type(self.group_id)
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error)
- self._subscription.mark_for_reassignment()
+ self.reset_generation()
future.failure(Errors.CommitFailedError(
"Commit cannot be completed since the group has"
" already rebalanced and assigned the partitions to"
@@ -593,7 +667,7 @@ class ConsumerCoordinator(BaseCoordinator):
unauthorized_topics, self.group_id)
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
else:
- future.success(True)
+ future.success(None)
def _send_offset_fetch_request(self, partitions):
"""Fetch the committed offsets for a set of partitions.
@@ -612,11 +686,10 @@ class ConsumerCoordinator(BaseCoordinator):
if not partitions:
return Future().success({})
- elif self.coordinator_unknown():
+ node_id = self.coordinator()
+ if node_id is None:
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
- node_id = self.coordinator_id
-
# Verify node is ready
if not self._client.ready(node_id):
log.debug("Node %s not ready -- failing offset fetch request",
@@ -665,11 +738,6 @@ class ConsumerCoordinator(BaseCoordinator):
# re-discover the coordinator and retry
self.coordinator_dead(error_type())
future.failure(error)
- elif error_type in (Errors.UnknownMemberIdError,
- Errors.IllegalGenerationError):
- # need to re-join group
- self._subscription.mark_for_reassignment()
- future.failure(error)
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warning("OffsetFetchRequest -- unknown topic %s"
" (have you committed any offsets yet?)",
@@ -689,50 +757,28 @@ class ConsumerCoordinator(BaseCoordinator):
" %s", self.group_id, tp)
future.success(offsets)
+ def _default_offset_commit_callback(self, offsets, exception):
+ if exception is not None:
+ log.error("Offset commit failed: %s", exception)
-class AutoCommitTask(object):
- def __init__(self, coordinator, interval):
- self._coordinator = coordinator
- self._client = coordinator._client
- self._interval = interval
-
- def reschedule(self, at=None):
- if at is None:
- at = time.time() + self._interval
- self._client.schedule(self, at)
-
- def __call__(self):
- if self._coordinator.coordinator_unknown():
- log.debug("Cannot auto-commit offsets for group %s because the"
- " coordinator is unknown", self._coordinator.group_id)
- backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
- self.reschedule(time.time() + backoff)
- return
-
- self._coordinator.commit_offsets_async(
- self._coordinator._subscription.all_consumed_offsets(),
- self._handle_commit_response)
-
- def _handle_commit_response(self, offsets, result):
- if result is True:
- log.debug("Successfully auto-committed offsets for group %s",
- self._coordinator.group_id)
- next_at = time.time() + self._interval
- elif not isinstance(result, BaseException):
- raise Errors.IllegalStateError(
- 'Unrecognized result in _handle_commit_response: %s'
- % result)
- elif hasattr(result, 'retriable') and result.retriable:
- log.debug("Failed to auto-commit offsets for group %s: %s,"
- " will retry immediately", self._coordinator.group_id,
- result)
- next_at = time.time()
- else:
+ def _commit_offsets_async_on_complete(self, offsets, exception):
+ if exception is not None:
log.warning("Auto offset commit failed for group %s: %s",
- self._coordinator.group_id, result)
- next_at = time.time() + self._interval
+ self.group_id, exception)
+ if getattr(exception, 'retriable', False):
+ self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
+ else:
+ log.debug("Completed autocommit of offsets %s for group %s",
+ offsets, self.group_id)
- self.reschedule(next_at)
+ def _maybe_auto_commit_offsets_async(self):
+ if self.config['enable_auto_commit']:
+ if self.coordinator_unknown():
+ self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
+ elif time.time() > self.next_auto_commit_deadline:
+ self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
+ self.commit_offsets_async(self._subscription.all_consumed_offsets(),
+ self._commit_offsets_async_on_complete)
class ConsumerCoordinatorMetrics(object):