summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoffrey F <joffrey@docker.com>2017-06-23 17:37:25 -0700
committerJoffrey F <joffrey@docker.com>2017-08-16 15:07:24 -0700
commit942e778f302199ff6ec677860aa28d07f69e11da (patch)
treeae9234b4d93bf26c2ff6bd5bd239b11667b4fde8
parente9fab1432b974ceaa888b371e382dfcf2f6556e4 (diff)
downloaddocker-py-942e778f302199ff6ec677860aa28d07f69e11da.tar.gz
Fix frames_iter buffering/streaming logicfix_frames_iter
Use single method to process logs/attach output Signed-off-by: Joffrey F <joffrey@docker.com>
-rw-r--r--docker/api/client.py48
-rw-r--r--docker/utils/socket.py11
2 files changed, 11 insertions, 48 deletions
diff --git a/docker/api/client.py b/docker/api/client.py
index 65b5baa..a2fd80a 100644
--- a/docker/api/client.py
+++ b/docker/api/client.py
@@ -1,5 +1,4 @@
import json
-import struct
import warnings
from functools import partial
@@ -22,7 +21,7 @@ from .volume import VolumeApiMixin
from .. import auth
from ..constants import (
DEFAULT_TIMEOUT_SECONDS, DEFAULT_USER_AGENT, IS_WINDOWS_PLATFORM,
- DEFAULT_DOCKER_API_VERSION, STREAM_HEADER_SIZE_BYTES, DEFAULT_NUM_POOLS,
+ DEFAULT_DOCKER_API_VERSION, DEFAULT_NUM_POOLS,
MINIMUM_DOCKER_API_VERSION
)
from ..errors import (
@@ -310,43 +309,6 @@ class APIClient(
# encountered an error immediately
yield self._result(response, json=decode)
- def _multiplexed_buffer_helper(self, response):
- """A generator of multiplexed data blocks read from a buffered
- response."""
- buf = self._result(response, binary=True)
- buf_length = len(buf)
- walker = 0
- while True:
- if buf_length - walker < STREAM_HEADER_SIZE_BYTES:
- break
- header = buf[walker:walker + STREAM_HEADER_SIZE_BYTES]
- _, length = struct.unpack_from('>BxxxL', header)
- start = walker + STREAM_HEADER_SIZE_BYTES
- end = start + length
- walker = end
- yield buf[start:end]
-
- def _multiplexed_response_stream_helper(self, response):
- """A generator of multiplexed data blocks coming from a response
- stream."""
-
- # Disable timeout on the underlying socket to prevent
- # Read timed out(s) for long running processes
- socket = self._get_raw_response_socket(response)
- self._disable_socket_timeout(socket)
-
- while True:
- header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
- if not header:
- break
- _, length = struct.unpack('>BxxxL', header)
- if not length:
- continue
- data = response.raw.read(length)
- if not data:
- break
- yield data
-
def _stream_raw_result_old(self, response):
''' Stream raw output for API versions below 1.6 '''
self._raise_for_status(response)
@@ -415,13 +377,7 @@ class APIClient(
self._result(res, binary=True)
self._raise_for_status(res)
- sep = six.binary_type()
- if stream:
- return self._multiplexed_response_stream_helper(res)
- else:
- return sep.join(
- [x for x in self._multiplexed_buffer_helper(res)]
- )
+ return self._read_from_socket(res, stream)
def _unmount(self, *args):
for proto in args:
diff --git a/docker/utils/socket.py b/docker/utils/socket.py
index 4080f25..7542bc3 100644
--- a/docker/utils/socket.py
+++ b/docker/utils/socket.py
@@ -5,6 +5,8 @@ import struct
import six
+from ..constants import STREAM_HEADER_SIZE_BYTES
+
try:
from ..transport import NpipeSocket
except ImportError:
@@ -57,7 +59,7 @@ def next_frame_size(socket):
https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
"""
try:
- data = read_exactly(socket, 8)
+ data = read_exactly(socket, STREAM_HEADER_SIZE_BYTES)
except SocketError:
return 0
@@ -75,5 +77,10 @@ def frames_iter(socket):
break
while n > 0:
result = read(socket, n)
- n -= len(result)
+ data_length = len(result)
+ if data_length == 0:
+ # We have reached EOF
+ return
+
+ n -= data_length
yield result