summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py36
1 files changed, 17 insertions, 19 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 65914a4..4870ab9 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,15 +1,18 @@
import copy
import logging
+import collections
+
+import kafka.common
-from collections import defaultdict
from functools import partial
from itertools import count
+from kafka.common import *
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
- BrokerResponseError, PartitionUnavailableError,
- LeaderUnavailableError,
- KafkaUnavailableError)
+ PartitionUnavailableError,
+ LeaderUnavailableError, KafkaUnavailableError,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -39,6 +42,7 @@ class KafkaClient(object):
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics() # bootstrap with all metadata
+
##################
# Private API #
##################
@@ -92,10 +96,9 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
- except Exception, e:
+ except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (request, host, port, e))
- continue
raise KafkaUnavailableError("All servers failed to process request")
@@ -123,7 +126,7 @@ class KafkaClient(object):
# Group the requests by topic+partition
original_keys = []
- payloads_by_broker = defaultdict(list)
+ payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
@@ -157,11 +160,11 @@ class KafkaClient(object):
continue
try:
response = conn.recv(requestId)
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True
@@ -184,16 +187,11 @@ class KafkaClient(object):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
- if resp.error == ErrorMapping.NO_ERROR:
- return
-
- if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
- ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ try:
+ kafka.common.check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
self.reset_topic_metadata(resp.topic)
-
- raise BrokerResponseError(
- "Request for %s failed with errorcode=%d (%s)" %
- (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+ raise
#################
# Public API #