diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2017-02-09 12:27:16 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-02-09 12:27:16 -0800 |
commit | 8fde79dbb5a3793b1a9ebd10e032d5f3dd535645 (patch) | |
tree | a991daae07aa142d936b37a2af7f55030355357b /kafka/consumer/fetcher.py | |
parent | e825483d49bda41f13420311cbc9ffd59f7cee3d (diff) | |
download | kafka-python-8fde79dbb5a3793b1a9ebd10e032d5f3dd535645.tar.gz |
PEP-8: Spacing & removed unused imports (#899)
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 00d26c6..73daa36 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -44,7 +44,7 @@ class Fetcher(six.Iterator): 'max_poll_records': sys.maxsize, 'check_crcs': True, 'skip_double_compressed_messages': False, - 'iterator_refetch_records': 1, # undocumented -- interface may change + 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), } @@ -91,10 +91,10 @@ class Fetcher(six.Iterator): self._client = client self._subscriptions = subscriptions - self._records = collections.deque() # (offset, topic_partition, messages) + self._records = collections.deque() # (offset, topic_partition, messages) self._unauthorized_topics = set() - self._offset_out_of_range_partitions = dict() # {topic_partition: offset} - self._record_too_large_partitions = dict() # {topic_partition: offset} + self._offset_out_of_range_partitions = dict() # {topic_partition: offset} + self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) @@ -217,7 +217,7 @@ class Fetcher(six.Iterator): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() @@ -494,10 +494,10 @@ class Fetcher(six.Iterator): # of a compressed message depends on the # typestamp type of the wrapper message: - if msg.timestamp_type == 0: # CREATE_TIME (0) + if msg.timestamp_type == 0: # CREATE_TIME (0) inner_timestamp = inner_msg.timestamp - elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) + elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) inner_timestamp = msg.timestamp else: @@ -673,7 +673,7 @@ class Fetcher(six.Iterator): requests = {} for node_id, partition_data in six.iteritems(fetchable): requests[node_id] = FetchRequest[version]( - -1, # replica_id + -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], partition_data.items()) |