summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py19
-rw-r--r--test/test_client.py2
2 files changed, 7 insertions, 14 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 39c89ba..65914a4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -45,23 +45,16 @@ class KafkaClient(object):
def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
-
host_key = (host, port)
if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(host, port)
+ self.conns[host_key] = KafkaConnection(
+ host,
+ port,
+ timeout=self.timeout
+ )
return self.conns[host_key]
- def _get_conn_for_broker(self, broker):
- """
- Get or create a connection to a broker
- """
- if (broker.host, broker.port) not in self.conns:
- self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
- return self._get_conn(broker.host, broker.port)
-
def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
@@ -151,7 +144,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self._get_conn_for_broker(broker)
+ conn = self._get_conn(broker.host, broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
diff --git a/test/test_client.py b/test/test_client.py
index 218586a..9520d48 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -8,7 +8,7 @@ from mock import MagicMock, patch
from kafka import KafkaClient
from kafka.common import (
ProduceRequest, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError,
+ TopicAndPartition, KafkaUnavailableError,
LeaderUnavailableError, PartitionUnavailableError
)
from kafka.protocol import (