summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2013-12-18 17:51:22 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-06 15:14:50 -0800
commit8c8ca5fa573c21e0f03c892154ba42e187153600 (patch)
tree5939b4442eb8e72c76f3f48bf9f874d8eb2a0aad /kafka/client.py
parent0f2b08d80217fb82860c51e05e819012f6acb521 (diff)
downloadkafka-python-8c8ca5fa573c21e0f03c892154ba42e187153600.tar.gz
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn, since they're not actually enforced Notes: This commit changes behavior a bit by raising a BufferUnderflowError when no data is received for the message size rather than a ConnectionError. Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented.
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py7
1 files changed, 3 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9659364..bd3a214 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -19,13 +19,12 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10):
+ def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize, timeout=timeout)
+ (host, port): KafkaConnection(host, port, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -42,7 +41,7 @@ class KafkaClient(object):
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout)
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self.conns[(broker.host, broker.port)]