diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 55 |
1 files changed, 22 insertions, 33 deletions
diff --git a/kafka/client.py b/kafka/client.py index 5dee7b7..d0e07d0 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,15 +1,16 @@ import copy import logging +import collections + +import kafka.common -from collections import defaultdict from functools import partial from itertools import count - -from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition, +from kafka.common import (TopicAndPartition, ConnectionError, FailedPayloadsError, - BrokerResponseError, PartitionUnavailableError, - LeaderUnavailableError, - KafkaUnavailableError) + PartitionUnavailableError, + LeaderUnavailableError, KafkaUnavailableError, + UnknownTopicOrPartitionError, NotLeaderForPartitionError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -39,29 +40,23 @@ class KafkaClient(object): self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] self.load_metadata_for_topics() # bootstrap with all metadata + ################## # Private API # ################## def _get_conn(self, host, port): "Get or create a connection to a broker using host and port" - host_key = (host, port) if host_key not in self.conns: - self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout) + self.conns[host_key] = KafkaConnection( + host, + port, + timeout=self.timeout + ) return self.conns[host_key] - def _get_conn_for_broker(self, broker): - """ - Get or create a connection to a broker - """ - if (broker.host, broker.port) not in self.conns: - self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, timeout=self.timeout) - - return self._get_conn(broker.host, broker.port) - def _get_leader_for_partition(self, topic, partition): """ Returns the leader for a partition or None if the partition exists @@ -99,10 +94,9 @@ class KafkaClient(object): conn.send(requestId, request) response = conn.recv(requestId) return response - except Exception, e: + except Exception as e: log.warning("Could not send request [%r] to server %s:%i, " "trying next server: %s" % (request, host, port, e)) - continue raise KafkaUnavailableError("All servers failed to process request") @@ -130,7 +124,7 @@ class KafkaClient(object): # Group the requests by topic+partition original_keys = [] - payloads_by_broker = defaultdict(list) + payloads_by_broker = collections.defaultdict(list) for payload in payloads: leader = self._get_leader_for_partition(payload.topic, @@ -151,7 +145,7 @@ class KafkaClient(object): # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): - conn = self._get_conn_for_broker(broker) + conn = self._get_conn(broker.host, broker.port) requestId = self._next_id() request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) @@ -164,11 +158,11 @@ class KafkaClient(object): continue try: response = conn.recv(requestId) - except ConnectionError, e: + except ConnectionError as e: log.warning("Could not receive response to request [%s] " "from server %s: %s", request, conn, e) failed = True - except ConnectionError, e: + except ConnectionError as e: log.warning("Could not send request [%s] to server %s: %s", request, conn, e) failed = True @@ -191,16 +185,11 @@ class KafkaClient(object): return '<KafkaClient client_id=%s>' % (self.client_id) def _raise_on_response_error(self, resp): - if resp.error == ErrorMapping.NO_ERROR: - return - - if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, - ErrorMapping.NOT_LEADER_FOR_PARTITION): + try: + kafka.common.check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e: self.reset_topic_metadata(resp.topic) - - raise BrokerResponseError( - "Request for %s failed with errorcode=%d (%s)" % - (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error])) + raise ################# # Public API # |