diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 12 | ||||
-rw-r--r-- | kafka/client_async.py | 6 | ||||
-rw-r--r-- | kafka/conn.py | 64 |
3 files changed, 49 insertions, 33 deletions
diff --git a/kafka/client.py b/kafka/client.py index 369dc97..10b1724 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -71,17 +71,7 @@ class SimpleClient(object): ) conn = self._conns[host_key] - conn.connect() - if conn.connected(): - return conn - - timeout = time.time() + self.timeout - while time.time() < timeout and conn.connecting(): - if conn.connect() is ConnectionStates.CONNECTED: - break - else: - time.sleep(0.05) - else: + if not conn.connect_blocking(self.timeout): conn.close() raise ConnectionError("%s:%s (%s)" % (host, port, afi)) return conn diff --git a/kafka/client_async.py b/kafka/client_async.py index 58155b8..857e4b7 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -257,11 +257,7 @@ class KafkaClient(object): state_change_callback=cb, node_id='bootstrap', **self.config) - bootstrap.connect() - while bootstrap.connecting(): - self._selector.select(1) - bootstrap.connect() - if not bootstrap.connected(): + if not bootstrap.connect_blocking(): bootstrap.close() continue future = bootstrap.send(metadata_request) diff --git a/kafka/conn.py b/kafka/conn.py index b0d6029..4bbd744 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -271,18 +271,58 @@ class BrokerConnection(object): self.config['metric_group_prefix'], self.node_id) + def _dns_lookup(self): + self._gai = dns_lookup(self.host, self.port, self.afi) + if not self._gai: + log.error('DNS lookup failed for %s:%i (%s)', + self.host, self.port, self.afi) + return False + return True + def _next_afi_host_port(self): if not self._gai: - self._gai = dns_lookup(self.host, self.port, self.afi) - if not self._gai: - log.error('DNS lookup failed for %s:%i (%s)', - self.host, self.port, self.afi) + if not self._dns_lookup(): return - afi, _, __, ___, sockaddr = self._gai.pop(0) host, port = sockaddr[:2] return (afi, host, port) + def connect_blocking(self, timeout=float('inf')): + if self.connected(): + return True + timeout += time.time() + # First attempt to perform dns lookup + # note that the underlying interface, socket.getaddrinfo, + # has no explicit timeout so we may exceed the user-specified timeout + while time.time() < timeout: + if self._dns_lookup(): + break + else: + return False + + # Loop once over all returned dns entries + selector = None + while self._gai: + while time.time() < timeout: + self.connect() + if self.connected(): + if selector is not None: + selector.close() + return True + elif self.connecting(): + if selector is None: + selector = self.config['selector']() + selector.register(self._sock, selectors.EVENT_WRITE) + selector.select(1) + elif self.disconnected(): + if selector is not None: + selector.close() + selector = None + break + else: + break + return False + def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): @@ -903,19 +943,9 @@ class BrokerConnection(object): ((0, 8, 0), MetadataRequest[0]([])), ] - def connect(): - self.connect() - if self.connected(): - return - timeout_at = time.time() + timeout - while time.time() < timeout_at and self.connecting(): - if self.connect() is ConnectionStates.CONNECTED: - return - time.sleep(0.05) - raise Errors.NodeNotReadyError() - for version, request in test_cases: - connect() + if not self.connect_blocking(timeout): + raise Errors.NodeNotReadyError() f = self.send(request) # HACK: sleeping to wait for socket to send bytes time.sleep(0.1) |