summaryrefslogtreecommitdiff
path: root/kafka/coordinator
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-02-02 16:36:30 -0800
committerGitHub <noreply@github.com>2018-02-02 16:36:30 -0800
commit618c5051493693c1305aa9f08e8a0583d5fcf0e3 (patch)
tree3a2fcec8260915a83f19a603671c4a0e5461cca0 /kafka/coordinator
parent08a7fb7b754a754c6c64e96d4ba5c4f56cf38a5f (diff)
downloadkafka-python-618c5051493693c1305aa9f08e8a0583d5fcf0e3.tar.gz
KAFKA-3949: Avoid race condition when subscription changes during rebalance (#1364)
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--kafka/coordinator/base.py24
-rw-r--r--kafka/coordinator/consumer.py102
2 files changed, 76 insertions, 50 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 301c06d..820fc1f 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -344,23 +344,25 @@ class BaseCoordinator(object):
def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
with self._lock:
- if not self.need_rejoin():
- return
-
- # call on_join_prepare if needed. We set a flag to make sure that
- # we do not call it a second time if the client is woken up before
- # a pending rebalance completes.
- if not self.rejoining:
- self._on_join_prepare(self._generation.generation_id,
- self._generation.member_id)
- self.rejoining = True
-
if self._heartbeat_thread is None:
self._start_heartbeat_thread()
while self.need_rejoin():
self.ensure_coordinator_ready()
+ # call on_join_prepare if needed. We set a flag
+ # to make sure that we do not call it a second
+ # time if the client is woken up before a pending
+ # rebalance completes. This must be called on each
+ # iteration of the loop because an event requiring
+ # a rebalance (such as a metadata refresh which
+ # changes the matched subscription set) can occur
+ # while another rebalance is still in progress.
+ if not self.rejoining:
+ self._on_join_prepare(self._generation.generation_id,
+ self._generation.member_id)
+ self.rejoining = True
+
# ensure that there are no pending requests to the coordinator.
# This is important in particular to avoid resending a pending
# JoinGroup request.
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ab30883..9438a7e 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -84,6 +84,8 @@ class ConsumerCoordinator(BaseCoordinator):
self.config[key] = configs[key]
self._subscription = subscription
+ self._is_leader = False
+ self._joined_subscription = set()
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
@@ -132,11 +134,22 @@ class ConsumerCoordinator(BaseCoordinator):
def group_protocols(self):
"""Returns list of preferred (protocols, metadata)"""
- topics = self._subscription.subscription
- assert topics is not None, 'Consumer has not subscribed to topics'
+ if self._subscription.subscription is None:
+ raise Errors.IllegalStateError('Consumer has not subscribed to topics')
+ # dpkp note: I really dislike this.
+ # why? because we are using this strange method group_protocols,
+ # which is seemingly innocuous, to set internal state (_joined_subscription)
+ # that is later used to check whether metadata has changed since we joined a group
+ # but there is no guarantee that this method, group_protocols, will get called
+ # in the correct sequence or that it will only be called when we want it to be.
+ # So this really should be moved elsewhere, but I don't have the energy to
+ # work that out right now. If you read this at some later date after the mutable
+ # state has bitten you... I'm sorry! It mimics the java client, and that's the
+ # best I've got for now.
+ self._joined_subscription = set(self._subscription.subscription)
metadata_list = []
for assignor in self.config['assignors']:
- metadata = assignor.metadata(topics)
+ metadata = assignor.metadata(self._joined_subscription)
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
return metadata_list
@@ -158,21 +171,29 @@ class ConsumerCoordinator(BaseCoordinator):
# check if there are any changes to the metadata which should trigger
# a rebalance
- if self._subscription_metadata_changed(cluster):
-
- if (self.config['api_version'] >= (0, 9)
- and self.config['group_id'] is not None):
-
- self._subscription.mark_for_reassignment()
-
- # If we haven't got group coordinator support,
- # just assign all partitions locally
- else:
- self._subscription.assign_from_subscribed([
- TopicPartition(topic, partition)
- for topic in self._subscription.subscription
- for partition in self._metadata_snapshot[topic]
- ])
+ if self._subscription.partitions_auto_assigned():
+ metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
+ if self._metadata_snapshot != metadata_snapshot:
+ self._metadata_snapshot = metadata_snapshot
+
+ # If we haven't got group coordinator support,
+ # just assign all partitions locally
+ if self._auto_assign_all_partitions():
+ self._subscription.assign_from_subscribed([
+ TopicPartition(topic, partition)
+ for topic in self._subscription.subscription
+ for partition in self._metadata_snapshot[topic]
+ ])
+
+ def _auto_assign_all_partitions(self):
+ # For users that use "subscribe" without group support,
+ # we will simply assign all partitions to this consumer
+ if self.config['api_version'] < (0, 9):
+ return True
+ elif self.config['group_id'] is None:
+ return True
+ else:
+ return False
def _build_metadata_snapshot(self, subscription, cluster):
metadata_snapshot = {}
@@ -181,16 +202,6 @@ class ConsumerCoordinator(BaseCoordinator):
metadata_snapshot[topic] = set(partitions)
return metadata_snapshot
- def _subscription_metadata_changed(self, cluster):
- if not self._subscription.partitions_auto_assigned():
- return False
-
- metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
- if self._metadata_snapshot != metadata_snapshot:
- self._metadata_snapshot = metadata_snapshot
- return True
- return False
-
def _lookup_assignor(self, name):
for assignor in self.config['assignors']:
if assignor.name == name:
@@ -199,12 +210,10 @@ class ConsumerCoordinator(BaseCoordinator):
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
- # if we were the assignor, then we need to make sure that there have
- # been no metadata updates since the rebalance begin. Otherwise, we
- # won't rebalance again until the next metadata change
- if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
- self._subscription.mark_for_reassignment()
- return
+ # only the leader is responsible for monitoring for metadata changes
+ # (i.e. partition changes)
+ if not self._is_leader:
+ self._assignment_snapshot = None
assignor = self._lookup_assignor(protocol)
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
@@ -307,6 +316,7 @@ class ConsumerCoordinator(BaseCoordinator):
# keep track of the metadata used for assignment so that we can check
# after rebalance completion whether anything has changed
self._cluster.request_update()
+ self._is_leader = True
self._assignment_snapshot = self._metadata_snapshot
log.debug("Performing assignment for group %s using strategy %s"
@@ -338,8 +348,8 @@ class ConsumerCoordinator(BaseCoordinator):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)
- self._assignment_snapshot = None
- self._subscription.mark_for_reassignment()
+ self._is_leader = False
+ self._subscription.reset_group_subscription()
def need_rejoin(self):
"""Check whether the group should be rejoined
@@ -347,9 +357,23 @@ class ConsumerCoordinator(BaseCoordinator):
Returns:
bool: True if consumer should rejoin group, False otherwise
"""
- return (self._subscription.partitions_auto_assigned() and
- (super(ConsumerCoordinator, self).need_rejoin() or
- self._subscription.needs_partition_assignment))
+ if not self._subscription.partitions_auto_assigned():
+ return False
+
+ if self._auto_assign_all_partitions():
+ return False
+
+ # we need to rejoin if we performed the assignment and metadata has changed
+ if (self._assignment_snapshot is not None
+ and self._assignment_snapshot != self._metadata_snapshot):
+ return True
+
+ # we need to join if our subscription has changed since the last join
+ if (self._joined_subscription is not None
+ and self._joined_subscription != self._subscription.subscription):
+ return True
+
+ return super(ConsumerCoordinator, self).need_rejoin()
def refresh_committed_offsets_if_needed(self):
"""Fetch committed offsets for assigned partitions."""