summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorTim Evens <tievens@cisco.com>2016-03-30 15:32:05 -0700
committerTim Evens <tievens@cisco.com>2016-03-30 15:32:25 -0700
commit0f78d57c604e864fab51f7cfb8fa69c9c4e623c7 (patch)
tree649c0953a56f39719761751901da5da11b9589f0 /kafka
parentc6c862ad29ec5d0ae61d635c2020fb925b405c44 (diff)
downloadkafka-python-0f78d57c604e864fab51f7cfb8fa69c9c4e623c7.tar.gz
Kafka IPv6 Support.
IPv6 address without port can be defined as the IPv6 address. If the address is a hostname or if a port is included, then the address MUST be wrapped in brackets [] (E.g. [somehost]:1234 or [fd00:1001::2]:1234).
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py23
-rw-r--r--kafka/client_async.py10
-rw-r--r--kafka/conn.py48
3 files changed, 63 insertions, 18 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 11f54eb..99d6fec 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -16,7 +16,7 @@ from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
from kafka.conn import (
collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
- ConnectionStates)
+ ConnectionStates, get_ip_port_afi)
from kafka.protocol import KafkaProtocol
# New KafkaClient
@@ -56,12 +56,12 @@ class SimpleClient(object):
# Private API #
##################
- def _get_conn(self, host, port):
+ def _get_conn(self, host, port, afi):
"""Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self._conns:
self._conns[host_key] = BrokerConnection(
- host, port,
+ host, port, afi,
request_timeout_ms=self.timeout * 1000,
client_id=self.client_id
)
@@ -139,13 +139,17 @@ class SimpleClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- hosts = set([(broker.host, broker.port) for broker in self.brokers.values()])
+ hosts = set()
+ for broker in self.brokers.values():
+ host, port, afi = get_ip_port_afi(broker.host)
+ hosts.add((host, broker.port, afi))
+
hosts.update(self.hosts)
hosts = list(hosts)
random.shuffle(hosts)
- for (host, port) in hosts:
- conn = self._get_conn(host, port)
+ for (host, port, afi) in hosts:
+ conn = self._get_conn(host, port, afi)
if not conn.connected():
log.warning("Skipping unconnected connection: %s", conn)
continue
@@ -227,7 +231,9 @@ class SimpleClient(object):
failed_payloads(broker_payloads)
continue
- conn = self._get_conn(broker.host, broker.port)
+
+ host, port, afi = get_ip_port_afi(broker.host)
+ conn = self._get_conn(host, broker.port, afi)
conn.connect()
if not conn.connected():
refresh_metadata = True
@@ -323,7 +329,8 @@ class SimpleClient(object):
# Send the request, recv the response
try:
- conn = self._get_conn(broker.host, broker.port)
+ host, port, afi = get_ip_port_afi(broker.host)
+ conn = self._get_conn(host, broker.port, afi)
conn.send(requestId, request)
except ConnectionError as e:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ae9dbb4..5a1d624 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -14,7 +14,7 @@ import six
import kafka.common as Errors # TODO: make Errors a separate class
from .cluster import ClusterMetadata
-from .conn import BrokerConnection, ConnectionStates, collect_hosts
+from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
@@ -115,9 +115,9 @@ class KafkaClient(object):
self._last_bootstrap = time.time()
metadata_request = MetadataRequest([])
- for host, port in hosts:
+ for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- bootstrap = BrokerConnection(host, port, **self.config)
+ bootstrap = BrokerConnection(host, port, afi, **self.config)
bootstrap.connect()
while bootstrap.state is ConnectionStates.CONNECTING:
bootstrap.connect()
@@ -160,7 +160,9 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
- self._conns[node_id] = BrokerConnection(broker.host, broker.port,
+
+ host, port, afi = get_ip_port_afi(broker.host)
+ self._conns[node_id] = BrokerConnection(host, broker.port, afi,
**self.config)
return self._finish_connect(node_id)
diff --git a/kafka/conn.py b/kafka/conn.py
index 65451f9..f7a85dc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -52,9 +52,10 @@ class BrokerConnection(object):
'api_version': (0, 8, 2), # default to most restrictive
}
- def __init__(self, host, port, **configs):
+ def __init__(self, host, port, afi, **configs):
self.host = host
self.port = port
+ self.afi = afi
self.in_flight_requests = collections.deque()
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -76,7 +77,7 @@ class BrokerConnection(object):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
self.close()
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
if self.config['receive_buffer_bytes'] is not None:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
self.config['receive_buffer_bytes'])
@@ -356,6 +357,38 @@ class BrokerConnection(object):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
+def get_ip_port_afi(host_and_port_str):
+ """
+ Parse the IP and port from a string in the format of:
+
+ * host_or_ip <- Can be either IPv4 or IPv6 address or hostname/fqdn
+ * host_or_ip:port <- This is only for IPv4
+ * [host_or_ip]:port. <- This is only for IPv6
+
+ .. note:: If the port is not specified, default will be returned.
+
+ :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6
+ """
+ afi = socket.AF_INET
+
+ if host_and_port_str.strip()[0] == '[':
+ afi = socket.AF_INET6
+ res = host_and_port_str.split("]:")
+ res[0] = res[0].translate(None, "[]")
+
+ elif host_and_port_str.count(":") > 1:
+ afi = socket.AF_INET6
+ res = [host_and_port_str]
+
+ else:
+ res = host_and_port_str.split(':')
+
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+
+ return host.strip(), port, afi
+
+
def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
@@ -366,12 +399,15 @@ def collect_hosts(hosts, randomize=True):
hosts = hosts.strip().split(',')
result = []
+ afi = socket.AF_INET
for host_port in hosts:
- res = host_port.split(':')
- host = res[0]
- port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
- result.append((host.strip(), port))
+ host, port, afi = get_ip_port_afi(host_port)
+
+ if port < 0:
+ port = DEFAULT_KAFKA_PORT
+
+ result.append((host, port, afi))
if randomize:
shuffle(result)