summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 00d26c6..73daa36 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -44,7 +44,7 @@ class Fetcher(six.Iterator):
'max_poll_records': sys.maxsize,
'check_crcs': True,
'skip_double_compressed_messages': False,
- 'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
}
@@ -91,10 +91,10 @@ class Fetcher(six.Iterator):
self._client = client
self._subscriptions = subscriptions
- self._records = collections.deque() # (offset, topic_partition, messages)
+ self._records = collections.deque() # (offset, topic_partition, messages)
self._unauthorized_topics = set()
- self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
- self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
+ self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -217,7 +217,7 @@ class Fetcher(six.Iterator):
return future.value
if not future.retriable():
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
@@ -494,10 +494,10 @@ class Fetcher(six.Iterator):
# of a compressed message depends on the
# typestamp type of the wrapper message:
- if msg.timestamp_type == 0: # CREATE_TIME (0)
+ if msg.timestamp_type == 0: # CREATE_TIME (0)
inner_timestamp = inner_msg.timestamp
- elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
+ elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
inner_timestamp = msg.timestamp
else:
@@ -673,7 +673,7 @@ class Fetcher(six.Iterator):
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest[version](
- -1, # replica_id
+ -1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],
partition_data.items())