From 0f78d57c604e864fab51f7cfb8fa69c9c4e623c7 Mon Sep 17 00:00:00 2001 From: Tim Evens Date: Wed, 30 Mar 2016 15:32:05 -0700 Subject: Kafka IPv6 Support. IPv6 address without port can be defined as the IPv6 address. If the address is a hostname or if a port is included, then the address MUST be wrapped in brackets [] (E.g. [somehost]:1234 or [fd00:1001::2]:1234). --- kafka/client.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) (limited to 'kafka/client.py') diff --git a/kafka/client.py b/kafka/client.py index 11f54eb..99d6fec 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -16,7 +16,7 @@ from kafka.common import (TopicPartition, BrokerMetadata, UnknownError, from kafka.conn import ( collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS, - ConnectionStates) + ConnectionStates, get_ip_port_afi) from kafka.protocol import KafkaProtocol # New KafkaClient @@ -56,12 +56,12 @@ class SimpleClient(object): # Private API # ################## - def _get_conn(self, host, port): + def _get_conn(self, host, port, afi): """Get or create a connection to a broker using host and port""" host_key = (host, port) if host_key not in self._conns: self._conns[host_key] = BrokerConnection( - host, port, + host, port, afi, request_timeout_ms=self.timeout * 1000, client_id=self.client_id ) @@ -139,13 +139,17 @@ class SimpleClient(object): Attempt to send a broker-agnostic request to one of the available brokers. Keep trying until you succeed. """ - hosts = set([(broker.host, broker.port) for broker in self.brokers.values()]) + hosts = set() + for broker in self.brokers.values(): + host, port, afi = get_ip_port_afi(broker.host) + hosts.add((host, broker.port, afi)) + hosts.update(self.hosts) hosts = list(hosts) random.shuffle(hosts) - for (host, port) in hosts: - conn = self._get_conn(host, port) + for (host, port, afi) in hosts: + conn = self._get_conn(host, port, afi) if not conn.connected(): log.warning("Skipping unconnected connection: %s", conn) continue @@ -227,7 +231,9 @@ class SimpleClient(object): failed_payloads(broker_payloads) continue - conn = self._get_conn(broker.host, broker.port) + + host, port, afi = get_ip_port_afi(broker.host) + conn = self._get_conn(host, broker.port, afi) conn.connect() if not conn.connected(): refresh_metadata = True @@ -323,7 +329,8 @@ class SimpleClient(object): # Send the request, recv the response try: - conn = self._get_conn(broker.host, broker.port) + host, port, afi = get_ip_port_afi(broker.host) + conn = self._get_conn(host, broker.port, afi) conn.send(requestId, request) except ConnectionError as e: -- cgit v1.2.1