diff options
Diffstat (limited to 'kafka/coordinator')
-rw-r--r-- | kafka/coordinator/assignors/sticky/__init__.py | 0 | ||||
-rw-r--r-- | kafka/coordinator/assignors/sticky/partition_movements.py | 149 | ||||
-rw-r--r-- | kafka/coordinator/assignors/sticky/sorted_set.py | 63 | ||||
-rw-r--r-- | kafka/coordinator/assignors/sticky/sticky_assignor.py | 681 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 5 |
5 files changed, 897 insertions, 1 deletions
diff --git a/kafka/coordinator/assignors/sticky/__init__.py b/kafka/coordinator/assignors/sticky/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/kafka/coordinator/assignors/sticky/__init__.py diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/kafka/coordinator/assignors/sticky/partition_movements.py new file mode 100644 index 0000000..8851e4c --- /dev/null +++ b/kafka/coordinator/assignors/sticky/partition_movements.py @@ -0,0 +1,149 @@ +import logging +from collections import defaultdict, namedtuple +from copy import deepcopy + +from kafka.vendor import six + +log = logging.getLogger(__name__) + + +ConsumerPair = namedtuple("ConsumerPair", ["src_member_id", "dst_member_id"]) +""" +Represents a pair of Kafka consumer ids involved in a partition reassignment. +Each ConsumerPair corresponds to a particular partition or topic, indicates that the particular partition or some +partition of the particular topic was moved from the source consumer to the destination consumer +during the rebalance. This class helps in determining whether a partition reassignment results in cycles among +the generated graph of consumer pairs. +""" + + +def is_sublist(source, target): + """Checks if one list is a sublist of another. + + Arguments: + source: the list in which to search for the occurrence of target. + target: the list to search for as a sublist of source + + Returns: + true if target is in source; false otherwise + """ + for index in (i for i, e in enumerate(source) if e == target[0]): + if tuple(source[index: index + len(target)]) == target: + return True + return False + + +class PartitionMovements: + """ + This class maintains some data structures to simplify lookup of partition movements among consumers. + At each point of time during a partition rebalance it keeps track of partition movements + corresponding to each topic, and also possible movement (in form a ConsumerPair object) for each partition. + """ + + def __init__(self): + self.partition_movements_by_topic = defaultdict( + lambda: defaultdict(set) + ) + self.partition_movements = {} + + def move_partition(self, partition, old_consumer, new_consumer): + pair = ConsumerPair(src_member_id=old_consumer, dst_member_id=new_consumer) + if partition in self.partition_movements: + # this partition has previously moved + existing_pair = self._remove_movement_record_of_partition(partition) + assert existing_pair.dst_member_id == old_consumer + if existing_pair.src_member_id != new_consumer: + # the partition is not moving back to its previous consumer + self._add_partition_movement_record( + partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer) + ) + else: + self._add_partition_movement_record(partition, pair) + + def get_partition_to_be_moved(self, partition, old_consumer, new_consumer): + if partition.topic not in self.partition_movements_by_topic: + return partition + if partition in self.partition_movements: + # this partition has previously moved + assert old_consumer == self.partition_movements[partition].dst_member_id + old_consumer = self.partition_movements[partition].src_member_id + reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer) + if reverse_pair not in self.partition_movements_by_topic[partition.topic]: + return partition + + return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair])) + + def are_sticky(self): + for topic, movements in six.iteritems(self.partition_movements_by_topic): + movement_pairs = set(movements.keys()) + if self._has_cycles(movement_pairs): + log.error( + "Stickiness is violated for topic {}\n" + "Partition movements for this topic occurred among the following consumer pairs:\n" + "{}".format(topic, movement_pairs) + ) + return False + return True + + def _remove_movement_record_of_partition(self, partition): + pair = self.partition_movements[partition] + del self.partition_movements[partition] + + self.partition_movements_by_topic[partition.topic][pair].remove(partition) + if not self.partition_movements_by_topic[partition.topic][pair]: + del self.partition_movements_by_topic[partition.topic][pair] + if not self.partition_movements_by_topic[partition.topic]: + del self.partition_movements_by_topic[partition.topic] + + return pair + + def _add_partition_movement_record(self, partition, pair): + self.partition_movements[partition] = pair + self.partition_movements_by_topic[partition.topic][pair].add(partition) + + def _has_cycles(self, consumer_pairs): + cycles = set() + for pair in consumer_pairs: + reduced_pairs = deepcopy(consumer_pairs) + reduced_pairs.remove(pair) + path = [pair.src_member_id] + if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle( + path, cycles + ): + cycles.add(tuple(path)) + log.error("A cycle of length {} was found: {}".format(len(path) - 1, path)) + + # for now we want to make sure there is no partition movements of the same topic between a pair of consumers. + # the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized + # tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases. + for cycle in cycles: + if len(cycle) == 3: # indicates a cycle of length 2 + return True + return False + + @staticmethod + def _is_subcycle(cycle, cycles): + super_cycle = deepcopy(cycle) + super_cycle = super_cycle[:-1] + super_cycle.extend(cycle) + for found_cycle in cycles: + if len(found_cycle) == len(cycle) and is_sublist(super_cycle, found_cycle): + return True + return False + + def _is_linked(self, src, dst, pairs, current_path): + if src == dst: + return False + if not pairs: + return False + if ConsumerPair(src, dst) in pairs: + current_path.append(src) + current_path.append(dst) + return True + for pair in pairs: + if pair.src_member_id == src: + reduced_set = deepcopy(pairs) + reduced_set.remove(pair) + current_path.append(pair.src_member_id) + return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path) + return False diff --git a/kafka/coordinator/assignors/sticky/sorted_set.py b/kafka/coordinator/assignors/sticky/sorted_set.py new file mode 100644 index 0000000..6a454a4 --- /dev/null +++ b/kafka/coordinator/assignors/sticky/sorted_set.py @@ -0,0 +1,63 @@ +class SortedSet: + def __init__(self, iterable=None, key=None): + self._key = key if key is not None else lambda x: x + self._set = set(iterable) if iterable is not None else set() + + self._cached_last = None + self._cached_first = None + + def first(self): + if self._cached_first is not None: + return self._cached_first + + first = None + for element in self._set: + if first is None or self._key(first) > self._key(element): + first = element + self._cached_first = first + return first + + def last(self): + if self._cached_last is not None: + return self._cached_last + + last = None + for element in self._set: + if last is None or self._key(last) < self._key(element): + last = element + self._cached_last = last + return last + + def pop_last(self): + value = self.last() + self._set.remove(value) + self._cached_last = None + return value + + def add(self, value): + if self._cached_last is not None and self._key(value) > self._key(self._cached_last): + self._cached_last = value + if self._cached_first is not None and self._key(value) < self._key(self._cached_first): + self._cached_first = value + + return self._set.add(value) + + def remove(self, value): + if self._cached_last is not None and self._cached_last == value: + self._cached_last = None + if self._cached_first is not None and self._cached_first == value: + self._cached_first = None + + return self._set.remove(value) + + def __contains__(self, value): + return value in self._set + + def __iter__(self): + return iter(sorted(self._set, key=self._key)) + + def _bool(self): + return len(self._set) != 0 + + __nonzero__ = _bool + __bool__ = _bool diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py new file mode 100644 index 0000000..7827086 --- /dev/null +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -0,0 +1,681 @@ +import logging +from collections import defaultdict, namedtuple +from copy import deepcopy + +from kafka.cluster import ClusterMetadata +from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements +from kafka.coordinator.assignors.sticky.sorted_set import SortedSet +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from kafka.coordinator.protocol import Schema +from kafka.protocol.struct import Struct +from kafka.protocol.types import String, Array, Int32 +from kafka.structs import TopicPartition +from kafka.vendor import six + +log = logging.getLogger(__name__) + +ConsumerGenerationPair = namedtuple("ConsumerGenerationPair", ["consumer", "generation"]) + + +def has_identical_list_elements(list_): + """Checks if all lists in the collection have the same members + + Arguments: + list_: collection of lists + + Returns: + true if all lists in the collection have the same members; false otherwise + """ + if not list_: + return True + for i in range(1, len(list_)): + if list_[i] != list_[i - 1]: + return False + return True + + +def subscriptions_comparator_key(element): + return len(element[1]), element[0] + + +def partitions_comparator_key(element): + return len(element[1]), element[0].topic, element[0].partition + + +def remove_if_present(collection, element): + try: + collection.remove(element) + except (ValueError, KeyError): + pass + + +StickyAssignorMemberMetadataV1 = namedtuple("StickyAssignorMemberMetadataV1", + ["subscription", "partitions", "generation"]) + + +class StickyAssignorUserDataV1(Struct): + """ + Used for preserving consumer's previously assigned partitions + list and sending it as user data to the leader during a rebalance + """ + + SCHEMA = Schema( + ("previous_assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), ("generation", Int32) + ) + + +class StickyAssignmentExecutor: + def __init__(self, cluster, members): + self.members = members + # a mapping between consumers and their assigned partitions that is updated during assignment procedure + self.current_assignment = defaultdict(list) + # an assignment from a previous generation + self.previous_assignment = {} + # a mapping between partitions and their assigned consumers + self.current_partition_consumer = {} + # a flag indicating that there were no previous assignments performed ever + self.is_fresh_assignment = False + # a mapping of all topic partitions to all consumers that can be assigned to them + self.partition_to_all_potential_consumers = {} + # a mapping of all consumers to all potential topic partitions that can be assigned to them + self.consumer_to_all_potential_partitions = {} + # an ascending sorted set of consumers based on how many topic partitions are already assigned to them + self.sorted_current_subscriptions = SortedSet() + # an ascending sorted list of topic partitions based on how many consumers can potentially use them + self.sorted_partitions = [] + # all partitions that need to be assigned + self.unassigned_partitions = [] + # a flag indicating that a certain partition cannot remain assigned to its current consumer because the consumer + # is no longer subscribed to its topic + self.revocation_required = False + + self.partition_movements = PartitionMovements() + self._initialize(cluster) + + def perform_initial_assignment(self): + self._populate_sorted_partitions() + self._populate_partitions_to_reassign() + + def balance(self): + self._initialize_current_subscriptions() + initializing = len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) == 0 + + # assign all unassigned partitions + for partition in self.unassigned_partitions: + # skip if there is no potential consumer for the partition + if not self.partition_to_all_potential_consumers[partition]: + continue + self._assign_partition(partition) + + # narrow down the reassignment scope to only those partitions that can actually be reassigned + fixed_partitions = set() + for partition in six.iterkeys(self.partition_to_all_potential_consumers): + if not self._can_partition_participate_in_reassignment(partition): + fixed_partitions.add(partition) + for fixed_partition in fixed_partitions: + remove_if_present(self.sorted_partitions, fixed_partition) + remove_if_present(self.unassigned_partitions, fixed_partition) + + # narrow down the reassignment scope to only those consumers that are subject to reassignment + fixed_assignments = {} + for consumer in six.iterkeys(self.consumer_to_all_potential_partitions): + if not self._can_consumer_participate_in_reassignment(consumer): + self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + fixed_assignments[consumer] = self.current_assignment[consumer] + del self.current_assignment[consumer] + + # create a deep copy of the current assignment so we can revert to it + # if we do not get a more balanced assignment later + prebalance_assignment = deepcopy(self.current_assignment) + prebalance_partition_consumers = deepcopy(self.current_partition_consumer) + + # if we don't already need to revoke something due to subscription changes, + # first try to balance by only moving newly added partitions + if not self.revocation_required: + self._perform_reassignments(self.unassigned_partitions) + reassignment_performed = self._perform_reassignments(self.sorted_partitions) + + # if we are not preserving existing assignments and we have made changes to the current assignment + # make sure we are getting a more balanced assignment; otherwise, revert to previous assignment + if ( + not initializing + and reassignment_performed + and self._get_balance_score(self.current_assignment) >= self._get_balance_score(prebalance_assignment) + ): + self.current_assignment = prebalance_assignment + self.current_partition_consumer.clear() + self.current_partition_consumer.update(prebalance_partition_consumers) + + # add the fixed assignments (those that could not change) back + for consumer, partitions in six.iteritems(fixed_assignments): + self.current_assignment[consumer] = partitions + self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) + + def get_final_assignment(self, member_id): + assignment = defaultdict(list) + for topic_partition in self.current_assignment[member_id]: + assignment[topic_partition.topic].append(topic_partition.partition) + assignment = {k: sorted(v) for k, v in six.iteritems(assignment)} + return six.viewitems(assignment) + + def _initialize(self, cluster): + self._init_current_assignments(self.members) + + for topic in cluster.topics(): + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + log.warning("No partition metadata for topic %s", topic) + continue + for p in partitions: + partition = TopicPartition(topic=topic, partition=p) + self.partition_to_all_potential_consumers[partition] = [] + for consumer_id, member_metadata in six.iteritems(self.members): + self.consumer_to_all_potential_partitions[consumer_id] = [] + for topic in member_metadata.subscription: + if cluster.partitions_for_topic(topic) is None: + log.warning("No partition metadata for topic {}".format(topic)) + continue + for p in cluster.partitions_for_topic(topic): + partition = TopicPartition(topic=topic, partition=p) + self.consumer_to_all_potential_partitions[consumer_id].append(partition) + self.partition_to_all_potential_consumers[partition].append(consumer_id) + if consumer_id not in self.current_assignment: + self.current_assignment[consumer_id] = [] + + def _init_current_assignments(self, members): + # we need to process subscriptions' user data with each consumer's reported generation in mind + # higher generations overwrite lower generations in case of a conflict + # note that a conflict could exists only if user data is for different generations + + # for each partition we create a map of its consumers by generation + sorted_partition_consumers_by_generation = {} + for consumer, member_metadata in six.iteritems(members): + for partitions in member_metadata.partitions: + if partitions in sorted_partition_consumers_by_generation: + consumers = sorted_partition_consumers_by_generation[partitions] + if member_metadata.generation and member_metadata.generation in consumers: + # same partition is assigned to two consumers during the same rebalance. + # log a warning and skip this record + log.warning( + "Partition {} is assigned to multiple consumers " + "following sticky assignment generation {}.".format(partitions, member_metadata.generation) + ) + else: + consumers[member_metadata.generation] = consumer + else: + sorted_consumers = {member_metadata.generation: consumer} + sorted_partition_consumers_by_generation[partitions] = sorted_consumers + + # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition + # current and previous consumers are the last two consumers of each partition in the above sorted map + for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation): + generations = sorted(consumers.keys(), reverse=True) + self.current_assignment[consumers[generations[0]]].append(partitions) + # now update previous assignment if any + if len(generations) > 1: + self.previous_assignment[partitions] = ConsumerGenerationPair( + consumer=consumers[generations[1]], generation=generations[1] + ) + + self.is_fresh_assignment = len(self.current_assignment) == 0 + + for consumer_id, partitions in six.iteritems(self.current_assignment): + for partition in partitions: + self.current_partition_consumer[partition] = consumer_id + + def _are_subscriptions_identical(self): + """ + Returns: + true, if both potential consumers of partitions and potential partitions that consumers can + consume are the same + """ + if not has_identical_list_elements(list(six.itervalues(self.partition_to_all_potential_consumers))): + return False + return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions))) + + def _populate_sorted_partitions(self): + # set of topic partitions with their respective potential consumers + all_partitions = set((tp, tuple(consumers)) + for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers)) + partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key) + + self.sorted_partitions = [] + if not self.is_fresh_assignment and self._are_subscriptions_identical(): + # if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics) + # then we just need to simply list partitions in a round robin fashion (from consumers with + # most assigned partitions to those with least) + assignments = deepcopy(self.current_assignment) + for consumer_id, partitions in six.iteritems(assignments): + to_remove = [] + for partition in partitions: + if partition not in self.partition_to_all_potential_consumers: + to_remove.append(partition) + for partition in to_remove: + partitions.remove(partition) + + sorted_consumers = SortedSet( + iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(assignments)], + key=subscriptions_comparator_key, + ) + # at this point, sorted_consumers contains an ascending-sorted list of consumers based on + # how many valid partitions are currently assigned to them + while sorted_consumers: + # take the consumer with the most partitions + consumer, _ = sorted_consumers.pop_last() + # currently assigned partitions to this consumer + remaining_partitions = assignments[consumer] + # from partitions that had a different consumer before, + # keep only those that are assigned to this consumer now + previous_partitions = set(six.iterkeys(self.previous_assignment)).intersection(set(remaining_partitions)) + if previous_partitions: + # if there is a partition of this consumer that was assigned to another consumer before + # mark it as good options for reassignment + partition = previous_partitions.pop() + remaining_partitions.remove(partition) + self.sorted_partitions.append(partition) + sorted_consumers.add((consumer, tuple(assignments[consumer]))) + elif remaining_partitions: + # otherwise, mark any other one of the current partitions as a reassignment candidate + self.sorted_partitions.append(remaining_partitions.pop()) + sorted_consumers.add((consumer, tuple(assignments[consumer]))) + + while partitions_sorted_by_num_of_potential_consumers: + partition = partitions_sorted_by_num_of_potential_consumers.pop(0)[0] + if partition not in self.sorted_partitions: + self.sorted_partitions.append(partition) + else: + while partitions_sorted_by_num_of_potential_consumers: + self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0]) + + def _populate_partitions_to_reassign(self): + self.unassigned_partitions = deepcopy(self.sorted_partitions) + + assignments_to_remove = [] + for consumer_id, partitions in six.iteritems(self.current_assignment): + if consumer_id not in self.members: + # if a consumer that existed before (and had some partition assignments) is now removed, + # remove it from current_assignment + for partition in partitions: + del self.current_partition_consumer[partition] + assignments_to_remove.append(consumer_id) + else: + # otherwise (the consumer still exists) + partitions_to_remove = [] + for partition in partitions: + if partition not in self.partition_to_all_potential_consumers: + # if this topic partition of this consumer no longer exists + # remove it from current_assignment of the consumer + partitions_to_remove.append(partition) + elif partition.topic not in self.members[consumer_id].subscription: + # if this partition cannot remain assigned to its current consumer because the consumer + # is no longer subscribed to its topic remove it from current_assignment of the consumer + partitions_to_remove.append(partition) + self.revocation_required = True + else: + # otherwise, remove the topic partition from those that need to be assigned only if + # its current consumer is still subscribed to its topic (because it is already assigned + # and we would want to preserve that assignment as much as possible) + self.unassigned_partitions.remove(partition) + for partition in partitions_to_remove: + self.current_assignment[consumer_id].remove(partition) + del self.current_partition_consumer[partition] + for consumer_id in assignments_to_remove: + del self.current_assignment[consumer_id] + + def _initialize_current_subscriptions(self): + self.sorted_current_subscriptions = SortedSet( + iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(self.current_assignment)], + key=subscriptions_comparator_key, + ) + + def _get_consumer_with_least_subscriptions(self): + return self.sorted_current_subscriptions.first()[0] + + def _get_consumer_with_most_subscriptions(self): + return self.sorted_current_subscriptions.last()[0] + + def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer): + self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) + + def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer): + self.sorted_current_subscriptions.add((consumer, tuple(self.current_assignment[consumer]))) + + def _is_balanced(self): + """Determines if the current assignment is a balanced one""" + if ( + len(self.current_assignment[self._get_consumer_with_least_subscriptions()]) + >= len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) - 1 + ): + # if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true + return True + + # create a mapping from partitions to the consumer assigned to them + all_assigned_partitions = {} + for consumer_id, consumer_partitions in six.iteritems(self.current_assignment): + for partition in consumer_partitions: + if partition in all_assigned_partitions: + log.error("{} is assigned to more than one consumer.".format(partition)) + all_assigned_partitions[partition] = consumer_id + + # for each consumer that does not have all the topic partitions it can get + # make sure none of the topic partitions it could but did not get cannot be moved to it + # (because that would break the balance) + for consumer, _ in self.sorted_current_subscriptions: + consumer_partition_count = len(self.current_assignment[consumer]) + # skip if this consumer already has all the topic partitions it can get + if consumer_partition_count == len(self.consumer_to_all_potential_partitions[consumer]): + continue + + # otherwise make sure it cannot get any more + for partition in self.consumer_to_all_potential_partitions[consumer]: + if partition not in self.current_assignment[consumer]: + other_consumer = all_assigned_partitions[partition] + other_consumer_partition_count = len(self.current_assignment[other_consumer]) + if consumer_partition_count < other_consumer_partition_count: + return False + return True + + def _assign_partition(self, partition): + for consumer, _ in self.sorted_current_subscriptions: + if partition in self.consumer_to_all_potential_partitions[consumer]: + self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + self.current_assignment[consumer].append(partition) + self.current_partition_consumer[partition] = consumer + self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) + break + + def _can_partition_participate_in_reassignment(self, partition): + return len(self.partition_to_all_potential_consumers[partition]) >= 2 + + def _can_consumer_participate_in_reassignment(self, consumer): + current_partitions = self.current_assignment[consumer] + current_assignment_size = len(current_partitions) + max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer]) + if current_assignment_size > max_assignment_size: + log.error("The consumer {} is assigned more partitions than the maximum possible.".format(consumer)) + if current_assignment_size < max_assignment_size: + # if a consumer is not assigned all its potential partitions it is subject to reassignment + return True + for partition in current_partitions: + # if any of the partitions assigned to a consumer is subject to reassignment the consumer itself + # is subject to reassignment + if self._can_partition_participate_in_reassignment(partition): + return True + return False + + def _perform_reassignments(self, reassignable_partitions): + reassignment_performed = False + + # repeat reassignment until no partition can be moved to improve the balance + while True: + modified = False + # reassign all reassignable partitions until the full list is processed or a balance is achieved + # (starting from the partition with least potential consumers and if needed) + for partition in reassignable_partitions: + if self._is_balanced(): + break + # the partition must have at least two potential consumers + if len(self.partition_to_all_potential_consumers[partition]) <= 1: + log.error("Expected more than one potential consumer for partition {}".format(partition)) + # the partition must have a current consumer + consumer = self.current_partition_consumer.get(partition) + if consumer is None: + log.error("Expected partition {} to be assigned to a consumer".format(partition)) + + if ( + partition in self.previous_assignment + and len(self.current_assignment[consumer]) + > len(self.current_assignment[self.previous_assignment[partition].consumer]) + 1 + ): + self._reassign_partition_to_consumer( + partition, self.previous_assignment[partition].consumer, + ) + reassignment_performed = True + modified = True + continue + + # check if a better-suited consumer exist for the partition; if so, reassign it + for other_consumer in self.partition_to_all_potential_consumers[partition]: + if len(self.current_assignment[consumer]) > len(self.current_assignment[other_consumer]) + 1: + self._reassign_partition(partition) + reassignment_performed = True + modified = True + break + + if not modified: + break + return reassignment_performed + + def _reassign_partition(self, partition): + new_consumer = None + for another_consumer, _ in self.sorted_current_subscriptions: + if partition in self.consumer_to_all_potential_partitions[another_consumer]: + new_consumer = another_consumer + break + assert new_consumer is not None + self._reassign_partition_to_consumer(partition, new_consumer) + + def _reassign_partition_to_consumer(self, partition, new_consumer): + consumer = self.current_partition_consumer[partition] + # find the correct partition movement considering the stickiness requirement + partition_to_be_moved = self.partition_movements.get_partition_to_be_moved(partition, consumer, new_consumer) + self._move_partition(partition_to_be_moved, new_consumer) + + def _move_partition(self, partition, new_consumer): + old_consumer = self.current_partition_consumer[partition] + self._remove_consumer_from_current_subscriptions_and_maintain_order(old_consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order(new_consumer) + + self.partition_movements.move_partition(partition, old_consumer, new_consumer) + + self.current_assignment[old_consumer].remove(partition) + self.current_assignment[new_consumer].append(partition) + self.current_partition_consumer[partition] = new_consumer + + self._add_consumer_to_current_subscriptions_and_maintain_order(new_consumer) + self._add_consumer_to_current_subscriptions_and_maintain_order(old_consumer) + + @staticmethod + def _get_balance_score(assignment): + """Calculates a balance score of a give assignment + as the sum of assigned partitions size difference of all consumer pairs. + A perfectly balanced assignment (with all consumers getting the same number of partitions) + has a balance score of 0. Lower balance score indicates a more balanced assignment. + + Arguments: + assignment (dict): {consumer: list of assigned topic partitions} + + Returns: + the balance score of the assignment + """ + score = 0 + consumer_to_assignment = {} + for consumer_id, partitions in six.iteritems(assignment): + consumer_to_assignment[consumer_id] = len(partitions) + + consumers_to_explore = set(consumer_to_assignment.keys()) + for consumer_id in consumer_to_assignment.keys(): + if consumer_id in consumers_to_explore: + consumers_to_explore.remove(consumer_id) + for other_consumer_id in consumers_to_explore: + score += abs(consumer_to_assignment[consumer_id] - consumer_to_assignment[other_consumer_id]) + return score + + +class StickyPartitionAssignor(AbstractPartitionAssignor): + """ + https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy + + The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: + - the numbers of topic partitions assigned to consumers differ by at most one; or + - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it. + + Second, it preserved as many existing assignment as possible when a reassignment occurs. + This helps in saving some of the overhead processing when topic partitions move from one consumer to another. + + Starting fresh it would work by distributing the partitions over consumers as evenly as possible. + Even though this may sound similar to how round robin assignor works, the second example below shows that it is not. + During a reassignment it would perform the reassignment in such a way that in the new assignment + - topic partitions are still distributed as evenly as possible, and + - topic partitions stay with their previously assigned consumers as much as possible. + + The first goal above takes precedence over the second one. + + Example 1. + Suppose there are three consumers C0, C1, C2, + four topics t0, t1, t2, t3, and each topic has 2 partitions, + resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. + Each consumer is subscribed to all three topics. + + The assignment with both sticky and round robin assignors will be: + - C0: [t0p0, t1p1, t3p0] + - C1: [t0p1, t2p0, t3p1] + - C2: [t1p0, t2p1] + + Now, let's assume C1 is removed and a reassignment is about to happen. The round robin assignor would produce: + - C0: [t0p0, t1p0, t2p0, t3p0] + - C2: [t0p1, t1p1, t2p1, t3p1] + + while the sticky assignor would result in: + - C0 [t0p0, t1p1, t3p0, t2p0] + - C2 [t1p0, t2p1, t0p1, t3p1] + preserving all the previous assignments (unlike the round robin assignor). + + + Example 2. + There are three consumers C0, C1, C2, + and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. + Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. + C0 is subscribed to t0; + C1 is subscribed to t0, t1; + and C2 is subscribed to t0, t1, t2. + + The round robin assignor would come up with the following assignment: + - C0 [t0p0] + - C1 [t1p0] + - C2 [t1p1, t2p0, t2p1, t2p2] + + which is not as balanced as the assignment suggested by sticky assignor: + - C0 [t0p0] + - C1 [t1p0, t1p1] + - C2 [t2p0, t2p1, t2p2] + + Now, if consumer C0 is removed, these two assignors would produce the following assignments. + Round Robin (preserves 3 partition assignments): + - C1 [t0p0, t1p1] + - C2 [t1p0, t2p0, t2p1, t2p2] + + Sticky (preserves 5 partition assignments): + - C1 [t1p0, t1p1, t0p0] + - C2 [t2p0, t2p1, t2p2] + """ + + DEFAULT_GENERATION_ID = -1 + + name = "sticky" + version = 0 + + member_assignment = None + generation = DEFAULT_GENERATION_ID + + _latest_partition_movements = None + + @classmethod + def assign(cls, cluster, members): + """Performs group assignment given cluster metadata and member subscriptions + + Arguments: + cluster (ClusterMetadata): cluster metadata + members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group. + + Returns: + dict: {member_id: MemberAssignment} + """ + members_metadata = {} + for consumer, member_metadata in six.iteritems(members): + members_metadata[consumer] = cls.parse_member_metadata(member_metadata) + + executor = StickyAssignmentExecutor(cluster, members_metadata) + executor.perform_initial_assignment() + executor.balance() + + cls._latest_partition_movements = executor.partition_movements + + assignment = {} + for member_id in members: + assignment[member_id] = ConsumerProtocolMemberAssignment( + cls.version, sorted(executor.get_final_assignment(member_id)), b'' + ) + return assignment + + @classmethod + def parse_member_metadata(cls, metadata): + """ + Parses member metadata into a python object. + This implementation only serializes and deserializes the StickyAssignorMemberMetadataV1 user data, + since no StickyAssignor written in Python was deployed ever in the wild with version V0, meaning that + there is no need to support backward compatibility with V0. + + Arguments: + metadata (MemberMetadata): decoded metadata for a member of the group. + + Returns: + parsed metadata (StickyAssignorMemberMetadataV1) + """ + user_data = metadata.user_data + if not user_data: + return StickyAssignorMemberMetadataV1( + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + ) + + try: + decoded_user_data = StickyAssignorUserDataV1.decode(user_data) + except Exception as e: + # ignore the consumer's previous assignment if it cannot be parsed + log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args + return StickyAssignorMemberMetadataV1( + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + ) + + member_partitions = [] + for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member + member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) + return StickyAssignorMemberMetadataV1( + # pylint: disable=no-member + partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription + ) + + @classmethod + def metadata(cls, topics): + if cls.member_assignment is None: + log.debug("No member assignment available") + user_data = b'' + else: + log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) + partitions_by_topic = defaultdict(list) + for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable + partitions_by_topic[topic_partition.topic].append(topic_partition.partition) + data = StickyAssignorUserDataV1(six.iteritems(partitions_by_topic), cls.generation) + user_data = data.encode() + return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) + + @classmethod + def on_assignment(cls, assignment): + """Callback that runs on each assignment. Updates assignor's state. + + Arguments: + assignment: MemberAssignment + """ + log.debug("On assignment: assignment={}".format(assignment)) + cls.member_assignment = assignment.partitions() + + @classmethod + def on_generation_assignment(cls, generation): + """Callback that runs on each assignment. Updates assignor's generation id. + + Arguments: + generation: generation id + """ + log.debug("On generation assignment: generation={}".format(generation)) + cls.generation = generation diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index fda80aa..971f5e8 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -11,6 +11,7 @@ from kafka.vendor import six from kafka.coordinator.base import BaseCoordinator, Generation from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.protocol import ConsumerProtocol import kafka.errors as Errors from kafka.future import Future @@ -31,7 +32,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, - 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), + 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor), 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -234,6 +235,8 @@ class ConsumerCoordinator(BaseCoordinator): # give the assignor a chance to update internal state # based on the received assignment assignor.on_assignment(assignment) + if assignor.name == 'sticky': + assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now self.next_auto_commit_deadline = time.time() + self.auto_commit_interval |