diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 16 | ||||
-rw-r--r-- | kafka/consumer/group.py | 6 |
2 files changed, 11 insertions, 11 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 00d26c6..73daa36 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -44,7 +44,7 @@ class Fetcher(six.Iterator): 'max_poll_records': sys.maxsize, 'check_crcs': True, 'skip_double_compressed_messages': False, - 'iterator_refetch_records': 1, # undocumented -- interface may change + 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), } @@ -91,10 +91,10 @@ class Fetcher(six.Iterator): self._client = client self._subscriptions = subscriptions - self._records = collections.deque() # (offset, topic_partition, messages) + self._records = collections.deque() # (offset, topic_partition, messages) self._unauthorized_topics = set() - self._offset_out_of_range_partitions = dict() # {topic_partition: offset} - self._record_too_large_partitions = dict() # {topic_partition: offset} + self._offset_out_of_range_partitions = dict() # {topic_partition: offset} + self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) @@ -217,7 +217,7 @@ class Fetcher(six.Iterator): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() @@ -494,10 +494,10 @@ class Fetcher(six.Iterator): # of a compressed message depends on the # typestamp type of the wrapper message: - if msg.timestamp_type == 0: # CREATE_TIME (0) + if msg.timestamp_type == 0: # CREATE_TIME (0) inner_timestamp = inner_msg.timestamp - elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) + elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1) inner_timestamp = msg.timestamp else: @@ -673,7 +673,7 @@ class Fetcher(six.Iterator): requests = {} for node_id, partition_data in six.iteritems(fetchable): requests[node_id] = FetchRequest[version]( - -1, # replica_id + -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], partition_data.items()) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 10d293c..47c721f 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -239,7 +239,7 @@ class KafkaConsumer(six.Iterator): 'ssl_password': None, 'api_version': None, 'api_version_auto_timeout_ms': 2000, - 'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet + 'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, @@ -831,8 +831,8 @@ class KafkaConsumer(six.Iterator): NoOffsetForPartitionError: If no offset is stored for a given partition and no offset reset policy is defined. """ - if (self.config['api_version'] >= (0, 8, 1) - and self.config['group_id'] is not None): + if (self.config['api_version'] >= (0, 8, 1) and + self.config['group_id'] is not None): # Refresh commits for all assigned partitions self._coordinator.refresh_committed_offsets_if_needed() |