summary | refs | log | tree | commit | diff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com>, 2016-09-28 12:39:34 -0700
committer: GitHub <noreply@github.com>, 2016-09-28 12:39:34 -0700
commit: 9ee77dfdbc4aeb5723ce7ebdae76f8b7141962af (patch)
tree: ae679983f7206ff6d0058fa551aa4f8380612e42 /kafka/consumer/fetcher.py
parent: b8717b4b79462e83344f49bbd42312cf521d84aa (diff)
download: kafka-python-9ee77dfdbc4aeb5723ce7ebdae76f8b7141962af.tar.gz
KAFKA-3007: KafkaConsumer max_poll_records (#831)
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- kafka/consumer/fetcher.py 226
1 files changed, 91 insertions, 135 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f5d44b1..15fa1c9 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -4,6 +4,7 @@ import collections
import copy
import logging
import random
+import sys
import time
from kafka.vendor import six
@@ -39,6 +40,7 @@ class Fetcher(six.Iterator):
'fetch_min_bytes': 1,
'fetch_max_wait_ms': 500,
'max_partition_fetch_bytes': 1048576,
+ 'max_poll_records': sys.maxsize,
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
@@ -92,11 +94,10 @@ class Fetcher(six.Iterator):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
- self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
- def init_fetches(self):
+ def send_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
Note: noop if there are unconsumed records internal to the fetcher
@@ -104,16 +105,6 @@ class Fetcher(six.Iterator):
Returns:
List of Futures: each future resolves to a FetchResponse
"""
- # We need to be careful when creating fetch records during iteration
- # so we verify that there are no records in the deque, or in an
- # iterator
- if self._records or self._iterator:
- log.debug('Skipping init_fetches because there are unconsumed'
- ' records internally')
- return []
- return self._init_fetches()
-
- def _init_fetches(self):
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -291,10 +282,12 @@ class Fetcher(six.Iterator):
copied_record_too_large_partitions,
self.config['max_partition_fetch_bytes'])
- def fetched_records(self):
+ def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
- Incompatible with iterator interface - use one or the other, not both.
+ Arguments:
+ max_records (int): Maximum number of records returned. Defaults
+ to max_poll_records configuration.
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -304,32 +297,44 @@ class Fetcher(six.Iterator):
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
- AssertionError: if used with iterator (incompatible)
- Returns:
- dict: {TopicPartition: [messages]}
+ Returns: (records (dict), partial (bool))
+ records: {TopicPartition: [messages]}
+ partial: True if records returned did not fully drain any pending
+ partition requests. This may be useful for choosing when to
+ pipeline additional fetch requests.
"""
- assert self._iterator is None, (
- 'fetched_records is incompatible with message iterator')
+ if max_records is None:
+ max_records = self.config['max_poll_records']
+ assert max_records > 0
+
if self._subscriptions.needs_partition_assignment:
- return {}
+ return {}, False
- drained = collections.defaultdict(list)
self._raise_if_offset_out_of_range()
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
- # Loop over the records deque
- while self._records:
- (fetch_offset, tp, messages) = self._records.popleft()
-
- if not self._subscriptions.is_assigned(tp):
- # this can happen when a rebalance happened before
- # fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
- continue
-
+ drained = collections.defaultdict(list)
+ partial = bool(self._records and max_records)
+ while self._records and max_records > 0:
+ part = self._records.popleft()
+ max_records -= self._append(drained, part, max_records)
+ if part.has_more():
+ self._records.appendleft(part)
+ else:
+ partial &= False
+ return dict(drained), partial
+
+ def _append(self, drained, part, max_records):
+ tp = part.topic_partition
+ fetch_offset = part.fetch_offset
+ if not self._subscriptions.is_assigned(tp):
+ # this can happen when a rebalance happened before
+ # fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ else:
# note that the position should always be available
# as long as the partition is still assigned
position = self._subscriptions.assignment[tp].position
@@ -340,26 +345,35 @@ class Fetcher(six.Iterator):
" %s since it is no longer fetchable", tp)
elif fetch_offset == position:
- next_offset = messages[-1][0] + 1
+ part_records = part.take(max_records)
+ if not part_records:
+ return 0
+ next_offset = part_records[-1].offset + 1
+
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s and update position to %s", position,
tp, next_offset)
- self._subscriptions.assignment[tp].position = next_offset
- for record in self._unpack_message_set(tp, messages):
+ for record in part_records:
# Fetched compressed messages may include additional records
if record.offset < fetch_offset:
log.debug("Skipping message offset: %s (expecting %s)",
record.offset, fetch_offset)
continue
drained[tp].append(record)
+
+ self._subscriptions.assignment[tp].position = next_offset
+ return len(part_records)
+
else:
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
log.debug("Ignoring fetched records for %s at offset %s since"
- " the current position is %d", tp, fetch_offset,
+ " the current position is %d", tp, part.fetch_offset,
position)
- return dict(drained)
+
+ part.discard()
+ return 0
def _unpack_message_set(self, tp, messages):
try:
@@ -430,97 +444,17 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
- def _message_generator(self):
- """Iterate over fetched_records"""
- if self._subscriptions.needs_partition_assignment:
- raise StopIteration('Subscription needs partition assignment')
-
- while self._records:
-
- # Check on each iteration since this is a generator
- self._raise_if_offset_out_of_range()
- self._raise_if_unauthorized_topics()
- self._raise_if_record_too_large()
-
- # Send additional FetchRequests when the internal queue is low
- # this should enable moderate pipelining
- if len(self._records) <= self.config['iterator_refetch_records']:
- self._init_fetches()
-
- (fetch_offset, tp, messages) = self._records.popleft()
-
- if not self._subscriptions.is_assigned(tp):
- # this can happen when a rebalance happened before
- # fetched records are returned
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
- continue
-
- # note that the consumed position should always be available
- # as long as the partition is still assigned
- position = self._subscriptions.assignment[tp].position
- if not self._subscriptions.is_fetchable(tp):
- # this can happen when a partition consumption paused before
- # fetched records are returned
- log.debug("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
-
- elif fetch_offset == position:
- log.log(0, "Returning fetched records at offset %d for assigned"
- " partition %s", position, tp)
-
- # We can ignore any prior signal to drop pending message sets
- # because we are starting from a fresh one where fetch_offset == position
- # i.e., the user seek()'d to this position
- self._subscriptions.assignment[tp].drop_pending_message_set = False
-
- for msg in self._unpack_message_set(tp, messages):
-
- # Because we are in a generator, it is possible for
- # subscription state to change between yield calls
- # so we need to re-check on each loop
- # this should catch assignment changes, pauses
- # and resets via seek_to_beginning / seek_to_end
- if not self._subscriptions.is_fetchable(tp):
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer fetchable", tp)
- break
-
- # If there is a seek during message iteration,
- # we should stop unpacking this message set and
- # wait for a new fetch response that aligns with the
- # new seek position
- elif self._subscriptions.assignment[tp].drop_pending_message_set:
- log.debug("Skipping remainder of message set for partition %s", tp)
- self._subscriptions.assignment[tp].drop_pending_message_set = False
- break
-
- # Compressed messagesets may include earlier messages
- elif msg.offset < self._subscriptions.assignment[tp].position:
- log.debug("Skipping message offset: %s (expecting %s)",
- msg.offset,
- self._subscriptions.assignment[tp].position)
- continue
-
- self._subscriptions.assignment[tp].position = msg.offset + 1
- yield msg
- else:
- # these records aren't next in line based on the last consumed
- # position, ignore them they must be from an obsolete request
- log.debug("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- if not self._iterator:
- self._iterator = self._message_generator()
- try:
- return next(self._iterator)
- except StopIteration:
- self._iterator = None
- raise
+ ret, _ = self.fetched_records(max_records=1)
+ if not ret:
+ raise StopIteration
+ assert len(ret) == 1
+ (messages,) = ret.values()
+ assert len(messages) == 1
+ return messages[0]
def _deserialize(self, msg):
if self.config['key_deserializer']:
@@ -601,6 +535,11 @@ class Fetcher(six.Iterator):
" %s", partition, error_type)
future.failure(error_type(partition))
+ def _fetchable_partitions(self):
+ fetchable = self._subscriptions.fetchable_partitions()
+ pending = set([part.topic_partition for part in self._records])
+ return fetchable.difference(pending)
+
def _create_fetch_requests(self):
"""Create fetch requests for all assigned partitions, grouped by node.
@@ -613,24 +552,17 @@ class Fetcher(six.Iterator):
# which can be passed to FetchRequest() via .items()
fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
- # avoid re-fetching pending offsets
- pending = set()
- for fetch_offset, tp, _ in self._records:
- pending.add((tp, fetch_offset))
-
- for partition in self._subscriptions.fetchable_partitions():
+ for partition in self._fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
position = self._subscriptions.assignment[partition].position
- # fetch if there is a leader, no in-flight requests, and no _records
+ # fetch if there is a leader and no in-flight requests
if node_id is None or node_id == -1:
log.debug("No leader found for partition %s."
" Requesting metadata update", partition)
self._client.cluster.request_update()
- elif ((partition, position) not in pending and
- self._client.in_flight_request_count(node_id) == 0):
-
+ elif self._client.in_flight_request_count(node_id) == 0:
partition_info = (
partition.partition,
position,
@@ -704,7 +636,8 @@ class Fetcher(six.Iterator):
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
position)
- self._records.append((fetch_offset, tp, messages))
+ unpacked = list(self._unpack_message_set(tp, messages))
+ self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
last_offset, _, _ = messages[-1]
self._sensors.records_fetch_lag.record(highwater - last_offset)
num_bytes = sum(msg[1] for msg in messages)
@@ -744,6 +677,29 @@ class Fetcher(six.Iterator):
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
+ class PartitionRecords(six.Iterator):
+ def __init__(self, fetch_offset, tp, messages):
+ self.fetch_offset = fetch_offset
+ self.topic_partition = tp
+ self.messages = messages
+ self.message_idx = 0
+
+ def discard(self):
+ self.messages = None
+
+ def take(self, n):
+ if not self.has_more():
+ return []
+ next_idx = self.message_idx + n
+ res = self.messages[self.message_idx:next_idx]
+ self.message_idx = next_idx
+ if self.has_more():
+ self.fetch_offset = self.messages[self.message_idx].offset
+ return res
+
+ def has_more(self):
+ return self.message_idx < len(self.messages)
+
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):