summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py102
1 files changed, 56 insertions, 46 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 78686a4..7c345e7 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import copy
import logging
@@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator):
distribute partition ownership amongst consumer instances when
group management is used.
Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
+ max_poll_records (int): The maximum number of records returned in a
+ single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
+ max_poll_interval_ms (int): The maximum delay between invocations of
+ :meth:`~kafka.KafkaConsumer.poll` when using consumer group
+ management. This places an upper bound on the amount of time that
+ the consumer can be idle before fetching more records. If
+ :meth:`~kafka.KafkaConsumer.poll` is not called before expiration
+ of this timeout, then the consumer is considered failed and the
+ group will rebalance in order to reassign the partitions to another
member. Default: 300000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group management facilities. The consumer sends
+ periodic heartbeats to indicate its liveness to the broker. If
+ no heartbeats are received by the broker before the expiration of
+ this session timeout, then the broker will remove this consumer
+ from the group and initiate a rebalance. Note that the value must
+ be in the allowable range as configured in the broker configuration
+ by group.min.session.timeout.ms and group.max.session.timeout.ms.
+ Default: 10000
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
- Kafka's group management feature. Heartbeats are used to ensure
+ Kafka's group management facilities. Heartbeats are used to ensure
that the consumer's session stays active and to facilitate
rebalancing when new consumers join or leave the group. The
value must be set lower than session_timeout_ms, but typically
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
- session_timeout_ms (int): The timeout used to detect failures when
- using Kafka's group management facilities. Default: 30000
- max_poll_records (int): The maximum number of records returned in a
- single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
@@ -236,7 +251,7 @@ class KafkaConsumer(six.Iterator):
'fetch_min_bytes': 1,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1 * 1024 * 1024,
- 'request_timeout_ms': 40 * 1000,
+ 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
@@ -248,9 +263,10 @@ class KafkaConsumer(six.Iterator):
'check_crcs': True,
'metadata_max_age_ms': 5 * 60 * 1000,
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
- 'heartbeat_interval_ms': 3000,
- 'session_timeout_ms': 30000,
'max_poll_records': 500,
+ 'max_poll_interval_ms': 300000,
+ 'session_timeout_ms': 10000,
+ 'heartbeat_interval_ms': 3000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
@@ -278,15 +294,16 @@ class KafkaConsumer(six.Iterator):
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka'
}
+ DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
def __init__(self, *topics, **configs):
- self.config = copy.copy(self.DEFAULT_CONFIG)
- for key in self.config:
- if key in configs:
- self.config[key] = configs.pop(key)
-
# Only check for extra config keys in top-level class
- assert not configs, 'Unrecognized configs: %s' % configs
+ extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
+ if extra_configs:
+ raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ self.config.update(configs)
deprecated = {'smallest': 'earliest', 'largest': 'latest'}
if self.config['auto_offset_reset'] in deprecated:
@@ -296,12 +313,7 @@ class KafkaConsumer(six.Iterator):
self.config['auto_offset_reset'] = new_config
request_timeout_ms = self.config['request_timeout_ms']
- session_timeout_ms = self.config['session_timeout_ms']
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
- if request_timeout_ms <= session_timeout_ms:
- raise KafkaConfigurationError(
- "Request timeout (%s) must be larger than session timeout (%s)" %
- (request_timeout_ms, session_timeout_ms))
if request_timeout_ms <= fetch_max_wait_ms:
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
(request_timeout_ms, fetch_max_wait_ms))
@@ -330,6 +342,25 @@ class KafkaConsumer(six.Iterator):
if self.config['api_version'] is None:
self.config['api_version'] = self._client.config['api_version']
+ # Coordinator configurations are different for older brokers
+    # max_poll_interval_ms is not supported directly -- it must be
+    # the same as session_timeout_ms. If the user provides one of them,
+    # use it for both. Otherwise use the old default of 30 seconds
+ if self.config['api_version'] < (0, 10, 1):
+ if 'session_timeout_ms' not in configs:
+ if 'max_poll_interval_ms' in configs:
+ self.config['session_timeout_ms'] = configs['max_poll_interval_ms']
+ else:
+ self.config['session_timeout_ms'] = self.DEFAULT_SESSION_TIMEOUT_MS_0_9
+ if 'max_poll_interval_ms' not in configs:
+ self.config['max_poll_interval_ms'] = self.config['session_timeout_ms']
+
+ if self.config['group_id'] is not None:
+ if self.config['request_timeout_ms'] <= self.config['session_timeout_ms']:
+ raise KafkaConfigurationError(
+ "Request timeout (%s) must be larger than session timeout (%s)" %
+ (self.config['request_timeout_ms'], self.config['session_timeout_ms']))
+
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, self._metrics, **self.config)
@@ -587,12 +618,7 @@ class KafkaConsumer(six.Iterator):
Returns:
dict: Map of topic to list of records (may be empty).
"""
- if self._use_consumer_group():
- self._coordinator.ensure_active_group()
-
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
- elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_ready()
+ self._coordinator.poll()
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -614,6 +640,7 @@ class KafkaConsumer(six.Iterator):
# Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
+ timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
self._client.poll(timeout_ms=timeout_ms)
records, _ = self._fetcher.fetched_records(max_records)
return records
@@ -1014,13 +1041,7 @@ class KafkaConsumer(six.Iterator):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
- if self._use_consumer_group():
- self._coordinator.ensure_coordinator_ready()
- self._coordinator.ensure_active_group()
-
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
- elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_ready()
+ self._coordinator.poll()
# Fetch offsets for any subscribed partitions that we aren't tracking yet
if not self._subscription.has_all_fetch_positions():
@@ -1068,19 +1089,8 @@ class KafkaConsumer(six.Iterator):
def _next_timeout(self):
timeout = min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
-
- # Although the delayed_tasks timeout above should cover processing
- # HeartbeatRequests, it is still possible that HeartbeatResponses
- # are left unprocessed during a long _fetcher iteration without
- # an intermediate poll(). And because tasks are responsible for
- # rescheduling themselves, an unprocessed response will prevent
- # the next heartbeat from being sent. This check should help
- # avoid that.
- if self._use_consumer_group():
- heartbeat = time.time() + self._coordinator.heartbeat.ttl()
- timeout = min(timeout, heartbeat)
+ self._client.cluster.ttl() / 1000.0 + time.time(),
+ self._coordinator.time_to_next_poll() + time.time())
return timeout
def __iter__(self): # pylint: disable=non-iterator-returned