diff options
author | Joffrey F <joffrey@docker.com> | 2018-11-30 14:48:19 -0800 |
---|---|---|
committer | Joffrey F <joffrey@docker.com> | 2018-11-30 14:48:19 -0800 |
commit | ee6ec4c6e81ffd2657d5532df6946eed0946fb93 (patch) | |
tree | 65b082047948c76a688ac773b36ebd8392d4c393 | |
parent | 666388168dad15b7c1a24a4f21eaa55b0119e2a5 (diff) | |
parent | 7b3b83dfdbb9f4270dcf54e1449645efc045dfd3 (diff) | |
download | docker-py-ee6ec4c6e81ffd2657d5532df6946eed0946fb93.tar.gz |
Merge branch 'master' of https://github.com/little-dude/docker-py into little-dude-master
-rw-r--r-- | docker/api/client.py | 18 | ||||
-rw-r--r-- | docker/api/container.py | 13 | ||||
-rw-r--r-- | docker/api/exec_api.py | 13 | ||||
-rw-r--r-- | docker/models/containers.py | 70 | ||||
-rw-r--r-- | docker/utils/socket.py | 97 | ||||
-rw-r--r-- | tests/integration/api_container_test.py | 5 | ||||
-rw-r--r-- | tests/integration/api_exec_test.py | 74 | ||||
-rw-r--r-- | tests/unit/api_test.py | 100 | ||||
-rw-r--r-- | tests/unit/models_containers_test.py | 6 |
9 files changed, 340 insertions, 56 deletions
diff --git a/docker/api/client.py b/docker/api/client.py index 197846d..8a5a60b 100644 --- a/docker/api/client.py +++ b/docker/api/client.py @@ -32,7 +32,7 @@ from ..errors import ( from ..tls import TLSConfig from ..transport import SSLAdapter, UnixAdapter from ..utils import utils, check_resource, update_headers, config -from ..utils.socket import frames_iter, socket_raw_iter +from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor from ..utils.json_stream import json_stream try: from ..transport import NpipeAdapter @@ -381,19 +381,23 @@ class APIClient( for out in response.iter_content(chunk_size, decode): yield out - def _read_from_socket(self, response, stream, tty=False): + def _read_from_socket(self, response, stream, tty=True, demux=False): socket = self._get_raw_response_socket(response) - gen = None - if tty is False: - gen = frames_iter(socket) + gen = frames_iter(socket, tty) + + if demux: + # The generator will output tuples (stdout, stderr) + gen = (demux_adaptor(*frame) for frame in gen) else: - gen = socket_raw_iter(socket) + # The generator will output strings + gen = (data for (_, data) in gen) if stream: return gen else: - return six.binary_type().join(gen) + # Wait for all the frames, concatenate them, and return the result + return consume_socket_output(gen, demux=demux) def _disable_socket_timeout(self, socket): """ Depending on the combination of python version and whether we're diff --git a/docker/api/container.py b/docker/api/container.py index fce73af..ab3b1cf 100644 --- a/docker/api/container.py +++ b/docker/api/container.py @@ -13,7 +13,7 @@ from ..types import ( class ContainerApiMixin(object): @utils.check_resource('container') def attach(self, container, stdout=True, stderr=True, - stream=False, logs=False): + stream=False, logs=False, demux=False): """ Attach to a container. @@ -28,11 +28,15 @@ class ContainerApiMixin(object): stream (bool): Return container output progressively as an iterator of strings, rather than a single string. logs (bool): Include the container's previous output. + demux (bool): Keep stdout and stderr separate. Returns: - By default, the container's output as a single string. + By default, the container's output as a single string (two if + ``demux=True``: one for stdout and one for stderr). - If ``stream=True``, an iterator of output strings. + If ``stream=True``, an iterator of output strings. If + ``demux=True``, two iterators are returned: one for stdout and one + for stderr. Raises: :py:class:`docker.errors.APIError` @@ -54,8 +58,7 @@ class ContainerApiMixin(object): response = self._post(u, headers=headers, params=params, stream=True) output = self._read_from_socket( - response, stream, self._check_is_tty(container) - ) + response, stream, self._check_is_tty(container), demux=demux) if stream: return CancellableStream(output, response) diff --git a/docker/api/exec_api.py b/docker/api/exec_api.py index 986d87f..d13b128 100644 --- a/docker/api/exec_api.py +++ b/docker/api/exec_api.py @@ -118,7 +118,7 @@ class ExecApiMixin(object): @utils.check_resource('exec_id') def exec_start(self, exec_id, detach=False, tty=False, stream=False, - socket=False): + socket=False, demux=False): """ Start a previously set up exec instance. @@ -130,11 +130,14 @@ class ExecApiMixin(object): stream (bool): Stream response data. Default: False socket (bool): Return the connection socket to allow custom read/write operations. + demux (bool): Return stdout and stderr separately Returns: - (generator or str): If ``stream=True``, a generator yielding - response chunks. If ``socket=True``, a socket object for the - connection. A string containing response data otherwise. + + (generator or str or tuple): If ``stream=True``, a generator + yielding response chunks. If ``socket=True``, a socket object for + the connection. A string containing response data otherwise. If + ``demux=True``, stdout and stderr are separated. Raises: :py:class:`docker.errors.APIError` @@ -162,4 +165,4 @@ class ExecApiMixin(object): return self._result(res) if socket: return self._get_raw_response_socket(res) - return self._read_from_socket(res, stream, tty) + return self._read_from_socket(res, stream, tty=tty, demux=demux) diff --git a/docker/models/containers.py b/docker/models/containers.py index 34996ce..75d8c2e 100644 --- a/docker/models/containers.py +++ b/docker/models/containers.py @@ -144,7 +144,7 @@ class Container(Model): def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False, privileged=False, user='', detach=False, stream=False, - socket=False, environment=None, workdir=None): + socket=False, environment=None, workdir=None, demux=False): """ Run a command inside this container. Similar to ``docker exec``. @@ -166,6 +166,7 @@ class Container(Model): the following format ``["PASSWORD=xxx"]`` or ``{"PASSWORD": "xxx"}``. workdir (str): Path to working directory for this exec session + demux (bool): Return stdout and stderr separately Returns: (ExecResult): A tuple of (exit_code, output) @@ -180,6 +181,70 @@ class Container(Model): Raises: :py:class:`docker.errors.APIError` If the server returns an error. + + Example: + + Create a container that runs in the background + + >>> client = docker.from_env() + >>> container = client.containers.run( + ... 'bfirsh/reticulate-splines', detach=True) + + Prepare the command we are going to use. It prints "hello stdout" + in `stdout`, followed by "hello stderr" in `stderr`: + + >>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"' + + We'll run this command with all four the combinations of ``stream`` + and ``demux``. + + With ``stream=False`` and ``demux=False``, the output is a string + that contains both the `stdout` and the `stderr` output: + + >>> res = container.exec_run(cmd, stream=False, demux=False) + >>> res.output + b'hello stderr\nhello stdout\n' + + With ``stream=True``, and ``demux=False``, the output is a + generator that yields strings containing the output of both + `stdout` and `stderr`: + + >>> res = container.exec_run(cmd, stream=True, demux=False) + >>> next(res.output) + b'hello stdout\n' + >>> next(res.output) + b'hello stderr\n' + >>> next(res.output) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration + + With ``stream=True`` and ``demux=True``, the generator now + separates the streams, and yield tuples + ``(stdout, stderr)``: + + >>> res = container.exec_run(cmd, stream=True, demux=True) + >>> next(res.output) + (b'hello stdout\n', None) + >>> next(res.output) + (None, b'hello stderr\n') + >>> next(res.output) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration + + Finally, with ``stream=False`` and ``demux=True``, the whole output + is returned, but the streams are still separated: + + >>> res = container.exec_run(cmd, stream=True, demux=True) + >>> next(res.output) + (b'hello stdout\n', None) + >>> next(res.output) + (None, b'hello stderr\n') + >>> next(res.output) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration """ resp = self.client.api.exec_create( self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty, @@ -187,7 +252,8 @@ class Container(Model): workdir=workdir ) exec_output = self.client.api.exec_start( - resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket + resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket, + demux=demux ) if socket or stream: return ExecResult(None, exec_output) diff --git a/docker/utils/socket.py b/docker/utils/socket.py index 7b96d4f..7ba9505 100644 --- a/docker/utils/socket.py +++ b/docker/utils/socket.py @@ -12,6 +12,10 @@ except ImportError: NpipeSocket = type(None) +STDOUT = 1 +STDERR = 2 + + class SocketError(Exception): pass @@ -51,28 +55,43 @@ def read_exactly(socket, n): return data -def next_frame_size(socket): +def next_frame_header(socket): """ - Returns the size of the next frame of data waiting to be read from socket, - according to the protocol defined here: + Returns the stream and size of the next frame of data waiting to be read + from socket, according to the protocol defined here: - https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container + https://docs.docker.com/engine/api/v1.24/#attach-to-a-container """ try: data = read_exactly(socket, 8) except SocketError: - return -1 + return (-1, -1) + + stream, actual = struct.unpack('>BxxxL', data) + return (stream, actual) + - _, actual = struct.unpack('>BxxxL', data) - return actual +def frames_iter(socket, tty): + """ + Return a generator of frames read from socket. A frame is a tuple where + the first item is the stream number and the second item is a chunk of data. + + If the tty setting is enabled, the streams are multiplexed into the stdout + stream. + """ + if tty: + return ((STDOUT, frame) for frame in frames_iter_tty(socket)) + else: + return frames_iter_no_tty(socket) -def frames_iter(socket): +def frames_iter_no_tty(socket): """ - Returns a generator of frames read from socket + Returns a generator of data read from the socket when the tty setting is + not enabled. """ while True: - n = next_frame_size(socket) + (stream, n) = next_frame_header(socket) if n < 0: break while n > 0: @@ -84,13 +103,13 @@ def frames_iter(socket): # We have reached EOF return n -= data_length - yield result + yield (stream, result) -def socket_raw_iter(socket): +def frames_iter_tty(socket): """ - Returns a generator of data read from the socket. - This is used for non-multiplexed streams. + Return a generator of data read from the socket when the tty setting is + enabled. """ while True: result = read(socket) @@ -98,3 +117,53 @@ def socket_raw_iter(socket): # We have reached EOF return yield result + + +def consume_socket_output(frames, demux=False): + """ + Iterate through frames read from the socket and return the result. + + Args: + + demux (bool): + If False, stdout and stderr are multiplexed, and the result is the + concatenation of all the frames. If True, the streams are + demultiplexed, and the result is a 2-tuple where each item is the + concatenation of frames belonging to the same stream. + """ + if demux is False: + # If the streams are multiplexed, the generator returns strings, that + # we just need to concatenate. + return six.binary_type().join(frames) + + # If the streams are demultiplexed, the generator yields tuples + # (stdout, stderr) + out = [None, None] + for frame in frames: + # It is guaranteed that for each frame, one and only one stream + # is not None. + assert frame != (None, None) + if frame[0] is not None: + if out[0] is None: + out[0] = frame[0] + else: + out[0] += frame[0] + else: + if out[1] is None: + out[1] = frame[1] + else: + out[1] += frame[1] + return tuple(out) + + +def demux_adaptor(stream_id, data): + """ + Utility to demultiplex stdout and stderr when reading frames from the + socket. + """ + if stream_id == STDOUT: + return (data, None) + elif stream_id == STDERR: + return (None, data) + else: + raise ValueError('{0} is not a valid stream'.format(stream_id)) diff --git a/tests/integration/api_container_test.py b/tests/integration/api_container_test.py index 02f3603..83df342 100644 --- a/tests/integration/api_container_test.py +++ b/tests/integration/api_container_test.py @@ -7,7 +7,7 @@ from datetime import datetime import docker from docker.constants import IS_WINDOWS_PLATFORM -from docker.utils.socket import next_frame_size +from docker.utils.socket import next_frame_header from docker.utils.socket import read_exactly import pytest @@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest): self.client.start(container) - next_size = next_frame_size(pty_stdout) + (stream, next_size) = next_frame_header(pty_stdout) + assert stream == 1 # correspond to stdout assert next_size == len(line) data = read_exactly(pty_stdout, next_size) assert data.decode('utf-8') == line diff --git a/tests/integration/api_exec_test.py b/tests/integration/api_exec_test.py index 1a5a4e5..857a18c 100644 --- a/tests/integration/api_exec_test.py +++ b/tests/integration/api_exec_test.py @@ -1,4 +1,4 @@ -from docker.utils.socket import next_frame_size +from docker.utils.socket import next_frame_header from docker.utils.socket import read_exactly from .base import BaseAPIIntegrationTest, BUSYBOX @@ -75,6 +75,75 @@ class ExecTest(BaseAPIIntegrationTest): res += chunk assert res == b'hello\nworld\n' + def test_exec_command_demux(self): + container = self.client.create_container( + BUSYBOX, 'cat', detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + script = ' ; '.join([ + # Write something on stdout + 'echo hello out', + # Busybox's sleep does not handle sub-second times. + # This loops takes ~0.3 second to execute on my machine. + 'for i in $(seq 1 50000); do echo $i>/dev/null; done', + # Write something on stderr + 'echo hello err >&2']) + cmd = 'sh -c "{}"'.format(script) + + # tty=False, stream=False, demux=False + res = self.client.exec_create(id, cmd) + exec_log = self.client.exec_start(res) + assert exec_log == b'hello out\nhello err\n' + + # tty=False, stream=True, demux=False + res = self.client.exec_create(id, cmd) + exec_log = self.client.exec_start(res, stream=True) + assert next(exec_log) == b'hello out\n' + assert next(exec_log) == b'hello err\n' + with self.assertRaises(StopIteration): + next(exec_log) + + # tty=False, stream=False, demux=True + res = self.client.exec_create(id, cmd) + exec_log = self.client.exec_start(res, demux=True) + assert exec_log == (b'hello out\n', b'hello err\n') + + # tty=False, stream=True, demux=True + res = self.client.exec_create(id, cmd) + exec_log = self.client.exec_start(res, demux=True, stream=True) + assert next(exec_log) == (b'hello out\n', None) + assert next(exec_log) == (None, b'hello err\n') + with self.assertRaises(StopIteration): + next(exec_log) + + # tty=True, stream=False, demux=False + res = self.client.exec_create(id, cmd, tty=True) + exec_log = self.client.exec_start(res) + assert exec_log == b'hello out\r\nhello err\r\n' + + # tty=True, stream=True, demux=False + res = self.client.exec_create(id, cmd, tty=True) + exec_log = self.client.exec_start(res, stream=True) + assert next(exec_log) == b'hello out\r\n' + assert next(exec_log) == b'hello err\r\n' + with self.assertRaises(StopIteration): + next(exec_log) + + # tty=True, stream=False, demux=True + res = self.client.exec_create(id, cmd, tty=True) + exec_log = self.client.exec_start(res, demux=True) + assert exec_log == (b'hello out\r\nhello err\r\n', None) + + # tty=True, stream=True, demux=True + res = self.client.exec_create(id, cmd, tty=True) + exec_log = self.client.exec_start(res, demux=True, stream=True) + assert next(exec_log) == (b'hello out\r\n', None) + assert next(exec_log) == (b'hello err\r\n', None) + with self.assertRaises(StopIteration): + next(exec_log) + def test_exec_start_socket(self): container = self.client.create_container(BUSYBOX, 'cat', detach=True, stdin_open=True) @@ -91,7 +160,8 @@ class ExecTest(BaseAPIIntegrationTest): socket = self.client.exec_start(exec_id, socket=True) self.addCleanup(socket.close) - next_size = next_frame_size(socket) + (stream, next_size) = next_frame_header(socket) + assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr) assert next_size == len(line) data = read_exactly(socket, next_size) assert data.decode('utf-8') == line diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py index af2bb1c..fac314d 100644 --- a/tests/unit/api_test.py +++ b/tests/unit/api_test.py @@ -15,6 +15,7 @@ from docker.api import APIClient import requests from requests.packages import urllib3 import six +import struct from . import fake_api @@ -83,7 +84,7 @@ def fake_delete(self, url, *args, **kwargs): return fake_request('DELETE', url, *args, **kwargs) -def fake_read_from_socket(self, response, stream, tty=False): +def fake_read_from_socket(self, response, stream, tty=False, demux=False): return six.binary_type() @@ -467,24 +468,25 @@ class UnixSocketStreamTest(unittest.TestCase): class TCPSocketStreamTest(unittest.TestCase): - text_data = b''' + stdout_data = b''' Now, those children out there, they're jumping through the flames in the hope that the god of the fire will make them fruitful. Really, you can't blame them. After all, what girl would not prefer the child of a god to that of some acne-scarred artisan? ''' + stderr_data = b''' + And what of the true God? To whose glory churches and monasteries have been + built on these islands for generations past? Now shall what of Him? + ''' def setUp(self): - self.server = six.moves.socketserver.ThreadingTCPServer( - ('', 0), self.get_handler_class() - ) + ('', 0), self.get_handler_class()) self.thread = threading.Thread(target=self.server.serve_forever) self.thread.setDaemon(True) self.thread.start() self.address = 'http://{}:{}'.format( - socket.gethostname(), self.server.server_address[1] - ) + socket.gethostname(), self.server.server_address[1]) def tearDown(self): self.server.shutdown() @@ -492,31 +494,95 @@ class TCPSocketStreamTest(unittest.TestCase): self.thread.join() def get_handler_class(self): - text_data = self.text_data + stdout_data = self.stdout_data + stderr_data = self.stderr_data class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object): def do_POST(self): + resp_data = self.get_resp_data() self.send_response(101) self.send_header( - 'Content-Type', 'application/vnd.docker.raw-stream' - ) + 'Content-Type', 'application/vnd.docker.raw-stream') self.send_header('Connection', 'Upgrade') self.send_header('Upgrade', 'tcp') self.end_headers() self.wfile.flush() time.sleep(0.2) - self.wfile.write(text_data) + self.wfile.write(resp_data) self.wfile.flush() + def get_resp_data(self): + path = self.path.split('/')[-1] + if path == 'tty': + return stdout_data + stderr_data + elif path == 'no-tty': + data = b'' + data += self.frame_header(1, stdout_data) + data += stdout_data + data += self.frame_header(2, stderr_data) + data += stderr_data + return data + else: + raise Exception('Unknown path {0}'.format(path)) + + @staticmethod + def frame_header(stream, data): + return struct.pack('>BxxxL', stream, len(data)) + return Handler - def test_read_from_socket(self): + def request(self, stream=None, tty=None, demux=None): + assert stream is not None and tty is not None and demux is not None with APIClient(base_url=self.address) as client: - resp = client._post(client._url('/dummy'), stream=True) - data = client._read_from_socket(resp, stream=True, tty=True) - results = b''.join(data) - - assert results == self.text_data + if tty: + url = client._url('/tty') + else: + url = client._url('/no-tty') + resp = client._post(url, stream=True) + return client._read_from_socket( + resp, stream=stream, tty=tty, demux=demux) + + def test_read_from_socket_1(self): + res = self.request(stream=True, tty=True, demux=False) + assert next(res) == self.stdout_data + self.stderr_data + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_2(self): + res = self.request(stream=True, tty=True, demux=True) + assert next(res) == (self.stdout_data + self.stderr_data, None) + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_3(self): + res = self.request(stream=True, tty=False, demux=False) + assert next(res) == self.stdout_data + assert next(res) == self.stderr_data + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_4(self): + res = self.request(stream=True, tty=False, demux=True) + assert (self.stdout_data, None) == next(res) + assert (None, self.stderr_data) == next(res) + with self.assertRaises(StopIteration): + next(res) + + def test_read_from_socket_5(self): + res = self.request(stream=False, tty=True, demux=False) + assert res == self.stdout_data + self.stderr_data + + def test_read_from_socket_6(self): + res = self.request(stream=False, tty=True, demux=True) + assert res == (self.stdout_data + self.stderr_data, None) + + def test_read_from_socket_7(self): + res = self.request(stream=False, tty=False, demux=False) + res == self.stdout_data + self.stderr_data + + def test_read_from_socket_8(self): + res = self.request(stream=False, tty=False, demux=True) + assert res == (self.stdout_data, self.stderr_data) class UserAgentTest(unittest.TestCase): diff --git a/tests/unit/models_containers_test.py b/tests/unit/models_containers_test.py index 39e409e..cb92c62 100644 --- a/tests/unit/models_containers_test.py +++ b/tests/unit/models_containers_test.py @@ -419,7 +419,8 @@ class ContainerTest(unittest.TestCase): workdir=None ) client.api.exec_start.assert_called_with( - FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False + FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False, + demux=False, ) def test_exec_run_failure(self): @@ -432,7 +433,8 @@ class ContainerTest(unittest.TestCase): workdir=None ) client.api.exec_start.assert_called_with( - FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False + FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False, + demux=False, ) def test_export(self): |