diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 102 |
1 files changed, 56 insertions, 46 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 78686a4..7c345e7 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import copy import logging @@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator): distribute partition ownership amongst consumer instances when group management is used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] + max_poll_records (int): The maximum number of records returned in a + single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500 + max_poll_interval_ms (int): The maximum delay between invocations of + :meth:`~kafka.KafkaConsumer.poll` when using consumer group + management. This places an upper bound on the amount of time that + the consumer can be idle before fetching more records. If + :meth:`~kafka.KafkaConsumer.poll` is not called before expiration + of this timeout, then the consumer is considered failed and the + group will rebalance in order to reassign the partitions to another + member. Default 300000 + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group management facilities. The consumer sends + periodic heartbeats to indicate its liveness to the broker. If + no heartbeats are received by the broker before the expiration of + this session timeout, then the broker will remove this consumer + from the group and initiate a rebalance. Note that the value must + be in the allowable range as configured in the broker configuration + by group.min.session.timeout.ms and group.max.session.timeout.ms. + Default: 10000 heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure + Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session_timeout_ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000 - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group management facilities. Default: 30000 - max_poll_records (int): The maximum number of records returned in a - single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500 receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. @@ -236,7 +251,7 @@ class KafkaConsumer(six.Iterator): 'fetch_min_bytes': 1, 'fetch_max_bytes': 52428800, 'max_partition_fetch_bytes': 1 * 1024 * 1024, - 'request_timeout_ms': 40 * 1000, + 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms 'retry_backoff_ms': 100, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -248,9 +263,10 @@ class KafkaConsumer(six.Iterator): 'check_crcs': True, 'metadata_max_age_ms': 5 * 60 * 1000, 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), - 'heartbeat_interval_ms': 3000, - 'session_timeout_ms': 30000, 'max_poll_records': 500, + 'max_poll_interval_ms': 300000, + 'session_timeout_ms': 10000, + 'heartbeat_interval_ms': 3000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], @@ -278,15 +294,16 @@ class KafkaConsumer(six.Iterator): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka' } + DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 def __init__(self, *topics, **configs): - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs.pop(key) - # Only check for extra config keys in top-level class - assert not configs, 'Unrecognized configs: %s' % configs + extra_configs = set(configs).difference(self.DEFAULT_CONFIG) + if extra_configs: + raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs) + + self.config = copy.copy(self.DEFAULT_CONFIG) + self.config.update(configs) deprecated = {'smallest': 'earliest', 'largest': 'latest'} if self.config['auto_offset_reset'] in deprecated: @@ -296,12 +313,7 @@ class KafkaConsumer(six.Iterator): self.config['auto_offset_reset'] = new_config request_timeout_ms = self.config['request_timeout_ms'] - session_timeout_ms = self.config['session_timeout_ms'] fetch_max_wait_ms = self.config['fetch_max_wait_ms'] - if request_timeout_ms <= session_timeout_ms: - raise KafkaConfigurationError( - "Request timeout (%s) must be larger than session timeout (%s)" % - (request_timeout_ms, session_timeout_ms)) if request_timeout_ms <= fetch_max_wait_ms: raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" % (request_timeout_ms, fetch_max_wait_ms)) @@ -330,6 +342,25 @@ class KafkaConsumer(six.Iterator): if self.config['api_version'] is None: self.config['api_version'] = self._client.config['api_version'] + # Coordinator configurations are different for older brokers + # max_poll_interval_ms is not supported directly -- it must the be + # the same as session_timeout_ms. If the user provides one of them, + # use it for both. Otherwise use the old default of 30secs + if self.config['api_version'] < (0, 10, 1): + if 'session_timeout_ms' not in configs: + if 'max_poll_interval_ms' in configs: + self.config['session_timeout_ms'] = configs['max_poll_interval_ms'] + else: + self.config['session_timeout_ms'] = self.DEFAULT_SESSION_TIMEOUT_MS_0_9 + if 'max_poll_interval_ms' not in configs: + self.config['max_poll_interval_ms'] = self.config['session_timeout_ms'] + + if self.config['group_id'] is not None: + if self.config['request_timeout_ms'] <= self.config['session_timeout_ms']: + raise KafkaConfigurationError( + "Request timeout (%s) must be larger than session timeout (%s)" % + (self.config['request_timeout_ms'], self.config['session_timeout_ms'])) + self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( self._client, self._subscription, self._metrics, **self.config) @@ -587,12 +618,7 @@ class KafkaConsumer(six.Iterator): Returns: dict: Map of topic to list of records (may be empty). """ - if self._use_consumer_group(): - self._coordinator.ensure_active_group() - - # 0.8.2 brokers support kafka-backed offset storage via group coordinator - elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_ready() + self._coordinator.poll() # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -614,6 +640,7 @@ class KafkaConsumer(six.Iterator): # Send any new fetches (won't resend pending fetches) self._fetcher.send_fetches() + timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll()) self._client.poll(timeout_ms=timeout_ms) records, _ = self._fetcher.fetched_records(max_records) return records @@ -1014,13 +1041,7 @@ class KafkaConsumer(six.Iterator): assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment' while time.time() < self._consumer_timeout: - if self._use_consumer_group(): - self._coordinator.ensure_coordinator_ready() - self._coordinator.ensure_active_group() - - # 0.8.2 brokers support kafka-backed offset storage via group coordinator - elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): - self._coordinator.ensure_coordinator_ready() + self._coordinator.poll() # Fetch offsets for any subscribed partitions that we arent tracking yet if not self._subscription.has_all_fetch_positions(): @@ -1068,19 +1089,8 @@ class KafkaConsumer(six.Iterator): def _next_timeout(self): timeout = min(self._consumer_timeout, - self._client._delayed_tasks.next_at() + time.time(), - self._client.cluster.ttl() / 1000.0 + time.time()) - - # Although the delayed_tasks timeout above should cover processing - # HeartbeatRequests, it is still possible that HeartbeatResponses - # are left unprocessed during a long _fetcher iteration without - # an intermediate poll(). And because tasks are responsible for - # rescheduling themselves, an unprocessed response will prevent - # the next heartbeat from being sent. This check should help - # avoid that. - if self._use_consumer_group(): - heartbeat = time.time() + self._coordinator.heartbeat.ttl() - timeout = min(timeout, heartbeat) + self._client.cluster.ttl() / 1000.0 + time.time(), + self._coordinator.time_to_next_poll() + time.time()) return timeout def __iter__(self): # pylint: disable=non-iterator-returned |