diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/cluster.py | 19 | ||||
-rw-r--r-- | kafka/structs.py | 2 |
3 files changed, 15 insertions, 12 deletions
diff --git a/kafka/client.py b/kafka/client.py index 891ae03..8a34cc4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -137,7 +137,7 @@ class SimpleClient(object): kafka.errors.check_error(resp) # Otherwise return the BrokerMetadata - return BrokerMetadata(resp.nodeId, resp.host, resp.port) + return BrokerMetadata(resp.nodeId, resp.host, resp.port, None) def _next_id(self): """Generate a new correlation id""" @@ -525,7 +525,7 @@ class SimpleClient(object): log.debug('Updating broker metadata: %s', resp.brokers) log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics]) - self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port)) + self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None)) for nodeId, host, port in resp.brokers]) for error, topic, partitions in resp.topics: @@ -577,7 +577,7 @@ class SimpleClient(object): # (not sure how this could happen. server could be in bad state) else: self.topics_to_brokers[topic_part] = BrokerMetadata( - leader, None, None + leader, None, None, None ) def send_metadata_request(self, payloads=[], fail_on_error=True, diff --git a/kafka/cluster.py b/kafka/cluster.py index 9aabec1..c3b8f3c 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -189,7 +189,7 @@ class ClusterMetadata(object): for node_id, host, port in metadata.brokers: self._brokers.update({ - node_id: BrokerMetadata(node_id, host, port) + node_id: BrokerMetadata(node_id, host, port, None) }) _new_partitions = {} @@ -272,7 +272,8 @@ class ClusterMetadata(object): coordinator = BrokerMetadata( response.coordinator_id, response.host, - response.port) + response.port, + None) # Assume that group coordinators are just brokers # (this is true now, but could diverge in future) @@ -281,12 +282,14 @@ class ClusterMetadata(object): # If this happens, either brokers have moved without # changing IDs, or our assumption above is wrong - elif coordinator != self._brokers[node_id]: - log.error("GroupCoordinator metadata conflicts with existing" - " broker metadata. Coordinator: %s, Broker: %s", - coordinator, self._brokers[node_id]) - self._groups[group] = node_id - return False + else: + node = self._brokers[node_id] + if coordinator.host != node.host or coordinator.port != node.port: + log.error("GroupCoordinator metadata conflicts with existing" + " broker metadata. Coordinator: %s, Broker: %s", + coordinator, node) + self._groups[group] = node_id + return False log.info("Group coordinator for %s is %s", group, coordinator) self._groups[group] = node_id diff --git a/kafka/structs.py b/kafka/structs.py index 5902930..3188516 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -58,7 +58,7 @@ TopicPartition = namedtuple("TopicPartition", ["topic", "partition"]) BrokerMetadata = namedtuple("BrokerMetadata", - ["nodeId", "host", "port"]) + ["nodeId", "host", "port", "rack"]) PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr", "error"]) |