Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 682
1 file changed, 682 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
new file mode 100644
index 0000000..9ce1438
--- /dev/null
+++ b/kafka/consumer/group.py
@@ -0,0 +1,682 @@
+from __future__ import absolute_import
+
+import copy
+import logging
+import time
+
+import six
+
+from kafka.client_async import KafkaClient
+from kafka.consumer.fetcher import Fetcher
+from kafka.consumer.subscription_state import SubscriptionState
+from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.protocol.offset import OffsetResetStrategy
+from kafka.version import __version__
+
+log = logging.getLogger(__name__)
+
+
+class KafkaConsumer(six.Iterator):
+    """Consume records from a Kafka cluster.
+
+    The consumer will transparently handle the failure of servers in the Kafka
+    cluster, and adapt as topic-partitions are created or migrate between
+    brokers. It also interacts with the assigned Kafka Group Coordinator node
+    to allow multiple consumers to load balance consumption of topics (requires
+    kafka >= 0.9.0.0).
+
+    Arguments:
+        *topics (str): optional list of topics to subscribe to. If not set,
+            call subscribe() or assign() before consuming records.
+
+    Keyword Arguments:
+        bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+            strings) that the consumer should contact to bootstrap initial
+            cluster metadata. This does not have to be the full node list.
+            It just needs to have at least one broker that will respond to a
+            Metadata API Request. Default port is 9092. If no servers are
+            specified, will default to localhost:9092.
+        client_id (str): a name for this client. This string is passed in
+            each request to servers and can be used to identify specific
+            server-side log entries that correspond to this client. Also
+            submitted to GroupCoordinator for logging with respect to
+            consumer group administration. Default: 'kafka-python-{version}'
+        group_id (str): name of the consumer group to join for dynamic
+            partition assignment (if enabled), and to use for fetching and
+            committing offsets. Default: 'kafka-python-default-group'
+        key_deserializer (callable): Any callable that takes a
+            raw message key and returns a deserialized key.
+        value_deserializer (callable, optional): Any callable that takes a
+            raw message value and returns a deserialized value.
+        fetch_min_bytes (int): Minimum amount of data the server should
+            return for a fetch request, otherwise wait up to
+            fetch_max_wait_ms for more data to accumulate. Default: 1024.
+        fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+            the server will block before answering the fetch request if
+            there isn't sufficient data to immediately satisfy the
+            requirement given by fetch_min_bytes. Default: 500.
+        max_partition_fetch_bytes (int): The maximum amount of data
+            per-partition the server will return. The maximum total memory
+            used for a request = #partitions * max_partition_fetch_bytes.
+            This size must be at least as large as the maximum message size
+            the server allows or else it is possible for the producer to
+            send messages larger than the consumer can fetch. If that
+            happens, the consumer can get stuck trying to fetch a large
+            message on a certain partition. Default: 1048576.
+        request_timeout_ms (int): Client request timeout in milliseconds.
+            Default: 40000.
+        retry_backoff_ms (int): Milliseconds to back off when retrying on
+            errors. Default: 100.
+        reconnect_backoff_ms (int): The amount of time in milliseconds to
+            wait before attempting to reconnect to a given host.
+            Default: 50.
+        max_in_flight_requests_per_connection (int): Requests are pipelined
+            to kafka brokers up to this number of maximum requests per
+            broker connection. Default: 5.
+        auto_offset_reset (str): A policy for resetting offsets on
+            OffsetOutOfRange errors: 'earliest' will move to the oldest
+            available message, 'latest' will move to the most recent. Any
+            other value will raise an exception. Default: 'latest'.
+        enable_auto_commit (bool): If True, the consumer's offset will be
+            periodically committed in the background. Default: True.
+        auto_commit_interval_ms (int): milliseconds between automatic
+            offset commits, if enable_auto_commit is True. Default: 5000.
+        default_offset_commit_callback (callable): called as
+            callback(offsets, response); response will be either an Exception
+            or an OffsetCommitResponse struct. This callback can be used to
+            trigger custom actions when a commit request completes.
+        check_crcs (bool): Automatically check the CRC32 of the records
+            consumed. This ensures no on-the-wire or on-disk corruption to
+            the messages occurred. This check adds some overhead, so it may
+            be disabled in cases seeking extreme performance. Default: True
+        metadata_max_age_ms (int): The period of time in milliseconds after
+            which we force a refresh of metadata even if we haven't seen any
+            partition leadership changes to proactively discover any new
+            brokers or partitions. Default: 300000
+        partition_assignment_strategy (list): List of objects to use to
+            distribute partition ownership amongst consumer instances when
+            group management is used. Default: [RoundRobinPartitionAssignor]
+        heartbeat_interval_ms (int): The expected time in milliseconds
+            between heartbeats to the consumer coordinator when using
+            Kafka's group management feature. Heartbeats are used to ensure
+            that the consumer's session stays active and to facilitate
+            rebalancing when new consumers join or leave the group. The
+            value must be set lower than session_timeout_ms, but typically
+            should be set no higher than 1/3 of that value. It can be
+            adjusted even lower to control the expected time for normal
+            rebalances. Default: 3000
+        session_timeout_ms (int): The timeout used to detect failures when
+            using Kafka's group management facilities. Default: 30000
+        send_buffer_bytes (int): The size of the TCP send buffer
+            (SO_SNDBUF) to use when sending data. Default: 131072
+        receive_buffer_bytes (int): The size of the TCP receive buffer
+            (SO_RCVBUF) to use when reading data. Default: 32768
+        consumer_timeout_ms (int): number of milliseconds to block during
+            message iteration before raising StopIteration if no message is
+            available for consumption. Default: -1 (block forever)
+        api_version (str): specify which kafka API version to use.
+            0.9 enables full group coordination features; 0.8.2 enables
+            kafka-storage offset commits; 0.8.1 enables zookeeper-storage
+            offset commits; 0.8.0 is what is left. If set to 'auto', will
+            attempt to infer the broker version by probing various APIs.
+            Default: auto
+
+    Note:
+        Configuration parameters are described in more detail at
+        https://kafka.apache.org/090/configuration.html#newconsumerconfigs
+    """
+    DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost',
+        'client_id': 'kafka-python-' + __version__,
+        'group_id': 'kafka-python-default-group',
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_max_wait_ms': 500,
+        'fetch_min_bytes': 1024,
+        'max_partition_fetch_bytes': 1 * 1024 * 1024,
+        'request_timeout_ms': 40 * 1000,
+        'retry_backoff_ms': 100,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'auto_offset_reset': 'latest',
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 5000,
+        'check_crcs': True,
+        'metadata_max_age_ms': 5 * 60 * 1000,
+        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+        'heartbeat_interval_ms': 3000,
+        'session_timeout_ms': 30000,
+        'send_buffer_bytes': 128 * 1024,
+        'receive_buffer_bytes': 32 * 1024,
+        'consumer_timeout_ms': -1,
+        'api_version': 'auto',
+        'connections_max_idle_ms': 9 * 60 * 1000,  # not implemented yet
+        #'metric_reporters': None,
+        #'metrics_num_samples': 2,
+        #'metrics_sample_window_ms': 30000,
+    }
+
+    def __init__(self, *topics, **configs):
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs.pop(key)
+
+        # Only check for extra config keys in top-level class
+        assert not configs, 'Unrecognized configs: %s' % configs
+
+        deprecated = {'smallest': 'earliest', 'largest': 'latest'}
+        if self.config['auto_offset_reset'] in deprecated:
+            new_config = deprecated[self.config['auto_offset_reset']]
+            log.warning('use auto_offset_reset=%s (%s is deprecated)',
+                        new_config, self.config['auto_offset_reset'])
+            self.config['auto_offset_reset'] = new_config
+
+        self._client = KafkaClient(**self.config)
+
+        # Check Broker Version if not set explicitly
+        if self.config['api_version'] == 'auto':
+            self.config['api_version'] = self._client.check_version()
+        assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
+
+        # Convert api_version config to tuple for easy comparisons
+        self.config['api_version'] = tuple(
+            map(int, self.config['api_version'].split('.')))
+
+        self._subscription = SubscriptionState(self.config['auto_offset_reset'])
+        self._fetcher = Fetcher(
+            self._client, self._subscription, **self.config)
+        self._coordinator = ConsumerCoordinator(
+            self._client, self._subscription,
+            assignors=self.config['partition_assignment_strategy'],
+            **self.config)
+        self._closed = False
+        self._iterator = None
+        self._consumer_timeout = float('inf')
+
+        #self.metrics = None
+        if topics:
+            self._subscription.subscribe(topics=topics)
+            self._client.set_topics(topics)
+
+    def assign(self, partitions):
+        """Manually assign a list of TopicPartitions to this consumer.
+
+        Arguments:
+            partitions (list of TopicPartition): assignment for this instance.
+
+        Raises:
+            IllegalStateError: if consumer has already called subscribe()
+
+        Warning:
+            It is not possible to use both manual partition assignment with
+            assign() and group assignment with subscribe().
+
+        Note:
+            This interface does not support incremental assignment and will
+            replace the previous assignment (if there was one).
+
+        Note:
+            Manual topic assignment through this method does not use the
+            consumer's group management functionality. As such, there will be
+            no rebalance operation triggered when group membership or cluster
+            and topic metadata change.
+ """ + self._subscription.assign_from_user(partitions) + self._client.set_topics([tp.topic for tp in partitions]) + + def assignment(self): + """Get the TopicPartitions currently assigned to this consumer. + + If partitions were directly assigned using assign(), then this will + simply return the same partitions that were previously assigned. + If topics were subscribed using subscribe(), then this will give the + set of topic partitions currently assigned to the consumer (which may + be none if the assignment hasn't happened yet, or if the partitions are + in the process of being reassigned). + + Returns: + set: {TopicPartition, ...} + """ + return self._subscription.assigned_partitions() + + def close(self): + """Close the consumer, waiting indefinitely for any needed cleanup.""" + if self._closed: + return + log.debug("Closing the KafkaConsumer.") + self._closed = True + self._coordinator.close() + #self.metrics.close() + self._client.close() + try: + self.config['key_deserializer'].close() + except AttributeError: + pass + try: + self.config['value_deserializer'].close() + except AttributeError: + pass + log.debug("The KafkaConsumer has closed.") + + def commit_async(self, offsets=None, callback=None): + """Commit offsets to kafka asynchronously, optionally firing callback + + This commits offsets only to Kafka. The offsets committed using this API + will be used on the first fetch after every rebalance and also on + startup. As such, if you need to store offsets in anything other than + Kafka, this API should not be used. + + This is an asynchronous call and will not block. Any errors encountered + are either passed to the callback (if provided) or discarded. + + Arguments: + offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict + to commit with the configured group_id. Defaults to current + consumed offsets for all subscribed partitions. + callback (callable, optional): called as callback(offsets, response) + with response as either an Exception or a OffsetCommitResponse + struct. This callback can be used to trigger custom actions when + a commit request completes. + + Returns: + kafka.future.Future + """ + assert self.config['api_version'] >= (0, 8, 1) + if offsets is None: + offsets = self._subscription.all_consumed_offsets() + log.debug("Committing offsets: %s", offsets) + future = self._coordinator.commit_offsets_async( + offsets, callback=callback) + return future + + def commit(self, offsets=None): + """Commit offsets to kafka, blocking until success or error + + This commits offsets only to Kafka. The offsets committed using this API + will be used on the first fetch after every rebalance and also on + startup. As such, if you need to store offsets in anything other than + Kafka, this API should not be used. + + Blocks until either the commit succeeds or an unrecoverable error is + encountered (in which case it is thrown to the caller). + + Currently only supports kafka-topic offset storage (not zookeeper) + + Arguments: + offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict + to commit with the configured group_id. Defaults to current + consumed offsets for all subscribed partitions. + """ + assert self.config['api_version'] >= (0, 8, 1) + if offsets is None: + offsets = self._subscription.all_consumed_offsets() + self._coordinator.commit_offsets_sync(offsets) + + def committed(self, partition): + """Get the last committed offset for the given partition + + This offset will be used as the position for the consumer + in the event of a failure. 
+
+        This call may block to do a remote call if the partition in question
+        isn't assigned to this consumer or if the consumer hasn't yet
+        initialized its cache of committed offsets.
+
+        Arguments:
+            partition (TopicPartition): the partition to check
+
+        Returns:
+            The last committed offset, or None if there was no prior commit.
+        """
+        assert self.config['api_version'] >= (0, 8, 1)
+        if self._subscription.is_assigned(partition):
+            committed = self._subscription.assignment[partition].committed
+            if committed is None:
+                self._coordinator.refresh_committed_offsets_if_needed()
+                committed = self._subscription.assignment[partition].committed
+        else:
+            commit_map = self._coordinator.fetch_committed_offsets([partition])
+            if partition in commit_map:
+                committed = commit_map[partition].offset
+            else:
+                committed = None
+        return committed
+
+    def topics(self):
+        """Get metadata for all topics the user is authorized to view.
+
+        [Not Implemented Yet]
+
+        Returns:
+            {topic: [partition_info]}
+        """
+        raise NotImplementedError('TODO')
+
+    def partitions_for_topic(self, topic):
+        """Get metadata about the partitions for a given topic.
+
+        Arguments:
+            topic (str): topic to check
+
+        Returns:
+            set: partition ids
+        """
+        return self._client.cluster.partitions_for_topic(topic)
+
+    def poll(self, timeout_ms=0):
+        """Fetch data from assigned topics / partitions.
+
+        Records are fetched and returned in batches by topic-partition.
+        On each poll, the consumer will try to use the last consumed offset as
+        the starting offset and fetch sequentially. The last consumed offset
+        can be manually set through seek(partition, offset) or automatically
+        set as the last committed offset for the subscribed list of partitions.
+
+        Incompatible with the iterator interface -- use one or the other,
+        not both.
+
+        Arguments:
+            timeout_ms (int, optional): milliseconds to spend waiting in poll if
+                data is not available. If 0, returns immediately with any
+                records that are available now. Must not be negative. Default: 0
+
+        Returns:
+            dict: topic to list of records since the last fetch for the
+                subscribed list of topics and partitions
+        """
+        assert timeout_ms >= 0, 'Timeout must not be negative'
+        assert self._iterator is None, 'Incompatible with iterator interface'
+
+        # poll for new data until the timeout expires
+        start = time.time()
+        remaining = timeout_ms
+        while True:
+            records = self._poll_once(remaining)
+            if records:
+                # Before returning the fetched records, we can send off the
+                # next round of fetches and avoid blocking while waiting for
+                # their responses -- this enables pipelining while the user
+                # is handling the fetched records.
+                self._fetcher.init_fetches()
+                return records
+
+            elapsed_ms = (time.time() - start) * 1000
+            remaining = timeout_ms - elapsed_ms
+
+            if remaining <= 0:
+                return {}
+
+    def _poll_once(self, timeout_ms):
+        """
+        Do one round of polling. In addition to checking for new data, this does
+        any needed heart-beating, auto-commits, and offset updates.
+
+        Arguments:
+            timeout_ms (int): The maximum time in milliseconds to block
+
+        Returns:
+            dict: map of topic to list of records (may be empty)
+        """
+        if self.config['api_version'] >= (0, 8, 2):
+            # TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+            self._coordinator.ensure_coordinator_known()
+
+        if self.config['api_version'] >= (0, 9):
+            # ensure we have partitions assigned if we expect to
+            if self._subscription.partitions_auto_assigned():
+                self._coordinator.ensure_active_group()
+
+        # fetch positions if we have partitions we're subscribed to that we
+        # don't know the offset for
+        if not self._subscription.has_all_fetch_positions():
+            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+        # init any new fetches (won't resend pending fetches)
+        records = self._fetcher.fetched_records()
+
+        # if data is available already, e.g. from a previous network client
+        # poll() call to commit, then just return it immediately
+        if records:
+            return records
+
+        self._fetcher.init_fetches()
+        self._client.poll(timeout_ms / 1000.0)
+        return self._fetcher.fetched_records()
+
+    def position(self, partition):
+        """Get the offset of the next record that will be fetched
+
+        Arguments:
+            partition (TopicPartition): partition to check
+        """
+        assert self._subscription.is_assigned(partition)
+
+        offset = self._subscription.assignment[partition].consumed
+        if offset is None:
+            self._update_fetch_positions([partition])
+            offset = self._subscription.assignment[partition].consumed
+        return offset
+
+    def pause(self, *partitions):
+        """Suspend fetching from the requested partitions.
+
+        Future calls to poll() will not return any records from these partitions
+        until they have been resumed using resume(). Note that this method does
+        not affect partition subscription. In particular, it does not cause a
+        group rebalance when automatic assignment is used.
+
+        Arguments:
+            *partitions (TopicPartition): partitions to pause
+        """
+        for partition in partitions:
+            log.debug("Pausing partition %s", partition)
+            self._subscription.pause(partition)
+
+    def resume(self, *partitions):
+        """Resume fetching from the specified (paused) partitions.
+
+        Arguments:
+            *partitions (TopicPartition): partitions to resume
+        """
+        for partition in partitions:
+            log.debug("Resuming partition %s", partition)
+            self._subscription.resume(partition)
+
+    def seek(self, partition, offset):
+        """Manually specify the fetch offset for a TopicPartition.
+
+        Overrides the fetch offsets that the consumer will use on the next
+        poll(). If this API is invoked for the same partition more than once,
+        the latest offset will be used on the next poll(). Note that you may
+        lose data if this API is arbitrarily used in the middle of consumption
+        to reset the fetch offsets.
+
+        Arguments:
+            partition (TopicPartition): partition for seek operation
+            offset (int): message offset in partition
+        """
+        assert offset >= 0
+        log.debug("Seeking to offset %s for partition %s", offset, partition)
+        self._subscription.assignment[partition].seek(offset)
+
+    def seek_to_beginning(self, *partitions):
+        """Seek to the oldest available offset for partitions.
+
+        Arguments:
+            *partitions: optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions
+        """
+        if not partitions:
+            partitions = self._subscription.assigned_partitions()
+        for tp in partitions:
+            log.debug("Seeking to beginning of partition %s", tp)
+            self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
+
+    def seek_to_end(self, *partitions):
+        """Seek to the most recent available offset for partitions.
+
+        Arguments:
+            *partitions: optionally provide specific TopicPartitions, otherwise
+                default to all assigned partitions
+        """
+        if not partitions:
+            partitions = self._subscription.assigned_partitions()
+        for tp in partitions:
+            log.debug("Seeking to end of partition %s", tp)
+            self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+
+    def subscribe(self, topics=(), pattern=None, listener=None):
+        """Subscribe to a list of topics, or a topic regex pattern
+
+        Partitions will be dynamically assigned via a group coordinator.
+        Topic subscriptions are not incremental: this list will replace the
+        current assignment (if there is one).
+
+        This method is incompatible with assign()
+
+        Arguments:
+            topics (list): List of topics for subscription.
+            pattern (str): Pattern to match available topics. You must provide
+                either topics or pattern, but not both.
+            listener (ConsumerRebalanceListener): Optionally include listener
+                callback, which will be called before and after each rebalance
+                operation.
+
+                As part of group management, the consumer will keep track of the
+                list of consumers that belong to a particular group and will
+                trigger a rebalance operation if one of the following events
+                occurs:
+
+                * Number of partitions change for any of the subscribed topics
+                * Topic is created or deleted
+                * An existing member of the consumer group dies
+                * A new member is added to the consumer group
+
+                When any of these events are triggered, the provided listener
+                will be invoked first to indicate that the consumer's assignment
+                has been revoked, and then again when the new assignment has
+                been received. Note that this listener will immediately override
+                any listener set in a previous call to subscribe. It is
+                guaranteed, however, that the partitions revoked/assigned
+                through this interface are from topics subscribed in this call.
+        """
+        if not topics:
+            self.unsubscribe()
+        else:
+            self._subscription.subscribe(topics=topics,
+                                         pattern=pattern,
+                                         listener=listener)
+            # regex will need all topic metadata
+            if pattern is not None:
+                self._client.cluster.need_metadata_for_all_topics = True
+                log.debug("Subscribed to topic pattern: %s", pattern)
+            else:
+                self._client.set_topics(self._subscription.group_subscription())
+                log.debug("Subscribed to topic(s): %s", topics)
+
+    def subscription(self):
+        """Get the current topic subscription.
+
+        Returns:
+            set: {topic, ...}
+        """
+        return self._subscription.subscription
+
+    def unsubscribe(self):
+        """Unsubscribe from all topics and clear all assigned partitions."""
+        self._subscription.unsubscribe()
+        self._coordinator.close()
+        self._client.cluster.need_metadata_for_all_topics = False
+        log.debug("Unsubscribed all topics or patterns and assigned partitions")
+
+    def _update_fetch_positions(self, partitions):
+        """
+        Set the fetch position to the committed position (if there is one)
+        or reset it using the offset reset policy the user has configured.
+
+        Arguments:
+            partitions (List[TopicPartition]): The partitions that need
+                updating fetch positions
+
+        Raises:
+            NoOffsetForPartitionError: If no offset is stored for a given
+                partition and no offset reset policy is defined
+        """
+        if self.config['api_version'] >= (0, 8, 1):
+            # refresh commits for all assigned partitions
+            self._coordinator.refresh_committed_offsets_if_needed()
+
+        # then do any offset lookups in case some positions are not known
+        self._fetcher.update_fetch_positions(partitions)
+
+    def _message_generator(self):
+        while time.time() < self._consumer_timeout:
+            if self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            if self.config['api_version'] >= (0, 9):
+                # ensure we have partitions assigned if we expect to
+                if self._subscription.partitions_auto_assigned():
+                    self._coordinator.ensure_active_group()
+
+            # fetch positions if we have partitions we're subscribed to that we
+            # don't know the offset for
+            if not self._subscription.has_all_fetch_positions():
+                self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+            # init any new fetches (won't resend pending fetches)
+            self._fetcher.init_fetches()
+            self._client.poll(self.config['request_timeout_ms'] / 1000.0)
+            timeout = self._consumer_timeout
+            if self.config['api_version'] >= (0, 9):
+                heartbeat_timeout = time.time() + (
+                    self.config['heartbeat_interval_ms'] / 1000.0)
+                timeout = min(heartbeat_timeout, timeout)
+            for msg in self._fetcher:
+                yield msg
+                if time.time() > timeout:
+                    break
+
+    def __iter__(self):  # pylint: disable=non-iterator-returned
+        return self
+
+    def __next__(self):
+        if not self._iterator:
+            self._iterator = self._message_generator()
+
+        # consumer_timeout_ms can be used to stop iteration early
+        if self.config['consumer_timeout_ms'] >= 0:
+            self._consumer_timeout = time.time() + (
+                self.config['consumer_timeout_ms'] / 1000.0)
+
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            self._iterator = None
+            raise
+
+    # old KafkaConsumer methods are deprecated
+    def configure(self, **configs):
+        raise NotImplementedError(
+            'deprecated -- initialize a new consumer')
+
+    def set_topic_partitions(self, *topics):
+        raise NotImplementedError(
+            'deprecated -- use subscribe() or assign()')
+
+    def fetch_messages(self):
+        raise NotImplementedError(
+            'deprecated -- use poll() or iterator interface')
+
+    def get_partition_offsets(self, topic, partition,
+                              request_time_ms, max_num_offsets):
+        raise NotImplementedError(
+            'deprecated -- send an OffsetRequest with KafkaClient')
+
+    def offsets(self, group=None):
+        raise NotImplementedError('deprecated -- use committed(partition)')
+
+    def task_done(self, message):
+        raise NotImplementedError(
+            'deprecated -- commit offsets manually if needed')
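
For reference, a minimal consumption sketch against the API this patch introduces (not part of the diff; the topic name, group id, and broker address are placeholder assumptions, and a >= 0.9 broker is assumed for group coordination):

    from kafka.consumer.group import KafkaConsumer  # module added by this patch

    # Subscribe via the constructor; partitions are then assigned by the
    # group coordinator. consumer_timeout_ms makes iteration stop after
    # 10s with no messages instead of blocking forever.
    consumer = KafkaConsumer('my-topic',                       # placeholder topic
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',              # placeholder group
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=10000)
    for message in consumer:
        print(message)
    consumer.close()

With enable_auto_commit left at its default of True, the coordinator commits positions in the background every auto_commit_interval_ms, so this loop needs no explicit commit calls.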
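And a sketch of the manual path -- assign() plus poll() with an explicit blocking commit() -- assuming TopicPartition is importable from kafka.common as elsewhere in this codebase (again with placeholder names):

    from kafka.common import TopicPartition  # import location is an assumption
    from kafka.consumer.group import KafkaConsumer

    tp = TopicPartition('my-topic', 0)  # hypothetical topic / partition 0
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             enable_auto_commit=False)
    consumer.assign([tp])           # manual assignment; no group rebalancing
    consumer.seek_to_beginning(tp)  # replay from the oldest available offset

    # poll() returns batched records (see its docstring for the mapping key)
    records = consumer.poll(timeout_ms=1000)
    for _, messages in records.items():
        for message in messages:
            print(message)
    consumer.commit()  # blocks until the consumed offsets are committed
    consumer.close()

Because assign() bypasses group management entirely, pairing it with enable_auto_commit=False and explicit commit() calls keeps offset progress under the application's control.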