diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-12-08 14:36:36 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-08 14:36:36 -0800 |
commit | 92376cbe8004d2ae6e468a70bc268e420531e72e (patch) | |
tree | a550110f1408182be2ad91f54f39aaba8a9f8710 /kafka/conn.py | |
parent | 2c8748ccfd4feaa16206899599663ff3aac03c6a (diff) | |
download | kafka-python-92376cbe8004d2ae6e468a70bc268e420531e72e.tar.gz |
Refactor dns lookup in BrokerConnection (#1312)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 110 |
1 files changed, 49 insertions, 61 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e20210a..2926e2f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -251,67 +251,42 @@ class BrokerConnection(object): self._sasl_auth_future = None self.last_attempt = 0 self._gai = None - self._gai_index = 0 self._sensors = None if self.config['metrics']: self._sensors = BrokerConnectionMetrics(self.config['metrics'], self.config['metric_group_prefix'], self.node_id) + def _next_afi_host_port(self): + if not self._gai: + self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi) + if not self._gai: + log.error('DNS lookup failed for %s:%i (%s)', + self._init_host, self._init_port, self._init_afi) + return + + afi, _, __, ___, sockaddr = self._gai.pop(0) + host, port = sockaddr[:2] + return (afi, host, port) + def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: - log.debug('%s: creating new socket', self) - # if self.afi is set to AF_UNSPEC, then we need to do a name - # resolution and try all available address families - if self._init_afi == socket.AF_UNSPEC: - if self._gai is None: - # XXX: all DNS functions in Python are blocking. If we really - # want to be non-blocking here, we need to use a 3rd-party - # library like python-adns, or move resolution onto its - # own thread. This will be subject to the default libc - # name resolution timeout (5s on most Linux boxes) - try: - self._gai = socket.getaddrinfo(self._init_host, - self._init_port, - socket.AF_UNSPEC, - socket.SOCK_STREAM) - except socket.gaierror as ex: - log.warning('DNS lookup failed for %s:%d,' - ' exception was %s. Is your' - ' advertised.listeners (called' - ' advertised.host.name before Kafka 9)' - ' correct and resolvable?', - self._init_host, self._init_port, ex) - self._gai = [] - self._gai_index = 0 - else: - # if self._gai already exists, then we should try the next - # name - self._gai_index += 1 - while True: - if self._gai_index >= len(self._gai): - error = 'Unable to connect to any of the names for {0}:{1}'.format( - self._init_host, self._init_port) - log.error(error) - self.close(Errors.ConnectionError(error)) - return - afi, _, __, ___, sockaddr = self._gai[self._gai_index] - if afi not in (socket.AF_INET, socket.AF_INET6): - self._gai_index += 1 - continue - break - self.host, self.port = sockaddr[:2] - self._sock = socket.socket(afi, socket.SOCK_STREAM) + self.last_attempt = time.time() + next_lookup = self._next_afi_host_port() + if not next_lookup: + self.close(Errors.ConnectionError('DNS failure')) + return else: - self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM) + log.debug('%s: creating new socket', self) + self.afi, self.host, self.port = next_lookup + self._sock = socket.socket(self.afi, socket.SOCK_STREAM) for option in self.config['socket_options']: log.debug('%s: setting socket option %s', self, option) self._sock.setsockopt(*option) self._sock.setblocking(False) - self.last_attempt = time.time() self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() @@ -328,11 +303,6 @@ class BrokerConnection(object): ret = None try: ret = self._sock.connect_ex((self.host, self.port)) - # if we got here through a host lookup, we've found a host,port,af tuple - # that works save it so we don't do a GAI lookup again - if self._gai is not None: - self.afi = self._sock.family - self._gai = None except socket.error as err: ret = err.errno @@ -645,23 +615,15 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ - if self.state is ConnectionStates.DISCONNECTED: - if error is not None: - if sys.version_info >= (3, 2): - log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True) - else: - log.warning('%s: close() called on disconnected connection with error: %s', self, error) - return - log.info('%s: Closing connection. %s', self, error or '') - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) self._update_reconnect_backoff() if self._sock: self._sock.close() self._sock = None self.state = ConnectionStates.DISCONNECTED - self.last_attempt = time.time() self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -1170,3 +1132,29 @@ def collect_hosts(hosts, randomize=True): shuffle(result) return result + + +def is_inet_4_or_6(gai): + """Given a getaddrinfo struct, return True iff ipv4 or ipv6""" + return gai[0] in (socket.AF_INET, socket.AF_INET6) + + +def dns_lookup(host, port, afi=socket.AF_UNSPEC): + """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)""" + # XXX: all DNS functions in Python are blocking. If we really + # want to be non-blocking here, we need to use a 3rd-party + # library like python-adns, or move resolution onto its + # own thread. This will be subject to the default libc + # name resolution timeout (5s on most Linux boxes) + try: + return list(filter(is_inet_4_or_6, + socket.getaddrinfo(host, port, afi, + socket.SOCK_STREAM))) + except socket.gaierror as ex: + log.warning('DNS lookup failed for %s:%d,' + ' exception was %s. Is your' + ' advertised.listeners (called' + ' advertised.host.name before Kafka 9)' + ' correct and resolvable?', + host, port, ex) + return [] |