diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-03-06 15:16:05 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-03-06 15:16:05 -0800 |
commit | ab2f4ff984187e4c930a5ae1b7d8f1aff677991b (patch) | |
tree | ef652fdf588fa803d00c393af8aef6d7d7b3bc05 | |
parent | 9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (diff) | |
download | kafka-python-ab2f4ff984187e4c930a5ae1b7d8f1aff677991b.tar.gz |
Small cleanup for #962
-rw-r--r-- | kafka/conn.py | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 2f28ed7..d9e4c72 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -761,7 +761,9 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id - def _check_version_above_0_10(self, response): + def _check_api_version_response(self, response): + # The logic here is to check the list of supported request versions + # in descending order. As soon as we find one that works, return it test_cases = [ # format (<broker verion>, <needed struct>) ((0, 10, 1), MetadataRequest[2]) @@ -774,9 +776,12 @@ class BrokerConnection(object): for api_key, _, max_version in response.api_versions ]) # Get the best match of test cases - for broker_version, struct in test_cases: + for broker_version, struct in sorted(test_cases, reverse=True): if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION: return broker_version + + # We know that ApiVersionResponse is only supported in 0.10+ + # so if all else fails, choose that return (0, 10, 0) def check_version(self, timeout=2, strict=False): @@ -857,10 +862,10 @@ class BrokerConnection(object): self._sock.setblocking(False) if f.succeeded(): - if version == (0, 10): + if isinstance(request, ApiVersionRequest[0]): # Starting from 0.10 kafka broker we determine version # by looking at ApiVersionResponse - version = self._check_version_above_0_10(f.value) + version = self._check_api_version_response(f.value) log.info('Broker version identifed as %s', '.'.join(map(str, version))) log.info('Set configuration api_version=%s to skip auto' ' check_version requests on startup', version) |