diff options
-rw-r--r-- | kafka/client.py | 19 | ||||
-rw-r--r-- | test/test_client.py | 2 |
2 files changed, 7 insertions, 14 deletions
diff --git a/kafka/client.py b/kafka/client.py index 39c89ba..65914a4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -45,23 +45,16 @@ class KafkaClient(object): def _get_conn(self, host, port): "Get or create a connection to a broker using host and port" - host_key = (host, port) if host_key not in self.conns: - self.conns[host_key] = KafkaConnection(host, port) + self.conns[host_key] = KafkaConnection( + host, + port, + timeout=self.timeout + ) return self.conns[host_key] - def _get_conn_for_broker(self, broker): - """ - Get or create a connection to a broker - """ - if (broker.host, broker.port) not in self.conns: - self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, timeout=self.timeout) - - return self._get_conn(broker.host, broker.port) - def _get_leader_for_partition(self, topic, partition): """ Returns the leader for a partition or None if the partition exists @@ -151,7 +144,7 @@ class KafkaClient(object): # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): - conn = self._get_conn_for_broker(broker) + conn = self._get_conn(broker.host, broker.port) requestId = self._next_id() request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) diff --git a/test/test_client.py b/test/test_client.py index 218586a..9520d48 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -8,7 +8,7 @@ from mock import MagicMock, patch from kafka import KafkaClient from kafka.common import ( ProduceRequest, BrokerMetadata, PartitionMetadata, - TopicAndPartition, KafkaUnavailableError, + TopicAndPartition, KafkaUnavailableError, LeaderUnavailableError, PartitionUnavailableError ) from kafka.protocol import ( |