diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 00d26c6..73daa36 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -44,7 +44,7 @@ class Fetcher(six.Iterator): 'max_poll_records': sys.maxsize, 'check_crcs': True, 'skip_double_compressed_messages': False, - 'iterator_refetch_records': 1, # undocumented -- interface may change + 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), } @@ -91,10 +91,10 @@ class Fetcher(six.Iterator): self._client = client self._subscriptions = subscriptions - self._records = collections.deque() # (offset, topic_partition, messages) + self._records = collections.deque() # (offset, topic_partition, messages) self._unauthorized_topics = set() - self._offset_out_of_range_partitions = dict() # {topic_partition: offset} - self._record_too_large_partitions = dict() # {topic_partition: offset} + self._offset_out_of_range_partitions = dict() # {topic_partition: offset} + self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) @@ -217,7 +217,7 @@ class Fetcher(six.Iterator): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() @@ -494,10 +494,10 @@ class Fetcher(six.Iterator): # of a compressed message depends on the # typestamp type of the wrapper message: - if msg.timestamp_type == 0: # CREATE_TIME (0) + if msg.timestamp_type == 0: # CREATE_TIME (0) inner_timestamp = inner_msg.timestamp - elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) + elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) inner_timestamp = msg.timestamp else: @@ -673,7 +673,7 @@ class Fetcher(six.Iterator): requests = {} for node_id, partition_data in six.iteritems(fetchable): requests[node_id] = FetchRequest[version]( - -1, # replica_id + -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], partition_data.items()) |