summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-06 15:16:05 -0800
committerDana Powers <dana.powers@gmail.com>2017-03-06 15:16:05 -0800
commitab2f4ff984187e4c930a5ae1b7d8f1aff677991b (patch)
treeef652fdf588fa803d00c393af8aef6d7d7b3bc05
parent9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (diff)
downloadkafka-python-ab2f4ff984187e4c930a5ae1b7d8f1aff677991b.tar.gz
Small cleanup for #962
-rw-r--r--kafka/conn.py13
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2f28ed7..d9e4c72 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -761,7 +761,9 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
- def _check_version_above_0_10(self, response):
+ def _check_api_version_response(self, response):
+ # The logic here is to check the list of supported request versions
+ # in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
((0, 10, 1), MetadataRequest[2])
@@ -774,9 +776,12 @@ class BrokerConnection(object):
for api_key, _, max_version in response.api_versions
])
# Get the best match of test cases
- for broker_version, struct in test_cases:
+ for broker_version, struct in sorted(test_cases, reverse=True):
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
return broker_version
+
+ # We know that ApiVersionResponse is only supported in 0.10+
+ # so if all else fails, choose that
return (0, 10, 0)
def check_version(self, timeout=2, strict=False):
@@ -857,10 +862,10 @@ class BrokerConnection(object):
self._sock.setblocking(False)
if f.succeeded():
- if version == (0, 10):
+ if isinstance(request, ApiVersionRequest[0]):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
- version = self._check_version_above_0_10(f.value)
+ version = self._check_api_version_response(f.value)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)