Diffstat (limited to 'kafka/consumer')
-rw-r--r--   kafka/consumer/__init__.py            |    2
-rw-r--r--   kafka/consumer/base.py                |   19
-rw-r--r--   kafka/consumer/fetcher.py             |  645
-rw-r--r--   kafka/consumer/group.py               |  682
-rw-r--r--   kafka/consumer/kafka.py               |   37
-rw-r--r--   kafka/consumer/multiprocess.py        |    8
-rw-r--r--   kafka/consumer/simple.py              |   84
-rw-r--r--   kafka/consumer/subscription_state.py  |  462
8 files changed, 1860 insertions(+), 79 deletions(-)
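The user-visible change is in __init__.py below: the KafkaConsumer name is now re-exported from the new kafka.consumer.group module instead of the legacy kafka.consumer.kafka module, so existing imports pick up the rewritten group-coordination consumer. A minimal sketch of the new iterator interface, using only options documented in this diff (the broker address and topic name are illustrative placeholders):

    from kafka.consumer import KafkaConsumer

    # Same import path as before; the class behind it is now
    # kafka.consumer.group.KafkaConsumer.
    consumer = KafkaConsumer('example-topic',
                             bootstrap_servers='localhost:9092',
                             group_id='example-group',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=1000)  # stop iterating when idle

    # Records are the ConsumerRecord namedtuples defined in fetcher.py
    for record in consumer:
        print(record.topic, record.partition, record.offset, record.value)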
diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py index 935f56e..8041537 100644 --- a/kafka/consumer/__init__.py +++ b/kafka/consumer/__init__.py @@ -1,6 +1,6 @@ from .simple import SimpleConsumer from .multiprocess import MultiProcessConsumer -from .kafka import KafkaConsumer +from .group import KafkaConsumer __all__ = [ 'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer' diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index c9f6e48..2059d92 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,11 +7,11 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, UnknownTopicOrPartitionError, check_error, KafkaError ) -from kafka.util import kafka_bytestring, ReentrantTimer +from kafka.util import ReentrantTimer log = logging.getLogger('kafka.consumer') @@ -47,8 +47,8 @@ class Consumer(object): auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client - self.topic = kafka_bytestring(topic) - self.group = None if group is None else kafka_bytestring(group) + self.topic = topic + self.group = group self.client.load_metadata_for_topics(topic) self.offsets = {} @@ -94,14 +94,14 @@ class Consumer(object): def fetch_last_known_offsets(self, partitions=None): if self.group is None: - raise ValueError('KafkaClient.group must not be None') + raise ValueError('SimpleClient.group must not be None') if partitions is None: partitions = self.client.get_partition_ids_for_topic(self.topic) responses = self.client.send_offset_fetch_request( self.group, - [OffsetFetchRequest(self.topic, p) for p in partitions], + [OffsetFetchRequestPayload(self.topic, p) for p in partitions], fail_on_error=False ) @@ -155,7 +155,7 @@ class Consumer(object): 'group=%s, topic=%s, partition=%s', offset, self.group, self.topic, partition) - reqs.append(OffsetCommitRequest(self.topic, partition, + reqs.append(OffsetCommitRequestPayload(self.topic, partition, offset, None)) try: @@ -197,7 +197,8 @@ class Consumer(object): # ValueError on list.remove() if the exithandler no longer # exists is fine here try: - atexit._exithandlers.remove((self._cleanup_func, (self,), {})) + atexit._exithandlers.remove( # pylint: disable=no-member + (self._cleanup_func, (self,), {})) except ValueError: pass @@ -217,7 +218,7 @@ class Consumer(object): reqs = [] for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) resps = self.client.send_offset_request(reqs) for resp in resps: diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py new file mode 100644 index 0000000..1593018 --- /dev/null +++ b/kafka/consumer/fetcher.py @@ -0,0 +1,645 @@ +from __future__ import absolute_import + +import collections +import copy +import logging + +import six + +import kafka.common as Errors +from kafka.common import TopicPartition +from kafka.future import Future +from kafka.protocol.fetch import FetchRequest +from kafka.protocol.message import PartialMessage +from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy + +log = logging.getLogger(__name__) + + +ConsumerRecord = collections.namedtuple("ConsumerRecord", + ["topic", "partition", "offset", "key", "value"]) + + +class NoOffsetForPartitionError(Errors.KafkaError): + pass + + +class RecordTooLargeError(Errors.KafkaError): + pass + + +class Fetcher(six.Iterator): + DEFAULT_CONFIG = 
{ + 'key_deserializer': None, + 'value_deserializer': None, + 'fetch_min_bytes': 1024, + 'fetch_max_wait_ms': 500, + 'max_partition_fetch_bytes': 1048576, + 'check_crcs': True, + } + + def __init__(self, client, subscriptions, **configs): + """Initialize a Kafka Message Fetcher. + + Keyword Arguments: + key_deserializer (callable): Any callable that takes a + raw message key and returns a deserialized key. + value_deserializer (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. + fetch_min_bytes (int): Minimum amount of data the server should + return for a fetch request, otherwise wait up to + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds + the server will block before answering the fetch request if + there isn't sufficient data to immediately satisfy the + requirement given by fetch_min_bytes. Default: 500. + max_partition_fetch_bytes (int): The maximum amount of data + per-partition the server will return. The maximum total memory + used for a request = #partitions * max_partition_fetch_bytes. + This size must be at least as large as the maximum message size + the server allows or else it is possible for the producer to + send messages larger than the consumer can fetch. If that + happens, the consumer can get stuck trying to fetch a large + message on a certain partition. Default: 1048576. + check_crcs (bool): Automatically check the CRC32 of the records + consumed. This ensures no on-the-wire or on-disk corruption to + the messages occurred. This check adds some overhead, so it may + be disabled in cases seeking extreme performance. Default: True + """ + #metrics=None, + #metric_group_prefix='consumer', + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] + + self._client = client + self._subscriptions = subscriptions + self._records = collections.deque() # (offset, topic_partition, messages) + self._unauthorized_topics = set() + self._offset_out_of_range_partitions = dict() # {topic_partition: offset} + self._record_too_large_partitions = dict() # {topic_partition: offset} + self._iterator = None + + #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) + + def init_fetches(self): + """Send FetchRequests asynchronously for all assigned partitions. + + Returns: + List of Futures: each future resolves to a FetchResponse + """ + futures = [] + for node_id, request in six.iteritems(self._create_fetch_requests()): + if self._client.ready(node_id): + log.debug("Sending FetchRequest to node %s", node_id) + future = self._client.send(node_id, request) + future.add_callback(self._handle_fetch_response, request) + future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) + futures.append(future) + return futures + + def update_fetch_positions(self, partitions): + """Update the fetch positions for the provided partitions. 
+ + Arguments: + partitions (list of TopicPartitions): partitions to update + + Raises: + NoOffsetForPartitionError: if no offset is stored for a given + partition and no reset policy is available + """ + # reset the fetch position to the committed position + for tp in partitions: + if not self._subscriptions.is_assigned(tp): + log.warning("partition %s is not assigned - skipping offset" + " update", tp) + continue + elif self._subscriptions.is_fetchable(tp): + log.warning("partition %s is still fetchable -- skipping offset" + " update", tp) + continue + + # TODO: If there are several offsets to reset, + # we could submit offset requests in parallel + # for now, each call to _reset_offset will block + if self._subscriptions.is_offset_reset_needed(tp): + self._reset_offset(tp) + elif self._subscriptions.assignment[tp].committed is None: + # there's no committed position, so we need to reset with the + # default strategy + self._subscriptions.need_offset_reset(tp) + self._reset_offset(tp) + else: + committed = self._subscriptions.assignment[tp].committed + log.debug("Resetting offset for partition %s to the committed" + " offset %s", tp, committed) + self._subscriptions.seek(tp, committed) + + def _reset_offset(self, partition): + """Reset offsets for the given partition using the offset reset strategy. + + Arguments: + partition (TopicPartition): the partition that needs reset offset + + Raises: + NoOffsetForPartitionError: if no offset reset strategy is defined + """ + timestamp = self._subscriptions.assignment[partition].reset_strategy + if timestamp is OffsetResetStrategy.EARLIEST: + strategy = 'earliest' + elif timestamp is OffsetResetStrategy.LATEST: + strategy = 'latest' + else: + raise NoOffsetForPartitionError(partition) + + log.debug("Resetting offset for partition %s to %s offset.", + partition, strategy) + offset = self._offset(partition, timestamp) + + # we might lose the assignment while fetching the offset, + # so check it is still active + if self._subscriptions.is_assigned(partition): + self._subscriptions.seek(partition, offset) + + def _offset(self, partition, timestamp): + """Fetch a single offset before the given timestamp for the partition. + + Blocks until offset is obtained, or a non-retriable exception is raised + + Arguments: + partition The partition that needs fetching offset. + timestamp (int): timestamp for fetching offset. -1 for the latest + available, -2 for the earliest available. Otherwise timestamp + is treated as epoch seconds. + + Returns: + int: message offset + """ + while True: + future = self._send_offset_request(partition, timestamp) + self._client.poll(future=future) + + if future.succeeded(): + return future.value + + if not future.retriable(): + raise future.exception # pylint: disable-msg=raising-bad-type + + if future.exception.invalid_metadata: + refresh_future = self._client.cluster.request_update() + self._client.poll(future=refresh_future) + + def _raise_if_offset_out_of_range(self): + """Check FetchResponses for offset out of range. 
+
+        Raises:
+            OffsetOutOfRangeError: if any partition from previous FetchResponse
+                contains OffsetOutOfRangeError and the default_reset_policy is
+                None
+        """
+        if not self._offset_out_of_range_partitions:
+            return
+
+        current_out_of_range_partitions = {}
+
+        # filter only the fetchable partitions
+        for partition, offset in six.iteritems(self._offset_out_of_range_partitions):
+            if not self._subscriptions.is_fetchable(partition):
+                log.debug("Ignoring fetched records for %s since it is no"
+                          " longer fetchable", partition)
+                continue
+            consumed = self._subscriptions.assignment[partition].consumed
+            # ignore partition if its consumed offset != offset in FetchResponse
+            # e.g. after seek()
+            if consumed is not None and offset == consumed:
+                current_out_of_range_partitions[partition] = offset
+
+        self._offset_out_of_range_partitions.clear()
+        if current_out_of_range_partitions:
+            raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)
+
+    def _raise_if_unauthorized_topics(self):
+        """Check FetchResponses for topic authorization failures.
+
+        Raises:
+            TopicAuthorizationFailedError
+        """
+        if self._unauthorized_topics:
+            topics = set(self._unauthorized_topics)
+            self._unauthorized_topics.clear()
+            raise Errors.TopicAuthorizationFailedError(topics)
+
+    def _raise_if_record_too_large(self):
+        """Check FetchResponses for messages larger than the max per partition.
+
+        Raises:
+            RecordTooLargeError: if there is a message larger than fetch size
+        """
+        if not self._record_too_large_partitions:
+            return
+
+        copied_record_too_large_partitions = dict(self._record_too_large_partitions)
+        self._record_too_large_partitions.clear()
+
+        raise RecordTooLargeError(
+            "There are some messages at [Partition=Offset]: %s "
+            " whose size is larger than the fetch size %s"
+            " and hence cannot ever be returned."
+            " Increase the fetch size, or decrease the maximum message"
+            " size the broker will allow.",
+            copied_record_too_large_partitions,
+            self.config['max_partition_fetch_bytes'])
+
+    def fetched_records(self):
+        """Returns previously fetched records and updates consumed offsets.
+
+        Incompatible with iterator interface -- use one or the other, not both.
+
+        Raises:
+            OffsetOutOfRangeError: if no subscription offset_reset_strategy
+            InvalidMessageError: if message crc validation fails (check_crcs
+                must be set to True)
+            RecordTooLargeError: if a message is larger than the currently
+                configured max_partition_fetch_bytes
+            TopicAuthorizationFailedError: if consumer is not authorized to
+                fetch messages from the topic
+            AssertionError: if used with iterator (incompatible)
+
+        Returns:
+            dict: {TopicPartition: [messages]}
+        """
+        assert self._iterator is None, (
+            'fetched_records is incompatible with message iterator')
+        if self._subscriptions.needs_partition_assignment:
+            return {}
+
+        drained = collections.defaultdict(list)
+        self._raise_if_offset_out_of_range()
+        self._raise_if_unauthorized_topics()
+        self._raise_if_record_too_large()
+
+        # Loop over the records deque
+        while self._records:
+            (fetch_offset, tp, messages) = self._records.popleft()
+
+            if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance happened before
+                # fetched records are returned to the consumer's poll call
+                log.debug("Not returning fetched records for partition %s"
+                          " since it is no longer assigned", tp)
+                continue
+
+            # note that the consumed position should always be available
+            # as long as the partition is still assigned
+            consumed = self._subscriptions.assignment[tp].consumed
+            if not self._subscriptions.is_fetchable(tp):
+                # this can happen when a partition consumption paused before
+                # fetched records are returned to the consumer's poll call
+                log.debug("Not returning fetched records for assigned partition"
+                          " %s since it is no longer fetchable", tp)
+
+                # we also need to reset the fetch positions to pretend we did
+                # not fetch this partition in the previous request at all
+                self._subscriptions.assignment[tp].fetched = consumed
+            elif fetch_offset == consumed:
+                next_offset = messages[-1][0] + 1
+                log.debug("Returning fetched records for assigned partition %s"
+                          " and updating consumed position to %s", tp, next_offset)
+                self._subscriptions.assignment[tp].consumed = next_offset
+
+                for record in self._unpack_message_set(tp, messages):
+                    drained[tp].append(record)
+            else:
+                # these records aren't next in line based on the last consumed
+                # position; ignore them, they must be from an obsolete request
+                log.debug("Ignoring fetched records for %s at offset %s",
+                          tp, fetch_offset)
+        return dict(drained)
+
+    def _unpack_message_set(self, tp, messages):
+        for offset, size, msg in messages:
+            if self.config['check_crcs'] and not msg.validate_crc():
+                raise Errors.InvalidMessageError(msg)
+            elif msg.is_compressed():
+                for record in self._unpack_message_set(tp, msg.decompress()):
+                    yield record
+            else:
+                key, value = self._deserialize(msg)
+                yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+
+    def _message_generator(self):
+        """Iterate over fetched_records"""
+        if self._subscriptions.needs_partition_assignment:
+            raise StopIteration('Subscription needs partition assignment')
+
+        while self._records:
+
+            # Check on each iteration since this is a generator
+            self._raise_if_offset_out_of_range()
+            self._raise_if_unauthorized_topics()
+            self._raise_if_record_too_large()
+
+            (fetch_offset, tp, messages) = self._records.popleft()
+
+            if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance happened before
+                # fetched records are returned
+                log.warning("Not returning fetched records for partition %s"
+                            " since it is no longer assigned", tp)
+                continue
+
+            # note that the consumed position should always be available
+            # as long as the partition is still assigned
+            consumed = self._subscriptions.assignment[tp].consumed
+            if not self._subscriptions.is_fetchable(tp):
+                # this can happen when a partition consumption paused before
+                # fetched records are returned
+                log.warning("Not returning fetched records for assigned partition"
+                            " %s since it is no longer fetchable", tp)
+
+                # we also need to reset the fetch positions to pretend we did
+                # not fetch this partition in the previous request at all
+                self._subscriptions.assignment[tp].fetched = consumed
+
+            elif fetch_offset == consumed:
+                for msg in self._unpack_message_set(tp, messages):
+                    self._subscriptions.assignment[tp].consumed = msg.offset + 1
+                    yield msg
+            else:
+                # these records aren't next in line based on the last consumed
+                # position; ignore them, they must be from an obsolete request
+                log.warning("Ignoring fetched records for %s at offset %s",
+                            tp, fetch_offset)
+
+            # Send any additional FetchRequests that we can now.
+            # This will likely fetch each partition individually, rather than
+            # fetch multiple partitions in bulk when they are on the same broker
+            self.init_fetches()
+
+    def __iter__(self):  # pylint: disable=non-iterator-returned
+        return self
+
+    def __next__(self):
+        if not self._iterator:
+            self._iterator = self._message_generator()
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            self._iterator = None
+            raise
+
+    def _deserialize(self, msg):
+        if self.config['key_deserializer']:
+            key = self.config['key_deserializer'](msg.key)  # pylint: disable-msg=not-callable
+        else:
+            key = msg.key
+        if self.config['value_deserializer']:
+            value = self.config['value_deserializer'](msg.value)  # pylint: disable-msg=not-callable
+        else:
+            value = msg.value
+        return key, value
+
+    def _send_offset_request(self, partition, timestamp):
+        """Fetch a single offset before the given timestamp for the partition.
+
+        Arguments:
+            partition (TopicPartition): partition that needs fetching offset
+            timestamp (int): timestamp for fetching offset
+
+        Returns:
+            Future: resolves to the corresponding offset
+        """
+        node_id = self._client.cluster.leader_for_partition(partition)
+        if node_id is None:
+            log.debug("Partition %s is unknown for fetching offset,"
+                      " wait for metadata refresh", partition)
+            return Future().failure(Errors.StaleMetadata(partition))
+        elif node_id == -1:
+            log.debug("Leader for partition %s unavailable for fetching offset,"
+                      " wait for metadata refresh", partition)
+            return Future().failure(Errors.LeaderNotAvailableError(partition))
+
+        request = OffsetRequest(
+            -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+        )
+        # Client returns a future that only fails on network issues
+        # so create a separate future and attach a callback to update it
+        # based on response error codes
+        future = Future()
+        if not self._client.ready(node_id):
+            return future.failure(Errors.NodeNotReadyError(node_id))
+
+        _f = self._client.send(node_id, request)
+        _f.add_callback(self._handle_offset_response, partition, future)
+        _f.add_errback(lambda e: future.failure(e))
+        return future
+
+    def _handle_offset_response(self, partition, future, response):
+        """Callback for the response of the list offset call above.
+ + Arguments: + partition (TopicPartition): The partition that was fetched + future (Future): the future to update based on response + response (OffsetResponse): response from the server + + Raises: + AssertionError: if response does not match partition + """ + topic, partition_info = response.topics[0] + assert len(response.topics) == 1 and len(partition_info) == 1, ( + 'OffsetResponse should only be for a single topic-partition') + + part, error_code, offsets = partition_info[0] + assert topic == partition.topic and part == partition.partition, ( + 'OffsetResponse partition does not match OffsetRequest partition') + + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + assert len(offsets) == 1, 'Expected OffsetResponse with one offset' + offset = offsets[0] + log.debug("Fetched offset %d for partition %s", offset, partition) + future.success(offset) + elif error_type in (Errors.NotLeaderForPartitionError, + Errors.UnknownTopicOrPartitionError): + log.warning("Attempt to fetch offsets for partition %s failed due" + " to obsolete leadership information, retrying.", + partition) + future.failure(error_type(partition)) + else: + log.error("Attempt to fetch offsets for partition %s failed due to:" + " %s", partition, error_type) + future.failure(error_type(partition)) + + def _create_fetch_requests(self): + """Create fetch requests for all assigned partitions, grouped by node. + + FetchRequests skipped if no leader, node has requests in flight, or we + have not returned all previously fetched records to consumer + + Returns: + dict: {node_id: [FetchRequest,...]} + """ + # create the fetch info as a dict of lists of partition info tuples + # which can be passed to FetchRequest() via .items() + fetchable = collections.defaultdict(lambda: collections.defaultdict(list)) + + for partition in self._subscriptions.fetchable_partitions(): + node_id = self._client.cluster.leader_for_partition(partition) + if node_id is None or node_id == -1: + log.debug("No leader found for partition %s." 
+ " Requesting metadata update", partition) + self._client.cluster.request_update() + elif self._client.in_flight_request_count(node_id) == 0: + # if there is a leader and no in-flight requests, + # issue a new fetch but only fetch data for partitions whose + # previously fetched data has been consumed + fetched = self._subscriptions.assignment[partition].fetched + consumed = self._subscriptions.assignment[partition].consumed + if consumed == fetched: + partition_info = ( + partition.partition, + fetched, + self.config['max_partition_fetch_bytes'] + ) + fetchable[node_id][partition.topic].append(partition_info) + else: + log.debug("Skipping FetchRequest to %s because previously" + " fetched offsets (%s) have not been fully" + " consumed yet (%s)", node_id, fetched, consumed) + + requests = {} + for node_id, partition_data in six.iteritems(fetchable): + requests[node_id] = FetchRequest( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + partition_data.items()) + return requests + + def _handle_fetch_response(self, request, response): + """The callback for fetch completion""" + #total_bytes = 0 + #total_count = 0 + + fetch_offsets = {} + for topic, partitions in request.topics: + for partition, offset, _ in partitions: + fetch_offsets[TopicPartition(topic, partition)] = offset + + for topic, partitions in response.topics: + for partition, error_code, highwater, messages in partitions: + tp = TopicPartition(topic, partition) + error_type = Errors.for_code(error_code) + if not self._subscriptions.is_fetchable(tp): + # this can happen when a rebalance happened or a partition + # consumption paused while fetch is still in-flight + log.debug("Ignoring fetched records for partition %s" + " since it is no longer fetchable", tp) + elif error_type is Errors.NoError: + fetch_offset = fetch_offsets[tp] + + # we are interested in this fetch only if the beginning + # offset matches the current consumed position + consumed = self._subscriptions.assignment[tp].consumed + if consumed is None: + continue + elif consumed != fetch_offset: + # the fetched position has gotten out of sync with the + # consumed position (which might happen when a + # rebalance occurs with a fetch in-flight), so we need + # to reset the fetch position so the next fetch is right + self._subscriptions.assignment[tp].fetched = consumed + continue + + partial = None + if messages and isinstance(messages[-1][-1], PartialMessage): + partial = messages.pop() + + if messages: + last_offset, _, _ = messages[-1] + self._subscriptions.assignment[tp].fetched = last_offset + 1 + self._records.append((fetch_offset, tp, messages)) + #self.sensors.records_fetch_lag.record(highwater - last_offset) + elif partial: + # we did not read a single message from a non-empty + # buffer because that message's size is larger than + # fetch size, in this case record this exception + self._record_too_large_partitions[tp] = fetch_offset + + # TODO: bytes metrics + #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size()); + #totalBytes += num_bytes; + #totalCount += parsed.size(); + elif error_type in (Errors.NotLeaderForPartitionError, + Errors.UnknownTopicOrPartitionError): + self._client.cluster.request_update() + elif error_type is Errors.OffsetOutOfRangeError: + fetch_offset = fetch_offsets[tp] + if self._subscriptions.has_default_offset_reset_policy(): + self._subscriptions.need_offset_reset(tp) + else: + self._offset_out_of_range_partitions[tp] = fetch_offset + log.info("Fetch offset %s is out of 
range, resetting offset", + self._subscriptions.assignment[tp].fetched) + elif error_type is Errors.TopicAuthorizationFailedError: + log.warn("Not authorized to read from topic %s.", tp.topic) + self._unauthorized_topics.add(tp.topic) + elif error_type is Errors.UnknownError: + log.warn("Unknown error fetching data for topic-partition %s", tp) + else: + raise error_type('Unexpected error while fetching data') + + """TOOD - metrics + self.sensors.bytesFetched.record(totalBytes) + self.sensors.recordsFetched.record(totalCount) + self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()) + self.sensors.fetchLatency.record(resp.requestLatencyMs()) + + +class FetchManagerMetrics(object): + def __init__(self, metrics, prefix): + self.metrics = metrics + self.group_name = prefix + "-fetch-manager-metrics" + + self.bytes_fetched = metrics.sensor("bytes-fetched") + self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name, + "The average number of bytes fetched per request"), metrics.Avg()) + self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name, + "The maximum number of bytes fetched per request"), metrics.Max()) + self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name, + "The average number of bytes consumed per second"), metrics.Rate()) + + self.records_fetched = self.metrics.sensor("records-fetched") + self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name, + "The average number of records in each request"), metrics.Avg()) + self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name, + "The average number of records consumed per second"), metrics.Rate()) + + self.fetch_latency = metrics.sensor("fetch-latency") + self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name, + "The average time taken for a fetch request."), metrics.Avg()) + self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name, + "The max time taken for any fetch request."), metrics.Max()) + self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name, + "The number of fetch requests per second."), metrics.Rate(metrics.Count())) + + self.records_fetch_lag = metrics.sensor("records-lag") + self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name, + "The maximum lag in terms of number of records for any partition in self window"), metrics.Max()) + + self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time") + self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name, + "The average throttle time in ms"), metrics.Avg()) + self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name, + "The maximum throttle time in ms"), metrics.Max()) + + def record_topic_fetch_metrics(topic, num_bytes, num_records): + # record bytes fetched + name = '.'.join(["topic", topic, "bytes-fetched"]) + self.metrics[name].record(num_bytes); + + # record records fetched + name = '.'.join(["topic", topic, "records-fetched"]) + self.metrics[name].record(num_records) + """ diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py new file mode 100644 index 0000000..9ce1438 --- /dev/null +++ b/kafka/consumer/group.py @@ -0,0 +1,682 @@ +from __future__ import absolute_import + +import copy +import logging +import time + +import six + +from kafka.client_async import KafkaClient +from kafka.consumer.fetcher import Fetcher +from kafka.consumer.subscription_state 
import SubscriptionState +from kafka.coordinator.consumer import ConsumerCoordinator +from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.protocol.offset import OffsetResetStrategy +from kafka.version import __version__ + +log = logging.getLogger(__name__) + + +class KafkaConsumer(six.Iterator): + """Consume records from a Kafka cluster. + + The consumer will transparently handle the failure of servers in the Kafka + cluster, and adapt as topic-partitions are created or migrate between + brokers. It also interacts with the assigned kafka Group Coordinator node + to allow multiple consumers to load balance consumption of topics (requires + kafka >= 0.9.0.0). + + Arguments: + *topics (str): optional list of topics to subscribe to. If not set, + call subscribe() or assign() before consuming records. + + Keyword Arguments: + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the consumer should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. + client_id (str): a name for this client. This string is passed in + each request to servers and can be used to identify specific + server-side log entries that correspond to this client. Also + submitted to GroupCoordinator for logging with respect to + consumer group administration. Default: 'kafka-python-{version}' + group_id (str): name of the consumer group to join for dynamic + partition assignment (if enabled), and to use for fetching and + committing offsets. Default: 'kafka-python-default-group' + key_deserializer (callable): Any callable that takes a + raw message key and returns a deserialized key. + value_deserializer (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. + fetch_min_bytes (int): Minimum amount of data the server should + return for a fetch request, otherwise wait up to + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds + the server will block before answering the fetch request if + there isn't sufficient data to immediately satisfy the + requirement given by fetch_min_bytes. Default: 500. + max_partition_fetch_bytes (int): The maximum amount of data + per-partition the server will return. The maximum total memory + used for a request = #partitions * max_partition_fetch_bytes. + This size must be at least as large as the maximum message size + the server allows or else it is possible for the producer to + send messages larger than the consumer can fetch. If that + happens, the consumer can get stuck trying to fetch a large + message on a certain partition. Default: 1048576. + request_timeout_ms (int): Client request timeout in milliseconds. + Default: 40000. + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. + reconnect_backoff_ms (int): The amount of time in milliseconds to + wait before attempting to reconnect to a given host. + Default: 50. + max_in_flight_requests_per_connection (int): Requests are pipelined + to kafka brokers up to this number of maximum requests per + broker connection. Default: 5. 
+        auto_offset_reset (str): A policy for resetting offsets on
+            OffsetOutOfRange errors: 'earliest' will move to the oldest
+            available message, 'latest' will move to the most recent. Any
+            other value will raise an exception. Default: 'latest'.
+        enable_auto_commit (bool): If true, the consumer's offset will be
+            periodically committed in the background. Default: True.
+        auto_commit_interval_ms (int): milliseconds between automatic
+            offset commits, if enable_auto_commit is True. Default: 5000.
+        default_offset_commit_callback (callable): called as
+            callback(offsets, response); response will be either an Exception
+            or an OffsetCommitResponse struct. This callback can be used to
+            trigger custom actions when a commit request completes.
+        check_crcs (bool): Automatically check the CRC32 of the records
+            consumed. This ensures no on-the-wire or on-disk corruption to
+            the messages occurred. This check adds some overhead, so it may
+            be disabled in cases seeking extreme performance. Default: True
+        metadata_max_age_ms (int): The period of time in milliseconds after
+            which we force a refresh of metadata even if we haven't seen any
+            partition leadership changes to proactively discover any new
+            brokers or partitions. Default: 300000
+        partition_assignment_strategy (list): List of objects to use to
+            distribute partition ownership amongst consumer instances when
+            group management is used. Default: [RoundRobinPartitionAssignor]
+        heartbeat_interval_ms (int): The expected time in milliseconds
+            between heartbeats to the consumer coordinator when using
+            Kafka's group management feature. Heartbeats are used to ensure
+            that the consumer's session stays active and to facilitate
+            rebalancing when new consumers join or leave the group. The
+            value must be set lower than session_timeout_ms, but typically
+            should be set no higher than 1/3 of that value. It can be
+            adjusted even lower to control the expected time for normal
+            rebalances. Default: 3000
+        session_timeout_ms (int): The timeout used to detect failures when
+            using Kafka's group management facilities. Default: 30000
+        send_buffer_bytes (int): The size of the TCP send buffer
+            (SO_SNDBUF) to use when sending data. Default: 131072
+        receive_buffer_bytes (int): The size of the TCP receive buffer
+            (SO_RCVBUF) to use when reading data. Default: 32768
+        consumer_timeout_ms (int): number of milliseconds to wait before
+            raising a timeout exception if no message is available for
+            consumption. Default: -1 (don't raise an exception)
+        api_version (str): specify which kafka API version to use.
+            0.9 enables full group coordination features; 0.8.2 enables
+            kafka-storage offset commits; 0.8.1 enables zookeeper-storage
+            offset commits; 0.8.0 is what is left. If set to 'auto', will
+            attempt to infer the broker version by probing various APIs.
+ Default: auto + + Note: + Configuration parameters are described in more detail at + https://kafka.apache.org/090/configuration.html#newconsumerconfigs + """ + DEFAULT_CONFIG = { + 'bootstrap_servers': 'localhost', + 'client_id': 'kafka-python-' + __version__, + 'group_id': 'kafka-python-default-group', + 'key_deserializer': None, + 'value_deserializer': None, + 'fetch_max_wait_ms': 500, + 'fetch_min_bytes': 1024, + 'max_partition_fetch_bytes': 1 * 1024 * 1024, + 'request_timeout_ms': 40 * 1000, + 'retry_backoff_ms': 100, + 'reconnect_backoff_ms': 50, + 'max_in_flight_requests_per_connection': 5, + 'auto_offset_reset': 'latest', + 'enable_auto_commit': True, + 'auto_commit_interval_ms': 5000, + 'check_crcs': True, + 'metadata_max_age_ms': 5 * 60 * 1000, + 'partition_assignment_strategy': (RoundRobinPartitionAssignor,), + 'heartbeat_interval_ms': 3000, + 'session_timeout_ms': 30000, + 'send_buffer_bytes': 128 * 1024, + 'receive_buffer_bytes': 32 * 1024, + 'consumer_timeout_ms': -1, + 'api_version': 'auto', + 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet + #'metric_reporters': None, + #'metrics_num_samples': 2, + #'metrics_sample_window_ms': 30000, + } + + def __init__(self, *topics, **configs): + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs.pop(key) + + # Only check for extra config keys in top-level class + assert not configs, 'Unrecognized configs: %s' % configs + + deprecated = {'smallest': 'earliest', 'largest': 'latest' } + if self.config['auto_offset_reset'] in deprecated: + new_config = deprecated[self.config['auto_offset_reset']] + log.warning('use auto_offset_reset=%s (%s is deprecated)', + new_config, self.config['auto_offset_reset']) + self.config['auto_offset_reset'] = new_config + + self._client = KafkaClient(**self.config) + + # Check Broker Version if not set explicitly + if self.config['api_version'] == 'auto': + self.config['api_version'] = self._client.check_version() + assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0') + + # Convert api_version config to tuple for easy comparisons + self.config['api_version'] = tuple( + map(int, self.config['api_version'].split('.'))) + + self._subscription = SubscriptionState(self.config['auto_offset_reset']) + self._fetcher = Fetcher( + self._client, self._subscription, **self.config) + self._coordinator = ConsumerCoordinator( + self._client, self._subscription, + assignors=self.config['partition_assignment_strategy'], + **self.config) + self._closed = False + self._iterator = None + self._consumer_timeout = float('inf') + + #self.metrics = None + if topics: + self._subscription.subscribe(topics=topics) + self._client.set_topics(topics) + + def assign(self, partitions): + """Manually assign a list of TopicPartitions to this consumer. + + Arguments: + partitions (list of TopicPartition): assignment for this instance. + + Raises: + IllegalStateError: if consumer has already called subscribe() + + Warning: + It is not possible to use both manual partition assignment with + assign() and group assignment with subscribe(). + + Note: + This interface does not support incremental assignment and will + replace the previous assignment (if there was one). + + Note: + Manual topic assignment through this method does not use the + consumer's group management functionality. As such, there will be + no rebalance operation triggered when group membership or cluster + and topic metadata change. 
+ """ + self._subscription.assign_from_user(partitions) + self._client.set_topics([tp.topic for tp in partitions]) + + def assignment(self): + """Get the TopicPartitions currently assigned to this consumer. + + If partitions were directly assigned using assign(), then this will + simply return the same partitions that were previously assigned. + If topics were subscribed using subscribe(), then this will give the + set of topic partitions currently assigned to the consumer (which may + be none if the assignment hasn't happened yet, or if the partitions are + in the process of being reassigned). + + Returns: + set: {TopicPartition, ...} + """ + return self._subscription.assigned_partitions() + + def close(self): + """Close the consumer, waiting indefinitely for any needed cleanup.""" + if self._closed: + return + log.debug("Closing the KafkaConsumer.") + self._closed = True + self._coordinator.close() + #self.metrics.close() + self._client.close() + try: + self.config['key_deserializer'].close() + except AttributeError: + pass + try: + self.config['value_deserializer'].close() + except AttributeError: + pass + log.debug("The KafkaConsumer has closed.") + + def commit_async(self, offsets=None, callback=None): + """Commit offsets to kafka asynchronously, optionally firing callback + + This commits offsets only to Kafka. The offsets committed using this API + will be used on the first fetch after every rebalance and also on + startup. As such, if you need to store offsets in anything other than + Kafka, this API should not be used. + + This is an asynchronous call and will not block. Any errors encountered + are either passed to the callback (if provided) or discarded. + + Arguments: + offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict + to commit with the configured group_id. Defaults to current + consumed offsets for all subscribed partitions. + callback (callable, optional): called as callback(offsets, response) + with response as either an Exception or a OffsetCommitResponse + struct. This callback can be used to trigger custom actions when + a commit request completes. + + Returns: + kafka.future.Future + """ + assert self.config['api_version'] >= (0, 8, 1) + if offsets is None: + offsets = self._subscription.all_consumed_offsets() + log.debug("Committing offsets: %s", offsets) + future = self._coordinator.commit_offsets_async( + offsets, callback=callback) + return future + + def commit(self, offsets=None): + """Commit offsets to kafka, blocking until success or error + + This commits offsets only to Kafka. The offsets committed using this API + will be used on the first fetch after every rebalance and also on + startup. As such, if you need to store offsets in anything other than + Kafka, this API should not be used. + + Blocks until either the commit succeeds or an unrecoverable error is + encountered (in which case it is thrown to the caller). + + Currently only supports kafka-topic offset storage (not zookeeper) + + Arguments: + offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict + to commit with the configured group_id. Defaults to current + consumed offsets for all subscribed partitions. + """ + assert self.config['api_version'] >= (0, 8, 1) + if offsets is None: + offsets = self._subscription.all_consumed_offsets() + self._coordinator.commit_offsets_sync(offsets) + + def committed(self, partition): + """Get the last committed offset for the given partition + + This offset will be used as the position for the consumer + in the event of a failure. 
+ + This call may block to do a remote call if the partition in question + isn't assigned to this consumer or if the consumer hasn't yet + initialized its cache of committed offsets. + + Arguments: + partition (TopicPartition): the partition to check + + Returns: + The last committed offset, or None if there was no prior commit. + """ + assert self.config['api_version'] >= (0, 8, 1) + if self._subscription.is_assigned(partition): + committed = self._subscription.assignment[partition].committed + if committed is None: + self._coordinator.refresh_committed_offsets_if_needed() + committed = self._subscription.assignment[partition].committed + else: + commit_map = self._coordinator.fetch_committed_offsets([partition]) + if partition in commit_map: + committed = commit_map[partition].offset + else: + committed = None + return committed + + def topics(self): + """Get all topic metadata topics the user is authorized to view. + + [Not Implemented Yet] + + Returns: + {topic: [partition_info]} + """ + raise NotImplementedError('TODO') + + def partitions_for_topic(self, topic): + """Get metadata about the partitions for a given topic. + + Arguments: + topic (str): topic to check + + Returns: + set: partition ids + """ + return self._client.cluster.partitions_for_topic(topic) + + def poll(self, timeout_ms=0): + """Fetch data from assigned topics / partitions. + + Records are fetched and returned in batches by topic-partition. + On each poll, consumer will try to use the last consumed offset as the + starting offset and fetch sequentially. The last consumed offset can be + manually set through seek(partition, offset) or automatically set as + the last committed offset for the subscribed list of partitions. + + Incompatible with iterator interface -- use one or the other, not both. + + Arguments: + timeout_ms (int, optional): milliseconds to spend waiting in poll if + data is not available. If 0, returns immediately with any + records that are available now. Must not be negative. Default: 0 + + Returns: + dict: topic to list of records since the last fetch for the + subscribed list of topics and partitions + """ + assert timeout_ms >= 0, 'Timeout must not be negative' + assert self._iterator is None, 'Incompatible with iterator interface' + + # poll for new data until the timeout expires + start = time.time() + remaining = timeout_ms + while True: + records = self._poll_once(remaining) + if records: + # before returning the fetched records, we can send off the + # next round of fetches and avoid block waiting for their + # responses to enable pipelining while the user is handling the + # fetched records. + self._fetcher.init_fetches() + return records + + elapsed_ms = (time.time() - start) * 1000 + remaining = timeout_ms - elapsed_ms + + if remaining <= 0: + return {} + + def _poll_once(self, timeout_ms): + """ + Do one round of polling. In addition to checking for new data, this does + any needed heart-beating, auto-commits, and offset updates. 
+
+        Arguments:
+            timeout_ms (int): The maximum time in milliseconds to block
+
+        Returns:
+            dict: map of topic to list of records (may be empty)
+        """
+        if self.config['api_version'] >= (0, 8, 2):
+            # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+            self._coordinator.ensure_coordinator_known()
+
+        if self.config['api_version'] >= (0, 9):
+            # ensure we have partitions assigned if we expect to
+            if self._subscription.partitions_auto_assigned():
+                self._coordinator.ensure_active_group()
+
+        # fetch positions if we have partitions we're subscribed to that we
+        # don't know the offset for
+        if not self._subscription.has_all_fetch_positions():
+            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+        # init any new fetches (won't resend pending fetches)
+        records = self._fetcher.fetched_records()
+
+        # if data is available already, e.g. from a previous network client
+        # poll() call to commit, then just return it immediately
+        if records:
+            return records
+
+        self._fetcher.init_fetches()
+        self._client.poll(timeout_ms / 1000.0)
+        return self._fetcher.fetched_records()
+
+    def position(self, partition):
+        """Get the offset of the next record that will be fetched.
+
+        Arguments:
+            partition (TopicPartition): partition to check
+        """
+        assert self._subscription.is_assigned(partition)
+
+        offset = self._subscription.assignment[partition].consumed
+        if offset is None:
+            self._update_fetch_positions([partition])
+            offset = self._subscription.assignment[partition].consumed
+        return offset
+
+    def pause(self, *partitions):
+        """Suspend fetching from the requested partitions.
+
+        Future calls to poll() will not return any records from these partitions
+        until they have been resumed using resume(). Note that this method does
+        not affect partition subscription. In particular, it does not cause a
+        group rebalance when automatic assignment is used.
+
+        Arguments:
+            *partitions (TopicPartition): partitions to pause
+        """
+        for partition in partitions:
+            log.debug("Pausing partition %s", partition)
+            self._subscription.pause(partition)
+
+    def resume(self, *partitions):
+        """Resume fetching from the specified (paused) partitions.
+
+        Arguments:
+            *partitions (TopicPartition): partitions to resume
+        """
+        for partition in partitions:
+            log.debug("Resuming partition %s", partition)
+            self._subscription.resume(partition)
+
+    def seek(self, partition, offset):
+        """Manually specify the fetch offset for a TopicPartition.
+
+        Overrides the fetch offsets that the consumer will use on the next
+        poll(). If this API is invoked for the same partition more than once,
+        the latest offset will be used on the next poll(). Note that you may
+        lose data if this API is arbitrarily used in the middle of consumption
+        to reset the fetch offsets.
+
+        Arguments:
+            partition (TopicPartition): partition for seek operation
+            offset (int): message offset in partition
+        """
+        assert offset >= 0
+        log.debug("Seeking to offset %s for partition %s", offset, partition)
+        self._subscription.assignment[partition].seek(offset)
+
+    def seek_to_beginning(self, *partitions):
+        """Seek to the oldest available offset for partitions.
+
+        Arguments:
+            *partitions: optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions
+        """
+        if not partitions:
+            partitions = self._subscription.assigned_partitions()
+        for tp in partitions:
+            log.debug("Seeking to beginning of partition %s", tp)
+            self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
+
+    def seek_to_end(self, *partitions):
+        """Seek to the most recent available offset for partitions.
+
+        Arguments:
+            *partitions: optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions
+        """
+        if not partitions:
+            partitions = self._subscription.assigned_partitions()
+        for tp in partitions:
+            log.debug("Seeking to end of partition %s", tp)
+            self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+
+    def subscribe(self, topics=(), pattern=None, listener=None):
+        """Subscribe to a list of topics, or a topic regex pattern
+
+        Partitions will be dynamically assigned via a group coordinator.
+        Topic subscriptions are not incremental: this list will replace the
+        current assignment (if there is one).
+
+        This method is incompatible with assign()
+
+        Arguments:
+            topics (list): List of topics for subscription.
+            pattern (str): Pattern to match available topics. You must provide
+                either topics or pattern, but not both.
+            listener (ConsumerRebalanceListener): Optionally include listener
+                callback, which will be called before and after each rebalance
+                operation.
+
+                As part of group management, the consumer will keep track of
+                the list of consumers that belong to a particular group and
+                will trigger a rebalance operation if any of the following
+                events occur:
+
+                * The number of partitions changes for any of the subscribed
+                  topics
+                * A topic is created or deleted
+                * An existing member of the consumer group dies
+                * A new member is added to the consumer group
+
+                When any of these events are triggered, the provided listener
+                will be invoked first to indicate that the consumer's
+                assignment has been revoked, and then again when the new
+                assignment has been received. Note that this listener will
+                immediately override any listener set in a previous call to
+                subscribe. It is guaranteed, however, that the partitions
+                revoked/assigned through this interface are from topics
+                subscribed in this call.
+        """
+        if not topics:
+            self.unsubscribe()
+        else:
+            self._subscription.subscribe(topics=topics,
+                                         pattern=pattern,
+                                         listener=listener)
+            # regex will need all topic metadata
+            if pattern is not None:
+                self._client.cluster.need_metadata_for_all_topics = True
+                log.debug("Subscribed to topic pattern: %s", pattern)
+            else:
+                self._client.set_topics(self._subscription.group_subscription())
+                log.debug("Subscribed to topic(s): %s", topics)
+
+    def subscription(self):
+        """Get the current topic subscription.
+
+        Returns:
+            set: {topic, ...}
+        """
+        return self._subscription.subscription
+
+    def unsubscribe(self):
+        """Unsubscribe from all topics and clear all assigned partitions."""
+        self._subscription.unsubscribe()
+        self._coordinator.close()
+        self._client.cluster.need_metadata_for_all_topics = False
+        log.debug("Unsubscribed all topics or patterns and assigned partitions")
+
+    def _update_fetch_positions(self, partitions):
+        """
+        Set the fetch position to the committed position (if there is one)
+        or reset it using the offset reset policy the user has configured.
+ + Arguments: + partitions (List[TopicPartition]): The partitions that need + updating fetch positions + + Raises: + NoOffsetForPartitionError: If no offset is stored for a given + partition and no offset reset policy is defined + """ + if self.config['api_version'] >= (0, 8, 1): + # refresh commits for all assigned partitions + self._coordinator.refresh_committed_offsets_if_needed() + + # then do any offset lookups in case some positions are not known + self._fetcher.update_fetch_positions(partitions) + + def _message_generator(self): + while time.time() < self._consumer_timeout: + if self.config['api_version'] >= (0, 8, 2): + self._coordinator.ensure_coordinator_known() + + if self.config['api_version'] >= (0, 9): + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() + + # fetch positions if we have partitions we're subscribed to that we + # don't know the offset for + if not self._subscription.has_all_fetch_positions(): + self._update_fetch_positions(self._subscription.missing_fetch_positions()) + + # init any new fetches (won't resend pending fetches) + self._fetcher.init_fetches() + self._client.poll(self.config['request_timeout_ms'] / 1000.0) + timeout = self._consumer_timeout + if self.config['api_version'] >= (0, 9): + heartbeat_timeout = time.time() + ( + self.config['heartbeat_interval_ms'] / 1000.0) + timeout = min(heartbeat_timeout, timeout) + for msg in self._fetcher: + yield msg + if time.time() > timeout: + break + + def __iter__(self): # pylint: disable=non-iterator-returned + return self + + def __next__(self): + if not self._iterator: + self._iterator = self._message_generator() + + # consumer_timeout_ms can be used to stop iteration early + if self.config['consumer_timeout_ms'] >= 0: + self._consumer_timeout = time.time() + ( + self.config['consumer_timeout_ms'] / 1000.0) + + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise + + # old KafkaConsumer methods are deprecated + def configure(self, **configs): + raise NotImplementedError( + 'deprecated -- initialize a new consumer') + + def set_topic_partitions(self, *topics): + raise NotImplementedError( + 'deprecated -- use subscribe() or assign()') + + def fetch_messages(self): + raise NotImplementedError( + 'deprecated -- use poll() or iterator interface') + + def get_partition_offsets(self, topic, partition, + request_time_ms, max_num_offsets): + raise NotImplementedError( + 'deprecated -- send an OffsetRequest with KafkaClient') + + def offsets(self, group=None): + raise NotImplementedError('deprecated -- use committed(partition)') + + def task_done(self, message): + raise NotImplementedError( + 'deprecated -- commit offsets manually if needed') diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 3ef106c..29ddd0e 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -9,14 +9,14 @@ import time import six -from kafka.client import KafkaClient +from kafka import SimpleClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, + OffsetFetchRequestPayload, OffsetCommitRequestPayload, + OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError ) -from kafka.util import kafka_bytestring logger = 
logging.getLogger(__name__) @@ -136,7 +136,7 @@ class KafkaConsumer(object): 'bootstrap_servers required to configure KafkaConsumer' ) - self._client = KafkaClient( + self._client = SimpleClient( self._config['bootstrap_servers'], client_id=self._config['client_id'], timeout=(self._config['socket_timeout_ms'] / 1000.0) @@ -192,14 +192,14 @@ class KafkaConsumer(object): # Topic name str -- all partitions if isinstance(arg, (six.string_types, six.binary_type)): - topic = kafka_bytestring(arg) + topic = arg for partition in self._client.get_partition_ids_for_topic(topic): self._consume_topic_partition(topic, partition) # (topic, partition [, offset]) tuple elif isinstance(arg, tuple): - topic = kafka_bytestring(arg[0]) + topic = arg[0] partition = arg[1] self._consume_topic_partition(topic, partition) if len(arg) == 3: @@ -212,7 +212,7 @@ class KafkaConsumer(object): # key can be string (a topic) if isinstance(key, (six.string_types, six.binary_type)): - topic = kafka_bytestring(key) + topic = key # topic: partition if isinstance(value, int): @@ -230,7 +230,7 @@ class KafkaConsumer(object): # (topic, partition): offset elif isinstance(key, tuple): - topic = kafka_bytestring(key[0]) + topic = key[0] partition = key[1] self._consume_topic_partition(topic, partition) self._offsets.fetch[(topic, partition)] = value @@ -333,9 +333,9 @@ class KafkaConsumer(object): 'No fetch offsets found when calling fetch_messages' ) - fetches = [FetchRequest(topic, partition, - self._offsets.fetch[(topic, partition)], - max_bytes) + fetches = [FetchRequestPayload(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) for (topic, partition) in self._topics] # send_fetch_request will batch topic/partition requests by leader @@ -353,7 +353,7 @@ class KafkaConsumer(object): self._refresh_metadata_on_error() continue - topic = kafka_bytestring(resp.topic) + topic = resp.topic partition = resp.partition try: check_error(resp) @@ -425,7 +425,7 @@ class KafkaConsumer(object): topic / partition. 
See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI """ - reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] + reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) @@ -545,14 +545,14 @@ class KafkaConsumer(object): continue commits.append( - OffsetCommitRequest(topic_partition[0], topic_partition[1], + OffsetCommitRequestPayload(topic_partition[0], topic_partition[1], commit_offset, metadata) ) if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) resps = self._client.send_offset_commit_request( - kafka_bytestring(self._config['group_id']), commits, + self._config['group_id'], commits, fail_on_error=False ) @@ -576,7 +576,6 @@ class KafkaConsumer(object): # def _consume_topic_partition(self, topic, partition): - topic = kafka_bytestring(topic) if not isinstance(partition, int): raise KafkaConfigurationError('Unknown partition type (%s) ' '-- expected int' % type(partition)) @@ -616,8 +615,8 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - kafka_bytestring(self._config['group_id']), - [OffsetFetchRequest(topic_partition[0], topic_partition[1])], + self._config['group_id'], + [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], fail_on_error=False) try: check_error(resp) @@ -665,7 +664,7 @@ class KafkaConsumer(object): # Otherwise we should re-raise the upstream exception # b/c it typically includes additional data about # the request that triggered it, and we do not want to drop that - raise # pylint: disable-msg=E0704 + raise # pylint: disable=E0704 (offset, ) = self.get_partition_offsets(topic, partition, request_time_ms, max_num_offsets=1) diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index d0e2920..9358b09 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -3,12 +3,10 @@ from __future__ import absolute_import from collections import namedtuple import logging from multiprocessing import Process, Manager as MPManager -try: - import queue # python 3 -except ImportError: - import Queue as queue # python 2 import time +from six.moves import queue + from ..common import KafkaError from .base import ( Consumer, @@ -104,7 +102,7 @@ class MultiProcessConsumer(Consumer): parallel using multiple processes Arguments: - client: a connected KafkaClient + client: a connected SimpleClient group: a name for this consumer, used for offset storage and must be unique If you are connecting to a server that does not support offset commit/fetch (any prior to 0.8.1.1), then you *must* set this to None diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 7c63246..29eb480 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -1,18 +1,15 @@ from __future__ import absolute_import try: - from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 + from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611 except ImportError: - from itertools import izip_longest as izip_longest, repeat # python 2 + from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611 import logging -try: - import queue # python 3 -except ImportError: - import Queue as queue # python 2 import sys import time import six +from six.moves 
 
 from .base import (
     Consumer,
@@ -27,11 +24,12 @@ from .base import (
     NO_MESSAGES_WAIT_TIME_SECONDS
 )
 from ..common import (
-    FetchRequest, KafkaError, OffsetRequest,
+    FetchRequestPayload, KafkaError, OffsetRequestPayload,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
     UnknownTopicOrPartitionError, NotLeaderForPartitionError,
     OffsetOutOfRangeError, FailedPayloadsError, check_error
 )
+from kafka.protocol.message import PartialMessage
 
 log = logging.getLogger(__name__)
 
@@ -72,7 +70,7 @@ class SimpleConsumer(Consumer):
     for a topic
 
     Arguments:
-        client: a connected KafkaClient
+        client: a connected SimpleClient
         group: a name for this consumer, used for offset storage and must be unique
             If you are connecting to a server that does not support offset
             commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
@@ -153,9 +151,9 @@ class SimpleConsumer(Consumer):
         LATEST = -1
         EARLIEST = -2
         if self.auto_offset_reset == 'largest':
-            reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+            reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
         elif self.auto_offset_reset == 'smallest':
-            reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+            reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
         else:
             # Let's raise a reasonable exception type if the user calls
             # outside of an exception context
@@ -166,7 +164,7 @@ class SimpleConsumer(Consumer):
             # Otherwise we should re-raise the upstream exception
             # b/c it typically includes additional data about
             # the request that triggered it, and we do not want to drop that
-            raise  # pylint: disable-msg=E0704
+            raise  # pylint: disable=E0704
 
         # send_offset_request
         log.info('Resetting topic-partition offset to %s for %s:%d',
@@ -224,23 +222,17 @@ class SimpleConsumer(Consumer):
             for tmp_partition in self.offsets.keys():
                 if whence == 0:
-                    reqs.append(OffsetRequest(self.topic,
-                                              tmp_partition,
-                                              -2,
-                                              1))
+                    reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
                 elif whence == 2:
-                    reqs.append(OffsetRequest(self.topic,
-                                              tmp_partition,
-                                              -1,
-                                              1))
+                    reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
                 else:
                     pass
         else:
             deltas[partition] = offset
             if whence == 0:
-                reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+                reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
             elif whence == 2:
-                reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+                reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
             else:
                 pass
@@ -370,9 +362,9 @@ class SimpleConsumer(Consumer):
         while partitions:
             requests = []
             for partition, buffer_size in six.iteritems(partitions):
-                requests.append(FetchRequest(self.topic, partition,
-                                             self.fetch_offsets[partition],
-                                             buffer_size))
+                requests.append(FetchRequestPayload(self.topic, partition,
+                                                    self.fetch_offsets[partition],
+                                                    buffer_size))
 
             # Send request
             responses = self.client.send_fetch_request(
                 requests,
@@ -413,32 +405,34 @@ class SimpleConsumer(Consumer):
                 partition = resp.partition
                 buffer_size = partitions[partition]
-                try:
-                    for message in resp.messages:
-                        if message.offset < self.fetch_offsets[partition]:
-                            log.debug('Skipping message %s because its offset is less than the consumer offset',
-                                      message)
-                            continue
-                        # Put the message in our queue
-                        self.queue.put((partition, message))
-                        self.fetch_offsets[partition] = message.offset + 1
-                except ConsumerFetchSizeTooSmall:
+
+                # Check for partial message
+                if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
+
+                    # If buffer is at max and all we got was a partial message
+                    # raise ConsumerFetchSizeTooSmall
                     if (self.max_buffer_size is not None and
-                            buffer_size == self.max_buffer_size):
-                        log.error('Max fetch size %d too small',
-                                  self.max_buffer_size)
-                        raise
+                            buffer_size == self.max_buffer_size and
+                            len(resp.messages) == 1):
+
+                        log.error('Max fetch size %d too small', self.max_buffer_size)
+                        raise ConsumerFetchSizeTooSmall()
+
                    if self.max_buffer_size is None:
                         buffer_size *= 2
                     else:
-                        buffer_size = min(buffer_size * 2,
-                                          self.max_buffer_size)
+                        buffer_size = min(buffer_size * 2, self.max_buffer_size)
                     log.warning('Fetch size too small, increase to %d (2x) '
                                 'and retry', buffer_size)
                     retry_partitions[partition] = buffer_size
-                except ConsumerNoMoreData as e:
-                    log.debug('Iteration was ended by %r', e)
-                except StopIteration:
-                    # Stop iterating through this partition
-                    log.debug('Done iterating over partition %s', partition)
+                    resp.messages.pop()
+
+                for message in resp.messages:
+                    if message.offset < self.fetch_offsets[partition]:
+                        log.debug('Skipping message %s because its offset is less than the consumer offset',
+                                  message)
+                        continue
+                    # Put the message in our queue
+                    self.queue.put((partition, message))
+                    self.fetch_offsets[partition] = message.offset + 1
 
             partitions = retry_partitions
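The rewritten fetch loop above no longer waits for ConsumerFetchSizeTooSmall to be raised while iterating messages; it checks the tail of the response for a PartialMessage and grows the buffer itself. The backoff rule, as a standalone sketch (the function name and arguments are illustrative, not part of the library API):

    def next_fetch_size(buffer_size, max_buffer_size, only_partial_message):
        """Double the fetch buffer after a partial read, up to the cap."""
        if (max_buffer_size is not None and
                buffer_size == max_buffer_size and
                only_partial_message):
            # even the largest permitted fetch cannot hold one whole message
            raise ValueError('max fetch size %d too small' % max_buffer_size)
        if max_buffer_size is None:
            return buffer_size * 2
        return min(buffer_size * 2, max_buffer_size)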
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
new file mode 100644
index 0000000..c60f192
--- /dev/null
+++ b/kafka/consumer/subscription_state.py
@@ -0,0 +1,462 @@
+from __future__ import absolute_import
+
+import abc
+import logging
+import re
+
+import six
+
+from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.protocol.offset import OffsetResetStrategy
+
+log = logging.getLogger(__name__)
+
+
+class SubscriptionState(object):
+    """
+    A class for tracking the topics, partitions, and offsets for the consumer.
+    A partition is "assigned" either directly with assign_from_user() (manual
+    assignment) or with assign_from_subscribed() (automatic assignment from
+    subscription).
+
+    Once assigned, the partition is not considered "fetchable" until its initial
+    position has been set with seek(). Fetchable partitions track a fetch
+    position which is used to set the offset of the next fetch, and a consumed
+    position which is the last offset that has been returned to the user. You
+    can suspend fetching from a partition through pause() without affecting the
+    fetched/consumed offsets. The partition will remain unfetchable until
+    resume() is used. You can also query the pause state independently with
+    is_paused().
+
+    Note that pause state and fetch/consumed positions are not preserved when
+    partition assignment is changed, whether directly by the user or through
+    a group rebalance.
+
+    This class also maintains a cache of the latest commit position for each of
+    the assigned partitions. This is updated through committed() and can be used
+    to set the initial fetch position (e.g. Fetcher._reset_offset()).
+    """
+    _SUBSCRIPTION_EXCEPTION_MESSAGE = ("Subscription to topics, partitions and"
+                                       " pattern are mutually exclusive")
+
+    def __init__(self, offset_reset_strategy='earliest'):
+        """Initialize a SubscriptionState instance
+
+        Keyword Arguments:
+            offset_reset_strategy: 'earliest' or 'latest', otherwise
+                exception will be raised when fetching an offset that is no
+                longer available. Default: 'earliest'
+        """
+        try:
+            offset_reset_strategy = getattr(OffsetResetStrategy,
+                                            offset_reset_strategy.upper())
+        except AttributeError:
+            log.warning('Unrecognized offset_reset_strategy, using NONE')
+            offset_reset_strategy = OffsetResetStrategy.NONE
+        self._default_offset_reset_strategy = offset_reset_strategy
+
+        self.subscription = None  # set() or None
+        self.subscribed_pattern = None  # regex str or None
+        self._group_subscription = set()
+        self._user_assignment = set()
+        self.assignment = dict()
+        self.needs_partition_assignment = False
+        self.listener = None
+
+        # initialize to true for the consumers to fetch offset upon starting up
+        self.needs_fetch_committed_offsets = True
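The lifecycle described in the class docstring, as a minimal sketch against the API added above (TopicPartition is the namedtuple from kafka.common; the topic name and offsets are made up):

    from kafka.common import TopicPartition
    from kafka.consumer.subscription_state import SubscriptionState

    tp = TopicPartition('my-topic', 0)
    state = SubscriptionState('earliest')
    state.assign_from_user([tp])       # assigned...
    assert not state.is_fetchable(tp)  # ...but not fetchable: no position yet
    state.seek(tp, 0)                  # set the initial position
    assert state.is_fetchable(tp)
    state.pause(tp)                    # suspend fetching; position is kept
    assert state.is_paused(tp) and not state.is_fetchable(tp)
    state.resume(tp)
    assert state.is_fetchable(tp)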
+
+    def subscribe(self, topics=(), pattern=None, listener=None):
+        """Subscribe to a list of topics, or a topic regex pattern.
+
+        Partitions will be dynamically assigned via a group coordinator.
+        Topic subscriptions are not incremental: this list will replace the
+        current assignment (if there is one).
+
+        This method is incompatible with assign_from_user()
+
+        Arguments:
+            topics (list): List of topics for subscription.
+            pattern (str): Pattern to match available topics. You must provide
+                either topics or pattern, but not both.
+            listener (ConsumerRebalanceListener): Optionally include listener
+                callback, which will be called before and after each rebalance
+                operation.
+
+                As part of group management, the consumer will keep track of
+                the list of consumers that belong to a particular group and
+                will trigger a rebalance operation if any one of the following
+                events occurs:
+
+                * Number of partitions change for any of the subscribed topics
+                * Topic is created or deleted
+                * An existing member of the consumer group dies
+                * A new member is added to the consumer group
+
+                When any of these events are triggered, the provided listener
+                will be invoked first to indicate that the consumer's
+                assignment has been revoked, and then again when the new
+                assignment has been received. Note that this listener will
+                immediately override any listener set in a previous call to
+                subscribe. It is guaranteed, however, that the partitions
+                revoked/assigned through this interface are from topics
+                subscribed in this call.
+        """
+        if self._user_assignment or (topics and pattern):
+            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+        assert topics or pattern, 'Must provide topics or pattern'
+
+        if pattern:
+            log.info('Subscribing to pattern: /%s/', pattern)
+            self.subscription = set()
+            self.subscribed_pattern = re.compile(pattern)
+        else:
+            self.change_subscription(topics)
+
+        if listener and not isinstance(listener, ConsumerRebalanceListener):
+            raise TypeError('listener must be a ConsumerRebalanceListener')
+        self.listener = listener
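A short sketch of the two subscription modes and the mutual-exclusion rule enforced above (topic names are made up):

    state = SubscriptionState()
    state.subscribe(topics=['events', 'metrics'])  # explicit topic list
    state.subscribe(topics=['events'])             # replaces, does not extend

    pattern_state = SubscriptionState()
    pattern_state.subscribe(pattern='events-.*')   # topics matched against metadata later

    # providing both at once raises IllegalStateError:
    # state.subscribe(topics=['a'], pattern='b.*')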
+
+    def change_subscription(self, topics):
+        """Change the topic subscription.
+
+        Arguments:
+            topics (list of str): topics for subscription
+
+        Raises:
+            IllegalStateError: if assign_from_user has been used already
+        """
+        if self._user_assignment:
+            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+        if self.subscription == set(topics):
+            log.warning("subscription unchanged by change_subscription(%s)",
+                        topics)
+            return
+
+        log.info('Updating subscribed topics to: %s', topics)
+        self.subscription = set(topics)
+        self._group_subscription.update(topics)
+        self.needs_partition_assignment = True
+
+        # Remove any assigned partitions which are no longer subscribed to
+        for tp in set(self.assignment.keys()):
+            if tp.topic not in self.subscription:
+                del self.assignment[tp]
+
+    def group_subscribe(self, topics):
+        """Add topics to the current group subscription.
+
+        This is used by the group leader to ensure that it receives metadata
+        updates for all topics that any member of the group is subscribed to.
+
+        Arguments:
+            topics (list of str): topics to add to the group subscription
+        """
+        if self._user_assignment:
+            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+        self._group_subscription.update(topics)
+
+    def mark_for_reassignment(self):
+        self._group_subscription.intersection_update(self.subscription)
+        self.needs_partition_assignment = True
+
+    def assign_from_user(self, partitions):
+        """Manually assign a list of TopicPartitions to this consumer.
+
+        This interface does not allow for incremental assignment and will
+        replace the previous assignment (if there was one).
+
+        Manual topic assignment through this method does not use the consumer's
+        group management functionality. As such, there will be no rebalance
+        operation triggered when group membership or cluster and topic metadata
+        change. Note that it is not possible to use both manual partition
+        assignment with assign() and group assignment with subscribe().
+
+        Arguments:
+            partitions (list of TopicPartition): assignment for this instance.
+
+        Raises:
+            IllegalStateError: if consumer has already called subscribe()
+        """
+        if self.subscription is not None:
+            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+        self._user_assignment.clear()
+        self._user_assignment.update(partitions)
+
+        for partition in partitions:
+            if partition not in self.assignment:
+                self._add_assigned_partition(partition)
+
+        for tp in set(self.assignment.keys()) - self._user_assignment:
+            del self.assignment[tp]
+
+        self.needs_partition_assignment = False
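As the docstring warns, manual assignment replaces the previous assignment and locks out subscribe(). A sketch:

    state = SubscriptionState()
    state.assign_from_user([TopicPartition('t', 0), TopicPartition('t', 1)])
    state.assign_from_user([TopicPartition('t', 1)])  # partition 0 is dropped
    assert state.assigned_partitions() == {TopicPartition('t', 1)}
    # state.subscribe(topics=['t'])  # would now raise IllegalStateError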
+
+    def assign_from_subscribed(self, assignments):
+        """Update the assignment to the specified partitions
+
+        This method is called by the coordinator to dynamically assign
+        partitions based on the consumer's topic subscription. This is different
+        from assign_from_user() which directly sets the assignment from a
+        user-supplied TopicPartition list.
+
+        Arguments:
+            assignments (list of TopicPartition): partitions to assign to this
+                consumer instance.
+        """
+        if self.subscription is None:
+            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+        for tp in assignments:
+            if tp.topic not in self.subscription:
+                raise ValueError("Assigned partition %s for non-subscribed topic." % tp)
+        self.assignment.clear()
+        for tp in assignments:
+            self._add_assigned_partition(tp)
+        self.needs_partition_assignment = False
+        log.info("Updated partition assignment: %s", assignments)
+
+    def unsubscribe(self):
+        """Clear all topic subscriptions and partition assignments"""
+        self.subscription = None
+        self._user_assignment.clear()
+        self.assignment.clear()
+        self.needs_partition_assignment = True
+        self.subscribed_pattern = None
+
+    def group_subscription(self):
+        """Get the topic subscription for the group.
+
+        For the leader, this will include the union of all member subscriptions.
+        For followers, it is the member's subscription only.
+
+        This is used when querying topic metadata to detect metadata changes
+        that would require rebalancing (the leader fetches metadata for all
+        topics in the group so that it can do partition assignment).
+
+        Returns:
+            set: topics
+        """
+        return self._group_subscription
+
+    def seek(self, partition, offset):
+        """Manually specify the fetch offset for a TopicPartition.
+
+        Overrides the fetch offsets that the consumer will use on the next
+        poll(). If this API is invoked for the same partition more than once,
+        the latest offset will be used on the next poll(). Note that you may
+        lose data if this API is arbitrarily used in the middle of consumption
+        to reset the fetch offsets.
+
+        Arguments:
+            partition (TopicPartition): partition for seek operation
+            offset (int): message offset in partition
+        """
+        self.assignment[partition].seek(offset)
+
+    def assigned_partitions(self):
+        """Return set of TopicPartitions in current assignment."""
+        return set(self.assignment.keys())
+
+    def fetchable_partitions(self):
+        """Return set of TopicPartitions that should be fetched."""
+        fetchable = set()
+        for partition, state in six.iteritems(self.assignment):
+            if state.is_fetchable():
+                fetchable.add(partition)
+        return fetchable
+
+    def partitions_auto_assigned(self):
+        """Return True unless user supplied partitions manually."""
+        return self.subscription is not None
+
+    def all_consumed_offsets(self):
+        """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}"""
+        all_consumed = {}
+        for partition, state in six.iteritems(self.assignment):
+            if state.has_valid_position:
+                all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+        return all_consumed
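Only partitions with a valid position contribute to all_consumed_offsets(), which is what a commit path would read. Sketch:

    state = SubscriptionState()
    tp = TopicPartition('t', 0)
    state.assign_from_user([tp])
    assert state.all_consumed_offsets() == {}  # no valid position yet
    state.seek(tp, 10)
    assert state.all_consumed_offsets() == {tp: OffsetAndMetadata(10, '')}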
+
+    def need_offset_reset(self, partition, offset_reset_strategy=None):
+        """Mark partition for offset reset using specified or default strategy.
+
+        Arguments:
+            partition (TopicPartition): partition to mark
+            offset_reset_strategy (OffsetResetStrategy, optional)
+        """
+        if offset_reset_strategy is None:
+            offset_reset_strategy = self._default_offset_reset_strategy
+        self.assignment[partition].await_reset(offset_reset_strategy)
+
+    def has_default_offset_reset_policy(self):
+        """Return True if default offset reset policy is Earliest or Latest"""
+        return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
+
+    def is_offset_reset_needed(self, partition):
+        return self.assignment[partition].awaiting_reset
+
+    def has_all_fetch_positions(self):
+        for state in self.assignment.values():
+            if not state.has_valid_position:
+                return False
+        return True
+
+    def missing_fetch_positions(self):
+        missing = set()
+        for partition, state in six.iteritems(self.assignment):
+            if not state.has_valid_position:
+                missing.add(partition)
+        return missing
+
+    def is_assigned(self, partition):
+        return partition in self.assignment
+
+    def is_paused(self, partition):
+        return partition in self.assignment and self.assignment[partition].paused
+
+    def is_fetchable(self, partition):
+        return partition in self.assignment and self.assignment[partition].is_fetchable()
+
+    def pause(self, partition):
+        self.assignment[partition].pause()
+
+    def resume(self, partition):
+        self.assignment[partition].resume()
+
+    def _add_assigned_partition(self, partition):
+        self.assignment[partition] = TopicPartitionState()
+
+
+class TopicPartitionState(object):
+    def __init__(self):
+        self.committed = None  # last committed position
+        self.has_valid_position = False  # whether we have valid consumed and fetched positions
+        self.paused = False  # whether this partition has been paused by the user
+        self.awaiting_reset = False  # whether we are awaiting reset
+        self.reset_strategy = None  # the reset strategy if awaiting_reset is set
+        self._consumed = None  # offset exposed to the user
+        self._fetched = None  # current fetch position
+
+    def _set_fetched(self, offset):
+        assert self.has_valid_position, 'Valid consumed/fetch position required'
+        self._fetched = offset
+
+    def _get_fetched(self):
+        return self._fetched
+
+    fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
+
+    def _set_consumed(self, offset):
+        assert self.has_valid_position, 'Valid consumed/fetch position required'
+        self._consumed = offset
+
+    def _get_consumed(self):
+        return self._consumed
+
+    consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+
+    def await_reset(self, strategy):
+        self.awaiting_reset = True
+        self.reset_strategy = strategy
+        self._consumed = None
+        self._fetched = None
+        self.has_valid_position = False
+
+    def seek(self, offset):
+        self._consumed = offset
+        self._fetched = offset
+        self.awaiting_reset = False
+        self.reset_strategy = None
+        self.has_valid_position = True
+
+    def pause(self):
+        self.paused = True
+
+    def resume(self):
+        self.paused = False
+
+    def is_fetchable(self):
+        return not self.paused and self.has_valid_position
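TopicPartitionState only lets fetched/consumed move while a position is valid; await_reset() invalidates the position until the next seek():

    tps = TopicPartitionState()
    tps.seek(5)        # fetched == consumed == 5, position valid
    tps.fetched = 8    # allowed while the position is valid
    tps.await_reset(OffsetResetStrategy.EARLIEST)
    assert tps.awaiting_reset and not tps.has_valid_position
    # tps.fetched = 9  # would now fail the assert in _set_fetched()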
+
+
+class ConsumerRebalanceListener(object):
+    """
+    A callback interface that the user can implement to trigger custom actions
+    when the set of partitions assigned to the consumer changes.
+
+    This is applicable when the consumer has Kafka auto-managing group
+    membership. If the consumer directly assigns partitions, those partitions
+    will never be reassigned and this callback is not applicable.
+
+    When Kafka is managing the group membership, a partition re-assignment will
+    be triggered any time the members of the group change or the subscription
+    of the members changes. This can occur when processes die, new process
+    instances are added, or old instances come back to life after failure.
+    Rebalances can also be triggered by changes affecting the subscribed
+    topics (e.g. when the number of partitions is administratively adjusted).
+
+    There are many uses for this functionality. One common use is saving offsets
+    in a custom store. By saving offsets in the on_partitions_revoked() call, we
+    can ensure that any time partition assignment changes the offset gets saved.
+
+    Another use is flushing out any kind of cache of intermediate results the
+    consumer may be keeping. For example, consider a case where the consumer is
+    subscribed to a topic containing user page views, and the goal is to count
+    the number of page views per user for each five minute window. Let's say
+    the topic is partitioned by the user id so that all events for a particular
+    user will go to a single consumer instance. The consumer can keep in memory
+    a running tally of actions per user and only flush these out to a remote
+    data store when its cache gets too big. However, if a partition is
+    reassigned it may want to automatically trigger a flush of this cache
+    before the new owner takes over consumption.
+
+    This callback will execute in the user thread as part of the Consumer.poll()
+    whenever partition assignment changes.
+
+    It is guaranteed that all consumer processes will invoke
+    on_partitions_revoked() prior to any process invoking
+    on_partitions_assigned(). So if offsets or other state is saved in the
+    on_partitions_revoked() call, it should be saved by the time the process
+    taking over that partition has their on_partitions_assigned() callback
+    called to load the state.
+    """
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def on_partitions_revoked(self, revoked):
+        """
+        A callback method the user can implement to provide handling of offset
+        commits to a customized store on the start of a rebalance operation.
+        This method will be called before a rebalance operation starts and
+        after the consumer stops fetching data. It is recommended that offsets
+        should be committed in this callback to either Kafka or a custom offset
+        store to prevent duplicate data.
+
+        NOTE: This method is only called before rebalances. It is not called
+        prior to KafkaConsumer.close()
+
+        Arguments:
+            revoked (list of TopicPartition): the partitions that were assigned
+                to the consumer on the last rebalance
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_partitions_assigned(self, assigned):
+        """
+        A callback method the user can implement to provide handling of
+        customized offsets on completion of a successful partition
+        re-assignment. This method will be called after an offset re-assignment
+        completes and before the consumer starts fetching data.
+
+        It is guaranteed that all the processes in a consumer group will execute
+        their on_partitions_revoked() callback before any instance executes its
+        on_partitions_assigned() callback.
+
+        Arguments:
+            assigned (list of TopicPartition): the partitions assigned to the
+                consumer (may include partitions that were previously assigned)
+        """
+        pass
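A minimal listener along the lines the docstring describes. The external store (save_offset/load_offset) is hypothetical, and the consumer's position()/seek() calls are assumed from the new KafkaConsumer API:

    class SaveOffsetsListener(ConsumerRebalanceListener):
        def __init__(self, consumer, store):
            self.consumer = consumer
            self.store = store  # hypothetical external offset store

        def on_partitions_revoked(self, revoked):
            # persist positions before another group member takes ownership
            for tp in revoked:
                self.store.save_offset(tp, self.consumer.position(tp))

        def on_partitions_assigned(self, assigned):
            # restore saved positions before fetching begins
            for tp in assigned:
                self.consumer.seek(tp, self.store.load_offset(tp))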