summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/cluster.py19
-rw-r--r--kafka/structs.py2
3 files changed, 15 insertions, 12 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 891ae03..8a34cc4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -137,7 +137,7 @@ class SimpleClient(object):
kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
- return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+ return BrokerMetadata(resp.nodeId, resp.host, resp.port, None)
def _next_id(self):
"""Generate a new correlation id"""
@@ -525,7 +525,7 @@ class SimpleClient(object):
log.debug('Updating broker metadata: %s', resp.brokers)
log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
- self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port))
+ self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None))
for nodeId, host, port in resp.brokers])
for error, topic, partitions in resp.topics:
@@ -577,7 +577,7 @@ class SimpleClient(object):
# (not sure how this could happen. server could be in bad state)
else:
self.topics_to_brokers[topic_part] = BrokerMetadata(
- leader, None, None
+ leader, None, None, None
)
def send_metadata_request(self, payloads=[], fail_on_error=True,
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9aabec1..c3b8f3c 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -189,7 +189,7 @@ class ClusterMetadata(object):
for node_id, host, port in metadata.brokers:
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port)
+ node_id: BrokerMetadata(node_id, host, port, None)
})
_new_partitions = {}
@@ -272,7 +272,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
- response.port)
+ response.port,
+ None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +282,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
- elif coordinator != self._brokers[node_id]:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, self._brokers[node_id])
- self._groups[group] = node_id
- return False
+ else:
+ node = self._brokers[node_id]
+ if coordinator.host != node.host or coordinator.port != node.port:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, node)
+ self._groups[group] = node_id
+ return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id
diff --git a/kafka/structs.py b/kafka/structs.py
index 5902930..3188516 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -58,7 +58,7 @@ TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
BrokerMetadata = namedtuple("BrokerMetadata",
- ["nodeId", "host", "port"])
+ ["nodeId", "host", "port", "rack"])
PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])