summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoffrey F <joffrey@docker.com>2018-11-30 14:48:19 -0800
committerJoffrey F <joffrey@docker.com>2018-11-30 14:48:19 -0800
commitee6ec4c6e81ffd2657d5532df6946eed0946fb93 (patch)
tree65b082047948c76a688ac773b36ebd8392d4c393
parent666388168dad15b7c1a24a4f21eaa55b0119e2a5 (diff)
parent7b3b83dfdbb9f4270dcf54e1449645efc045dfd3 (diff)
downloaddocker-py-ee6ec4c6e81ffd2657d5532df6946eed0946fb93.tar.gz
Merge branch 'master' of https://github.com/little-dude/docker-py into little-dude-master
-rw-r--r--docker/api/client.py18
-rw-r--r--docker/api/container.py13
-rw-r--r--docker/api/exec_api.py13
-rw-r--r--docker/models/containers.py70
-rw-r--r--docker/utils/socket.py97
-rw-r--r--tests/integration/api_container_test.py5
-rw-r--r--tests/integration/api_exec_test.py74
-rw-r--r--tests/unit/api_test.py100
-rw-r--r--tests/unit/models_containers_test.py6
9 files changed, 340 insertions, 56 deletions
diff --git a/docker/api/client.py b/docker/api/client.py
index 197846d..8a5a60b 100644
--- a/docker/api/client.py
+++ b/docker/api/client.py
@@ -32,7 +32,7 @@ from ..errors import (
from ..tls import TLSConfig
from ..transport import SSLAdapter, UnixAdapter
from ..utils import utils, check_resource, update_headers, config
-from ..utils.socket import frames_iter, socket_raw_iter
+from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor
from ..utils.json_stream import json_stream
try:
from ..transport import NpipeAdapter
@@ -381,19 +381,23 @@ class APIClient(
for out in response.iter_content(chunk_size, decode):
yield out
- def _read_from_socket(self, response, stream, tty=False):
+ def _read_from_socket(self, response, stream, tty=True, demux=False):
socket = self._get_raw_response_socket(response)
- gen = None
- if tty is False:
- gen = frames_iter(socket)
+ gen = frames_iter(socket, tty)
+
+ if demux:
+ # The generator will output tuples (stdout, stderr)
+ gen = (demux_adaptor(*frame) for frame in gen)
else:
- gen = socket_raw_iter(socket)
+ # The generator will output strings
+ gen = (data for (_, data) in gen)
if stream:
return gen
else:
- return six.binary_type().join(gen)
+ # Wait for all the frames, concatenate them, and return the result
+ return consume_socket_output(gen, demux=demux)
def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're
diff --git a/docker/api/container.py b/docker/api/container.py
index fce73af..ab3b1cf 100644
--- a/docker/api/container.py
+++ b/docker/api/container.py
@@ -13,7 +13,7 @@ from ..types import (
class ContainerApiMixin(object):
@utils.check_resource('container')
def attach(self, container, stdout=True, stderr=True,
- stream=False, logs=False):
+ stream=False, logs=False, demux=False):
"""
Attach to a container.
@@ -28,11 +28,15 @@ class ContainerApiMixin(object):
stream (bool): Return container output progressively as an iterator
of strings, rather than a single string.
logs (bool): Include the container's previous output.
+ demux (bool): Keep stdout and stderr separate.
Returns:
- By default, the container's output as a single string.
+ By default, the container's output as a single string (two if
+ ``demux=True``: one for stdout and one for stderr).
- If ``stream=True``, an iterator of output strings.
+ If ``stream=True``, an iterator of output strings. If
+ ``demux=True``, two iterators are returned: one for stdout and one
+ for stderr.
Raises:
:py:class:`docker.errors.APIError`
@@ -54,8 +58,7 @@ class ContainerApiMixin(object):
response = self._post(u, headers=headers, params=params, stream=True)
output = self._read_from_socket(
- response, stream, self._check_is_tty(container)
- )
+ response, stream, self._check_is_tty(container), demux=demux)
if stream:
return CancellableStream(output, response)
diff --git a/docker/api/exec_api.py b/docker/api/exec_api.py
index 986d87f..d13b128 100644
--- a/docker/api/exec_api.py
+++ b/docker/api/exec_api.py
@@ -118,7 +118,7 @@ class ExecApiMixin(object):
@utils.check_resource('exec_id')
def exec_start(self, exec_id, detach=False, tty=False, stream=False,
- socket=False):
+ socket=False, demux=False):
"""
Start a previously set up exec instance.
@@ -130,11 +130,14 @@ class ExecApiMixin(object):
stream (bool): Stream response data. Default: False
socket (bool): Return the connection socket to allow custom
read/write operations.
+ demux (bool): Return stdout and stderr separately
Returns:
- (generator or str): If ``stream=True``, a generator yielding
- response chunks. If ``socket=True``, a socket object for the
- connection. A string containing response data otherwise.
+
+ (generator or str or tuple): If ``stream=True``, a generator
+ yielding response chunks. If ``socket=True``, a socket object for
+ the connection. A string containing response data otherwise. If
+ ``demux=True``, stdout and stderr are separated.
Raises:
:py:class:`docker.errors.APIError`
@@ -162,4 +165,4 @@ class ExecApiMixin(object):
return self._result(res)
if socket:
return self._get_raw_response_socket(res)
- return self._read_from_socket(res, stream, tty)
+ return self._read_from_socket(res, stream, tty=tty, demux=demux)
diff --git a/docker/models/containers.py b/docker/models/containers.py
index 34996ce..75d8c2e 100644
--- a/docker/models/containers.py
+++ b/docker/models/containers.py
@@ -144,7 +144,7 @@ class Container(Model):
def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
privileged=False, user='', detach=False, stream=False,
- socket=False, environment=None, workdir=None):
+ socket=False, environment=None, workdir=None, demux=False):
"""
Run a command inside this container. Similar to
``docker exec``.
@@ -166,6 +166,7 @@ class Container(Model):
the following format ``["PASSWORD=xxx"]`` or
``{"PASSWORD": "xxx"}``.
workdir (str): Path to working directory for this exec session
+ demux (bool): Return stdout and stderr separately
Returns:
(ExecResult): A tuple of (exit_code, output)
@@ -180,6 +181,70 @@ class Container(Model):
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
+
+ Example:
+
+ Create a container that runs in the background
+
+ >>> client = docker.from_env()
+ >>> container = client.containers.run(
+ ... 'bfirsh/reticulate-splines', detach=True)
+
+ Prepare the command we are going to use. It prints "hello stdout"
+ in `stdout`, followed by "hello stderr" in `stderr`:
+
+ >>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"'
+
+ We'll run this command with all four the combinations of ``stream``
+ and ``demux``.
+
+ With ``stream=False`` and ``demux=False``, the output is a string
+ that contains both the `stdout` and the `stderr` output:
+
+ >>> res = container.exec_run(cmd, stream=False, demux=False)
+ >>> res.output
+ b'hello stderr\nhello stdout\n'
+
+ With ``stream=True``, and ``demux=False``, the output is a
+ generator that yields strings containing the output of both
+ `stdout` and `stderr`:
+
+ >>> res = container.exec_run(cmd, stream=True, demux=False)
+ >>> next(res.output)
+ b'hello stdout\n'
+ >>> next(res.output)
+ b'hello stderr\n'
+ >>> next(res.output)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
+
+ With ``stream=True`` and ``demux=True``, the generator now
+ separates the streams, and yield tuples
+ ``(stdout, stderr)``:
+
+ >>> res = container.exec_run(cmd, stream=True, demux=True)
+ >>> next(res.output)
+ (b'hello stdout\n', None)
+ >>> next(res.output)
+ (None, b'hello stderr\n')
+ >>> next(res.output)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
+
+ Finally, with ``stream=False`` and ``demux=True``, the whole output
+ is returned, but the streams are still separated:
+
+ >>> res = container.exec_run(cmd, stream=True, demux=True)
+ >>> next(res.output)
+ (b'hello stdout\n', None)
+ >>> next(res.output)
+ (None, b'hello stderr\n')
+ >>> next(res.output)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
"""
resp = self.client.api.exec_create(
self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty,
@@ -187,7 +252,8 @@ class Container(Model):
workdir=workdir
)
exec_output = self.client.api.exec_start(
- resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket
+ resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket,
+ demux=demux
)
if socket or stream:
return ExecResult(None, exec_output)
diff --git a/docker/utils/socket.py b/docker/utils/socket.py
index 7b96d4f..7ba9505 100644
--- a/docker/utils/socket.py
+++ b/docker/utils/socket.py
@@ -12,6 +12,10 @@ except ImportError:
NpipeSocket = type(None)
+STDOUT = 1
+STDERR = 2
+
+
class SocketError(Exception):
pass
@@ -51,28 +55,43 @@ def read_exactly(socket, n):
return data
-def next_frame_size(socket):
+def next_frame_header(socket):
"""
- Returns the size of the next frame of data waiting to be read from socket,
- according to the protocol defined here:
+ Returns the stream and size of the next frame of data waiting to be read
+ from socket, according to the protocol defined here:
- https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
+ https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
"""
try:
data = read_exactly(socket, 8)
except SocketError:
- return -1
+ return (-1, -1)
+
+ stream, actual = struct.unpack('>BxxxL', data)
+ return (stream, actual)
+
- _, actual = struct.unpack('>BxxxL', data)
- return actual
+def frames_iter(socket, tty):
+ """
+ Return a generator of frames read from socket. A frame is a tuple where
+ the first item is the stream number and the second item is a chunk of data.
+
+ If the tty setting is enabled, the streams are multiplexed into the stdout
+ stream.
+ """
+ if tty:
+ return ((STDOUT, frame) for frame in frames_iter_tty(socket))
+ else:
+ return frames_iter_no_tty(socket)
-def frames_iter(socket):
+def frames_iter_no_tty(socket):
"""
- Returns a generator of frames read from socket
+ Returns a generator of data read from the socket when the tty setting is
+ not enabled.
"""
while True:
- n = next_frame_size(socket)
+ (stream, n) = next_frame_header(socket)
if n < 0:
break
while n > 0:
@@ -84,13 +103,13 @@ def frames_iter(socket):
# We have reached EOF
return
n -= data_length
- yield result
+ yield (stream, result)
-def socket_raw_iter(socket):
+def frames_iter_tty(socket):
"""
- Returns a generator of data read from the socket.
- This is used for non-multiplexed streams.
+ Return a generator of data read from the socket when the tty setting is
+ enabled.
"""
while True:
result = read(socket)
@@ -98,3 +117,53 @@ def socket_raw_iter(socket):
# We have reached EOF
return
yield result
+
+
+def consume_socket_output(frames, demux=False):
+ """
+ Iterate through frames read from the socket and return the result.
+
+ Args:
+
+ demux (bool):
+ If False, stdout and stderr are multiplexed, and the result is the
+ concatenation of all the frames. If True, the streams are
+ demultiplexed, and the result is a 2-tuple where each item is the
+ concatenation of frames belonging to the same stream.
+ """
+ if demux is False:
+ # If the streams are multiplexed, the generator returns strings, that
+ # we just need to concatenate.
+ return six.binary_type().join(frames)
+
+ # If the streams are demultiplexed, the generator yields tuples
+ # (stdout, stderr)
+ out = [None, None]
+ for frame in frames:
+ # It is guaranteed that for each frame, one and only one stream
+ # is not None.
+ assert frame != (None, None)
+ if frame[0] is not None:
+ if out[0] is None:
+ out[0] = frame[0]
+ else:
+ out[0] += frame[0]
+ else:
+ if out[1] is None:
+ out[1] = frame[1]
+ else:
+ out[1] += frame[1]
+ return tuple(out)
+
+
+def demux_adaptor(stream_id, data):
+ """
+ Utility to demultiplex stdout and stderr when reading frames from the
+ socket.
+ """
+ if stream_id == STDOUT:
+ return (data, None)
+ elif stream_id == STDERR:
+ return (None, data)
+ else:
+ raise ValueError('{0} is not a valid stream'.format(stream_id))
diff --git a/tests/integration/api_container_test.py b/tests/integration/api_container_test.py
index 02f3603..83df342 100644
--- a/tests/integration/api_container_test.py
+++ b/tests/integration/api_container_test.py
@@ -7,7 +7,7 @@ from datetime import datetime
import docker
from docker.constants import IS_WINDOWS_PLATFORM
-from docker.utils.socket import next_frame_size
+from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
import pytest
@@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest):
self.client.start(container)
- next_size = next_frame_size(pty_stdout)
+ (stream, next_size) = next_frame_header(pty_stdout)
+ assert stream == 1 # correspond to stdout
assert next_size == len(line)
data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line
diff --git a/tests/integration/api_exec_test.py b/tests/integration/api_exec_test.py
index 1a5a4e5..857a18c 100644
--- a/tests/integration/api_exec_test.py
+++ b/tests/integration/api_exec_test.py
@@ -1,4 +1,4 @@
-from docker.utils.socket import next_frame_size
+from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
from .base import BaseAPIIntegrationTest, BUSYBOX
@@ -75,6 +75,75 @@ class ExecTest(BaseAPIIntegrationTest):
res += chunk
assert res == b'hello\nworld\n'
+ def test_exec_command_demux(self):
+ container = self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ script = ' ; '.join([
+ # Write something on stdout
+ 'echo hello out',
+ # Busybox's sleep does not handle sub-second times.
+ # This loops takes ~0.3 second to execute on my machine.
+ 'for i in $(seq 1 50000); do echo $i>/dev/null; done',
+ # Write something on stderr
+ 'echo hello err >&2'])
+ cmd = 'sh -c "{}"'.format(script)
+
+ # tty=False, stream=False, demux=False
+ res = self.client.exec_create(id, cmd)
+ exec_log = self.client.exec_start(res)
+ assert exec_log == b'hello out\nhello err\n'
+
+ # tty=False, stream=True, demux=False
+ res = self.client.exec_create(id, cmd)
+ exec_log = self.client.exec_start(res, stream=True)
+ assert next(exec_log) == b'hello out\n'
+ assert next(exec_log) == b'hello err\n'
+ with self.assertRaises(StopIteration):
+ next(exec_log)
+
+ # tty=False, stream=False, demux=True
+ res = self.client.exec_create(id, cmd)
+ exec_log = self.client.exec_start(res, demux=True)
+ assert exec_log == (b'hello out\n', b'hello err\n')
+
+ # tty=False, stream=True, demux=True
+ res = self.client.exec_create(id, cmd)
+ exec_log = self.client.exec_start(res, demux=True, stream=True)
+ assert next(exec_log) == (b'hello out\n', None)
+ assert next(exec_log) == (None, b'hello err\n')
+ with self.assertRaises(StopIteration):
+ next(exec_log)
+
+ # tty=True, stream=False, demux=False
+ res = self.client.exec_create(id, cmd, tty=True)
+ exec_log = self.client.exec_start(res)
+ assert exec_log == b'hello out\r\nhello err\r\n'
+
+ # tty=True, stream=True, demux=False
+ res = self.client.exec_create(id, cmd, tty=True)
+ exec_log = self.client.exec_start(res, stream=True)
+ assert next(exec_log) == b'hello out\r\n'
+ assert next(exec_log) == b'hello err\r\n'
+ with self.assertRaises(StopIteration):
+ next(exec_log)
+
+ # tty=True, stream=False, demux=True
+ res = self.client.exec_create(id, cmd, tty=True)
+ exec_log = self.client.exec_start(res, demux=True)
+ assert exec_log == (b'hello out\r\nhello err\r\n', None)
+
+ # tty=True, stream=True, demux=True
+ res = self.client.exec_create(id, cmd, tty=True)
+ exec_log = self.client.exec_start(res, demux=True, stream=True)
+ assert next(exec_log) == (b'hello out\r\n', None)
+ assert next(exec_log) == (b'hello err\r\n', None)
+ with self.assertRaises(StopIteration):
+ next(exec_log)
+
def test_exec_start_socket(self):
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
@@ -91,7 +160,8 @@ class ExecTest(BaseAPIIntegrationTest):
socket = self.client.exec_start(exec_id, socket=True)
self.addCleanup(socket.close)
- next_size = next_frame_size(socket)
+ (stream, next_size) = next_frame_header(socket)
+ assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr)
assert next_size == len(line)
data = read_exactly(socket, next_size)
assert data.decode('utf-8') == line
diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py
index af2bb1c..fac314d 100644
--- a/tests/unit/api_test.py
+++ b/tests/unit/api_test.py
@@ -15,6 +15,7 @@ from docker.api import APIClient
import requests
from requests.packages import urllib3
import six
+import struct
from . import fake_api
@@ -83,7 +84,7 @@ def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs)
-def fake_read_from_socket(self, response, stream, tty=False):
+def fake_read_from_socket(self, response, stream, tty=False, demux=False):
return six.binary_type()
@@ -467,24 +468,25 @@ class UnixSocketStreamTest(unittest.TestCase):
class TCPSocketStreamTest(unittest.TestCase):
- text_data = b'''
+ stdout_data = b'''
Now, those children out there, they're jumping through the
flames in the hope that the god of the fire will make them fruitful.
Really, you can't blame them. After all, what girl would not prefer the
child of a god to that of some acne-scarred artisan?
'''
+ stderr_data = b'''
+ And what of the true God? To whose glory churches and monasteries have been
+ built on these islands for generations past? Now shall what of Him?
+ '''
def setUp(self):
-
self.server = six.moves.socketserver.ThreadingTCPServer(
- ('', 0), self.get_handler_class()
- )
+ ('', 0), self.get_handler_class())
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.setDaemon(True)
self.thread.start()
self.address = 'http://{}:{}'.format(
- socket.gethostname(), self.server.server_address[1]
- )
+ socket.gethostname(), self.server.server_address[1])
def tearDown(self):
self.server.shutdown()
@@ -492,31 +494,95 @@ class TCPSocketStreamTest(unittest.TestCase):
self.thread.join()
def get_handler_class(self):
- text_data = self.text_data
+ stdout_data = self.stdout_data
+ stderr_data = self.stderr_data
class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object):
def do_POST(self):
+ resp_data = self.get_resp_data()
self.send_response(101)
self.send_header(
- 'Content-Type', 'application/vnd.docker.raw-stream'
- )
+ 'Content-Type', 'application/vnd.docker.raw-stream')
self.send_header('Connection', 'Upgrade')
self.send_header('Upgrade', 'tcp')
self.end_headers()
self.wfile.flush()
time.sleep(0.2)
- self.wfile.write(text_data)
+ self.wfile.write(resp_data)
self.wfile.flush()
+ def get_resp_data(self):
+ path = self.path.split('/')[-1]
+ if path == 'tty':
+ return stdout_data + stderr_data
+ elif path == 'no-tty':
+ data = b''
+ data += self.frame_header(1, stdout_data)
+ data += stdout_data
+ data += self.frame_header(2, stderr_data)
+ data += stderr_data
+ return data
+ else:
+ raise Exception('Unknown path {0}'.format(path))
+
+ @staticmethod
+ def frame_header(stream, data):
+ return struct.pack('>BxxxL', stream, len(data))
+
return Handler
- def test_read_from_socket(self):
+ def request(self, stream=None, tty=None, demux=None):
+ assert stream is not None and tty is not None and demux is not None
with APIClient(base_url=self.address) as client:
- resp = client._post(client._url('/dummy'), stream=True)
- data = client._read_from_socket(resp, stream=True, tty=True)
- results = b''.join(data)
-
- assert results == self.text_data
+ if tty:
+ url = client._url('/tty')
+ else:
+ url = client._url('/no-tty')
+ resp = client._post(url, stream=True)
+ return client._read_from_socket(
+ resp, stream=stream, tty=tty, demux=demux)
+
+ def test_read_from_socket_1(self):
+ res = self.request(stream=True, tty=True, demux=False)
+ assert next(res) == self.stdout_data + self.stderr_data
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_2(self):
+ res = self.request(stream=True, tty=True, demux=True)
+ assert next(res) == (self.stdout_data + self.stderr_data, None)
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_3(self):
+ res = self.request(stream=True, tty=False, demux=False)
+ assert next(res) == self.stdout_data
+ assert next(res) == self.stderr_data
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_4(self):
+ res = self.request(stream=True, tty=False, demux=True)
+ assert (self.stdout_data, None) == next(res)
+ assert (None, self.stderr_data) == next(res)
+ with self.assertRaises(StopIteration):
+ next(res)
+
+ def test_read_from_socket_5(self):
+ res = self.request(stream=False, tty=True, demux=False)
+ assert res == self.stdout_data + self.stderr_data
+
+ def test_read_from_socket_6(self):
+ res = self.request(stream=False, tty=True, demux=True)
+ assert res == (self.stdout_data + self.stderr_data, None)
+
+ def test_read_from_socket_7(self):
+ res = self.request(stream=False, tty=False, demux=False)
+ res == self.stdout_data + self.stderr_data
+
+ def test_read_from_socket_8(self):
+ res = self.request(stream=False, tty=False, demux=True)
+ assert res == (self.stdout_data, self.stderr_data)
class UserAgentTest(unittest.TestCase):
diff --git a/tests/unit/models_containers_test.py b/tests/unit/models_containers_test.py
index 39e409e..cb92c62 100644
--- a/tests/unit/models_containers_test.py
+++ b/tests/unit/models_containers_test.py
@@ -419,7 +419,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
- FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False
+ FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False,
+ demux=False,
)
def test_exec_run_failure(self):
@@ -432,7 +433,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
- FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False
+ FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False,
+ demux=False,
)
def test_export(self):