diff options
author | Dana Powers <dana.powers@gmail.com> | 2014-03-21 23:15:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2014-03-21 23:15:28 -0700 |
commit | 9bed11db98387c0d9e456528130b330631dc50af (patch) | |
tree | 6c054286a388df1647e3f81a4a85130742ba83ca /kafka/conn.py | |
parent | e937e3f971f5958c8da6249b08288aafd5ed5bcd (diff) | |
parent | a6fc260f288ac639070783a0f6faa94bd7612c67 (diff) | |
download | kafka-python-9bed11db98387c0d9e456528130b330631dc50af.tar.gz |
Merge pull request #134 from wizzat/conn_refactorv0.9.0
conn.py performance improvements, make examples work, add another example
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 7538e8d..4fdeb17 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -54,11 +54,10 @@ class KafkaConnection(local): super(KafkaConnection, self).__init__() self.host = host self.port = port - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) self.timeout = timeout - self._sock.settimeout(self.timeout) - self._dirty = False + self._sock = None + + self.reinit() def __repr__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -73,24 +72,28 @@ class KafkaConnection(local): def _read_bytes(self, num_bytes): bytes_left = num_bytes - resp = '' + responses = [] + log.debug("About to read %d bytes from Kafka", num_bytes) if self._dirty: self.reinit() + while bytes_left: try: - data = self._sock.recv(bytes_left) + data = self._sock.recv(min(bytes_left, 4096)) except socket.error: log.exception('Unable to receive data from Kafka') self._raise_connection_error() + if data == '': log.error("Not enough data to read this response") self._raise_connection_error() + bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) - resp += data + responses.append(data) - return resp + return ''.join(responses) ################## # Public API # |