diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 36 |
1 files changed, 17 insertions, 19 deletions
diff --git a/kafka/client.py b/kafka/client.py index 65914a4..4870ab9 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,15 +1,18 @@ import copy import logging +import collections + +import kafka.common -from collections import defaultdict from functools import partial from itertools import count +from kafka.common import * -from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition, +from kafka.common import (TopicAndPartition, ConnectionError, FailedPayloadsError, - BrokerResponseError, PartitionUnavailableError, - LeaderUnavailableError, - KafkaUnavailableError) + PartitionUnavailableError, + LeaderUnavailableError, KafkaUnavailableError, + UnknownTopicOrPartitionError, NotLeaderForPartitionError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -39,6 +42,7 @@ class KafkaClient(object): self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] self.load_metadata_for_topics() # bootstrap with all metadata + ################## # Private API # ################## @@ -92,10 +96,9 @@ class KafkaClient(object): conn.send(requestId, request) response = conn.recv(requestId) return response - except Exception, e: + except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " "trying next server: %s" % (request, host, port, e)) - continue raise KafkaUnavailableError("All servers failed to process request") @@ -123,7 +126,7 @@ class KafkaClient(object): # Group the requests by topic+partition original_keys = [] - payloads_by_broker = defaultdict(list) + payloads_by_broker = collections.defaultdict(list) for payload in payloads: leader = self._get_leader_for_partition(payload.topic, @@ -157,11 +160,11 @@ class KafkaClient(object): continue try: response = conn.recv(requestId) - except ConnectionError, e: + except ConnectionError as e: log.warning("Could not receive response to request [%s] " "from server %s: %s", request, conn, e) failed = True - except ConnectionError, e: + except ConnectionError as e: log.warning("Could not send request [%s] to server %s: %s", request, conn, e) failed = True @@ -184,16 +187,11 @@ class KafkaClient(object): return '<KafkaClient client_id=%s>' % (self.client_id) def _raise_on_response_error(self, resp): - if resp.error == ErrorMapping.NO_ERROR: - return - - if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, - ErrorMapping.NOT_LEADER_FOR_PARTITION): + try: + kafka.common.check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e: self.reset_topic_metadata(resp.topic) - - raise BrokerResponseError( - "Request for %s failed with errorcode=%d (%s)" % - (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error])) + raise ################# # Public API # |