diff options
author | Joffrey F <joffrey@docker.com> | 2015-09-23 17:42:29 -0700 |
---|---|---|
committer | Joffrey F <joffrey@docker.com> | 2015-10-21 16:02:09 -0700 |
commit | 93a296fb0448d9fccdf9f40f7a9996f49ea22c48 (patch) | |
tree | 633371cbdf88435664b530c4ca835c4929d92274 | |
parent | 5a1c7ed8bf0ac9a3914de7c80c1c29c13f6a62ea (diff) | |
download | docker-py-reorganize_tests.tar.gz |
Reorganize test directoriesreorganize_tests
More clearly separate unit and integration tests
Allow splitting into multiple files
Cleaner
Signed-off-by: Joffrey F <joffrey@docker.com>
28 files changed, 3731 insertions, 3713 deletions
@@ -17,21 +17,21 @@ build-dind-certs: test: flake8 unit-test unit-test-py3 integration-dind integration-dind-ssl unit-test: build - docker run docker-py py.test tests/test.py tests/utils_test.py + docker run docker-py py.test tests/unit unit-test-py3: build-py3 - docker run docker-py3 py.test tests/test.py tests/utils_test.py + docker run docker-py3 py.test tests/unit integration-test: build - docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration_test.py + docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration integration-test-py3: build-py3 - docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration_test.py + docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration integration-dind: build build-py3 docker run -d --name dpy-dind --env="DOCKER_HOST=tcp://localhost:2375" --privileged dockerswarm/dind:1.8.1 docker -d -H tcp://0.0.0.0:2375 - docker run --volumes-from dpy-dind --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration_test.py - docker run --volumes-from dpy-dind --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration_test.py + docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration + docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration docker rm -vf dpy-dind integration-dind-ssl: build-dind-certs build build-py3 diff --git a/tests/base.py b/tests/base.py index 51b2300..a2c01fc 100644 --- a/tests/base.py +++ b/tests/base.py @@ -21,3 +21,28 @@ def requires_api_version(version): ), reason="API version is too low (< {0})".format(version) ) + + +class Cleanup(object): + if sys.version_info < (2, 7): + # Provide a basic implementation of addCleanup for Python < 2.7 + def __init__(self, *args, **kwargs): + super(Cleanup, self).__init__(*args, **kwargs) + self._cleanups = [] + + def tearDown(self): + super(Cleanup, self).tearDown() + ok = True + while self._cleanups: + fn, args, kwargs = self._cleanups.pop(-1) + try: + fn(*args, **kwargs) + except KeyboardInterrupt: + raise + except: + ok = False + if not ok: + raise + + def addCleanup(self, function, *args, **kwargs): + self._cleanups.append((function, args, kwargs)) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..20c0a0a --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,13 @@ +# flake8: noqa + +# FIXME: crutch while we transition to the new folder architecture +# Remove imports when merged in master and Jenkins is updated to find the +# tests in the new location. +from .api_test import * +from .build_test import * +from .container_test import * +from .exec_test import * +from .image_test import * +from .network_test import * +from .regression_test import * +from .volume_test import * diff --git a/tests/integration/api_test.py b/tests/integration/api_test.py new file mode 100644 index 0000000..1b33728 --- /dev/null +++ b/tests/integration/api_test.py @@ -0,0 +1,292 @@ +import base64 +import json +import os +import shutil +import tempfile +import time +import unittest +import warnings + +import docker +import six + +BUSYBOX = 'busybox:buildroot-2014.02' +EXEC_DRIVER = [] + + +def exec_driver_is_native(): + global EXEC_DRIVER + if not EXEC_DRIVER: + c = docker_client() + EXEC_DRIVER = c.info()['ExecutionDriver'] + c.close() + return EXEC_DRIVER.startswith('native') + + +def docker_client(**kwargs): + return docker.Client(**docker_client_kwargs(**kwargs)) + + +def docker_client_kwargs(**kwargs): + client_kwargs = docker.utils.kwargs_from_env(assert_hostname=False) + client_kwargs.update(kwargs) + return client_kwargs + + +def setup_module(): + warnings.simplefilter('error') + c = docker_client() + try: + c.inspect_image(BUSYBOX) + except docker.errors.NotFound: + os.write(2, "\npulling busybox\n".encode('utf-8')) + for data in c.pull(BUSYBOX, stream=True): + data = json.loads(data.decode('utf-8')) + os.write(2, ("%c[2K\r" % 27).encode('utf-8')) + status = data.get("status") + progress = data.get("progress") + detail = "{0} - {1}".format(status, progress).encode('utf-8') + os.write(2, detail) + os.write(2, "\npulled busybox\n".encode('utf-8')) + + # Double make sure we now have busybox + c.inspect_image(BUSYBOX) + c.close() + + +class BaseTestCase(unittest.TestCase): + tmp_imgs = [] + tmp_containers = [] + tmp_folders = [] + tmp_volumes = [] + + def setUp(self): + if six.PY2: + self.assertRegex = self.assertRegexpMatches + self.assertCountEqual = self.assertItemsEqual + self.client = docker_client(timeout=60) + self.tmp_imgs = [] + self.tmp_containers = [] + self.tmp_folders = [] + self.tmp_volumes = [] + self.tmp_networks = [] + + def tearDown(self): + for img in self.tmp_imgs: + try: + self.client.remove_image(img) + except docker.errors.APIError: + pass + for container in self.tmp_containers: + try: + self.client.stop(container, timeout=1) + self.client.remove_container(container) + except docker.errors.APIError: + pass + for network in self.tmp_networks: + try: + self.client.remove_network(network) + except docker.errors.APIError: + pass + for folder in self.tmp_folders: + shutil.rmtree(folder) + + for volume in self.tmp_volumes: + try: + self.client.remove_volume(volume) + except docker.errors.APIError: + pass + + self.client.close() + + def run_container(self, *args, **kwargs): + container = self.client.create_container(*args, **kwargs) + self.tmp_containers.append(container) + self.client.start(container) + exitcode = self.client.wait(container) + + if exitcode != 0: + output = self.client.logs(container) + raise Exception( + "Container exited with code {}:\n{}" + .format(exitcode, output)) + + return container + + +######################### +# INFORMATION TESTS # +######################### + + +class InformationTest(BaseTestCase): + def test_version(self): + res = self.client.version() + self.assertIn('GoVersion', res) + self.assertIn('Version', res) + self.assertEqual(len(res['Version'].split('.')), 3) + + def test_info(self): + res = self.client.info() + self.assertIn('Containers', res) + self.assertIn('Images', res) + self.assertIn('Debug', res) + + def test_search(self): + self.client = docker_client(timeout=10) + res = self.client.search('busybox') + self.assertTrue(len(res) >= 1) + base_img = [x for x in res if x['name'] == 'busybox'] + self.assertEqual(len(base_img), 1) + self.assertIn('description', base_img[0]) + + +################# +# LINKS TESTS # +################# + + +class LinkTest(BaseTestCase): + def test_remove_link(self): + # Create containers + container1 = self.client.create_container( + BUSYBOX, 'cat', detach=True, stdin_open=True + ) + container1_id = container1['Id'] + self.tmp_containers.append(container1_id) + self.client.start(container1_id) + + # Create Link + # we don't want the first / + link_path = self.client.inspect_container(container1_id)['Name'][1:] + link_alias = 'mylink' + + container2 = self.client.create_container( + BUSYBOX, 'cat', host_config=self.client.create_host_config( + links={link_path: link_alias}, network_mode='none' + ) + ) + container2_id = container2['Id'] + self.tmp_containers.append(container2_id) + self.client.start(container2_id) + + # Remove link + linked_name = self.client.inspect_container(container2_id)['Name'][1:] + link_name = '%s/%s' % (linked_name, link_alias) + self.client.remove_container(link_name, link=True) + + # Link is gone + containers = self.client.containers(all=True) + retrieved = [x for x in containers if link_name in x['Names']] + self.assertEqual(len(retrieved), 0) + + # Containers are still there + retrieved = [ + x for x in containers if x['Id'].startswith(container1_id) or + x['Id'].startswith(container2_id) + ] + self.assertEqual(len(retrieved), 2) + + +####################### +# PY SPECIFIC TESTS # +####################### + +class LoadConfigTest(BaseTestCase): + def test_load_legacy_config(self): + folder = tempfile.mkdtemp() + self.tmp_folders.append(folder) + cfg_path = os.path.join(folder, '.dockercfg') + f = open(cfg_path, 'w') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + f.write('auth = {0}\n'.format(auth_)) + f.write('email = sakuya@scarlet.net') + f.close() + cfg = docker.auth.load_config(cfg_path) + self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None) + cfg = cfg[docker.auth.INDEX_NAME] + self.assertEqual(cfg['username'], 'sakuya') + self.assertEqual(cfg['password'], 'izayoi') + self.assertEqual(cfg['email'], 'sakuya@scarlet.net') + self.assertEqual(cfg.get('Auth'), None) + + def test_load_json_config(self): + folder = tempfile.mkdtemp() + self.tmp_folders.append(folder) + cfg_path = os.path.join(folder, '.dockercfg') + f = open(os.path.join(folder, '.dockercfg'), 'w') + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + email_ = 'sakuya@scarlet.net' + f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format( + docker.auth.INDEX_URL, auth_, email_)) + f.close() + cfg = docker.auth.load_config(cfg_path) + self.assertNotEqual(cfg[docker.auth.INDEX_URL], None) + cfg = cfg[docker.auth.INDEX_URL] + self.assertEqual(cfg['username'], 'sakuya') + self.assertEqual(cfg['password'], 'izayoi') + self.assertEqual(cfg['email'], 'sakuya@scarlet.net') + self.assertEqual(cfg.get('Auth'), None) + + +class AutoDetectVersionTest(unittest.TestCase): + def test_client_init(self): + client = docker_client(version='auto') + client_version = client._version + api_version = client.version(api_version=False)['ApiVersion'] + self.assertEqual(client_version, api_version) + api_version_2 = client.version()['ApiVersion'] + self.assertEqual(client_version, api_version_2) + client.close() + + def test_auto_client(self): + client = docker.AutoVersionClient(**docker_client_kwargs()) + client_version = client._version + api_version = client.version(api_version=False)['ApiVersion'] + self.assertEqual(client_version, api_version) + api_version_2 = client.version()['ApiVersion'] + self.assertEqual(client_version, api_version_2) + client.close() + with self.assertRaises(docker.errors.DockerException): + docker.AutoVersionClient(**docker_client_kwargs(version='1.11')) + + +class ConnectionTimeoutTest(unittest.TestCase): + def setUp(self): + self.timeout = 0.5 + self.client = docker.client.Client(base_url='http://192.168.10.2:4243', + timeout=self.timeout) + + def test_timeout(self): + start = time.time() + res = None + # This call isn't supposed to complete, and it should fail fast. + try: + res = self.client.inspect_container('id') + except: + pass + end = time.time() + self.assertTrue(res is None) + self.assertTrue(end - start < 2 * self.timeout) + + +class UnixconnTest(unittest.TestCase): + """ + Test UNIX socket connection adapter. + """ + + def test_resource_warnings(self): + """ + Test no warnings are produced when using the client. + """ + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + + client = docker_client() + client.images() + client.close() + del client + + assert len(w) == 0, \ + "No warnings produced: {0}".format(w[0].message) diff --git a/tests/integration/build_test.py b/tests/integration/build_test.py new file mode 100644 index 0000000..289c0fe --- /dev/null +++ b/tests/integration/build_test.py @@ -0,0 +1,98 @@ +import io +import json +import os +import shutil +import tempfile + +import six + +from . import api_test +from ..base import requires_api_version + + +class BuildTest(api_test.BaseTestCase): + def test_build_streaming(self): + script = io.BytesIO('\n'.join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ]).encode('ascii')) + stream = self.client.build(fileobj=script, stream=True) + logs = '' + for chunk in stream: + if six.PY3: + chunk = chunk.decode('utf-8') + json.loads(chunk) # ensure chunk is a single, valid JSON blob + logs += chunk + self.assertNotEqual(logs, '') + + def test_build_from_stringio(self): + if six.PY3: + return + script = io.StringIO(six.text_type('\n').join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ])) + stream = self.client.build(fileobj=script, stream=True) + logs = '' + for chunk in stream: + if six.PY3: + chunk = chunk.decode('utf-8') + logs += chunk + self.assertNotEqual(logs, '') + + @requires_api_version('1.8') + def test_build_with_dockerignore(self): + base_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base_dir) + + with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f: + f.write("\n".join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'ADD . /test', + ])) + + with open(os.path.join(base_dir, '.dockerignore'), 'w') as f: + f.write("\n".join([ + 'ignored', + 'Dockerfile', + '.dockerignore', + '', # empty line + ])) + + with open(os.path.join(base_dir, 'not-ignored'), 'w') as f: + f.write("this file should not be ignored") + + subdir = os.path.join(base_dir, 'ignored', 'subdir') + os.makedirs(subdir) + with open(os.path.join(subdir, 'file'), 'w') as f: + f.write("this file should be ignored") + + tag = 'docker-py-test-build-with-dockerignore' + stream = self.client.build( + path=base_dir, + tag=tag, + ) + for chunk in stream: + pass + + c = self.client.create_container(tag, ['ls', '-1A', '/test']) + self.client.start(c) + self.client.wait(c) + logs = self.client.logs(c) + + if six.PY3: + logs = logs.decode('utf-8') + + self.assertEqual( + list(filter(None, logs.split('\n'))), + ['not-ignored'], + ) diff --git a/tests/integration/container_test.py b/tests/integration/container_test.py new file mode 100644 index 0000000..04a0ef5 --- /dev/null +++ b/tests/integration/container_test.py @@ -0,0 +1,991 @@ +import errno +import os +import shutil +import signal +import struct +import tempfile + +import docker +import pytest +import six + +from . import api_test +from ..base import requires_api_version +from .. import helpers + +BUSYBOX = api_test.BUSYBOX + + +class ListContainersTest(api_test.BaseTestCase): + def test_list_containers(self): + res0 = self.client.containers(all=True) + size = len(res0) + res1 = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res1) + self.client.start(res1['Id']) + self.tmp_containers.append(res1['Id']) + res2 = self.client.containers(all=True) + self.assertEqual(size + 1, len(res2)) + retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])] + self.assertEqual(len(retrieved), 1) + retrieved = retrieved[0] + self.assertIn('Command', retrieved) + self.assertEqual(retrieved['Command'], six.text_type('true')) + self.assertIn('Image', retrieved) + self.assertRegex(retrieved['Image'], r'busybox:.*') + self.assertIn('Status', retrieved) + + +class CreateContainerTest(api_test.BaseTestCase): + def setUp(self): + super(CreateContainerTest, self).setUp() + + self.mount_dest = '/mnt' + + # Get a random pathname - we don't need it to exist locally + self.mount_origin = tempfile.mkdtemp() + shutil.rmtree(self.mount_origin) + + self.filename = 'shared.txt' + + self.run_with_volume( + False, + BUSYBOX, + ['touch', os.path.join(self.mount_dest, self.filename)], + ) + + def test_create(self): + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + + def test_create_with_host_pid_mode(self): + ctnr = self.client.create_container( + BUSYBOX, 'true', host_config=self.client.create_host_config( + pid_mode='host', network_mode='none' + ) + ) + self.assertIn('Id', ctnr) + self.tmp_containers.append(ctnr['Id']) + self.client.start(ctnr) + inspect = self.client.inspect_container(ctnr) + self.assertIn('HostConfig', inspect) + host_config = inspect['HostConfig'] + self.assertIn('PidMode', host_config) + self.assertEqual(host_config['PidMode'], 'host') + + def test_create_with_links(self): + res0 = self.client.create_container( + BUSYBOX, 'cat', + detach=True, stdin_open=True, + environment={'FOO': '1'}) + + container1_id = res0['Id'] + self.tmp_containers.append(container1_id) + + self.client.start(container1_id) + + res1 = self.client.create_container( + BUSYBOX, 'cat', + detach=True, stdin_open=True, + environment={'FOO': '1'}) + + container2_id = res1['Id'] + self.tmp_containers.append(container2_id) + + self.client.start(container2_id) + + # we don't want the first / + link_path1 = self.client.inspect_container(container1_id)['Name'][1:] + link_alias1 = 'mylink1' + link_env_prefix1 = link_alias1.upper() + + link_path2 = self.client.inspect_container(container2_id)['Name'][1:] + link_alias2 = 'mylink2' + link_env_prefix2 = link_alias2.upper() + + res2 = self.client.create_container( + BUSYBOX, 'env', host_config=self.client.create_host_config( + links={link_path1: link_alias1, link_path2: link_alias2}, + network_mode='none' + ) + ) + container3_id = res2['Id'] + self.tmp_containers.append(container3_id) + self.client.start(container3_id) + self.assertEqual(self.client.wait(container3_id), 0) + + logs = self.client.logs(container3_id) + if six.PY3: + logs = logs.decode('utf-8') + self.assertIn('{0}_NAME='.format(link_env_prefix1), logs) + self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs) + self.assertIn('{0}_NAME='.format(link_env_prefix2), logs) + self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs) + + def test_create_with_restart_policy(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '2'], + host_config=self.client.create_host_config( + restart_policy={"Name": "always", "MaximumRetryCount": 0}, + network_mode='none' + ) + ) + id = container['Id'] + self.client.start(id) + self.client.wait(id) + with self.assertRaises(docker.errors.APIError) as exc: + self.client.remove_container(id) + err = exc.exception.response.text + self.assertIn( + 'You cannot remove a running container', err + ) + self.client.remove_container(id, force=True) + + def test_create_container_with_volumes_from(self): + vol_names = ['foobar_vol0', 'foobar_vol1'] + + res0 = self.client.create_container( + BUSYBOX, 'true', name=vol_names[0] + ) + container1_id = res0['Id'] + self.tmp_containers.append(container1_id) + self.client.start(container1_id) + + res1 = self.client.create_container( + BUSYBOX, 'true', name=vol_names[1] + ) + container2_id = res1['Id'] + self.tmp_containers.append(container2_id) + self.client.start(container2_id) + with self.assertRaises(docker.errors.DockerException): + self.client.create_container( + BUSYBOX, 'cat', detach=True, stdin_open=True, + volumes_from=vol_names + ) + res2 = self.client.create_container( + BUSYBOX, 'cat', detach=True, stdin_open=True, + host_config=self.client.create_host_config( + volumes_from=vol_names, network_mode='none' + ) + ) + container3_id = res2['Id'] + self.tmp_containers.append(container3_id) + self.client.start(container3_id) + + info = self.client.inspect_container(res2['Id']) + self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names) + + def test_create_with_binds_rw(self): + container = self.run_with_volume( + False, + BUSYBOX, + ['ls', self.mount_dest], + ) + logs = self.client.logs(container) + + if six.PY3: + logs = logs.decode('utf-8') + self.assertIn(self.filename, logs) + inspect_data = self.client.inspect_container(container) + self.check_container_data(inspect_data, True) + + def test_create_with_binds_ro(self): + container = self.run_with_volume( + True, + BUSYBOX, + ['ls', self.mount_dest], + ) + logs = self.client.logs(container) + + if six.PY3: + logs = logs.decode('utf-8') + self.assertIn(self.filename, logs) + + inspect_data = self.client.inspect_container(container) + self.check_container_data(inspect_data, False) + + def create_container_readonly_fs(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + ctnr = self.client.create_container( + BUSYBOX, ['mkdir', '/shrine'], + host_config=self.client.create_host_config( + read_only=True, network_mode='none' + ) + ) + self.assertIn('Id', ctnr) + self.tmp_containers.append(ctnr['Id']) + self.client.start(ctnr) + res = self.client.wait(ctnr) + self.assertNotEqual(res, 0) + + def create_container_with_name(self): + res = self.client.create_container(BUSYBOX, 'true', name='foobar') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Name', inspect) + self.assertEqual('/foobar', inspect['Name']) + + def create_container_privileged(self): + res = self.client.create_container( + BUSYBOX, 'true', host_config=self.client.create_host_config( + privileged=True, network_mode='none' + ) + ) + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.start(res['Id']) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Config', inspect) + self.assertIn('Id', inspect) + self.assertTrue(inspect['Id'].startswith(res['Id'])) + self.assertIn('Image', inspect) + self.assertIn('State', inspect) + self.assertIn('Running', inspect['State']) + if not inspect['State']['Running']: + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], 0) + # Since Nov 2013, the Privileged flag is no longer part of the + # container's config exposed via the API (safety concerns?). + # + if 'Privileged' in inspect['Config']: + self.assertEqual(inspect['Config']['Privileged'], True) + + def test_create_with_mac_address(self): + mac_address_expected = "02:42:ac:11:00:0a" + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected) + + id = container['Id'] + + self.client.start(container) + res = self.client.inspect_container(container['Id']) + self.assertEqual(mac_address_expected, + res['NetworkSettings']['MacAddress']) + + self.client.kill(id) + + @requires_api_version('1.20') + def test_group_id_ints(self): + container = self.client.create_container( + BUSYBOX, 'id -G', + host_config=self.client.create_host_config(group_add=[1000, 1001]) + ) + self.tmp_containers.append(container) + self.client.start(container) + self.client.wait(container) + + logs = self.client.logs(container) + if six.PY3: + logs = logs.decode('utf-8') + groups = logs.strip().split(' ') + self.assertIn('1000', groups) + self.assertIn('1001', groups) + + @requires_api_version('1.20') + def test_group_id_strings(self): + container = self.client.create_container( + BUSYBOX, 'id -G', host_config=self.client.create_host_config( + group_add=['1000', '1001'] + ) + ) + self.tmp_containers.append(container) + self.client.start(container) + self.client.wait(container) + + logs = self.client.logs(container) + if six.PY3: + logs = logs.decode('utf-8') + + groups = logs.strip().split(' ') + self.assertIn('1000', groups) + self.assertIn('1001', groups) + + def test_valid_log_driver_and_log_opt(self): + log_config = docker.utils.LogConfig( + type='json-file', + config={'max-file': '100'} + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + self.tmp_containers.append(container['Id']) + self.client.start(container) + + info = self.client.inspect_container(container) + container_log_config = info['HostConfig']['LogConfig'] + + self.assertEqual(container_log_config['Type'], log_config.type) + self.assertEqual(container_log_config['Config'], log_config.config) + + def test_invalid_log_driver_raises_exception(self): + log_config = docker.utils.LogConfig( + type='asdf-nope', + config={} + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + + expected_msg = "logger: no log driver named 'asdf-nope' is registered" + + with pytest.raises(docker.errors.APIError) as excinfo: + # raises an internal server error 500 + self.client.start(container) + + assert expected_msg in str(excinfo.value) + + @pytest.mark.skipif(True, + reason="https://github.com/docker/docker/issues/15633") + def test_valid_no_log_driver_specified(self): + log_config = docker.utils.LogConfig( + type="", + config={'max-file': '100'} + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + self.tmp_containers.append(container['Id']) + self.client.start(container) + + info = self.client.inspect_container(container) + container_log_config = info['HostConfig']['LogConfig'] + + self.assertEqual(container_log_config['Type'], "json-file") + self.assertEqual(container_log_config['Config'], log_config.config) + + def test_valid_no_config_specified(self): + log_config = docker.utils.LogConfig( + type="json-file", + config=None + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + self.tmp_containers.append(container['Id']) + self.client.start(container) + + info = self.client.inspect_container(container) + container_log_config = info['HostConfig']['LogConfig'] + + self.assertEqual(container_log_config['Type'], "json-file") + self.assertEqual(container_log_config['Config'], {}) + + def check_container_data(self, inspect_data, rw): + if docker.utils.compare_version('1.20', self.client._version) < 0: + self.assertIn('Volumes', inspect_data) + self.assertIn(self.mount_dest, inspect_data['Volumes']) + self.assertEqual( + self.mount_origin, inspect_data['Volumes'][self.mount_dest] + ) + self.assertIn(self.mount_dest, inspect_data['VolumesRW']) + self.assertFalse(inspect_data['VolumesRW'][self.mount_dest]) + else: + self.assertIn('Mounts', inspect_data) + filtered = list(filter( + lambda x: x['Destination'] == self.mount_dest, + inspect_data['Mounts'] + )) + self.assertEqual(len(filtered), 1) + mount_data = filtered[0] + self.assertEqual(mount_data['Source'], self.mount_origin) + self.assertEqual(mount_data['RW'], rw) + + def run_with_volume(self, ro, *args, **kwargs): + return self.run_container( + *args, + volumes={self.mount_dest: {}}, + host_config=self.client.create_host_config( + binds={ + self.mount_origin: { + 'bind': self.mount_dest, + 'ro': ro, + }, + }, + network_mode='none' + ), + **kwargs + ) + + +@requires_api_version('1.20') +class ArchiveTest(api_test.BaseTestCase): + def test_get_file_archive_from_container(self): + data = 'The Maid and the Pocket Watch of Blood' + ctnr = self.client.create_container( + BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data), + volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + self.client.start(ctnr) + self.client.wait(ctnr) + with tempfile.NamedTemporaryFile() as destination: + strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') + for d in strm: + destination.write(d) + destination.seek(0) + retrieved_data = helpers.untar_file(destination, 'data.txt') + if six.PY3: + retrieved_data = retrieved_data.decode('utf-8') + self.assertEqual(data, retrieved_data.strip()) + + def test_get_file_stat_from_container(self): + data = 'The Maid and the Pocket Watch of Blood' + ctnr = self.client.create_container( + BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data), + volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + self.client.start(ctnr) + self.client.wait(ctnr) + strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') + self.assertIn('name', stat) + self.assertEqual(stat['name'], 'data.txt') + self.assertIn('size', stat) + self.assertEqual(stat['size'], len(data)) + + def test_copy_file_to_container(self): + data = b'Deaf To All But The Song' + with tempfile.NamedTemporaryFile() as test_file: + test_file.write(data) + test_file.seek(0) + ctnr = self.client.create_container( + BUSYBOX, + 'cat {0}'.format( + os.path.join('/vol1', os.path.basename(test_file.name)) + ), + volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + with helpers.simple_tar(test_file.name) as test_tar: + self.client.put_archive(ctnr, '/vol1', test_tar) + self.client.start(ctnr) + self.client.wait(ctnr) + logs = self.client.logs(ctnr) + if six.PY3: + logs = logs.decode('utf-8') + data = data.decode('utf-8') + self.assertEqual(logs.strip(), data) + + def test_copy_directory_to_container(self): + files = ['a.py', 'b.py', 'foo/b.py'] + dirs = ['foo', 'bar'] + base = helpers.make_tree(dirs, files) + ctnr = self.client.create_container( + BUSYBOX, 'ls -p /vol1', volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + with docker.utils.tar(base) as test_tar: + self.client.put_archive(ctnr, '/vol1', test_tar) + self.client.start(ctnr) + self.client.wait(ctnr) + logs = self.client.logs(ctnr) + if six.PY3: + logs = logs.decode('utf-8') + results = logs.strip().split() + self.assertIn('a.py', results) + self.assertIn('b.py', results) + self.assertIn('foo/', results) + self.assertIn('bar/', results) + + +class RenameContainerTest(api_test.BaseTestCase): + def test_rename_container(self): + version = self.client.version()['Version'] + name = 'hong_meiling' + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.rename(res, name) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Name', inspect) + if version == '1.5.0': + self.assertEqual(name, inspect['Name']) + else: + self.assertEqual('/{0}'.format(name), inspect['Name']) + + +class StartContainerTest(api_test.BaseTestCase): + def test_start_container(self): + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.start(res['Id']) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Config', inspect) + self.assertIn('Id', inspect) + self.assertTrue(inspect['Id'].startswith(res['Id'])) + self.assertIn('Image', inspect) + self.assertIn('State', inspect) + self.assertIn('Running', inspect['State']) + if not inspect['State']['Running']: + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], 0) + + def test_start_container_with_dict_instead_of_id(self): + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.start(res) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Config', inspect) + self.assertIn('Id', inspect) + self.assertTrue(inspect['Id'].startswith(res['Id'])) + self.assertIn('Image', inspect) + self.assertIn('State', inspect) + self.assertIn('Running', inspect['State']) + if not inspect['State']['Running']: + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], 0) + + def test_run_shlex_commands(self): + commands = [ + 'true', + 'echo "The Young Descendant of Tepes & Septette for the ' + 'Dead Princess"', + 'echo -n "The Young Descendant of Tepes & Septette for the ' + 'Dead Princess"', + '/bin/sh -c "echo Hello World"', + '/bin/sh -c \'echo "Hello World"\'', + 'echo "\"Night of Nights\""', + 'true && echo "Night of Nights"' + ] + for cmd in commands: + container = self.client.create_container(BUSYBOX, cmd) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0, msg=cmd) + + +class WaitTest(api_test.BaseTestCase): + def test_wait(self): + res = self.client.create_container(BUSYBOX, ['sleep', '3']) + id = res['Id'] + self.tmp_containers.append(id) + self.client.start(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + inspect = self.client.inspect_container(id) + self.assertIn('Running', inspect['State']) + self.assertEqual(inspect['State']['Running'], False) + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], exitcode) + + def test_wait_with_dict_instead_of_id(self): + res = self.client.create_container(BUSYBOX, ['sleep', '3']) + id = res['Id'] + self.tmp_containers.append(id) + self.client.start(res) + exitcode = self.client.wait(res) + self.assertEqual(exitcode, 0) + inspect = self.client.inspect_container(res) + self.assertIn('Running', inspect['State']) + self.assertEqual(inspect['State']['Running'], False) + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], exitcode) + + +class LogsTest(api_test.BaseTestCase): + def test_logs(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'echo {0}'.format(snippet) + ) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(id) + self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) + + def test_logs_tail_option(self): + snippet = '''Line1 +Line2''' + container = self.client.create_container( + BUSYBOX, 'echo "{0}"'.format(snippet) + ) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(id, tail=1) + self.assertEqual(logs, ('Line2\n').encode(encoding='ascii')) + +# def test_logs_streaming(self): +# snippet = 'Flowering Nights (Sakuya Iyazoi)' +# container = self.client.create_container( +# BUSYBOX, 'echo {0}'.format(snippet) +# ) +# id = container['Id'] +# self.client.start(id) +# self.tmp_containers.append(id) +# logs = bytes() if six.PY3 else str() +# for chunk in self.client.logs(id, stream=True): +# logs += chunk + +# exitcode = self.client.wait(id) +# self.assertEqual(exitcode, 0) + +# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) + + def test_logs_with_dict_instead_of_id(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'echo {0}'.format(snippet) + ) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(container) + self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) + + +class DiffTest(api_test.BaseTestCase): + def test_diff(self): + container = self.client.create_container(BUSYBOX, ['touch', '/test']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + diff = self.client.diff(id) + test_diff = [x for x in diff if x.get('Path', None) == '/test'] + self.assertEqual(len(test_diff), 1) + self.assertIn('Kind', test_diff[0]) + self.assertEqual(test_diff[0]['Kind'], 1) + + def test_diff_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['touch', '/test']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + diff = self.client.diff(container) + test_diff = [x for x in diff if x.get('Path', None) == '/test'] + self.assertEqual(len(test_diff), 1) + self.assertIn('Kind', test_diff[0]) + self.assertEqual(test_diff[0]['Kind'], 1) + + +class StopTest(api_test.BaseTestCase): + def test_stop(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.stop(id, timeout=2) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + if api_test.exec_driver_is_native(): + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + def test_stop_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + self.assertIn('Id', container) + id = container['Id'] + self.client.start(container) + self.tmp_containers.append(id) + self.client.stop(container, timeout=2) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + if api_test.exec_driver_is_native(): + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + +class KillTest(api_test.BaseTestCase): + def test_kill(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(id) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + if api_test.exec_driver_is_native(): + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + def test_kill_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(container) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + if api_test.exec_driver_is_native(): + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + def test_kill_with_signal(self): + container = self.client.create_container(BUSYBOX, ['sleep', '60']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(id, signal=signal.SIGKILL) + exitcode = self.client.wait(id) + self.assertNotEqual(exitcode, 0) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False, state) + + +class PortTest(api_test.BaseTestCase): + def test_port(self): + + port_bindings = { + '1111': ('127.0.0.1', '4567'), + '2222': ('127.0.0.1', '4568') + } + + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()), + host_config=self.client.create_host_config( + port_bindings=port_bindings, network_mode='bridge' + ) + ) + id = container['Id'] + + self.client.start(container) + + # Call the port function on each biding and compare expected vs actual + for port in port_bindings: + actual_bindings = self.client.port(container, port) + port_binding = actual_bindings.pop() + + ip, host_port = port_binding['HostIp'], port_binding['HostPort'] + + self.assertEqual(ip, port_bindings[port][0]) + self.assertEqual(host_port, port_bindings[port][1]) + + self.client.kill(id) + + +class ContainerTopTest(api_test.BaseTestCase): + def test_top(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '60']) + + id = container['Id'] + + self.client.start(container) + res = self.client.top(container['Id']) + self.assertEqual( + res['Titles'], + ['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD'] + ) + self.assertEqual(len(res['Processes']), 1) + self.assertEqual(res['Processes'][0][7], 'sleep 60') + self.client.kill(id) + + def test_top_with_psargs(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '60']) + + id = container['Id'] + + self.client.start(container) + res = self.client.top(container['Id'], 'waux') + self.assertEqual( + res['Titles'], + ['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS', + 'TTY', 'STAT', 'START', 'TIME', 'COMMAND'], + ) + self.assertEqual(len(res['Processes']), 1) + self.assertEqual(res['Processes'][0][10], 'sleep 60') + self.client.kill(id) + + +class RestartContainerTest(api_test.BaseTestCase): + def test_restart(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + info = self.client.inspect_container(id) + self.assertIn('State', info) + self.assertIn('StartedAt', info['State']) + start_time1 = info['State']['StartedAt'] + self.client.restart(id, timeout=2) + info2 = self.client.inspect_container(id) + self.assertIn('State', info2) + self.assertIn('StartedAt', info2['State']) + start_time2 = info2['State']['StartedAt'] + self.assertNotEqual(start_time1, start_time2) + self.assertIn('Running', info2['State']) + self.assertEqual(info2['State']['Running'], True) + self.client.kill(id) + + def test_restart_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + self.assertIn('Id', container) + id = container['Id'] + self.client.start(container) + self.tmp_containers.append(id) + info = self.client.inspect_container(id) + self.assertIn('State', info) + self.assertIn('StartedAt', info['State']) + start_time1 = info['State']['StartedAt'] + self.client.restart(container, timeout=2) + info2 = self.client.inspect_container(id) + self.assertIn('State', info2) + self.assertIn('StartedAt', info2['State']) + start_time2 = info2['State']['StartedAt'] + self.assertNotEqual(start_time1, start_time2) + self.assertIn('Running', info2['State']) + self.assertEqual(info2['State']['Running'], True) + self.client.kill(id) + + +class RemoveContainerTest(api_test.BaseTestCase): + def test_remove(self): + container = self.client.create_container(BUSYBOX, ['true']) + id = container['Id'] + self.client.start(id) + self.client.wait(id) + self.client.remove_container(id) + containers = self.client.containers(all=True) + res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] + self.assertEqual(len(res), 0) + + def test_remove_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['true']) + id = container['Id'] + self.client.start(id) + self.client.wait(id) + self.client.remove_container(container) + containers = self.client.containers(all=True) + res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] + self.assertEqual(len(res), 0) + + +class AttachContainerTest(api_test.BaseTestCase): + def test_run_container_streaming(self): + container = self.client.create_container(BUSYBOX, '/bin/sh', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + sock = self.client.attach_socket(container, ws=False) + self.assertTrue(sock.fileno() > -1) + + def test_run_container_reading_socket(self): + line = 'hi there and stuff and things, words!' + command = "echo '{0}'".format(line) + container = self.client.create_container(BUSYBOX, command, + detach=True, tty=False) + ident = container['Id'] + self.tmp_containers.append(ident) + + opts = {"stdout": 1, "stream": 1, "logs": 1} + pty_stdout = self.client.attach_socket(ident, opts) + self.client.start(ident) + + recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK) + + def read(n=4096): + """Code stolen from dockerpty to read the socket""" + try: + if hasattr(pty_stdout, 'recv'): + return pty_stdout.recv(n) + return os.read(pty_stdout.fileno(), n) + except EnvironmentError as e: + if e.errno not in recoverable_errors: + raise + + def next_packet_size(): + """Code stolen from dockerpty to get the next packet size""" + data = six.binary_type() + while len(data) < 8: + next_data = read(8 - len(data)) + if not next_data: + return 0 + data = data + next_data + + if data is None: + return 0 + + if len(data) == 8: + _, actual = struct.unpack('>BxxxL', data) + return actual + + next_size = next_packet_size() + self.assertEqual(next_size, len(line) + 1) + + data = six.binary_type() + while len(data) < next_size: + next_data = read(next_size - len(data)) + if not next_data: + assert False, "Failed trying to read in the dataz" + data += next_data + self.assertEqual(data.decode('utf-8'), "{0}\n".format(line)) + pty_stdout.close() + + # Prevent segfault at the end of the test run + if hasattr(pty_stdout, "_response"): + del pty_stdout._response + + +class PauseTest(api_test.BaseTestCase): + def test_pause_unpause(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(container) + self.client.pause(id) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], True) + self.assertIn('Paused', state) + self.assertEqual(state['Paused'], True) + + self.client.unpause(id) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], True) + self.assertIn('Paused', state) + self.assertEqual(state['Paused'], False) diff --git a/tests/integration/exec_test.py b/tests/integration/exec_test.py new file mode 100644 index 0000000..39883d6 --- /dev/null +++ b/tests/integration/exec_test.py @@ -0,0 +1,106 @@ +import pytest + +from . import api_test + +BUSYBOX = api_test.BUSYBOX + + +class ExecTest(api_test.BaseTestCase): + def test_execute_command(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + container = self.client.create_container(BUSYBOX, 'cat', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + res = self.client.exec_create(id, ['echo', 'hello']) + self.assertIn('Id', res) + + exec_log = self.client.exec_start(res) + self.assertEqual(exec_log, b'hello\n') + + def test_exec_command_string(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + container = self.client.create_container(BUSYBOX, 'cat', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + res = self.client.exec_create(id, 'echo hello world') + self.assertIn('Id', res) + + exec_log = self.client.exec_start(res) + self.assertEqual(exec_log, b'hello world\n') + + def test_exec_command_as_user(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + container = self.client.create_container(BUSYBOX, 'cat', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + res = self.client.exec_create(id, 'whoami', user='default') + self.assertIn('Id', res) + + exec_log = self.client.exec_start(res) + self.assertEqual(exec_log, b'default\n') + + def test_exec_command_as_root(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + container = self.client.create_container(BUSYBOX, 'cat', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + res = self.client.exec_create(id, 'whoami') + self.assertIn('Id', res) + + exec_log = self.client.exec_start(res) + self.assertEqual(exec_log, b'root\n') + + def test_exec_command_streaming(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + container = self.client.create_container(BUSYBOX, 'cat', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + exec_id = self.client.exec_create(id, ['echo', 'hello\nworld']) + self.assertIn('Id', exec_id) + + res = b'' + for chunk in self.client.exec_start(exec_id, stream=True): + res += chunk + self.assertEqual(res, b'hello\nworld\n') + + def test_exec_inspect(self): + if not api_test.exec_driver_is_native(): + pytest.skip('Exec driver not native') + + container = self.client.create_container(BUSYBOX, 'cat', + detach=True, stdin_open=True) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + + exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist']) + self.assertIn('Id', exec_id) + self.client.exec_start(exec_id) + exec_info = self.client.exec_inspect(exec_id) + self.assertIn('ExitCode', exec_info) + self.assertNotEqual(exec_info['ExitCode'], 0) diff --git a/tests/integration/image_test.py b/tests/integration/image_test.py new file mode 100644 index 0000000..6c186e4 --- /dev/null +++ b/tests/integration/image_test.py @@ -0,0 +1,235 @@ +import contextlib +import json +import shutil +import socket +import tarfile +import tempfile +import threading + +import pytest +import six +from six.moves import BaseHTTPServer +from six.moves import socketserver + + +import docker + +from . import api_test + +BUSYBOX = api_test.BUSYBOX + + +class ListImagesTest(api_test.BaseTestCase): + def test_images(self): + res1 = self.client.images(all=True) + self.assertIn('Id', res1[0]) + res10 = res1[0] + self.assertIn('Created', res10) + self.assertIn('RepoTags', res10) + distinct = [] + for img in res1: + if img['Id'] not in distinct: + distinct.append(img['Id']) + self.assertEqual(len(distinct), self.client.info()['Images']) + + def test_images_quiet(self): + res1 = self.client.images(quiet=True) + self.assertEqual(type(res1[0]), six.text_type) + + +class PullImageTest(api_test.BaseTestCase): + def test_pull(self): + try: + self.client.remove_image('hello-world') + except docker.errors.APIError: + pass + res = self.client.pull('hello-world') + self.tmp_imgs.append('hello-world') + self.assertEqual(type(res), six.text_type) + self.assertGreaterEqual( + len(self.client.images('hello-world')), 1 + ) + img_info = self.client.inspect_image('hello-world') + self.assertIn('Id', img_info) + + def test_pull_streaming(self): + try: + self.client.remove_image('hello-world') + except docker.errors.APIError: + pass + stream = self.client.pull('hello-world', stream=True) + self.tmp_imgs.append('hello-world') + for chunk in stream: + if six.PY3: + chunk = chunk.decode('utf-8') + json.loads(chunk) # ensure chunk is a single, valid JSON blob + self.assertGreaterEqual( + len(self.client.images('hello-world')), 1 + ) + img_info = self.client.inspect_image('hello-world') + self.assertIn('Id', img_info) + + +class CommitTest(api_test.BaseTestCase): + def test_commit(self): + container = self.client.create_container(BUSYBOX, ['touch', '/test']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + res = self.client.commit(id) + self.assertIn('Id', res) + img_id = res['Id'] + self.tmp_imgs.append(img_id) + img = self.client.inspect_image(img_id) + self.assertIn('Container', img) + self.assertTrue(img['Container'].startswith(id)) + self.assertIn('ContainerConfig', img) + self.assertIn('Image', img['ContainerConfig']) + self.assertEqual(BUSYBOX, img['ContainerConfig']['Image']) + busybox_id = self.client.inspect_image(BUSYBOX)['Id'] + self.assertIn('Parent', img) + self.assertEqual(img['Parent'], busybox_id) + + +class RemoveImageTest(api_test.BaseTestCase): + def test_remove(self): + container = self.client.create_container(BUSYBOX, ['touch', '/test']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + res = self.client.commit(id) + self.assertIn('Id', res) + img_id = res['Id'] + self.tmp_imgs.append(img_id) + self.client.remove_image(img_id, force=True) + images = self.client.images(all=True) + res = [x for x in images if x['Id'].startswith(img_id)] + self.assertEqual(len(res), 0) + + +class ImportImageTest(api_test.BaseTestCase): + '''Base class for `docker import` test cases.''' + + TAR_SIZE = 512 * 1024 + + def write_dummy_tar_content(self, n_bytes, tar_fd): + def extend_file(f, n_bytes): + f.seek(n_bytes - 1) + f.write(bytearray([65])) + f.seek(0) + + tar = tarfile.TarFile(fileobj=tar_fd, mode='w') + + with tempfile.NamedTemporaryFile() as f: + extend_file(f, n_bytes) + tarinfo = tar.gettarinfo(name=f.name, arcname='testdata') + tar.addfile(tarinfo, fileobj=f) + + tar.close() + + @contextlib.contextmanager + def dummy_tar_stream(self, n_bytes): + '''Yields a stream that is valid tar data of size n_bytes.''' + with tempfile.NamedTemporaryFile() as tar_file: + self.write_dummy_tar_content(n_bytes, tar_file) + tar_file.seek(0) + yield tar_file + + @contextlib.contextmanager + def dummy_tar_file(self, n_bytes): + '''Yields the name of a valid tar file of size n_bytes.''' + with tempfile.NamedTemporaryFile() as tar_file: + self.write_dummy_tar_content(n_bytes, tar_file) + tar_file.seek(0) + yield tar_file.name + + def test_import_from_bytes(self): + with self.dummy_tar_stream(n_bytes=500) as f: + content = f.read() + + # The generic import_image() function cannot import in-memory bytes + # data that happens to be represented as a string type, because + # import_image() will try to use it as a filename and usually then + # trigger an exception. So we test the import_image_from_data() + # function instead. + statuses = self.client.import_image_from_data( + content, repository='test/import-from-bytes') + + result_text = statuses.splitlines()[-1] + result = json.loads(result_text) + + self.assertNotIn('error', result) + + img_id = result['status'] + self.tmp_imgs.append(img_id) + + def test_import_from_file(self): + with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename: + # statuses = self.client.import_image( + # src=tar_filename, repository='test/import-from-file') + statuses = self.client.import_image_from_file( + tar_filename, repository='test/import-from-file') + + result_text = statuses.splitlines()[-1] + result = json.loads(result_text) + + self.assertNotIn('error', result) + + self.assertIn('status', result) + img_id = result['status'] + self.tmp_imgs.append(img_id) + + def test_import_from_stream(self): + with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream: + statuses = self.client.import_image( + src=tar_stream, repository='test/import-from-stream') + # statuses = self.client.import_image_from_stream( + # tar_stream, repository='test/import-from-stream') + result_text = statuses.splitlines()[-1] + result = json.loads(result_text) + + self.assertNotIn('error', result) + + self.assertIn('status', result) + img_id = result['status'] + self.tmp_imgs.append(img_id) + + @contextlib.contextmanager + def temporary_http_file_server(self, stream): + '''Serve data from an IO stream over HTTP.''' + + class Handler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(200) + self.send_header('Content-Type', 'application/x-tar') + self.end_headers() + shutil.copyfileobj(stream, self.wfile) + + server = socketserver.TCPServer(('', 0), Handler) + thread = threading.Thread(target=server.serve_forever) + thread.setDaemon(True) + thread.start() + + yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1]) + + server.shutdown() + + @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME") + def test_import_from_url(self): + # The crappy test HTTP server doesn't handle large files well, so use + # a small file. + tar_size = 10240 + + with self.dummy_tar_stream(n_bytes=tar_size) as tar_data: + with self.temporary_http_file_server(tar_data) as url: + statuses = self.client.import_image( + src=url, repository='test/import-from-url') + + result_text = statuses.splitlines()[-1] + result = json.loads(result_text) + + self.assertNotIn('error', result) + + self.assertIn('status', result) + img_id = result['status'] + self.tmp_imgs.append(img_id) diff --git a/tests/integration/network_test.py b/tests/integration/network_test.py new file mode 100644 index 0000000..e009660 --- /dev/null +++ b/tests/integration/network_test.py @@ -0,0 +1,98 @@ +import random + +import docker +import pytest + +from . import api_test +from ..base import requires_api_version + + +@requires_api_version('1.21') +class TestNetworks(api_test.BaseTestCase): + def create_network(self, *args, **kwargs): + net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14] + net_id = self.client.create_network(net_name, *args, **kwargs)['id'] + self.tmp_networks.append(net_id) + return (net_name, net_id) + + def test_list_networks(self): + networks = self.client.networks() + initial_size = len(networks) + + net_name, net_id = self.create_network() + + networks = self.client.networks() + self.assertEqual(len(networks), initial_size + 1) + self.assertTrue(net_id in [n['id'] for n in networks]) + + networks_by_name = self.client.networks(names=[net_name]) + self.assertEqual([n['id'] for n in networks_by_name], [net_id]) + + networks_by_partial_id = self.client.networks(ids=[net_id[:8]]) + self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id]) + + def test_inspect_network(self): + net_name, net_id = self.create_network() + + net = self.client.inspect_network(net_id) + self.assertEqual(net, { + u'name': net_name, + u'id': net_id, + u'driver': 'bridge', + u'containers': {}, + }) + + def test_create_network_with_host_driver_fails(self): + net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14] + + with pytest.raises(docker.errors.APIError): + self.client.create_network(net_name, driver='host') + + def test_remove_network(self): + initial_size = len(self.client.networks()) + + net_name, net_id = self.create_network() + self.assertEqual(len(self.client.networks()), initial_size + 1) + + self.client.remove_network(net_id) + self.assertEqual(len(self.client.networks()), initial_size) + + def test_connect_and_disconnect_container(self): + net_name, net_id = self.create_network() + + container = self.client.create_container('busybox', 'top') + self.tmp_containers.append(container) + self.client.start(container) + + network_data = self.client.inspect_network(net_id) + self.assertFalse(network_data.get('containers')) + + self.client.connect_container_to_network(container, net_id) + network_data = self.client.inspect_network(net_id) + self.assertEqual( + list(network_data['containers'].keys()), + [container['Id']]) + + self.client.disconnect_container_from_network(container, net_id) + network_data = self.client.inspect_network(net_id) + self.assertFalse(network_data.get('containers')) + + def test_connect_on_container_create(self): + net_name, net_id = self.create_network() + + container = self.client.create_container( + image='busybox', + command='top', + host_config=self.client.create_host_config(network_mode=net_name), + ) + self.tmp_containers.append(container) + self.client.start(container) + + network_data = self.client.inspect_network(net_id) + self.assertEqual( + list(network_data['containers'].keys()), + [container['Id']]) + + self.client.disconnect_container_from_network(container, net_id) + network_data = self.client.inspect_network(net_id) + self.assertFalse(network_data.get('containers')) diff --git a/tests/integration/regression_test.py b/tests/integration/regression_test.py new file mode 100644 index 0000000..dac8806 --- /dev/null +++ b/tests/integration/regression_test.py @@ -0,0 +1,69 @@ +import io +import random + +import docker +import six + +from . import api_test + +BUSYBOX = api_test.BUSYBOX + + +class TestRegressions(api_test.BaseTestCase): + def test_443_handle_nonchunked_response_in_stream(self): + dfile = io.BytesIO() + with self.assertRaises(docker.errors.APIError) as exc: + for line in self.client.build(fileobj=dfile, tag="a/b/c"): + pass + self.assertEqual(exc.exception.response.status_code, 500) + dfile.close() + + def test_542_truncate_ids_client_side(self): + self.client.start( + self.client.create_container(BUSYBOX, ['true']) + ) + result = self.client.containers(all=True, trunc=True) + self.assertEqual(len(result[0]['Id']), 12) + + def test_647_support_doubleslash_in_image_names(self): + with self.assertRaises(docker.errors.APIError): + self.client.inspect_image('gensokyo.jp//kirisame') + + def test_649_handle_timeout_value_none(self): + self.client.timeout = None + ctnr = self.client.create_container(BUSYBOX, ['sleep', '2']) + self.client.start(ctnr) + self.client.stop(ctnr) + + def test_715_handle_user_param_as_int_value(self): + ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000) + self.client.start(ctnr) + self.client.wait(ctnr) + logs = self.client.logs(ctnr) + if six.PY3: + logs = logs.decode('utf-8') + assert logs == '1000\n' + + def test_792_explicit_port_protocol(self): + + tcp_port, udp_port = random.sample(range(9999, 32000), 2) + ctnr = self.client.create_container( + BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')], + host_config=self.client.create_host_config( + port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port} + ) + ) + self.tmp_containers.append(ctnr) + self.client.start(ctnr) + self.assertEqual( + self.client.port(ctnr, 2000)[0]['HostPort'], + six.text_type(tcp_port) + ) + self.assertEqual( + self.client.port(ctnr, '2000/tcp')[0]['HostPort'], + six.text_type(tcp_port) + ) + self.assertEqual( + self.client.port(ctnr, '2000/udp')[0]['HostPort'], + six.text_type(udp_port) + ) diff --git a/tests/integration/volume_test.py b/tests/integration/volume_test.py new file mode 100644 index 0000000..f1fdf92 --- /dev/null +++ b/tests/integration/volume_test.py @@ -0,0 +1,56 @@ +import docker +import pytest + +from . import api_test +from ..base import requires_api_version + + +@requires_api_version('1.21') +class TestVolumes(api_test.BaseTestCase): + def test_create_volume(self): + name = 'perfectcherryblossom' + self.tmp_volumes.append(name) + result = self.client.create_volume(name) + self.assertIn('Name', result) + self.assertEqual(result['Name'], name) + self.assertIn('Driver', result) + self.assertEqual(result['Driver'], 'local') + + def test_create_volume_invalid_driver(self): + driver_name = 'invalid.driver' + + with pytest.raises(docker.errors.NotFound): + self.client.create_volume('perfectcherryblossom', driver_name) + + def test_list_volumes(self): + name = 'imperishablenight' + self.tmp_volumes.append(name) + volume_info = self.client.create_volume(name) + result = self.client.volumes() + self.assertIn('Volumes', result) + volumes = result['Volumes'] + self.assertIn(volume_info, volumes) + + def test_inspect_volume(self): + name = 'embodimentofscarletdevil' + self.tmp_volumes.append(name) + volume_info = self.client.create_volume(name) + result = self.client.inspect_volume(name) + self.assertEqual(volume_info, result) + + def test_inspect_nonexistent_volume(self): + name = 'embodimentofscarletdevil' + with pytest.raises(docker.errors.NotFound): + self.client.inspect_volume(name) + + def test_remove_volume(self): + name = 'shootthebullet' + self.tmp_volumes.append(name) + self.client.create_volume(name) + result = self.client.remove_volume(name) + self.assertTrue(result) + + def test_remove_nonexistent_volume(self): + name = 'shootthebullet' + with pytest.raises(docker.errors.NotFound): + self.client.remove_volume(name) diff --git a/tests/integration_test.py b/tests/integration_test.py index 85eb8da..73cbf93 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -1,2040 +1,4 @@ -# Copyright 2013 dotCloud inc. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import contextlib -import errno -import json -import io -import os -import random -import shutil -import signal -import socket -import struct -import tarfile -import tempfile -import threading -import time -import unittest -import warnings - -import pytest -import six -from six.moves import BaseHTTPServer -from six.moves import socketserver - -import docker -from docker.errors import APIError, NotFound -from docker.utils import kwargs_from_env - -from . import helpers -from .base import requires_api_version -from .test import Cleanup - - -# FIXME: missing tests for -# export; history; insert; port; push; tag; get; load; stats - -warnings.simplefilter('error') -compare_version = docker.utils.compare_version - -EXEC_DRIVER = [] -BUSYBOX = 'busybox:buildroot-2014.02' - - -def exec_driver_is_native(): - global EXEC_DRIVER - if not EXEC_DRIVER: - c = docker_client() - EXEC_DRIVER = c.info()['ExecutionDriver'] - c.close() - return EXEC_DRIVER.startswith('native') - - -def docker_client(**kwargs): - return docker.Client(**docker_client_kwargs(**kwargs)) - - -def docker_client_kwargs(**kwargs): - client_kwargs = kwargs_from_env(assert_hostname=False) - client_kwargs.update(kwargs) - return client_kwargs - - -def setup_module(): - c = docker_client() - try: - c.inspect_image(BUSYBOX) - except NotFound: - os.write(2, "\npulling busybox\n".encode('utf-8')) - for data in c.pull(BUSYBOX, stream=True): - data = json.loads(data.decode('utf-8')) - os.write(2, ("%c[2K\r" % 27).encode('utf-8')) - status = data.get("status") - progress = data.get("progress") - detail = "{0} - {1}".format(status, progress).encode('utf-8') - os.write(2, detail) - os.write(2, "\npulled busybox\n".encode('utf-8')) - - # Double make sure we now have busybox - c.inspect_image(BUSYBOX) - c.close() - - -class BaseTestCase(unittest.TestCase): - tmp_imgs = [] - tmp_containers = [] - tmp_folders = [] - tmp_volumes = [] - - def setUp(self): - if six.PY2: - self.assertRegex = self.assertRegexpMatches - self.assertCountEqual = self.assertItemsEqual - self.client = docker_client(timeout=60) - self.tmp_imgs = [] - self.tmp_containers = [] - self.tmp_folders = [] - self.tmp_volumes = [] - self.tmp_networks = [] - - def tearDown(self): - for img in self.tmp_imgs: - try: - self.client.remove_image(img) - except docker.errors.APIError: - pass - for container in self.tmp_containers: - try: - self.client.stop(container, timeout=1) - self.client.remove_container(container) - except docker.errors.APIError: - pass - for network in self.tmp_networks: - try: - self.client.remove_network(network) - except docker.errors.APIError: - pass - for folder in self.tmp_folders: - shutil.rmtree(folder) - - for volume in self.tmp_volumes: - try: - self.client.remove_volume(volume) - except docker.errors.APIError: - pass - - self.client.close() - - def run_container(self, *args, **kwargs): - container = self.client.create_container(*args, **kwargs) - self.tmp_containers.append(container) - self.client.start(container) - exitcode = self.client.wait(container) - - if exitcode != 0: - output = self.client.logs(container) - raise Exception( - "Container exited with code {}:\n{}" - .format(exitcode, output)) - - return container - - -######################### -# INFORMATION TESTS # -######################### - - -class TestVersion(BaseTestCase): - def runTest(self): - res = self.client.version() - self.assertIn('GoVersion', res) - self.assertIn('Version', res) - self.assertEqual(len(res['Version'].split('.')), 3) - - -class TestInfo(BaseTestCase): - def runTest(self): - res = self.client.info() - self.assertIn('Containers', res) - self.assertIn('Images', res) - self.assertIn('Debug', res) - - -class TestSearch(BaseTestCase): - def runTest(self): - self.client = docker_client(timeout=10) - res = self.client.search('busybox') - self.assertTrue(len(res) >= 1) - base_img = [x for x in res if x['name'] == 'busybox'] - self.assertEqual(len(base_img), 1) - self.assertIn('description', base_img[0]) - -################### -# LISTING TESTS # -################### - - -class TestImages(BaseTestCase): - def runTest(self): - res1 = self.client.images(all=True) - self.assertIn('Id', res1[0]) - res10 = res1[0] - self.assertIn('Created', res10) - self.assertIn('RepoTags', res10) - distinct = [] - for img in res1: - if img['Id'] not in distinct: - distinct.append(img['Id']) - self.assertEqual(len(distinct), self.client.info()['Images']) - - -class TestImageIds(BaseTestCase): - def runTest(self): - res1 = self.client.images(quiet=True) - self.assertEqual(type(res1[0]), six.text_type) - - -class TestListContainers(BaseTestCase): - def runTest(self): - res0 = self.client.containers(all=True) - size = len(res0) - res1 = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res1) - self.client.start(res1['Id']) - self.tmp_containers.append(res1['Id']) - res2 = self.client.containers(all=True) - self.assertEqual(size + 1, len(res2)) - retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])] - self.assertEqual(len(retrieved), 1) - retrieved = retrieved[0] - self.assertIn('Command', retrieved) - self.assertEqual(retrieved['Command'], six.text_type('true')) - self.assertIn('Image', retrieved) - self.assertRegex(retrieved['Image'], r'busybox:.*') - self.assertIn('Status', retrieved) - -##################### -# CONTAINER TESTS # -##################### - - -class TestCreateContainer(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - - -class TestCreateContainerWithBinds(BaseTestCase): - def setUp(self): - super(TestCreateContainerWithBinds, self).setUp() - - self.mount_dest = '/mnt' - - # Get a random pathname - we don't need it to exist locally - self.mount_origin = tempfile.mkdtemp() - shutil.rmtree(self.mount_origin) - - self.filename = 'shared.txt' - - self.run_with_volume( - False, - BUSYBOX, - ['touch', os.path.join(self.mount_dest, self.filename)], - ) - - def run_with_volume(self, ro, *args, **kwargs): - return self.run_container( - *args, - volumes={self.mount_dest: {}}, - host_config=self.client.create_host_config( - binds={ - self.mount_origin: { - 'bind': self.mount_dest, - 'ro': ro, - }, - }, - network_mode='none' - ), - **kwargs - ) - - def test_rw(self): - container = self.run_with_volume( - False, - BUSYBOX, - ['ls', self.mount_dest], - ) - logs = self.client.logs(container) - - if six.PY3: - logs = logs.decode('utf-8') - self.assertIn(self.filename, logs) - inspect_data = self.client.inspect_container(container) - self.check_container_data(inspect_data, True) - - def test_ro(self): - container = self.run_with_volume( - True, - BUSYBOX, - ['ls', self.mount_dest], - ) - logs = self.client.logs(container) - - if six.PY3: - logs = logs.decode('utf-8') - self.assertIn(self.filename, logs) - - inspect_data = self.client.inspect_container(container) - self.check_container_data(inspect_data, False) - - def check_container_data(self, inspect_data, rw): - if docker.utils.compare_version('1.20', self.client._version) < 0: - self.assertIn('Volumes', inspect_data) - self.assertIn(self.mount_dest, inspect_data['Volumes']) - self.assertEqual( - self.mount_origin, inspect_data['Volumes'][self.mount_dest] - ) - self.assertIn(self.mount_dest, inspect_data['VolumesRW']) - self.assertFalse(inspect_data['VolumesRW'][self.mount_dest]) - else: - self.assertIn('Mounts', inspect_data) - filtered = list(filter( - lambda x: x['Destination'] == self.mount_dest, - inspect_data['Mounts'] - )) - self.assertEqual(len(filtered), 1) - mount_data = filtered[0] - self.assertEqual(mount_data['Source'], self.mount_origin) - self.assertEqual(mount_data['RW'], rw) - - -@requires_api_version('1.20') -class CreateContainerWithGroupAddTest(BaseTestCase): - def test_group_id_ints(self): - container = self.client.create_container( - BUSYBOX, 'id -G', - host_config=self.client.create_host_config(group_add=[1000, 1001]) - ) - self.tmp_containers.append(container) - self.client.start(container) - self.client.wait(container) - - logs = self.client.logs(container) - if six.PY3: - logs = logs.decode('utf-8') - groups = logs.strip().split(' ') - self.assertIn('1000', groups) - self.assertIn('1001', groups) - - def test_group_id_strings(self): - container = self.client.create_container( - BUSYBOX, 'id -G', host_config=self.client.create_host_config( - group_add=['1000', '1001'] - ) - ) - self.tmp_containers.append(container) - self.client.start(container) - self.client.wait(container) - - logs = self.client.logs(container) - if six.PY3: - logs = logs.decode('utf-8') - - groups = logs.strip().split(' ') - self.assertIn('1000', groups) - self.assertIn('1001', groups) - - -class CreateContainerWithLogConfigTest(BaseTestCase): - def test_valid_log_driver_and_log_opt(self): - log_config = docker.utils.LogConfig( - type='json-file', - config={'max-file': '100'} - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - self.tmp_containers.append(container['Id']) - self.client.start(container) - - info = self.client.inspect_container(container) - container_log_config = info['HostConfig']['LogConfig'] - - self.assertEqual(container_log_config['Type'], log_config.type) - self.assertEqual(container_log_config['Config'], log_config.config) - - def test_invalid_log_driver_raises_exception(self): - log_config = docker.utils.LogConfig( - type='asdf-nope', - config={} - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - - expected_msg = "logger: no log driver named 'asdf-nope' is registered" - - with pytest.raises(APIError) as excinfo: - # raises an internal server error 500 - self.client.start(container) - - assert expected_msg in str(excinfo.value) - - @pytest.mark.skipif(True, - reason="https://github.com/docker/docker/issues/15633") - def test_valid_no_log_driver_specified(self): - log_config = docker.utils.LogConfig( - type="", - config={'max-file': '100'} - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - self.tmp_containers.append(container['Id']) - self.client.start(container) - - info = self.client.inspect_container(container) - container_log_config = info['HostConfig']['LogConfig'] - - self.assertEqual(container_log_config['Type'], "json-file") - self.assertEqual(container_log_config['Config'], log_config.config) - - def test_valid_no_config_specified(self): - log_config = docker.utils.LogConfig( - type="json-file", - config=None - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - self.tmp_containers.append(container['Id']) - self.client.start(container) - - info = self.client.inspect_container(container) - container_log_config = info['HostConfig']['LogConfig'] - - self.assertEqual(container_log_config['Type'], "json-file") - self.assertEqual(container_log_config['Config'], {}) - - -@requires_api_version('1.20') -class GetArchiveTest(BaseTestCase): - def test_get_file_archive_from_container(self): - data = 'The Maid and the Pocket Watch of Blood' - ctnr = self.client.create_container( - BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data), - volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - self.client.start(ctnr) - self.client.wait(ctnr) - with tempfile.NamedTemporaryFile() as destination: - strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') - for d in strm: - destination.write(d) - destination.seek(0) - retrieved_data = helpers.untar_file(destination, 'data.txt') - if six.PY3: - retrieved_data = retrieved_data.decode('utf-8') - self.assertEqual(data, retrieved_data.strip()) - - def test_get_file_stat_from_container(self): - data = 'The Maid and the Pocket Watch of Blood' - ctnr = self.client.create_container( - BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data), - volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - self.client.start(ctnr) - self.client.wait(ctnr) - strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') - self.assertIn('name', stat) - self.assertEqual(stat['name'], 'data.txt') - self.assertIn('size', stat) - self.assertEqual(stat['size'], len(data)) - - -@requires_api_version('1.20') -class PutArchiveTest(BaseTestCase): - def test_copy_file_to_container(self): - data = b'Deaf To All But The Song' - with tempfile.NamedTemporaryFile() as test_file: - test_file.write(data) - test_file.seek(0) - ctnr = self.client.create_container( - BUSYBOX, - 'cat {0}'.format( - os.path.join('/vol1', os.path.basename(test_file.name)) - ), - volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - with helpers.simple_tar(test_file.name) as test_tar: - self.client.put_archive(ctnr, '/vol1', test_tar) - self.client.start(ctnr) - self.client.wait(ctnr) - logs = self.client.logs(ctnr) - if six.PY3: - logs = logs.decode('utf-8') - data = data.decode('utf-8') - self.assertEqual(logs.strip(), data) - - def test_copy_directory_to_container(self): - files = ['a.py', 'b.py', 'foo/b.py'] - dirs = ['foo', 'bar'] - base = helpers.make_tree(dirs, files) - ctnr = self.client.create_container( - BUSYBOX, 'ls -p /vol1', volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - with docker.utils.tar(base) as test_tar: - self.client.put_archive(ctnr, '/vol1', test_tar) - self.client.start(ctnr) - self.client.wait(ctnr) - logs = self.client.logs(ctnr) - if six.PY3: - logs = logs.decode('utf-8') - results = logs.strip().split() - self.assertIn('a.py', results) - self.assertIn('b.py', results) - self.assertIn('foo/', results) - self.assertIn('bar/', results) - - -class TestCreateContainerReadOnlyFs(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - ctnr = self.client.create_container( - BUSYBOX, ['mkdir', '/shrine'], - host_config=self.client.create_host_config( - read_only=True, network_mode='none' - ) - ) - self.assertIn('Id', ctnr) - self.tmp_containers.append(ctnr['Id']) - self.client.start(ctnr) - res = self.client.wait(ctnr) - self.assertNotEqual(res, 0) - - -class TestCreateContainerWithName(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true', name='foobar') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Name', inspect) - self.assertEqual('/foobar', inspect['Name']) - - -class TestRenameContainer(BaseTestCase): - def runTest(self): - version = self.client.version()['Version'] - name = 'hong_meiling' - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.rename(res, name) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Name', inspect) - if version == '1.5.0': - self.assertEqual(name, inspect['Name']) - else: - self.assertEqual('/{0}'.format(name), inspect['Name']) - - -class TestStartContainer(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.start(res['Id']) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Config', inspect) - self.assertIn('Id', inspect) - self.assertTrue(inspect['Id'].startswith(res['Id'])) - self.assertIn('Image', inspect) - self.assertIn('State', inspect) - self.assertIn('Running', inspect['State']) - if not inspect['State']['Running']: - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], 0) - - -class TestStartContainerWithDictInsteadOfId(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.start(res) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Config', inspect) - self.assertIn('Id', inspect) - self.assertTrue(inspect['Id'].startswith(res['Id'])) - self.assertIn('Image', inspect) - self.assertIn('State', inspect) - self.assertIn('Running', inspect['State']) - if not inspect['State']['Running']: - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], 0) - - -class TestCreateContainerPrivileged(BaseTestCase): - def runTest(self): - res = self.client.create_container( - BUSYBOX, 'true', host_config=self.client.create_host_config( - privileged=True, network_mode='none' - ) - ) - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.start(res['Id']) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Config', inspect) - self.assertIn('Id', inspect) - self.assertTrue(inspect['Id'].startswith(res['Id'])) - self.assertIn('Image', inspect) - self.assertIn('State', inspect) - self.assertIn('Running', inspect['State']) - if not inspect['State']['Running']: - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], 0) - # Since Nov 2013, the Privileged flag is no longer part of the - # container's config exposed via the API (safety concerns?). - # - if 'Privileged' in inspect['Config']: - self.assertEqual(inspect['Config']['Privileged'], True) - - -class TestWait(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, ['sleep', '3']) - id = res['Id'] - self.tmp_containers.append(id) - self.client.start(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - inspect = self.client.inspect_container(id) - self.assertIn('Running', inspect['State']) - self.assertEqual(inspect['State']['Running'], False) - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], exitcode) - - -class TestWaitWithDictInsteadOfId(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, ['sleep', '3']) - id = res['Id'] - self.tmp_containers.append(id) - self.client.start(res) - exitcode = self.client.wait(res) - self.assertEqual(exitcode, 0) - inspect = self.client.inspect_container(res) - self.assertIn('Running', inspect['State']) - self.assertEqual(inspect['State']['Running'], False) - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], exitcode) - - -class TestLogs(BaseTestCase): - def runTest(self): - snippet = 'Flowering Nights (Sakuya Iyazoi)' - container = self.client.create_container( - BUSYBOX, 'echo {0}'.format(snippet) - ) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - logs = self.client.logs(id) - self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) - - -class TestLogsWithTailOption(BaseTestCase): - def runTest(self): - snippet = '''Line1 -Line2''' - container = self.client.create_container( - BUSYBOX, 'echo "{0}"'.format(snippet) - ) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - logs = self.client.logs(id, tail=1) - self.assertEqual(logs, ('Line2\n').encode(encoding='ascii')) - - -# class TestLogsStreaming(BaseTestCase): -# def runTest(self): -# snippet = 'Flowering Nights (Sakuya Iyazoi)' -# container = self.client.create_container( -# BUSYBOX, 'echo {0}'.format(snippet) -# ) -# id = container['Id'] -# self.client.start(id) -# self.tmp_containers.append(id) -# logs = bytes() if six.PY3 else str() -# for chunk in self.client.logs(id, stream=True): -# logs += chunk - -# exitcode = self.client.wait(id) -# self.assertEqual(exitcode, 0) - -# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) - - -class TestLogsWithDictInsteadOfId(BaseTestCase): - def runTest(self): - snippet = 'Flowering Nights (Sakuya Iyazoi)' - container = self.client.create_container( - BUSYBOX, 'echo {0}'.format(snippet) - ) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - logs = self.client.logs(container) - self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) - - -class TestDiff(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - diff = self.client.diff(id) - test_diff = [x for x in diff if x.get('Path', None) == '/test'] - self.assertEqual(len(test_diff), 1) - self.assertIn('Kind', test_diff[0]) - self.assertEqual(test_diff[0]['Kind'], 1) - - -class TestDiffWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - diff = self.client.diff(container) - test_diff = [x for x in diff if x.get('Path', None) == '/test'] - self.assertEqual(len(test_diff), 1) - self.assertIn('Kind', test_diff[0]) - self.assertEqual(test_diff[0]['Kind'], 1) - - -class TestStop(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.stop(id, timeout=2) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestStopWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - self.assertIn('Id', container) - id = container['Id'] - self.client.start(container) - self.tmp_containers.append(id) - self.client.stop(container, timeout=2) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestKill(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.kill(id) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestKillWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.kill(container) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestKillWithSignal(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '60']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.kill(id, signal=signal.SIGKILL) - exitcode = self.client.wait(id) - self.assertNotEqual(exitcode, 0) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False, state) - - -class TestPort(BaseTestCase): - def runTest(self): - - port_bindings = { - '1111': ('127.0.0.1', '4567'), - '2222': ('127.0.0.1', '4568') - } - - container = self.client.create_container( - BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()), - host_config=self.client.create_host_config( - port_bindings=port_bindings, network_mode='bridge' - ) - ) - id = container['Id'] - - self.client.start(container) - - # Call the port function on each biding and compare expected vs actual - for port in port_bindings: - actual_bindings = self.client.port(container, port) - port_binding = actual_bindings.pop() - - ip, host_port = port_binding['HostIp'], port_binding['HostPort'] - - self.assertEqual(ip, port_bindings[port][0]) - self.assertEqual(host_port, port_bindings[port][1]) - - self.client.kill(id) - - -class TestMacAddress(BaseTestCase): - def runTest(self): - mac_address_expected = "02:42:ac:11:00:0a" - container = self.client.create_container( - BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected) - - id = container['Id'] - - self.client.start(container) - res = self.client.inspect_container(container['Id']) - self.assertEqual(mac_address_expected, - res['NetworkSettings']['MacAddress']) - - self.client.kill(id) - - -class TestContainerTop(BaseTestCase): - def runTest(self): - container = self.client.create_container( - BUSYBOX, ['sleep', '60']) - - id = container['Id'] - - self.client.start(container) - res = self.client.top(container['Id']) - self.assertEqual( - res['Titles'], - ['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD'] - ) - self.assertEqual(len(res['Processes']), 1) - self.assertEqual(res['Processes'][0][7], 'sleep 60') - self.client.kill(id) - - -class TestContainerTopWithPsArgs(BaseTestCase): - def runTest(self): - container = self.client.create_container( - BUSYBOX, ['sleep', '60']) - - id = container['Id'] - - self.client.start(container) - res = self.client.top(container['Id'], 'waux') - self.assertEqual( - res['Titles'], - ['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS', - 'TTY', 'STAT', 'START', 'TIME', 'COMMAND'], - ) - self.assertEqual(len(res['Processes']), 1) - self.assertEqual(res['Processes'][0][10], 'sleep 60') - self.client.kill(id) - - -class TestRestart(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - info = self.client.inspect_container(id) - self.assertIn('State', info) - self.assertIn('StartedAt', info['State']) - start_time1 = info['State']['StartedAt'] - self.client.restart(id, timeout=2) - info2 = self.client.inspect_container(id) - self.assertIn('State', info2) - self.assertIn('StartedAt', info2['State']) - start_time2 = info2['State']['StartedAt'] - self.assertNotEqual(start_time1, start_time2) - self.assertIn('Running', info2['State']) - self.assertEqual(info2['State']['Running'], True) - self.client.kill(id) - - -class TestRestartWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - self.assertIn('Id', container) - id = container['Id'] - self.client.start(container) - self.tmp_containers.append(id) - info = self.client.inspect_container(id) - self.assertIn('State', info) - self.assertIn('StartedAt', info['State']) - start_time1 = info['State']['StartedAt'] - self.client.restart(container, timeout=2) - info2 = self.client.inspect_container(id) - self.assertIn('State', info2) - self.assertIn('StartedAt', info2['State']) - start_time2 = info2['State']['StartedAt'] - self.assertNotEqual(start_time1, start_time2) - self.assertIn('Running', info2['State']) - self.assertEqual(info2['State']['Running'], True) - self.client.kill(id) - - -class TestRemoveContainer(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['true']) - id = container['Id'] - self.client.start(id) - self.client.wait(id) - self.client.remove_container(id) - containers = self.client.containers(all=True) - res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] - self.assertEqual(len(res), 0) - - -class TestRemoveContainerWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['true']) - id = container['Id'] - self.client.start(id) - self.client.wait(id) - self.client.remove_container(container) - containers = self.client.containers(all=True) - res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] - self.assertEqual(len(res), 0) - - -class TestCreateContainerWithVolumesFrom(BaseTestCase): - def runTest(self): - vol_names = ['foobar_vol0', 'foobar_vol1'] - - res0 = self.client.create_container( - BUSYBOX, 'true', name=vol_names[0] - ) - container1_id = res0['Id'] - self.tmp_containers.append(container1_id) - self.client.start(container1_id) - - res1 = self.client.create_container( - BUSYBOX, 'true', name=vol_names[1] - ) - container2_id = res1['Id'] - self.tmp_containers.append(container2_id) - self.client.start(container2_id) - with self.assertRaises(docker.errors.DockerException): - self.client.create_container( - BUSYBOX, 'cat', detach=True, stdin_open=True, - volumes_from=vol_names - ) - res2 = self.client.create_container( - BUSYBOX, 'cat', detach=True, stdin_open=True, - host_config=self.client.create_host_config( - volumes_from=vol_names, network_mode='none' - ) - ) - container3_id = res2['Id'] - self.tmp_containers.append(container3_id) - self.client.start(container3_id) - - info = self.client.inspect_container(res2['Id']) - self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names) - - -class TestCreateContainerWithLinks(BaseTestCase): - def runTest(self): - res0 = self.client.create_container( - BUSYBOX, 'cat', - detach=True, stdin_open=True, - environment={'FOO': '1'}) - - container1_id = res0['Id'] - self.tmp_containers.append(container1_id) - - self.client.start(container1_id) - - res1 = self.client.create_container( - BUSYBOX, 'cat', - detach=True, stdin_open=True, - environment={'FOO': '1'}) - - container2_id = res1['Id'] - self.tmp_containers.append(container2_id) - - self.client.start(container2_id) - - # we don't want the first / - link_path1 = self.client.inspect_container(container1_id)['Name'][1:] - link_alias1 = 'mylink1' - link_env_prefix1 = link_alias1.upper() - - link_path2 = self.client.inspect_container(container2_id)['Name'][1:] - link_alias2 = 'mylink2' - link_env_prefix2 = link_alias2.upper() - - res2 = self.client.create_container( - BUSYBOX, 'env', host_config=self.client.create_host_config( - links={link_path1: link_alias1, link_path2: link_alias2}, - network_mode='none' - ) - ) - container3_id = res2['Id'] - self.tmp_containers.append(container3_id) - self.client.start(container3_id) - self.assertEqual(self.client.wait(container3_id), 0) - - logs = self.client.logs(container3_id) - if six.PY3: - logs = logs.decode('utf-8') - self.assertIn('{0}_NAME='.format(link_env_prefix1), logs) - self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs) - self.assertIn('{0}_NAME='.format(link_env_prefix2), logs) - self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs) - - -class TestRestartingContainer(BaseTestCase): - def runTest(self): - container = self.client.create_container( - BUSYBOX, ['sleep', '2'], - host_config=self.client.create_host_config( - restart_policy={"Name": "always", "MaximumRetryCount": 0}, - network_mode='none' - ) - ) - id = container['Id'] - self.client.start(id) - self.client.wait(id) - with self.assertRaises(docker.errors.APIError) as exc: - self.client.remove_container(id) - err = exc.exception.response.text - self.assertIn( - 'You cannot remove a running container', err - ) - self.client.remove_container(id, force=True) - - -class TestExecuteCommand(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, ['echo', 'hello']) - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'hello\n') - - -class TestExecuteCommandString(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, 'echo hello world') - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'hello world\n') - - -class TestExecuteCommandStringAsUser(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, 'whoami', user='default') - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'default\n') - - -class TestExecuteCommandStringAsRoot(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, 'whoami') - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'root\n') - - -class TestExecuteCommandStreaming(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - exec_id = self.client.exec_create(id, ['echo', 'hello\nworld']) - self.assertIn('Id', exec_id) - - res = b'' - for chunk in self.client.exec_start(exec_id, stream=True): - res += chunk - self.assertEqual(res, b'hello\nworld\n') - - -class TestExecInspect(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist']) - self.assertIn('Id', exec_id) - self.client.exec_start(exec_id) - exec_info = self.client.exec_inspect(exec_id) - self.assertIn('ExitCode', exec_info) - self.assertNotEqual(exec_info['ExitCode'], 0) - - -class TestRunContainerStreaming(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, '/bin/sh', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - sock = self.client.attach_socket(container, ws=False) - self.assertTrue(sock.fileno() > -1) - - -class TestRunContainerReadingSocket(BaseTestCase): - def runTest(self): - line = 'hi there and stuff and things, words!' - command = "echo '{0}'".format(line) - container = self.client.create_container(BUSYBOX, command, - detach=True, tty=False) - ident = container['Id'] - self.tmp_containers.append(ident) - - opts = {"stdout": 1, "stream": 1, "logs": 1} - pty_stdout = self.client.attach_socket(ident, opts) - self.client.start(ident) - - recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK) - - def read(n=4096): - """Code stolen from dockerpty to read the socket""" - try: - if hasattr(pty_stdout, 'recv'): - return pty_stdout.recv(n) - return os.read(pty_stdout.fileno(), n) - except EnvironmentError as e: - if e.errno not in recoverable_errors: - raise - - def next_packet_size(): - """Code stolen from dockerpty to get the next packet size""" - data = six.binary_type() - while len(data) < 8: - next_data = read(8 - len(data)) - if not next_data: - return 0 - data = data + next_data - - if data is None: - return 0 - - if len(data) == 8: - _, actual = struct.unpack('>BxxxL', data) - return actual - - next_size = next_packet_size() - self.assertEqual(next_size, len(line)+1) - - data = six.binary_type() - while len(data) < next_size: - next_data = read(next_size - len(data)) - if not next_data: - assert False, "Failed trying to read in the dataz" - data += next_data - self.assertEqual(data.decode('utf-8'), "{0}\n".format(line)) - pty_stdout.close() - - # Prevent segfault at the end of the test run - if hasattr(pty_stdout, "_response"): - del pty_stdout._response - - -class TestPauseUnpauseContainer(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.tmp_containers.append(id) - self.client.start(container) - self.client.pause(id) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - self.assertEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], True) - self.assertIn('Paused', state) - self.assertEqual(state['Paused'], True) - - self.client.unpause(id) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - self.assertEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], True) - self.assertIn('Paused', state) - self.assertEqual(state['Paused'], False) - - -class TestCreateContainerWithHostPidMode(BaseTestCase): - def runTest(self): - ctnr = self.client.create_container( - BUSYBOX, 'true', host_config=self.client.create_host_config( - pid_mode='host', network_mode='none' - ) - ) - self.assertIn('Id', ctnr) - self.tmp_containers.append(ctnr['Id']) - self.client.start(ctnr) - inspect = self.client.inspect_container(ctnr) - self.assertIn('HostConfig', inspect) - host_config = inspect['HostConfig'] - self.assertIn('PidMode', host_config) - self.assertEqual(host_config['PidMode'], 'host') - - -################# -# LINKS TESTS # -################# - - -class TestRemoveLink(BaseTestCase): - def runTest(self): - # Create containers - container1 = self.client.create_container( - BUSYBOX, 'cat', detach=True, stdin_open=True - ) - container1_id = container1['Id'] - self.tmp_containers.append(container1_id) - self.client.start(container1_id) - - # Create Link - # we don't want the first / - link_path = self.client.inspect_container(container1_id)['Name'][1:] - link_alias = 'mylink' - - container2 = self.client.create_container( - BUSYBOX, 'cat', host_config=self.client.create_host_config( - links={link_path: link_alias}, network_mode='none' - ) - ) - container2_id = container2['Id'] - self.tmp_containers.append(container2_id) - self.client.start(container2_id) - - # Remove link - linked_name = self.client.inspect_container(container2_id)['Name'][1:] - link_name = '%s/%s' % (linked_name, link_alias) - self.client.remove_container(link_name, link=True) - - # Link is gone - containers = self.client.containers(all=True) - retrieved = [x for x in containers if link_name in x['Names']] - self.assertEqual(len(retrieved), 0) - - # Containers are still there - retrieved = [ - x for x in containers if x['Id'].startswith(container1_id) or - x['Id'].startswith(container2_id) - ] - self.assertEqual(len(retrieved), 2) - -################## -# IMAGES TESTS # -################## - - -class TestPull(BaseTestCase): - def runTest(self): - try: - self.client.remove_image('hello-world') - except docker.errors.APIError: - pass - res = self.client.pull('hello-world') - self.tmp_imgs.append('hello-world') - self.assertEqual(type(res), six.text_type) - self.assertGreaterEqual( - len(self.client.images('hello-world')), 1 - ) - img_info = self.client.inspect_image('hello-world') - self.assertIn('Id', img_info) - - -class TestPullStream(BaseTestCase): - def runTest(self): - try: - self.client.remove_image('hello-world') - except docker.errors.APIError: - pass - stream = self.client.pull('hello-world', stream=True) - self.tmp_imgs.append('hello-world') - for chunk in stream: - if six.PY3: - chunk = chunk.decode('utf-8') - json.loads(chunk) # ensure chunk is a single, valid JSON blob - self.assertGreaterEqual( - len(self.client.images('hello-world')), 1 - ) - img_info = self.client.inspect_image('hello-world') - self.assertIn('Id', img_info) - - -class TestCommit(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - res = self.client.commit(id) - self.assertIn('Id', res) - img_id = res['Id'] - self.tmp_imgs.append(img_id) - img = self.client.inspect_image(img_id) - self.assertIn('Container', img) - self.assertTrue(img['Container'].startswith(id)) - self.assertIn('ContainerConfig', img) - self.assertIn('Image', img['ContainerConfig']) - self.assertEqual(BUSYBOX, img['ContainerConfig']['Image']) - busybox_id = self.client.inspect_image(BUSYBOX)['Id'] - self.assertIn('Parent', img) - self.assertEqual(img['Parent'], busybox_id) - - -class TestRemoveImage(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - res = self.client.commit(id) - self.assertIn('Id', res) - img_id = res['Id'] - self.tmp_imgs.append(img_id) - self.client.remove_image(img_id, force=True) - images = self.client.images(all=True) - res = [x for x in images if x['Id'].startswith(img_id)] - self.assertEqual(len(res), 0) - - -################## -# IMPORT TESTS # -################## - - -class ImportTestCase(BaseTestCase): - '''Base class for `docker import` test cases.''' - - TAR_SIZE = 512 * 1024 - - def write_dummy_tar_content(self, n_bytes, tar_fd): - def extend_file(f, n_bytes): - f.seek(n_bytes - 1) - f.write(bytearray([65])) - f.seek(0) - - tar = tarfile.TarFile(fileobj=tar_fd, mode='w') - - with tempfile.NamedTemporaryFile() as f: - extend_file(f, n_bytes) - tarinfo = tar.gettarinfo(name=f.name, arcname='testdata') - tar.addfile(tarinfo, fileobj=f) - - tar.close() - - @contextlib.contextmanager - def dummy_tar_stream(self, n_bytes): - '''Yields a stream that is valid tar data of size n_bytes.''' - with tempfile.NamedTemporaryFile() as tar_file: - self.write_dummy_tar_content(n_bytes, tar_file) - tar_file.seek(0) - yield tar_file - - @contextlib.contextmanager - def dummy_tar_file(self, n_bytes): - '''Yields the name of a valid tar file of size n_bytes.''' - with tempfile.NamedTemporaryFile() as tar_file: - self.write_dummy_tar_content(n_bytes, tar_file) - tar_file.seek(0) - yield tar_file.name - - -class TestImportFromBytes(ImportTestCase): - '''Tests importing an image from in-memory byte data.''' - - def runTest(self): - with self.dummy_tar_stream(n_bytes=500) as f: - content = f.read() - - # The generic import_image() function cannot import in-memory bytes - # data that happens to be represented as a string type, because - # import_image() will try to use it as a filename and usually then - # trigger an exception. So we test the import_image_from_data() - # function instead. - statuses = self.client.import_image_from_data( - content, repository='test/import-from-bytes') - - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -class TestImportFromFile(ImportTestCase): - '''Tests importing an image from a tar file on disk.''' - - def runTest(self): - with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename: - # statuses = self.client.import_image( - # src=tar_filename, repository='test/import-from-file') - statuses = self.client.import_image_from_file( - tar_filename, repository='test/import-from-file') - - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - self.assertIn('status', result) - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -class TestImportFromStream(ImportTestCase): - '''Tests importing an image from a stream containing tar data.''' - - def runTest(self): - with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream: - statuses = self.client.import_image( - src=tar_stream, repository='test/import-from-stream') - # statuses = self.client.import_image_from_stream( - # tar_stream, repository='test/import-from-stream') - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - self.assertIn('status', result) - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -class TestImportFromURL(ImportTestCase): - '''Tests downloading an image over HTTP.''' - - @contextlib.contextmanager - def temporary_http_file_server(self, stream): - '''Serve data from an IO stream over HTTP.''' - - class Handler(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - self.send_response(200) - self.send_header('Content-Type', 'application/x-tar') - self.end_headers() - shutil.copyfileobj(stream, self.wfile) - - server = socketserver.TCPServer(('', 0), Handler) - thread = threading.Thread(target=server.serve_forever) - thread.setDaemon(True) - thread.start() - - yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1]) - - server.shutdown() - - @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME") - def runTest(self): - # The crappy test HTTP server doesn't handle large files well, so use - # a small file. - TAR_SIZE = 10240 - - with self.dummy_tar_stream(n_bytes=TAR_SIZE) as tar_data: - with self.temporary_http_file_server(tar_data) as url: - statuses = self.client.import_image( - src=url, repository='test/import-from-url') - - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - self.assertIn('status', result) - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -################# -# VOLUMES TESTS # -################# - -@requires_api_version('1.21') -class TestVolumes(BaseTestCase): - def test_create_volume(self): - name = 'perfectcherryblossom' - self.tmp_volumes.append(name) - result = self.client.create_volume(name) - self.assertIn('Name', result) - self.assertEqual(result['Name'], name) - self.assertIn('Driver', result) - self.assertEqual(result['Driver'], 'local') - - def test_create_volume_invalid_driver(self): - driver_name = 'invalid.driver' - - with pytest.raises(docker.errors.NotFound): - self.client.create_volume('perfectcherryblossom', driver_name) - - def test_list_volumes(self): - name = 'imperishablenight' - self.tmp_volumes.append(name) - volume_info = self.client.create_volume(name) - result = self.client.volumes() - self.assertIn('Volumes', result) - volumes = result['Volumes'] - self.assertIn(volume_info, volumes) - - def test_inspect_volume(self): - name = 'embodimentofscarletdevil' - self.tmp_volumes.append(name) - volume_info = self.client.create_volume(name) - result = self.client.inspect_volume(name) - self.assertEqual(volume_info, result) - - def test_inspect_nonexistent_volume(self): - name = 'embodimentofscarletdevil' - with pytest.raises(docker.errors.NotFound): - self.client.inspect_volume(name) - - def test_remove_volume(self): - name = 'shootthebullet' - self.tmp_volumes.append(name) - self.client.create_volume(name) - result = self.client.remove_volume(name) - self.assertTrue(result) - - def test_remove_nonexistent_volume(self): - name = 'shootthebullet' - with pytest.raises(docker.errors.NotFound): - self.client.remove_volume(name) - - -################# -# BUILDER TESTS # -################# - -class TestBuildStream(BaseTestCase): - def runTest(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - stream = self.client.build(fileobj=script, stream=True) - logs = '' - for chunk in stream: - if six.PY3: - chunk = chunk.decode('utf-8') - json.loads(chunk) # ensure chunk is a single, valid JSON blob - logs += chunk - self.assertNotEqual(logs, '') - - -class TestBuildFromStringIO(BaseTestCase): - def runTest(self): - if six.PY3: - return - script = io.StringIO(six.text_type('\n').join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ])) - stream = self.client.build(fileobj=script, stream=True) - logs = '' - for chunk in stream: - if six.PY3: - chunk = chunk.decode('utf-8') - logs += chunk - self.assertNotEqual(logs, '') - - -@requires_api_version('1.8') -class TestBuildWithDockerignore(Cleanup, BaseTestCase): - def runTest(self): - base_dir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base_dir) - - with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f: - f.write("\n".join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'ADD . /test', - ])) - - with open(os.path.join(base_dir, '.dockerignore'), 'w') as f: - f.write("\n".join([ - 'ignored', - 'Dockerfile', - '.dockerignore', - '', # empty line - ])) - - with open(os.path.join(base_dir, 'not-ignored'), 'w') as f: - f.write("this file should not be ignored") - - subdir = os.path.join(base_dir, 'ignored', 'subdir') - os.makedirs(subdir) - with open(os.path.join(subdir, 'file'), 'w') as f: - f.write("this file should be ignored") - - tag = 'docker-py-test-build-with-dockerignore' - stream = self.client.build( - path=base_dir, - tag=tag, - ) - for chunk in stream: - pass - - c = self.client.create_container(tag, ['ls', '-1A', '/test']) - self.client.start(c) - self.client.wait(c) - logs = self.client.logs(c) - - if six.PY3: - logs = logs.decode('utf-8') - - self.assertEqual( - list(filter(None, logs.split('\n'))), - ['not-ignored'], - ) - - -####################### -# NETWORK TESTS # -####################### - - -@requires_api_version('1.21') -class TestNetworks(BaseTestCase): - def create_network(self, *args, **kwargs): - net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14] - net_id = self.client.create_network(net_name, *args, **kwargs)['id'] - self.tmp_networks.append(net_id) - return (net_name, net_id) - - def test_list_networks(self): - networks = self.client.networks() - initial_size = len(networks) - - net_name, net_id = self.create_network() - - networks = self.client.networks() - self.assertEqual(len(networks), initial_size + 1) - self.assertTrue(net_id in [n['id'] for n in networks]) - - networks_by_name = self.client.networks(names=[net_name]) - self.assertEqual([n['id'] for n in networks_by_name], [net_id]) - - networks_by_partial_id = self.client.networks(ids=[net_id[:8]]) - self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id]) - - def test_inspect_network(self): - net_name, net_id = self.create_network() - - net = self.client.inspect_network(net_id) - self.assertEqual(net, { - u'name': net_name, - u'id': net_id, - u'driver': 'bridge', - u'containers': {}, - }) - - def test_create_network_with_host_driver_fails(self): - net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14] - - with pytest.raises(APIError): - self.client.create_network(net_name, driver='host') - - def test_remove_network(self): - initial_size = len(self.client.networks()) - - net_name, net_id = self.create_network() - self.assertEqual(len(self.client.networks()), initial_size + 1) - - self.client.remove_network(net_id) - self.assertEqual(len(self.client.networks()), initial_size) - - def test_connect_and_disconnect_container(self): - net_name, net_id = self.create_network() - - container = self.client.create_container('busybox', 'top') - self.tmp_containers.append(container) - self.client.start(container) - - network_data = self.client.inspect_network(net_id) - self.assertFalse(network_data.get('containers')) - - self.client.connect_container_to_network(container, net_id) - network_data = self.client.inspect_network(net_id) - self.assertEqual( - list(network_data['containers'].keys()), - [container['Id']]) - - self.client.disconnect_container_from_network(container, net_id) - network_data = self.client.inspect_network(net_id) - self.assertFalse(network_data.get('containers')) - - def test_connect_on_container_create(self): - net_name, net_id = self.create_network() - - container = self.client.create_container( - image='busybox', - command='top', - host_config=self.client.create_host_config(network_mode=net_name), - ) - self.tmp_containers.append(container) - self.client.start(container) - - network_data = self.client.inspect_network(net_id) - self.assertEqual( - list(network_data['containers'].keys()), - [container['Id']]) - - self.client.disconnect_container_from_network(container, net_id) - network_data = self.client.inspect_network(net_id) - self.assertFalse(network_data.get('containers')) - - -####################### -# PY SPECIFIC TESTS # -####################### - - -class TestRunShlex(BaseTestCase): - def runTest(self): - commands = [ - 'true', - 'echo "The Young Descendant of Tepes & Septette for the ' - 'Dead Princess"', - 'echo -n "The Young Descendant of Tepes & Septette for the ' - 'Dead Princess"', - '/bin/sh -c "echo Hello World"', - '/bin/sh -c \'echo "Hello World"\'', - 'echo "\"Night of Nights\""', - 'true && echo "Night of Nights"' - ] - for cmd in commands: - container = self.client.create_container(BUSYBOX, cmd) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0, msg=cmd) - - -class TestLoadConfig(BaseTestCase): - def runTest(self): - folder = tempfile.mkdtemp() - self.tmp_folders.append(folder) - cfg_path = os.path.join(folder, '.dockercfg') - f = open(cfg_path, 'w') - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - f.write('auth = {0}\n'.format(auth_)) - f.write('email = sakuya@scarlet.net') - f.close() - cfg = docker.auth.load_config(cfg_path) - self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None) - cfg = cfg[docker.auth.INDEX_NAME] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('Auth'), None) - - -class TestLoadJSONConfig(BaseTestCase): - def runTest(self): - folder = tempfile.mkdtemp() - self.tmp_folders.append(folder) - cfg_path = os.path.join(folder, '.dockercfg') - f = open(os.path.join(folder, '.dockercfg'), 'w') - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - email_ = 'sakuya@scarlet.net' - f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format( - docker.auth.INDEX_URL, auth_, email_)) - f.close() - cfg = docker.auth.load_config(cfg_path) - self.assertNotEqual(cfg[docker.auth.INDEX_URL], None) - cfg = cfg[docker.auth.INDEX_URL] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('Auth'), None) - - -class TestAutoDetectVersion(unittest.TestCase): - def test_client_init(self): - client = docker_client(version='auto') - client_version = client._version - api_version = client.version(api_version=False)['ApiVersion'] - self.assertEqual(client_version, api_version) - api_version_2 = client.version()['ApiVersion'] - self.assertEqual(client_version, api_version_2) - client.close() - - def test_auto_client(self): - client = docker.AutoVersionClient(**docker_client_kwargs()) - client_version = client._version - api_version = client.version(api_version=False)['ApiVersion'] - self.assertEqual(client_version, api_version) - api_version_2 = client.version()['ApiVersion'] - self.assertEqual(client_version, api_version_2) - client.close() - with self.assertRaises(docker.errors.DockerException): - docker.AutoVersionClient(**docker_client_kwargs(version='1.11')) - - -class TestConnectionTimeout(unittest.TestCase): - def setUp(self): - self.timeout = 0.5 - self.client = docker.client.Client(base_url='http://192.168.10.2:4243', - timeout=self.timeout) - - def runTest(self): - start = time.time() - res = None - # This call isn't supposed to complete, and it should fail fast. - try: - res = self.client.inspect_container('id') - except: - pass - end = time.time() - self.assertTrue(res is None) - self.assertTrue(end - start < 2 * self.timeout) - - -class UnixconnTestCase(unittest.TestCase): - """ - Test UNIX socket connection adapter. - """ - - def test_resource_warnings(self): - """ - Test no warnings are produced when using the client. - """ - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') - - client = docker_client() - client.images() - client.close() - del client - - assert len(w) == 0, \ - "No warnings produced: {0}".format(w[0].message) - - -#################### -# REGRESSION TESTS # -#################### - -class TestRegressions(BaseTestCase): - def test_443(self): - dfile = io.BytesIO() - with self.assertRaises(docker.errors.APIError) as exc: - for line in self.client.build(fileobj=dfile, tag="a/b/c"): - pass - self.assertEqual(exc.exception.response.status_code, 500) - dfile.close() - - def test_542(self): - self.client.start( - self.client.create_container(BUSYBOX, ['true']) - ) - result = self.client.containers(all=True, trunc=True) - self.assertEqual(len(result[0]['Id']), 12) - - def test_647(self): - with self.assertRaises(docker.errors.APIError): - self.client.inspect_image('gensokyo.jp//kirisame') - - def test_649(self): - self.client.timeout = None - ctnr = self.client.create_container(BUSYBOX, ['sleep', '2']) - self.client.start(ctnr) - self.client.stop(ctnr) - - def test_715(self): - ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000) - self.client.start(ctnr) - self.client.wait(ctnr) - logs = self.client.logs(ctnr) - if six.PY3: - logs = logs.decode('utf-8') - assert logs == '1000\n' - - def test_792_explicit_port_protocol(self): - - tcp_port, udp_port = random.sample(range(9999, 32000), 2) - ctnr = self.client.create_container( - BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')], - host_config=self.client.create_host_config( - port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port} - ) - ) - self.tmp_containers.append(ctnr) - self.client.start(ctnr) - self.assertEqual( - self.client.port(ctnr, 2000)[0]['HostPort'], - six.text_type(tcp_port) - ) - self.assertEqual( - self.client.port(ctnr, '2000/tcp')[0]['HostPort'], - six.text_type(tcp_port) - ) - self.assertEqual( - self.client.port(ctnr, '2000/udp')[0]['HostPort'], - six.text_type(udp_port) - ) +# FIXME: placeholder while we transition to the new folder architecture +# Remove when merged in master and Jenkins is updated to find the tests +# in the new location. +from .integration import * # flake8: noqa diff --git a/tests/testdata/certs/ca.pem b/tests/unit/__init__.py index e69de29..e69de29 100644 --- a/tests/testdata/certs/ca.pem +++ b/tests/unit/__init__.py diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py new file mode 100644 index 0000000..62d64e8 --- /dev/null +++ b/tests/unit/api_test.py @@ -0,0 +1,418 @@ +# Copyright 2013 dotCloud inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import os +import re +import shutil +import socket +import sys +import tempfile +import threading +import time + +import docker +import requests +import six + +from .. import base +from . import fake_api + +import pytest + +try: + from unittest import mock +except ImportError: + import mock + + +DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS + + +def response(status_code=200, content='', headers=None, reason=None, elapsed=0, + request=None): + res = requests.Response() + res.status_code = status_code + if not isinstance(content, six.binary_type): + content = json.dumps(content).encode('ascii') + res._content = content + res.headers = requests.structures.CaseInsensitiveDict(headers or {}) + res.reason = reason + res.elapsed = datetime.timedelta(elapsed) + res.request = request + return res + + +def fake_resolve_authconfig(authconfig, registry=None): + return None + + +def fake_inspect_container(self, container, tty=False): + return fake_api.get_fake_inspect_container(tty=tty)[1] + + +def fake_resp(method, url, *args, **kwargs): + key = None + if url in fake_api.fake_responses: + key = url + elif (url, method) in fake_api.fake_responses: + key = (url, method) + if not key: + raise Exception('{0} {1}'.format(method, url)) + status_code, content = fake_api.fake_responses[key]() + return response(status_code=status_code, content=content) + + +fake_request = mock.Mock(side_effect=fake_resp) + + +def fake_get(self, url, *args, **kwargs): + return fake_request('GET', url, *args, **kwargs) + + +def fake_post(self, url, *args, **kwargs): + return fake_request('POST', url, *args, **kwargs) + + +def fake_put(self, url, *args, **kwargs): + return fake_request('PUT', url, *args, **kwargs) + + +def fake_delete(self, url, *args, **kwargs): + return fake_request('DELETE', url, *args, **kwargs) + +url_base = 'http+docker://localunixsocket/' +url_prefix = '{0}v{1}/'.format( + url_base, + docker.constants.DEFAULT_DOCKER_API_VERSION) + + +class DockerClientTest(base.Cleanup, base.BaseTestCase): + def setUp(self): + self.patcher = mock.patch.multiple( + 'docker.Client', get=fake_get, post=fake_post, put=fake_put, + delete=fake_delete + ) + self.patcher.start() + self.client = docker.Client() + # Force-clear authconfig to avoid tampering with the tests + self.client._cfg = {'Configs': {}} + + def tearDown(self): + self.client.close() + self.patcher.stop() + + def assertIn(self, object, collection): + if six.PY2 and sys.version_info[1] <= 6: + return self.assertTrue(object in collection) + return super(DockerClientTest, self).assertIn(object, collection) + + def base_create_payload(self, img='busybox', cmd=None): + if not cmd: + cmd = ['true'] + return {"Tty": False, "Image": img, "Cmd": cmd, + "AttachStdin": False, + "AttachStderr": True, "AttachStdout": True, + "StdinOnce": False, + "OpenStdin": False, "NetworkDisabled": False, + } + + +class DockerApiTest(DockerClientTest): + def test_ctor(self): + with pytest.raises(docker.errors.DockerException) as excinfo: + docker.Client(version=1.12) + + self.assertEqual( + str(excinfo.value), + 'Version parameter must be a string or None. Found float' + ) + + def test_url_valid_resource(self): + url = self.client._url('/hello/{0}/world', 'somename') + self.assertEqual( + url, '{0}{1}'.format(url_prefix, 'hello/somename/world') + ) + + url = self.client._url( + '/hello/{0}/world/{1}', 'somename', 'someothername' + ) + self.assertEqual( + url, + '{0}{1}'.format(url_prefix, 'hello/somename/world/someothername') + ) + + url = self.client._url('/hello/{0}/world', '/some?name') + self.assertEqual( + url, '{0}{1}'.format(url_prefix, 'hello/%2Fsome%3Fname/world') + ) + + def test_url_invalid_resource(self): + with pytest.raises(ValueError): + self.client._url('/hello/{0}/world', ['sakuya', 'izayoi']) + + def test_url_no_resource(self): + url = self.client._url('/simple') + self.assertEqual(url, '{0}{1}'.format(url_prefix, 'simple')) + + def test_url_unversioned_api(self): + url = self.client._url( + '/hello/{0}/world', 'somename', versioned_api=False + ) + self.assertEqual( + url, '{0}{1}'.format(url_base, 'hello/somename/world') + ) + + def test_version(self): + self.client.version() + + fake_request.assert_called_with( + 'GET', + url_prefix + 'version', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_version_no_api_version(self): + self.client.version(False) + + fake_request.assert_called_with( + 'GET', + url_base + 'version', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_retrieve_server_version(self): + client = docker.Client(version="auto") + self.assertTrue(isinstance(client._version, six.string_types)) + self.assertFalse(client._version == "auto") + client.close() + + def test_auto_retrieve_server_version(self): + version = self.client._retrieve_server_version() + self.assertTrue(isinstance(version, six.string_types)) + + def test_info(self): + self.client.info() + + fake_request.assert_called_with( + 'GET', + url_prefix + 'info', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_search(self): + self.client.search('busybox') + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/search', + params={'term': 'busybox'}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_events(self): + self.client.events() + + fake_request.assert_called_with( + 'GET', + url_prefix + 'events', + params={'since': None, 'until': None, 'filters': None}, + stream=True + ) + + def test_events_with_since_until(self): + ts = 1356048000 + now = datetime.datetime.utcfromtimestamp(ts) + since = now - datetime.timedelta(seconds=10) + until = now + datetime.timedelta(seconds=10) + + self.client.events(since=since, until=until) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'events', + params={ + 'since': ts - 10, + 'until': ts + 10, + 'filters': None + }, + stream=True + ) + + def test_events_with_filters(self): + filters = {'event': ['die', 'stop'], + 'container': fake_api.FAKE_CONTAINER_ID} + + self.client.events(filters=filters) + + expected_filters = docker.utils.convert_filters(filters) + fake_request.assert_called_with( + 'GET', + url_prefix + 'events', + params={ + 'since': None, + 'until': None, + 'filters': expected_filters + }, + stream=True + ) + + def _socket_path_for_client_session(self, client): + socket_adapter = client.get_adapter('http+docker://') + return socket_adapter.socket_path + + def test_url_compatibility_unix(self): + c = docker.Client(base_url="unix://socket") + + assert self._socket_path_for_client_session(c) == '/socket' + + def test_url_compatibility_unix_triple_slash(self): + c = docker.Client(base_url="unix:///socket") + + assert self._socket_path_for_client_session(c) == '/socket' + + def test_url_compatibility_http_unix_triple_slash(self): + c = docker.Client(base_url="http+unix:///socket") + + assert self._socket_path_for_client_session(c) == '/socket' + + def test_url_compatibility_http(self): + c = docker.Client(base_url="http://hostname:1234") + + assert c.base_url == "http://hostname:1234" + + def test_url_compatibility_tcp(self): + c = docker.Client(base_url="tcp://hostname:1234") + + assert c.base_url == "http://hostname:1234" + + def test_remove_link(self): + self.client.remove_container(fake_api.FAKE_CONTAINER_ID, link=True) + + fake_request.assert_called_with( + 'DELETE', + url_prefix + 'containers/3cc2351ab11b', + params={'v': False, 'link': True, 'force': False}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_create_host_config_secopt(self): + security_opt = ['apparmor:test_profile'] + result = self.client.create_host_config(security_opt=security_opt) + self.assertIn('SecurityOpt', result) + self.assertEqual(result['SecurityOpt'], security_opt) + self.assertRaises( + docker.errors.DockerException, self.client.create_host_config, + security_opt='wrong' + ) + + +class StreamTest(base.Cleanup, base.BaseTestCase): + def setUp(self): + socket_dir = tempfile.mkdtemp() + self.build_context = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, socket_dir) + self.addCleanup(shutil.rmtree, self.build_context) + self.socket_file = os.path.join(socket_dir, 'test_sock.sock') + self.server_socket = self._setup_socket() + self.stop_server = False + server_thread = threading.Thread(target=self.run_server) + server_thread.setDaemon(True) + server_thread.start() + self.response = None + self.request_handler = None + self.addCleanup(server_thread.join) + self.addCleanup(self.stop) + + def stop(self): + self.stop_server = True + + def _setup_socket(self): + server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server_sock.bind(self.socket_file) + # Non-blocking mode so that we can shut the test down easily + server_sock.setblocking(0) + server_sock.listen(5) + return server_sock + + def run_server(self): + try: + while not self.stop_server: + try: + connection, client_address = self.server_socket.accept() + except socket.error: + # Probably no connection to accept yet + time.sleep(0.01) + continue + + connection.setblocking(1) + try: + self.request_handler(connection) + finally: + connection.close() + finally: + self.server_socket.close() + + def early_response_sending_handler(self, connection): + data = b'' + headers = None + + connection.sendall(self.response) + while not headers: + data += connection.recv(2048) + parts = data.split(b'\r\n\r\n', 1) + if len(parts) == 2: + headers, data = parts + + mo = re.search(r'Content-Length: ([0-9]+)', headers.decode()) + assert mo + content_length = int(mo.group(1)) + + while True: + if len(data) >= content_length: + break + + data += connection.recv(2048) + + def test_early_stream_response(self): + self.request_handler = self.early_response_sending_handler + lines = [] + for i in range(0, 50): + line = str(i).encode() + lines += [('%x' % len(line)).encode(), line] + lines.append(b'0') + lines.append(b'') + + self.response = ( + b'HTTP/1.1 200 OK\r\n' + b'Transfer-Encoding: chunked\r\n' + b'\r\n' + ) + b'\r\n'.join(lines) + + with docker.Client(base_url="http+unix://" + self.socket_file) \ + as client: + for i in range(5): + try: + stream = client.build( + path=self.build_context, + stream=True + ) + break + except requests.ConnectionError as e: + if i == 4: + raise e + + self.assertEqual(list(stream), [ + str(i).encode() for i in range(50)]) diff --git a/tests/unit/auth_test.py b/tests/unit/auth_test.py new file mode 100644 index 0000000..3405de6 --- /dev/null +++ b/tests/unit/auth_test.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- + +import base64 +import json +import os +import os.path +import random +import shutil +import tempfile + +from docker import auth + +from .. import base + +try: + from unittest import mock +except ImportError: + import mock + + +class RegressionTest(base.BaseTestCase): + def test_803_urlsafe_encode(self): + auth_data = { + 'username': 'root', + 'password': 'GR?XGR?XGR?XGR?X' + } + encoded = auth.encode_header(auth_data) + assert b'/' not in encoded + assert b'_' in encoded + + +class ResolveAuthTest(base.BaseTestCase): + auth_config = { + 'https://index.docker.io/v1/': {'auth': 'indexuser'}, + 'my.registry.net': {'auth': 'privateuser'}, + 'http://legacy.registry.url/v1/': {'auth': 'legacyauth'} + } + + def test_resolve_repository_name_hub_library_image(self): + self.assertEqual( + auth.resolve_repository_name('image'), + ('index.docker.io', 'image'), + ) + + def test_resolve_repository_name_hub_image(self): + self.assertEqual( + auth.resolve_repository_name('username/image'), + ('index.docker.io', 'username/image'), + ) + + def test_resolve_repository_name_private_registry(self): + self.assertEqual( + auth.resolve_repository_name('my.registry.net/image'), + ('my.registry.net', 'image'), + ) + + def test_resolve_repository_name_private_registry_with_port(self): + self.assertEqual( + auth.resolve_repository_name('my.registry.net:5000/image'), + ('my.registry.net:5000', 'image'), + ) + + def test_resolve_repository_name_private_registry_with_username(self): + self.assertEqual( + auth.resolve_repository_name('my.registry.net/username/image'), + ('my.registry.net', 'username/image'), + ) + + def test_resolve_repository_name_no_dots_but_port(self): + self.assertEqual( + auth.resolve_repository_name('hostname:5000/image'), + ('hostname:5000', 'image'), + ) + + def test_resolve_repository_name_no_dots_but_port_and_username(self): + self.assertEqual( + auth.resolve_repository_name('hostname:5000/username/image'), + ('hostname:5000', 'username/image'), + ) + + def test_resolve_repository_name_localhost(self): + self.assertEqual( + auth.resolve_repository_name('localhost/image'), + ('localhost', 'image'), + ) + + def test_resolve_repository_name_localhost_with_username(self): + self.assertEqual( + auth.resolve_repository_name('localhost/username/image'), + ('localhost', 'username/image'), + ) + + def test_resolve_authconfig_hostname_only(self): + self.assertEqual( + auth.resolve_authconfig(self.auth_config, 'my.registry.net'), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_no_protocol(self): + self.assertEqual( + auth.resolve_authconfig(self.auth_config, 'my.registry.net/v1/'), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_no_path(self): + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, 'http://my.registry.net' + ), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_no_path_trailing_slash(self): + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, 'http://my.registry.net/' + ), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_no_path_wrong_secure_proto(self): + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, 'https://my.registry.net' + ), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_no_path_wrong_insecure_proto(self): + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, 'http://index.docker.io' + ), + {'auth': 'indexuser'} + ) + + def test_resolve_authconfig_path_wrong_proto(self): + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, 'https://my.registry.net/v1/' + ), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_default_registry(self): + self.assertEqual( + auth.resolve_authconfig(self.auth_config), {'auth': 'indexuser'} + ) + + def test_resolve_authconfig_default_explicit_none(self): + self.assertEqual( + auth.resolve_authconfig(self.auth_config, None), + {'auth': 'indexuser'} + ) + + def test_resolve_authconfig_fully_explicit(self): + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, 'http://my.registry.net/v1/' + ), + {'auth': 'privateuser'} + ) + + def test_resolve_authconfig_legacy_config(self): + self.assertEqual( + auth.resolve_authconfig(self.auth_config, 'legacy.registry.url'), + {'auth': 'legacyauth'} + ) + + def test_resolve_authconfig_no_match(self): + self.assertTrue( + auth.resolve_authconfig(self.auth_config, 'does.not.exist') is None + ) + + def test_resolve_registry_and_auth_library_image(self): + image = 'image' + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + ), + {'auth': 'indexuser'}, + ) + + def test_resolve_registry_and_auth_hub_image(self): + image = 'username/image' + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + ), + {'auth': 'indexuser'}, + ) + + def test_resolve_registry_and_auth_private_registry(self): + image = 'my.registry.net/image' + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + ), + {'auth': 'privateuser'}, + ) + + def test_resolve_registry_and_auth_unauthenticated_registry(self): + image = 'other.registry.net/image' + self.assertEqual( + auth.resolve_authconfig( + self.auth_config, auth.resolve_repository_name(image)[0] + ), + None, + ) + + +class LoadConfigTest(base.Cleanup, base.BaseTestCase): + def test_load_config_no_file(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + cfg = auth.load_config(folder) + self.assertTrue(cfg is not None) + + def test_load_config(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + dockercfg_path = os.path.join(folder, '.dockercfg') + with open(dockercfg_path, 'w') as f: + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + f.write('auth = {0}\n'.format(auth_)) + f.write('email = sakuya@scarlet.net') + cfg = auth.load_config(dockercfg_path) + assert auth.INDEX_NAME in cfg + self.assertNotEqual(cfg[auth.INDEX_NAME], None) + cfg = cfg[auth.INDEX_NAME] + self.assertEqual(cfg['username'], 'sakuya') + self.assertEqual(cfg['password'], 'izayoi') + self.assertEqual(cfg['email'], 'sakuya@scarlet.net') + self.assertEqual(cfg.get('auth'), None) + + def test_load_config_with_random_name(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, + '.{0}.dockercfg'.format( + random.randrange(100000))) + registry = 'https://your.private.registry.io' + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + config = { + registry: { + 'auth': '{0}'.format(auth_), + 'email': 'sakuya@scarlet.net' + } + } + + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + cfg = auth.load_config(dockercfg_path) + assert registry in cfg + self.assertNotEqual(cfg[registry], None) + cfg = cfg[registry] + self.assertEqual(cfg['username'], 'sakuya') + self.assertEqual(cfg['password'], 'izayoi') + self.assertEqual(cfg['email'], 'sakuya@scarlet.net') + self.assertEqual(cfg.get('auth'), None) + + def test_load_config_custom_config_env(self): + folder = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, folder) + + dockercfg_path = os.path.join(folder, 'config.json') + registry = 'https://your.private.registry.io' + auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') + config = { + registry: { + 'auth': '{0}'.format(auth_), + 'email': 'sakuya@scarlet.net' + } + } + + with open(dockercfg_path, 'w') as f: + json.dump(config, f) + + with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): + cfg = auth.load_config(None) + assert registry in cfg + self.assertNotEqual(cfg[registry], None) + cfg = cfg[registry] + self.assertEqual(cfg['username'], 'sakuya') + self.assertEqual(cfg['password'], 'izayoi') + self.assertEqual(cfg['email'], 'sakuya@scarlet.net') + self.assertEqual(cfg.get('auth'), None) diff --git a/tests/unit/build_test.py b/tests/unit/build_test.py new file mode 100644 index 0000000..414153e --- /dev/null +++ b/tests/unit/build_test.py @@ -0,0 +1,105 @@ +import gzip +import io + +import docker + +from .api_test import DockerClientTest + + +class BuildTest(DockerClientTest): + def test_build_container(self): + script = io.BytesIO('\n'.join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ]).encode('ascii')) + + self.client.build(fileobj=script) + + def test_build_container_pull(self): + script = io.BytesIO('\n'.join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ]).encode('ascii')) + + self.client.build(fileobj=script, pull=True) + + def test_build_container_stream(self): + script = io.BytesIO('\n'.join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ]).encode('ascii')) + + self.client.build(fileobj=script, stream=True) + + def test_build_container_custom_context(self): + script = io.BytesIO('\n'.join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ]).encode('ascii')) + context = docker.utils.mkbuildcontext(script) + + self.client.build(fileobj=context, custom_context=True) + + def test_build_container_custom_context_gzip(self): + script = io.BytesIO('\n'.join([ + 'FROM busybox', + 'MAINTAINER docker-py', + 'RUN mkdir -p /tmp/test', + 'EXPOSE 8080', + 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' + ' /tmp/silence.tar.gz' + ]).encode('ascii')) + context = docker.utils.mkbuildcontext(script) + gz_context = gzip.GzipFile(fileobj=context) + + self.client.build( + fileobj=gz_context, + custom_context=True, + encoding="gzip" + ) + + def test_build_remote_with_registry_auth(self): + self.client._auth_configs = { + 'https://example.com': { + 'user': 'example', + 'password': 'example', + 'email': 'example@example.com' + } + } + + self.client.build(path='https://github.com/docker-library/mongo') + + def test_build_container_with_named_dockerfile(self): + self.client.build('.', dockerfile='nameddockerfile') + + def test_build_container_with_container_limits(self): + self.client.build('.', container_limits={ + 'memory': 1024 * 1024, + 'cpusetcpus': 1, + 'cpushares': 1000, + 'memswap': 1024 * 1024 * 8 + }) + + def test_build_container_invalid_container_limits(self): + self.assertRaises( + docker.errors.DockerException, + lambda: self.client.build('.', container_limits={ + 'foo': 'bar' + }) + ) diff --git a/tests/test.py b/tests/unit/container_test.py index 86f1941..eeeba76 100644 --- a/tests/test.py +++ b/tests/unit/container_test.py @@ -1,43 +1,15 @@ -# Copyright 2013 dotCloud inc. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import datetime -import gzip -import io import json -import os -import re -import shutil import signal -import socket -import sys -import tarfile -import tempfile -import threading -import time -import random import docker -import requests +import pytest import six -from . import base from . import fake_api -from .helpers import make_tree - -import pytest +from .api_test import ( + DockerClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS, + fake_inspect_container +) try: from unittest import mock @@ -45,370 +17,154 @@ except ImportError: import mock -DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS - - -def response(status_code=200, content='', headers=None, reason=None, elapsed=0, - request=None): - res = requests.Response() - res.status_code = status_code - if not isinstance(content, six.binary_type): - content = json.dumps(content).encode('ascii') - res._content = content - res.headers = requests.structures.CaseInsensitiveDict(headers or {}) - res.reason = reason - res.elapsed = datetime.timedelta(elapsed) - res.request = request - return res - - -def fake_resolve_authconfig(authconfig, registry=None): - return None - - -def fake_inspect_container(self, container, tty=False): - return fake_api.get_fake_inspect_container(tty=tty)[1] - - def fake_inspect_container_tty(self, container): return fake_inspect_container(self, container, tty=True) -def fake_resp(method, url, *args, **kwargs): - key = None - if url in fake_api.fake_responses: - key = url - elif (url, method) in fake_api.fake_responses: - key = (url, method) - if not key: - raise Exception('{0} {1}'.format(method, url)) - status_code, content = fake_api.fake_responses[key]() - return response(status_code=status_code, content=content) - - -fake_request = mock.Mock(side_effect=fake_resp) - - -def fake_get(self, url, *args, **kwargs): - return fake_request('GET', url, *args, **kwargs) - - -def fake_post(self, url, *args, **kwargs): - return fake_request('POST', url, *args, **kwargs) - - -def fake_put(self, url, *args, **kwargs): - return fake_request('PUT', url, *args, **kwargs) - - -def fake_delete(self, url, *args, **kwargs): - return fake_request('DELETE', url, *args, **kwargs) - -url_base = 'http+docker://localunixsocket/' -url_prefix = '{0}v{1}/'.format( - url_base, - docker.constants.DEFAULT_DOCKER_API_VERSION) - - -class Cleanup(object): - if sys.version_info < (2, 7): - # Provide a basic implementation of addCleanup for Python < 2.7 - def __init__(self, *args, **kwargs): - super(Cleanup, self).__init__(*args, **kwargs) - self._cleanups = [] - - def tearDown(self): - super(Cleanup, self).tearDown() - ok = True - while self._cleanups: - fn, args, kwargs = self._cleanups.pop(-1) - try: - fn(*args, **kwargs) - except KeyboardInterrupt: - raise - except: - ok = False - if not ok: - raise - - def addCleanup(self, function, *args, **kwargs): - self._cleanups.append((function, args, kwargs)) - - -@mock.patch.multiple('docker.Client', get=fake_get, post=fake_post, - put=fake_put, delete=fake_delete) -class DockerClientTest(Cleanup, base.BaseTestCase): - def setUp(self): - self.client = docker.Client() - # Force-clear authconfig to avoid tampering with the tests - self.client._cfg = {'Configs': {}} - - def tearDown(self): - self.client.close() - - def assertIn(self, object, collection): - if six.PY2 and sys.version_info[1] <= 6: - return self.assertTrue(object in collection) - return super(DockerClientTest, self).assertIn(object, collection) - - def base_create_payload(self, img='busybox', cmd=None): - if not cmd: - cmd = ['true'] - return {"Tty": False, "Image": img, "Cmd": cmd, - "AttachStdin": False, - "AttachStderr": True, "AttachStdout": True, - "StdinOnce": False, - "OpenStdin": False, "NetworkDisabled": False, - } - - def test_ctor(self): - with pytest.raises(docker.errors.DockerException) as excinfo: - docker.Client(version=1.12) - - self.assertEqual( - str(excinfo.value), - 'Version parameter must be a string or None. Found float' - ) +class StartContainerTest(DockerClientTest): + def test_start_container(self): + self.client.start(fake_api.FAKE_CONTAINER_ID) - def test_url_valid_resource(self): - url = self.client._url('/hello/{0}/world', 'somename') + args = fake_request.call_args self.assertEqual( - url, '{0}{1}'.format(url_prefix, 'hello/somename/world') - ) - - url = self.client._url( - '/hello/{0}/world/{1}', 'somename', 'someothername' + args[0][1], + url_prefix + 'containers/3cc2351ab11b/start' ) + self.assertEqual(json.loads(args[1]['data']), {}) self.assertEqual( - url, - '{0}{1}'.format(url_prefix, 'hello/somename/world/someothername') + args[1]['headers'], {'Content-Type': 'application/json'} ) - - url = self.client._url('/hello/{0}/world', '/some?name') self.assertEqual( - url, '{0}{1}'.format(url_prefix, 'hello/%2Fsome%3Fname/world') + args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS ) - def test_url_invalid_resource(self): - with pytest.raises(ValueError): - self.client._url('/hello/{0}/world', ['sakuya', 'izayoi']) - - def test_url_no_resource(self): - url = self.client._url('/simple') - self.assertEqual(url, '{0}{1}'.format(url_prefix, 'simple')) + def test_start_container_none(self): + with pytest.raises(ValueError) as excinfo: + self.client.start(container=None) - def test_url_unversioned_api(self): - url = self.client._url( - '/hello/{0}/world', 'somename', versioned_api=False - ) self.assertEqual( - url, '{0}{1}'.format(url_base, 'hello/somename/world') - ) - - ######################### - # INFORMATION TESTS # - ######################### - def test_version(self): - self.client.version() - - fake_request.assert_called_with( - 'GET', - url_prefix + 'version', - timeout=DEFAULT_TIMEOUT_SECONDS + str(excinfo.value), + 'image or container param is undefined', ) - def test_version_no_api_version(self): - self.client.version(False) + with pytest.raises(ValueError) as excinfo: + self.client.start(None) - fake_request.assert_called_with( - 'GET', - url_base + 'version', - timeout=DEFAULT_TIMEOUT_SECONDS + self.assertEqual( + str(excinfo.value), + 'image or container param is undefined', ) - def test_retrieve_server_version(self): - client = docker.Client(version="auto") - self.assertTrue(isinstance(client._version, six.string_types)) - self.assertFalse(client._version == "auto") - client.close() + def test_start_container_regression_573(self): + self.client.start(**{'container': fake_api.FAKE_CONTAINER_ID}) - def test_auto_retrieve_server_version(self): - version = self.client._retrieve_server_version() - self.assertTrue(isinstance(version, six.string_types)) + def test_start_container_with_lxc_conf(self): + def call_start(): + self.client.start( + fake_api.FAKE_CONTAINER_ID, + lxc_conf={'lxc.conf.k': 'lxc.conf.value'} + ) - def test_info(self): - self.client.info() + pytest.deprecated_call(call_start) - fake_request.assert_called_with( - 'GET', - url_prefix + 'info', - timeout=DEFAULT_TIMEOUT_SECONDS - ) + def test_start_container_with_lxc_conf_compat(self): + def call_start(): + self.client.start( + fake_api.FAKE_CONTAINER_ID, + lxc_conf=[{'Key': 'lxc.conf.k', 'Value': 'lxc.conf.value'}] + ) - def test_search(self): - self.client.search('busybox') + pytest.deprecated_call(call_start) - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/search', - params={'term': 'busybox'}, - timeout=DEFAULT_TIMEOUT_SECONDS - ) + def test_start_container_with_binds_ro(self): + def call_start(): + self.client.start( + fake_api.FAKE_CONTAINER_ID, binds={ + '/tmp': { + "bind": '/mnt', + "ro": True + } + } + ) - def test_image_viz(self): - with pytest.raises(Exception): - self.client.images('busybox', viz=True) - self.fail('Viz output should not be supported!') + pytest.deprecated_call(call_start) - def test_events(self): - self.client.events() + def test_start_container_with_binds_rw(self): + def call_start(): + self.client.start( + fake_api.FAKE_CONTAINER_ID, binds={ + '/tmp': {"bind": '/mnt', "ro": False} + } + ) - fake_request.assert_called_with( - 'GET', - url_prefix + 'events', - params={'since': None, 'until': None, 'filters': None}, - stream=True - ) + pytest.deprecated_call(call_start) - def test_events_with_since_until(self): - ts = 1356048000 - now = datetime.datetime.utcfromtimestamp(ts) - since = now - datetime.timedelta(seconds=10) - until = now + datetime.timedelta(seconds=10) + def test_start_container_with_port_binds(self): + self.maxDiff = None - self.client.events(since=since, until=until) + def call_start(): + self.client.start(fake_api.FAKE_CONTAINER_ID, port_bindings={ + 1111: None, + 2222: 2222, + '3333/udp': (3333,), + 4444: ('127.0.0.1',), + 5555: ('127.0.0.1', 5555), + 6666: [('127.0.0.1',), ('192.168.0.1',)] + }) - fake_request.assert_called_with( - 'GET', - url_prefix + 'events', - params={ - 'since': ts - 10, - 'until': ts + 10, - 'filters': None - }, - stream=True - ) + pytest.deprecated_call(call_start) - def test_events_with_filters(self): - filters = {'event': ['die', 'stop'], - 'container': fake_api.FAKE_CONTAINER_ID} + def test_start_container_with_links(self): + def call_start(): + self.client.start( + fake_api.FAKE_CONTAINER_ID, links={'path': 'alias'} + ) - self.client.events(filters=filters) + pytest.deprecated_call(call_start) - expected_filters = docker.utils.convert_filters(filters) - fake_request.assert_called_with( - 'GET', - url_prefix + 'events', - params={ - 'since': None, - 'until': None, - 'filters': expected_filters - }, - stream=True - ) + def test_start_container_with_multiple_links(self): + def call_start(): + self.client.start( + fake_api.FAKE_CONTAINER_ID, + links={ + 'path1': 'alias1', + 'path2': 'alias2' + } + ) - ################### - # LISTING TESTS # - ################### + pytest.deprecated_call(call_start) - def test_images(self): - self.client.images(all=True) + def test_start_container_with_links_as_list_of_tuples(self): + def call_start(): + self.client.start(fake_api.FAKE_CONTAINER_ID, + links=[('path', 'alias')]) - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/json', - params={'filter': None, 'only_ids': 0, 'all': 1}, - timeout=DEFAULT_TIMEOUT_SECONDS - ) + pytest.deprecated_call(call_start) - def test_images_quiet(self): - self.client.images(all=True, quiet=True) + def test_start_container_privileged(self): + def call_start(): + self.client.start(fake_api.FAKE_CONTAINER_ID, privileged=True) - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/json', - params={'filter': None, 'only_ids': 1, 'all': 1}, - timeout=DEFAULT_TIMEOUT_SECONDS - ) + pytest.deprecated_call(call_start) - def test_image_ids(self): - self.client.images(quiet=True) + def test_start_container_with_dict_instead_of_id(self): + self.client.start({'Id': fake_api.FAKE_CONTAINER_ID}) - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/json', - params={'filter': None, 'only_ids': 1, 'all': 0}, - timeout=DEFAULT_TIMEOUT_SECONDS + args = fake_request.call_args + self.assertEqual( + args[0][1], + url_prefix + 'containers/3cc2351ab11b/start' ) - - def test_images_filters(self): - self.client.images(filters={'dangling': True}) - - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/json', - params={'filter': None, 'only_ids': 0, 'all': 0, - 'filters': '{"dangling": ["true"]}'}, - timeout=DEFAULT_TIMEOUT_SECONDS + self.assertEqual(json.loads(args[1]['data']), {}) + self.assertEqual( + args[1]['headers'], {'Content-Type': 'application/json'} ) - - def test_list_containers(self): - self.client.containers(all=True) - - fake_request.assert_called_with( - 'GET', - url_prefix + 'containers/json', - params={ - 'all': 1, - 'since': None, - 'size': 0, - 'limit': -1, - 'trunc_cmd': 0, - 'before': None - }, - timeout=DEFAULT_TIMEOUT_SECONDS + self.assertEqual( + args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS ) - @base.requires_api_version('1.21') - def test_list_networks(self): - networks = [ - { - "name": "none", - "id": "8e4e55c6863ef424", - "type": "null", - "endpoints": [] - }, - { - "name": "host", - "id": "062b6d9ea7913fde", - "type": "host", - "endpoints": [] - }, - ] - - get = mock.Mock(return_value=response( - status_code=200, content=json.dumps(networks).encode('utf-8'))) - - with mock.patch('docker.Client.get', get): - self.assertEqual(self.client.networks(), networks) - - self.assertEqual(get.call_args[0][0], url_prefix + 'networks') - - filters = json.loads(get.call_args[1]['params']['filters']) - self.assertFalse(filters) - - self.client.networks(names=['foo']) - filters = json.loads(get.call_args[1]['params']['filters']) - self.assertEqual(filters, {'name': ['foo']}) - - self.client.networks(ids=['123']) - filters = json.loads(get.call_args[1]['params']['filters']) - self.assertEqual(filters, {'id': ['123']}) - - ##################### - # CONTAINER TESTS # - ##################### +class CreateContainerTest(DockerClientTest): def test_create_container(self): self.client.create_container('busybox', 'true') @@ -709,42 +465,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase): self.client.create_host_config, mem_limit='1f28' ) - def test_start_container(self): - self.client.start(fake_api.FAKE_CONTAINER_ID) - - args = fake_request.call_args - self.assertEqual( - args[0][1], - url_prefix + 'containers/3cc2351ab11b/start' - ) - self.assertEqual(json.loads(args[1]['data']), {}) - self.assertEqual( - args[1]['headers'], {'Content-Type': 'application/json'} - ) - self.assertEqual( - args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS - ) - - def test_start_container_none(self): - with pytest.raises(ValueError) as excinfo: - self.client.start(container=None) - - self.assertEqual( - str(excinfo.value), - 'image or container param is undefined', - ) - - with pytest.raises(ValueError) as excinfo: - self.client.start(None) - - self.assertEqual( - str(excinfo.value), - 'image or container param is undefined', - ) - - def test_start_container_regression_573(self): - self.client.start(**{'container': fake_api.FAKE_CONTAINER_ID}) - def test_create_container_with_lxc_conf(self): self.client.create_container( 'busybox', 'true', host_config=self.client.create_host_config( @@ -1070,111 +790,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase): DEFAULT_TIMEOUT_SECONDS ) - def test_start_container_with_lxc_conf(self): - def call_start(): - self.client.start( - fake_api.FAKE_CONTAINER_ID, - lxc_conf={'lxc.conf.k': 'lxc.conf.value'} - ) - - pytest.deprecated_call(call_start) - - def test_start_container_with_lxc_conf_compat(self): - def call_start(): - self.client.start( - fake_api.FAKE_CONTAINER_ID, - lxc_conf=[{'Key': 'lxc.conf.k', 'Value': 'lxc.conf.value'}] - ) - - pytest.deprecated_call(call_start) - - def test_start_container_with_binds_ro(self): - def call_start(): - self.client.start( - fake_api.FAKE_CONTAINER_ID, binds={ - '/tmp': { - "bind": '/mnt', - "ro": True - } - } - ) - - pytest.deprecated_call(call_start) - - def test_start_container_with_binds_rw(self): - def call_start(): - self.client.start( - fake_api.FAKE_CONTAINER_ID, binds={ - '/tmp': {"bind": '/mnt', "ro": False} - } - ) - - pytest.deprecated_call(call_start) - - def test_start_container_with_port_binds(self): - self.maxDiff = None - - def call_start(): - self.client.start(fake_api.FAKE_CONTAINER_ID, port_bindings={ - 1111: None, - 2222: 2222, - '3333/udp': (3333,), - 4444: ('127.0.0.1',), - 5555: ('127.0.0.1', 5555), - 6666: [('127.0.0.1',), ('192.168.0.1',)] - }) - - pytest.deprecated_call(call_start) - - def test_start_container_with_links(self): - def call_start(): - self.client.start( - fake_api.FAKE_CONTAINER_ID, links={'path': 'alias'} - ) - - pytest.deprecated_call(call_start) - - def test_start_container_with_multiple_links(self): - def call_start(): - self.client.start( - fake_api.FAKE_CONTAINER_ID, - links={ - 'path1': 'alias1', - 'path2': 'alias2' - } - ) - - pytest.deprecated_call(call_start) - - def test_start_container_with_links_as_list_of_tuples(self): - def call_start(): - self.client.start(fake_api.FAKE_CONTAINER_ID, - links=[('path', 'alias')]) - - pytest.deprecated_call(call_start) - - def test_start_container_privileged(self): - def call_start(): - self.client.start(fake_api.FAKE_CONTAINER_ID, privileged=True) - - pytest.deprecated_call(call_start) - - def test_start_container_with_dict_instead_of_id(self): - self.client.start({'Id': fake_api.FAKE_CONTAINER_ID}) - - args = fake_request.call_args - self.assertEqual( - args[0][1], - url_prefix + 'containers/3cc2351ab11b/start' - ) - self.assertEqual(json.loads(args[1]['data']), {}) - self.assertEqual( - args[1]['headers'], {'Content-Type': 'application/json'} - ) - self.assertEqual( - args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS - ) - def test_create_container_with_restart_policy(self): self.client.create_container( 'busybox', 'true', host_config=self.client.create_host_config( @@ -1348,6 +963,25 @@ class DockerClientTest(Cleanup, base.BaseTestCase): DEFAULT_TIMEOUT_SECONDS ) + +class ContainerTest(DockerClientTest): + def test_list_containers(self): + self.client.containers(all=True) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'containers/json', + params={ + 'all': 1, + 'since': None, + 'size': 0, + 'limit': -1, + 'trunc_cmd': 0, + 'before': None + }, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + def test_resize_container(self): self.client.resize( {'Id': fake_api.FAKE_CONTAINER_ID}, @@ -1393,35 +1027,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase): timeout=None ) - def _socket_path_for_client_session(self, client): - socket_adapter = client.get_adapter('http+docker://') - return socket_adapter.socket_path - - def test_url_compatibility_unix(self): - c = docker.Client(base_url="unix://socket") - - assert self._socket_path_for_client_session(c) == '/socket' - - def test_url_compatibility_unix_triple_slash(self): - c = docker.Client(base_url="unix:///socket") - - assert self._socket_path_for_client_session(c) == '/socket' - - def test_url_compatibility_http_unix_triple_slash(self): - c = docker.Client(base_url="http+unix:///socket") - - assert self._socket_path_for_client_session(c) == '/socket' - - def test_url_compatibility_http(self): - c = docker.Client(base_url="http://hostname:1234") - - assert c.base_url == "http://hostname:1234" - - def test_url_compatibility_tcp(self): - c = docker.Client(base_url="tcp://hostname:1234") - - assert c.base_url == "http://hostname:1234" - def test_logs(self): with mock.patch('docker.Client.inspect_container', fake_inspect_container): @@ -1560,73 +1165,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase): timeout=(DEFAULT_TIMEOUT_SECONDS + timeout) ) - def test_exec_create(self): - self.client.exec_create(fake_api.FAKE_CONTAINER_ID, ['ls', '-1']) - - args = fake_request.call_args - self.assertEqual( - 'POST', - args[0][0], url_prefix + 'containers/{0}/exec'.format( - fake_api.FAKE_CONTAINER_ID - ) - ) - - self.assertEqual( - json.loads(args[1]['data']), { - 'Tty': False, - 'AttachStdout': True, - 'Container': fake_api.FAKE_CONTAINER_ID, - 'Cmd': ['ls', '-1'], - 'Privileged': False, - 'AttachStdin': False, - 'AttachStderr': True, - 'User': '' - } - ) - - self.assertEqual(args[1]['headers'], - {'Content-Type': 'application/json'}) - - def test_exec_start(self): - self.client.exec_start(fake_api.FAKE_EXEC_ID) - - args = fake_request.call_args - self.assertEqual( - args[0][1], url_prefix + 'exec/{0}/start'.format( - fake_api.FAKE_EXEC_ID - ) - ) - - self.assertEqual( - json.loads(args[1]['data']), { - 'Tty': False, - 'Detach': False, - } - ) - - self.assertEqual(args[1]['headers'], - {'Content-Type': 'application/json'}) - - def test_exec_inspect(self): - self.client.exec_inspect(fake_api.FAKE_EXEC_ID) - - args = fake_request.call_args - self.assertEqual( - args[0][1], url_prefix + 'exec/{0}/json'.format( - fake_api.FAKE_EXEC_ID - ) - ) - - def test_exec_resize(self): - self.client.exec_resize(fake_api.FAKE_EXEC_ID, height=20, width=60) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'exec/{0}/resize'.format(fake_api.FAKE_EXEC_ID), - params={'h': 20, 'w': 60}, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - def test_pause_container(self): self.client.pause(fake_api.FAKE_CONTAINER_ID) @@ -1715,16 +1253,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase): timeout=DEFAULT_TIMEOUT_SECONDS ) - def test_remove_link(self): - self.client.remove_container(fake_api.FAKE_CONTAINER_ID, link=True) - - fake_request.assert_called_with( - 'DELETE', - url_prefix + 'containers/3cc2351ab11b', - params={'v': False, 'link': True, 'force': False}, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - def test_export(self): self.client.export(fake_api.FAKE_CONTAINER_ID) @@ -1792,851 +1320,3 @@ class DockerClientTest(Cleanup, base.BaseTestCase): params={'ps_args': 'waux'}, timeout=DEFAULT_TIMEOUT_SECONDS ) - - ################## - # IMAGES TESTS # - ################## - - def test_pull(self): - self.client.pull('joffrey/test001') - - args = fake_request.call_args - self.assertEqual( - args[0][1], - url_prefix + 'images/create' - ) - self.assertEqual( - args[1]['params'], - {'tag': None, 'fromImage': 'joffrey/test001'} - ) - self.assertFalse(args[1]['stream']) - - def test_pull_stream(self): - self.client.pull('joffrey/test001', stream=True) - - args = fake_request.call_args - self.assertEqual( - args[0][1], - url_prefix + 'images/create' - ) - self.assertEqual( - args[1]['params'], - {'tag': None, 'fromImage': 'joffrey/test001'} - ) - self.assertTrue(args[1]['stream']) - - def test_commit(self): - self.client.commit(fake_api.FAKE_CONTAINER_ID) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'commit', - data='{}', - headers={'Content-Type': 'application/json'}, - params={ - 'repo': None, - 'comment': None, - 'tag': None, - 'container': '3cc2351ab11b', - 'author': None - }, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_remove_image(self): - self.client.remove_image(fake_api.FAKE_IMAGE_ID) - - fake_request.assert_called_with( - 'DELETE', - url_prefix + 'images/e9aa60c60128', - params={'force': False, 'noprune': False}, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_image_history(self): - self.client.history(fake_api.FAKE_IMAGE_NAME) - - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/test_image/history', - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_import_image(self): - self.client.import_image( - fake_api.FAKE_TARBALL_PATH, - repository=fake_api.FAKE_REPO_NAME, - tag=fake_api.FAKE_TAG_NAME - ) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/create', - params={ - 'repo': fake_api.FAKE_REPO_NAME, - 'tag': fake_api.FAKE_TAG_NAME, - 'fromSrc': fake_api.FAKE_TARBALL_PATH - }, - data=None, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_import_image_from_bytes(self): - stream = (i for i in range(0, 100)) - - self.client.import_image( - stream, - repository=fake_api.FAKE_REPO_NAME, - tag=fake_api.FAKE_TAG_NAME - ) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/create', - params={ - 'repo': fake_api.FAKE_REPO_NAME, - 'tag': fake_api.FAKE_TAG_NAME, - 'fromSrc': '-', - }, - headers={ - 'Content-Type': 'application/tar', - }, - data=stream, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_import_image_from_image(self): - self.client.import_image( - image=fake_api.FAKE_IMAGE_NAME, - repository=fake_api.FAKE_REPO_NAME, - tag=fake_api.FAKE_TAG_NAME - ) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/create', - params={ - 'repo': fake_api.FAKE_REPO_NAME, - 'tag': fake_api.FAKE_TAG_NAME, - 'fromImage': fake_api.FAKE_IMAGE_NAME - }, - data=None, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_inspect_image(self): - self.client.inspect_image(fake_api.FAKE_IMAGE_NAME) - - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/test_image/json', - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_inspect_image_undefined_id(self): - for arg in None, '', {True: True}: - with pytest.raises(docker.errors.NullResource) as excinfo: - self.client.inspect_image(arg) - - self.assertEqual( - excinfo.value.args[0], 'image or container param is undefined' - ) - - def test_insert_image(self): - try: - self.client.insert(fake_api.FAKE_IMAGE_NAME, - fake_api.FAKE_URL, fake_api.FAKE_PATH) - except docker.errors.DeprecatedMethod: - self.assertTrue( - docker.utils.compare_version('1.12', self.client._version) >= 0 - ) - return - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/test_image/insert', - params={ - 'url': fake_api.FAKE_URL, - 'path': fake_api.FAKE_PATH - }, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_push_image(self): - with mock.patch('docker.auth.auth.resolve_authconfig', - fake_resolve_authconfig): - self.client.push(fake_api.FAKE_IMAGE_NAME) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/test_image/push', - params={ - 'tag': None - }, - data='{}', - headers={'Content-Type': 'application/json'}, - stream=False, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_push_image_with_tag(self): - with mock.patch('docker.auth.auth.resolve_authconfig', - fake_resolve_authconfig): - self.client.push( - fake_api.FAKE_IMAGE_NAME, tag=fake_api.FAKE_TAG_NAME - ) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/test_image/push', - params={ - 'tag': fake_api.FAKE_TAG_NAME, - }, - data='{}', - headers={'Content-Type': 'application/json'}, - stream=False, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_push_image_stream(self): - with mock.patch('docker.auth.auth.resolve_authconfig', - fake_resolve_authconfig): - self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/test_image/push', - params={ - 'tag': None - }, - data='{}', - headers={'Content-Type': 'application/json'}, - stream=True, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_tag_image(self): - self.client.tag(fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/e9aa60c60128/tag', - params={ - 'tag': None, - 'repo': 'repo', - 'force': 0 - }, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_tag_image_tag(self): - self.client.tag( - fake_api.FAKE_IMAGE_ID, - fake_api.FAKE_REPO_NAME, - tag=fake_api.FAKE_TAG_NAME - ) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/e9aa60c60128/tag', - params={ - 'tag': 'tag', - 'repo': 'repo', - 'force': 0 - }, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_tag_image_force(self): - self.client.tag( - fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME, force=True) - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/e9aa60c60128/tag', - params={ - 'tag': None, - 'repo': 'repo', - 'force': 1 - }, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_get_image(self): - self.client.get_image(fake_api.FAKE_IMAGE_ID) - - fake_request.assert_called_with( - 'GET', - url_prefix + 'images/e9aa60c60128/get', - stream=True, - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - def test_load_image(self): - self.client.load_image('Byte Stream....') - - fake_request.assert_called_with( - 'POST', - url_prefix + 'images/load', - data='Byte Stream....', - timeout=DEFAULT_TIMEOUT_SECONDS - ) - - ################# - # BUILDER TESTS # - ################# - - def test_build_container(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - - self.client.build(fileobj=script) - - def test_build_container_pull(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - - self.client.build(fileobj=script, pull=True) - - def test_build_container_stream(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - - self.client.build(fileobj=script, stream=True) - - def test_build_container_custom_context(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - context = docker.utils.mkbuildcontext(script) - - self.client.build(fileobj=context, custom_context=True) - - def test_build_container_custom_context_gzip(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - context = docker.utils.mkbuildcontext(script) - gz_context = gzip.GzipFile(fileobj=context) - - self.client.build( - fileobj=gz_context, - custom_context=True, - encoding="gzip" - ) - - def test_build_remote_with_registry_auth(self): - self.client._auth_configs = { - 'https://example.com': { - 'user': 'example', - 'password': 'example', - 'email': 'example@example.com' - } - } - - self.client.build(path='https://github.com/docker-library/mongo') - - def test_build_container_with_named_dockerfile(self): - self.client.build('.', dockerfile='nameddockerfile') - - def test_build_container_with_container_limits(self): - self.client.build('.', container_limits={ - 'memory': 1024 * 1024, - 'cpusetcpus': 1, - 'cpushares': 1000, - 'memswap': 1024 * 1024 * 8 - }) - - def test_build_container_invalid_container_limits(self): - self.assertRaises( - docker.errors.DockerException, - lambda: self.client.build('.', container_limits={ - 'foo': 'bar' - }) - ) - - ################### - # VOLUMES TESTS # - ################### - - @base.requires_api_version('1.21') - def test_list_volumes(self): - volumes = self.client.volumes() - self.assertIn('Volumes', volumes) - self.assertEqual(len(volumes['Volumes']), 2) - args = fake_request.call_args - - self.assertEqual(args[0][0], 'GET') - self.assertEqual(args[0][1], url_prefix + 'volumes') - - @base.requires_api_version('1.21') - def test_create_volume(self): - name = 'perfectcherryblossom' - result = self.client.create_volume(name) - self.assertIn('Name', result) - self.assertEqual(result['Name'], name) - self.assertIn('Driver', result) - self.assertEqual(result['Driver'], 'local') - args = fake_request.call_args - - self.assertEqual(args[0][0], 'POST') - self.assertEqual(args[0][1], url_prefix + 'volumes/create') - self.assertEqual(json.loads(args[1]['data']), {'Name': name}) - - @base.requires_api_version('1.21') - def test_create_volume_with_driver(self): - name = 'perfectcherryblossom' - driver_name = 'sshfs' - self.client.create_volume(name, driver=driver_name) - args = fake_request.call_args - - self.assertEqual(args[0][0], 'POST') - self.assertEqual(args[0][1], url_prefix + 'volumes/create') - data = json.loads(args[1]['data']) - self.assertIn('Driver', data) - self.assertEqual(data['Driver'], driver_name) - - @base.requires_api_version('1.21') - def test_create_volume_invalid_opts_type(self): - with pytest.raises(TypeError): - self.client.create_volume( - 'perfectcherryblossom', driver_opts='hello=world' - ) - - with pytest.raises(TypeError): - self.client.create_volume( - 'perfectcherryblossom', driver_opts=['hello=world'] - ) - - with pytest.raises(TypeError): - self.client.create_volume( - 'perfectcherryblossom', driver_opts='' - ) - - @base.requires_api_version('1.21') - def test_inspect_volume(self): - name = 'perfectcherryblossom' - result = self.client.inspect_volume(name) - self.assertIn('Name', result) - self.assertEqual(result['Name'], name) - self.assertIn('Driver', result) - self.assertEqual(result['Driver'], 'local') - args = fake_request.call_args - - self.assertEqual(args[0][0], 'GET') - self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name)) - - @base.requires_api_version('1.21') - def test_remove_volume(self): - name = 'perfectcherryblossom' - result = self.client.remove_volume(name) - self.assertTrue(result) - args = fake_request.call_args - - self.assertEqual(args[0][0], 'DELETE') - self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name)) - - ##################### - # NETWORK TESTS # - ##################### - - @base.requires_api_version('1.21') - def test_create_network(self): - network_data = { - "id": 'abc12345', - "warning": "", - } - - network_response = response(status_code=200, content=network_data) - post = mock.Mock(return_value=network_response) - - with mock.patch('docker.Client.post', post): - result = self.client.create_network('foo') - self.assertEqual(result, network_data) - - self.assertEqual( - post.call_args[0][0], - url_prefix + 'networks/create') - - self.assertEqual( - json.loads(post.call_args[1]['data']), - {"name": "foo"}) - - self.client.create_network('foo', 'bridge') - - self.assertEqual( - json.loads(post.call_args[1]['data']), - {"name": "foo", "driver": "bridge"}) - - @base.requires_api_version('1.21') - def test_remove_network(self): - network_id = 'abc12345' - delete = mock.Mock(return_value=response(status_code=200)) - - with mock.patch('docker.Client.delete', delete): - self.client.remove_network(network_id) - - args = delete.call_args - self.assertEqual(args[0][0], - url_prefix + 'networks/{0}'.format(network_id)) - - @base.requires_api_version('1.21') - def test_inspect_network(self): - network_id = 'abc12345' - network_name = 'foo' - network_data = { - six.u('name'): network_name, - six.u('id'): network_id, - six.u('driver'): 'bridge', - six.u('containers'): {}, - } - - network_response = response(status_code=200, content=network_data) - get = mock.Mock(return_value=network_response) - - with mock.patch('docker.Client.get', get): - result = self.client.inspect_network(network_id) - self.assertEqual(result, network_data) - - args = get.call_args - self.assertEqual(args[0][0], - url_prefix + 'networks/{0}'.format(network_id)) - - @base.requires_api_version('1.21') - def test_connect_container_to_network(self): - network_id = 'abc12345' - container_id = 'def45678' - - post = mock.Mock(return_value=response(status_code=201)) - - with mock.patch('docker.Client.post', post): - self.client.connect_container_to_network( - {'Id': container_id}, network_id) - - self.assertEqual( - post.call_args[0][0], - url_prefix + 'networks/{0}/connect'.format(network_id)) - - self.assertEqual( - json.loads(post.call_args[1]['data']), - {'container': container_id}) - - @base.requires_api_version('1.21') - def test_disconnect_container_from_network(self): - network_id = 'abc12345' - container_id = 'def45678' - - post = mock.Mock(return_value=response(status_code=201)) - - with mock.patch('docker.Client.post', post): - self.client.disconnect_container_from_network( - {'Id': container_id}, network_id) - - self.assertEqual( - post.call_args[0][0], - url_prefix + 'networks/{0}/disconnect'.format(network_id)) - - self.assertEqual( - json.loads(post.call_args[1]['data']), - {'container': container_id}) - - ####################### - # PY SPECIFIC TESTS # - ####################### - - def test_load_config_no_file(self): - folder = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, folder) - cfg = docker.auth.load_config(folder) - self.assertTrue(cfg is not None) - - def test_load_config(self): - folder = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, folder) - dockercfg_path = os.path.join(folder, '.dockercfg') - with open(dockercfg_path, 'w') as f: - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - f.write('auth = {0}\n'.format(auth_)) - f.write('email = sakuya@scarlet.net') - cfg = docker.auth.load_config(dockercfg_path) - assert docker.auth.INDEX_NAME in cfg - self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None) - cfg = cfg[docker.auth.INDEX_NAME] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('auth'), None) - - def test_load_config_with_random_name(self): - folder = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, folder) - - dockercfg_path = os.path.join(folder, - '.{0}.dockercfg'.format( - random.randrange(100000))) - registry = 'https://your.private.registry.io' - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - config = { - registry: { - 'auth': '{0}'.format(auth_), - 'email': 'sakuya@scarlet.net' - } - } - - with open(dockercfg_path, 'w') as f: - json.dump(config, f) - - cfg = docker.auth.load_config(dockercfg_path) - assert registry in cfg - self.assertNotEqual(cfg[registry], None) - cfg = cfg[registry] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('auth'), None) - - def test_load_config_custom_config_env(self): - folder = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, folder) - - dockercfg_path = os.path.join(folder, 'config.json') - registry = 'https://your.private.registry.io' - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - config = { - registry: { - 'auth': '{0}'.format(auth_), - 'email': 'sakuya@scarlet.net' - } - } - - with open(dockercfg_path, 'w') as f: - json.dump(config, f) - - with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}): - cfg = docker.auth.load_config(None) - assert registry in cfg - self.assertNotEqual(cfg[registry], None) - cfg = cfg[registry] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('auth'), None) - - def test_tar_with_excludes(self): - dirs = [ - 'foo', - 'foo/bar', - 'bar', - ] - - files = [ - 'Dockerfile', - 'Dockerfile.alt', - '.dockerignore', - 'a.py', - 'a.go', - 'b.py', - 'cde.py', - 'foo/a.py', - 'foo/b.py', - 'foo/bar/a.py', - 'bar/a.py', - ] - - exclude = [ - '*.py', - '!b.py', - '!a.go', - 'foo', - 'Dockerfile*', - '.dockerignore', - ] - - expected_names = set([ - 'Dockerfile', - '.dockerignore', - 'a.go', - 'b.py', - 'bar', - 'bar/a.py', - ]) - - base = make_tree(dirs, files) - self.addCleanup(shutil.rmtree, base) - - with docker.utils.tar(base, exclude=exclude) as archive: - tar = tarfile.open(fileobj=archive) - assert sorted(tar.getnames()) == sorted(expected_names) - - def test_tar_with_empty_directory(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - for d in ['foo', 'bar']: - os.makedirs(os.path.join(base, d)) - with docker.utils.tar(base) as archive: - tar = tarfile.open(fileobj=archive) - self.assertEqual(sorted(tar.getnames()), ['bar', 'foo']) - - def test_tar_with_file_symlinks(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - with open(os.path.join(base, 'foo'), 'w') as f: - f.write("content") - os.makedirs(os.path.join(base, 'bar')) - os.symlink('../foo', os.path.join(base, 'bar/foo')) - with docker.utils.tar(base) as archive: - tar = tarfile.open(fileobj=archive) - self.assertEqual(sorted(tar.getnames()), ['bar', 'bar/foo', 'foo']) - - def test_tar_with_directory_symlinks(self): - base = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base) - for d in ['foo', 'bar']: - os.makedirs(os.path.join(base, d)) - os.symlink('../foo', os.path.join(base, 'bar/foo')) - with docker.utils.tar(base) as archive: - tar = tarfile.open(fileobj=archive) - self.assertEqual(sorted(tar.getnames()), ['bar', 'bar/foo', 'foo']) - - ####################### - # HOST CONFIG TESTS # - ####################### - - def test_create_host_config_secopt(self): - security_opt = ['apparmor:test_profile'] - result = self.client.create_host_config(security_opt=security_opt) - self.assertIn('SecurityOpt', result) - self.assertEqual(result['SecurityOpt'], security_opt) - - self.assertRaises( - docker.errors.DockerException, self.client.create_host_config, - security_opt='wrong' - ) - - -class StreamTest(Cleanup, base.BaseTestCase): - - def setUp(self): - socket_dir = tempfile.mkdtemp() - self.build_context = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, socket_dir) - self.addCleanup(shutil.rmtree, self.build_context) - self.socket_file = os.path.join(socket_dir, 'test_sock.sock') - self.server_socket = self._setup_socket() - self.stop_server = False - server_thread = threading.Thread(target=self.run_server) - server_thread.setDaemon(True) - server_thread.start() - self.response = None - self.request_handler = None - self.addCleanup(server_thread.join) - self.addCleanup(self.stop) - - def stop(self): - self.stop_server = True - - def _setup_socket(self): - server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - server_sock.bind(self.socket_file) - # Non-blocking mode so that we can shut the test down easily - server_sock.setblocking(0) - server_sock.listen(5) - return server_sock - - def run_server(self): - try: - while not self.stop_server: - try: - connection, client_address = self.server_socket.accept() - except socket.error: - # Probably no connection to accept yet - time.sleep(0.01) - continue - - connection.setblocking(1) - try: - self.request_handler(connection) - finally: - connection.close() - finally: - self.server_socket.close() - - def early_response_sending_handler(self, connection): - data = b'' - headers = None - - connection.sendall(self.response) - while not headers: - data += connection.recv(2048) - parts = data.split(b'\r\n\r\n', 1) - if len(parts) == 2: - headers, data = parts - - mo = re.search(r'Content-Length: ([0-9]+)', headers.decode()) - assert mo - content_length = int(mo.group(1)) - - while True: - if len(data) >= content_length: - break - - data += connection.recv(2048) - - def test_early_stream_response(self): - self.request_handler = self.early_response_sending_handler - lines = [] - for i in range(0, 50): - line = str(i).encode() - lines += [('%x' % len(line)).encode(), line] - lines.append(b'0') - lines.append(b'') - - self.response = ( - b'HTTP/1.1 200 OK\r\n' - b'Transfer-Encoding: chunked\r\n' - b'\r\n' - ) + b'\r\n'.join(lines) - - with docker.Client(base_url="http+unix://" + self.socket_file) \ - as client: - for i in range(5): - try: - stream = client.build( - path=self.build_context, - stream=True - ) - break - except requests.ConnectionError as e: - if i == 4: - raise e - - self.assertEqual(list(stream), [ - str(i).encode() for i in range(50)]) diff --git a/tests/unit/exec_test.py b/tests/unit/exec_test.py new file mode 100644 index 0000000..3007799 --- /dev/null +++ b/tests/unit/exec_test.py @@ -0,0 +1,75 @@ +import json + +from . import fake_api +from .api_test import ( + DockerClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS, +) + + +class ExecTest(DockerClientTest): + def test_exec_create(self): + self.client.exec_create(fake_api.FAKE_CONTAINER_ID, ['ls', '-1']) + + args = fake_request.call_args + self.assertEqual( + 'POST', + args[0][0], url_prefix + 'containers/{0}/exec'.format( + fake_api.FAKE_CONTAINER_ID + ) + ) + + self.assertEqual( + json.loads(args[1]['data']), { + 'Tty': False, + 'AttachStdout': True, + 'Container': fake_api.FAKE_CONTAINER_ID, + 'Cmd': ['ls', '-1'], + 'Privileged': False, + 'AttachStdin': False, + 'AttachStderr': True, + 'User': '' + } + ) + + self.assertEqual(args[1]['headers'], + {'Content-Type': 'application/json'}) + + def test_exec_start(self): + self.client.exec_start(fake_api.FAKE_EXEC_ID) + + args = fake_request.call_args + self.assertEqual( + args[0][1], url_prefix + 'exec/{0}/start'.format( + fake_api.FAKE_EXEC_ID + ) + ) + + self.assertEqual( + json.loads(args[1]['data']), { + 'Tty': False, + 'Detach': False, + } + ) + + self.assertEqual(args[1]['headers'], + {'Content-Type': 'application/json'}) + + def test_exec_inspect(self): + self.client.exec_inspect(fake_api.FAKE_EXEC_ID) + + args = fake_request.call_args + self.assertEqual( + args[0][1], url_prefix + 'exec/{0}/json'.format( + fake_api.FAKE_EXEC_ID + ) + ) + + def test_exec_resize(self): + self.client.exec_resize(fake_api.FAKE_EXEC_ID, height=20, width=60) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'exec/{0}/resize'.format(fake_api.FAKE_EXEC_ID), + params={'h': 20, 'w': 60}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) diff --git a/tests/fake_api.py b/tests/unit/fake_api.py index 8852da0..8852da0 100644 --- a/tests/fake_api.py +++ b/tests/unit/fake_api.py diff --git a/tests/fake_stat.py b/tests/unit/fake_stat.py index a7f1029..a7f1029 100644 --- a/tests/fake_stat.py +++ b/tests/unit/fake_stat.py diff --git a/tests/unit/image_test.py b/tests/unit/image_test.py new file mode 100644 index 0000000..a46e48e --- /dev/null +++ b/tests/unit/image_test.py @@ -0,0 +1,346 @@ +import docker +import pytest + +from . import fake_api +from .api_test import ( + DockerClientTest, fake_request, DEFAULT_TIMEOUT_SECONDS, url_prefix, + fake_resolve_authconfig +) + +try: + from unittest import mock +except ImportError: + import mock + + +class ImageTest(DockerClientTest): + def test_image_viz(self): + with pytest.raises(Exception): + self.client.images('busybox', viz=True) + self.fail('Viz output should not be supported!') + + def test_images(self): + self.client.images(all=True) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/json', + params={'filter': None, 'only_ids': 0, 'all': 1}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_images_quiet(self): + self.client.images(all=True, quiet=True) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/json', + params={'filter': None, 'only_ids': 1, 'all': 1}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_image_ids(self): + self.client.images(quiet=True) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/json', + params={'filter': None, 'only_ids': 1, 'all': 0}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_images_filters(self): + self.client.images(filters={'dangling': True}) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/json', + params={'filter': None, 'only_ids': 0, 'all': 0, + 'filters': '{"dangling": ["true"]}'}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_pull(self): + self.client.pull('joffrey/test001') + + args = fake_request.call_args + self.assertEqual( + args[0][1], + url_prefix + 'images/create' + ) + self.assertEqual( + args[1]['params'], + {'tag': None, 'fromImage': 'joffrey/test001'} + ) + self.assertFalse(args[1]['stream']) + + def test_pull_stream(self): + self.client.pull('joffrey/test001', stream=True) + + args = fake_request.call_args + self.assertEqual( + args[0][1], + url_prefix + 'images/create' + ) + self.assertEqual( + args[1]['params'], + {'tag': None, 'fromImage': 'joffrey/test001'} + ) + self.assertTrue(args[1]['stream']) + + def test_commit(self): + self.client.commit(fake_api.FAKE_CONTAINER_ID) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'commit', + data='{}', + headers={'Content-Type': 'application/json'}, + params={ + 'repo': None, + 'comment': None, + 'tag': None, + 'container': '3cc2351ab11b', + 'author': None + }, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_remove_image(self): + self.client.remove_image(fake_api.FAKE_IMAGE_ID) + + fake_request.assert_called_with( + 'DELETE', + url_prefix + 'images/e9aa60c60128', + params={'force': False, 'noprune': False}, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_image_history(self): + self.client.history(fake_api.FAKE_IMAGE_NAME) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/test_image/history', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_import_image(self): + self.client.import_image( + fake_api.FAKE_TARBALL_PATH, + repository=fake_api.FAKE_REPO_NAME, + tag=fake_api.FAKE_TAG_NAME + ) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/create', + params={ + 'repo': fake_api.FAKE_REPO_NAME, + 'tag': fake_api.FAKE_TAG_NAME, + 'fromSrc': fake_api.FAKE_TARBALL_PATH + }, + data=None, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_import_image_from_bytes(self): + stream = (i for i in range(0, 100)) + + self.client.import_image( + stream, + repository=fake_api.FAKE_REPO_NAME, + tag=fake_api.FAKE_TAG_NAME + ) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/create', + params={ + 'repo': fake_api.FAKE_REPO_NAME, + 'tag': fake_api.FAKE_TAG_NAME, + 'fromSrc': '-', + }, + headers={ + 'Content-Type': 'application/tar', + }, + data=stream, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_import_image_from_image(self): + self.client.import_image( + image=fake_api.FAKE_IMAGE_NAME, + repository=fake_api.FAKE_REPO_NAME, + tag=fake_api.FAKE_TAG_NAME + ) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/create', + params={ + 'repo': fake_api.FAKE_REPO_NAME, + 'tag': fake_api.FAKE_TAG_NAME, + 'fromImage': fake_api.FAKE_IMAGE_NAME + }, + data=None, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_inspect_image(self): + self.client.inspect_image(fake_api.FAKE_IMAGE_NAME) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/test_image/json', + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_inspect_image_undefined_id(self): + for arg in None, '', {True: True}: + with pytest.raises(docker.errors.NullResource) as excinfo: + self.client.inspect_image(arg) + + self.assertEqual( + excinfo.value.args[0], 'image or container param is undefined' + ) + + def test_insert_image(self): + try: + self.client.insert(fake_api.FAKE_IMAGE_NAME, + fake_api.FAKE_URL, fake_api.FAKE_PATH) + except docker.errors.DeprecatedMethod: + self.assertTrue( + docker.utils.compare_version('1.12', self.client._version) >= 0 + ) + return + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/test_image/insert', + params={ + 'url': fake_api.FAKE_URL, + 'path': fake_api.FAKE_PATH + }, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_push_image(self): + with mock.patch('docker.auth.auth.resolve_authconfig', + fake_resolve_authconfig): + self.client.push(fake_api.FAKE_IMAGE_NAME) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/test_image/push', + params={ + 'tag': None + }, + data='{}', + headers={'Content-Type': 'application/json'}, + stream=False, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_push_image_with_tag(self): + with mock.patch('docker.auth.auth.resolve_authconfig', + fake_resolve_authconfig): + self.client.push( + fake_api.FAKE_IMAGE_NAME, tag=fake_api.FAKE_TAG_NAME + ) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/test_image/push', + params={ + 'tag': fake_api.FAKE_TAG_NAME, + }, + data='{}', + headers={'Content-Type': 'application/json'}, + stream=False, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_push_image_stream(self): + with mock.patch('docker.auth.auth.resolve_authconfig', + fake_resolve_authconfig): + self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/test_image/push', + params={ + 'tag': None + }, + data='{}', + headers={'Content-Type': 'application/json'}, + stream=True, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_tag_image(self): + self.client.tag(fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/e9aa60c60128/tag', + params={ + 'tag': None, + 'repo': 'repo', + 'force': 0 + }, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_tag_image_tag(self): + self.client.tag( + fake_api.FAKE_IMAGE_ID, + fake_api.FAKE_REPO_NAME, + tag=fake_api.FAKE_TAG_NAME + ) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/e9aa60c60128/tag', + params={ + 'tag': 'tag', + 'repo': 'repo', + 'force': 0 + }, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_tag_image_force(self): + self.client.tag( + fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME, force=True) + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/e9aa60c60128/tag', + params={ + 'tag': None, + 'repo': 'repo', + 'force': 1 + }, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_get_image(self): + self.client.get_image(fake_api.FAKE_IMAGE_ID) + + fake_request.assert_called_with( + 'GET', + url_prefix + 'images/e9aa60c60128/get', + stream=True, + timeout=DEFAULT_TIMEOUT_SECONDS + ) + + def test_load_image(self): + self.client.load_image('Byte Stream....') + + fake_request.assert_called_with( + 'POST', + url_prefix + 'images/load', + data='Byte Stream....', + timeout=DEFAULT_TIMEOUT_SECONDS + ) diff --git a/tests/unit/network_test.py b/tests/unit/network_test.py new file mode 100644 index 0000000..306ee4f --- /dev/null +++ b/tests/unit/network_test.py @@ -0,0 +1,149 @@ +import json + +import six + +from .. import base +from .api_test import DockerClientTest, url_prefix, response + +try: + from unittest import mock +except ImportError: + import mock + + +class NetworkTest(DockerClientTest): + @base.requires_api_version('1.21') + def test_list_networks(self): + networks = [ + { + "name": "none", + "id": "8e4e55c6863ef424", + "type": "null", + "endpoints": [] + }, + { + "name": "host", + "id": "062b6d9ea7913fde", + "type": "host", + "endpoints": [] + }, + ] + + get = mock.Mock(return_value=response( + status_code=200, content=json.dumps(networks).encode('utf-8'))) + + with mock.patch('docker.Client.get', get): + self.assertEqual(self.client.networks(), networks) + + self.assertEqual(get.call_args[0][0], url_prefix + 'networks') + + filters = json.loads(get.call_args[1]['params']['filters']) + self.assertFalse(filters) + + self.client.networks(names=['foo']) + filters = json.loads(get.call_args[1]['params']['filters']) + self.assertEqual(filters, {'name': ['foo']}) + + self.client.networks(ids=['123']) + filters = json.loads(get.call_args[1]['params']['filters']) + self.assertEqual(filters, {'id': ['123']}) + + @base.requires_api_version('1.21') + def test_create_network(self): + network_data = { + "id": 'abc12345', + "warning": "", + } + + network_response = response(status_code=200, content=network_data) + post = mock.Mock(return_value=network_response) + + with mock.patch('docker.Client.post', post): + result = self.client.create_network('foo') + self.assertEqual(result, network_data) + + self.assertEqual( + post.call_args[0][0], + url_prefix + 'networks/create') + + self.assertEqual( + json.loads(post.call_args[1]['data']), + {"name": "foo"}) + + self.client.create_network('foo', 'bridge') + + self.assertEqual( + json.loads(post.call_args[1]['data']), + {"name": "foo", "driver": "bridge"}) + + @base.requires_api_version('1.21') + def test_remove_network(self): + network_id = 'abc12345' + delete = mock.Mock(return_value=response(status_code=200)) + + with mock.patch('docker.Client.delete', delete): + self.client.remove_network(network_id) + + args = delete.call_args + self.assertEqual(args[0][0], + url_prefix + 'networks/{0}'.format(network_id)) + + @base.requires_api_version('1.21') + def test_inspect_network(self): + network_id = 'abc12345' + network_name = 'foo' + network_data = { + six.u('name'): network_name, + six.u('id'): network_id, + six.u('driver'): 'bridge', + six.u('containers'): {}, + } + + network_response = response(status_code=200, content=network_data) + get = mock.Mock(return_value=network_response) + + with mock.patch('docker.Client.get', get): + result = self.client.inspect_network(network_id) + self.assertEqual(result, network_data) + + args = get.call_args + self.assertEqual(args[0][0], + url_prefix + 'networks/{0}'.format(network_id)) + + @base.requires_api_version('1.21') + def test_connect_container_to_network(self): + network_id = 'abc12345' + container_id = 'def45678' + + post = mock.Mock(return_value=response(status_code=201)) + + with mock.patch('docker.Client.post', post): + self.client.connect_container_to_network( + {'Id': container_id}, network_id) + + self.assertEqual( + post.call_args[0][0], + url_prefix + 'networks/{0}/connect'.format(network_id)) + + self.assertEqual( + json.loads(post.call_args[1]['data']), + {'container': container_id}) + + @base.requires_api_version('1.21') + def test_disconnect_container_from_network(self): + network_id = 'abc12345' + container_id = 'def45678' + + post = mock.Mock(return_value=response(status_code=201)) + + with mock.patch('docker.Client.post', post): + self.client.disconnect_container_from_network( + {'Id': container_id}, network_id) + + self.assertEqual( + post.call_args[0][0], + url_prefix + 'networks/{0}/disconnect'.format(network_id)) + + self.assertEqual( + json.loads(post.call_args[1]['data']), + {'container': container_id}) diff --git a/tests/testdata/certs/cert.pem b/tests/unit/testdata/certs/ca.pem index e69de29..e69de29 100644 --- a/tests/testdata/certs/cert.pem +++ b/tests/unit/testdata/certs/ca.pem diff --git a/tests/testdata/certs/key.pem b/tests/unit/testdata/certs/cert.pem index e69de29..e69de29 100644 --- a/tests/testdata/certs/key.pem +++ b/tests/unit/testdata/certs/cert.pem diff --git a/tests/unit/testdata/certs/key.pem b/tests/unit/testdata/certs/key.pem new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/unit/testdata/certs/key.pem diff --git a/tests/utils_test.py b/tests/unit/utils_test.py index 04183f9..fffa77a 100644 --- a/tests/utils_test.py +++ b/tests/unit/utils_test.py @@ -5,6 +5,7 @@ import json import os import os.path import shutil +import tarfile import tempfile import pytest @@ -16,15 +17,12 @@ from docker.errors import DockerException from docker.utils import ( parse_repository_tag, parse_host, convert_filters, kwargs_from_env, create_host_config, Ulimit, LogConfig, parse_bytes, parse_env_file, - exclude_paths, convert_volume_binds, decode_json_header + exclude_paths, convert_volume_binds, decode_json_header, tar ) from docker.utils.ports import build_port_bindings, split_port -from docker.auth import ( - resolve_repository_name, resolve_authconfig, encode_header -) -from . import base -from .helpers import make_tree +from .. import base +from ..helpers import make_tree TEST_CERT_DIR = os.path.join( @@ -186,20 +184,7 @@ class KwargsFromEnvTest(base.BaseTestCase): shutil.rmtree(temp_dir) -class UtilsTest(base.BaseTestCase): - longMessage = True - - def generate_tempfile(self, file_content=None): - """ - Generates a temporary file for tests with the content - of 'file_content' and returns the filename. - Don't forget to unlink the file with os.unlink() after. - """ - local_tempfile = tempfile.NamedTemporaryFile(delete=False) - local_tempfile.write(file_content.encode('UTF-8')) - local_tempfile.close() - return local_tempfile.name - +class ConverVolumeBindsTest(base.BaseTestCase): def test_convert_volume_binds_empty(self): self.assertEqual(convert_volume_binds({}), []) self.assertEqual(convert_volume_binds([]), []) @@ -283,26 +268,43 @@ class UtilsTest(base.BaseTestCase): convert_volume_binds(data), expected ) - def test_parse_repository_tag(self): - self.assertEqual(parse_repository_tag("root"), - ("root", None)) - self.assertEqual(parse_repository_tag("root:tag"), - ("root", "tag")) - self.assertEqual(parse_repository_tag("user/repo"), - ("user/repo", None)) - self.assertEqual(parse_repository_tag("user/repo:tag"), - ("user/repo", "tag")) - self.assertEqual(parse_repository_tag("url:5000/repo"), - ("url:5000/repo", None)) - self.assertEqual(parse_repository_tag("url:5000/repo:tag"), - ("url:5000/repo", "tag")) - def test_parse_bytes(self): - self.assertEqual(parse_bytes("512MB"), (536870912)) - self.assertEqual(parse_bytes("512M"), (536870912)) - self.assertRaises(DockerException, parse_bytes, "512MK") - self.assertRaises(DockerException, parse_bytes, "512L") +class ParseEnvFileTest(base.BaseTestCase): + def generate_tempfile(self, file_content=None): + """ + Generates a temporary file for tests with the content + of 'file_content' and returns the filename. + Don't forget to unlink the file with os.unlink() after. + """ + local_tempfile = tempfile.NamedTemporaryFile(delete=False) + local_tempfile.write(file_content.encode('UTF-8')) + local_tempfile.close() + return local_tempfile.name + def test_parse_env_file_proper(self): + env_file = self.generate_tempfile( + file_content='USER=jdoe\nPASS=secret') + get_parse_env_file = parse_env_file(env_file) + self.assertEqual(get_parse_env_file, + {'USER': 'jdoe', 'PASS': 'secret'}) + os.unlink(env_file) + + def test_parse_env_file_commented_line(self): + env_file = self.generate_tempfile( + file_content='USER=jdoe\n#PASS=secret') + get_parse_env_file = parse_env_file((env_file)) + self.assertEqual(get_parse_env_file, {'USER': 'jdoe'}) + os.unlink(env_file) + + def test_parse_env_file_invalid_line(self): + env_file = self.generate_tempfile( + file_content='USER jdoe') + self.assertRaises( + DockerException, parse_env_file, env_file) + os.unlink(env_file) + + +class ParseHostTest(base.BaseTestCase): def test_parse_host(self): invalid_hosts = [ '0.0.0.0', @@ -341,27 +343,29 @@ class UtilsTest(base.BaseTestCase): assert parse_host(val, 'win32') == tcp_port - def test_parse_env_file_proper(self): - env_file = self.generate_tempfile( - file_content='USER=jdoe\nPASS=secret') - get_parse_env_file = parse_env_file(env_file) - self.assertEqual(get_parse_env_file, - {'USER': 'jdoe', 'PASS': 'secret'}) - os.unlink(env_file) - def test_parse_env_file_commented_line(self): - env_file = self.generate_tempfile( - file_content='USER=jdoe\n#PASS=secret') - get_parse_env_file = parse_env_file((env_file)) - self.assertEqual(get_parse_env_file, {'USER': 'jdoe'}) - os.unlink(env_file) +class UtilsTest(base.BaseTestCase): + longMessage = True - def test_parse_env_file_invalid_line(self): - env_file = self.generate_tempfile( - file_content='USER jdoe') - self.assertRaises( - DockerException, parse_env_file, env_file) - os.unlink(env_file) + def test_parse_repository_tag(self): + self.assertEqual(parse_repository_tag("root"), + ("root", None)) + self.assertEqual(parse_repository_tag("root:tag"), + ("root", "tag")) + self.assertEqual(parse_repository_tag("user/repo"), + ("user/repo", None)) + self.assertEqual(parse_repository_tag("user/repo:tag"), + ("user/repo", "tag")) + self.assertEqual(parse_repository_tag("url:5000/repo"), + ("url:5000/repo", None)) + self.assertEqual(parse_repository_tag("url:5000/repo:tag"), + ("url:5000/repo", "tag")) + + def test_parse_bytes(self): + self.assertEqual(parse_bytes("512MB"), (536870912)) + self.assertEqual(parse_bytes("512M"), (536870912)) + self.assertRaises(DockerException, parse_bytes, "512MK") + self.assertRaises(DockerException, parse_bytes, "512L") def test_convert_filters(self): tests = [ @@ -384,168 +388,6 @@ class UtilsTest(base.BaseTestCase): decoded_data = decode_json_header(data) self.assertEqual(obj, decoded_data) - def test_803_urlsafe_encode(self): - auth_data = { - 'username': 'root', - 'password': 'GR?XGR?XGR?XGR?X' - } - encoded = encode_header(auth_data) - assert b'/' not in encoded - assert b'_' in encoded - - def test_resolve_repository_name(self): - # docker hub library image - self.assertEqual( - resolve_repository_name('image'), - ('index.docker.io', 'image'), - ) - - # docker hub image - self.assertEqual( - resolve_repository_name('username/image'), - ('index.docker.io', 'username/image'), - ) - - # private registry - self.assertEqual( - resolve_repository_name('my.registry.net/image'), - ('my.registry.net', 'image'), - ) - - # private registry with port - self.assertEqual( - resolve_repository_name('my.registry.net:5000/image'), - ('my.registry.net:5000', 'image'), - ) - - # private registry with username - self.assertEqual( - resolve_repository_name('my.registry.net/username/image'), - ('my.registry.net', 'username/image'), - ) - - # no dots but port - self.assertEqual( - resolve_repository_name('hostname:5000/image'), - ('hostname:5000', 'image'), - ) - - # no dots but port and username - self.assertEqual( - resolve_repository_name('hostname:5000/username/image'), - ('hostname:5000', 'username/image'), - ) - - # localhost - self.assertEqual( - resolve_repository_name('localhost/image'), - ('localhost', 'image'), - ) - - # localhost with username - self.assertEqual( - resolve_repository_name('localhost/username/image'), - ('localhost', 'username/image'), - ) - - def test_resolve_authconfig(self): - auth_config = { - 'https://index.docker.io/v1/': {'auth': 'indexuser'}, - 'my.registry.net': {'auth': 'privateuser'}, - 'http://legacy.registry.url/v1/': {'auth': 'legacyauth'} - } - # hostname only - self.assertEqual( - resolve_authconfig(auth_config, 'my.registry.net'), - {'auth': 'privateuser'} - ) - # no protocol - self.assertEqual( - resolve_authconfig(auth_config, 'my.registry.net/v1/'), - {'auth': 'privateuser'} - ) - # no path - self.assertEqual( - resolve_authconfig(auth_config, 'http://my.registry.net'), - {'auth': 'privateuser'} - ) - # no path, trailing slash - self.assertEqual( - resolve_authconfig(auth_config, 'http://my.registry.net/'), - {'auth': 'privateuser'} - ) - # no path, wrong secure protocol - self.assertEqual( - resolve_authconfig(auth_config, 'https://my.registry.net'), - {'auth': 'privateuser'} - ) - # no path, wrong insecure protocol - self.assertEqual( - resolve_authconfig(auth_config, 'http://index.docker.io'), - {'auth': 'indexuser'} - ) - # with path, wrong protocol - self.assertEqual( - resolve_authconfig(auth_config, 'https://my.registry.net/v1/'), - {'auth': 'privateuser'} - ) - # default registry - self.assertEqual( - resolve_authconfig(auth_config), {'auth': 'indexuser'} - ) - # default registry (explicit None) - self.assertEqual( - resolve_authconfig(auth_config, None), {'auth': 'indexuser'} - ) - # fully explicit - self.assertEqual( - resolve_authconfig(auth_config, 'http://my.registry.net/v1/'), - {'auth': 'privateuser'} - ) - # legacy entry in config - self.assertEqual( - resolve_authconfig(auth_config, 'legacy.registry.url'), - {'auth': 'legacyauth'} - ) - # no matching entry - self.assertTrue( - resolve_authconfig(auth_config, 'does.not.exist') is None - ) - - def test_resolve_registry_and_auth(self): - auth_config = { - 'https://index.docker.io/v1/': {'auth': 'indexuser'}, - 'my.registry.net': {'auth': 'privateuser'}, - } - - # library image - image = 'image' - self.assertEqual( - resolve_authconfig(auth_config, resolve_repository_name(image)[0]), - {'auth': 'indexuser'}, - ) - - # docker hub image - image = 'username/image' - self.assertEqual( - resolve_authconfig(auth_config, resolve_repository_name(image)[0]), - {'auth': 'indexuser'}, - ) - - # private registry - image = 'my.registry.net/image' - self.assertEqual( - resolve_authconfig(auth_config, resolve_repository_name(image)[0]), - {'auth': 'privateuser'}, - ) - - # unauthenticated registry - image = 'other.registry.net/image' - self.assertEqual( - resolve_authconfig(auth_config, resolve_repository_name(image)[0]), - None, - ) - class PortsTest(base.BaseTestCase): def test_split_port_with_host_ip(self): @@ -790,3 +632,85 @@ class ExcludePathsTest(base.BaseTestCase): assert self.exclude(['foo/bar']) == self.all_paths - set([ 'foo/bar', 'foo/bar/a.py', ]) + + +class TarTest(base.Cleanup, base.BaseTestCase): + def test_tar_with_excludes(self): + dirs = [ + 'foo', + 'foo/bar', + 'bar', + ] + + files = [ + 'Dockerfile', + 'Dockerfile.alt', + '.dockerignore', + 'a.py', + 'a.go', + 'b.py', + 'cde.py', + 'foo/a.py', + 'foo/b.py', + 'foo/bar/a.py', + 'bar/a.py', + ] + + exclude = [ + '*.py', + '!b.py', + '!a.go', + 'foo', + 'Dockerfile*', + '.dockerignore', + ] + + expected_names = set([ + 'Dockerfile', + '.dockerignore', + 'a.go', + 'b.py', + 'bar', + 'bar/a.py', + ]) + + base = make_tree(dirs, files) + self.addCleanup(shutil.rmtree, base) + + with tar(base, exclude=exclude) as archive: + tar_data = tarfile.open(fileobj=archive) + assert sorted(tar_data.getnames()) == sorted(expected_names) + + def test_tar_with_empty_directory(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + self.assertEqual(sorted(tar_data.getnames()), ['bar', 'foo']) + + def test_tar_with_file_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + with open(os.path.join(base, 'foo'), 'w') as f: + f.write("content") + os.makedirs(os.path.join(base, 'bar')) + os.symlink('../foo', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + self.assertEqual( + sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo'] + ) + + def test_tar_with_directory_symlinks(self): + base = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, base) + for d in ['foo', 'bar']: + os.makedirs(os.path.join(base, d)) + os.symlink('../foo', os.path.join(base, 'bar/foo')) + with tar(base) as archive: + tar_data = tarfile.open(fileobj=archive) + self.assertEqual( + sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo'] + ) diff --git a/tests/unit/volume_test.py b/tests/unit/volume_test.py new file mode 100644 index 0000000..4c2f877 --- /dev/null +++ b/tests/unit/volume_test.py @@ -0,0 +1,85 @@ +import json + +import pytest + +from .. import base +from .api_test import DockerClientTest, url_prefix, fake_request + + +class VolumeTest(DockerClientTest): + @base.requires_api_version('1.21') + def test_list_volumes(self): + volumes = self.client.volumes() + self.assertIn('Volumes', volumes) + self.assertEqual(len(volumes['Volumes']), 2) + args = fake_request.call_args + + self.assertEqual(args[0][0], 'GET') + self.assertEqual(args[0][1], url_prefix + 'volumes') + + @base.requires_api_version('1.21') + def test_create_volume(self): + name = 'perfectcherryblossom' + result = self.client.create_volume(name) + self.assertIn('Name', result) + self.assertEqual(result['Name'], name) + self.assertIn('Driver', result) + self.assertEqual(result['Driver'], 'local') + args = fake_request.call_args + + self.assertEqual(args[0][0], 'POST') + self.assertEqual(args[0][1], url_prefix + 'volumes/create') + self.assertEqual(json.loads(args[1]['data']), {'Name': name}) + + @base.requires_api_version('1.21') + def test_create_volume_with_driver(self): + name = 'perfectcherryblossom' + driver_name = 'sshfs' + self.client.create_volume(name, driver=driver_name) + args = fake_request.call_args + + self.assertEqual(args[0][0], 'POST') + self.assertEqual(args[0][1], url_prefix + 'volumes/create') + data = json.loads(args[1]['data']) + self.assertIn('Driver', data) + self.assertEqual(data['Driver'], driver_name) + + @base.requires_api_version('1.21') + def test_create_volume_invalid_opts_type(self): + with pytest.raises(TypeError): + self.client.create_volume( + 'perfectcherryblossom', driver_opts='hello=world' + ) + + with pytest.raises(TypeError): + self.client.create_volume( + 'perfectcherryblossom', driver_opts=['hello=world'] + ) + + with pytest.raises(TypeError): + self.client.create_volume( + 'perfectcherryblossom', driver_opts='' + ) + + @base.requires_api_version('1.21') + def test_inspect_volume(self): + name = 'perfectcherryblossom' + result = self.client.inspect_volume(name) + self.assertIn('Name', result) + self.assertEqual(result['Name'], name) + self.assertIn('Driver', result) + self.assertEqual(result['Driver'], 'local') + args = fake_request.call_args + + self.assertEqual(args[0][0], 'GET') + self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name)) + + @base.requires_api_version('1.21') + def test_remove_volume(self): + name = 'perfectcherryblossom' + result = self.client.remove_volume(name) + self.assertTrue(result) + args = fake_request.call_args + + self.assertEqual(args[0][0], 'DELETE') + self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name)) @@ -5,7 +5,7 @@ skipsdist=True [testenv] usedevelop=True commands = - py.test --cov=docker tests/test.py tests/utils_test.py + py.test --cov=docker tests/unit/ deps = -r{toxinidir}/test-requirements.txt -r{toxinidir}/requirements.txt |