summaryrefslogtreecommitdiff
path: root/kafka/consumer/subscription_state.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r--kafka/consumer/subscription_state.py462
1 files changed, 462 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
new file mode 100644
index 0000000..c60f192
--- /dev/null
+++ b/kafka/consumer/subscription_state.py
@@ -0,0 +1,462 @@
+from __future__ import absolute_import
+
+import abc
+import logging
+import re
+
+import six
+
+from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.protocol.offset import OffsetResetStrategy
+
+log = logging.getLogger(__name__)
+
+
+class SubscriptionState(object):
+ """
+ A class for tracking the topics, partitions, and offsets for the consumer.
+ A partition is "assigned" either directly with assign_from_user() (manual
+ assignment) or with assign_from_subscribed() (automatic assignment from
+ subscription).
+
+ Once assigned, the partition is not considered "fetchable" until its initial
+ position has been set with seek(). Fetchable partitions track a fetch
+ position which is used to set the offset of the next fetch, and a consumed
+ position which is the last offset that has been returned to the user. You
+ can suspend fetching from a partition through pause() without affecting the
+ fetched/consumed offsets. The partition will remain unfetchable until the
+ resume() is used. You can also query the pause state independently with
+ is_paused().
+
+ Note that pause state as well as fetch/consumed positions are not preserved
+ when partition assignment is changed whether directly by the user or
+ through a group rebalance.
+
+ This class also maintains a cache of the latest commit position for each of
+ the assigned partitions. This is updated through committed() and can be used
+ to set the initial fetch position (e.g. Fetcher._reset_offset() ).
+ """
+ _SUBSCRIPTION_EXCEPTION_MESSAGE = ("Subscription to topics, partitions and"
+ " pattern are mutually exclusive")
+
+ def __init__(self, offset_reset_strategy='earliest'):
+ """Initialize a SubscriptionState instance
+
+ Keyword Arguments:
+ offset_reset_strategy: 'earliest' or 'latest', otherwise
+ exception will be raised when fetching an offset that is no
+ longer available. Default: 'earliest'
+ """
+ try:
+ offset_reset_strategy = getattr(OffsetResetStrategy,
+ offset_reset_strategy.upper())
+ except AttributeError:
+ log.warning('Unrecognized offset_reset_strategy, using NONE')
+ offset_reset_strategy = OffsetResetStrategy.NONE
+ self._default_offset_reset_strategy = offset_reset_strategy
+
+ self.subscription = None # set() or None
+ self.subscribed_pattern = None # regex str or None
+ self._group_subscription = set()
+ self._user_assignment = set()
+ self.assignment = dict()
+ self.needs_partition_assignment = False
+ self.listener = None
+
+ # initialize to true for the consumers to fetch offset upon starting up
+ self.needs_fetch_committed_offsets = True
+
+ def subscribe(self, topics=(), pattern=None, listener=None):
+ """Subscribe to a list of topics, or a topic regex pattern.
+
+ Partitions will be dynamically assigned via a group coordinator.
+ Topic subscriptions are not incremental: this list will replace the
+ current assignment (if there is one).
+
+ This method is incompatible with assign_from_user()
+
+ Arguments:
+ topics (list): List of topics for subscription.
+ pattern (str): Pattern to match available topics. You must provide
+ either topics or pattern, but not both.
+ listener (ConsumerRebalanceListener): Optionally include listener
+ callback, which will be called before and after each rebalance
+ operation.
+
+ As part of group management, the consumer will keep track of the
+ list of consumers that belong to a particular group and will
+ trigger a rebalance operation if one of the following events
+ trigger:
+
+ * Number of partitions change for any of the subscribed topics
+ * Topic is created or deleted
+ * An existing member of the consumer group dies
+ * A new member is added to the consumer group
+
+ When any of these events are triggered, the provided listener
+ will be invoked first to indicate that the consumer's assignment
+ has been revoked, and then again when the new assignment has
+ been received. Note that this listener will immediately override
+ any listener set in a previous call to subscribe. It is
+ guaranteed, however, that the partitions revoked/assigned
+ through this interface are from topics subscribed in this call.
+ """
+ if self._user_assignment or (topics and pattern):
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ assert topics or pattern, 'Must provide topics or pattern'
+
+ if pattern:
+ log.info('Subscribing to pattern: /%s/', pattern)
+ self.subscription = set()
+ self.subscribed_pattern = re.compile(pattern)
+ else:
+ self.change_subscription(topics)
+
+ if listener and not isinstance(listener, ConsumerRebalanceListener):
+ raise TypeError('listener must be a ConsumerRebalanceListener')
+ self.listener = listener
+
+ def change_subscription(self, topics):
+ """Change the topic subscription.
+
+ Arguments:
+ topics (list of str): topics for subscription
+
+ Raises:
+ IllegalStateErrror: if assign_from_user has been used already
+ """
+ if self._user_assignment:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ if self.subscription == set(topics):
+ log.warning("subscription unchanged by change_subscription(%s)",
+ topics)
+ return
+
+ log.info('Updating subscribed topics to: %s', topics)
+ self.subscription = set(topics)
+ self._group_subscription.update(topics)
+ self.needs_partition_assignment = True
+
+ # Remove any assigned partitions which are no longer subscribed to
+ for tp in set(self.assignment.keys()):
+ if tp.topic not in self.subscription:
+ del self.assignment[tp]
+
+ def group_subscribe(self, topics):
+ """Add topics to the current group subscription.
+
+ This is used by the group leader to ensure that it receives metadata
+ updates for all topics that any member of the group is subscribed to.
+
+ Arguments:
+ topics (list of str): topics to add to the group subscription
+ """
+ if self._user_assignment:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ self._group_subscription.update(topics)
+
+ def mark_for_reassignment(self):
+ self._group_subscription.intersection_update(self.subscription)
+ self.needs_partition_assignment = True
+
+ def assign_from_user(self, partitions):
+ """Manually assign a list of TopicPartitions to this consumer.
+
+ This interface does not allow for incremental assignment and will
+ replace the previous assignment (if there was one).
+
+ Manual topic assignment through this method does not use the consumer's
+ group management functionality. As such, there will be no rebalance
+ operation triggered when group membership or cluster and topic metadata
+ change. Note that it is not possible to use both manual partition
+ assignment with assign() and group assignment with subscribe().
+
+ Arguments:
+ partitions (list of TopicPartition): assignment for this instance.
+
+ Raises:
+ IllegalStateError: if consumer has already called subscribe()
+ """
+ if self.subscription is not None:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ self._user_assignment.clear()
+ self._user_assignment.update(partitions)
+
+ for partition in partitions:
+ if partition not in self.assignment:
+ self._add_assigned_partition(partition)
+
+ for tp in set(self.assignment.keys()) - self._user_assignment:
+ del self.assignment[tp]
+
+ self.needs_partition_assignment = False
+
+ def assign_from_subscribed(self, assignments):
+ """Update the assignment to the specified partitions
+
+ This method is called by the coordinator to dynamically assign
+ partitions based on the consumer's topic subscription. This is different
+ from assign_from_user() which directly sets the assignment from a
+ user-supplied TopicPartition list.
+
+ Arguments:
+ assignments (list of TopicPartition): partitions to assign to this
+ consumer instance.
+ """
+ if self.subscription is None:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ for tp in assignments:
+ if tp.topic not in self.subscription:
+ raise ValueError("Assigned partition %s for non-subscribed topic." % tp)
+ self.assignment.clear()
+ for tp in assignments:
+ self._add_assigned_partition(tp)
+ self.needs_partition_assignment = False
+ log.info("Updated partition assignment: %s", assignments)
+
+ def unsubscribe(self):
+ """Clear all topic subscriptions and partition assignments"""
+ self.subscription = None
+ self._user_assignment.clear()
+ self.assignment.clear()
+ self.needs_partition_assignment = True
+ self.subscribed_pattern = None
+
+ def group_subscription(self):
+ """Get the topic subscription for the group.
+
+ For the leader, this will include the union of all member subscriptions.
+ For followers, it is the member's subscription only.
+
+ This is used when querying topic metadata to detect metadata changes
+ that would require rebalancing (the leader fetches metadata for all
+ topics in the group so that it can do partition assignment).
+
+ Returns:
+ set: topics
+ """
+ return self._group_subscription
+
+ def seek(self, partition, offset):
+ """Manually specify the fetch offset for a TopicPartition.
+
+ Overrides the fetch offsets that the consumer will use on the next
+ poll(). If this API is invoked for the same partition more than once,
+ the latest offset will be used on the next poll(). Note that you may
+ lose data if this API is arbitrarily used in the middle of consumption,
+ to reset the fetch offsets.
+
+ Arguments:
+ partition (TopicPartition): partition for seek operation
+ offset (int): message offset in partition
+ """
+ self.assignment[partition].seek(offset)
+
+ def assigned_partitions(self):
+ """Return set of TopicPartitions in current assignment."""
+ return set(self.assignment.keys())
+
+ def fetchable_partitions(self):
+ """Return set of TopicPartitions that should be Fetched."""
+ fetchable = set()
+ for partition, state in six.iteritems(self.assignment):
+ if state.is_fetchable():
+ fetchable.add(partition)
+ return fetchable
+
+ def partitions_auto_assigned(self):
+ """Return True unless user supplied partitions manually."""
+ return self.subscription is not None
+
+ def all_consumed_offsets(self):
+ """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}"""
+ all_consumed = {}
+ for partition, state in six.iteritems(self.assignment):
+ if state.has_valid_position:
+ all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+ return all_consumed
+
+ def need_offset_reset(self, partition, offset_reset_strategy=None):
+ """Mark partition for offset reset using specified or default strategy.
+
+ Arguments:
+ partition (TopicPartition): partition to mark
+ offset_reset_strategy (OffsetResetStrategy, optional)
+ """
+ if offset_reset_strategy is None:
+ offset_reset_strategy = self._default_offset_reset_strategy
+ self.assignment[partition].await_reset(offset_reset_strategy)
+
+ def has_default_offset_reset_policy(self):
+ """Return True if default offset reset policy is Earliest or Latest"""
+ return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
+
+ def is_offset_reset_needed(self, partition):
+ return self.assignment[partition].awaiting_reset
+
+ def has_all_fetch_positions(self):
+ for state in self.assignment.values():
+ if not state.has_valid_position:
+ return False
+ return True
+
+ def missing_fetch_positions(self):
+ missing = set()
+ for partition, state in six.iteritems(self.assignment):
+ if not state.has_valid_position:
+ missing.add(partition)
+ return missing
+
+ def is_assigned(self, partition):
+ return partition in self.assignment
+
+ def is_paused(self, partition):
+ return partition in self.assignment and self.assignment[partition].paused
+
+ def is_fetchable(self, partition):
+ return partition in self.assignment and self.assignment[partition].is_fetchable()
+
+ def pause(self, partition):
+ self.assignment[partition].pause()
+
+ def resume(self, partition):
+ self.assignment[partition].resume()
+
+ def _add_assigned_partition(self, partition):
+ self.assignment[partition] = TopicPartitionState()
+
+
+class TopicPartitionState(object):
+ def __init__(self):
+ self.committed = None # last committed position
+ self.has_valid_position = False # whether we have valid consumed and fetched positions
+ self.paused = False # whether this partition has been paused by the user
+ self.awaiting_reset = False # whether we are awaiting reset
+ self.reset_strategy = None # the reset strategy if awaitingReset is set
+ self._consumed = None # offset exposed to the user
+ self._fetched = None # current fetch position
+
+ def _set_fetched(self, offset):
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
+ self._fetched = offset
+
+ def _get_fetched(self):
+ return self._fetched
+
+ fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
+
+ def _set_consumed(self, offset):
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
+ self._consumed = offset
+
+ def _get_consumed(self):
+ return self._consumed
+
+ consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+
+ def await_reset(self, strategy):
+ self.awaiting_reset = True
+ self.reset_strategy = strategy
+ self._consumed = None
+ self._fetched = None
+ self.has_valid_position = False
+
+ def seek(self, offset):
+ self._consumed = offset
+ self._fetched = offset
+ self.awaiting_reset = False
+ self.reset_strategy = None
+ self.has_valid_position = True
+
+ def pause(self):
+ self.paused = True
+
+ def resume(self):
+ self.paused = False
+
+ def is_fetchable(self):
+ return not self.paused and self.has_valid_position
+
+
+class ConsumerRebalanceListener(object):
+ """
+ A callback interface that the user can implement to trigger custom actions
+ when the set of partitions assigned to the consumer changes.
+
+ This is applicable when the consumer is having Kafka auto-manage group
+ membership. If the consumer's directly assign partitions, those
+ partitions will never be reassigned and this callback is not applicable.
+
+ When Kafka is managing the group membership, a partition re-assignment will
+ be triggered any time the members of the group changes or the subscription
+ of the members changes. This can occur when processes die, new process
+ instances are added or old instances come back to life after failure.
+ Rebalances can also be triggered by changes affecting the subscribed
+ topics (e.g. when then number of partitions is administratively adjusted).
+
+ There are many uses for this functionality. One common use is saving offsets
+ in a custom store. By saving offsets in the on_partitions_revoked(), call we
+ can ensure that any time partition assignment changes the offset gets saved.
+
+ Another use is flushing out any kind of cache of intermediate results the
+ consumer may be keeping. For example, consider a case where the consumer is
+ subscribed to a topic containing user page views, and the goal is to count
+ the number of page views per users for each five minute window. Let's say
+ the topic is partitioned by the user id so that all events for a particular
+ user will go to a single consumer instance. The consumer can keep in memory
+ a running tally of actions per user and only flush these out to a remote
+ data store when its cache gets too big. However if a partition is reassigned
+ it may want to automatically trigger a flush of this cache, before the new
+ owner takes over consumption.
+
+ This callback will execute in the user thread as part of the Consumer.poll()
+ whenever partition assignment changes.
+
+ It is guaranteed that all consumer processes will invoke
+ on_partitions_revoked() prior to any process invoking
+ on_partitions_assigned(). So if offsets or other state is saved in the
+ on_partitions_revoked() call, it should be saved by the time the process
+ taking over that partition has their on_partitions_assigned() callback
+ called to load the state.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def on_partitions_revoked(self, revoked):
+ """
+ A callback method the user can implement to provide handling of offset
+ commits to a customized store on the start of a rebalance operation.
+ This method will be called before a rebalance operation starts and
+ after the consumer stops fetching data. It is recommended that offsets
+ should be committed in this callback to either Kafka or a custom offset
+ store to prevent duplicate data.
+
+ NOTE: This method is only called before rebalances. It is not called
+ prior to KafkaConsumer.close()
+
+ Arguments:
+ revoked (list of TopicPartition): the partitions that were assigned
+ to the consumer on the last rebalance
+ """
+ pass
+
+ @abc.abstractmethod
+ def on_partitions_assigned(self, assigned):
+ """
+ A callback method the user can implement to provide handling of
+ customized offsets on completion of a successful partition
+ re-assignment. This method will be called after an offset re-assignment
+ completes and before the consumer starts fetching data.
+
+ It is guaranteed that all the processes in a consumer group will execute
+ their on_partitions_revoked() callback before any instance executes its
+ on_partitions_assigned() callback.
+
+ Arguments:
+ assigned (list of TopicPartition): the partitions assigned to the
+ consumer (may include partitions that were previously assigned)
+ """
+ pass