diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2013-12-18 17:51:22 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:50 -0800 |
commit | 8c8ca5fa573c21e0f03c892154ba42e187153600 (patch) | |
tree | 5939b4442eb8e72c76f3f48bf9f874d8eb2a0aad /kafka/client.py | |
parent | 0f2b08d80217fb82860c51e05e819012f6acb521 (diff) | |
download | kafka-python-8c8ca5fa573c21e0f03c892154ba42e187153600.tar.gz |
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn, since they're not actually enforced
Notes:
This commit changes behavior a bit by raising a BufferUnderflowError when
no data is received for the message size rather than a ConnectionError.
Since bufsize in the socket is not actually enforced, but it is used by the consumer
when creating requests, moving it there until a better solution is implemented.
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 7 |
1 files changed, 3 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py index 9659364..bd3a214 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -19,13 +19,12 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10): + def __init__(self, host, port, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap - self.bufsize = bufsize self.client_id = client_id self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize, timeout=timeout) + (host, port): KafkaConnection(host, port, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -42,7 +41,7 @@ class KafkaClient(object): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout) + KafkaConnection(broker.host, broker.port, timeout=self.timeout) return self.conns[(broker.host, broker.port)] |