From 2a3da90ca4fcc67f9a428fec035055d5709df785 Mon Sep 17 00:00:00 2001 From: Joffrey F Date: Fri, 22 Aug 2014 16:54:45 +0200 Subject: Pull, push, and logs are always streamed. Pull and push is JSON-decoded on the fly. --- docker/client.py | 119 +++++++++++++++++++------------------------------------ tests/test.py | 106 ++++++++++++++++++++++++------------------------- 2 files changed, 91 insertions(+), 134 deletions(-) diff --git a/docker/client.py b/docker/client.py index d826727..774f03f 100644 --- a/docker/client.py +++ b/docker/client.py @@ -14,7 +14,6 @@ import json import os -import re import shlex import struct import warnings @@ -282,20 +281,6 @@ class Client(requests.Session): break yield data - def _multiplexed_buffer_helper(self, response): - """A generator of multiplexed data blocks read from a buffered - response.""" - buf = self._result(response, binary=True) - walker = 0 - while True: - if len(buf[walker:]) < 8: - break - _, length = struct.unpack_from('>BxxxL', buf[walker:]) - start = walker + STREAM_HEADER_SIZE_BYTES - end = start + length - walker = end - yield buf[start:end] - def _multiplexed_socket_stream_helper(self, response): """A generator of multiplexed data blocks coming from a response socket.""" @@ -339,26 +324,20 @@ class Client(requests.Session): 'stream': stream and 1 or 0, } u = self._url("/containers/{0}/attach".format(container)) - response = self._post(u, params=params, stream=stream) + response = self._post(u, params=params, stream=True) # Stream multi-plexing was only introduced in API v1.6. Anything before # that needs old-style streaming. if utils.compare_version('1.6', self._version) < 0: - def stream_result(): - self._raise_for_status(response) - for line in response.iter_lines(chunk_size=1, - decode_unicode=True): - # filter out keep-alive new lines - if line: - yield line - - return stream_result() if stream else \ - self._result(response, binary=True) - - sep = bytes() if six.PY3 else str() - - return stream and self._multiplexed_socket_stream_helper(response) or \ - sep.join([x for x in self._multiplexed_buffer_helper(response)]) + self._raise_for_status(response) + for line in response.iter_lines(chunk_size=1, + decode_unicode=True): + # filter out keep-alive new lines + if line: + yield line + else: + for line in self._multiplexed_socket_stream_helper(response): + yield line def attach_socket(self, container, params=None, ws=False): if params is None: @@ -379,8 +358,8 @@ class Client(requests.Session): u, None, params=self._attach_params(params), stream=True)) def build(self, path=None, tag=None, quiet=False, fileobj=None, - nocache=False, rm=False, stream=False, timeout=None, - custom_context=False, encoding=None): + nocache=False, rm=False, timeout=None, custom_context=False, + encoding=None): remote = context = headers = None if path is None and fileobj is None: raise TypeError("Either path or fileobj needs to be provided.") @@ -391,8 +370,7 @@ class Client(requests.Session): context = fileobj elif fileobj is not None: context = utils.mkbuildcontext(fileobj) - elif path.startswith(('http://', 'https://', - 'git://', 'github.com/')): + elif path.startswith(('http://', 'https://', 'git://', 'github.com/')): remote = path else: dockerignore = os.path.join(path, '.dockerignore') @@ -402,9 +380,6 @@ class Client(requests.Session): exclude = list(filter(bool, f.read().split('\n'))) context = utils.tar(path, exclude=exclude) - if utils.compare_version('1.8', self._version) >= 0: - stream = True - u = self._url('/build') params = { 't': tag, @@ -437,22 +412,15 @@ class Client(requests.Session): data=context, params=params, headers=headers, - stream=stream, + stream=True, timeout=timeout, ) if context is not None: context.close() - if stream: - return self._stream_helper(response) - else: - output = self._result(response) - srch = r'Successfully built ([0-9a-f]+)' - match = re.search(srch, output) - if not match: - return None, output - return match.group(1), output + for line in self._stream_helper(response): + yield json.loads(line) def commit(self, container, repository=None, tag=None, message=None, author=None, conf=None): @@ -666,7 +634,7 @@ class Client(requests.Session): self._auth_configs[registry] = req_data return self._result(response, json=True) - def logs(self, container, stdout=True, stderr=True, stream=False, + def logs(self, container, stdout=True, stderr=True, follow=False, timestamps=False): if isinstance(container, dict): container = container.get('Id') @@ -674,26 +642,21 @@ class Client(requests.Session): params = {'stderr': stderr and 1 or 0, 'stdout': stdout and 1 or 0, 'timestamps': timestamps and 1 or 0, - 'follow': stream and 1 or 0} + 'follow': follow and 1 or 0} url = self._url("/containers/{0}/logs".format(container)) - res = self._get(url, params=params, stream=stream) - if stream: - return self._multiplexed_socket_stream_helper(res) - elif six.PY3: - return bytes().join( - [x for x in self._multiplexed_buffer_helper(res)] - ) - else: - return str().join( - [x for x in self._multiplexed_buffer_helper(res)] - ) - return self.attach( - container, - stdout=stdout, - stderr=stderr, - stream=stream, - logs=True - ) + res = self._get(url, params=params, stream=True) + for line in self._multiplexed_socket_stream_helper(res): + yield line + else: + strm = self.attach( + container, + stdout=stdout, + stderr=stderr, + stream=True, + logs=True + ) + for line in strm: + yield line def ping(self): return self._result(self._get(self._url('/_ping'))) @@ -713,7 +676,7 @@ class Client(requests.Session): return h_ports - def pull(self, repository, tag=None, stream=False): + def pull(self, repository, tag=None): if not tag: repository, tag = utils.parse_repository_tag(repository) registry, repo_name = auth.resolve_repository_name(repository) @@ -740,14 +703,12 @@ class Client(requests.Session): headers['X-Registry-Auth'] = auth.encode_header(authcfg) response = self._post(self._url('/images/create'), params=params, - headers=headers, stream=stream, timeout=None) + headers=headers, stream=True, timeout=None) - if stream: - return self._stream_helper(response) - else: - return self._result(response) + for line in self._stream_helper(response): + yield json.loads(line) - def push(self, repository, stream=False): + def push(self, repository): registry, repo_name = auth.resolve_repository_name(repository) u = self._url("/images/{0}/push".format(repository)) headers = {} @@ -765,12 +726,12 @@ class Client(requests.Session): if authcfg: headers['X-Registry-Auth'] = auth.encode_header(authcfg) - response = self._post_json(u, None, headers=headers, stream=stream) + response = self._post_json(u, None, headers=headers) else: - response = self._post_json(u, None, stream=stream) + response = self._post_json(u, None) - return stream and self._stream_helper(response) \ - or self._result(response) + for line in self._stream_helper(response): + yield json.loads(line) def remove_container(self, container, v=False, link=False, force=False): if isinstance(container, dict): diff --git a/tests/test.py b/tests/test.py index 5b21a81..d309196 100644 --- a/tests/test.py +++ b/tests/test.py @@ -28,7 +28,6 @@ import gzip import docker import requests import six - import fake_api @@ -65,6 +64,37 @@ url_prefix = 'http+unix://var/run/docker.sock/v{0}/'.format( docker.client.DEFAULT_DOCKER_API_VERSION) +def fake_get_raw_response_socket(self, response): + class FakeSocket(object): + def __init__(self, content): + self.content = content + self.idx = 0 + + def __iter__(self): + for line in self.content: + yield line + + def settimeout(self, noop): + return + + def setblocking(self, noop): + return + + def makefile(self): + file_contents = '{0}\r\n{1}'.format( + len(self.content), self.content.decode('ascii') + ) + if six.PY3: + return io.StringIO(file_contents) + return io.StringIO(unicode(file_contents)) + + def recv(self, size): + self.idx += size + return self.content[self.idx - size:self.idx] + + return FakeSocket(response.content) + + class Cleanup(object): if sys.version_info < (2, 7): # Provide a basic implementation of addCleanup for Python < 2.7 @@ -92,6 +122,8 @@ class Cleanup(object): @mock.patch.multiple('docker.Client', get=fake_request, post=fake_request, put=fake_request, delete=fake_request) +@mock.patch('docker.Client._get_raw_response_socket', + fake_get_raw_response_socket) class DockerClientTest(Cleanup, unittest.TestCase): def setUp(self): self.client = docker.Client() @@ -882,15 +914,19 @@ class DockerClientTest(Cleanup, unittest.TestCase): except Exception as e: self.fail('Command should not raise exception: {0}'.format(e)) + result = bytes() if six.PY3 else str() + for line in logs: + result += line + fake_request.assert_called_with( url_prefix + 'containers/3cc2351ab11b/logs', params={'timestamps': 0, 'follow': 0, 'stderr': 1, 'stdout': 1}, timeout=docker.client.DEFAULT_TIMEOUT_SECONDS, - stream=False + stream=True ) self.assertEqual( - logs, + result, 'Flowering Nights\n(Sakuya Iyazoi)\n'.encode('ascii') ) @@ -900,31 +936,22 @@ class DockerClientTest(Cleanup, unittest.TestCase): except Exception as e: self.fail('Command should not raise exception: {0}'.format(e)) + result = bytes() if six.PY3 else str() + for line in logs: + result += line + fake_request.assert_called_with( url_prefix + 'containers/3cc2351ab11b/logs', params={'timestamps': 0, 'follow': 0, 'stderr': 1, 'stdout': 1}, timeout=docker.client.DEFAULT_TIMEOUT_SECONDS, - stream=False + stream=True ) self.assertEqual( - logs, + result, 'Flowering Nights\n(Sakuya Iyazoi)\n'.encode('ascii') ) - def test_log_streaming(self): - try: - self.client.logs(fake_api.FAKE_CONTAINER_ID, stream=True) - except Exception as e: - self.fail('Command should not raise exception: {0}'.format(e)) - - fake_request.assert_called_with( - url_prefix + 'containers/3cc2351ab11b/logs', - params={'timestamps': 0, 'follow': 1, 'stderr': 1, 'stdout': 1}, - timeout=docker.client.DEFAULT_TIMEOUT_SECONDS, - stream=True - ) - def test_diff(self): try: self.client.diff(fake_api.FAKE_CONTAINER_ID) @@ -1122,7 +1149,7 @@ class DockerClientTest(Cleanup, unittest.TestCase): def test_pull(self): try: - self.client.pull('joffrey/test001') + next(self.client.pull('joffrey/test001')) except Exception as e: self.fail('Command should not raise exception: {0}'.format(e)) @@ -1135,11 +1162,11 @@ class DockerClientTest(Cleanup, unittest.TestCase): args[1]['params'], {'tag': None, 'fromImage': 'joffrey/test001'} ) - self.assertFalse(args[1]['stream']) + self.assertTrue(args[1]['stream']) - def test_pull_stream(self): + def test_pull_tag(self): try: - self.client.pull('joffrey/test001', stream=True) + next(self.client.pull('joffrey/test001:latest')) except Exception as e: self.fail('Command should not raise exception: {0}'.format(e)) @@ -1150,7 +1177,7 @@ class DockerClientTest(Cleanup, unittest.TestCase): ) self.assertEqual( args[1]['params'], - {'tag': None, 'fromImage': 'joffrey/test001'} + {'tag': 'latest', 'fromImage': 'joffrey/test001'} ) self.assertTrue(args[1]['stream']) @@ -1300,7 +1327,7 @@ class DockerClientTest(Cleanup, unittest.TestCase): try: with mock.patch('docker.auth.auth.resolve_authconfig', fake_resolve_authconfig): - self.client.push(fake_api.FAKE_IMAGE_NAME) + next(self.client.push(fake_api.FAKE_IMAGE_NAME)) except Exception as e: self.fail('Command should not raise exception: {0}'.format(e)) @@ -1308,23 +1335,6 @@ class DockerClientTest(Cleanup, unittest.TestCase): url_prefix + 'images/test_image/push', data='{}', headers={'Content-Type': 'application/json'}, - stream=False, - timeout=docker.client.DEFAULT_TIMEOUT_SECONDS - ) - - def test_push_image_stream(self): - try: - with mock.patch('docker.auth.auth.resolve_authconfig', - fake_resolve_authconfig): - self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True) - except Exception as e: - self.fail('Command should not raise exception: {0}'.format(e)) - - fake_request.assert_called_with( - url_prefix + 'images/test_image/push', - data='{}', - headers={'Content-Type': 'application/json'}, - stream=True, timeout=docker.client.DEFAULT_TIMEOUT_SECONDS ) @@ -1423,20 +1433,6 @@ class DockerClientTest(Cleanup, unittest.TestCase): except Exception as e: self.fail('Command should not raise exception: {0}'.format(e)) - def test_build_container_stream(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - try: - self.client.build(fileobj=script, stream=True) - except Exception as e: - self.fail('Command should not raise exception: {0}'.format(e)) - def test_build_container_custom_context(self): script = io.BytesIO('\n'.join([ 'FROM busybox', -- cgit v1.2.1