summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py55
1 files changed, 22 insertions, 33 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 5dee7b7..d0e07d0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,15 +1,16 @@
import copy
import logging
+import collections
+
+import kafka.common
-from collections import defaultdict
from functools import partial
from itertools import count
-
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
- BrokerResponseError, PartitionUnavailableError,
- LeaderUnavailableError,
- KafkaUnavailableError)
+ PartitionUnavailableError,
+ LeaderUnavailableError, KafkaUnavailableError,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError)
from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
@@ -39,29 +40,23 @@ class KafkaClient(object):
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics() # bootstrap with all metadata
+
##################
# Private API #
##################
def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
-
host_key = (host, port)
if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout)
+ self.conns[host_key] = KafkaConnection(
+ host,
+ port,
+ timeout=self.timeout
+ )
return self.conns[host_key]
- def _get_conn_for_broker(self, broker):
- """
- Get or create a connection to a broker
- """
- if (broker.host, broker.port) not in self.conns:
- self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
- return self._get_conn(broker.host, broker.port)
-
def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
@@ -99,10 +94,9 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
- except Exception, e:
+ except Exception as e:
log.warning("Could not send request [%r] to server %s:%i, "
"trying next server: %s" % (request, host, port, e))
- continue
raise KafkaUnavailableError("All servers failed to process request")
@@ -130,7 +124,7 @@ class KafkaClient(object):
# Group the requests by topic+partition
original_keys = []
- payloads_by_broker = defaultdict(list)
+ payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
@@ -151,7 +145,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self._get_conn_for_broker(broker)
+ conn = self._get_conn(broker.host, broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -164,11 +158,11 @@ class KafkaClient(object):
continue
try:
response = conn.recv(requestId)
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not receive response to request [%s] "
"from server %s: %s", request, conn, e)
failed = True
- except ConnectionError, e:
+ except ConnectionError as e:
log.warning("Could not send request [%s] to server %s: %s",
request, conn, e)
failed = True
@@ -191,16 +185,11 @@ class KafkaClient(object):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
- if resp.error == ErrorMapping.NO_ERROR:
- return
-
- if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
- ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ try:
+ kafka.common.check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
self.reset_topic_metadata(resp.topic)
-
- raise BrokerResponseError(
- "Request for %s failed with errorcode=%d (%s)" %
- (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+ raise
#################
# Public API #