summaryrefslogtreecommitdiff
path: root/kafka/coordinator/assignors/abstract.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/assignors/abstract.py')
-rw-r--r--kafka/coordinator/assignors/abstract.py54
1 files changed, 54 insertions, 0 deletions
diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py
new file mode 100644
index 0000000..773280a
--- /dev/null
+++ b/kafka/coordinator/assignors/abstract.py
@@ -0,0 +1,54 @@
+import abc
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class AbstractPartitionAssignor(object):
+ """
+ Abstract assignor implementation which does some common grunt work (in particular collecting
+ partition counts which are always needed in assignors).
+ """
+
+ @abc.abstractproperty
+ def name(self):
+ """.name should be a string identifying the assignor"""
+ pass
+
+ @abc.abstractmethod
+ def assign(self, cluster, members):
+ """Perform group assignment given cluster metadata and member subscriptions
+
+ Arguments:
+ cluster (ClusterMetadata): metadata for use in assignment
+ members (dict of {member_id: MemberMetadata}): decoded metadata for
+ each member in the group.
+
+ Returns:
+ dict: {member_id: MemberAssignment}
+ """
+ pass
+
+ @abc.abstractmethod
+ def metadata(self, topics):
+ """Generate ProtocolMetadata to be submitted via JoinGroupRequest.
+
+ Arguments:
+ topics (set): a member's subscribed topics
+
+ Returns:
+ MemberMetadata struct
+ """
+ pass
+
+ @abc.abstractmethod
+ def on_assignment(self, assignment):
+ """Callback that runs on each assignment.
+
+ This method can be used to update internal state, if any, of the
+ partition assignor.
+
+ Arguments:
+ assignment (MemberAssignment): the member's assignment
+ """
+ pass