Diffstat (limited to 'kafka/consumer')
-rw-r--r--   kafka/consumer/__init__.py              2
-rw-r--r--   kafka/consumer/base.py                 19
-rw-r--r--   kafka/consumer/fetcher.py             645
-rw-r--r--   kafka/consumer/group.py               682
-rw-r--r--   kafka/consumer/kafka.py                37
-rw-r--r--   kafka/consumer/multiprocess.py          8
-rw-r--r--   kafka/consumer/simple.py               84
-rw-r--r--   kafka/consumer/subscription_state.py  462
8 files changed, 1860 insertions, 79 deletions
diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py
index 935f56e..8041537 100644
--- a/kafka/consumer/__init__.py
+++ b/kafka/consumer/__init__.py
@@ -1,6 +1,6 @@
from .simple import SimpleConsumer
from .multiprocess import MultiProcessConsumer
-from .kafka import KafkaConsumer
+from .group import KafkaConsumer
__all__ = [
'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer'
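
For reference, the public import path is untouched by the swap above; only the module backing KafkaConsumer changes from kafka.consumer.kafka to kafka.consumer.group. A minimal sanity check (a sketch, assuming the package is importable):

    from kafka.consumer import KafkaConsumer, SimpleConsumer, MultiProcessConsumer

    # KafkaConsumer should now resolve to the group-coordinated implementation
    print(KafkaConsumer.__module__)   # expected: 'kafka.consumer.group'
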
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index c9f6e48..2059d92 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -7,11 +7,11 @@ from threading import Lock
import kafka.common
from kafka.common import (
- OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
UnknownTopicOrPartitionError, check_error, KafkaError
)
-from kafka.util import kafka_bytestring, ReentrantTimer
+from kafka.util import ReentrantTimer
log = logging.getLogger('kafka.consumer')
@@ -47,8 +47,8 @@ class Consumer(object):
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
- self.topic = kafka_bytestring(topic)
- self.group = None if group is None else kafka_bytestring(group)
+ self.topic = topic
+ self.group = group
self.client.load_metadata_for_topics(topic)
self.offsets = {}
@@ -94,14 +94,14 @@ class Consumer(object):
def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
- raise ValueError('KafkaClient.group must not be None')
+ raise ValueError('SimpleClient.group must not be None')
if partitions is None:
partitions = self.client.get_partition_ids_for_topic(self.topic)
responses = self.client.send_offset_fetch_request(
self.group,
- [OffsetFetchRequest(self.topic, p) for p in partitions],
+ [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
fail_on_error=False
)
@@ -155,7 +155,7 @@ class Consumer(object):
'group=%s, topic=%s, partition=%s',
offset, self.group, self.topic, partition)
- reqs.append(OffsetCommitRequest(self.topic, partition,
+ reqs.append(OffsetCommitRequestPayload(self.topic, partition,
offset, None))
try:
@@ -197,7 +197,8 @@ class Consumer(object):
# ValueError on list.remove() if the exithandler no longer
# exists is fine here
try:
- atexit._exithandlers.remove((self._cleanup_func, (self,), {}))
+ atexit._exithandlers.remove( # pylint: disable=no-member
+ (self._cleanup_func, (self,), {}))
except ValueError:
pass
@@ -217,7 +218,7 @@ class Consumer(object):
reqs = []
for partition in partitions:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
resps = self.client.send_offset_request(reqs)
for resp in resps:
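
The base consumer changes above boil down to the protocol payload rename (OffsetRequest -> OffsetRequestPayload and friends) and dropping kafka_bytestring(), so topic and group names stay plain strings. A rough sketch of the same latest-offset lookup the base Consumer performs, assuming a reachable broker at a placeholder address and the OffsetResponsePayload field names from kafka.common:

    from kafka import SimpleClient
    from kafka.common import OffsetRequestPayload

    client = SimpleClient('localhost:9092')   # placeholder broker address
    topic = 'my-topic'                        # placeholder topic name

    # -1 asks each partition leader for its latest offset, one offset per partition
    reqs = [OffsetRequestPayload(topic, p, -1, 1)
            for p in client.get_partition_ids_for_topic(topic)]
    for resp in client.send_offset_request(reqs):
        print(resp.topic, resp.partition, resp.offsets[0])
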
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
new file mode 100644
index 0000000..1593018
--- /dev/null
+++ b/kafka/consumer/fetcher.py
@@ -0,0 +1,645 @@
+from __future__ import absolute_import
+
+import collections
+import copy
+import logging
+
+import six
+
+import kafka.common as Errors
+from kafka.common import TopicPartition
+from kafka.future import Future
+from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.message import PartialMessage
+from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+
+log = logging.getLogger(__name__)
+
+
+ConsumerRecord = collections.namedtuple("ConsumerRecord",
+ ["topic", "partition", "offset", "key", "value"])
+
+
+class NoOffsetForPartitionError(Errors.KafkaError):
+ pass
+
+
+class RecordTooLargeError(Errors.KafkaError):
+ pass
+
+
+class Fetcher(six.Iterator):
+ DEFAULT_CONFIG = {
+ 'key_deserializer': None,
+ 'value_deserializer': None,
+ 'fetch_min_bytes': 1024,
+ 'fetch_max_wait_ms': 500,
+ 'max_partition_fetch_bytes': 1048576,
+ 'check_crcs': True,
+ }
+
+ def __init__(self, client, subscriptions, **configs):
+ """Initialize a Kafka Message Fetcher.
+
+ Keyword Arguments:
+ key_deserializer (callable): Any callable that takes a
+ raw message key and returns a deserialized key.
+ value_deserializer (callable, optional): Any callable that takes a
+ raw message value and returns a deserialized value.
+ fetch_min_bytes (int): Minimum amount of data the server should
+ return for a fetch request, otherwise wait up to
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+ the server will block before answering the fetch request if
+ there isn't sufficient data to immediately satisfy the
+ requirement given by fetch_min_bytes. Default: 500.
+ max_partition_fetch_bytes (int): The maximum amount of data
+ per-partition the server will return. The maximum total memory
+ used for a request = #partitions * max_partition_fetch_bytes.
+ This size must be at least as large as the maximum message size
+ the server allows or else it is possible for the producer to
+ send messages larger than the consumer can fetch. If that
+ happens, the consumer can get stuck trying to fetch a large
+ message on a certain partition. Default: 1048576.
+ check_crcs (bool): Automatically check the CRC32 of the records
+ consumed. This ensures no on-the-wire or on-disk corruption to
+ the messages occurred. This check adds some overhead, so it may
+ be disabled in cases seeking extreme performance. Default: True
+ """
+ #metrics=None,
+ #metric_group_prefix='consumer',
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
+
+ self._client = client
+ self._subscriptions = subscriptions
+ self._records = collections.deque() # (offset, topic_partition, messages)
+ self._unauthorized_topics = set()
+ self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
+ self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._iterator = None
+
+ #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+
+ def init_fetches(self):
+ """Send FetchRequests asynchronously for all assigned partitions.
+
+ Returns:
+ List of Futures: each future resolves to a FetchResponse
+ """
+ futures = []
+ for node_id, request in six.iteritems(self._create_fetch_requests()):
+ if self._client.ready(node_id):
+ log.debug("Sending FetchRequest to node %s", node_id)
+ future = self._client.send(node_id, request)
+ future.add_callback(self._handle_fetch_response, request)
+ future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
+ futures.append(future)
+ return futures
+
+ def update_fetch_positions(self, partitions):
+ """Update the fetch positions for the provided partitions.
+
+ Arguments:
+ partitions (list of TopicPartitions): partitions to update
+
+ Raises:
+ NoOffsetForPartitionError: if no offset is stored for a given
+ partition and no reset policy is available
+ """
+ # reset the fetch position to the committed position
+ for tp in partitions:
+ if not self._subscriptions.is_assigned(tp):
+ log.warning("partition %s is not assigned - skipping offset"
+ " update", tp)
+ continue
+ elif self._subscriptions.is_fetchable(tp):
+ log.warning("partition %s is still fetchable -- skipping offset"
+ " update", tp)
+ continue
+
+ # TODO: If there are several offsets to reset,
+ # we could submit offset requests in parallel
+ # for now, each call to _reset_offset will block
+ if self._subscriptions.is_offset_reset_needed(tp):
+ self._reset_offset(tp)
+ elif self._subscriptions.assignment[tp].committed is None:
+ # there's no committed position, so we need to reset with the
+ # default strategy
+ self._subscriptions.need_offset_reset(tp)
+ self._reset_offset(tp)
+ else:
+ committed = self._subscriptions.assignment[tp].committed
+ log.debug("Resetting offset for partition %s to the committed"
+ " offset %s", tp, committed)
+ self._subscriptions.seek(tp, committed)
+
+ def _reset_offset(self, partition):
+ """Reset offsets for the given partition using the offset reset strategy.
+
+ Arguments:
+ partition (TopicPartition): the partition that needs reset offset
+
+ Raises:
+ NoOffsetForPartitionError: if no offset reset strategy is defined
+ """
+ timestamp = self._subscriptions.assignment[partition].reset_strategy
+ if timestamp is OffsetResetStrategy.EARLIEST:
+ strategy = 'earliest'
+ elif timestamp is OffsetResetStrategy.LATEST:
+ strategy = 'latest'
+ else:
+ raise NoOffsetForPartitionError(partition)
+
+ log.debug("Resetting offset for partition %s to %s offset.",
+ partition, strategy)
+ offset = self._offset(partition, timestamp)
+
+ # we might lose the assignment while fetching the offset,
+ # so check it is still active
+ if self._subscriptions.is_assigned(partition):
+ self._subscriptions.seek(partition, offset)
+
+ def _offset(self, partition, timestamp):
+ """Fetch a single offset before the given timestamp for the partition.
+
+ Blocks until offset is obtained, or a non-retriable exception is raised
+
+ Arguments:
+ partition (TopicPartition): the partition to fetch an offset for.
+ timestamp (int): timestamp for fetching offset. -1 for the latest
+ available, -2 for the earliest available. Otherwise timestamp
+ is treated as epoch seconds.
+
+ Returns:
+ int: message offset
+ """
+ while True:
+ future = self._send_offset_request(partition, timestamp)
+ self._client.poll(future=future)
+
+ if future.succeeded():
+ return future.value
+
+ if not future.retriable():
+ raise future.exception # pylint: disable-msg=raising-bad-type
+
+ if future.exception.invalid_metadata:
+ refresh_future = self._client.cluster.request_update()
+ self._client.poll(future=refresh_future)
+
+ def _raise_if_offset_out_of_range(self):
+ """Check FetchResponses for offset out of range.
+
+ Raises:
+ OffsetOutOfRangeError: if any partition from previous FetchResponse
+ contains OffsetOutOfRangeError and the default_reset_policy is
+ None
+ """
+ if not self._offset_out_of_range_partitions:
+ return
+
+ current_out_of_range_partitions = {}
+
+ # filter only the fetchable partitions
+ for partition, offset in six.iteritems(self._offset_out_of_range_partitions):
+ if not self._subscriptions.is_fetchable(partition):
+ log.debug("Ignoring fetched records for %s since it is no"
+ " longer fetchable", partition)
+ continue
+ consumed = self._subscriptions.assignment[partition].consumed
+ # ignore partition if its consumed offset != offset in FetchResponse
+ # e.g. after seek()
+ if consumed is not None and offset == consumed:
+ current_out_of_range_partitions[partition] = offset
+
+ self._offset_out_of_range_partitions.clear()
+ if current_out_of_range_partitions:
+ raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)
+
+ def _raise_if_unauthorized_topics(self):
+ """Check FetchResponses for topic authorization failures.
+
+ Raises:
+ TopicAuthorizationFailedError
+ """
+ if self._unauthorized_topics:
+ topics = set(self._unauthorized_topics)
+ self._unauthorized_topics.clear()
+ raise Errors.TopicAuthorizationFailedError(topics)
+
+ def _raise_if_record_too_large(self):
+ """Check FetchResponses for messages larger than the max per partition.
+
+ Raises:
+ RecordTooLargeError: if there is a message larger than fetch size
+ """
+ if not self._record_too_large_partitions:
+ return
+
+ copied_record_too_large_partitions = dict(self._record_too_large_partitions)
+ self._record_too_large_partitions.clear()
+
+ raise RecordTooLargeError(
+ "There are some messages at [Partition=Offset]: %s "
+ " whose size is larger than the fetch size %s"
+ " and hence cannot be ever returned."
+ " Increase the fetch size, or decrease the maximum message"
+ " size the broker will allow.",
+ copied_record_too_large_partitions,
+ self.config['max_partition_fetch_bytes'])
+
+ def fetched_records(self):
+ """Returns previously fetched records and updates consumed offsets.
+
+ Incompatible with iterator interface - use one or the other, not both.
+
+ Raises:
+ OffsetOutOfRangeError: if no subscription offset_reset_strategy
+ InvalidMessageError: if message crc validation fails (check_crcs
+ must be set to True)
+ RecordTooLargeError: if a message is larger than the currently
+ configured max_partition_fetch_bytes
+ TopicAuthorizationError: if consumer is not authorized to fetch
+ messages from the topic
+ AssertionError: if used with iterator (incompatible)
+
+ Returns:
+ dict: {TopicPartition: [messages]}
+ """
+ assert self._iterator is None, (
+ 'fetched_records is incompatible with message iterator')
+ if self._subscriptions.needs_partition_assignment:
+ return {}
+
+ drained = collections.defaultdict(list)
+ self._raise_if_offset_out_of_range()
+ self._raise_if_unauthorized_topics()
+ self._raise_if_record_too_large()
+
+ # Loop over the records deque
+ while self._records:
+ (fetch_offset, tp, messages) = self._records.popleft()
+
+ if not self._subscriptions.is_assigned(tp):
+ # this can happen when a rebalance happened before
+ # fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ continue
+
+ # note that the consumed position should always be available
+ # as long as the partition is still assigned
+ consumed = self._subscriptions.assignment[tp].consumed
+ if not self._subscriptions.is_fetchable(tp):
+ # this can happen when a partition consumption paused before
+ # fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
+
+ # we also need to reset the fetch positions to pretend we did
+ # not fetch this partition in the previous request at all
+ self._subscriptions.assignment[tp].fetched = consumed
+ elif fetch_offset == consumed:
+ next_offset = messages[-1][0] + 1
+ log.debug("Returning fetched records for assigned partition %s"
+ " and update consumed position to %s", tp, next_offset)
+ self._subscriptions.assignment[tp].consumed = next_offset
+
+ for record in self._unpack_message_set(tp, messages):
+ drained[tp].append(record)
+ else:
+ # these records aren't next in line based on the last consumed
+ # position; ignore them, they must be from an obsolete request
+ log.debug("Ignoring fetched records for %s at offset %s",
+ tp, fetch_offset)
+ return dict(drained)
+
+ def _unpack_message_set(self, tp, messages):
+ for offset, size, msg in messages:
+ if self.config['check_crcs'] and not msg.validate_crc():
+ raise Errors.InvalidMessageError(msg)
+ elif msg.is_compressed():
+ for record in self._unpack_message_set(tp, msg.decompress()):
+ yield record
+ else:
+ key, value = self._deserialize(msg)
+ yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+
+ def _message_generator(self):
+ """Iterate over fetched_records"""
+ if self._subscriptions.needs_partition_assignment:
+ raise StopIteration('Subscription needs partition assignment')
+
+ while self._records:
+
+ # Check on each iteration since this is a generator
+ self._raise_if_offset_out_of_range()
+ self._raise_if_unauthorized_topics()
+ self._raise_if_record_too_large()
+
+ (fetch_offset, tp, messages) = self._records.popleft()
+
+ if not self._subscriptions.is_assigned(tp):
+ # this can happen when a rebalance happened before
+ # fetched records are returned
+ log.warning("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ continue
+
+ # note that the consumed position should always be available
+ # as long as the partition is still assigned
+ consumed = self._subscriptions.assignment[tp].consumed
+ if not self._subscriptions.is_fetchable(tp):
+ # this can happen when a partition consumption paused before
+ # fetched records are returned
+ log.warning("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
+
+ # we also need to reset the fetch positions to pretend we did
+ # not fetch this partition in the previous request at all
+ self._subscriptions.assignment[tp].fetched = consumed
+
+ elif fetch_offset == consumed:
+ for msg in self._unpack_message_set(tp, messages):
+ self._subscriptions.assignment[tp].consumed = msg.offset + 1
+ yield msg
+ else:
+ # these records aren't next in line based on the last consumed
+ # position; ignore them, they must be from an obsolete request
+ log.warning("Ignoring fetched records for %s at offset %s",
+ tp, fetch_offset)
+
+ # Send any additional FetchRequests that we can now
+ # this will likely fetch each partition individually, rather than
+ # fetch multiple partitions in bulk when they are on the same broker
+ self.init_fetches()
+
+ def __iter__(self): # pylint: disable=non-iterator-returned
+ return self
+
+ def __next__(self):
+ if not self._iterator:
+ self._iterator = self._message_generator()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
+
+ def _deserialize(self, msg):
+ if self.config['key_deserializer']:
+ key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
+ else:
+ key = msg.key
+ if self.config['value_deserializer']:
+ value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
+ else:
+ value = msg.value
+ return key, value
+
+ def _send_offset_request(self, partition, timestamp):
+ """Fetch a single offset before the given timestamp for the partition.
+
+ Arguments:
+ partition (TopicPartition): partition that needs fetching offset
+ timestamp (int): timestamp for fetching offset
+
+ Returns:
+ Future: resolves to the corresponding offset
+ """
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None:
+ log.debug("Partition %s is unknown for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.StaleMetadata(partition))
+ elif node_id == -1:
+ log.debug("Leader for partition %s unavailable for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.LeaderNotAvailableError(partition))
+
+ request = OffsetRequest(
+ -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+ )
+ # Client returns a future that only fails on network issues
+ # so create a separate future and attach a callback to update it
+ # based on response error codes
+ future = Future()
+ if not self._client.ready(node_id):
+ return future.failure(Errors.NodeNotReadyError(node_id))
+
+ _f = self._client.send(node_id, request)
+ _f.add_callback(self._handle_offset_response, partition, future)
+ _f.add_errback(lambda e: future.failure(e))
+ return future
+
+ def _handle_offset_response(self, partition, future, response):
+ """Callback for the response of the list offset call above.
+
+ Arguments:
+ partition (TopicPartition): The partition that was fetched
+ future (Future): the future to update based on response
+ response (OffsetResponse): response from the server
+
+ Raises:
+ AssertionError: if response does not match partition
+ """
+ topic, partition_info = response.topics[0]
+ assert len(response.topics) == 1 and len(partition_info) == 1, (
+ 'OffsetResponse should only be for a single topic-partition')
+
+ part, error_code, offsets = partition_info[0]
+ assert topic == partition.topic and part == partition.partition, (
+ 'OffsetResponse partition does not match OffsetRequest partition')
+
+ error_type = Errors.for_code(error_code)
+ if error_type is Errors.NoError:
+ assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
+ offset = offsets[0]
+ log.debug("Fetched offset %d for partition %s", offset, partition)
+ future.success(offset)
+ elif error_type in (Errors.NotLeaderForPartitionError,
+ Errors.UnknownTopicOrPartitionError):
+ log.warning("Attempt to fetch offsets for partition %s failed due"
+ " to obsolete leadership information, retrying.",
+ partition)
+ future.failure(error_type(partition))
+ else:
+ log.error("Attempt to fetch offsets for partition %s failed due to:"
+ " %s", partition, error_type)
+ future.failure(error_type(partition))
+
+ def _create_fetch_requests(self):
+ """Create fetch requests for all assigned partitions, grouped by node.
+
+ FetchRequests skipped if no leader, node has requests in flight, or we
+ have not returned all previously fetched records to consumer
+
+ Returns:
+ dict: {node_id: [FetchRequest,...]}
+ """
+ # create the fetch info as a dict of lists of partition info tuples
+ # which can be passed to FetchRequest() via .items()
+ fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
+
+ for partition in self._subscriptions.fetchable_partitions():
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None or node_id == -1:
+ log.debug("No leader found for partition %s."
+ " Requesting metadata update", partition)
+ self._client.cluster.request_update()
+ elif self._client.in_flight_request_count(node_id) == 0:
+ # if there is a leader and no in-flight requests,
+ # issue a new fetch but only fetch data for partitions whose
+ # previously fetched data has been consumed
+ fetched = self._subscriptions.assignment[partition].fetched
+ consumed = self._subscriptions.assignment[partition].consumed
+ if consumed == fetched:
+ partition_info = (
+ partition.partition,
+ fetched,
+ self.config['max_partition_fetch_bytes']
+ )
+ fetchable[node_id][partition.topic].append(partition_info)
+ else:
+ log.debug("Skipping FetchRequest to %s because previously"
+ " fetched offsets (%s) have not been fully"
+ " consumed yet (%s)", node_id, fetched, consumed)
+
+ requests = {}
+ for node_id, partition_data in six.iteritems(fetchable):
+ requests[node_id] = FetchRequest(
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ partition_data.items())
+ return requests
+
+ def _handle_fetch_response(self, request, response):
+ """The callback for fetch completion"""
+ #total_bytes = 0
+ #total_count = 0
+
+ fetch_offsets = {}
+ for topic, partitions in request.topics:
+ for partition, offset, _ in partitions:
+ fetch_offsets[TopicPartition(topic, partition)] = offset
+
+ for topic, partitions in response.topics:
+ for partition, error_code, highwater, messages in partitions:
+ tp = TopicPartition(topic, partition)
+ error_type = Errors.for_code(error_code)
+ if not self._subscriptions.is_fetchable(tp):
+ # this can happen when a rebalance happened or a partition
+ # consumption paused while fetch is still in-flight
+ log.debug("Ignoring fetched records for partition %s"
+ " since it is no longer fetchable", tp)
+ elif error_type is Errors.NoError:
+ fetch_offset = fetch_offsets[tp]
+
+ # we are interested in this fetch only if the beginning
+ # offset matches the current consumed position
+ consumed = self._subscriptions.assignment[tp].consumed
+ if consumed is None:
+ continue
+ elif consumed != fetch_offset:
+ # the fetched position has gotten out of sync with the
+ # consumed position (which might happen when a
+ # rebalance occurs with a fetch in-flight), so we need
+ # to reset the fetch position so the next fetch is right
+ self._subscriptions.assignment[tp].fetched = consumed
+ continue
+
+ partial = None
+ if messages and isinstance(messages[-1][-1], PartialMessage):
+ partial = messages.pop()
+
+ if messages:
+ last_offset, _, _ = messages[-1]
+ self._subscriptions.assignment[tp].fetched = last_offset + 1
+ self._records.append((fetch_offset, tp, messages))
+ #self.sensors.records_fetch_lag.record(highwater - last_offset)
+ elif partial:
+ # we did not read a single message from a non-empty
+ # buffer because that message's size is larger than
+ # fetch size, in this case record this exception
+ self._record_too_large_partitions[tp] = fetch_offset
+
+ # TODO: bytes metrics
+ #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
+ #totalBytes += num_bytes;
+ #totalCount += parsed.size();
+ elif error_type in (Errors.NotLeaderForPartitionError,
+ Errors.UnknownTopicOrPartitionError):
+ self._client.cluster.request_update()
+ elif error_type is Errors.OffsetOutOfRangeError:
+ fetch_offset = fetch_offsets[tp]
+ if self._subscriptions.has_default_offset_reset_policy():
+ self._subscriptions.need_offset_reset(tp)
+ else:
+ self._offset_out_of_range_partitions[tp] = fetch_offset
+ log.info("Fetch offset %s is out of range, resetting offset",
+ self._subscriptions.assignment[tp].fetched)
+ elif error_type is Errors.TopicAuthorizationFailedError:
+ log.warn("Not authorized to read from topic %s.", tp.topic)
+ self._unauthorized_topics.add(tp.topic)
+ elif error_type is Errors.UnknownError:
+ log.warn("Unknown error fetching data for topic-partition %s", tp)
+ else:
+ raise error_type('Unexpected error while fetching data')
+
+ """TOOD - metrics
+ self.sensors.bytesFetched.record(totalBytes)
+ self.sensors.recordsFetched.record(totalCount)
+ self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
+ self.sensors.fetchLatency.record(resp.requestLatencyMs())
+
+
+class FetchManagerMetrics(object):
+ def __init__(self, metrics, prefix):
+ self.metrics = metrics
+ self.group_name = prefix + "-fetch-manager-metrics"
+
+ self.bytes_fetched = metrics.sensor("bytes-fetched")
+ self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
+ "The average number of bytes fetched per request"), metrics.Avg())
+ self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
+ "The maximum number of bytes fetched per request"), metrics.Max())
+ self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
+ "The average number of bytes consumed per second"), metrics.Rate())
+
+ self.records_fetched = self.metrics.sensor("records-fetched")
+ self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
+ "The average number of records in each request"), metrics.Avg())
+ self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
+ "The average number of records consumed per second"), metrics.Rate())
+
+ self.fetch_latency = metrics.sensor("fetch-latency")
+ self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
+ "The average time taken for a fetch request."), metrics.Avg())
+ self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
+ "The max time taken for any fetch request."), metrics.Max())
+ self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
+ "The number of fetch requests per second."), metrics.Rate(metrics.Count()))
+
+ self.records_fetch_lag = metrics.sensor("records-lag")
+ self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
+ "The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
+
+ self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
+ self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
+ "The average throttle time in ms"), metrics.Avg())
+ self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
+ "The maximum throttle time in ms"), metrics.Max())
+
+ def record_topic_fetch_metrics(topic, num_bytes, num_records):
+ # record bytes fetched
+ name = '.'.join(["topic", topic, "bytes-fetched"])
+ self.metrics[name].record(num_bytes);
+
+ # record records fetched
+ name = '.'.join(["topic", topic, "records-fetched"])
+ self.metrics[name].record(num_records)
+ """
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
new file mode 100644
index 0000000..9ce1438
--- /dev/null
+++ b/kafka/consumer/group.py
@@ -0,0 +1,682 @@
+from __future__ import absolute_import
+
+import copy
+import logging
+import time
+
+import six
+
+from kafka.client_async import KafkaClient
+from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.subscription_state import SubscriptionState
+from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.protocol.offset import OffsetResetStrategy
+from kafka.version import __version__
+
+log = logging.getLogger(__name__)
+
+
+class KafkaConsumer(six.Iterator):
+ """Consume records from a Kafka cluster.
+
+ The consumer will transparently handle the failure of servers in the Kafka
+ cluster, and adapt as topic-partitions are created or migrate between
+ brokers. It also interacts with the assigned kafka Group Coordinator node
+ to allow multiple consumers to load balance consumption of topics (requires
+ kafka >= 0.9.0.0).
+
+ Arguments:
+ *topics (str): optional list of topics to subscribe to. If not set,
+ call subscribe() or assign() before consuming records.
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the consumer should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client. Also
+ submitted to GroupCoordinator for logging with respect to
+ consumer group administration. Default: 'kafka-python-{version}'
+ group_id (str): name of the consumer group to join for dynamic
+ partition assignment (if enabled), and to use for fetching and
+ committing offsets. Default: 'kafka-python-default-group'
+ key_deserializer (callable): Any callable that takes a
+ raw message key and returns a deserialized key.
+ value_deserializer (callable, optional): Any callable that takes a
+ raw message value and returns a deserialized value.
+ fetch_min_bytes (int): Minimum amount of data the server should
+ return for a fetch request, otherwise wait up to
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+ the server will block before answering the fetch request if
+ there isn't sufficient data to immediately satisfy the
+ requirement given by fetch_min_bytes. Default: 500.
+ max_partition_fetch_bytes (int): The maximum amount of data
+ per-partition the server will return. The maximum total memory
+ used for a request = #partitions * max_partition_fetch_bytes.
+ This size must be at least as large as the maximum message size
+ the server allows or else it is possible for the producer to
+ send messages larger than the consumer can fetch. If that
+ happens, the consumer can get stuck trying to fetch a large
+ message on a certain partition. Default: 1048576.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ auto_offset_reset (str): A policy for resetting offsets on
+ OffsetOutOfRange errors: 'earliest' will move to the oldest
+ available message, 'latest' will move to the most recent. Any
+ other value will raise an exception. Default: 'latest'.
+ enable_auto_commit (bool): If true the consumer's offset will be
+ periodically committed in the background. Default: True.
+ auto_commit_interval_ms (int): milliseconds between automatic
+ offset commits, if enable_auto_commit is True. Default: 5000.
+ default_offset_commit_callback (callable): called as
+ callback(offsets, response); response will be either an Exception
+ or an OffsetCommitResponse struct. This callback can be used to
+ trigger custom actions when a commit request completes.
+ check_crcs (bool): Automatically check the CRC32 of the records
+ consumed. This ensures no on-the-wire or on-disk corruption to
+ the messages occurred. This check adds some overhead, so it may
+ be disabled in cases seeking extreme performance. Default: True
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ partition_assignment_strategy (list): List of objects to use to
+ distribute partition ownership amongst consumer instances when
+ group management is used. Default: [RoundRobinPartitionAssignor]
+ heartbeat_interval_ms (int): The expected time in milliseconds
+ between heartbeats to the consumer coordinator when using
+ Kafka's group management feature. Heartbeats are used to ensure
+ that the consumer's session stays active and to facilitate
+ rebalancing when new consumers join or leave the group. The
+ value must be set lower than session_timeout_ms, but typically
+ should be set no higher than 1/3 of that value. It can be
+ adjusted even lower to control the expected time for normal
+ rebalances. Default: 3000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group management facilities. Default: 30000
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: 131072
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: 32768
+ consumer_timeout_ms (int): number of milliseconds to wait before
+ raising a timeout exception if no message is available for
+ consumption. Default: -1 (don't raise an exception)
+ api_version (str): specify which kafka API version to use.
+ 0.9 enables full group coordination features; 0.8.2 enables
+ kafka-storage offset commits; 0.8.1 enables zookeeper-storage
+ offset commits; 0.8.0 is what is left. If set to 'auto', will
+ attempt to infer the broker version by probing various APIs.
+ Default: auto
+
+ Note:
+ Configuration parameters are described in more detail at
+ https://kafka.apache.org/090/configuration.html#newconsumerconfigs
+ """
+ DEFAULT_CONFIG = {
+ 'bootstrap_servers': 'localhost',
+ 'client_id': 'kafka-python-' + __version__,
+ 'group_id': 'kafka-python-default-group',
+ 'key_deserializer': None,
+ 'value_deserializer': None,
+ 'fetch_max_wait_ms': 500,
+ 'fetch_min_bytes': 1024,
+ 'max_partition_fetch_bytes': 1 * 1024 * 1024,
+ 'request_timeout_ms': 40 * 1000,
+ 'retry_backoff_ms': 100,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'auto_offset_reset': 'latest',
+ 'enable_auto_commit': True,
+ 'auto_commit_interval_ms': 5000,
+ 'check_crcs': True,
+ 'metadata_max_age_ms': 5 * 60 * 1000,
+ 'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+ 'heartbeat_interval_ms': 3000,
+ 'session_timeout_ms': 30000,
+ 'send_buffer_bytes': 128 * 1024,
+ 'receive_buffer_bytes': 32 * 1024,
+ 'consumer_timeout_ms': -1,
+ 'api_version': 'auto',
+ 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+ #'metric_reporters': None,
+ #'metrics_num_samples': 2,
+ #'metrics_sample_window_ms': 30000,
+ }
+
+ def __init__(self, *topics, **configs):
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs.pop(key)
+
+ # Only check for extra config keys in top-level class
+ assert not configs, 'Unrecognized configs: %s' % configs
+
+ deprecated = {'smallest': 'earliest', 'largest': 'latest' }
+ if self.config['auto_offset_reset'] in deprecated:
+ new_config = deprecated[self.config['auto_offset_reset']]
+ log.warning('use auto_offset_reset=%s (%s is deprecated)',
+ new_config, self.config['auto_offset_reset'])
+ self.config['auto_offset_reset'] = new_config
+
+ self._client = KafkaClient(**self.config)
+
+ # Check Broker Version if not set explicitly
+ if self.config['api_version'] == 'auto':
+ self.config['api_version'] = self._client.check_version()
+ assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
+
+ # Convert api_version config to tuple for easy comparisons
+ self.config['api_version'] = tuple(
+ map(int, self.config['api_version'].split('.')))
+
+ self._subscription = SubscriptionState(self.config['auto_offset_reset'])
+ self._fetcher = Fetcher(
+ self._client, self._subscription, **self.config)
+ self._coordinator = ConsumerCoordinator(
+ self._client, self._subscription,
+ assignors=self.config['partition_assignment_strategy'],
+ **self.config)
+ self._closed = False
+ self._iterator = None
+ self._consumer_timeout = float('inf')
+
+ #self.metrics = None
+ if topics:
+ self._subscription.subscribe(topics=topics)
+ self._client.set_topics(topics)
+
+ def assign(self, partitions):
+ """Manually assign a list of TopicPartitions to this consumer.
+
+ Arguments:
+ partitions (list of TopicPartition): assignment for this instance.
+
+ Raises:
+ IllegalStateError: if consumer has already called subscribe()
+
+ Warning:
+ It is not possible to use both manual partition assignment with
+ assign() and group assignment with subscribe().
+
+ Note:
+ This interface does not support incremental assignment and will
+ replace the previous assignment (if there was one).
+
+ Note:
+ Manual topic assignment through this method does not use the
+ consumer's group management functionality. As such, there will be
+ no rebalance operation triggered when group membership or cluster
+ and topic metadata change.
+ """
+ self._subscription.assign_from_user(partitions)
+ self._client.set_topics([tp.topic for tp in partitions])
+
+ def assignment(self):
+ """Get the TopicPartitions currently assigned to this consumer.
+
+ If partitions were directly assigned using assign(), then this will
+ simply return the same partitions that were previously assigned.
+ If topics were subscribed using subscribe(), then this will give the
+ set of topic partitions currently assigned to the consumer (which may
+ be none if the assignment hasn't happened yet, or if the partitions are
+ in the process of being reassigned).
+
+ Returns:
+ set: {TopicPartition, ...}
+ """
+ return self._subscription.assigned_partitions()
+
+ def close(self):
+ """Close the consumer, waiting indefinitely for any needed cleanup."""
+ if self._closed:
+ return
+ log.debug("Closing the KafkaConsumer.")
+ self._closed = True
+ self._coordinator.close()
+ #self.metrics.close()
+ self._client.close()
+ try:
+ self.config['key_deserializer'].close()
+ except AttributeError:
+ pass
+ try:
+ self.config['value_deserializer'].close()
+ except AttributeError:
+ pass
+ log.debug("The KafkaConsumer has closed.")
+
+ def commit_async(self, offsets=None, callback=None):
+ """Commit offsets to kafka asynchronously, optionally firing callback
+
+ This commits offsets only to Kafka. The offsets committed using this API
+ will be used on the first fetch after every rebalance and also on
+ startup. As such, if you need to store offsets in anything other than
+ Kafka, this API should not be used.
+
+ This is an asynchronous call and will not block. Any errors encountered
+ are either passed to the callback (if provided) or discarded.
+
+ Arguments:
+ offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
+ to commit with the configured group_id. Defaults to current
+ consumed offsets for all subscribed partitions.
+ callback (callable, optional): called as callback(offsets, response)
+ with response as either an Exception or an OffsetCommitResponse
+ struct. This callback can be used to trigger custom actions when
+ a commit request completes.
+
+ Returns:
+ kafka.future.Future
+ """
+ assert self.config['api_version'] >= (0, 8, 1)
+ if offsets is None:
+ offsets = self._subscription.all_consumed_offsets()
+ log.debug("Committing offsets: %s", offsets)
+ future = self._coordinator.commit_offsets_async(
+ offsets, callback=callback)
+ return future
+
+ def commit(self, offsets=None):
+ """Commit offsets to kafka, blocking until success or error
+
+ This commits offsets only to Kafka. The offsets committed using this API
+ will be used on the first fetch after every rebalance and also on
+ startup. As such, if you need to store offsets in anything other than
+ Kafka, this API should not be used.
+
+ Blocks until either the commit succeeds or an unrecoverable error is
+ encountered (in which case it is thrown to the caller).
+
+ Currently only supports kafka-topic offset storage (not zookeeper)
+
+ Arguments:
+ offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
+ to commit with the configured group_id. Defaults to current
+ consumed offsets for all subscribed partitions.
+ """
+ assert self.config['api_version'] >= (0, 8, 1)
+ if offsets is None:
+ offsets = self._subscription.all_consumed_offsets()
+ self._coordinator.commit_offsets_sync(offsets)
+
+ def committed(self, partition):
+ """Get the last committed offset for the given partition
+
+ This offset will be used as the position for the consumer
+ in the event of a failure.
+
+ This call may block to do a remote call if the partition in question
+ isn't assigned to this consumer or if the consumer hasn't yet
+ initialized its cache of committed offsets.
+
+ Arguments:
+ partition (TopicPartition): the partition to check
+
+ Returns:
+ The last committed offset, or None if there was no prior commit.
+ """
+ assert self.config['api_version'] >= (0, 8, 1)
+ if self._subscription.is_assigned(partition):
+ committed = self._subscription.assignment[partition].committed
+ if committed is None:
+ self._coordinator.refresh_committed_offsets_if_needed()
+ committed = self._subscription.assignment[partition].committed
+ else:
+ commit_map = self._coordinator.fetch_committed_offsets([partition])
+ if partition in commit_map:
+ committed = commit_map[partition].offset
+ else:
+ committed = None
+ return committed
+
+ def topics(self):
+ """Get all topic metadata topics the user is authorized to view.
+
+ [Not Implemented Yet]
+
+ Returns:
+ {topic: [partition_info]}
+ """
+ raise NotImplementedError('TODO')
+
+ def partitions_for_topic(self, topic):
+ """Get metadata about the partitions for a given topic.
+
+ Arguments:
+ topic (str): topic to check
+
+ Returns:
+ set: partition ids
+ """
+ return self._client.cluster.partitions_for_topic(topic)
+
+ def poll(self, timeout_ms=0):
+ """Fetch data from assigned topics / partitions.
+
+ Records are fetched and returned in batches by topic-partition.
+ On each poll, consumer will try to use the last consumed offset as the
+ starting offset and fetch sequentially. The last consumed offset can be
+ manually set through seek(partition, offset) or automatically set as
+ the last committed offset for the subscribed list of partitions.
+
+ Incompatible with iterator interface -- use one or the other, not both.
+
+ Arguments:
+ timeout_ms (int, optional): milliseconds to spend waiting in poll if
+ data is not available. If 0, returns immediately with any
+ records that are available now. Must not be negative. Default: 0
+
+ Returns:
+ dict: topic to list of records since the last fetch for the
+ subscribed list of topics and partitions
+ """
+ assert timeout_ms >= 0, 'Timeout must not be negative'
+ assert self._iterator is None, 'Incompatible with iterator interface'
+
+ # poll for new data until the timeout expires
+ start = time.time()
+ remaining = timeout_ms
+ while True:
+ records = self._poll_once(remaining)
+ if records:
+ # before returning the fetched records, we can send off the
+ # next round of fetches and avoid block waiting for their
+ # responses to enable pipelining while the user is handling the
+ # fetched records.
+ self._fetcher.init_fetches()
+ return records
+
+ elapsed_ms = (time.time() - start) * 1000
+ remaining = timeout_ms - elapsed_ms
+
+ if remaining <= 0:
+ return {}
+
+ def _poll_once(self, timeout_ms):
+ """
+ Do one round of polling. In addition to checking for new data, this does
+ any needed heart-beating, auto-commits, and offset updates.
+
+ Arguments:
+ timeout_ms (int): The maximum time in milliseconds to block
+
+ Returns:
+ dict: map of topic to list of records (may be empty)
+ """
+ if self.config['api_version'] >= (0, 8, 2):
+ # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+ self._coordinator.ensure_coordinator_known()
+
+ if self.config['api_version'] >= (0, 9):
+ # ensure we have partitions assigned if we expect to
+ if self._subscription.partitions_auto_assigned():
+ self._coordinator.ensure_active_group()
+
+ # fetch positions if we have partitions we're subscribed to that we
+ # don't know the offset for
+ if not self._subscription.has_all_fetch_positions():
+ self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+ # init any new fetches (won't resend pending fetches)
+ records = self._fetcher.fetched_records()
+
+ # if data is available already, e.g. from a previous network client
+ # poll() call to commit, then just return it immediately
+ if records:
+ return records
+
+ self._fetcher.init_fetches()
+ self._client.poll(timeout_ms / 1000.0)
+ return self._fetcher.fetched_records()
+
+ def position(self, partition):
+ """Get the offset of the next record that will be fetched
+
+ Arguments:
+ partition (TopicPartition): partition to check
+ """
+ assert self._subscription.is_assigned(partition)
+
+ offset = self._subscription.assignment[partition].consumed
+ if offset is None:
+ self._update_fetch_positions([partition])
+ offset = self._subscription.assignment[partition].consumed
+ return offset
+
+ def pause(self, *partitions):
+ """Suspend fetching from the requested partitions.
+
+ Future calls to poll() will not return any records from these partitions
+ until they have been resumed using resume(). Note that this method does
+ not affect partition subscription. In particular, it does not cause a
+ group rebalance when automatic assignment is used.
+
+ Arguments:
+ *partitions (TopicPartition): partitions to pause
+ """
+ for partition in partitions:
+ log.debug("Pausing partition %s", partition)
+ self._subscription.pause(partition)
+
+ def resume(self, *partitions):
+ """Resume fetching from the specified (paused) partitions.
+
+ Arguments:
+ *partitions (TopicPartition): partitions to resume
+ """
+ for partition in partitions:
+ log.debug("Resuming partition %s", partition)
+ self._subscription.resume(partition)
+
+ def seek(self, partition, offset):
+ """Manually specify the fetch offset for a TopicPartition.
+
+ Overrides the fetch offsets that the consumer will use on the next
+ poll(). If this API is invoked for the same partition more than once,
+ the latest offset will be used on the next poll(). Note that you may
+ lose data if this API is arbitrarily used in the middle of consumption,
+ to reset the fetch offsets.
+
+ Arguments:
+ partition (TopicPartition): partition for seek operation
+ offset (int): message offset in partition
+ """
+ assert offset >= 0
+ log.debug("Seeking to offset %s for partition %s", offset, partition)
+ self._subscription.assignment[partition].seek(offset)
+
+ def seek_to_beginning(self, *partitions):
+ """Seek to the oldest available offset for partitions.
+
+ Arguments:
+ *partitions: optionally provide specific TopicPartitions, otherwise
+ default to all assigned partitions
+ """
+ if not partitions:
+ partitions = self._subscription.assigned_partitions()
+ for tp in partitions:
+ log.debug("Seeking to beginning of partition %s", tp)
+ self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
+
+ def seek_to_end(self, *partitions):
+ """Seek to the most recent available offset for partitions.
+
+ Arguments:
+ *partitions: optionally provide specific TopicPartitions, otherwise
+ default to all assigned partitions
+ """
+ if not partitions:
+ partitions = self._subscription.assigned_partitions()
+ for tp in partitions:
+ log.debug("Seeking to end of partition %s", tp)
+ self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+
+ def subscribe(self, topics=(), pattern=None, listener=None):
+ """Subscribe to a list of topics, or a topic regex pattern
+
+ Partitions will be dynamically assigned via a group coordinator.
+ Topic subscriptions are not incremental: this list will replace the
+ current assignment (if there is one).
+
+ This method is incompatible with assign()
+
+ Arguments:
+ topics (list): List of topics for subscription.
+ pattern (str): Pattern to match available topics. You must provide
+ either topics or pattern, but not both.
+ listener (ConsumerRebalanceListener): Optionally include listener
+ callback, which will be called before and after each rebalance
+ operation.
+
+ As part of group management, the consumer will keep track of the
+ list of consumers that belong to a particular group and will
+ trigger a rebalance operation if one of the following events
+ occurs:
+
+ * Number of partitions change for any of the subscribed topics
+ * Topic is created or deleted
+ * An existing member of the consumer group dies
+ * A new member is added to the consumer group
+
+ When any of these events are triggered, the provided listener
+ will be invoked first to indicate that the consumer's assignment
+ has been revoked, and then again when the new assignment has
+ been received. Note that this listener will immediately override
+ any listener set in a previous call to subscribe. It is
+ guaranteed, however, that the partitions revoked/assigned
+ through this interface are from topics subscribed in this call.
+ """
+ if not topics and not pattern:
+ self.unsubscribe()
+ else:
+ self._subscription.subscribe(topics=topics,
+ pattern=pattern,
+ listener=listener)
+ # regex will need all topic metadata
+ if pattern is not None:
+ self._client.cluster.need_metadata_for_all = True
+ log.debug("Subscribed to topic pattern: %s", topics)
+ else:
+ self._client.set_topics(self._subscription.group_subscription())
+ log.debug("Subscribed to topic(s): %s", topics)
+
+ def subscription(self):
+ """Get the current topic subscription.
+
+ Returns:
+ set: {topic, ...}
+ """
+ return self._subscription.subscription
+
+ def unsubscribe(self):
+ """Unsubscribe from all topics and clear all assigned partitions."""
+ self._subscription.unsubscribe()
+ self._coordinator.close()
+ self._client.cluster.need_metadata_for_all_topics = False
+ log.debug("Unsubscribed all topics or patterns and assigned partitions")
+
+ def _update_fetch_positions(self, partitions):
+ """
+ Set the fetch position to the committed position (if there is one)
+ or reset it using the offset reset policy the user has configured.
+
+ Arguments:
+ partitions (List[TopicPartition]): The partitions that need
+ updating fetch positions
+
+ Raises:
+ NoOffsetForPartitionError: If no offset is stored for a given
+ partition and no offset reset policy is defined
+ """
+ if self.config['api_version'] >= (0, 8, 1):
+ # refresh commits for all assigned partitions
+ self._coordinator.refresh_committed_offsets_if_needed()
+
+ # then do any offset lookups in case some positions are not known
+ self._fetcher.update_fetch_positions(partitions)
+
+ def _message_generator(self):
+ while time.time() < self._consumer_timeout:
+ if self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
+
+ if self.config['api_version'] >= (0, 9):
+ # ensure we have partitions assigned if we expect to
+ if self._subscription.partitions_auto_assigned():
+ self._coordinator.ensure_active_group()
+
+ # fetch positions if we have partitions we're subscribed to that we
+ # don't know the offset for
+ if not self._subscription.has_all_fetch_positions():
+ self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+ # init any new fetches (won't resend pending fetches)
+ self._fetcher.init_fetches()
+ self._client.poll(self.config['request_timeout_ms'] / 1000.0)
+ timeout = self._consumer_timeout
+ if self.config['api_version'] >= (0, 9):
+ heartbeat_timeout = time.time() + (
+ self.config['heartbeat_interval_ms'] / 1000.0)
+ timeout = min(heartbeat_timeout, timeout)
+ for msg in self._fetcher:
+ yield msg
+ if time.time() > timeout:
+ break
+
+ def __iter__(self): # pylint: disable=non-iterator-returned
+ return self
+
+ def __next__(self):
+ if not self._iterator:
+ self._iterator = self._message_generator()
+
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
+
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
+
+ # old KafkaConsumer methods are deprecated
+ def configure(self, **configs):
+ raise NotImplementedError(
+ 'deprecated -- initialize a new consumer')
+
+ def set_topic_partitions(self, *topics):
+ raise NotImplementedError(
+ 'deprecated -- use subscribe() or assign()')
+
+ def fetch_messages(self):
+ raise NotImplementedError(
+ 'deprecated -- use poll() or iterator interface')
+
+ def get_partition_offsets(self, topic, partition,
+ request_time_ms, max_num_offsets):
+ raise NotImplementedError(
+ 'deprecated -- send an OffsetRequest with KafkaClient')
+
+ def offsets(self, group=None):
+ raise NotImplementedError('deprecated -- use committed(partition)')
+
+ def task_done(self, message):
+ raise NotImplementedError(
+ 'deprecated -- commit offsets manually if needed')
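
In everyday use the new consumer is driven either through the iterator interface or through poll(); the two are mutually exclusive. A short usage sketch (broker address, topic, and group are placeholders):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             auto_offset_reset='earliest',
                             enable_auto_commit=False)

    # iterate over ConsumerRecords; alternatively call consumer.poll(timeout_ms=500)
    for message in consumer:
        print(message.topic, message.partition, message.offset,
              message.key, message.value)
        consumer.commit()   # synchronous offset commit (api_version >= 0.8.1)
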
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 3ef106c..29ddd0e 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -9,14 +9,14 @@ import time
import six
-from kafka.client import KafkaClient
+from kafka import SimpleClient
from kafka.common import (
- OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
+ OffsetFetchRequestPayload, OffsetCommitRequestPayload,
+ OffsetRequestPayload, FetchRequestPayload,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
-from kafka.util import kafka_bytestring
logger = logging.getLogger(__name__)
@@ -136,7 +136,7 @@ class KafkaConsumer(object):
'bootstrap_servers required to configure KafkaConsumer'
)
- self._client = KafkaClient(
+ self._client = SimpleClient(
self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0)
@@ -192,14 +192,14 @@ class KafkaConsumer(object):
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
- topic = kafka_bytestring(arg)
+ topic = arg
for partition in self._client.get_partition_ids_for_topic(topic):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
- topic = kafka_bytestring(arg[0])
+ topic = arg[0]
partition = arg[1]
self._consume_topic_partition(topic, partition)
if len(arg) == 3:
@@ -212,7 +212,7 @@ class KafkaConsumer(object):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
- topic = kafka_bytestring(key)
+ topic = key
# topic: partition
if isinstance(value, int):
@@ -230,7 +230,7 @@ class KafkaConsumer(object):
# (topic, partition): offset
elif isinstance(key, tuple):
- topic = kafka_bytestring(key[0])
+ topic = key[0]
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[(topic, partition)] = value
@@ -333,9 +333,9 @@ class KafkaConsumer(object):
'No fetch offsets found when calling fetch_messages'
)
- fetches = [FetchRequest(topic, partition,
- self._offsets.fetch[(topic, partition)],
- max_bytes)
+ fetches = [FetchRequestPayload(topic, partition,
+ self._offsets.fetch[(topic, partition)],
+ max_bytes)
for (topic, partition) in self._topics]
# send_fetch_request will batch topic/partition requests by leader
@@ -353,7 +353,7 @@ class KafkaConsumer(object):
self._refresh_metadata_on_error()
continue
- topic = kafka_bytestring(resp.topic)
+ topic = resp.topic
partition = resp.partition
try:
check_error(resp)
@@ -425,7 +425,7 @@ class KafkaConsumer(object):
topic / partition. See:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
"""
- reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+ reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)
@@ -545,14 +545,14 @@ class KafkaConsumer(object):
continue
commits.append(
- OffsetCommitRequest(topic_partition[0], topic_partition[1],
+ OffsetCommitRequestPayload(topic_partition[0], topic_partition[1],
commit_offset, metadata)
)
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(
- kafka_bytestring(self._config['group_id']), commits,
+ self._config['group_id'], commits,
fail_on_error=False
)
@@ -576,7 +576,6 @@ class KafkaConsumer(object):
#
def _consume_topic_partition(self, topic, partition):
- topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
@@ -616,8 +615,8 @@ class KafkaConsumer(object):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
- kafka_bytestring(self._config['group_id']),
- [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+ self._config['group_id'],
+ [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
@@ -665,7 +664,7 @@ class KafkaConsumer(object):
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
- raise # pylint: disable-msg=E0704
+ raise # pylint: disable=E0704
(offset, ) = self.get_partition_offsets(topic, partition,
request_time_ms, max_num_offsets=1)
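
The hunks above only rename the request classes (Request -> RequestPayload) and the client (KafkaClient -> SimpleClient); the field order is unchanged. A small sketch of the renamed payload API, assuming a placeholder broker and that the response exposes an offsets list as in base.py:

    # Sketch of the renamed payload classes with SimpleClient; the broker
    # address and topic are placeholders, and resp.offsets is assumed to hold
    # the returned offset list.
    from kafka import SimpleClient
    from kafka.common import OffsetRequestPayload

    client = SimpleClient('localhost:9092')
    reqs = [OffsetRequestPayload('my-topic', 0, -1, 1)]  # -1 = latest, return 1 offset
    (resp,) = client.send_offset_request(reqs)
    print(resp.offsets[0])
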
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index d0e2920..9358b09 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -3,12 +3,10 @@ from __future__ import absolute_import
from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager
-try:
- import queue # python 3
-except ImportError:
- import Queue as queue # python 2
import time
+from six.moves import queue
+
from ..common import KafkaError
from .base import (
Consumer,
@@ -104,7 +102,7 @@ class MultiProcessConsumer(Consumer):
parallel using multiple processes
Arguments:
- client: a connected KafkaClient
+ client: a connected SimpleClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
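
The queue import change above swaps the manual try/except for six.moves, which resolves to Queue on Python 2 and queue on Python 3. A minimal sketch:

    # The same name now works on both Python versions.
    from six.moves import queue

    q = queue.Queue(maxsize=10)
    q.put((0, 'message'))      # (partition, message) tuples, as in the consumers
    print(q.get_nowait())
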
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 7c63246..29eb480 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -1,18 +1,15 @@
from __future__ import absolute_import
try:
- from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
+ from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611
except ImportError:
- from itertools import izip_longest as izip_longest, repeat # python 2
+ from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611
import logging
-try:
- import queue # python 3
-except ImportError:
- import Queue as queue # python 2
import sys
import time
import six
+from six.moves import queue
from .base import (
Consumer,
@@ -27,11 +24,12 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, KafkaError, OffsetRequest,
+ FetchRequestPayload, KafkaError, OffsetRequestPayload,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
+from kafka.protocol.message import PartialMessage
log = logging.getLogger(__name__)
@@ -72,7 +70,7 @@ class SimpleConsumer(Consumer):
for a topic
Arguments:
- client: a connected KafkaClient
+ client: a connected SimpleClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
@@ -153,9 +151,9 @@ class SimpleConsumer(Consumer):
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
- reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+ reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
elif self.auto_offset_reset == 'smallest':
- reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+ reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
else:
# Let's raise an reasonable exception type if user calls
# outside of an exception context
@@ -166,7 +164,7 @@ class SimpleConsumer(Consumer):
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
- raise # pylint: disable-msg=E0704
+ raise # pylint: disable=E0704
# send_offset_request
log.info('Resetting topic-partition offset to %s for %s:%d',
@@ -224,23 +222,17 @@ class SimpleConsumer(Consumer):
for tmp_partition in self.offsets.keys():
if whence == 0:
- reqs.append(OffsetRequest(self.topic,
- tmp_partition,
- -2,
- 1))
+ reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
elif whence == 2:
- reqs.append(OffsetRequest(self.topic,
- tmp_partition,
- -1,
- 1))
+ reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
else:
pass
else:
deltas[partition] = offset
if whence == 0:
- reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
elif whence == 2:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
else:
pass
@@ -370,9 +362,9 @@ class SimpleConsumer(Consumer):
while partitions:
requests = []
for partition, buffer_size in six.iteritems(partitions):
- requests.append(FetchRequest(self.topic, partition,
- self.fetch_offsets[partition],
- buffer_size))
+ requests.append(FetchRequestPayload(self.topic, partition,
+ self.fetch_offsets[partition],
+ buffer_size))
# Send request
responses = self.client.send_fetch_request(
requests,
@@ -413,32 +405,34 @@ class SimpleConsumer(Consumer):
partition = resp.partition
buffer_size = partitions[partition]
- try:
- for message in resp.messages:
- if message.offset < self.fetch_offsets[partition]:
- log.debug('Skipping message %s because its offset is less than the consumer offset',
- message)
- continue
- # Put the message in our queue
- self.queue.put((partition, message))
- self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall:
+
+ # Check for partial message
+ if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
+
+ # If buffer is at max and all we got was a partial message
+ # raise ConsumerFetchSizeTooSmall
if (self.max_buffer_size is not None and
- buffer_size == self.max_buffer_size):
- log.error('Max fetch size %d too small',
- self.max_buffer_size)
- raise
+ buffer_size == self.max_buffer_size and
+ len(resp.messages) == 1):
+
+ log.error('Max fetch size %d too small', self.max_buffer_size)
+ raise ConsumerFetchSizeTooSmall()
+
if self.max_buffer_size is None:
buffer_size *= 2
else:
- buffer_size = min(buffer_size * 2,
- self.max_buffer_size)
+ buffer_size = min(buffer_size * 2, self.max_buffer_size)
log.warning('Fetch size too small, increase to %d (2x) '
'and retry', buffer_size)
retry_partitions[partition] = buffer_size
- except ConsumerNoMoreData as e:
- log.debug('Iteration was ended by %r', e)
- except StopIteration:
- # Stop iterating through this partition
- log.debug('Done iterating over partition %s', partition)
+ resp.messages.pop()
+
+ for message in resp.messages:
+ if message.offset < self.fetch_offsets[partition]:
+ log.debug('Skipping message %s because its offset is less than the consumer offset',
+ message)
+ continue
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
partitions = retry_partitions
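
The rewritten fetch loop above no longer relies on exceptions raised while iterating messages; it checks the last entry for a PartialMessage, pops it, and retries the partition with a larger buffer. The buffer-growth rule, extracted into a standalone helper for illustration (the function name is hypothetical, not part of SimpleConsumer):

    # Hedged sketch of the buffer-doubling rule used above.
    def next_buffer_size(buffer_size, max_buffer_size):
        """Double the fetch buffer, clamping at max_buffer_size if one is set."""
        if max_buffer_size is None:
            return buffer_size * 2
        return min(buffer_size * 2, max_buffer_size)

    assert next_buffer_size(4096, None) == 8192
    assert next_buffer_size(4096, 6144) == 6144   # clamped at the configured max
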
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
new file mode 100644
index 0000000..c60f192
--- /dev/null
+++ b/kafka/consumer/subscription_state.py
@@ -0,0 +1,462 @@
+from __future__ import absolute_import
+
+import abc
+import logging
+import re
+
+import six
+
+from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.protocol.offset import OffsetResetStrategy
+
+log = logging.getLogger(__name__)
+
+
+class SubscriptionState(object):
+ """
+ A class for tracking the topics, partitions, and offsets for the consumer.
+ A partition is "assigned" either directly with assign_from_user() (manual
+ assignment) or with assign_from_subscribed() (automatic assignment from
+ subscription).
+
+ Once assigned, the partition is not considered "fetchable" until its initial
+ position has been set with seek(). Fetchable partitions track a fetch
+ position which is used to set the offset of the next fetch, and a consumed
+ position which is the last offset that has been returned to the user. You
+ can suspend fetching from a partition through pause() without affecting the
+ fetched/consumed offsets. The partition will remain unfetchable until
+ resume() is used. You can also query the pause state independently with
+ is_paused().
+
+ Note that pause state as well as fetch/consumed positions are not preserved
+ when partition assignment is changed whether directly by the user or
+ through a group rebalance.
+
+ This class also maintains a cache of the latest commit position for each of
+ the assigned partitions. This is updated through committed() and can be used
+ to set the initial fetch position (e.g. Fetcher._reset_offset() ).
+ """
+ _SUBSCRIPTION_EXCEPTION_MESSAGE = ("Subscription to topics, partitions and"
+ " pattern are mutually exclusive")
+
+ def __init__(self, offset_reset_strategy='earliest'):
+ """Initialize a SubscriptionState instance
+
+ Keyword Arguments:
+ offset_reset_strategy: 'earliest' or 'latest', otherwise
+ an exception will be raised when fetching an offset that is no
+ longer available. Default: 'earliest'
+ """
+ try:
+ offset_reset_strategy = getattr(OffsetResetStrategy,
+ offset_reset_strategy.upper())
+ except AttributeError:
+ log.warning('Unrecognized offset_reset_strategy, using NONE')
+ offset_reset_strategy = OffsetResetStrategy.NONE
+ self._default_offset_reset_strategy = offset_reset_strategy
+
+ self.subscription = None # set() or None
+ self.subscribed_pattern = None # regex str or None
+ self._group_subscription = set()
+ self._user_assignment = set()
+ self.assignment = dict()
+ self.needs_partition_assignment = False
+ self.listener = None
+
+ # initialize to true for the consumers to fetch offset upon starting up
+ self.needs_fetch_committed_offsets = True
+
+ def subscribe(self, topics=(), pattern=None, listener=None):
+ """Subscribe to a list of topics, or a topic regex pattern.
+
+ Partitions will be dynamically assigned via a group coordinator.
+ Topic subscriptions are not incremental: this list will replace the
+ current assignment (if there is one).
+
+ This method is incompatible with assign_from_user()
+
+ Arguments:
+ topics (list): List of topics for subscription.
+ pattern (str): Pattern to match available topics. You must provide
+ either topics or pattern, but not both.
+ listener (ConsumerRebalanceListener): Optionally include listener
+ callback, which will be called before and after each rebalance
+ operation.
+
+ As part of group management, the consumer will keep track of the
+ list of consumers that belong to a particular group and will
+ trigger a rebalance operation if one of the following events
+ occurs:
+
+ * Number of partitions change for any of the subscribed topics
+ * Topic is created or deleted
+ * An existing member of the consumer group dies
+ * A new member is added to the consumer group
+
+ When any of these events are triggered, the provided listener
+ will be invoked first to indicate that the consumer's assignment
+ has been revoked, and then again when the new assignment has
+ been received. Note that this listener will immediately override
+ any listener set in a previous call to subscribe. It is
+ guaranteed, however, that the partitions revoked/assigned
+ through this interface are from topics subscribed in this call.
+ """
+ if self._user_assignment or (topics and pattern):
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ assert topics or pattern, 'Must provide topics or pattern'
+
+ if pattern:
+ log.info('Subscribing to pattern: /%s/', pattern)
+ self.subscription = set()
+ self.subscribed_pattern = re.compile(pattern)
+ else:
+ self.change_subscription(topics)
+
+ if listener and not isinstance(listener, ConsumerRebalanceListener):
+ raise TypeError('listener must be a ConsumerRebalanceListener')
+ self.listener = listener
+
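
A short sketch of the two subscription modes documented above (topic list or regex pattern, never both), assuming the module path from this diff and placeholder topic names:

    from kafka.consumer.subscription_state import SubscriptionState

    state = SubscriptionState(offset_reset_strategy='earliest')
    state.subscribe(topics=['events', 'metrics'])
    print(state.subscription)                        # {'events', 'metrics'}

    pattern_state = SubscriptionState()
    pattern_state.subscribe(pattern='^events-.*$')   # matching topics assigned later
    print(pattern_state.subscribed_pattern.pattern)  # ^events-.*$
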
+ def change_subscription(self, topics):
+ """Change the topic subscription.
+
+ Arguments:
+ topics (list of str): topics for subscription
+
+ Raises:
+ IllegalStateError: if assign_from_user has been used already
+ """
+ if self._user_assignment:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ if self.subscription == set(topics):
+ log.warning("subscription unchanged by change_subscription(%s)",
+ topics)
+ return
+
+ log.info('Updating subscribed topics to: %s', topics)
+ self.subscription = set(topics)
+ self._group_subscription.update(topics)
+ self.needs_partition_assignment = True
+
+ # Remove any assigned partitions which are no longer subscribed to
+ for tp in set(self.assignment.keys()):
+ if tp.topic not in self.subscription:
+ del self.assignment[tp]
+
+ def group_subscribe(self, topics):
+ """Add topics to the current group subscription.
+
+ This is used by the group leader to ensure that it receives metadata
+ updates for all topics that any member of the group is subscribed to.
+
+ Arguments:
+ topics (list of str): topics to add to the group subscription
+ """
+ if self._user_assignment:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ self._group_subscription.update(topics)
+
+ def mark_for_reassignment(self):
+ self._group_subscription.intersection_update(self.subscription)
+ self.needs_partition_assignment = True
+
+ def assign_from_user(self, partitions):
+ """Manually assign a list of TopicPartitions to this consumer.
+
+ This interface does not allow for incremental assignment and will
+ replace the previous assignment (if there was one).
+
+ Manual topic assignment through this method does not use the consumer's
+ group management functionality. As such, there will be no rebalance
+ operation triggered when group membership or cluster and topic metadata
+ change. Note that it is not possible to use both manual partition
+ assignment with assign() and group assignment with subscribe().
+
+ Arguments:
+ partitions (list of TopicPartition): assignment for this instance.
+
+ Raises:
+ IllegalStateError: if consumer has already called subscribe()
+ """
+ if self.subscription is not None:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ self._user_assignment.clear()
+ self._user_assignment.update(partitions)
+
+ for partition in partitions:
+ if partition not in self.assignment:
+ self._add_assigned_partition(partition)
+
+ for tp in set(self.assignment.keys()) - self._user_assignment:
+ del self.assignment[tp]
+
+ self.needs_partition_assignment = False
+
+ def assign_from_subscribed(self, assignments):
+ """Update the assignment to the specified partitions
+
+ This method is called by the coordinator to dynamically assign
+ partitions based on the consumer's topic subscription. This is different
+ from assign_from_user() which directly sets the assignment from a
+ user-supplied TopicPartition list.
+
+ Arguments:
+ assignments (list of TopicPartition): partitions to assign to this
+ consumer instance.
+ """
+ if self.subscription is None:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ for tp in assignments:
+ if tp.topic not in self.subscription:
+ raise ValueError("Assigned partition %s for non-subscribed topic." % tp)
+ self.assignment.clear()
+ for tp in assignments:
+ self._add_assigned_partition(tp)
+ self.needs_partition_assignment = False
+ log.info("Updated partition assignment: %s", assignments)
+
+ def unsubscribe(self):
+ """Clear all topic subscriptions and partition assignments"""
+ self.subscription = None
+ self._user_assignment.clear()
+ self.assignment.clear()
+ self.needs_partition_assignment = True
+ self.subscribed_pattern = None
+
+ def group_subscription(self):
+ """Get the topic subscription for the group.
+
+ For the leader, this will include the union of all member subscriptions.
+ For followers, it is the member's subscription only.
+
+ This is used when querying topic metadata to detect metadata changes
+ that would require rebalancing (the leader fetches metadata for all
+ topics in the group so that it can do partition assignment).
+
+ Returns:
+ set: topics
+ """
+ return self._group_subscription
+
+ def seek(self, partition, offset):
+ """Manually specify the fetch offset for a TopicPartition.
+
+ Overrides the fetch offsets that the consumer will use on the next
+ poll(). If this API is invoked for the same partition more than once,
+ the latest offset will be used on the next poll(). Note that you may
+ lose data if this API is arbitrarily used in the middle of consumption
+ to reset the fetch offsets.
+
+ Arguments:
+ partition (TopicPartition): partition for seek operation
+ offset (int): message offset in partition
+ """
+ self.assignment[partition].seek(offset)
+
+ def assigned_partitions(self):
+ """Return set of TopicPartitions in current assignment."""
+ return set(self.assignment.keys())
+
+ def fetchable_partitions(self):
+ """Return set of TopicPartitions that should be Fetched."""
+ fetchable = set()
+ for partition, state in six.iteritems(self.assignment):
+ if state.is_fetchable():
+ fetchable.add(partition)
+ return fetchable
+
+ def partitions_auto_assigned(self):
+ """Return True unless user supplied partitions manually."""
+ return self.subscription is not None
+
+ def all_consumed_offsets(self):
+ """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}"""
+ all_consumed = {}
+ for partition, state in six.iteritems(self.assignment):
+ if state.has_valid_position:
+ all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+ return all_consumed
+
+ def need_offset_reset(self, partition, offset_reset_strategy=None):
+ """Mark partition for offset reset using specified or default strategy.
+
+ Arguments:
+ partition (TopicPartition): partition to mark
+ offset_reset_strategy (OffsetResetStrategy, optional)
+ """
+ if offset_reset_strategy is None:
+ offset_reset_strategy = self._default_offset_reset_strategy
+ self.assignment[partition].await_reset(offset_reset_strategy)
+
+ def has_default_offset_reset_policy(self):
+ """Return True if default offset reset policy is Earliest or Latest"""
+ return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
+
+ def is_offset_reset_needed(self, partition):
+ return self.assignment[partition].awaiting_reset
+
+ def has_all_fetch_positions(self):
+ for state in self.assignment.values():
+ if not state.has_valid_position:
+ return False
+ return True
+
+ def missing_fetch_positions(self):
+ missing = set()
+ for partition, state in six.iteritems(self.assignment):
+ if not state.has_valid_position:
+ missing.add(partition)
+ return missing
+
+ def is_assigned(self, partition):
+ return partition in self.assignment
+
+ def is_paused(self, partition):
+ return partition in self.assignment and self.assignment[partition].paused
+
+ def is_fetchable(self, partition):
+ return partition in self.assignment and self.assignment[partition].is_fetchable()
+
+ def pause(self, partition):
+ self.assignment[partition].pause()
+
+ def resume(self, partition):
+ self.assignment[partition].resume()
+
+ def _add_assigned_partition(self, partition):
+ self.assignment[partition] = TopicPartitionState()
+
+
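
To illustrate the assignment and position rules described in the class docstring, a hedged sketch of manual assignment (placeholder topic/offset values; module path taken from this diff):

    from kafka.common import TopicPartition
    from kafka.consumer.subscription_state import SubscriptionState

    tp = TopicPartition('events', 0)
    state = SubscriptionState()
    state.assign_from_user([tp])

    print(state.is_assigned(tp))           # True
    print(state.fetchable_partitions())    # empty set -- no position until seek()

    state.seek(tp, 42)                     # set the initial fetch/consumed position
    print(state.is_fetchable(tp))          # True
    print(state.all_consumed_offsets())    # {tp: OffsetAndMetadata(offset=42, metadata='')}
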
+class TopicPartitionState(object):
+ def __init__(self):
+ self.committed = None # last committed position
+ self.has_valid_position = False # whether we have valid consumed and fetched positions
+ self.paused = False # whether this partition has been paused by the user
+ self.awaiting_reset = False # whether we are awaiting reset
+ self.reset_strategy = None # the reset strategy if awaiting_reset is set
+ self._consumed = None # offset exposed to the user
+ self._fetched = None # current fetch position
+
+ def _set_fetched(self, offset):
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
+ self._fetched = offset
+
+ def _get_fetched(self):
+ return self._fetched
+
+ fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
+
+ def _set_consumed(self, offset):
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
+ self._consumed = offset
+
+ def _get_consumed(self):
+ return self._consumed
+
+ consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+
+ def await_reset(self, strategy):
+ self.awaiting_reset = True
+ self.reset_strategy = strategy
+ self._consumed = None
+ self._fetched = None
+ self.has_valid_position = False
+
+ def seek(self, offset):
+ self._consumed = offset
+ self._fetched = offset
+ self.awaiting_reset = False
+ self.reset_strategy = None
+ self.has_valid_position = True
+
+ def pause(self):
+ self.paused = True
+
+ def resume(self):
+ self.paused = False
+
+ def is_fetchable(self):
+ return not self.paused and self.has_valid_position
+
+
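
The position lifecycle implemented by TopicPartitionState above, as a small sketch: seek() establishes a valid position, await_reset() invalidates it again.

    from kafka.consumer.subscription_state import TopicPartitionState
    from kafka.protocol.offset import OffsetResetStrategy

    tps = TopicPartitionState()
    print(tps.has_valid_position)        # False -- fetched/consumed cannot be set yet

    tps.seek(100)
    print(tps.fetched, tps.consumed)     # 100 100
    tps.fetched = 101                    # allowed now that the position is valid

    tps.await_reset(OffsetResetStrategy.EARLIEST)
    print(tps.awaiting_reset, tps.has_valid_position)   # True False
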
+class ConsumerRebalanceListener(object):
+ """
+ A callback interface that the user can implement to trigger custom actions
+ when the set of partitions assigned to the consumer changes.
+
+ This is applicable when the consumer has Kafka auto-manage group
+ membership. If the consumer directly assigns partitions, those
+ partitions will never be reassigned and this callback is not applicable.
+
+ When Kafka is managing the group membership, a partition re-assignment will
+ be triggered any time the members of the group change or the subscription
+ of the members changes. This can occur when processes die, new process
+ instances are added or old instances come back to life after failure.
+ Rebalances can also be triggered by changes affecting the subscribed
+ topics (e.g. when the number of partitions is administratively adjusted).
+
+ There are many uses for this functionality. One common use is saving offsets
+ in a custom store. By saving offsets in the on_partitions_revoked() call, we
+ can ensure that any time partition assignment changes the offsets get saved.
+
+ Another use is flushing out any kind of cache of intermediate results the
+ consumer may be keeping. For example, consider a case where the consumer is
+ subscribed to a topic containing user page views, and the goal is to count
+ the number of page views per user for each five-minute window. Let's say
+ the topic is partitioned by the user id so that all events for a particular
+ user will go to a single consumer instance. The consumer can keep in memory
+ a running tally of actions per user and only flush these out to a remote
+ data store when its cache gets too big. However, if a partition is reassigned
+ it may want to automatically trigger a flush of this cache before the new
+ owner takes over consumption.
+
+ This callback will execute in the user thread as part of the Consumer.poll()
+ whenever partition assignment changes.
+
+ It is guaranteed that all consumer processes will invoke
+ on_partitions_revoked() prior to any process invoking
+ on_partitions_assigned(). So if offsets or other state is saved in the
+ on_partitions_revoked() call, it should be saved by the time the process
+ taking over that partition has its on_partitions_assigned() callback
+ called to load the state.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def on_partitions_revoked(self, revoked):
+ """
+ A callback method the user can implement to provide handling of offset
+ commits to a customized store on the start of a rebalance operation.
+ This method will be called before a rebalance operation starts and
+ after the consumer stops fetching data. It is recommended that offsets
+ be committed in this callback to either Kafka or a custom offset
+ store to prevent duplicate data.
+
+ NOTE: This method is only called before rebalances. It is not called
+ prior to KafkaConsumer.close()
+
+ Arguments:
+ revoked (list of TopicPartition): the partitions that were assigned
+ to the consumer on the last rebalance
+ """
+ pass
+
+ @abc.abstractmethod
+ def on_partitions_assigned(self, assigned):
+ """
+ A callback method the user can implement to provide handling of
+ customized offsets on completion of a successful partition
+ re-assignment. This method will be called after a partition re-assignment
+ completes and before the consumer starts fetching data.
+
+ It is guaranteed that all the processes in a consumer group will execute
+ their on_partitions_revoked() callback before any instance executes its
+ on_partitions_assigned() callback.
+
+ Arguments:
+ assigned (list of TopicPartition): the partitions assigned to the
+ consumer (may include partitions that were previously assigned)
+ """
+ pass
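
A hedged sketch of the offset-saving listener described in the class docstring; the store object and its save()/load() methods are hypothetical, and the consumer is assumed to expose position() and seek() as in the new KafkaConsumer:

    from kafka.consumer.subscription_state import ConsumerRebalanceListener

    class SaveOffsetsOnRebalance(ConsumerRebalanceListener):
        """Persist consumed positions in a custom store across rebalances."""

        def __init__(self, consumer, store):
            self._consumer = consumer     # assumed to expose position()/seek()
            self._store = store           # hypothetical external offset store

        def on_partitions_revoked(self, revoked):
            # Save the last consumed position before the partitions move away.
            for tp in revoked:
                self._store.save(tp, self._consumer.position(tp))

        def on_partitions_assigned(self, assigned):
            # Resume from wherever the custom store last recorded.
            for tp in assigned:
                self._consumer.seek(tp, self._store.load(tp))
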