summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoffrey F <joffrey@docker.com>2015-09-23 17:42:29 -0700
committerJoffrey F <joffrey@docker.com>2015-10-21 16:02:09 -0700
commit93a296fb0448d9fccdf9f40f7a9996f49ea22c48 (patch)
tree633371cbdf88435664b530c4ca835c4929d92274
parent5a1c7ed8bf0ac9a3914de7c80c1c29c13f6a62ea (diff)
downloaddocker-py-reorganize_tests.tar.gz
Reorganize test directoriesreorganize_tests
More clearly separate unit and integration tests Allow splitting into multiple files Cleaner Signed-off-by: Joffrey F <joffrey@docker.com>
-rw-r--r--Makefile12
-rw-r--r--tests/base.py25
-rw-r--r--tests/integration/__init__.py13
-rw-r--r--tests/integration/api_test.py292
-rw-r--r--tests/integration/build_test.py98
-rw-r--r--tests/integration/container_test.py991
-rw-r--r--tests/integration/exec_test.py106
-rw-r--r--tests/integration/image_test.py235
-rw-r--r--tests/integration/network_test.py98
-rw-r--r--tests/integration/regression_test.py69
-rw-r--r--tests/integration/volume_test.py56
-rw-r--r--tests/integration_test.py2044
-rw-r--r--tests/unit/__init__.py (renamed from tests/testdata/certs/ca.pem)0
-rw-r--r--tests/unit/api_test.py418
-rw-r--r--tests/unit/auth_test.py289
-rw-r--r--tests/unit/build_test.py105
-rw-r--r--tests/unit/container_test.py (renamed from tests/test.py)1574
-rw-r--r--tests/unit/exec_test.py75
-rw-r--r--tests/unit/fake_api.py (renamed from tests/fake_api.py)0
-rw-r--r--tests/unit/fake_stat.py (renamed from tests/fake_stat.py)0
-rw-r--r--tests/unit/image_test.py346
-rw-r--r--tests/unit/network_test.py149
-rw-r--r--tests/unit/testdata/certs/ca.pem (renamed from tests/testdata/certs/cert.pem)0
-rw-r--r--tests/unit/testdata/certs/cert.pem (renamed from tests/testdata/certs/key.pem)0
-rw-r--r--tests/unit/testdata/certs/key.pem0
-rw-r--r--tests/unit/utils_test.py (renamed from tests/utils_test.py)362
-rw-r--r--tests/unit/volume_test.py85
-rw-r--r--tox.ini2
28 files changed, 3731 insertions, 3713 deletions
diff --git a/Makefile b/Makefile
index c051bac..10ccda6 100644
--- a/Makefile
+++ b/Makefile
@@ -17,21 +17,21 @@ build-dind-certs:
test: flake8 unit-test unit-test-py3 integration-dind integration-dind-ssl
unit-test: build
- docker run docker-py py.test tests/test.py tests/utils_test.py
+ docker run docker-py py.test tests/unit
unit-test-py3: build-py3
- docker run docker-py3 py.test tests/test.py tests/utils_test.py
+ docker run docker-py3 py.test tests/unit
integration-test: build
- docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration_test.py
+ docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test tests/integration
integration-test-py3: build-py3
- docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration_test.py
+ docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test tests/integration
integration-dind: build build-py3
docker run -d --name dpy-dind --env="DOCKER_HOST=tcp://localhost:2375" --privileged dockerswarm/dind:1.8.1 docker -d -H tcp://0.0.0.0:2375
- docker run --volumes-from dpy-dind --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration_test.py
- docker run --volumes-from dpy-dind --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration_test.py
+ docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test tests/integration
+ docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test tests/integration
docker rm -vf dpy-dind
integration-dind-ssl: build-dind-certs build build-py3
diff --git a/tests/base.py b/tests/base.py
index 51b2300..a2c01fc 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -21,3 +21,28 @@ def requires_api_version(version):
),
reason="API version is too low (< {0})".format(version)
)
+
+
+class Cleanup(object):
+ if sys.version_info < (2, 7):
+ # Provide a basic implementation of addCleanup for Python < 2.7
+ def __init__(self, *args, **kwargs):
+ super(Cleanup, self).__init__(*args, **kwargs)
+ self._cleanups = []
+
+ def tearDown(self):
+ super(Cleanup, self).tearDown()
+ ok = True
+ while self._cleanups:
+ fn, args, kwargs = self._cleanups.pop(-1)
+ try:
+ fn(*args, **kwargs)
+ except KeyboardInterrupt:
+ raise
+ except:
+ ok = False
+ if not ok:
+ raise
+
+ def addCleanup(self, function, *args, **kwargs):
+ self._cleanups.append((function, args, kwargs))
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..20c0a0a
--- /dev/null
+++ b/tests/integration/__init__.py
@@ -0,0 +1,13 @@
+# flake8: noqa
+
+# FIXME: crutch while we transition to the new folder architecture
+# Remove imports when merged in master and Jenkins is updated to find the
+# tests in the new location.
+from .api_test import *
+from .build_test import *
+from .container_test import *
+from .exec_test import *
+from .image_test import *
+from .network_test import *
+from .regression_test import *
+from .volume_test import *
diff --git a/tests/integration/api_test.py b/tests/integration/api_test.py
new file mode 100644
index 0000000..1b33728
--- /dev/null
+++ b/tests/integration/api_test.py
@@ -0,0 +1,292 @@
+import base64
+import json
+import os
+import shutil
+import tempfile
+import time
+import unittest
+import warnings
+
+import docker
+import six
+
+BUSYBOX = 'busybox:buildroot-2014.02'
+EXEC_DRIVER = []
+
+
+def exec_driver_is_native():
+ global EXEC_DRIVER
+ if not EXEC_DRIVER:
+ c = docker_client()
+ EXEC_DRIVER = c.info()['ExecutionDriver']
+ c.close()
+ return EXEC_DRIVER.startswith('native')
+
+
+def docker_client(**kwargs):
+ return docker.Client(**docker_client_kwargs(**kwargs))
+
+
+def docker_client_kwargs(**kwargs):
+ client_kwargs = docker.utils.kwargs_from_env(assert_hostname=False)
+ client_kwargs.update(kwargs)
+ return client_kwargs
+
+
+def setup_module():
+ warnings.simplefilter('error')
+ c = docker_client()
+ try:
+ c.inspect_image(BUSYBOX)
+ except docker.errors.NotFound:
+ os.write(2, "\npulling busybox\n".encode('utf-8'))
+ for data in c.pull(BUSYBOX, stream=True):
+ data = json.loads(data.decode('utf-8'))
+ os.write(2, ("%c[2K\r" % 27).encode('utf-8'))
+ status = data.get("status")
+ progress = data.get("progress")
+ detail = "{0} - {1}".format(status, progress).encode('utf-8')
+ os.write(2, detail)
+ os.write(2, "\npulled busybox\n".encode('utf-8'))
+
+ # Double make sure we now have busybox
+ c.inspect_image(BUSYBOX)
+ c.close()
+
+
+class BaseTestCase(unittest.TestCase):
+ tmp_imgs = []
+ tmp_containers = []
+ tmp_folders = []
+ tmp_volumes = []
+
+ def setUp(self):
+ if six.PY2:
+ self.assertRegex = self.assertRegexpMatches
+ self.assertCountEqual = self.assertItemsEqual
+ self.client = docker_client(timeout=60)
+ self.tmp_imgs = []
+ self.tmp_containers = []
+ self.tmp_folders = []
+ self.tmp_volumes = []
+ self.tmp_networks = []
+
+ def tearDown(self):
+ for img in self.tmp_imgs:
+ try:
+ self.client.remove_image(img)
+ except docker.errors.APIError:
+ pass
+ for container in self.tmp_containers:
+ try:
+ self.client.stop(container, timeout=1)
+ self.client.remove_container(container)
+ except docker.errors.APIError:
+ pass
+ for network in self.tmp_networks:
+ try:
+ self.client.remove_network(network)
+ except docker.errors.APIError:
+ pass
+ for folder in self.tmp_folders:
+ shutil.rmtree(folder)
+
+ for volume in self.tmp_volumes:
+ try:
+ self.client.remove_volume(volume)
+ except docker.errors.APIError:
+ pass
+
+ self.client.close()
+
+ def run_container(self, *args, **kwargs):
+ container = self.client.create_container(*args, **kwargs)
+ self.tmp_containers.append(container)
+ self.client.start(container)
+ exitcode = self.client.wait(container)
+
+ if exitcode != 0:
+ output = self.client.logs(container)
+ raise Exception(
+ "Container exited with code {}:\n{}"
+ .format(exitcode, output))
+
+ return container
+
+
+#########################
+# INFORMATION TESTS #
+#########################
+
+
+class InformationTest(BaseTestCase):
+ def test_version(self):
+ res = self.client.version()
+ self.assertIn('GoVersion', res)
+ self.assertIn('Version', res)
+ self.assertEqual(len(res['Version'].split('.')), 3)
+
+ def test_info(self):
+ res = self.client.info()
+ self.assertIn('Containers', res)
+ self.assertIn('Images', res)
+ self.assertIn('Debug', res)
+
+ def test_search(self):
+ self.client = docker_client(timeout=10)
+ res = self.client.search('busybox')
+ self.assertTrue(len(res) >= 1)
+ base_img = [x for x in res if x['name'] == 'busybox']
+ self.assertEqual(len(base_img), 1)
+ self.assertIn('description', base_img[0])
+
+
+#################
+# LINKS TESTS #
+#################
+
+
+class LinkTest(BaseTestCase):
+ def test_remove_link(self):
+ # Create containers
+ container1 = self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True
+ )
+ container1_id = container1['Id']
+ self.tmp_containers.append(container1_id)
+ self.client.start(container1_id)
+
+ # Create Link
+ # we don't want the first /
+ link_path = self.client.inspect_container(container1_id)['Name'][1:]
+ link_alias = 'mylink'
+
+ container2 = self.client.create_container(
+ BUSYBOX, 'cat', host_config=self.client.create_host_config(
+ links={link_path: link_alias}, network_mode='none'
+ )
+ )
+ container2_id = container2['Id']
+ self.tmp_containers.append(container2_id)
+ self.client.start(container2_id)
+
+ # Remove link
+ linked_name = self.client.inspect_container(container2_id)['Name'][1:]
+ link_name = '%s/%s' % (linked_name, link_alias)
+ self.client.remove_container(link_name, link=True)
+
+ # Link is gone
+ containers = self.client.containers(all=True)
+ retrieved = [x for x in containers if link_name in x['Names']]
+ self.assertEqual(len(retrieved), 0)
+
+ # Containers are still there
+ retrieved = [
+ x for x in containers if x['Id'].startswith(container1_id) or
+ x['Id'].startswith(container2_id)
+ ]
+ self.assertEqual(len(retrieved), 2)
+
+
+#######################
+# PY SPECIFIC TESTS #
+#######################
+
+class LoadConfigTest(BaseTestCase):
+ def test_load_legacy_config(self):
+ folder = tempfile.mkdtemp()
+ self.tmp_folders.append(folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ f = open(cfg_path, 'w')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ f.write('auth = {0}\n'.format(auth_))
+ f.write('email = sakuya@scarlet.net')
+ f.close()
+ cfg = docker.auth.load_config(cfg_path)
+ self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
+ cfg = cfg[docker.auth.INDEX_NAME]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('Auth'), None)
+
+ def test_load_json_config(self):
+ folder = tempfile.mkdtemp()
+ self.tmp_folders.append(folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ f = open(os.path.join(folder, '.dockercfg'), 'w')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ email_ = 'sakuya@scarlet.net'
+ f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
+ docker.auth.INDEX_URL, auth_, email_))
+ f.close()
+ cfg = docker.auth.load_config(cfg_path)
+ self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
+ cfg = cfg[docker.auth.INDEX_URL]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('Auth'), None)
+
+
+class AutoDetectVersionTest(unittest.TestCase):
+ def test_client_init(self):
+ client = docker_client(version='auto')
+ client_version = client._version
+ api_version = client.version(api_version=False)['ApiVersion']
+ self.assertEqual(client_version, api_version)
+ api_version_2 = client.version()['ApiVersion']
+ self.assertEqual(client_version, api_version_2)
+ client.close()
+
+ def test_auto_client(self):
+ client = docker.AutoVersionClient(**docker_client_kwargs())
+ client_version = client._version
+ api_version = client.version(api_version=False)['ApiVersion']
+ self.assertEqual(client_version, api_version)
+ api_version_2 = client.version()['ApiVersion']
+ self.assertEqual(client_version, api_version_2)
+ client.close()
+ with self.assertRaises(docker.errors.DockerException):
+ docker.AutoVersionClient(**docker_client_kwargs(version='1.11'))
+
+
+class ConnectionTimeoutTest(unittest.TestCase):
+ def setUp(self):
+ self.timeout = 0.5
+ self.client = docker.client.Client(base_url='http://192.168.10.2:4243',
+ timeout=self.timeout)
+
+ def test_timeout(self):
+ start = time.time()
+ res = None
+ # This call isn't supposed to complete, and it should fail fast.
+ try:
+ res = self.client.inspect_container('id')
+ except:
+ pass
+ end = time.time()
+ self.assertTrue(res is None)
+ self.assertTrue(end - start < 2 * self.timeout)
+
+
+class UnixconnTest(unittest.TestCase):
+ """
+ Test UNIX socket connection adapter.
+ """
+
+ def test_resource_warnings(self):
+ """
+ Test no warnings are produced when using the client.
+ """
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+
+ client = docker_client()
+ client.images()
+ client.close()
+ del client
+
+ assert len(w) == 0, \
+ "No warnings produced: {0}".format(w[0].message)
diff --git a/tests/integration/build_test.py b/tests/integration/build_test.py
new file mode 100644
index 0000000..289c0fe
--- /dev/null
+++ b/tests/integration/build_test.py
@@ -0,0 +1,98 @@
+import io
+import json
+import os
+import shutil
+import tempfile
+
+import six
+
+from . import api_test
+from ..base import requires_api_version
+
+
+class BuildTest(api_test.BaseTestCase):
+ def test_build_streaming(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+ stream = self.client.build(fileobj=script, stream=True)
+ logs = ''
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ json.loads(chunk) # ensure chunk is a single, valid JSON blob
+ logs += chunk
+ self.assertNotEqual(logs, '')
+
+ def test_build_from_stringio(self):
+ if six.PY3:
+ return
+ script = io.StringIO(six.text_type('\n').join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]))
+ stream = self.client.build(fileobj=script, stream=True)
+ logs = ''
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ logs += chunk
+ self.assertNotEqual(logs, '')
+
+ @requires_api_version('1.8')
+ def test_build_with_dockerignore(self):
+ base_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base_dir)
+
+ with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
+ f.write("\n".join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'ADD . /test',
+ ]))
+
+ with open(os.path.join(base_dir, '.dockerignore'), 'w') as f:
+ f.write("\n".join([
+ 'ignored',
+ 'Dockerfile',
+ '.dockerignore',
+ '', # empty line
+ ]))
+
+ with open(os.path.join(base_dir, 'not-ignored'), 'w') as f:
+ f.write("this file should not be ignored")
+
+ subdir = os.path.join(base_dir, 'ignored', 'subdir')
+ os.makedirs(subdir)
+ with open(os.path.join(subdir, 'file'), 'w') as f:
+ f.write("this file should be ignored")
+
+ tag = 'docker-py-test-build-with-dockerignore'
+ stream = self.client.build(
+ path=base_dir,
+ tag=tag,
+ )
+ for chunk in stream:
+ pass
+
+ c = self.client.create_container(tag, ['ls', '-1A', '/test'])
+ self.client.start(c)
+ self.client.wait(c)
+ logs = self.client.logs(c)
+
+ if six.PY3:
+ logs = logs.decode('utf-8')
+
+ self.assertEqual(
+ list(filter(None, logs.split('\n'))),
+ ['not-ignored'],
+ )
diff --git a/tests/integration/container_test.py b/tests/integration/container_test.py
new file mode 100644
index 0000000..04a0ef5
--- /dev/null
+++ b/tests/integration/container_test.py
@@ -0,0 +1,991 @@
+import errno
+import os
+import shutil
+import signal
+import struct
+import tempfile
+
+import docker
+import pytest
+import six
+
+from . import api_test
+from ..base import requires_api_version
+from .. import helpers
+
+BUSYBOX = api_test.BUSYBOX
+
+
+class ListContainersTest(api_test.BaseTestCase):
+ def test_list_containers(self):
+ res0 = self.client.containers(all=True)
+ size = len(res0)
+ res1 = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res1)
+ self.client.start(res1['Id'])
+ self.tmp_containers.append(res1['Id'])
+ res2 = self.client.containers(all=True)
+ self.assertEqual(size + 1, len(res2))
+ retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
+ self.assertEqual(len(retrieved), 1)
+ retrieved = retrieved[0]
+ self.assertIn('Command', retrieved)
+ self.assertEqual(retrieved['Command'], six.text_type('true'))
+ self.assertIn('Image', retrieved)
+ self.assertRegex(retrieved['Image'], r'busybox:.*')
+ self.assertIn('Status', retrieved)
+
+
+class CreateContainerTest(api_test.BaseTestCase):
+ def setUp(self):
+ super(CreateContainerTest, self).setUp()
+
+ self.mount_dest = '/mnt'
+
+ # Get a random pathname - we don't need it to exist locally
+ self.mount_origin = tempfile.mkdtemp()
+ shutil.rmtree(self.mount_origin)
+
+ self.filename = 'shared.txt'
+
+ self.run_with_volume(
+ False,
+ BUSYBOX,
+ ['touch', os.path.join(self.mount_dest, self.filename)],
+ )
+
+ def test_create(self):
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+
+ def test_create_with_host_pid_mode(self):
+ ctnr = self.client.create_container(
+ BUSYBOX, 'true', host_config=self.client.create_host_config(
+ pid_mode='host', network_mode='none'
+ )
+ )
+ self.assertIn('Id', ctnr)
+ self.tmp_containers.append(ctnr['Id'])
+ self.client.start(ctnr)
+ inspect = self.client.inspect_container(ctnr)
+ self.assertIn('HostConfig', inspect)
+ host_config = inspect['HostConfig']
+ self.assertIn('PidMode', host_config)
+ self.assertEqual(host_config['PidMode'], 'host')
+
+ def test_create_with_links(self):
+ res0 = self.client.create_container(
+ BUSYBOX, 'cat',
+ detach=True, stdin_open=True,
+ environment={'FOO': '1'})
+
+ container1_id = res0['Id']
+ self.tmp_containers.append(container1_id)
+
+ self.client.start(container1_id)
+
+ res1 = self.client.create_container(
+ BUSYBOX, 'cat',
+ detach=True, stdin_open=True,
+ environment={'FOO': '1'})
+
+ container2_id = res1['Id']
+ self.tmp_containers.append(container2_id)
+
+ self.client.start(container2_id)
+
+ # we don't want the first /
+ link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
+ link_alias1 = 'mylink1'
+ link_env_prefix1 = link_alias1.upper()
+
+ link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
+ link_alias2 = 'mylink2'
+ link_env_prefix2 = link_alias2.upper()
+
+ res2 = self.client.create_container(
+ BUSYBOX, 'env', host_config=self.client.create_host_config(
+ links={link_path1: link_alias1, link_path2: link_alias2},
+ network_mode='none'
+ )
+ )
+ container3_id = res2['Id']
+ self.tmp_containers.append(container3_id)
+ self.client.start(container3_id)
+ self.assertEqual(self.client.wait(container3_id), 0)
+
+ logs = self.client.logs(container3_id)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ self.assertIn('{0}_NAME='.format(link_env_prefix1), logs)
+ self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs)
+ self.assertIn('{0}_NAME='.format(link_env_prefix2), logs)
+ self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs)
+
+ def test_create_with_restart_policy(self):
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '2'],
+ host_config=self.client.create_host_config(
+ restart_policy={"Name": "always", "MaximumRetryCount": 0},
+ network_mode='none'
+ )
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.client.wait(id)
+ with self.assertRaises(docker.errors.APIError) as exc:
+ self.client.remove_container(id)
+ err = exc.exception.response.text
+ self.assertIn(
+ 'You cannot remove a running container', err
+ )
+ self.client.remove_container(id, force=True)
+
+ def test_create_container_with_volumes_from(self):
+ vol_names = ['foobar_vol0', 'foobar_vol1']
+
+ res0 = self.client.create_container(
+ BUSYBOX, 'true', name=vol_names[0]
+ )
+ container1_id = res0['Id']
+ self.tmp_containers.append(container1_id)
+ self.client.start(container1_id)
+
+ res1 = self.client.create_container(
+ BUSYBOX, 'true', name=vol_names[1]
+ )
+ container2_id = res1['Id']
+ self.tmp_containers.append(container2_id)
+ self.client.start(container2_id)
+ with self.assertRaises(docker.errors.DockerException):
+ self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True,
+ volumes_from=vol_names
+ )
+ res2 = self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True,
+ host_config=self.client.create_host_config(
+ volumes_from=vol_names, network_mode='none'
+ )
+ )
+ container3_id = res2['Id']
+ self.tmp_containers.append(container3_id)
+ self.client.start(container3_id)
+
+ info = self.client.inspect_container(res2['Id'])
+ self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names)
+
+ def test_create_with_binds_rw(self):
+ container = self.run_with_volume(
+ False,
+ BUSYBOX,
+ ['ls', self.mount_dest],
+ )
+ logs = self.client.logs(container)
+
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ self.assertIn(self.filename, logs)
+ inspect_data = self.client.inspect_container(container)
+ self.check_container_data(inspect_data, True)
+
+ def test_create_with_binds_ro(self):
+ container = self.run_with_volume(
+ True,
+ BUSYBOX,
+ ['ls', self.mount_dest],
+ )
+ logs = self.client.logs(container)
+
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ self.assertIn(self.filename, logs)
+
+ inspect_data = self.client.inspect_container(container)
+ self.check_container_data(inspect_data, False)
+
+ def create_container_readonly_fs(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ ctnr = self.client.create_container(
+ BUSYBOX, ['mkdir', '/shrine'],
+ host_config=self.client.create_host_config(
+ read_only=True, network_mode='none'
+ )
+ )
+ self.assertIn('Id', ctnr)
+ self.tmp_containers.append(ctnr['Id'])
+ self.client.start(ctnr)
+ res = self.client.wait(ctnr)
+ self.assertNotEqual(res, 0)
+
+ def create_container_with_name(self):
+ res = self.client.create_container(BUSYBOX, 'true', name='foobar')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Name', inspect)
+ self.assertEqual('/foobar', inspect['Name'])
+
+ def create_container_privileged(self):
+ res = self.client.create_container(
+ BUSYBOX, 'true', host_config=self.client.create_host_config(
+ privileged=True, network_mode='none'
+ )
+ )
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.start(res['Id'])
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Config', inspect)
+ self.assertIn('Id', inspect)
+ self.assertTrue(inspect['Id'].startswith(res['Id']))
+ self.assertIn('Image', inspect)
+ self.assertIn('State', inspect)
+ self.assertIn('Running', inspect['State'])
+ if not inspect['State']['Running']:
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], 0)
+ # Since Nov 2013, the Privileged flag is no longer part of the
+ # container's config exposed via the API (safety concerns?).
+ #
+ if 'Privileged' in inspect['Config']:
+ self.assertEqual(inspect['Config']['Privileged'], True)
+
+ def test_create_with_mac_address(self):
+ mac_address_expected = "02:42:ac:11:00:0a"
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected)
+
+ id = container['Id']
+
+ self.client.start(container)
+ res = self.client.inspect_container(container['Id'])
+ self.assertEqual(mac_address_expected,
+ res['NetworkSettings']['MacAddress'])
+
+ self.client.kill(id)
+
+ @requires_api_version('1.20')
+ def test_group_id_ints(self):
+ container = self.client.create_container(
+ BUSYBOX, 'id -G',
+ host_config=self.client.create_host_config(group_add=[1000, 1001])
+ )
+ self.tmp_containers.append(container)
+ self.client.start(container)
+ self.client.wait(container)
+
+ logs = self.client.logs(container)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ groups = logs.strip().split(' ')
+ self.assertIn('1000', groups)
+ self.assertIn('1001', groups)
+
+ @requires_api_version('1.20')
+ def test_group_id_strings(self):
+ container = self.client.create_container(
+ BUSYBOX, 'id -G', host_config=self.client.create_host_config(
+ group_add=['1000', '1001']
+ )
+ )
+ self.tmp_containers.append(container)
+ self.client.start(container)
+ self.client.wait(container)
+
+ logs = self.client.logs(container)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+
+ groups = logs.strip().split(' ')
+ self.assertIn('1000', groups)
+ self.assertIn('1001', groups)
+
+ def test_valid_log_driver_and_log_opt(self):
+ log_config = docker.utils.LogConfig(
+ type='json-file',
+ config={'max-file': '100'}
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+ self.tmp_containers.append(container['Id'])
+ self.client.start(container)
+
+ info = self.client.inspect_container(container)
+ container_log_config = info['HostConfig']['LogConfig']
+
+ self.assertEqual(container_log_config['Type'], log_config.type)
+ self.assertEqual(container_log_config['Config'], log_config.config)
+
+ def test_invalid_log_driver_raises_exception(self):
+ log_config = docker.utils.LogConfig(
+ type='asdf-nope',
+ config={}
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+
+ expected_msg = "logger: no log driver named 'asdf-nope' is registered"
+
+ with pytest.raises(docker.errors.APIError) as excinfo:
+ # raises an internal server error 500
+ self.client.start(container)
+
+ assert expected_msg in str(excinfo.value)
+
+ @pytest.mark.skipif(True,
+ reason="https://github.com/docker/docker/issues/15633")
+ def test_valid_no_log_driver_specified(self):
+ log_config = docker.utils.LogConfig(
+ type="",
+ config={'max-file': '100'}
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+ self.tmp_containers.append(container['Id'])
+ self.client.start(container)
+
+ info = self.client.inspect_container(container)
+ container_log_config = info['HostConfig']['LogConfig']
+
+ self.assertEqual(container_log_config['Type'], "json-file")
+ self.assertEqual(container_log_config['Config'], log_config.config)
+
+ def test_valid_no_config_specified(self):
+ log_config = docker.utils.LogConfig(
+ type="json-file",
+ config=None
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+ self.tmp_containers.append(container['Id'])
+ self.client.start(container)
+
+ info = self.client.inspect_container(container)
+ container_log_config = info['HostConfig']['LogConfig']
+
+ self.assertEqual(container_log_config['Type'], "json-file")
+ self.assertEqual(container_log_config['Config'], {})
+
+ def check_container_data(self, inspect_data, rw):
+ if docker.utils.compare_version('1.20', self.client._version) < 0:
+ self.assertIn('Volumes', inspect_data)
+ self.assertIn(self.mount_dest, inspect_data['Volumes'])
+ self.assertEqual(
+ self.mount_origin, inspect_data['Volumes'][self.mount_dest]
+ )
+ self.assertIn(self.mount_dest, inspect_data['VolumesRW'])
+ self.assertFalse(inspect_data['VolumesRW'][self.mount_dest])
+ else:
+ self.assertIn('Mounts', inspect_data)
+ filtered = list(filter(
+ lambda x: x['Destination'] == self.mount_dest,
+ inspect_data['Mounts']
+ ))
+ self.assertEqual(len(filtered), 1)
+ mount_data = filtered[0]
+ self.assertEqual(mount_data['Source'], self.mount_origin)
+ self.assertEqual(mount_data['RW'], rw)
+
+ def run_with_volume(self, ro, *args, **kwargs):
+ return self.run_container(
+ *args,
+ volumes={self.mount_dest: {}},
+ host_config=self.client.create_host_config(
+ binds={
+ self.mount_origin: {
+ 'bind': self.mount_dest,
+ 'ro': ro,
+ },
+ },
+ network_mode='none'
+ ),
+ **kwargs
+ )
+
+
+@requires_api_version('1.20')
+class ArchiveTest(api_test.BaseTestCase):
+ def test_get_file_archive_from_container(self):
+ data = 'The Maid and the Pocket Watch of Blood'
+ ctnr = self.client.create_container(
+ BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data),
+ volumes=['/vol1']
+ )
+ self.tmp_containers.append(ctnr)
+ self.client.start(ctnr)
+ self.client.wait(ctnr)
+ with tempfile.NamedTemporaryFile() as destination:
+ strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
+ for d in strm:
+ destination.write(d)
+ destination.seek(0)
+ retrieved_data = helpers.untar_file(destination, 'data.txt')
+ if six.PY3:
+ retrieved_data = retrieved_data.decode('utf-8')
+ self.assertEqual(data, retrieved_data.strip())
+
+ def test_get_file_stat_from_container(self):
+ data = 'The Maid and the Pocket Watch of Blood'
+ ctnr = self.client.create_container(
+ BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data),
+ volumes=['/vol1']
+ )
+ self.tmp_containers.append(ctnr)
+ self.client.start(ctnr)
+ self.client.wait(ctnr)
+ strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
+ self.assertIn('name', stat)
+ self.assertEqual(stat['name'], 'data.txt')
+ self.assertIn('size', stat)
+ self.assertEqual(stat['size'], len(data))
+
+ def test_copy_file_to_container(self):
+ data = b'Deaf To All But The Song'
+ with tempfile.NamedTemporaryFile() as test_file:
+ test_file.write(data)
+ test_file.seek(0)
+ ctnr = self.client.create_container(
+ BUSYBOX,
+ 'cat {0}'.format(
+ os.path.join('/vol1', os.path.basename(test_file.name))
+ ),
+ volumes=['/vol1']
+ )
+ self.tmp_containers.append(ctnr)
+ with helpers.simple_tar(test_file.name) as test_tar:
+ self.client.put_archive(ctnr, '/vol1', test_tar)
+ self.client.start(ctnr)
+ self.client.wait(ctnr)
+ logs = self.client.logs(ctnr)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ data = data.decode('utf-8')
+ self.assertEqual(logs.strip(), data)
+
+ def test_copy_directory_to_container(self):
+ files = ['a.py', 'b.py', 'foo/b.py']
+ dirs = ['foo', 'bar']
+ base = helpers.make_tree(dirs, files)
+ ctnr = self.client.create_container(
+ BUSYBOX, 'ls -p /vol1', volumes=['/vol1']
+ )
+ self.tmp_containers.append(ctnr)
+ with docker.utils.tar(base) as test_tar:
+ self.client.put_archive(ctnr, '/vol1', test_tar)
+ self.client.start(ctnr)
+ self.client.wait(ctnr)
+ logs = self.client.logs(ctnr)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ results = logs.strip().split()
+ self.assertIn('a.py', results)
+ self.assertIn('b.py', results)
+ self.assertIn('foo/', results)
+ self.assertIn('bar/', results)
+
+
+class RenameContainerTest(api_test.BaseTestCase):
+ def test_rename_container(self):
+ version = self.client.version()['Version']
+ name = 'hong_meiling'
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.rename(res, name)
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Name', inspect)
+ if version == '1.5.0':
+ self.assertEqual(name, inspect['Name'])
+ else:
+ self.assertEqual('/{0}'.format(name), inspect['Name'])
+
+
+class StartContainerTest(api_test.BaseTestCase):
+ def test_start_container(self):
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.start(res['Id'])
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Config', inspect)
+ self.assertIn('Id', inspect)
+ self.assertTrue(inspect['Id'].startswith(res['Id']))
+ self.assertIn('Image', inspect)
+ self.assertIn('State', inspect)
+ self.assertIn('Running', inspect['State'])
+ if not inspect['State']['Running']:
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], 0)
+
+ def test_start_container_with_dict_instead_of_id(self):
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.start(res)
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Config', inspect)
+ self.assertIn('Id', inspect)
+ self.assertTrue(inspect['Id'].startswith(res['Id']))
+ self.assertIn('Image', inspect)
+ self.assertIn('State', inspect)
+ self.assertIn('Running', inspect['State'])
+ if not inspect['State']['Running']:
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], 0)
+
+ def test_run_shlex_commands(self):
+ commands = [
+ 'true',
+ 'echo "The Young Descendant of Tepes & Septette for the '
+ 'Dead Princess"',
+ 'echo -n "The Young Descendant of Tepes & Septette for the '
+ 'Dead Princess"',
+ '/bin/sh -c "echo Hello World"',
+ '/bin/sh -c \'echo "Hello World"\'',
+ 'echo "\"Night of Nights\""',
+ 'true && echo "Night of Nights"'
+ ]
+ for cmd in commands:
+ container = self.client.create_container(BUSYBOX, cmd)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0, msg=cmd)
+
+
+class WaitTest(api_test.BaseTestCase):
+ def test_wait(self):
+ res = self.client.create_container(BUSYBOX, ['sleep', '3'])
+ id = res['Id']
+ self.tmp_containers.append(id)
+ self.client.start(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ inspect = self.client.inspect_container(id)
+ self.assertIn('Running', inspect['State'])
+ self.assertEqual(inspect['State']['Running'], False)
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], exitcode)
+
+ def test_wait_with_dict_instead_of_id(self):
+ res = self.client.create_container(BUSYBOX, ['sleep', '3'])
+ id = res['Id']
+ self.tmp_containers.append(id)
+ self.client.start(res)
+ exitcode = self.client.wait(res)
+ self.assertEqual(exitcode, 0)
+ inspect = self.client.inspect_container(res)
+ self.assertIn('Running', inspect['State'])
+ self.assertEqual(inspect['State']['Running'], False)
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], exitcode)
+
+
+class LogsTest(api_test.BaseTestCase):
+ def test_logs(self):
+ snippet = 'Flowering Nights (Sakuya Iyazoi)'
+ container = self.client.create_container(
+ BUSYBOX, 'echo {0}'.format(snippet)
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ logs = self.client.logs(id)
+ self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
+
+ def test_logs_tail_option(self):
+ snippet = '''Line1
+Line2'''
+ container = self.client.create_container(
+ BUSYBOX, 'echo "{0}"'.format(snippet)
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ logs = self.client.logs(id, tail=1)
+ self.assertEqual(logs, ('Line2\n').encode(encoding='ascii'))
+
+# def test_logs_streaming(self):
+# snippet = 'Flowering Nights (Sakuya Iyazoi)'
+# container = self.client.create_container(
+# BUSYBOX, 'echo {0}'.format(snippet)
+# )
+# id = container['Id']
+# self.client.start(id)
+# self.tmp_containers.append(id)
+# logs = bytes() if six.PY3 else str()
+# for chunk in self.client.logs(id, stream=True):
+# logs += chunk
+
+# exitcode = self.client.wait(id)
+# self.assertEqual(exitcode, 0)
+
+# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
+
+ def test_logs_with_dict_instead_of_id(self):
+ snippet = 'Flowering Nights (Sakuya Iyazoi)'
+ container = self.client.create_container(
+ BUSYBOX, 'echo {0}'.format(snippet)
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ logs = self.client.logs(container)
+ self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
+
+
+class DiffTest(api_test.BaseTestCase):
+ def test_diff(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ diff = self.client.diff(id)
+ test_diff = [x for x in diff if x.get('Path', None) == '/test']
+ self.assertEqual(len(test_diff), 1)
+ self.assertIn('Kind', test_diff[0])
+ self.assertEqual(test_diff[0]['Kind'], 1)
+
+ def test_diff_with_dict_instead_of_id(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ diff = self.client.diff(container)
+ test_diff = [x for x in diff if x.get('Path', None) == '/test']
+ self.assertEqual(len(test_diff), 1)
+ self.assertIn('Kind', test_diff[0])
+ self.assertEqual(test_diff[0]['Kind'], 1)
+
+
+class StopTest(api_test.BaseTestCase):
+ def test_stop(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.stop(id, timeout=2)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if api_test.exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+ def test_stop_with_dict_instead_of_id(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ self.assertIn('Id', container)
+ id = container['Id']
+ self.client.start(container)
+ self.tmp_containers.append(id)
+ self.client.stop(container, timeout=2)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if api_test.exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+
+class KillTest(api_test.BaseTestCase):
+ def test_kill(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.kill(id)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if api_test.exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+ def test_kill_with_dict_instead_of_id(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.kill(container)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if api_test.exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+ def test_kill_with_signal(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '60'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.kill(id, signal=signal.SIGKILL)
+ exitcode = self.client.wait(id)
+ self.assertNotEqual(exitcode, 0)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False, state)
+
+
+class PortTest(api_test.BaseTestCase):
+ def test_port(self):
+
+ port_bindings = {
+ '1111': ('127.0.0.1', '4567'),
+ '2222': ('127.0.0.1', '4568')
+ }
+
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()),
+ host_config=self.client.create_host_config(
+ port_bindings=port_bindings, network_mode='bridge'
+ )
+ )
+ id = container['Id']
+
+ self.client.start(container)
+
+ # Call the port function on each biding and compare expected vs actual
+ for port in port_bindings:
+ actual_bindings = self.client.port(container, port)
+ port_binding = actual_bindings.pop()
+
+ ip, host_port = port_binding['HostIp'], port_binding['HostPort']
+
+ self.assertEqual(ip, port_bindings[port][0])
+ self.assertEqual(host_port, port_bindings[port][1])
+
+ self.client.kill(id)
+
+
+class ContainerTopTest(api_test.BaseTestCase):
+ def test_top(self):
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '60'])
+
+ id = container['Id']
+
+ self.client.start(container)
+ res = self.client.top(container['Id'])
+ self.assertEqual(
+ res['Titles'],
+ ['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD']
+ )
+ self.assertEqual(len(res['Processes']), 1)
+ self.assertEqual(res['Processes'][0][7], 'sleep 60')
+ self.client.kill(id)
+
+ def test_top_with_psargs(self):
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '60'])
+
+ id = container['Id']
+
+ self.client.start(container)
+ res = self.client.top(container['Id'], 'waux')
+ self.assertEqual(
+ res['Titles'],
+ ['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS',
+ 'TTY', 'STAT', 'START', 'TIME', 'COMMAND'],
+ )
+ self.assertEqual(len(res['Processes']), 1)
+ self.assertEqual(res['Processes'][0][10], 'sleep 60')
+ self.client.kill(id)
+
+
+class RestartContainerTest(api_test.BaseTestCase):
+ def test_restart(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ info = self.client.inspect_container(id)
+ self.assertIn('State', info)
+ self.assertIn('StartedAt', info['State'])
+ start_time1 = info['State']['StartedAt']
+ self.client.restart(id, timeout=2)
+ info2 = self.client.inspect_container(id)
+ self.assertIn('State', info2)
+ self.assertIn('StartedAt', info2['State'])
+ start_time2 = info2['State']['StartedAt']
+ self.assertNotEqual(start_time1, start_time2)
+ self.assertIn('Running', info2['State'])
+ self.assertEqual(info2['State']['Running'], True)
+ self.client.kill(id)
+
+ def test_restart_with_dict_instead_of_id(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ self.assertIn('Id', container)
+ id = container['Id']
+ self.client.start(container)
+ self.tmp_containers.append(id)
+ info = self.client.inspect_container(id)
+ self.assertIn('State', info)
+ self.assertIn('StartedAt', info['State'])
+ start_time1 = info['State']['StartedAt']
+ self.client.restart(container, timeout=2)
+ info2 = self.client.inspect_container(id)
+ self.assertIn('State', info2)
+ self.assertIn('StartedAt', info2['State'])
+ start_time2 = info2['State']['StartedAt']
+ self.assertNotEqual(start_time1, start_time2)
+ self.assertIn('Running', info2['State'])
+ self.assertEqual(info2['State']['Running'], True)
+ self.client.kill(id)
+
+
+class RemoveContainerTest(api_test.BaseTestCase):
+ def test_remove(self):
+ container = self.client.create_container(BUSYBOX, ['true'])
+ id = container['Id']
+ self.client.start(id)
+ self.client.wait(id)
+ self.client.remove_container(id)
+ containers = self.client.containers(all=True)
+ res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
+ self.assertEqual(len(res), 0)
+
+ def test_remove_with_dict_instead_of_id(self):
+ container = self.client.create_container(BUSYBOX, ['true'])
+ id = container['Id']
+ self.client.start(id)
+ self.client.wait(id)
+ self.client.remove_container(container)
+ containers = self.client.containers(all=True)
+ res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
+ self.assertEqual(len(res), 0)
+
+
+class AttachContainerTest(api_test.BaseTestCase):
+ def test_run_container_streaming(self):
+ container = self.client.create_container(BUSYBOX, '/bin/sh',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ sock = self.client.attach_socket(container, ws=False)
+ self.assertTrue(sock.fileno() > -1)
+
+ def test_run_container_reading_socket(self):
+ line = 'hi there and stuff and things, words!'
+ command = "echo '{0}'".format(line)
+ container = self.client.create_container(BUSYBOX, command,
+ detach=True, tty=False)
+ ident = container['Id']
+ self.tmp_containers.append(ident)
+
+ opts = {"stdout": 1, "stream": 1, "logs": 1}
+ pty_stdout = self.client.attach_socket(ident, opts)
+ self.client.start(ident)
+
+ recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)
+
+ def read(n=4096):
+ """Code stolen from dockerpty to read the socket"""
+ try:
+ if hasattr(pty_stdout, 'recv'):
+ return pty_stdout.recv(n)
+ return os.read(pty_stdout.fileno(), n)
+ except EnvironmentError as e:
+ if e.errno not in recoverable_errors:
+ raise
+
+ def next_packet_size():
+ """Code stolen from dockerpty to get the next packet size"""
+ data = six.binary_type()
+ while len(data) < 8:
+ next_data = read(8 - len(data))
+ if not next_data:
+ return 0
+ data = data + next_data
+
+ if data is None:
+ return 0
+
+ if len(data) == 8:
+ _, actual = struct.unpack('>BxxxL', data)
+ return actual
+
+ next_size = next_packet_size()
+ self.assertEqual(next_size, len(line) + 1)
+
+ data = six.binary_type()
+ while len(data) < next_size:
+ next_data = read(next_size - len(data))
+ if not next_data:
+ assert False, "Failed trying to read in the dataz"
+ data += next_data
+ self.assertEqual(data.decode('utf-8'), "{0}\n".format(line))
+ pty_stdout.close()
+
+ # Prevent segfault at the end of the test run
+ if hasattr(pty_stdout, "_response"):
+ del pty_stdout._response
+
+
+class PauseTest(api_test.BaseTestCase):
+ def test_pause_unpause(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.tmp_containers.append(id)
+ self.client.start(container)
+ self.client.pause(id)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ self.assertEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], True)
+ self.assertIn('Paused', state)
+ self.assertEqual(state['Paused'], True)
+
+ self.client.unpause(id)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ self.assertEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], True)
+ self.assertIn('Paused', state)
+ self.assertEqual(state['Paused'], False)
diff --git a/tests/integration/exec_test.py b/tests/integration/exec_test.py
new file mode 100644
index 0000000..39883d6
--- /dev/null
+++ b/tests/integration/exec_test.py
@@ -0,0 +1,106 @@
+import pytest
+
+from . import api_test
+
+BUSYBOX = api_test.BUSYBOX
+
+
+class ExecTest(api_test.BaseTestCase):
+ def test_execute_command(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, ['echo', 'hello'])
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'hello\n')
+
+ def test_exec_command_string(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, 'echo hello world')
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'hello world\n')
+
+ def test_exec_command_as_user(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, 'whoami', user='default')
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'default\n')
+
+ def test_exec_command_as_root(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, 'whoami')
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'root\n')
+
+ def test_exec_command_streaming(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ exec_id = self.client.exec_create(id, ['echo', 'hello\nworld'])
+ self.assertIn('Id', exec_id)
+
+ res = b''
+ for chunk in self.client.exec_start(exec_id, stream=True):
+ res += chunk
+ self.assertEqual(res, b'hello\nworld\n')
+
+ def test_exec_inspect(self):
+ if not api_test.exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist'])
+ self.assertIn('Id', exec_id)
+ self.client.exec_start(exec_id)
+ exec_info = self.client.exec_inspect(exec_id)
+ self.assertIn('ExitCode', exec_info)
+ self.assertNotEqual(exec_info['ExitCode'], 0)
diff --git a/tests/integration/image_test.py b/tests/integration/image_test.py
new file mode 100644
index 0000000..6c186e4
--- /dev/null
+++ b/tests/integration/image_test.py
@@ -0,0 +1,235 @@
+import contextlib
+import json
+import shutil
+import socket
+import tarfile
+import tempfile
+import threading
+
+import pytest
+import six
+from six.moves import BaseHTTPServer
+from six.moves import socketserver
+
+
+import docker
+
+from . import api_test
+
+BUSYBOX = api_test.BUSYBOX
+
+
+class ListImagesTest(api_test.BaseTestCase):
+ def test_images(self):
+ res1 = self.client.images(all=True)
+ self.assertIn('Id', res1[0])
+ res10 = res1[0]
+ self.assertIn('Created', res10)
+ self.assertIn('RepoTags', res10)
+ distinct = []
+ for img in res1:
+ if img['Id'] not in distinct:
+ distinct.append(img['Id'])
+ self.assertEqual(len(distinct), self.client.info()['Images'])
+
+ def test_images_quiet(self):
+ res1 = self.client.images(quiet=True)
+ self.assertEqual(type(res1[0]), six.text_type)
+
+
+class PullImageTest(api_test.BaseTestCase):
+ def test_pull(self):
+ try:
+ self.client.remove_image('hello-world')
+ except docker.errors.APIError:
+ pass
+ res = self.client.pull('hello-world')
+ self.tmp_imgs.append('hello-world')
+ self.assertEqual(type(res), six.text_type)
+ self.assertGreaterEqual(
+ len(self.client.images('hello-world')), 1
+ )
+ img_info = self.client.inspect_image('hello-world')
+ self.assertIn('Id', img_info)
+
+ def test_pull_streaming(self):
+ try:
+ self.client.remove_image('hello-world')
+ except docker.errors.APIError:
+ pass
+ stream = self.client.pull('hello-world', stream=True)
+ self.tmp_imgs.append('hello-world')
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ json.loads(chunk) # ensure chunk is a single, valid JSON blob
+ self.assertGreaterEqual(
+ len(self.client.images('hello-world')), 1
+ )
+ img_info = self.client.inspect_image('hello-world')
+ self.assertIn('Id', img_info)
+
+
+class CommitTest(api_test.BaseTestCase):
+ def test_commit(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ res = self.client.commit(id)
+ self.assertIn('Id', res)
+ img_id = res['Id']
+ self.tmp_imgs.append(img_id)
+ img = self.client.inspect_image(img_id)
+ self.assertIn('Container', img)
+ self.assertTrue(img['Container'].startswith(id))
+ self.assertIn('ContainerConfig', img)
+ self.assertIn('Image', img['ContainerConfig'])
+ self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
+ busybox_id = self.client.inspect_image(BUSYBOX)['Id']
+ self.assertIn('Parent', img)
+ self.assertEqual(img['Parent'], busybox_id)
+
+
+class RemoveImageTest(api_test.BaseTestCase):
+ def test_remove(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ res = self.client.commit(id)
+ self.assertIn('Id', res)
+ img_id = res['Id']
+ self.tmp_imgs.append(img_id)
+ self.client.remove_image(img_id, force=True)
+ images = self.client.images(all=True)
+ res = [x for x in images if x['Id'].startswith(img_id)]
+ self.assertEqual(len(res), 0)
+
+
+class ImportImageTest(api_test.BaseTestCase):
+ '''Base class for `docker import` test cases.'''
+
+ TAR_SIZE = 512 * 1024
+
+ def write_dummy_tar_content(self, n_bytes, tar_fd):
+ def extend_file(f, n_bytes):
+ f.seek(n_bytes - 1)
+ f.write(bytearray([65]))
+ f.seek(0)
+
+ tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
+
+ with tempfile.NamedTemporaryFile() as f:
+ extend_file(f, n_bytes)
+ tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
+ tar.addfile(tarinfo, fileobj=f)
+
+ tar.close()
+
+ @contextlib.contextmanager
+ def dummy_tar_stream(self, n_bytes):
+ '''Yields a stream that is valid tar data of size n_bytes.'''
+ with tempfile.NamedTemporaryFile() as tar_file:
+ self.write_dummy_tar_content(n_bytes, tar_file)
+ tar_file.seek(0)
+ yield tar_file
+
+ @contextlib.contextmanager
+ def dummy_tar_file(self, n_bytes):
+ '''Yields the name of a valid tar file of size n_bytes.'''
+ with tempfile.NamedTemporaryFile() as tar_file:
+ self.write_dummy_tar_content(n_bytes, tar_file)
+ tar_file.seek(0)
+ yield tar_file.name
+
+ def test_import_from_bytes(self):
+ with self.dummy_tar_stream(n_bytes=500) as f:
+ content = f.read()
+
+ # The generic import_image() function cannot import in-memory bytes
+ # data that happens to be represented as a string type, because
+ # import_image() will try to use it as a filename and usually then
+ # trigger an exception. So we test the import_image_from_data()
+ # function instead.
+ statuses = self.client.import_image_from_data(
+ content, repository='test/import-from-bytes')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+ def test_import_from_file(self):
+ with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
+ # statuses = self.client.import_image(
+ # src=tar_filename, repository='test/import-from-file')
+ statuses = self.client.import_image_from_file(
+ tar_filename, repository='test/import-from-file')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+ def test_import_from_stream(self):
+ with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
+ statuses = self.client.import_image(
+ src=tar_stream, repository='test/import-from-stream')
+ # statuses = self.client.import_image_from_stream(
+ # tar_stream, repository='test/import-from-stream')
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+ @contextlib.contextmanager
+ def temporary_http_file_server(self, stream):
+ '''Serve data from an IO stream over HTTP.'''
+
+ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/x-tar')
+ self.end_headers()
+ shutil.copyfileobj(stream, self.wfile)
+
+ server = socketserver.TCPServer(('', 0), Handler)
+ thread = threading.Thread(target=server.serve_forever)
+ thread.setDaemon(True)
+ thread.start()
+
+ yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
+
+ server.shutdown()
+
+ @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
+ def test_import_from_url(self):
+ # The crappy test HTTP server doesn't handle large files well, so use
+ # a small file.
+ tar_size = 10240
+
+ with self.dummy_tar_stream(n_bytes=tar_size) as tar_data:
+ with self.temporary_http_file_server(tar_data) as url:
+ statuses = self.client.import_image(
+ src=url, repository='test/import-from-url')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
diff --git a/tests/integration/network_test.py b/tests/integration/network_test.py
new file mode 100644
index 0000000..e009660
--- /dev/null
+++ b/tests/integration/network_test.py
@@ -0,0 +1,98 @@
+import random
+
+import docker
+import pytest
+
+from . import api_test
+from ..base import requires_api_version
+
+
+@requires_api_version('1.21')
+class TestNetworks(api_test.BaseTestCase):
+ def create_network(self, *args, **kwargs):
+ net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
+ net_id = self.client.create_network(net_name, *args, **kwargs)['id']
+ self.tmp_networks.append(net_id)
+ return (net_name, net_id)
+
+ def test_list_networks(self):
+ networks = self.client.networks()
+ initial_size = len(networks)
+
+ net_name, net_id = self.create_network()
+
+ networks = self.client.networks()
+ self.assertEqual(len(networks), initial_size + 1)
+ self.assertTrue(net_id in [n['id'] for n in networks])
+
+ networks_by_name = self.client.networks(names=[net_name])
+ self.assertEqual([n['id'] for n in networks_by_name], [net_id])
+
+ networks_by_partial_id = self.client.networks(ids=[net_id[:8]])
+ self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id])
+
+ def test_inspect_network(self):
+ net_name, net_id = self.create_network()
+
+ net = self.client.inspect_network(net_id)
+ self.assertEqual(net, {
+ u'name': net_name,
+ u'id': net_id,
+ u'driver': 'bridge',
+ u'containers': {},
+ })
+
+ def test_create_network_with_host_driver_fails(self):
+ net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
+
+ with pytest.raises(docker.errors.APIError):
+ self.client.create_network(net_name, driver='host')
+
+ def test_remove_network(self):
+ initial_size = len(self.client.networks())
+
+ net_name, net_id = self.create_network()
+ self.assertEqual(len(self.client.networks()), initial_size + 1)
+
+ self.client.remove_network(net_id)
+ self.assertEqual(len(self.client.networks()), initial_size)
+
+ def test_connect_and_disconnect_container(self):
+ net_name, net_id = self.create_network()
+
+ container = self.client.create_container('busybox', 'top')
+ self.tmp_containers.append(container)
+ self.client.start(container)
+
+ network_data = self.client.inspect_network(net_id)
+ self.assertFalse(network_data.get('containers'))
+
+ self.client.connect_container_to_network(container, net_id)
+ network_data = self.client.inspect_network(net_id)
+ self.assertEqual(
+ list(network_data['containers'].keys()),
+ [container['Id']])
+
+ self.client.disconnect_container_from_network(container, net_id)
+ network_data = self.client.inspect_network(net_id)
+ self.assertFalse(network_data.get('containers'))
+
+ def test_connect_on_container_create(self):
+ net_name, net_id = self.create_network()
+
+ container = self.client.create_container(
+ image='busybox',
+ command='top',
+ host_config=self.client.create_host_config(network_mode=net_name),
+ )
+ self.tmp_containers.append(container)
+ self.client.start(container)
+
+ network_data = self.client.inspect_network(net_id)
+ self.assertEqual(
+ list(network_data['containers'].keys()),
+ [container['Id']])
+
+ self.client.disconnect_container_from_network(container, net_id)
+ network_data = self.client.inspect_network(net_id)
+ self.assertFalse(network_data.get('containers'))
diff --git a/tests/integration/regression_test.py b/tests/integration/regression_test.py
new file mode 100644
index 0000000..dac8806
--- /dev/null
+++ b/tests/integration/regression_test.py
@@ -0,0 +1,69 @@
+import io
+import random
+
+import docker
+import six
+
+from . import api_test
+
+BUSYBOX = api_test.BUSYBOX
+
+
+class TestRegressions(api_test.BaseTestCase):
+ def test_443_handle_nonchunked_response_in_stream(self):
+ dfile = io.BytesIO()
+ with self.assertRaises(docker.errors.APIError) as exc:
+ for line in self.client.build(fileobj=dfile, tag="a/b/c"):
+ pass
+ self.assertEqual(exc.exception.response.status_code, 500)
+ dfile.close()
+
+ def test_542_truncate_ids_client_side(self):
+ self.client.start(
+ self.client.create_container(BUSYBOX, ['true'])
+ )
+ result = self.client.containers(all=True, trunc=True)
+ self.assertEqual(len(result[0]['Id']), 12)
+
+ def test_647_support_doubleslash_in_image_names(self):
+ with self.assertRaises(docker.errors.APIError):
+ self.client.inspect_image('gensokyo.jp//kirisame')
+
+ def test_649_handle_timeout_value_none(self):
+ self.client.timeout = None
+ ctnr = self.client.create_container(BUSYBOX, ['sleep', '2'])
+ self.client.start(ctnr)
+ self.client.stop(ctnr)
+
+ def test_715_handle_user_param_as_int_value(self):
+ ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000)
+ self.client.start(ctnr)
+ self.client.wait(ctnr)
+ logs = self.client.logs(ctnr)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ assert logs == '1000\n'
+
+ def test_792_explicit_port_protocol(self):
+
+ tcp_port, udp_port = random.sample(range(9999, 32000), 2)
+ ctnr = self.client.create_container(
+ BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')],
+ host_config=self.client.create_host_config(
+ port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port}
+ )
+ )
+ self.tmp_containers.append(ctnr)
+ self.client.start(ctnr)
+ self.assertEqual(
+ self.client.port(ctnr, 2000)[0]['HostPort'],
+ six.text_type(tcp_port)
+ )
+ self.assertEqual(
+ self.client.port(ctnr, '2000/tcp')[0]['HostPort'],
+ six.text_type(tcp_port)
+ )
+ self.assertEqual(
+ self.client.port(ctnr, '2000/udp')[0]['HostPort'],
+ six.text_type(udp_port)
+ )
diff --git a/tests/integration/volume_test.py b/tests/integration/volume_test.py
new file mode 100644
index 0000000..f1fdf92
--- /dev/null
+++ b/tests/integration/volume_test.py
@@ -0,0 +1,56 @@
+import docker
+import pytest
+
+from . import api_test
+from ..base import requires_api_version
+
+
+@requires_api_version('1.21')
+class TestVolumes(api_test.BaseTestCase):
+ def test_create_volume(self):
+ name = 'perfectcherryblossom'
+ self.tmp_volumes.append(name)
+ result = self.client.create_volume(name)
+ self.assertIn('Name', result)
+ self.assertEqual(result['Name'], name)
+ self.assertIn('Driver', result)
+ self.assertEqual(result['Driver'], 'local')
+
+ def test_create_volume_invalid_driver(self):
+ driver_name = 'invalid.driver'
+
+ with pytest.raises(docker.errors.NotFound):
+ self.client.create_volume('perfectcherryblossom', driver_name)
+
+ def test_list_volumes(self):
+ name = 'imperishablenight'
+ self.tmp_volumes.append(name)
+ volume_info = self.client.create_volume(name)
+ result = self.client.volumes()
+ self.assertIn('Volumes', result)
+ volumes = result['Volumes']
+ self.assertIn(volume_info, volumes)
+
+ def test_inspect_volume(self):
+ name = 'embodimentofscarletdevil'
+ self.tmp_volumes.append(name)
+ volume_info = self.client.create_volume(name)
+ result = self.client.inspect_volume(name)
+ self.assertEqual(volume_info, result)
+
+ def test_inspect_nonexistent_volume(self):
+ name = 'embodimentofscarletdevil'
+ with pytest.raises(docker.errors.NotFound):
+ self.client.inspect_volume(name)
+
+ def test_remove_volume(self):
+ name = 'shootthebullet'
+ self.tmp_volumes.append(name)
+ self.client.create_volume(name)
+ result = self.client.remove_volume(name)
+ self.assertTrue(result)
+
+ def test_remove_nonexistent_volume(self):
+ name = 'shootthebullet'
+ with pytest.raises(docker.errors.NotFound):
+ self.client.remove_volume(name)
diff --git a/tests/integration_test.py b/tests/integration_test.py
index 85eb8da..73cbf93 100644
--- a/tests/integration_test.py
+++ b/tests/integration_test.py
@@ -1,2040 +1,4 @@
-# Copyright 2013 dotCloud inc.
-
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import base64
-import contextlib
-import errno
-import json
-import io
-import os
-import random
-import shutil
-import signal
-import socket
-import struct
-import tarfile
-import tempfile
-import threading
-import time
-import unittest
-import warnings
-
-import pytest
-import six
-from six.moves import BaseHTTPServer
-from six.moves import socketserver
-
-import docker
-from docker.errors import APIError, NotFound
-from docker.utils import kwargs_from_env
-
-from . import helpers
-from .base import requires_api_version
-from .test import Cleanup
-
-
-# FIXME: missing tests for
-# export; history; insert; port; push; tag; get; load; stats
-
-warnings.simplefilter('error')
-compare_version = docker.utils.compare_version
-
-EXEC_DRIVER = []
-BUSYBOX = 'busybox:buildroot-2014.02'
-
-
-def exec_driver_is_native():
- global EXEC_DRIVER
- if not EXEC_DRIVER:
- c = docker_client()
- EXEC_DRIVER = c.info()['ExecutionDriver']
- c.close()
- return EXEC_DRIVER.startswith('native')
-
-
-def docker_client(**kwargs):
- return docker.Client(**docker_client_kwargs(**kwargs))
-
-
-def docker_client_kwargs(**kwargs):
- client_kwargs = kwargs_from_env(assert_hostname=False)
- client_kwargs.update(kwargs)
- return client_kwargs
-
-
-def setup_module():
- c = docker_client()
- try:
- c.inspect_image(BUSYBOX)
- except NotFound:
- os.write(2, "\npulling busybox\n".encode('utf-8'))
- for data in c.pull(BUSYBOX, stream=True):
- data = json.loads(data.decode('utf-8'))
- os.write(2, ("%c[2K\r" % 27).encode('utf-8'))
- status = data.get("status")
- progress = data.get("progress")
- detail = "{0} - {1}".format(status, progress).encode('utf-8')
- os.write(2, detail)
- os.write(2, "\npulled busybox\n".encode('utf-8'))
-
- # Double make sure we now have busybox
- c.inspect_image(BUSYBOX)
- c.close()
-
-
-class BaseTestCase(unittest.TestCase):
- tmp_imgs = []
- tmp_containers = []
- tmp_folders = []
- tmp_volumes = []
-
- def setUp(self):
- if six.PY2:
- self.assertRegex = self.assertRegexpMatches
- self.assertCountEqual = self.assertItemsEqual
- self.client = docker_client(timeout=60)
- self.tmp_imgs = []
- self.tmp_containers = []
- self.tmp_folders = []
- self.tmp_volumes = []
- self.tmp_networks = []
-
- def tearDown(self):
- for img in self.tmp_imgs:
- try:
- self.client.remove_image(img)
- except docker.errors.APIError:
- pass
- for container in self.tmp_containers:
- try:
- self.client.stop(container, timeout=1)
- self.client.remove_container(container)
- except docker.errors.APIError:
- pass
- for network in self.tmp_networks:
- try:
- self.client.remove_network(network)
- except docker.errors.APIError:
- pass
- for folder in self.tmp_folders:
- shutil.rmtree(folder)
-
- for volume in self.tmp_volumes:
- try:
- self.client.remove_volume(volume)
- except docker.errors.APIError:
- pass
-
- self.client.close()
-
- def run_container(self, *args, **kwargs):
- container = self.client.create_container(*args, **kwargs)
- self.tmp_containers.append(container)
- self.client.start(container)
- exitcode = self.client.wait(container)
-
- if exitcode != 0:
- output = self.client.logs(container)
- raise Exception(
- "Container exited with code {}:\n{}"
- .format(exitcode, output))
-
- return container
-
-
-#########################
-# INFORMATION TESTS #
-#########################
-
-
-class TestVersion(BaseTestCase):
- def runTest(self):
- res = self.client.version()
- self.assertIn('GoVersion', res)
- self.assertIn('Version', res)
- self.assertEqual(len(res['Version'].split('.')), 3)
-
-
-class TestInfo(BaseTestCase):
- def runTest(self):
- res = self.client.info()
- self.assertIn('Containers', res)
- self.assertIn('Images', res)
- self.assertIn('Debug', res)
-
-
-class TestSearch(BaseTestCase):
- def runTest(self):
- self.client = docker_client(timeout=10)
- res = self.client.search('busybox')
- self.assertTrue(len(res) >= 1)
- base_img = [x for x in res if x['name'] == 'busybox']
- self.assertEqual(len(base_img), 1)
- self.assertIn('description', base_img[0])
-
-###################
-# LISTING TESTS #
-###################
-
-
-class TestImages(BaseTestCase):
- def runTest(self):
- res1 = self.client.images(all=True)
- self.assertIn('Id', res1[0])
- res10 = res1[0]
- self.assertIn('Created', res10)
- self.assertIn('RepoTags', res10)
- distinct = []
- for img in res1:
- if img['Id'] not in distinct:
- distinct.append(img['Id'])
- self.assertEqual(len(distinct), self.client.info()['Images'])
-
-
-class TestImageIds(BaseTestCase):
- def runTest(self):
- res1 = self.client.images(quiet=True)
- self.assertEqual(type(res1[0]), six.text_type)
-
-
-class TestListContainers(BaseTestCase):
- def runTest(self):
- res0 = self.client.containers(all=True)
- size = len(res0)
- res1 = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res1)
- self.client.start(res1['Id'])
- self.tmp_containers.append(res1['Id'])
- res2 = self.client.containers(all=True)
- self.assertEqual(size + 1, len(res2))
- retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
- self.assertEqual(len(retrieved), 1)
- retrieved = retrieved[0]
- self.assertIn('Command', retrieved)
- self.assertEqual(retrieved['Command'], six.text_type('true'))
- self.assertIn('Image', retrieved)
- self.assertRegex(retrieved['Image'], r'busybox:.*')
- self.assertIn('Status', retrieved)
-
-#####################
-# CONTAINER TESTS #
-#####################
-
-
-class TestCreateContainer(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
-
-
-class TestCreateContainerWithBinds(BaseTestCase):
- def setUp(self):
- super(TestCreateContainerWithBinds, self).setUp()
-
- self.mount_dest = '/mnt'
-
- # Get a random pathname - we don't need it to exist locally
- self.mount_origin = tempfile.mkdtemp()
- shutil.rmtree(self.mount_origin)
-
- self.filename = 'shared.txt'
-
- self.run_with_volume(
- False,
- BUSYBOX,
- ['touch', os.path.join(self.mount_dest, self.filename)],
- )
-
- def run_with_volume(self, ro, *args, **kwargs):
- return self.run_container(
- *args,
- volumes={self.mount_dest: {}},
- host_config=self.client.create_host_config(
- binds={
- self.mount_origin: {
- 'bind': self.mount_dest,
- 'ro': ro,
- },
- },
- network_mode='none'
- ),
- **kwargs
- )
-
- def test_rw(self):
- container = self.run_with_volume(
- False,
- BUSYBOX,
- ['ls', self.mount_dest],
- )
- logs = self.client.logs(container)
-
- if six.PY3:
- logs = logs.decode('utf-8')
- self.assertIn(self.filename, logs)
- inspect_data = self.client.inspect_container(container)
- self.check_container_data(inspect_data, True)
-
- def test_ro(self):
- container = self.run_with_volume(
- True,
- BUSYBOX,
- ['ls', self.mount_dest],
- )
- logs = self.client.logs(container)
-
- if six.PY3:
- logs = logs.decode('utf-8')
- self.assertIn(self.filename, logs)
-
- inspect_data = self.client.inspect_container(container)
- self.check_container_data(inspect_data, False)
-
- def check_container_data(self, inspect_data, rw):
- if docker.utils.compare_version('1.20', self.client._version) < 0:
- self.assertIn('Volumes', inspect_data)
- self.assertIn(self.mount_dest, inspect_data['Volumes'])
- self.assertEqual(
- self.mount_origin, inspect_data['Volumes'][self.mount_dest]
- )
- self.assertIn(self.mount_dest, inspect_data['VolumesRW'])
- self.assertFalse(inspect_data['VolumesRW'][self.mount_dest])
- else:
- self.assertIn('Mounts', inspect_data)
- filtered = list(filter(
- lambda x: x['Destination'] == self.mount_dest,
- inspect_data['Mounts']
- ))
- self.assertEqual(len(filtered), 1)
- mount_data = filtered[0]
- self.assertEqual(mount_data['Source'], self.mount_origin)
- self.assertEqual(mount_data['RW'], rw)
-
-
-@requires_api_version('1.20')
-class CreateContainerWithGroupAddTest(BaseTestCase):
- def test_group_id_ints(self):
- container = self.client.create_container(
- BUSYBOX, 'id -G',
- host_config=self.client.create_host_config(group_add=[1000, 1001])
- )
- self.tmp_containers.append(container)
- self.client.start(container)
- self.client.wait(container)
-
- logs = self.client.logs(container)
- if six.PY3:
- logs = logs.decode('utf-8')
- groups = logs.strip().split(' ')
- self.assertIn('1000', groups)
- self.assertIn('1001', groups)
-
- def test_group_id_strings(self):
- container = self.client.create_container(
- BUSYBOX, 'id -G', host_config=self.client.create_host_config(
- group_add=['1000', '1001']
- )
- )
- self.tmp_containers.append(container)
- self.client.start(container)
- self.client.wait(container)
-
- logs = self.client.logs(container)
- if six.PY3:
- logs = logs.decode('utf-8')
-
- groups = logs.strip().split(' ')
- self.assertIn('1000', groups)
- self.assertIn('1001', groups)
-
-
-class CreateContainerWithLogConfigTest(BaseTestCase):
- def test_valid_log_driver_and_log_opt(self):
- log_config = docker.utils.LogConfig(
- type='json-file',
- config={'max-file': '100'}
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
- self.tmp_containers.append(container['Id'])
- self.client.start(container)
-
- info = self.client.inspect_container(container)
- container_log_config = info['HostConfig']['LogConfig']
-
- self.assertEqual(container_log_config['Type'], log_config.type)
- self.assertEqual(container_log_config['Config'], log_config.config)
-
- def test_invalid_log_driver_raises_exception(self):
- log_config = docker.utils.LogConfig(
- type='asdf-nope',
- config={}
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
-
- expected_msg = "logger: no log driver named 'asdf-nope' is registered"
-
- with pytest.raises(APIError) as excinfo:
- # raises an internal server error 500
- self.client.start(container)
-
- assert expected_msg in str(excinfo.value)
-
- @pytest.mark.skipif(True,
- reason="https://github.com/docker/docker/issues/15633")
- def test_valid_no_log_driver_specified(self):
- log_config = docker.utils.LogConfig(
- type="",
- config={'max-file': '100'}
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
- self.tmp_containers.append(container['Id'])
- self.client.start(container)
-
- info = self.client.inspect_container(container)
- container_log_config = info['HostConfig']['LogConfig']
-
- self.assertEqual(container_log_config['Type'], "json-file")
- self.assertEqual(container_log_config['Config'], log_config.config)
-
- def test_valid_no_config_specified(self):
- log_config = docker.utils.LogConfig(
- type="json-file",
- config=None
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
- self.tmp_containers.append(container['Id'])
- self.client.start(container)
-
- info = self.client.inspect_container(container)
- container_log_config = info['HostConfig']['LogConfig']
-
- self.assertEqual(container_log_config['Type'], "json-file")
- self.assertEqual(container_log_config['Config'], {})
-
-
-@requires_api_version('1.20')
-class GetArchiveTest(BaseTestCase):
- def test_get_file_archive_from_container(self):
- data = 'The Maid and the Pocket Watch of Blood'
- ctnr = self.client.create_container(
- BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data),
- volumes=['/vol1']
- )
- self.tmp_containers.append(ctnr)
- self.client.start(ctnr)
- self.client.wait(ctnr)
- with tempfile.NamedTemporaryFile() as destination:
- strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
- for d in strm:
- destination.write(d)
- destination.seek(0)
- retrieved_data = helpers.untar_file(destination, 'data.txt')
- if six.PY3:
- retrieved_data = retrieved_data.decode('utf-8')
- self.assertEqual(data, retrieved_data.strip())
-
- def test_get_file_stat_from_container(self):
- data = 'The Maid and the Pocket Watch of Blood'
- ctnr = self.client.create_container(
- BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data),
- volumes=['/vol1']
- )
- self.tmp_containers.append(ctnr)
- self.client.start(ctnr)
- self.client.wait(ctnr)
- strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
- self.assertIn('name', stat)
- self.assertEqual(stat['name'], 'data.txt')
- self.assertIn('size', stat)
- self.assertEqual(stat['size'], len(data))
-
-
-@requires_api_version('1.20')
-class PutArchiveTest(BaseTestCase):
- def test_copy_file_to_container(self):
- data = b'Deaf To All But The Song'
- with tempfile.NamedTemporaryFile() as test_file:
- test_file.write(data)
- test_file.seek(0)
- ctnr = self.client.create_container(
- BUSYBOX,
- 'cat {0}'.format(
- os.path.join('/vol1', os.path.basename(test_file.name))
- ),
- volumes=['/vol1']
- )
- self.tmp_containers.append(ctnr)
- with helpers.simple_tar(test_file.name) as test_tar:
- self.client.put_archive(ctnr, '/vol1', test_tar)
- self.client.start(ctnr)
- self.client.wait(ctnr)
- logs = self.client.logs(ctnr)
- if six.PY3:
- logs = logs.decode('utf-8')
- data = data.decode('utf-8')
- self.assertEqual(logs.strip(), data)
-
- def test_copy_directory_to_container(self):
- files = ['a.py', 'b.py', 'foo/b.py']
- dirs = ['foo', 'bar']
- base = helpers.make_tree(dirs, files)
- ctnr = self.client.create_container(
- BUSYBOX, 'ls -p /vol1', volumes=['/vol1']
- )
- self.tmp_containers.append(ctnr)
- with docker.utils.tar(base) as test_tar:
- self.client.put_archive(ctnr, '/vol1', test_tar)
- self.client.start(ctnr)
- self.client.wait(ctnr)
- logs = self.client.logs(ctnr)
- if six.PY3:
- logs = logs.decode('utf-8')
- results = logs.strip().split()
- self.assertIn('a.py', results)
- self.assertIn('b.py', results)
- self.assertIn('foo/', results)
- self.assertIn('bar/', results)
-
-
-class TestCreateContainerReadOnlyFs(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- ctnr = self.client.create_container(
- BUSYBOX, ['mkdir', '/shrine'],
- host_config=self.client.create_host_config(
- read_only=True, network_mode='none'
- )
- )
- self.assertIn('Id', ctnr)
- self.tmp_containers.append(ctnr['Id'])
- self.client.start(ctnr)
- res = self.client.wait(ctnr)
- self.assertNotEqual(res, 0)
-
-
-class TestCreateContainerWithName(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true', name='foobar')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Name', inspect)
- self.assertEqual('/foobar', inspect['Name'])
-
-
-class TestRenameContainer(BaseTestCase):
- def runTest(self):
- version = self.client.version()['Version']
- name = 'hong_meiling'
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.rename(res, name)
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Name', inspect)
- if version == '1.5.0':
- self.assertEqual(name, inspect['Name'])
- else:
- self.assertEqual('/{0}'.format(name), inspect['Name'])
-
-
-class TestStartContainer(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.start(res['Id'])
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Config', inspect)
- self.assertIn('Id', inspect)
- self.assertTrue(inspect['Id'].startswith(res['Id']))
- self.assertIn('Image', inspect)
- self.assertIn('State', inspect)
- self.assertIn('Running', inspect['State'])
- if not inspect['State']['Running']:
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], 0)
-
-
-class TestStartContainerWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.start(res)
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Config', inspect)
- self.assertIn('Id', inspect)
- self.assertTrue(inspect['Id'].startswith(res['Id']))
- self.assertIn('Image', inspect)
- self.assertIn('State', inspect)
- self.assertIn('Running', inspect['State'])
- if not inspect['State']['Running']:
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], 0)
-
-
-class TestCreateContainerPrivileged(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(
- BUSYBOX, 'true', host_config=self.client.create_host_config(
- privileged=True, network_mode='none'
- )
- )
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.start(res['Id'])
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Config', inspect)
- self.assertIn('Id', inspect)
- self.assertTrue(inspect['Id'].startswith(res['Id']))
- self.assertIn('Image', inspect)
- self.assertIn('State', inspect)
- self.assertIn('Running', inspect['State'])
- if not inspect['State']['Running']:
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], 0)
- # Since Nov 2013, the Privileged flag is no longer part of the
- # container's config exposed via the API (safety concerns?).
- #
- if 'Privileged' in inspect['Config']:
- self.assertEqual(inspect['Config']['Privileged'], True)
-
-
-class TestWait(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, ['sleep', '3'])
- id = res['Id']
- self.tmp_containers.append(id)
- self.client.start(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- inspect = self.client.inspect_container(id)
- self.assertIn('Running', inspect['State'])
- self.assertEqual(inspect['State']['Running'], False)
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], exitcode)
-
-
-class TestWaitWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, ['sleep', '3'])
- id = res['Id']
- self.tmp_containers.append(id)
- self.client.start(res)
- exitcode = self.client.wait(res)
- self.assertEqual(exitcode, 0)
- inspect = self.client.inspect_container(res)
- self.assertIn('Running', inspect['State'])
- self.assertEqual(inspect['State']['Running'], False)
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], exitcode)
-
-
-class TestLogs(BaseTestCase):
- def runTest(self):
- snippet = 'Flowering Nights (Sakuya Iyazoi)'
- container = self.client.create_container(
- BUSYBOX, 'echo {0}'.format(snippet)
- )
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- logs = self.client.logs(id)
- self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
-
-
-class TestLogsWithTailOption(BaseTestCase):
- def runTest(self):
- snippet = '''Line1
-Line2'''
- container = self.client.create_container(
- BUSYBOX, 'echo "{0}"'.format(snippet)
- )
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- logs = self.client.logs(id, tail=1)
- self.assertEqual(logs, ('Line2\n').encode(encoding='ascii'))
-
-
-# class TestLogsStreaming(BaseTestCase):
-# def runTest(self):
-# snippet = 'Flowering Nights (Sakuya Iyazoi)'
-# container = self.client.create_container(
-# BUSYBOX, 'echo {0}'.format(snippet)
-# )
-# id = container['Id']
-# self.client.start(id)
-# self.tmp_containers.append(id)
-# logs = bytes() if six.PY3 else str()
-# for chunk in self.client.logs(id, stream=True):
-# logs += chunk
-
-# exitcode = self.client.wait(id)
-# self.assertEqual(exitcode, 0)
-
-# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
-
-
-class TestLogsWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- snippet = 'Flowering Nights (Sakuya Iyazoi)'
- container = self.client.create_container(
- BUSYBOX, 'echo {0}'.format(snippet)
- )
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- logs = self.client.logs(container)
- self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
-
-
-class TestDiff(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- diff = self.client.diff(id)
- test_diff = [x for x in diff if x.get('Path', None) == '/test']
- self.assertEqual(len(test_diff), 1)
- self.assertIn('Kind', test_diff[0])
- self.assertEqual(test_diff[0]['Kind'], 1)
-
-
-class TestDiffWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- diff = self.client.diff(container)
- test_diff = [x for x in diff if x.get('Path', None) == '/test']
- self.assertEqual(len(test_diff), 1)
- self.assertIn('Kind', test_diff[0])
- self.assertEqual(test_diff[0]['Kind'], 1)
-
-
-class TestStop(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.stop(id, timeout=2)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestStopWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- self.assertIn('Id', container)
- id = container['Id']
- self.client.start(container)
- self.tmp_containers.append(id)
- self.client.stop(container, timeout=2)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestKill(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.kill(id)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestKillWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.kill(container)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestKillWithSignal(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '60'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.kill(id, signal=signal.SIGKILL)
- exitcode = self.client.wait(id)
- self.assertNotEqual(exitcode, 0)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False, state)
-
-
-class TestPort(BaseTestCase):
- def runTest(self):
-
- port_bindings = {
- '1111': ('127.0.0.1', '4567'),
- '2222': ('127.0.0.1', '4568')
- }
-
- container = self.client.create_container(
- BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()),
- host_config=self.client.create_host_config(
- port_bindings=port_bindings, network_mode='bridge'
- )
- )
- id = container['Id']
-
- self.client.start(container)
-
- # Call the port function on each biding and compare expected vs actual
- for port in port_bindings:
- actual_bindings = self.client.port(container, port)
- port_binding = actual_bindings.pop()
-
- ip, host_port = port_binding['HostIp'], port_binding['HostPort']
-
- self.assertEqual(ip, port_bindings[port][0])
- self.assertEqual(host_port, port_bindings[port][1])
-
- self.client.kill(id)
-
-
-class TestMacAddress(BaseTestCase):
- def runTest(self):
- mac_address_expected = "02:42:ac:11:00:0a"
- container = self.client.create_container(
- BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected)
-
- id = container['Id']
-
- self.client.start(container)
- res = self.client.inspect_container(container['Id'])
- self.assertEqual(mac_address_expected,
- res['NetworkSettings']['MacAddress'])
-
- self.client.kill(id)
-
-
-class TestContainerTop(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(
- BUSYBOX, ['sleep', '60'])
-
- id = container['Id']
-
- self.client.start(container)
- res = self.client.top(container['Id'])
- self.assertEqual(
- res['Titles'],
- ['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD']
- )
- self.assertEqual(len(res['Processes']), 1)
- self.assertEqual(res['Processes'][0][7], 'sleep 60')
- self.client.kill(id)
-
-
-class TestContainerTopWithPsArgs(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(
- BUSYBOX, ['sleep', '60'])
-
- id = container['Id']
-
- self.client.start(container)
- res = self.client.top(container['Id'], 'waux')
- self.assertEqual(
- res['Titles'],
- ['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS',
- 'TTY', 'STAT', 'START', 'TIME', 'COMMAND'],
- )
- self.assertEqual(len(res['Processes']), 1)
- self.assertEqual(res['Processes'][0][10], 'sleep 60')
- self.client.kill(id)
-
-
-class TestRestart(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- info = self.client.inspect_container(id)
- self.assertIn('State', info)
- self.assertIn('StartedAt', info['State'])
- start_time1 = info['State']['StartedAt']
- self.client.restart(id, timeout=2)
- info2 = self.client.inspect_container(id)
- self.assertIn('State', info2)
- self.assertIn('StartedAt', info2['State'])
- start_time2 = info2['State']['StartedAt']
- self.assertNotEqual(start_time1, start_time2)
- self.assertIn('Running', info2['State'])
- self.assertEqual(info2['State']['Running'], True)
- self.client.kill(id)
-
-
-class TestRestartWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- self.assertIn('Id', container)
- id = container['Id']
- self.client.start(container)
- self.tmp_containers.append(id)
- info = self.client.inspect_container(id)
- self.assertIn('State', info)
- self.assertIn('StartedAt', info['State'])
- start_time1 = info['State']['StartedAt']
- self.client.restart(container, timeout=2)
- info2 = self.client.inspect_container(id)
- self.assertIn('State', info2)
- self.assertIn('StartedAt', info2['State'])
- start_time2 = info2['State']['StartedAt']
- self.assertNotEqual(start_time1, start_time2)
- self.assertIn('Running', info2['State'])
- self.assertEqual(info2['State']['Running'], True)
- self.client.kill(id)
-
-
-class TestRemoveContainer(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['true'])
- id = container['Id']
- self.client.start(id)
- self.client.wait(id)
- self.client.remove_container(id)
- containers = self.client.containers(all=True)
- res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
- self.assertEqual(len(res), 0)
-
-
-class TestRemoveContainerWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['true'])
- id = container['Id']
- self.client.start(id)
- self.client.wait(id)
- self.client.remove_container(container)
- containers = self.client.containers(all=True)
- res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
- self.assertEqual(len(res), 0)
-
-
-class TestCreateContainerWithVolumesFrom(BaseTestCase):
- def runTest(self):
- vol_names = ['foobar_vol0', 'foobar_vol1']
-
- res0 = self.client.create_container(
- BUSYBOX, 'true', name=vol_names[0]
- )
- container1_id = res0['Id']
- self.tmp_containers.append(container1_id)
- self.client.start(container1_id)
-
- res1 = self.client.create_container(
- BUSYBOX, 'true', name=vol_names[1]
- )
- container2_id = res1['Id']
- self.tmp_containers.append(container2_id)
- self.client.start(container2_id)
- with self.assertRaises(docker.errors.DockerException):
- self.client.create_container(
- BUSYBOX, 'cat', detach=True, stdin_open=True,
- volumes_from=vol_names
- )
- res2 = self.client.create_container(
- BUSYBOX, 'cat', detach=True, stdin_open=True,
- host_config=self.client.create_host_config(
- volumes_from=vol_names, network_mode='none'
- )
- )
- container3_id = res2['Id']
- self.tmp_containers.append(container3_id)
- self.client.start(container3_id)
-
- info = self.client.inspect_container(res2['Id'])
- self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names)
-
-
-class TestCreateContainerWithLinks(BaseTestCase):
- def runTest(self):
- res0 = self.client.create_container(
- BUSYBOX, 'cat',
- detach=True, stdin_open=True,
- environment={'FOO': '1'})
-
- container1_id = res0['Id']
- self.tmp_containers.append(container1_id)
-
- self.client.start(container1_id)
-
- res1 = self.client.create_container(
- BUSYBOX, 'cat',
- detach=True, stdin_open=True,
- environment={'FOO': '1'})
-
- container2_id = res1['Id']
- self.tmp_containers.append(container2_id)
-
- self.client.start(container2_id)
-
- # we don't want the first /
- link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
- link_alias1 = 'mylink1'
- link_env_prefix1 = link_alias1.upper()
-
- link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
- link_alias2 = 'mylink2'
- link_env_prefix2 = link_alias2.upper()
-
- res2 = self.client.create_container(
- BUSYBOX, 'env', host_config=self.client.create_host_config(
- links={link_path1: link_alias1, link_path2: link_alias2},
- network_mode='none'
- )
- )
- container3_id = res2['Id']
- self.tmp_containers.append(container3_id)
- self.client.start(container3_id)
- self.assertEqual(self.client.wait(container3_id), 0)
-
- logs = self.client.logs(container3_id)
- if six.PY3:
- logs = logs.decode('utf-8')
- self.assertIn('{0}_NAME='.format(link_env_prefix1), logs)
- self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs)
- self.assertIn('{0}_NAME='.format(link_env_prefix2), logs)
- self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs)
-
-
-class TestRestartingContainer(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(
- BUSYBOX, ['sleep', '2'],
- host_config=self.client.create_host_config(
- restart_policy={"Name": "always", "MaximumRetryCount": 0},
- network_mode='none'
- )
- )
- id = container['Id']
- self.client.start(id)
- self.client.wait(id)
- with self.assertRaises(docker.errors.APIError) as exc:
- self.client.remove_container(id)
- err = exc.exception.response.text
- self.assertIn(
- 'You cannot remove a running container', err
- )
- self.client.remove_container(id, force=True)
-
-
-class TestExecuteCommand(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, ['echo', 'hello'])
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'hello\n')
-
-
-class TestExecuteCommandString(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, 'echo hello world')
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'hello world\n')
-
-
-class TestExecuteCommandStringAsUser(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, 'whoami', user='default')
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'default\n')
-
-
-class TestExecuteCommandStringAsRoot(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, 'whoami')
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'root\n')
-
-
-class TestExecuteCommandStreaming(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- exec_id = self.client.exec_create(id, ['echo', 'hello\nworld'])
- self.assertIn('Id', exec_id)
-
- res = b''
- for chunk in self.client.exec_start(exec_id, stream=True):
- res += chunk
- self.assertEqual(res, b'hello\nworld\n')
-
-
-class TestExecInspect(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist'])
- self.assertIn('Id', exec_id)
- self.client.exec_start(exec_id)
- exec_info = self.client.exec_inspect(exec_id)
- self.assertIn('ExitCode', exec_info)
- self.assertNotEqual(exec_info['ExitCode'], 0)
-
-
-class TestRunContainerStreaming(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, '/bin/sh',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- sock = self.client.attach_socket(container, ws=False)
- self.assertTrue(sock.fileno() > -1)
-
-
-class TestRunContainerReadingSocket(BaseTestCase):
- def runTest(self):
- line = 'hi there and stuff and things, words!'
- command = "echo '{0}'".format(line)
- container = self.client.create_container(BUSYBOX, command,
- detach=True, tty=False)
- ident = container['Id']
- self.tmp_containers.append(ident)
-
- opts = {"stdout": 1, "stream": 1, "logs": 1}
- pty_stdout = self.client.attach_socket(ident, opts)
- self.client.start(ident)
-
- recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)
-
- def read(n=4096):
- """Code stolen from dockerpty to read the socket"""
- try:
- if hasattr(pty_stdout, 'recv'):
- return pty_stdout.recv(n)
- return os.read(pty_stdout.fileno(), n)
- except EnvironmentError as e:
- if e.errno not in recoverable_errors:
- raise
-
- def next_packet_size():
- """Code stolen from dockerpty to get the next packet size"""
- data = six.binary_type()
- while len(data) < 8:
- next_data = read(8 - len(data))
- if not next_data:
- return 0
- data = data + next_data
-
- if data is None:
- return 0
-
- if len(data) == 8:
- _, actual = struct.unpack('>BxxxL', data)
- return actual
-
- next_size = next_packet_size()
- self.assertEqual(next_size, len(line)+1)
-
- data = six.binary_type()
- while len(data) < next_size:
- next_data = read(next_size - len(data))
- if not next_data:
- assert False, "Failed trying to read in the dataz"
- data += next_data
- self.assertEqual(data.decode('utf-8'), "{0}\n".format(line))
- pty_stdout.close()
-
- # Prevent segfault at the end of the test run
- if hasattr(pty_stdout, "_response"):
- del pty_stdout._response
-
-
-class TestPauseUnpauseContainer(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.tmp_containers.append(id)
- self.client.start(container)
- self.client.pause(id)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- self.assertEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], True)
- self.assertIn('Paused', state)
- self.assertEqual(state['Paused'], True)
-
- self.client.unpause(id)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- self.assertEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], True)
- self.assertIn('Paused', state)
- self.assertEqual(state['Paused'], False)
-
-
-class TestCreateContainerWithHostPidMode(BaseTestCase):
- def runTest(self):
- ctnr = self.client.create_container(
- BUSYBOX, 'true', host_config=self.client.create_host_config(
- pid_mode='host', network_mode='none'
- )
- )
- self.assertIn('Id', ctnr)
- self.tmp_containers.append(ctnr['Id'])
- self.client.start(ctnr)
- inspect = self.client.inspect_container(ctnr)
- self.assertIn('HostConfig', inspect)
- host_config = inspect['HostConfig']
- self.assertIn('PidMode', host_config)
- self.assertEqual(host_config['PidMode'], 'host')
-
-
-#################
-# LINKS TESTS #
-#################
-
-
-class TestRemoveLink(BaseTestCase):
- def runTest(self):
- # Create containers
- container1 = self.client.create_container(
- BUSYBOX, 'cat', detach=True, stdin_open=True
- )
- container1_id = container1['Id']
- self.tmp_containers.append(container1_id)
- self.client.start(container1_id)
-
- # Create Link
- # we don't want the first /
- link_path = self.client.inspect_container(container1_id)['Name'][1:]
- link_alias = 'mylink'
-
- container2 = self.client.create_container(
- BUSYBOX, 'cat', host_config=self.client.create_host_config(
- links={link_path: link_alias}, network_mode='none'
- )
- )
- container2_id = container2['Id']
- self.tmp_containers.append(container2_id)
- self.client.start(container2_id)
-
- # Remove link
- linked_name = self.client.inspect_container(container2_id)['Name'][1:]
- link_name = '%s/%s' % (linked_name, link_alias)
- self.client.remove_container(link_name, link=True)
-
- # Link is gone
- containers = self.client.containers(all=True)
- retrieved = [x for x in containers if link_name in x['Names']]
- self.assertEqual(len(retrieved), 0)
-
- # Containers are still there
- retrieved = [
- x for x in containers if x['Id'].startswith(container1_id) or
- x['Id'].startswith(container2_id)
- ]
- self.assertEqual(len(retrieved), 2)
-
-##################
-# IMAGES TESTS #
-##################
-
-
-class TestPull(BaseTestCase):
- def runTest(self):
- try:
- self.client.remove_image('hello-world')
- except docker.errors.APIError:
- pass
- res = self.client.pull('hello-world')
- self.tmp_imgs.append('hello-world')
- self.assertEqual(type(res), six.text_type)
- self.assertGreaterEqual(
- len(self.client.images('hello-world')), 1
- )
- img_info = self.client.inspect_image('hello-world')
- self.assertIn('Id', img_info)
-
-
-class TestPullStream(BaseTestCase):
- def runTest(self):
- try:
- self.client.remove_image('hello-world')
- except docker.errors.APIError:
- pass
- stream = self.client.pull('hello-world', stream=True)
- self.tmp_imgs.append('hello-world')
- for chunk in stream:
- if six.PY3:
- chunk = chunk.decode('utf-8')
- json.loads(chunk) # ensure chunk is a single, valid JSON blob
- self.assertGreaterEqual(
- len(self.client.images('hello-world')), 1
- )
- img_info = self.client.inspect_image('hello-world')
- self.assertIn('Id', img_info)
-
-
-class TestCommit(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- res = self.client.commit(id)
- self.assertIn('Id', res)
- img_id = res['Id']
- self.tmp_imgs.append(img_id)
- img = self.client.inspect_image(img_id)
- self.assertIn('Container', img)
- self.assertTrue(img['Container'].startswith(id))
- self.assertIn('ContainerConfig', img)
- self.assertIn('Image', img['ContainerConfig'])
- self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
- busybox_id = self.client.inspect_image(BUSYBOX)['Id']
- self.assertIn('Parent', img)
- self.assertEqual(img['Parent'], busybox_id)
-
-
-class TestRemoveImage(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- res = self.client.commit(id)
- self.assertIn('Id', res)
- img_id = res['Id']
- self.tmp_imgs.append(img_id)
- self.client.remove_image(img_id, force=True)
- images = self.client.images(all=True)
- res = [x for x in images if x['Id'].startswith(img_id)]
- self.assertEqual(len(res), 0)
-
-
-##################
-# IMPORT TESTS #
-##################
-
-
-class ImportTestCase(BaseTestCase):
- '''Base class for `docker import` test cases.'''
-
- TAR_SIZE = 512 * 1024
-
- def write_dummy_tar_content(self, n_bytes, tar_fd):
- def extend_file(f, n_bytes):
- f.seek(n_bytes - 1)
- f.write(bytearray([65]))
- f.seek(0)
-
- tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
-
- with tempfile.NamedTemporaryFile() as f:
- extend_file(f, n_bytes)
- tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
- tar.addfile(tarinfo, fileobj=f)
-
- tar.close()
-
- @contextlib.contextmanager
- def dummy_tar_stream(self, n_bytes):
- '''Yields a stream that is valid tar data of size n_bytes.'''
- with tempfile.NamedTemporaryFile() as tar_file:
- self.write_dummy_tar_content(n_bytes, tar_file)
- tar_file.seek(0)
- yield tar_file
-
- @contextlib.contextmanager
- def dummy_tar_file(self, n_bytes):
- '''Yields the name of a valid tar file of size n_bytes.'''
- with tempfile.NamedTemporaryFile() as tar_file:
- self.write_dummy_tar_content(n_bytes, tar_file)
- tar_file.seek(0)
- yield tar_file.name
-
-
-class TestImportFromBytes(ImportTestCase):
- '''Tests importing an image from in-memory byte data.'''
-
- def runTest(self):
- with self.dummy_tar_stream(n_bytes=500) as f:
- content = f.read()
-
- # The generic import_image() function cannot import in-memory bytes
- # data that happens to be represented as a string type, because
- # import_image() will try to use it as a filename and usually then
- # trigger an exception. So we test the import_image_from_data()
- # function instead.
- statuses = self.client.import_image_from_data(
- content, repository='test/import-from-bytes')
-
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-class TestImportFromFile(ImportTestCase):
- '''Tests importing an image from a tar file on disk.'''
-
- def runTest(self):
- with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
- # statuses = self.client.import_image(
- # src=tar_filename, repository='test/import-from-file')
- statuses = self.client.import_image_from_file(
- tar_filename, repository='test/import-from-file')
-
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- self.assertIn('status', result)
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-class TestImportFromStream(ImportTestCase):
- '''Tests importing an image from a stream containing tar data.'''
-
- def runTest(self):
- with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
- statuses = self.client.import_image(
- src=tar_stream, repository='test/import-from-stream')
- # statuses = self.client.import_image_from_stream(
- # tar_stream, repository='test/import-from-stream')
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- self.assertIn('status', result)
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-class TestImportFromURL(ImportTestCase):
- '''Tests downloading an image over HTTP.'''
-
- @contextlib.contextmanager
- def temporary_http_file_server(self, stream):
- '''Serve data from an IO stream over HTTP.'''
-
- class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
- def do_GET(self):
- self.send_response(200)
- self.send_header('Content-Type', 'application/x-tar')
- self.end_headers()
- shutil.copyfileobj(stream, self.wfile)
-
- server = socketserver.TCPServer(('', 0), Handler)
- thread = threading.Thread(target=server.serve_forever)
- thread.setDaemon(True)
- thread.start()
-
- yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
-
- server.shutdown()
-
- @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
- def runTest(self):
- # The crappy test HTTP server doesn't handle large files well, so use
- # a small file.
- TAR_SIZE = 10240
-
- with self.dummy_tar_stream(n_bytes=TAR_SIZE) as tar_data:
- with self.temporary_http_file_server(tar_data) as url:
- statuses = self.client.import_image(
- src=url, repository='test/import-from-url')
-
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- self.assertIn('status', result)
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-#################
-# VOLUMES TESTS #
-#################
-
-@requires_api_version('1.21')
-class TestVolumes(BaseTestCase):
- def test_create_volume(self):
- name = 'perfectcherryblossom'
- self.tmp_volumes.append(name)
- result = self.client.create_volume(name)
- self.assertIn('Name', result)
- self.assertEqual(result['Name'], name)
- self.assertIn('Driver', result)
- self.assertEqual(result['Driver'], 'local')
-
- def test_create_volume_invalid_driver(self):
- driver_name = 'invalid.driver'
-
- with pytest.raises(docker.errors.NotFound):
- self.client.create_volume('perfectcherryblossom', driver_name)
-
- def test_list_volumes(self):
- name = 'imperishablenight'
- self.tmp_volumes.append(name)
- volume_info = self.client.create_volume(name)
- result = self.client.volumes()
- self.assertIn('Volumes', result)
- volumes = result['Volumes']
- self.assertIn(volume_info, volumes)
-
- def test_inspect_volume(self):
- name = 'embodimentofscarletdevil'
- self.tmp_volumes.append(name)
- volume_info = self.client.create_volume(name)
- result = self.client.inspect_volume(name)
- self.assertEqual(volume_info, result)
-
- def test_inspect_nonexistent_volume(self):
- name = 'embodimentofscarletdevil'
- with pytest.raises(docker.errors.NotFound):
- self.client.inspect_volume(name)
-
- def test_remove_volume(self):
- name = 'shootthebullet'
- self.tmp_volumes.append(name)
- self.client.create_volume(name)
- result = self.client.remove_volume(name)
- self.assertTrue(result)
-
- def test_remove_nonexistent_volume(self):
- name = 'shootthebullet'
- with pytest.raises(docker.errors.NotFound):
- self.client.remove_volume(name)
-
-
-#################
-# BUILDER TESTS #
-#################
-
-class TestBuildStream(BaseTestCase):
- def runTest(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
- stream = self.client.build(fileobj=script, stream=True)
- logs = ''
- for chunk in stream:
- if six.PY3:
- chunk = chunk.decode('utf-8')
- json.loads(chunk) # ensure chunk is a single, valid JSON blob
- logs += chunk
- self.assertNotEqual(logs, '')
-
-
-class TestBuildFromStringIO(BaseTestCase):
- def runTest(self):
- if six.PY3:
- return
- script = io.StringIO(six.text_type('\n').join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]))
- stream = self.client.build(fileobj=script, stream=True)
- logs = ''
- for chunk in stream:
- if six.PY3:
- chunk = chunk.decode('utf-8')
- logs += chunk
- self.assertNotEqual(logs, '')
-
-
-@requires_api_version('1.8')
-class TestBuildWithDockerignore(Cleanup, BaseTestCase):
- def runTest(self):
- base_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base_dir)
-
- with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
- f.write("\n".join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'ADD . /test',
- ]))
-
- with open(os.path.join(base_dir, '.dockerignore'), 'w') as f:
- f.write("\n".join([
- 'ignored',
- 'Dockerfile',
- '.dockerignore',
- '', # empty line
- ]))
-
- with open(os.path.join(base_dir, 'not-ignored'), 'w') as f:
- f.write("this file should not be ignored")
-
- subdir = os.path.join(base_dir, 'ignored', 'subdir')
- os.makedirs(subdir)
- with open(os.path.join(subdir, 'file'), 'w') as f:
- f.write("this file should be ignored")
-
- tag = 'docker-py-test-build-with-dockerignore'
- stream = self.client.build(
- path=base_dir,
- tag=tag,
- )
- for chunk in stream:
- pass
-
- c = self.client.create_container(tag, ['ls', '-1A', '/test'])
- self.client.start(c)
- self.client.wait(c)
- logs = self.client.logs(c)
-
- if six.PY3:
- logs = logs.decode('utf-8')
-
- self.assertEqual(
- list(filter(None, logs.split('\n'))),
- ['not-ignored'],
- )
-
-
-#######################
-# NETWORK TESTS #
-#######################
-
-
-@requires_api_version('1.21')
-class TestNetworks(BaseTestCase):
- def create_network(self, *args, **kwargs):
- net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
- net_id = self.client.create_network(net_name, *args, **kwargs)['id']
- self.tmp_networks.append(net_id)
- return (net_name, net_id)
-
- def test_list_networks(self):
- networks = self.client.networks()
- initial_size = len(networks)
-
- net_name, net_id = self.create_network()
-
- networks = self.client.networks()
- self.assertEqual(len(networks), initial_size + 1)
- self.assertTrue(net_id in [n['id'] for n in networks])
-
- networks_by_name = self.client.networks(names=[net_name])
- self.assertEqual([n['id'] for n in networks_by_name], [net_id])
-
- networks_by_partial_id = self.client.networks(ids=[net_id[:8]])
- self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id])
-
- def test_inspect_network(self):
- net_name, net_id = self.create_network()
-
- net = self.client.inspect_network(net_id)
- self.assertEqual(net, {
- u'name': net_name,
- u'id': net_id,
- u'driver': 'bridge',
- u'containers': {},
- })
-
- def test_create_network_with_host_driver_fails(self):
- net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14]
-
- with pytest.raises(APIError):
- self.client.create_network(net_name, driver='host')
-
- def test_remove_network(self):
- initial_size = len(self.client.networks())
-
- net_name, net_id = self.create_network()
- self.assertEqual(len(self.client.networks()), initial_size + 1)
-
- self.client.remove_network(net_id)
- self.assertEqual(len(self.client.networks()), initial_size)
-
- def test_connect_and_disconnect_container(self):
- net_name, net_id = self.create_network()
-
- container = self.client.create_container('busybox', 'top')
- self.tmp_containers.append(container)
- self.client.start(container)
-
- network_data = self.client.inspect_network(net_id)
- self.assertFalse(network_data.get('containers'))
-
- self.client.connect_container_to_network(container, net_id)
- network_data = self.client.inspect_network(net_id)
- self.assertEqual(
- list(network_data['containers'].keys()),
- [container['Id']])
-
- self.client.disconnect_container_from_network(container, net_id)
- network_data = self.client.inspect_network(net_id)
- self.assertFalse(network_data.get('containers'))
-
- def test_connect_on_container_create(self):
- net_name, net_id = self.create_network()
-
- container = self.client.create_container(
- image='busybox',
- command='top',
- host_config=self.client.create_host_config(network_mode=net_name),
- )
- self.tmp_containers.append(container)
- self.client.start(container)
-
- network_data = self.client.inspect_network(net_id)
- self.assertEqual(
- list(network_data['containers'].keys()),
- [container['Id']])
-
- self.client.disconnect_container_from_network(container, net_id)
- network_data = self.client.inspect_network(net_id)
- self.assertFalse(network_data.get('containers'))
-
-
-#######################
-# PY SPECIFIC TESTS #
-#######################
-
-
-class TestRunShlex(BaseTestCase):
- def runTest(self):
- commands = [
- 'true',
- 'echo "The Young Descendant of Tepes & Septette for the '
- 'Dead Princess"',
- 'echo -n "The Young Descendant of Tepes & Septette for the '
- 'Dead Princess"',
- '/bin/sh -c "echo Hello World"',
- '/bin/sh -c \'echo "Hello World"\'',
- 'echo "\"Night of Nights\""',
- 'true && echo "Night of Nights"'
- ]
- for cmd in commands:
- container = self.client.create_container(BUSYBOX, cmd)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0, msg=cmd)
-
-
-class TestLoadConfig(BaseTestCase):
- def runTest(self):
- folder = tempfile.mkdtemp()
- self.tmp_folders.append(folder)
- cfg_path = os.path.join(folder, '.dockercfg')
- f = open(cfg_path, 'w')
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- f.write('auth = {0}\n'.format(auth_))
- f.write('email = sakuya@scarlet.net')
- f.close()
- cfg = docker.auth.load_config(cfg_path)
- self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
- cfg = cfg[docker.auth.INDEX_NAME]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('Auth'), None)
-
-
-class TestLoadJSONConfig(BaseTestCase):
- def runTest(self):
- folder = tempfile.mkdtemp()
- self.tmp_folders.append(folder)
- cfg_path = os.path.join(folder, '.dockercfg')
- f = open(os.path.join(folder, '.dockercfg'), 'w')
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- email_ = 'sakuya@scarlet.net'
- f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
- docker.auth.INDEX_URL, auth_, email_))
- f.close()
- cfg = docker.auth.load_config(cfg_path)
- self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
- cfg = cfg[docker.auth.INDEX_URL]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('Auth'), None)
-
-
-class TestAutoDetectVersion(unittest.TestCase):
- def test_client_init(self):
- client = docker_client(version='auto')
- client_version = client._version
- api_version = client.version(api_version=False)['ApiVersion']
- self.assertEqual(client_version, api_version)
- api_version_2 = client.version()['ApiVersion']
- self.assertEqual(client_version, api_version_2)
- client.close()
-
- def test_auto_client(self):
- client = docker.AutoVersionClient(**docker_client_kwargs())
- client_version = client._version
- api_version = client.version(api_version=False)['ApiVersion']
- self.assertEqual(client_version, api_version)
- api_version_2 = client.version()['ApiVersion']
- self.assertEqual(client_version, api_version_2)
- client.close()
- with self.assertRaises(docker.errors.DockerException):
- docker.AutoVersionClient(**docker_client_kwargs(version='1.11'))
-
-
-class TestConnectionTimeout(unittest.TestCase):
- def setUp(self):
- self.timeout = 0.5
- self.client = docker.client.Client(base_url='http://192.168.10.2:4243',
- timeout=self.timeout)
-
- def runTest(self):
- start = time.time()
- res = None
- # This call isn't supposed to complete, and it should fail fast.
- try:
- res = self.client.inspect_container('id')
- except:
- pass
- end = time.time()
- self.assertTrue(res is None)
- self.assertTrue(end - start < 2 * self.timeout)
-
-
-class UnixconnTestCase(unittest.TestCase):
- """
- Test UNIX socket connection adapter.
- """
-
- def test_resource_warnings(self):
- """
- Test no warnings are produced when using the client.
- """
-
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter('always')
-
- client = docker_client()
- client.images()
- client.close()
- del client
-
- assert len(w) == 0, \
- "No warnings produced: {0}".format(w[0].message)
-
-
-####################
-# REGRESSION TESTS #
-####################
-
-class TestRegressions(BaseTestCase):
- def test_443(self):
- dfile = io.BytesIO()
- with self.assertRaises(docker.errors.APIError) as exc:
- for line in self.client.build(fileobj=dfile, tag="a/b/c"):
- pass
- self.assertEqual(exc.exception.response.status_code, 500)
- dfile.close()
-
- def test_542(self):
- self.client.start(
- self.client.create_container(BUSYBOX, ['true'])
- )
- result = self.client.containers(all=True, trunc=True)
- self.assertEqual(len(result[0]['Id']), 12)
-
- def test_647(self):
- with self.assertRaises(docker.errors.APIError):
- self.client.inspect_image('gensokyo.jp//kirisame')
-
- def test_649(self):
- self.client.timeout = None
- ctnr = self.client.create_container(BUSYBOX, ['sleep', '2'])
- self.client.start(ctnr)
- self.client.stop(ctnr)
-
- def test_715(self):
- ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000)
- self.client.start(ctnr)
- self.client.wait(ctnr)
- logs = self.client.logs(ctnr)
- if six.PY3:
- logs = logs.decode('utf-8')
- assert logs == '1000\n'
-
- def test_792_explicit_port_protocol(self):
-
- tcp_port, udp_port = random.sample(range(9999, 32000), 2)
- ctnr = self.client.create_container(
- BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')],
- host_config=self.client.create_host_config(
- port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port}
- )
- )
- self.tmp_containers.append(ctnr)
- self.client.start(ctnr)
- self.assertEqual(
- self.client.port(ctnr, 2000)[0]['HostPort'],
- six.text_type(tcp_port)
- )
- self.assertEqual(
- self.client.port(ctnr, '2000/tcp')[0]['HostPort'],
- six.text_type(tcp_port)
- )
- self.assertEqual(
- self.client.port(ctnr, '2000/udp')[0]['HostPort'],
- six.text_type(udp_port)
- )
+# FIXME: placeholder while we transition to the new folder architecture
+# Remove when merged in master and Jenkins is updated to find the tests
+# in the new location.
+from .integration import * # flake8: noqa
diff --git a/tests/testdata/certs/ca.pem b/tests/unit/__init__.py
index e69de29..e69de29 100644
--- a/tests/testdata/certs/ca.pem
+++ b/tests/unit/__init__.py
diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py
new file mode 100644
index 0000000..62d64e8
--- /dev/null
+++ b/tests/unit/api_test.py
@@ -0,0 +1,418 @@
+# Copyright 2013 dotCloud inc.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+import re
+import shutil
+import socket
+import sys
+import tempfile
+import threading
+import time
+
+import docker
+import requests
+import six
+
+from .. import base
+from . import fake_api
+
+import pytest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS
+
+
+def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
+ request=None):
+ res = requests.Response()
+ res.status_code = status_code
+ if not isinstance(content, six.binary_type):
+ content = json.dumps(content).encode('ascii')
+ res._content = content
+ res.headers = requests.structures.CaseInsensitiveDict(headers or {})
+ res.reason = reason
+ res.elapsed = datetime.timedelta(elapsed)
+ res.request = request
+ return res
+
+
+def fake_resolve_authconfig(authconfig, registry=None):
+ return None
+
+
+def fake_inspect_container(self, container, tty=False):
+ return fake_api.get_fake_inspect_container(tty=tty)[1]
+
+
+def fake_resp(method, url, *args, **kwargs):
+ key = None
+ if url in fake_api.fake_responses:
+ key = url
+ elif (url, method) in fake_api.fake_responses:
+ key = (url, method)
+ if not key:
+ raise Exception('{0} {1}'.format(method, url))
+ status_code, content = fake_api.fake_responses[key]()
+ return response(status_code=status_code, content=content)
+
+
+fake_request = mock.Mock(side_effect=fake_resp)
+
+
+def fake_get(self, url, *args, **kwargs):
+ return fake_request('GET', url, *args, **kwargs)
+
+
+def fake_post(self, url, *args, **kwargs):
+ return fake_request('POST', url, *args, **kwargs)
+
+
+def fake_put(self, url, *args, **kwargs):
+ return fake_request('PUT', url, *args, **kwargs)
+
+
+def fake_delete(self, url, *args, **kwargs):
+ return fake_request('DELETE', url, *args, **kwargs)
+
+url_base = 'http+docker://localunixsocket/'
+url_prefix = '{0}v{1}/'.format(
+ url_base,
+ docker.constants.DEFAULT_DOCKER_API_VERSION)
+
+
+class DockerClientTest(base.Cleanup, base.BaseTestCase):
+ def setUp(self):
+ self.patcher = mock.patch.multiple(
+ 'docker.Client', get=fake_get, post=fake_post, put=fake_put,
+ delete=fake_delete
+ )
+ self.patcher.start()
+ self.client = docker.Client()
+ # Force-clear authconfig to avoid tampering with the tests
+ self.client._cfg = {'Configs': {}}
+
+ def tearDown(self):
+ self.client.close()
+ self.patcher.stop()
+
+ def assertIn(self, object, collection):
+ if six.PY2 and sys.version_info[1] <= 6:
+ return self.assertTrue(object in collection)
+ return super(DockerClientTest, self).assertIn(object, collection)
+
+ def base_create_payload(self, img='busybox', cmd=None):
+ if not cmd:
+ cmd = ['true']
+ return {"Tty": False, "Image": img, "Cmd": cmd,
+ "AttachStdin": False,
+ "AttachStderr": True, "AttachStdout": True,
+ "StdinOnce": False,
+ "OpenStdin": False, "NetworkDisabled": False,
+ }
+
+
+class DockerApiTest(DockerClientTest):
+ def test_ctor(self):
+ with pytest.raises(docker.errors.DockerException) as excinfo:
+ docker.Client(version=1.12)
+
+ self.assertEqual(
+ str(excinfo.value),
+ 'Version parameter must be a string or None. Found float'
+ )
+
+ def test_url_valid_resource(self):
+ url = self.client._url('/hello/{0}/world', 'somename')
+ self.assertEqual(
+ url, '{0}{1}'.format(url_prefix, 'hello/somename/world')
+ )
+
+ url = self.client._url(
+ '/hello/{0}/world/{1}', 'somename', 'someothername'
+ )
+ self.assertEqual(
+ url,
+ '{0}{1}'.format(url_prefix, 'hello/somename/world/someothername')
+ )
+
+ url = self.client._url('/hello/{0}/world', '/some?name')
+ self.assertEqual(
+ url, '{0}{1}'.format(url_prefix, 'hello/%2Fsome%3Fname/world')
+ )
+
+ def test_url_invalid_resource(self):
+ with pytest.raises(ValueError):
+ self.client._url('/hello/{0}/world', ['sakuya', 'izayoi'])
+
+ def test_url_no_resource(self):
+ url = self.client._url('/simple')
+ self.assertEqual(url, '{0}{1}'.format(url_prefix, 'simple'))
+
+ def test_url_unversioned_api(self):
+ url = self.client._url(
+ '/hello/{0}/world', 'somename', versioned_api=False
+ )
+ self.assertEqual(
+ url, '{0}{1}'.format(url_base, 'hello/somename/world')
+ )
+
+ def test_version(self):
+ self.client.version()
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'version',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_version_no_api_version(self):
+ self.client.version(False)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_base + 'version',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_retrieve_server_version(self):
+ client = docker.Client(version="auto")
+ self.assertTrue(isinstance(client._version, six.string_types))
+ self.assertFalse(client._version == "auto")
+ client.close()
+
+ def test_auto_retrieve_server_version(self):
+ version = self.client._retrieve_server_version()
+ self.assertTrue(isinstance(version, six.string_types))
+
+ def test_info(self):
+ self.client.info()
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'info',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_search(self):
+ self.client.search('busybox')
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/search',
+ params={'term': 'busybox'},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_events(self):
+ self.client.events()
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'events',
+ params={'since': None, 'until': None, 'filters': None},
+ stream=True
+ )
+
+ def test_events_with_since_until(self):
+ ts = 1356048000
+ now = datetime.datetime.utcfromtimestamp(ts)
+ since = now - datetime.timedelta(seconds=10)
+ until = now + datetime.timedelta(seconds=10)
+
+ self.client.events(since=since, until=until)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'events',
+ params={
+ 'since': ts - 10,
+ 'until': ts + 10,
+ 'filters': None
+ },
+ stream=True
+ )
+
+ def test_events_with_filters(self):
+ filters = {'event': ['die', 'stop'],
+ 'container': fake_api.FAKE_CONTAINER_ID}
+
+ self.client.events(filters=filters)
+
+ expected_filters = docker.utils.convert_filters(filters)
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'events',
+ params={
+ 'since': None,
+ 'until': None,
+ 'filters': expected_filters
+ },
+ stream=True
+ )
+
+ def _socket_path_for_client_session(self, client):
+ socket_adapter = client.get_adapter('http+docker://')
+ return socket_adapter.socket_path
+
+ def test_url_compatibility_unix(self):
+ c = docker.Client(base_url="unix://socket")
+
+ assert self._socket_path_for_client_session(c) == '/socket'
+
+ def test_url_compatibility_unix_triple_slash(self):
+ c = docker.Client(base_url="unix:///socket")
+
+ assert self._socket_path_for_client_session(c) == '/socket'
+
+ def test_url_compatibility_http_unix_triple_slash(self):
+ c = docker.Client(base_url="http+unix:///socket")
+
+ assert self._socket_path_for_client_session(c) == '/socket'
+
+ def test_url_compatibility_http(self):
+ c = docker.Client(base_url="http://hostname:1234")
+
+ assert c.base_url == "http://hostname:1234"
+
+ def test_url_compatibility_tcp(self):
+ c = docker.Client(base_url="tcp://hostname:1234")
+
+ assert c.base_url == "http://hostname:1234"
+
+ def test_remove_link(self):
+ self.client.remove_container(fake_api.FAKE_CONTAINER_ID, link=True)
+
+ fake_request.assert_called_with(
+ 'DELETE',
+ url_prefix + 'containers/3cc2351ab11b',
+ params={'v': False, 'link': True, 'force': False},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_create_host_config_secopt(self):
+ security_opt = ['apparmor:test_profile']
+ result = self.client.create_host_config(security_opt=security_opt)
+ self.assertIn('SecurityOpt', result)
+ self.assertEqual(result['SecurityOpt'], security_opt)
+ self.assertRaises(
+ docker.errors.DockerException, self.client.create_host_config,
+ security_opt='wrong'
+ )
+
+
+class StreamTest(base.Cleanup, base.BaseTestCase):
+ def setUp(self):
+ socket_dir = tempfile.mkdtemp()
+ self.build_context = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, socket_dir)
+ self.addCleanup(shutil.rmtree, self.build_context)
+ self.socket_file = os.path.join(socket_dir, 'test_sock.sock')
+ self.server_socket = self._setup_socket()
+ self.stop_server = False
+ server_thread = threading.Thread(target=self.run_server)
+ server_thread.setDaemon(True)
+ server_thread.start()
+ self.response = None
+ self.request_handler = None
+ self.addCleanup(server_thread.join)
+ self.addCleanup(self.stop)
+
+ def stop(self):
+ self.stop_server = True
+
+ def _setup_socket(self):
+ server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ server_sock.bind(self.socket_file)
+ # Non-blocking mode so that we can shut the test down easily
+ server_sock.setblocking(0)
+ server_sock.listen(5)
+ return server_sock
+
+ def run_server(self):
+ try:
+ while not self.stop_server:
+ try:
+ connection, client_address = self.server_socket.accept()
+ except socket.error:
+ # Probably no connection to accept yet
+ time.sleep(0.01)
+ continue
+
+ connection.setblocking(1)
+ try:
+ self.request_handler(connection)
+ finally:
+ connection.close()
+ finally:
+ self.server_socket.close()
+
+ def early_response_sending_handler(self, connection):
+ data = b''
+ headers = None
+
+ connection.sendall(self.response)
+ while not headers:
+ data += connection.recv(2048)
+ parts = data.split(b'\r\n\r\n', 1)
+ if len(parts) == 2:
+ headers, data = parts
+
+ mo = re.search(r'Content-Length: ([0-9]+)', headers.decode())
+ assert mo
+ content_length = int(mo.group(1))
+
+ while True:
+ if len(data) >= content_length:
+ break
+
+ data += connection.recv(2048)
+
+ def test_early_stream_response(self):
+ self.request_handler = self.early_response_sending_handler
+ lines = []
+ for i in range(0, 50):
+ line = str(i).encode()
+ lines += [('%x' % len(line)).encode(), line]
+ lines.append(b'0')
+ lines.append(b'')
+
+ self.response = (
+ b'HTTP/1.1 200 OK\r\n'
+ b'Transfer-Encoding: chunked\r\n'
+ b'\r\n'
+ ) + b'\r\n'.join(lines)
+
+ with docker.Client(base_url="http+unix://" + self.socket_file) \
+ as client:
+ for i in range(5):
+ try:
+ stream = client.build(
+ path=self.build_context,
+ stream=True
+ )
+ break
+ except requests.ConnectionError as e:
+ if i == 4:
+ raise e
+
+ self.assertEqual(list(stream), [
+ str(i).encode() for i in range(50)])
diff --git a/tests/unit/auth_test.py b/tests/unit/auth_test.py
new file mode 100644
index 0000000..3405de6
--- /dev/null
+++ b/tests/unit/auth_test.py
@@ -0,0 +1,289 @@
+# -*- coding: utf-8 -*-
+
+import base64
+import json
+import os
+import os.path
+import random
+import shutil
+import tempfile
+
+from docker import auth
+
+from .. import base
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class RegressionTest(base.BaseTestCase):
+ def test_803_urlsafe_encode(self):
+ auth_data = {
+ 'username': 'root',
+ 'password': 'GR?XGR?XGR?XGR?X'
+ }
+ encoded = auth.encode_header(auth_data)
+ assert b'/' not in encoded
+ assert b'_' in encoded
+
+
+class ResolveAuthTest(base.BaseTestCase):
+ auth_config = {
+ 'https://index.docker.io/v1/': {'auth': 'indexuser'},
+ 'my.registry.net': {'auth': 'privateuser'},
+ 'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
+ }
+
+ def test_resolve_repository_name_hub_library_image(self):
+ self.assertEqual(
+ auth.resolve_repository_name('image'),
+ ('index.docker.io', 'image'),
+ )
+
+ def test_resolve_repository_name_hub_image(self):
+ self.assertEqual(
+ auth.resolve_repository_name('username/image'),
+ ('index.docker.io', 'username/image'),
+ )
+
+ def test_resolve_repository_name_private_registry(self):
+ self.assertEqual(
+ auth.resolve_repository_name('my.registry.net/image'),
+ ('my.registry.net', 'image'),
+ )
+
+ def test_resolve_repository_name_private_registry_with_port(self):
+ self.assertEqual(
+ auth.resolve_repository_name('my.registry.net:5000/image'),
+ ('my.registry.net:5000', 'image'),
+ )
+
+ def test_resolve_repository_name_private_registry_with_username(self):
+ self.assertEqual(
+ auth.resolve_repository_name('my.registry.net/username/image'),
+ ('my.registry.net', 'username/image'),
+ )
+
+ def test_resolve_repository_name_no_dots_but_port(self):
+ self.assertEqual(
+ auth.resolve_repository_name('hostname:5000/image'),
+ ('hostname:5000', 'image'),
+ )
+
+ def test_resolve_repository_name_no_dots_but_port_and_username(self):
+ self.assertEqual(
+ auth.resolve_repository_name('hostname:5000/username/image'),
+ ('hostname:5000', 'username/image'),
+ )
+
+ def test_resolve_repository_name_localhost(self):
+ self.assertEqual(
+ auth.resolve_repository_name('localhost/image'),
+ ('localhost', 'image'),
+ )
+
+ def test_resolve_repository_name_localhost_with_username(self):
+ self.assertEqual(
+ auth.resolve_repository_name('localhost/username/image'),
+ ('localhost', 'username/image'),
+ )
+
+ def test_resolve_authconfig_hostname_only(self):
+ self.assertEqual(
+ auth.resolve_authconfig(self.auth_config, 'my.registry.net'),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_no_protocol(self):
+ self.assertEqual(
+ auth.resolve_authconfig(self.auth_config, 'my.registry.net/v1/'),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_no_path(self):
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, 'http://my.registry.net'
+ ),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_no_path_trailing_slash(self):
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, 'http://my.registry.net/'
+ ),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_no_path_wrong_secure_proto(self):
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, 'https://my.registry.net'
+ ),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, 'http://index.docker.io'
+ ),
+ {'auth': 'indexuser'}
+ )
+
+ def test_resolve_authconfig_path_wrong_proto(self):
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, 'https://my.registry.net/v1/'
+ ),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_default_registry(self):
+ self.assertEqual(
+ auth.resolve_authconfig(self.auth_config), {'auth': 'indexuser'}
+ )
+
+ def test_resolve_authconfig_default_explicit_none(self):
+ self.assertEqual(
+ auth.resolve_authconfig(self.auth_config, None),
+ {'auth': 'indexuser'}
+ )
+
+ def test_resolve_authconfig_fully_explicit(self):
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, 'http://my.registry.net/v1/'
+ ),
+ {'auth': 'privateuser'}
+ )
+
+ def test_resolve_authconfig_legacy_config(self):
+ self.assertEqual(
+ auth.resolve_authconfig(self.auth_config, 'legacy.registry.url'),
+ {'auth': 'legacyauth'}
+ )
+
+ def test_resolve_authconfig_no_match(self):
+ self.assertTrue(
+ auth.resolve_authconfig(self.auth_config, 'does.not.exist') is None
+ )
+
+ def test_resolve_registry_and_auth_library_image(self):
+ image = 'image'
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ ),
+ {'auth': 'indexuser'},
+ )
+
+ def test_resolve_registry_and_auth_hub_image(self):
+ image = 'username/image'
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ ),
+ {'auth': 'indexuser'},
+ )
+
+ def test_resolve_registry_and_auth_private_registry(self):
+ image = 'my.registry.net/image'
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ ),
+ {'auth': 'privateuser'},
+ )
+
+ def test_resolve_registry_and_auth_unauthenticated_registry(self):
+ image = 'other.registry.net/image'
+ self.assertEqual(
+ auth.resolve_authconfig(
+ self.auth_config, auth.resolve_repository_name(image)[0]
+ ),
+ None,
+ )
+
+
+class LoadConfigTest(base.Cleanup, base.BaseTestCase):
+ def test_load_config_no_file(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ cfg = auth.load_config(folder)
+ self.assertTrue(cfg is not None)
+
+ def test_load_config(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+ dockercfg_path = os.path.join(folder, '.dockercfg')
+ with open(dockercfg_path, 'w') as f:
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ f.write('auth = {0}\n'.format(auth_))
+ f.write('email = sakuya@scarlet.net')
+ cfg = auth.load_config(dockercfg_path)
+ assert auth.INDEX_NAME in cfg
+ self.assertNotEqual(cfg[auth.INDEX_NAME], None)
+ cfg = cfg[auth.INDEX_NAME]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('auth'), None)
+
+ def test_load_config_with_random_name(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder,
+ '.{0}.dockercfg'.format(
+ random.randrange(100000)))
+ registry = 'https://your.private.registry.io'
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ config = {
+ registry: {
+ 'auth': '{0}'.format(auth_),
+ 'email': 'sakuya@scarlet.net'
+ }
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ cfg = auth.load_config(dockercfg_path)
+ assert registry in cfg
+ self.assertNotEqual(cfg[registry], None)
+ cfg = cfg[registry]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('auth'), None)
+
+ def test_load_config_custom_config_env(self):
+ folder = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, folder)
+
+ dockercfg_path = os.path.join(folder, 'config.json')
+ registry = 'https://your.private.registry.io'
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ config = {
+ registry: {
+ 'auth': '{0}'.format(auth_),
+ 'email': 'sakuya@scarlet.net'
+ }
+ }
+
+ with open(dockercfg_path, 'w') as f:
+ json.dump(config, f)
+
+ with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
+ cfg = auth.load_config(None)
+ assert registry in cfg
+ self.assertNotEqual(cfg[registry], None)
+ cfg = cfg[registry]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('auth'), None)
diff --git a/tests/unit/build_test.py b/tests/unit/build_test.py
new file mode 100644
index 0000000..414153e
--- /dev/null
+++ b/tests/unit/build_test.py
@@ -0,0 +1,105 @@
+import gzip
+import io
+
+import docker
+
+from .api_test import DockerClientTest
+
+
+class BuildTest(DockerClientTest):
+ def test_build_container(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+
+ self.client.build(fileobj=script)
+
+ def test_build_container_pull(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+
+ self.client.build(fileobj=script, pull=True)
+
+ def test_build_container_stream(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+
+ self.client.build(fileobj=script, stream=True)
+
+ def test_build_container_custom_context(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+ context = docker.utils.mkbuildcontext(script)
+
+ self.client.build(fileobj=context, custom_context=True)
+
+ def test_build_container_custom_context_gzip(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+ context = docker.utils.mkbuildcontext(script)
+ gz_context = gzip.GzipFile(fileobj=context)
+
+ self.client.build(
+ fileobj=gz_context,
+ custom_context=True,
+ encoding="gzip"
+ )
+
+ def test_build_remote_with_registry_auth(self):
+ self.client._auth_configs = {
+ 'https://example.com': {
+ 'user': 'example',
+ 'password': 'example',
+ 'email': 'example@example.com'
+ }
+ }
+
+ self.client.build(path='https://github.com/docker-library/mongo')
+
+ def test_build_container_with_named_dockerfile(self):
+ self.client.build('.', dockerfile='nameddockerfile')
+
+ def test_build_container_with_container_limits(self):
+ self.client.build('.', container_limits={
+ 'memory': 1024 * 1024,
+ 'cpusetcpus': 1,
+ 'cpushares': 1000,
+ 'memswap': 1024 * 1024 * 8
+ })
+
+ def test_build_container_invalid_container_limits(self):
+ self.assertRaises(
+ docker.errors.DockerException,
+ lambda: self.client.build('.', container_limits={
+ 'foo': 'bar'
+ })
+ )
diff --git a/tests/test.py b/tests/unit/container_test.py
index 86f1941..eeeba76 100644
--- a/tests/test.py
+++ b/tests/unit/container_test.py
@@ -1,43 +1,15 @@
-# Copyright 2013 dotCloud inc.
-
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import base64
-import datetime
-import gzip
-import io
import json
-import os
-import re
-import shutil
import signal
-import socket
-import sys
-import tarfile
-import tempfile
-import threading
-import time
-import random
import docker
-import requests
+import pytest
import six
-from . import base
from . import fake_api
-from .helpers import make_tree
-
-import pytest
+from .api_test import (
+ DockerClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS,
+ fake_inspect_container
+)
try:
from unittest import mock
@@ -45,370 +17,154 @@ except ImportError:
import mock
-DEFAULT_TIMEOUT_SECONDS = docker.constants.DEFAULT_TIMEOUT_SECONDS
-
-
-def response(status_code=200, content='', headers=None, reason=None, elapsed=0,
- request=None):
- res = requests.Response()
- res.status_code = status_code
- if not isinstance(content, six.binary_type):
- content = json.dumps(content).encode('ascii')
- res._content = content
- res.headers = requests.structures.CaseInsensitiveDict(headers or {})
- res.reason = reason
- res.elapsed = datetime.timedelta(elapsed)
- res.request = request
- return res
-
-
-def fake_resolve_authconfig(authconfig, registry=None):
- return None
-
-
-def fake_inspect_container(self, container, tty=False):
- return fake_api.get_fake_inspect_container(tty=tty)[1]
-
-
def fake_inspect_container_tty(self, container):
return fake_inspect_container(self, container, tty=True)
-def fake_resp(method, url, *args, **kwargs):
- key = None
- if url in fake_api.fake_responses:
- key = url
- elif (url, method) in fake_api.fake_responses:
- key = (url, method)
- if not key:
- raise Exception('{0} {1}'.format(method, url))
- status_code, content = fake_api.fake_responses[key]()
- return response(status_code=status_code, content=content)
-
-
-fake_request = mock.Mock(side_effect=fake_resp)
-
-
-def fake_get(self, url, *args, **kwargs):
- return fake_request('GET', url, *args, **kwargs)
-
-
-def fake_post(self, url, *args, **kwargs):
- return fake_request('POST', url, *args, **kwargs)
-
-
-def fake_put(self, url, *args, **kwargs):
- return fake_request('PUT', url, *args, **kwargs)
-
-
-def fake_delete(self, url, *args, **kwargs):
- return fake_request('DELETE', url, *args, **kwargs)
-
-url_base = 'http+docker://localunixsocket/'
-url_prefix = '{0}v{1}/'.format(
- url_base,
- docker.constants.DEFAULT_DOCKER_API_VERSION)
-
-
-class Cleanup(object):
- if sys.version_info < (2, 7):
- # Provide a basic implementation of addCleanup for Python < 2.7
- def __init__(self, *args, **kwargs):
- super(Cleanup, self).__init__(*args, **kwargs)
- self._cleanups = []
-
- def tearDown(self):
- super(Cleanup, self).tearDown()
- ok = True
- while self._cleanups:
- fn, args, kwargs = self._cleanups.pop(-1)
- try:
- fn(*args, **kwargs)
- except KeyboardInterrupt:
- raise
- except:
- ok = False
- if not ok:
- raise
-
- def addCleanup(self, function, *args, **kwargs):
- self._cleanups.append((function, args, kwargs))
-
-
-@mock.patch.multiple('docker.Client', get=fake_get, post=fake_post,
- put=fake_put, delete=fake_delete)
-class DockerClientTest(Cleanup, base.BaseTestCase):
- def setUp(self):
- self.client = docker.Client()
- # Force-clear authconfig to avoid tampering with the tests
- self.client._cfg = {'Configs': {}}
-
- def tearDown(self):
- self.client.close()
-
- def assertIn(self, object, collection):
- if six.PY2 and sys.version_info[1] <= 6:
- return self.assertTrue(object in collection)
- return super(DockerClientTest, self).assertIn(object, collection)
-
- def base_create_payload(self, img='busybox', cmd=None):
- if not cmd:
- cmd = ['true']
- return {"Tty": False, "Image": img, "Cmd": cmd,
- "AttachStdin": False,
- "AttachStderr": True, "AttachStdout": True,
- "StdinOnce": False,
- "OpenStdin": False, "NetworkDisabled": False,
- }
-
- def test_ctor(self):
- with pytest.raises(docker.errors.DockerException) as excinfo:
- docker.Client(version=1.12)
-
- self.assertEqual(
- str(excinfo.value),
- 'Version parameter must be a string or None. Found float'
- )
+class StartContainerTest(DockerClientTest):
+ def test_start_container(self):
+ self.client.start(fake_api.FAKE_CONTAINER_ID)
- def test_url_valid_resource(self):
- url = self.client._url('/hello/{0}/world', 'somename')
+ args = fake_request.call_args
self.assertEqual(
- url, '{0}{1}'.format(url_prefix, 'hello/somename/world')
- )
-
- url = self.client._url(
- '/hello/{0}/world/{1}', 'somename', 'someothername'
+ args[0][1],
+ url_prefix + 'containers/3cc2351ab11b/start'
)
+ self.assertEqual(json.loads(args[1]['data']), {})
self.assertEqual(
- url,
- '{0}{1}'.format(url_prefix, 'hello/somename/world/someothername')
+ args[1]['headers'], {'Content-Type': 'application/json'}
)
-
- url = self.client._url('/hello/{0}/world', '/some?name')
self.assertEqual(
- url, '{0}{1}'.format(url_prefix, 'hello/%2Fsome%3Fname/world')
+ args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS
)
- def test_url_invalid_resource(self):
- with pytest.raises(ValueError):
- self.client._url('/hello/{0}/world', ['sakuya', 'izayoi'])
-
- def test_url_no_resource(self):
- url = self.client._url('/simple')
- self.assertEqual(url, '{0}{1}'.format(url_prefix, 'simple'))
+ def test_start_container_none(self):
+ with pytest.raises(ValueError) as excinfo:
+ self.client.start(container=None)
- def test_url_unversioned_api(self):
- url = self.client._url(
- '/hello/{0}/world', 'somename', versioned_api=False
- )
self.assertEqual(
- url, '{0}{1}'.format(url_base, 'hello/somename/world')
- )
-
- #########################
- # INFORMATION TESTS #
- #########################
- def test_version(self):
- self.client.version()
-
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'version',
- timeout=DEFAULT_TIMEOUT_SECONDS
+ str(excinfo.value),
+ 'image or container param is undefined',
)
- def test_version_no_api_version(self):
- self.client.version(False)
+ with pytest.raises(ValueError) as excinfo:
+ self.client.start(None)
- fake_request.assert_called_with(
- 'GET',
- url_base + 'version',
- timeout=DEFAULT_TIMEOUT_SECONDS
+ self.assertEqual(
+ str(excinfo.value),
+ 'image or container param is undefined',
)
- def test_retrieve_server_version(self):
- client = docker.Client(version="auto")
- self.assertTrue(isinstance(client._version, six.string_types))
- self.assertFalse(client._version == "auto")
- client.close()
+ def test_start_container_regression_573(self):
+ self.client.start(**{'container': fake_api.FAKE_CONTAINER_ID})
- def test_auto_retrieve_server_version(self):
- version = self.client._retrieve_server_version()
- self.assertTrue(isinstance(version, six.string_types))
+ def test_start_container_with_lxc_conf(self):
+ def call_start():
+ self.client.start(
+ fake_api.FAKE_CONTAINER_ID,
+ lxc_conf={'lxc.conf.k': 'lxc.conf.value'}
+ )
- def test_info(self):
- self.client.info()
+ pytest.deprecated_call(call_start)
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'info',
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
+ def test_start_container_with_lxc_conf_compat(self):
+ def call_start():
+ self.client.start(
+ fake_api.FAKE_CONTAINER_ID,
+ lxc_conf=[{'Key': 'lxc.conf.k', 'Value': 'lxc.conf.value'}]
+ )
- def test_search(self):
- self.client.search('busybox')
+ pytest.deprecated_call(call_start)
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/search',
- params={'term': 'busybox'},
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
+ def test_start_container_with_binds_ro(self):
+ def call_start():
+ self.client.start(
+ fake_api.FAKE_CONTAINER_ID, binds={
+ '/tmp': {
+ "bind": '/mnt',
+ "ro": True
+ }
+ }
+ )
- def test_image_viz(self):
- with pytest.raises(Exception):
- self.client.images('busybox', viz=True)
- self.fail('Viz output should not be supported!')
+ pytest.deprecated_call(call_start)
- def test_events(self):
- self.client.events()
+ def test_start_container_with_binds_rw(self):
+ def call_start():
+ self.client.start(
+ fake_api.FAKE_CONTAINER_ID, binds={
+ '/tmp': {"bind": '/mnt', "ro": False}
+ }
+ )
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'events',
- params={'since': None, 'until': None, 'filters': None},
- stream=True
- )
+ pytest.deprecated_call(call_start)
- def test_events_with_since_until(self):
- ts = 1356048000
- now = datetime.datetime.utcfromtimestamp(ts)
- since = now - datetime.timedelta(seconds=10)
- until = now + datetime.timedelta(seconds=10)
+ def test_start_container_with_port_binds(self):
+ self.maxDiff = None
- self.client.events(since=since, until=until)
+ def call_start():
+ self.client.start(fake_api.FAKE_CONTAINER_ID, port_bindings={
+ 1111: None,
+ 2222: 2222,
+ '3333/udp': (3333,),
+ 4444: ('127.0.0.1',),
+ 5555: ('127.0.0.1', 5555),
+ 6666: [('127.0.0.1',), ('192.168.0.1',)]
+ })
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'events',
- params={
- 'since': ts - 10,
- 'until': ts + 10,
- 'filters': None
- },
- stream=True
- )
+ pytest.deprecated_call(call_start)
- def test_events_with_filters(self):
- filters = {'event': ['die', 'stop'],
- 'container': fake_api.FAKE_CONTAINER_ID}
+ def test_start_container_with_links(self):
+ def call_start():
+ self.client.start(
+ fake_api.FAKE_CONTAINER_ID, links={'path': 'alias'}
+ )
- self.client.events(filters=filters)
+ pytest.deprecated_call(call_start)
- expected_filters = docker.utils.convert_filters(filters)
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'events',
- params={
- 'since': None,
- 'until': None,
- 'filters': expected_filters
- },
- stream=True
- )
+ def test_start_container_with_multiple_links(self):
+ def call_start():
+ self.client.start(
+ fake_api.FAKE_CONTAINER_ID,
+ links={
+ 'path1': 'alias1',
+ 'path2': 'alias2'
+ }
+ )
- ###################
- # LISTING TESTS #
- ###################
+ pytest.deprecated_call(call_start)
- def test_images(self):
- self.client.images(all=True)
+ def test_start_container_with_links_as_list_of_tuples(self):
+ def call_start():
+ self.client.start(fake_api.FAKE_CONTAINER_ID,
+ links=[('path', 'alias')])
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/json',
- params={'filter': None, 'only_ids': 0, 'all': 1},
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
+ pytest.deprecated_call(call_start)
- def test_images_quiet(self):
- self.client.images(all=True, quiet=True)
+ def test_start_container_privileged(self):
+ def call_start():
+ self.client.start(fake_api.FAKE_CONTAINER_ID, privileged=True)
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/json',
- params={'filter': None, 'only_ids': 1, 'all': 1},
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
+ pytest.deprecated_call(call_start)
- def test_image_ids(self):
- self.client.images(quiet=True)
+ def test_start_container_with_dict_instead_of_id(self):
+ self.client.start({'Id': fake_api.FAKE_CONTAINER_ID})
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/json',
- params={'filter': None, 'only_ids': 1, 'all': 0},
- timeout=DEFAULT_TIMEOUT_SECONDS
+ args = fake_request.call_args
+ self.assertEqual(
+ args[0][1],
+ url_prefix + 'containers/3cc2351ab11b/start'
)
-
- def test_images_filters(self):
- self.client.images(filters={'dangling': True})
-
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/json',
- params={'filter': None, 'only_ids': 0, 'all': 0,
- 'filters': '{"dangling": ["true"]}'},
- timeout=DEFAULT_TIMEOUT_SECONDS
+ self.assertEqual(json.loads(args[1]['data']), {})
+ self.assertEqual(
+ args[1]['headers'], {'Content-Type': 'application/json'}
)
-
- def test_list_containers(self):
- self.client.containers(all=True)
-
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'containers/json',
- params={
- 'all': 1,
- 'since': None,
- 'size': 0,
- 'limit': -1,
- 'trunc_cmd': 0,
- 'before': None
- },
- timeout=DEFAULT_TIMEOUT_SECONDS
+ self.assertEqual(
+ args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS
)
- @base.requires_api_version('1.21')
- def test_list_networks(self):
- networks = [
- {
- "name": "none",
- "id": "8e4e55c6863ef424",
- "type": "null",
- "endpoints": []
- },
- {
- "name": "host",
- "id": "062b6d9ea7913fde",
- "type": "host",
- "endpoints": []
- },
- ]
-
- get = mock.Mock(return_value=response(
- status_code=200, content=json.dumps(networks).encode('utf-8')))
-
- with mock.patch('docker.Client.get', get):
- self.assertEqual(self.client.networks(), networks)
-
- self.assertEqual(get.call_args[0][0], url_prefix + 'networks')
-
- filters = json.loads(get.call_args[1]['params']['filters'])
- self.assertFalse(filters)
-
- self.client.networks(names=['foo'])
- filters = json.loads(get.call_args[1]['params']['filters'])
- self.assertEqual(filters, {'name': ['foo']})
-
- self.client.networks(ids=['123'])
- filters = json.loads(get.call_args[1]['params']['filters'])
- self.assertEqual(filters, {'id': ['123']})
-
- #####################
- # CONTAINER TESTS #
- #####################
+class CreateContainerTest(DockerClientTest):
def test_create_container(self):
self.client.create_container('busybox', 'true')
@@ -709,42 +465,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
self.client.create_host_config, mem_limit='1f28'
)
- def test_start_container(self):
- self.client.start(fake_api.FAKE_CONTAINER_ID)
-
- args = fake_request.call_args
- self.assertEqual(
- args[0][1],
- url_prefix + 'containers/3cc2351ab11b/start'
- )
- self.assertEqual(json.loads(args[1]['data']), {})
- self.assertEqual(
- args[1]['headers'], {'Content-Type': 'application/json'}
- )
- self.assertEqual(
- args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_start_container_none(self):
- with pytest.raises(ValueError) as excinfo:
- self.client.start(container=None)
-
- self.assertEqual(
- str(excinfo.value),
- 'image or container param is undefined',
- )
-
- with pytest.raises(ValueError) as excinfo:
- self.client.start(None)
-
- self.assertEqual(
- str(excinfo.value),
- 'image or container param is undefined',
- )
-
- def test_start_container_regression_573(self):
- self.client.start(**{'container': fake_api.FAKE_CONTAINER_ID})
-
def test_create_container_with_lxc_conf(self):
self.client.create_container(
'busybox', 'true', host_config=self.client.create_host_config(
@@ -1070,111 +790,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
DEFAULT_TIMEOUT_SECONDS
)
- def test_start_container_with_lxc_conf(self):
- def call_start():
- self.client.start(
- fake_api.FAKE_CONTAINER_ID,
- lxc_conf={'lxc.conf.k': 'lxc.conf.value'}
- )
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_lxc_conf_compat(self):
- def call_start():
- self.client.start(
- fake_api.FAKE_CONTAINER_ID,
- lxc_conf=[{'Key': 'lxc.conf.k', 'Value': 'lxc.conf.value'}]
- )
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_binds_ro(self):
- def call_start():
- self.client.start(
- fake_api.FAKE_CONTAINER_ID, binds={
- '/tmp': {
- "bind": '/mnt',
- "ro": True
- }
- }
- )
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_binds_rw(self):
- def call_start():
- self.client.start(
- fake_api.FAKE_CONTAINER_ID, binds={
- '/tmp': {"bind": '/mnt', "ro": False}
- }
- )
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_port_binds(self):
- self.maxDiff = None
-
- def call_start():
- self.client.start(fake_api.FAKE_CONTAINER_ID, port_bindings={
- 1111: None,
- 2222: 2222,
- '3333/udp': (3333,),
- 4444: ('127.0.0.1',),
- 5555: ('127.0.0.1', 5555),
- 6666: [('127.0.0.1',), ('192.168.0.1',)]
- })
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_links(self):
- def call_start():
- self.client.start(
- fake_api.FAKE_CONTAINER_ID, links={'path': 'alias'}
- )
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_multiple_links(self):
- def call_start():
- self.client.start(
- fake_api.FAKE_CONTAINER_ID,
- links={
- 'path1': 'alias1',
- 'path2': 'alias2'
- }
- )
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_links_as_list_of_tuples(self):
- def call_start():
- self.client.start(fake_api.FAKE_CONTAINER_ID,
- links=[('path', 'alias')])
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_privileged(self):
- def call_start():
- self.client.start(fake_api.FAKE_CONTAINER_ID, privileged=True)
-
- pytest.deprecated_call(call_start)
-
- def test_start_container_with_dict_instead_of_id(self):
- self.client.start({'Id': fake_api.FAKE_CONTAINER_ID})
-
- args = fake_request.call_args
- self.assertEqual(
- args[0][1],
- url_prefix + 'containers/3cc2351ab11b/start'
- )
- self.assertEqual(json.loads(args[1]['data']), {})
- self.assertEqual(
- args[1]['headers'], {'Content-Type': 'application/json'}
- )
- self.assertEqual(
- args[1]['timeout'], DEFAULT_TIMEOUT_SECONDS
- )
-
def test_create_container_with_restart_policy(self):
self.client.create_container(
'busybox', 'true', host_config=self.client.create_host_config(
@@ -1348,6 +963,25 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
DEFAULT_TIMEOUT_SECONDS
)
+
+class ContainerTest(DockerClientTest):
+ def test_list_containers(self):
+ self.client.containers(all=True)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'containers/json',
+ params={
+ 'all': 1,
+ 'since': None,
+ 'size': 0,
+ 'limit': -1,
+ 'trunc_cmd': 0,
+ 'before': None
+ },
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
def test_resize_container(self):
self.client.resize(
{'Id': fake_api.FAKE_CONTAINER_ID},
@@ -1393,35 +1027,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
timeout=None
)
- def _socket_path_for_client_session(self, client):
- socket_adapter = client.get_adapter('http+docker://')
- return socket_adapter.socket_path
-
- def test_url_compatibility_unix(self):
- c = docker.Client(base_url="unix://socket")
-
- assert self._socket_path_for_client_session(c) == '/socket'
-
- def test_url_compatibility_unix_triple_slash(self):
- c = docker.Client(base_url="unix:///socket")
-
- assert self._socket_path_for_client_session(c) == '/socket'
-
- def test_url_compatibility_http_unix_triple_slash(self):
- c = docker.Client(base_url="http+unix:///socket")
-
- assert self._socket_path_for_client_session(c) == '/socket'
-
- def test_url_compatibility_http(self):
- c = docker.Client(base_url="http://hostname:1234")
-
- assert c.base_url == "http://hostname:1234"
-
- def test_url_compatibility_tcp(self):
- c = docker.Client(base_url="tcp://hostname:1234")
-
- assert c.base_url == "http://hostname:1234"
-
def test_logs(self):
with mock.patch('docker.Client.inspect_container',
fake_inspect_container):
@@ -1560,73 +1165,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
timeout=(DEFAULT_TIMEOUT_SECONDS + timeout)
)
- def test_exec_create(self):
- self.client.exec_create(fake_api.FAKE_CONTAINER_ID, ['ls', '-1'])
-
- args = fake_request.call_args
- self.assertEqual(
- 'POST',
- args[0][0], url_prefix + 'containers/{0}/exec'.format(
- fake_api.FAKE_CONTAINER_ID
- )
- )
-
- self.assertEqual(
- json.loads(args[1]['data']), {
- 'Tty': False,
- 'AttachStdout': True,
- 'Container': fake_api.FAKE_CONTAINER_ID,
- 'Cmd': ['ls', '-1'],
- 'Privileged': False,
- 'AttachStdin': False,
- 'AttachStderr': True,
- 'User': ''
- }
- )
-
- self.assertEqual(args[1]['headers'],
- {'Content-Type': 'application/json'})
-
- def test_exec_start(self):
- self.client.exec_start(fake_api.FAKE_EXEC_ID)
-
- args = fake_request.call_args
- self.assertEqual(
- args[0][1], url_prefix + 'exec/{0}/start'.format(
- fake_api.FAKE_EXEC_ID
- )
- )
-
- self.assertEqual(
- json.loads(args[1]['data']), {
- 'Tty': False,
- 'Detach': False,
- }
- )
-
- self.assertEqual(args[1]['headers'],
- {'Content-Type': 'application/json'})
-
- def test_exec_inspect(self):
- self.client.exec_inspect(fake_api.FAKE_EXEC_ID)
-
- args = fake_request.call_args
- self.assertEqual(
- args[0][1], url_prefix + 'exec/{0}/json'.format(
- fake_api.FAKE_EXEC_ID
- )
- )
-
- def test_exec_resize(self):
- self.client.exec_resize(fake_api.FAKE_EXEC_ID, height=20, width=60)
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'exec/{0}/resize'.format(fake_api.FAKE_EXEC_ID),
- params={'h': 20, 'w': 60},
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
def test_pause_container(self):
self.client.pause(fake_api.FAKE_CONTAINER_ID)
@@ -1715,16 +1253,6 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
timeout=DEFAULT_TIMEOUT_SECONDS
)
- def test_remove_link(self):
- self.client.remove_container(fake_api.FAKE_CONTAINER_ID, link=True)
-
- fake_request.assert_called_with(
- 'DELETE',
- url_prefix + 'containers/3cc2351ab11b',
- params={'v': False, 'link': True, 'force': False},
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
def test_export(self):
self.client.export(fake_api.FAKE_CONTAINER_ID)
@@ -1792,851 +1320,3 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
params={'ps_args': 'waux'},
timeout=DEFAULT_TIMEOUT_SECONDS
)
-
- ##################
- # IMAGES TESTS #
- ##################
-
- def test_pull(self):
- self.client.pull('joffrey/test001')
-
- args = fake_request.call_args
- self.assertEqual(
- args[0][1],
- url_prefix + 'images/create'
- )
- self.assertEqual(
- args[1]['params'],
- {'tag': None, 'fromImage': 'joffrey/test001'}
- )
- self.assertFalse(args[1]['stream'])
-
- def test_pull_stream(self):
- self.client.pull('joffrey/test001', stream=True)
-
- args = fake_request.call_args
- self.assertEqual(
- args[0][1],
- url_prefix + 'images/create'
- )
- self.assertEqual(
- args[1]['params'],
- {'tag': None, 'fromImage': 'joffrey/test001'}
- )
- self.assertTrue(args[1]['stream'])
-
- def test_commit(self):
- self.client.commit(fake_api.FAKE_CONTAINER_ID)
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'commit',
- data='{}',
- headers={'Content-Type': 'application/json'},
- params={
- 'repo': None,
- 'comment': None,
- 'tag': None,
- 'container': '3cc2351ab11b',
- 'author': None
- },
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_remove_image(self):
- self.client.remove_image(fake_api.FAKE_IMAGE_ID)
-
- fake_request.assert_called_with(
- 'DELETE',
- url_prefix + 'images/e9aa60c60128',
- params={'force': False, 'noprune': False},
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_image_history(self):
- self.client.history(fake_api.FAKE_IMAGE_NAME)
-
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/test_image/history',
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_import_image(self):
- self.client.import_image(
- fake_api.FAKE_TARBALL_PATH,
- repository=fake_api.FAKE_REPO_NAME,
- tag=fake_api.FAKE_TAG_NAME
- )
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/create',
- params={
- 'repo': fake_api.FAKE_REPO_NAME,
- 'tag': fake_api.FAKE_TAG_NAME,
- 'fromSrc': fake_api.FAKE_TARBALL_PATH
- },
- data=None,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_import_image_from_bytes(self):
- stream = (i for i in range(0, 100))
-
- self.client.import_image(
- stream,
- repository=fake_api.FAKE_REPO_NAME,
- tag=fake_api.FAKE_TAG_NAME
- )
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/create',
- params={
- 'repo': fake_api.FAKE_REPO_NAME,
- 'tag': fake_api.FAKE_TAG_NAME,
- 'fromSrc': '-',
- },
- headers={
- 'Content-Type': 'application/tar',
- },
- data=stream,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_import_image_from_image(self):
- self.client.import_image(
- image=fake_api.FAKE_IMAGE_NAME,
- repository=fake_api.FAKE_REPO_NAME,
- tag=fake_api.FAKE_TAG_NAME
- )
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/create',
- params={
- 'repo': fake_api.FAKE_REPO_NAME,
- 'tag': fake_api.FAKE_TAG_NAME,
- 'fromImage': fake_api.FAKE_IMAGE_NAME
- },
- data=None,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_inspect_image(self):
- self.client.inspect_image(fake_api.FAKE_IMAGE_NAME)
-
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/test_image/json',
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_inspect_image_undefined_id(self):
- for arg in None, '', {True: True}:
- with pytest.raises(docker.errors.NullResource) as excinfo:
- self.client.inspect_image(arg)
-
- self.assertEqual(
- excinfo.value.args[0], 'image or container param is undefined'
- )
-
- def test_insert_image(self):
- try:
- self.client.insert(fake_api.FAKE_IMAGE_NAME,
- fake_api.FAKE_URL, fake_api.FAKE_PATH)
- except docker.errors.DeprecatedMethod:
- self.assertTrue(
- docker.utils.compare_version('1.12', self.client._version) >= 0
- )
- return
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/test_image/insert',
- params={
- 'url': fake_api.FAKE_URL,
- 'path': fake_api.FAKE_PATH
- },
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_push_image(self):
- with mock.patch('docker.auth.auth.resolve_authconfig',
- fake_resolve_authconfig):
- self.client.push(fake_api.FAKE_IMAGE_NAME)
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/test_image/push',
- params={
- 'tag': None
- },
- data='{}',
- headers={'Content-Type': 'application/json'},
- stream=False,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_push_image_with_tag(self):
- with mock.patch('docker.auth.auth.resolve_authconfig',
- fake_resolve_authconfig):
- self.client.push(
- fake_api.FAKE_IMAGE_NAME, tag=fake_api.FAKE_TAG_NAME
- )
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/test_image/push',
- params={
- 'tag': fake_api.FAKE_TAG_NAME,
- },
- data='{}',
- headers={'Content-Type': 'application/json'},
- stream=False,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_push_image_stream(self):
- with mock.patch('docker.auth.auth.resolve_authconfig',
- fake_resolve_authconfig):
- self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True)
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/test_image/push',
- params={
- 'tag': None
- },
- data='{}',
- headers={'Content-Type': 'application/json'},
- stream=True,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_tag_image(self):
- self.client.tag(fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME)
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/e9aa60c60128/tag',
- params={
- 'tag': None,
- 'repo': 'repo',
- 'force': 0
- },
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_tag_image_tag(self):
- self.client.tag(
- fake_api.FAKE_IMAGE_ID,
- fake_api.FAKE_REPO_NAME,
- tag=fake_api.FAKE_TAG_NAME
- )
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/e9aa60c60128/tag',
- params={
- 'tag': 'tag',
- 'repo': 'repo',
- 'force': 0
- },
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_tag_image_force(self):
- self.client.tag(
- fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME, force=True)
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/e9aa60c60128/tag',
- params={
- 'tag': None,
- 'repo': 'repo',
- 'force': 1
- },
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_get_image(self):
- self.client.get_image(fake_api.FAKE_IMAGE_ID)
-
- fake_request.assert_called_with(
- 'GET',
- url_prefix + 'images/e9aa60c60128/get',
- stream=True,
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- def test_load_image(self):
- self.client.load_image('Byte Stream....')
-
- fake_request.assert_called_with(
- 'POST',
- url_prefix + 'images/load',
- data='Byte Stream....',
- timeout=DEFAULT_TIMEOUT_SECONDS
- )
-
- #################
- # BUILDER TESTS #
- #################
-
- def test_build_container(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
-
- self.client.build(fileobj=script)
-
- def test_build_container_pull(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
-
- self.client.build(fileobj=script, pull=True)
-
- def test_build_container_stream(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
-
- self.client.build(fileobj=script, stream=True)
-
- def test_build_container_custom_context(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
- context = docker.utils.mkbuildcontext(script)
-
- self.client.build(fileobj=context, custom_context=True)
-
- def test_build_container_custom_context_gzip(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
- context = docker.utils.mkbuildcontext(script)
- gz_context = gzip.GzipFile(fileobj=context)
-
- self.client.build(
- fileobj=gz_context,
- custom_context=True,
- encoding="gzip"
- )
-
- def test_build_remote_with_registry_auth(self):
- self.client._auth_configs = {
- 'https://example.com': {
- 'user': 'example',
- 'password': 'example',
- 'email': 'example@example.com'
- }
- }
-
- self.client.build(path='https://github.com/docker-library/mongo')
-
- def test_build_container_with_named_dockerfile(self):
- self.client.build('.', dockerfile='nameddockerfile')
-
- def test_build_container_with_container_limits(self):
- self.client.build('.', container_limits={
- 'memory': 1024 * 1024,
- 'cpusetcpus': 1,
- 'cpushares': 1000,
- 'memswap': 1024 * 1024 * 8
- })
-
- def test_build_container_invalid_container_limits(self):
- self.assertRaises(
- docker.errors.DockerException,
- lambda: self.client.build('.', container_limits={
- 'foo': 'bar'
- })
- )
-
- ###################
- # VOLUMES TESTS #
- ###################
-
- @base.requires_api_version('1.21')
- def test_list_volumes(self):
- volumes = self.client.volumes()
- self.assertIn('Volumes', volumes)
- self.assertEqual(len(volumes['Volumes']), 2)
- args = fake_request.call_args
-
- self.assertEqual(args[0][0], 'GET')
- self.assertEqual(args[0][1], url_prefix + 'volumes')
-
- @base.requires_api_version('1.21')
- def test_create_volume(self):
- name = 'perfectcherryblossom'
- result = self.client.create_volume(name)
- self.assertIn('Name', result)
- self.assertEqual(result['Name'], name)
- self.assertIn('Driver', result)
- self.assertEqual(result['Driver'], 'local')
- args = fake_request.call_args
-
- self.assertEqual(args[0][0], 'POST')
- self.assertEqual(args[0][1], url_prefix + 'volumes/create')
- self.assertEqual(json.loads(args[1]['data']), {'Name': name})
-
- @base.requires_api_version('1.21')
- def test_create_volume_with_driver(self):
- name = 'perfectcherryblossom'
- driver_name = 'sshfs'
- self.client.create_volume(name, driver=driver_name)
- args = fake_request.call_args
-
- self.assertEqual(args[0][0], 'POST')
- self.assertEqual(args[0][1], url_prefix + 'volumes/create')
- data = json.loads(args[1]['data'])
- self.assertIn('Driver', data)
- self.assertEqual(data['Driver'], driver_name)
-
- @base.requires_api_version('1.21')
- def test_create_volume_invalid_opts_type(self):
- with pytest.raises(TypeError):
- self.client.create_volume(
- 'perfectcherryblossom', driver_opts='hello=world'
- )
-
- with pytest.raises(TypeError):
- self.client.create_volume(
- 'perfectcherryblossom', driver_opts=['hello=world']
- )
-
- with pytest.raises(TypeError):
- self.client.create_volume(
- 'perfectcherryblossom', driver_opts=''
- )
-
- @base.requires_api_version('1.21')
- def test_inspect_volume(self):
- name = 'perfectcherryblossom'
- result = self.client.inspect_volume(name)
- self.assertIn('Name', result)
- self.assertEqual(result['Name'], name)
- self.assertIn('Driver', result)
- self.assertEqual(result['Driver'], 'local')
- args = fake_request.call_args
-
- self.assertEqual(args[0][0], 'GET')
- self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))
-
- @base.requires_api_version('1.21')
- def test_remove_volume(self):
- name = 'perfectcherryblossom'
- result = self.client.remove_volume(name)
- self.assertTrue(result)
- args = fake_request.call_args
-
- self.assertEqual(args[0][0], 'DELETE')
- self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))
-
- #####################
- # NETWORK TESTS #
- #####################
-
- @base.requires_api_version('1.21')
- def test_create_network(self):
- network_data = {
- "id": 'abc12345',
- "warning": "",
- }
-
- network_response = response(status_code=200, content=network_data)
- post = mock.Mock(return_value=network_response)
-
- with mock.patch('docker.Client.post', post):
- result = self.client.create_network('foo')
- self.assertEqual(result, network_data)
-
- self.assertEqual(
- post.call_args[0][0],
- url_prefix + 'networks/create')
-
- self.assertEqual(
- json.loads(post.call_args[1]['data']),
- {"name": "foo"})
-
- self.client.create_network('foo', 'bridge')
-
- self.assertEqual(
- json.loads(post.call_args[1]['data']),
- {"name": "foo", "driver": "bridge"})
-
- @base.requires_api_version('1.21')
- def test_remove_network(self):
- network_id = 'abc12345'
- delete = mock.Mock(return_value=response(status_code=200))
-
- with mock.patch('docker.Client.delete', delete):
- self.client.remove_network(network_id)
-
- args = delete.call_args
- self.assertEqual(args[0][0],
- url_prefix + 'networks/{0}'.format(network_id))
-
- @base.requires_api_version('1.21')
- def test_inspect_network(self):
- network_id = 'abc12345'
- network_name = 'foo'
- network_data = {
- six.u('name'): network_name,
- six.u('id'): network_id,
- six.u('driver'): 'bridge',
- six.u('containers'): {},
- }
-
- network_response = response(status_code=200, content=network_data)
- get = mock.Mock(return_value=network_response)
-
- with mock.patch('docker.Client.get', get):
- result = self.client.inspect_network(network_id)
- self.assertEqual(result, network_data)
-
- args = get.call_args
- self.assertEqual(args[0][0],
- url_prefix + 'networks/{0}'.format(network_id))
-
- @base.requires_api_version('1.21')
- def test_connect_container_to_network(self):
- network_id = 'abc12345'
- container_id = 'def45678'
-
- post = mock.Mock(return_value=response(status_code=201))
-
- with mock.patch('docker.Client.post', post):
- self.client.connect_container_to_network(
- {'Id': container_id}, network_id)
-
- self.assertEqual(
- post.call_args[0][0],
- url_prefix + 'networks/{0}/connect'.format(network_id))
-
- self.assertEqual(
- json.loads(post.call_args[1]['data']),
- {'container': container_id})
-
- @base.requires_api_version('1.21')
- def test_disconnect_container_from_network(self):
- network_id = 'abc12345'
- container_id = 'def45678'
-
- post = mock.Mock(return_value=response(status_code=201))
-
- with mock.patch('docker.Client.post', post):
- self.client.disconnect_container_from_network(
- {'Id': container_id}, network_id)
-
- self.assertEqual(
- post.call_args[0][0],
- url_prefix + 'networks/{0}/disconnect'.format(network_id))
-
- self.assertEqual(
- json.loads(post.call_args[1]['data']),
- {'container': container_id})
-
- #######################
- # PY SPECIFIC TESTS #
- #######################
-
- def test_load_config_no_file(self):
- folder = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, folder)
- cfg = docker.auth.load_config(folder)
- self.assertTrue(cfg is not None)
-
- def test_load_config(self):
- folder = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, folder)
- dockercfg_path = os.path.join(folder, '.dockercfg')
- with open(dockercfg_path, 'w') as f:
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- f.write('auth = {0}\n'.format(auth_))
- f.write('email = sakuya@scarlet.net')
- cfg = docker.auth.load_config(dockercfg_path)
- assert docker.auth.INDEX_NAME in cfg
- self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
- cfg = cfg[docker.auth.INDEX_NAME]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('auth'), None)
-
- def test_load_config_with_random_name(self):
- folder = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, folder)
-
- dockercfg_path = os.path.join(folder,
- '.{0}.dockercfg'.format(
- random.randrange(100000)))
- registry = 'https://your.private.registry.io'
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- config = {
- registry: {
- 'auth': '{0}'.format(auth_),
- 'email': 'sakuya@scarlet.net'
- }
- }
-
- with open(dockercfg_path, 'w') as f:
- json.dump(config, f)
-
- cfg = docker.auth.load_config(dockercfg_path)
- assert registry in cfg
- self.assertNotEqual(cfg[registry], None)
- cfg = cfg[registry]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('auth'), None)
-
- def test_load_config_custom_config_env(self):
- folder = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, folder)
-
- dockercfg_path = os.path.join(folder, 'config.json')
- registry = 'https://your.private.registry.io'
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- config = {
- registry: {
- 'auth': '{0}'.format(auth_),
- 'email': 'sakuya@scarlet.net'
- }
- }
-
- with open(dockercfg_path, 'w') as f:
- json.dump(config, f)
-
- with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
- cfg = docker.auth.load_config(None)
- assert registry in cfg
- self.assertNotEqual(cfg[registry], None)
- cfg = cfg[registry]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('auth'), None)
-
- def test_tar_with_excludes(self):
- dirs = [
- 'foo',
- 'foo/bar',
- 'bar',
- ]
-
- files = [
- 'Dockerfile',
- 'Dockerfile.alt',
- '.dockerignore',
- 'a.py',
- 'a.go',
- 'b.py',
- 'cde.py',
- 'foo/a.py',
- 'foo/b.py',
- 'foo/bar/a.py',
- 'bar/a.py',
- ]
-
- exclude = [
- '*.py',
- '!b.py',
- '!a.go',
- 'foo',
- 'Dockerfile*',
- '.dockerignore',
- ]
-
- expected_names = set([
- 'Dockerfile',
- '.dockerignore',
- 'a.go',
- 'b.py',
- 'bar',
- 'bar/a.py',
- ])
-
- base = make_tree(dirs, files)
- self.addCleanup(shutil.rmtree, base)
-
- with docker.utils.tar(base, exclude=exclude) as archive:
- tar = tarfile.open(fileobj=archive)
- assert sorted(tar.getnames()) == sorted(expected_names)
-
- def test_tar_with_empty_directory(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- for d in ['foo', 'bar']:
- os.makedirs(os.path.join(base, d))
- with docker.utils.tar(base) as archive:
- tar = tarfile.open(fileobj=archive)
- self.assertEqual(sorted(tar.getnames()), ['bar', 'foo'])
-
- def test_tar_with_file_symlinks(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- with open(os.path.join(base, 'foo'), 'w') as f:
- f.write("content")
- os.makedirs(os.path.join(base, 'bar'))
- os.symlink('../foo', os.path.join(base, 'bar/foo'))
- with docker.utils.tar(base) as archive:
- tar = tarfile.open(fileobj=archive)
- self.assertEqual(sorted(tar.getnames()), ['bar', 'bar/foo', 'foo'])
-
- def test_tar_with_directory_symlinks(self):
- base = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base)
- for d in ['foo', 'bar']:
- os.makedirs(os.path.join(base, d))
- os.symlink('../foo', os.path.join(base, 'bar/foo'))
- with docker.utils.tar(base) as archive:
- tar = tarfile.open(fileobj=archive)
- self.assertEqual(sorted(tar.getnames()), ['bar', 'bar/foo', 'foo'])
-
- #######################
- # HOST CONFIG TESTS #
- #######################
-
- def test_create_host_config_secopt(self):
- security_opt = ['apparmor:test_profile']
- result = self.client.create_host_config(security_opt=security_opt)
- self.assertIn('SecurityOpt', result)
- self.assertEqual(result['SecurityOpt'], security_opt)
-
- self.assertRaises(
- docker.errors.DockerException, self.client.create_host_config,
- security_opt='wrong'
- )
-
-
-class StreamTest(Cleanup, base.BaseTestCase):
-
- def setUp(self):
- socket_dir = tempfile.mkdtemp()
- self.build_context = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, socket_dir)
- self.addCleanup(shutil.rmtree, self.build_context)
- self.socket_file = os.path.join(socket_dir, 'test_sock.sock')
- self.server_socket = self._setup_socket()
- self.stop_server = False
- server_thread = threading.Thread(target=self.run_server)
- server_thread.setDaemon(True)
- server_thread.start()
- self.response = None
- self.request_handler = None
- self.addCleanup(server_thread.join)
- self.addCleanup(self.stop)
-
- def stop(self):
- self.stop_server = True
-
- def _setup_socket(self):
- server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- server_sock.bind(self.socket_file)
- # Non-blocking mode so that we can shut the test down easily
- server_sock.setblocking(0)
- server_sock.listen(5)
- return server_sock
-
- def run_server(self):
- try:
- while not self.stop_server:
- try:
- connection, client_address = self.server_socket.accept()
- except socket.error:
- # Probably no connection to accept yet
- time.sleep(0.01)
- continue
-
- connection.setblocking(1)
- try:
- self.request_handler(connection)
- finally:
- connection.close()
- finally:
- self.server_socket.close()
-
- def early_response_sending_handler(self, connection):
- data = b''
- headers = None
-
- connection.sendall(self.response)
- while not headers:
- data += connection.recv(2048)
- parts = data.split(b'\r\n\r\n', 1)
- if len(parts) == 2:
- headers, data = parts
-
- mo = re.search(r'Content-Length: ([0-9]+)', headers.decode())
- assert mo
- content_length = int(mo.group(1))
-
- while True:
- if len(data) >= content_length:
- break
-
- data += connection.recv(2048)
-
- def test_early_stream_response(self):
- self.request_handler = self.early_response_sending_handler
- lines = []
- for i in range(0, 50):
- line = str(i).encode()
- lines += [('%x' % len(line)).encode(), line]
- lines.append(b'0')
- lines.append(b'')
-
- self.response = (
- b'HTTP/1.1 200 OK\r\n'
- b'Transfer-Encoding: chunked\r\n'
- b'\r\n'
- ) + b'\r\n'.join(lines)
-
- with docker.Client(base_url="http+unix://" + self.socket_file) \
- as client:
- for i in range(5):
- try:
- stream = client.build(
- path=self.build_context,
- stream=True
- )
- break
- except requests.ConnectionError as e:
- if i == 4:
- raise e
-
- self.assertEqual(list(stream), [
- str(i).encode() for i in range(50)])
diff --git a/tests/unit/exec_test.py b/tests/unit/exec_test.py
new file mode 100644
index 0000000..3007799
--- /dev/null
+++ b/tests/unit/exec_test.py
@@ -0,0 +1,75 @@
+import json
+
+from . import fake_api
+from .api_test import (
+ DockerClientTest, url_prefix, fake_request, DEFAULT_TIMEOUT_SECONDS,
+)
+
+
+class ExecTest(DockerClientTest):
+ def test_exec_create(self):
+ self.client.exec_create(fake_api.FAKE_CONTAINER_ID, ['ls', '-1'])
+
+ args = fake_request.call_args
+ self.assertEqual(
+ 'POST',
+ args[0][0], url_prefix + 'containers/{0}/exec'.format(
+ fake_api.FAKE_CONTAINER_ID
+ )
+ )
+
+ self.assertEqual(
+ json.loads(args[1]['data']), {
+ 'Tty': False,
+ 'AttachStdout': True,
+ 'Container': fake_api.FAKE_CONTAINER_ID,
+ 'Cmd': ['ls', '-1'],
+ 'Privileged': False,
+ 'AttachStdin': False,
+ 'AttachStderr': True,
+ 'User': ''
+ }
+ )
+
+ self.assertEqual(args[1]['headers'],
+ {'Content-Type': 'application/json'})
+
+ def test_exec_start(self):
+ self.client.exec_start(fake_api.FAKE_EXEC_ID)
+
+ args = fake_request.call_args
+ self.assertEqual(
+ args[0][1], url_prefix + 'exec/{0}/start'.format(
+ fake_api.FAKE_EXEC_ID
+ )
+ )
+
+ self.assertEqual(
+ json.loads(args[1]['data']), {
+ 'Tty': False,
+ 'Detach': False,
+ }
+ )
+
+ self.assertEqual(args[1]['headers'],
+ {'Content-Type': 'application/json'})
+
+ def test_exec_inspect(self):
+ self.client.exec_inspect(fake_api.FAKE_EXEC_ID)
+
+ args = fake_request.call_args
+ self.assertEqual(
+ args[0][1], url_prefix + 'exec/{0}/json'.format(
+ fake_api.FAKE_EXEC_ID
+ )
+ )
+
+ def test_exec_resize(self):
+ self.client.exec_resize(fake_api.FAKE_EXEC_ID, height=20, width=60)
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'exec/{0}/resize'.format(fake_api.FAKE_EXEC_ID),
+ params={'h': 20, 'w': 60},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
diff --git a/tests/fake_api.py b/tests/unit/fake_api.py
index 8852da0..8852da0 100644
--- a/tests/fake_api.py
+++ b/tests/unit/fake_api.py
diff --git a/tests/fake_stat.py b/tests/unit/fake_stat.py
index a7f1029..a7f1029 100644
--- a/tests/fake_stat.py
+++ b/tests/unit/fake_stat.py
diff --git a/tests/unit/image_test.py b/tests/unit/image_test.py
new file mode 100644
index 0000000..a46e48e
--- /dev/null
+++ b/tests/unit/image_test.py
@@ -0,0 +1,346 @@
+import docker
+import pytest
+
+from . import fake_api
+from .api_test import (
+ DockerClientTest, fake_request, DEFAULT_TIMEOUT_SECONDS, url_prefix,
+ fake_resolve_authconfig
+)
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class ImageTest(DockerClientTest):
+ def test_image_viz(self):
+ with pytest.raises(Exception):
+ self.client.images('busybox', viz=True)
+ self.fail('Viz output should not be supported!')
+
+ def test_images(self):
+ self.client.images(all=True)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/json',
+ params={'filter': None, 'only_ids': 0, 'all': 1},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_images_quiet(self):
+ self.client.images(all=True, quiet=True)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/json',
+ params={'filter': None, 'only_ids': 1, 'all': 1},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_image_ids(self):
+ self.client.images(quiet=True)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/json',
+ params={'filter': None, 'only_ids': 1, 'all': 0},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_images_filters(self):
+ self.client.images(filters={'dangling': True})
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/json',
+ params={'filter': None, 'only_ids': 0, 'all': 0,
+ 'filters': '{"dangling": ["true"]}'},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_pull(self):
+ self.client.pull('joffrey/test001')
+
+ args = fake_request.call_args
+ self.assertEqual(
+ args[0][1],
+ url_prefix + 'images/create'
+ )
+ self.assertEqual(
+ args[1]['params'],
+ {'tag': None, 'fromImage': 'joffrey/test001'}
+ )
+ self.assertFalse(args[1]['stream'])
+
+ def test_pull_stream(self):
+ self.client.pull('joffrey/test001', stream=True)
+
+ args = fake_request.call_args
+ self.assertEqual(
+ args[0][1],
+ url_prefix + 'images/create'
+ )
+ self.assertEqual(
+ args[1]['params'],
+ {'tag': None, 'fromImage': 'joffrey/test001'}
+ )
+ self.assertTrue(args[1]['stream'])
+
+ def test_commit(self):
+ self.client.commit(fake_api.FAKE_CONTAINER_ID)
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'commit',
+ data='{}',
+ headers={'Content-Type': 'application/json'},
+ params={
+ 'repo': None,
+ 'comment': None,
+ 'tag': None,
+ 'container': '3cc2351ab11b',
+ 'author': None
+ },
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_remove_image(self):
+ self.client.remove_image(fake_api.FAKE_IMAGE_ID)
+
+ fake_request.assert_called_with(
+ 'DELETE',
+ url_prefix + 'images/e9aa60c60128',
+ params={'force': False, 'noprune': False},
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_image_history(self):
+ self.client.history(fake_api.FAKE_IMAGE_NAME)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/test_image/history',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_import_image(self):
+ self.client.import_image(
+ fake_api.FAKE_TARBALL_PATH,
+ repository=fake_api.FAKE_REPO_NAME,
+ tag=fake_api.FAKE_TAG_NAME
+ )
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/create',
+ params={
+ 'repo': fake_api.FAKE_REPO_NAME,
+ 'tag': fake_api.FAKE_TAG_NAME,
+ 'fromSrc': fake_api.FAKE_TARBALL_PATH
+ },
+ data=None,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_import_image_from_bytes(self):
+ stream = (i for i in range(0, 100))
+
+ self.client.import_image(
+ stream,
+ repository=fake_api.FAKE_REPO_NAME,
+ tag=fake_api.FAKE_TAG_NAME
+ )
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/create',
+ params={
+ 'repo': fake_api.FAKE_REPO_NAME,
+ 'tag': fake_api.FAKE_TAG_NAME,
+ 'fromSrc': '-',
+ },
+ headers={
+ 'Content-Type': 'application/tar',
+ },
+ data=stream,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_import_image_from_image(self):
+ self.client.import_image(
+ image=fake_api.FAKE_IMAGE_NAME,
+ repository=fake_api.FAKE_REPO_NAME,
+ tag=fake_api.FAKE_TAG_NAME
+ )
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/create',
+ params={
+ 'repo': fake_api.FAKE_REPO_NAME,
+ 'tag': fake_api.FAKE_TAG_NAME,
+ 'fromImage': fake_api.FAKE_IMAGE_NAME
+ },
+ data=None,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_inspect_image(self):
+ self.client.inspect_image(fake_api.FAKE_IMAGE_NAME)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/test_image/json',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_inspect_image_undefined_id(self):
+ for arg in None, '', {True: True}:
+ with pytest.raises(docker.errors.NullResource) as excinfo:
+ self.client.inspect_image(arg)
+
+ self.assertEqual(
+ excinfo.value.args[0], 'image or container param is undefined'
+ )
+
+ def test_insert_image(self):
+ try:
+ self.client.insert(fake_api.FAKE_IMAGE_NAME,
+ fake_api.FAKE_URL, fake_api.FAKE_PATH)
+ except docker.errors.DeprecatedMethod:
+ self.assertTrue(
+ docker.utils.compare_version('1.12', self.client._version) >= 0
+ )
+ return
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/test_image/insert',
+ params={
+ 'url': fake_api.FAKE_URL,
+ 'path': fake_api.FAKE_PATH
+ },
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_push_image(self):
+ with mock.patch('docker.auth.auth.resolve_authconfig',
+ fake_resolve_authconfig):
+ self.client.push(fake_api.FAKE_IMAGE_NAME)
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/test_image/push',
+ params={
+ 'tag': None
+ },
+ data='{}',
+ headers={'Content-Type': 'application/json'},
+ stream=False,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_push_image_with_tag(self):
+ with mock.patch('docker.auth.auth.resolve_authconfig',
+ fake_resolve_authconfig):
+ self.client.push(
+ fake_api.FAKE_IMAGE_NAME, tag=fake_api.FAKE_TAG_NAME
+ )
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/test_image/push',
+ params={
+ 'tag': fake_api.FAKE_TAG_NAME,
+ },
+ data='{}',
+ headers={'Content-Type': 'application/json'},
+ stream=False,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_push_image_stream(self):
+ with mock.patch('docker.auth.auth.resolve_authconfig',
+ fake_resolve_authconfig):
+ self.client.push(fake_api.FAKE_IMAGE_NAME, stream=True)
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/test_image/push',
+ params={
+ 'tag': None
+ },
+ data='{}',
+ headers={'Content-Type': 'application/json'},
+ stream=True,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_tag_image(self):
+ self.client.tag(fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME)
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/e9aa60c60128/tag',
+ params={
+ 'tag': None,
+ 'repo': 'repo',
+ 'force': 0
+ },
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_tag_image_tag(self):
+ self.client.tag(
+ fake_api.FAKE_IMAGE_ID,
+ fake_api.FAKE_REPO_NAME,
+ tag=fake_api.FAKE_TAG_NAME
+ )
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/e9aa60c60128/tag',
+ params={
+ 'tag': 'tag',
+ 'repo': 'repo',
+ 'force': 0
+ },
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_tag_image_force(self):
+ self.client.tag(
+ fake_api.FAKE_IMAGE_ID, fake_api.FAKE_REPO_NAME, force=True)
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/e9aa60c60128/tag',
+ params={
+ 'tag': None,
+ 'repo': 'repo',
+ 'force': 1
+ },
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_get_image(self):
+ self.client.get_image(fake_api.FAKE_IMAGE_ID)
+
+ fake_request.assert_called_with(
+ 'GET',
+ url_prefix + 'images/e9aa60c60128/get',
+ stream=True,
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
+
+ def test_load_image(self):
+ self.client.load_image('Byte Stream....')
+
+ fake_request.assert_called_with(
+ 'POST',
+ url_prefix + 'images/load',
+ data='Byte Stream....',
+ timeout=DEFAULT_TIMEOUT_SECONDS
+ )
diff --git a/tests/unit/network_test.py b/tests/unit/network_test.py
new file mode 100644
index 0000000..306ee4f
--- /dev/null
+++ b/tests/unit/network_test.py
@@ -0,0 +1,149 @@
+import json
+
+import six
+
+from .. import base
+from .api_test import DockerClientTest, url_prefix, response
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class NetworkTest(DockerClientTest):
+ @base.requires_api_version('1.21')
+ def test_list_networks(self):
+ networks = [
+ {
+ "name": "none",
+ "id": "8e4e55c6863ef424",
+ "type": "null",
+ "endpoints": []
+ },
+ {
+ "name": "host",
+ "id": "062b6d9ea7913fde",
+ "type": "host",
+ "endpoints": []
+ },
+ ]
+
+ get = mock.Mock(return_value=response(
+ status_code=200, content=json.dumps(networks).encode('utf-8')))
+
+ with mock.patch('docker.Client.get', get):
+ self.assertEqual(self.client.networks(), networks)
+
+ self.assertEqual(get.call_args[0][0], url_prefix + 'networks')
+
+ filters = json.loads(get.call_args[1]['params']['filters'])
+ self.assertFalse(filters)
+
+ self.client.networks(names=['foo'])
+ filters = json.loads(get.call_args[1]['params']['filters'])
+ self.assertEqual(filters, {'name': ['foo']})
+
+ self.client.networks(ids=['123'])
+ filters = json.loads(get.call_args[1]['params']['filters'])
+ self.assertEqual(filters, {'id': ['123']})
+
+ @base.requires_api_version('1.21')
+ def test_create_network(self):
+ network_data = {
+ "id": 'abc12345',
+ "warning": "",
+ }
+
+ network_response = response(status_code=200, content=network_data)
+ post = mock.Mock(return_value=network_response)
+
+ with mock.patch('docker.Client.post', post):
+ result = self.client.create_network('foo')
+ self.assertEqual(result, network_data)
+
+ self.assertEqual(
+ post.call_args[0][0],
+ url_prefix + 'networks/create')
+
+ self.assertEqual(
+ json.loads(post.call_args[1]['data']),
+ {"name": "foo"})
+
+ self.client.create_network('foo', 'bridge')
+
+ self.assertEqual(
+ json.loads(post.call_args[1]['data']),
+ {"name": "foo", "driver": "bridge"})
+
+ @base.requires_api_version('1.21')
+ def test_remove_network(self):
+ network_id = 'abc12345'
+ delete = mock.Mock(return_value=response(status_code=200))
+
+ with mock.patch('docker.Client.delete', delete):
+ self.client.remove_network(network_id)
+
+ args = delete.call_args
+ self.assertEqual(args[0][0],
+ url_prefix + 'networks/{0}'.format(network_id))
+
+ @base.requires_api_version('1.21')
+ def test_inspect_network(self):
+ network_id = 'abc12345'
+ network_name = 'foo'
+ network_data = {
+ six.u('name'): network_name,
+ six.u('id'): network_id,
+ six.u('driver'): 'bridge',
+ six.u('containers'): {},
+ }
+
+ network_response = response(status_code=200, content=network_data)
+ get = mock.Mock(return_value=network_response)
+
+ with mock.patch('docker.Client.get', get):
+ result = self.client.inspect_network(network_id)
+ self.assertEqual(result, network_data)
+
+ args = get.call_args
+ self.assertEqual(args[0][0],
+ url_prefix + 'networks/{0}'.format(network_id))
+
+ @base.requires_api_version('1.21')
+ def test_connect_container_to_network(self):
+ network_id = 'abc12345'
+ container_id = 'def45678'
+
+ post = mock.Mock(return_value=response(status_code=201))
+
+ with mock.patch('docker.Client.post', post):
+ self.client.connect_container_to_network(
+ {'Id': container_id}, network_id)
+
+ self.assertEqual(
+ post.call_args[0][0],
+ url_prefix + 'networks/{0}/connect'.format(network_id))
+
+ self.assertEqual(
+ json.loads(post.call_args[1]['data']),
+ {'container': container_id})
+
+ @base.requires_api_version('1.21')
+ def test_disconnect_container_from_network(self):
+ network_id = 'abc12345'
+ container_id = 'def45678'
+
+ post = mock.Mock(return_value=response(status_code=201))
+
+ with mock.patch('docker.Client.post', post):
+ self.client.disconnect_container_from_network(
+ {'Id': container_id}, network_id)
+
+ self.assertEqual(
+ post.call_args[0][0],
+ url_prefix + 'networks/{0}/disconnect'.format(network_id))
+
+ self.assertEqual(
+ json.loads(post.call_args[1]['data']),
+ {'container': container_id})
diff --git a/tests/testdata/certs/cert.pem b/tests/unit/testdata/certs/ca.pem
index e69de29..e69de29 100644
--- a/tests/testdata/certs/cert.pem
+++ b/tests/unit/testdata/certs/ca.pem
diff --git a/tests/testdata/certs/key.pem b/tests/unit/testdata/certs/cert.pem
index e69de29..e69de29 100644
--- a/tests/testdata/certs/key.pem
+++ b/tests/unit/testdata/certs/cert.pem
diff --git a/tests/unit/testdata/certs/key.pem b/tests/unit/testdata/certs/key.pem
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/unit/testdata/certs/key.pem
diff --git a/tests/utils_test.py b/tests/unit/utils_test.py
index 04183f9..fffa77a 100644
--- a/tests/utils_test.py
+++ b/tests/unit/utils_test.py
@@ -5,6 +5,7 @@ import json
import os
import os.path
import shutil
+import tarfile
import tempfile
import pytest
@@ -16,15 +17,12 @@ from docker.errors import DockerException
from docker.utils import (
parse_repository_tag, parse_host, convert_filters, kwargs_from_env,
create_host_config, Ulimit, LogConfig, parse_bytes, parse_env_file,
- exclude_paths, convert_volume_binds, decode_json_header
+ exclude_paths, convert_volume_binds, decode_json_header, tar
)
from docker.utils.ports import build_port_bindings, split_port
-from docker.auth import (
- resolve_repository_name, resolve_authconfig, encode_header
-)
-from . import base
-from .helpers import make_tree
+from .. import base
+from ..helpers import make_tree
TEST_CERT_DIR = os.path.join(
@@ -186,20 +184,7 @@ class KwargsFromEnvTest(base.BaseTestCase):
shutil.rmtree(temp_dir)
-class UtilsTest(base.BaseTestCase):
- longMessage = True
-
- def generate_tempfile(self, file_content=None):
- """
- Generates a temporary file for tests with the content
- of 'file_content' and returns the filename.
- Don't forget to unlink the file with os.unlink() after.
- """
- local_tempfile = tempfile.NamedTemporaryFile(delete=False)
- local_tempfile.write(file_content.encode('UTF-8'))
- local_tempfile.close()
- return local_tempfile.name
-
+class ConverVolumeBindsTest(base.BaseTestCase):
def test_convert_volume_binds_empty(self):
self.assertEqual(convert_volume_binds({}), [])
self.assertEqual(convert_volume_binds([]), [])
@@ -283,26 +268,43 @@ class UtilsTest(base.BaseTestCase):
convert_volume_binds(data), expected
)
- def test_parse_repository_tag(self):
- self.assertEqual(parse_repository_tag("root"),
- ("root", None))
- self.assertEqual(parse_repository_tag("root:tag"),
- ("root", "tag"))
- self.assertEqual(parse_repository_tag("user/repo"),
- ("user/repo", None))
- self.assertEqual(parse_repository_tag("user/repo:tag"),
- ("user/repo", "tag"))
- self.assertEqual(parse_repository_tag("url:5000/repo"),
- ("url:5000/repo", None))
- self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
- ("url:5000/repo", "tag"))
- def test_parse_bytes(self):
- self.assertEqual(parse_bytes("512MB"), (536870912))
- self.assertEqual(parse_bytes("512M"), (536870912))
- self.assertRaises(DockerException, parse_bytes, "512MK")
- self.assertRaises(DockerException, parse_bytes, "512L")
+class ParseEnvFileTest(base.BaseTestCase):
+ def generate_tempfile(self, file_content=None):
+ """
+ Generates a temporary file for tests with the content
+ of 'file_content' and returns the filename.
+ Don't forget to unlink the file with os.unlink() after.
+ """
+ local_tempfile = tempfile.NamedTemporaryFile(delete=False)
+ local_tempfile.write(file_content.encode('UTF-8'))
+ local_tempfile.close()
+ return local_tempfile.name
+ def test_parse_env_file_proper(self):
+ env_file = self.generate_tempfile(
+ file_content='USER=jdoe\nPASS=secret')
+ get_parse_env_file = parse_env_file(env_file)
+ self.assertEqual(get_parse_env_file,
+ {'USER': 'jdoe', 'PASS': 'secret'})
+ os.unlink(env_file)
+
+ def test_parse_env_file_commented_line(self):
+ env_file = self.generate_tempfile(
+ file_content='USER=jdoe\n#PASS=secret')
+ get_parse_env_file = parse_env_file((env_file))
+ self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
+ os.unlink(env_file)
+
+ def test_parse_env_file_invalid_line(self):
+ env_file = self.generate_tempfile(
+ file_content='USER jdoe')
+ self.assertRaises(
+ DockerException, parse_env_file, env_file)
+ os.unlink(env_file)
+
+
+class ParseHostTest(base.BaseTestCase):
def test_parse_host(self):
invalid_hosts = [
'0.0.0.0',
@@ -341,27 +343,29 @@ class UtilsTest(base.BaseTestCase):
assert parse_host(val, 'win32') == tcp_port
- def test_parse_env_file_proper(self):
- env_file = self.generate_tempfile(
- file_content='USER=jdoe\nPASS=secret')
- get_parse_env_file = parse_env_file(env_file)
- self.assertEqual(get_parse_env_file,
- {'USER': 'jdoe', 'PASS': 'secret'})
- os.unlink(env_file)
- def test_parse_env_file_commented_line(self):
- env_file = self.generate_tempfile(
- file_content='USER=jdoe\n#PASS=secret')
- get_parse_env_file = parse_env_file((env_file))
- self.assertEqual(get_parse_env_file, {'USER': 'jdoe'})
- os.unlink(env_file)
+class UtilsTest(base.BaseTestCase):
+ longMessage = True
- def test_parse_env_file_invalid_line(self):
- env_file = self.generate_tempfile(
- file_content='USER jdoe')
- self.assertRaises(
- DockerException, parse_env_file, env_file)
- os.unlink(env_file)
+ def test_parse_repository_tag(self):
+ self.assertEqual(parse_repository_tag("root"),
+ ("root", None))
+ self.assertEqual(parse_repository_tag("root:tag"),
+ ("root", "tag"))
+ self.assertEqual(parse_repository_tag("user/repo"),
+ ("user/repo", None))
+ self.assertEqual(parse_repository_tag("user/repo:tag"),
+ ("user/repo", "tag"))
+ self.assertEqual(parse_repository_tag("url:5000/repo"),
+ ("url:5000/repo", None))
+ self.assertEqual(parse_repository_tag("url:5000/repo:tag"),
+ ("url:5000/repo", "tag"))
+
+ def test_parse_bytes(self):
+ self.assertEqual(parse_bytes("512MB"), (536870912))
+ self.assertEqual(parse_bytes("512M"), (536870912))
+ self.assertRaises(DockerException, parse_bytes, "512MK")
+ self.assertRaises(DockerException, parse_bytes, "512L")
def test_convert_filters(self):
tests = [
@@ -384,168 +388,6 @@ class UtilsTest(base.BaseTestCase):
decoded_data = decode_json_header(data)
self.assertEqual(obj, decoded_data)
- def test_803_urlsafe_encode(self):
- auth_data = {
- 'username': 'root',
- 'password': 'GR?XGR?XGR?XGR?X'
- }
- encoded = encode_header(auth_data)
- assert b'/' not in encoded
- assert b'_' in encoded
-
- def test_resolve_repository_name(self):
- # docker hub library image
- self.assertEqual(
- resolve_repository_name('image'),
- ('index.docker.io', 'image'),
- )
-
- # docker hub image
- self.assertEqual(
- resolve_repository_name('username/image'),
- ('index.docker.io', 'username/image'),
- )
-
- # private registry
- self.assertEqual(
- resolve_repository_name('my.registry.net/image'),
- ('my.registry.net', 'image'),
- )
-
- # private registry with port
- self.assertEqual(
- resolve_repository_name('my.registry.net:5000/image'),
- ('my.registry.net:5000', 'image'),
- )
-
- # private registry with username
- self.assertEqual(
- resolve_repository_name('my.registry.net/username/image'),
- ('my.registry.net', 'username/image'),
- )
-
- # no dots but port
- self.assertEqual(
- resolve_repository_name('hostname:5000/image'),
- ('hostname:5000', 'image'),
- )
-
- # no dots but port and username
- self.assertEqual(
- resolve_repository_name('hostname:5000/username/image'),
- ('hostname:5000', 'username/image'),
- )
-
- # localhost
- self.assertEqual(
- resolve_repository_name('localhost/image'),
- ('localhost', 'image'),
- )
-
- # localhost with username
- self.assertEqual(
- resolve_repository_name('localhost/username/image'),
- ('localhost', 'username/image'),
- )
-
- def test_resolve_authconfig(self):
- auth_config = {
- 'https://index.docker.io/v1/': {'auth': 'indexuser'},
- 'my.registry.net': {'auth': 'privateuser'},
- 'http://legacy.registry.url/v1/': {'auth': 'legacyauth'}
- }
- # hostname only
- self.assertEqual(
- resolve_authconfig(auth_config, 'my.registry.net'),
- {'auth': 'privateuser'}
- )
- # no protocol
- self.assertEqual(
- resolve_authconfig(auth_config, 'my.registry.net/v1/'),
- {'auth': 'privateuser'}
- )
- # no path
- self.assertEqual(
- resolve_authconfig(auth_config, 'http://my.registry.net'),
- {'auth': 'privateuser'}
- )
- # no path, trailing slash
- self.assertEqual(
- resolve_authconfig(auth_config, 'http://my.registry.net/'),
- {'auth': 'privateuser'}
- )
- # no path, wrong secure protocol
- self.assertEqual(
- resolve_authconfig(auth_config, 'https://my.registry.net'),
- {'auth': 'privateuser'}
- )
- # no path, wrong insecure protocol
- self.assertEqual(
- resolve_authconfig(auth_config, 'http://index.docker.io'),
- {'auth': 'indexuser'}
- )
- # with path, wrong protocol
- self.assertEqual(
- resolve_authconfig(auth_config, 'https://my.registry.net/v1/'),
- {'auth': 'privateuser'}
- )
- # default registry
- self.assertEqual(
- resolve_authconfig(auth_config), {'auth': 'indexuser'}
- )
- # default registry (explicit None)
- self.assertEqual(
- resolve_authconfig(auth_config, None), {'auth': 'indexuser'}
- )
- # fully explicit
- self.assertEqual(
- resolve_authconfig(auth_config, 'http://my.registry.net/v1/'),
- {'auth': 'privateuser'}
- )
- # legacy entry in config
- self.assertEqual(
- resolve_authconfig(auth_config, 'legacy.registry.url'),
- {'auth': 'legacyauth'}
- )
- # no matching entry
- self.assertTrue(
- resolve_authconfig(auth_config, 'does.not.exist') is None
- )
-
- def test_resolve_registry_and_auth(self):
- auth_config = {
- 'https://index.docker.io/v1/': {'auth': 'indexuser'},
- 'my.registry.net': {'auth': 'privateuser'},
- }
-
- # library image
- image = 'image'
- self.assertEqual(
- resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
- {'auth': 'indexuser'},
- )
-
- # docker hub image
- image = 'username/image'
- self.assertEqual(
- resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
- {'auth': 'indexuser'},
- )
-
- # private registry
- image = 'my.registry.net/image'
- self.assertEqual(
- resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
- {'auth': 'privateuser'},
- )
-
- # unauthenticated registry
- image = 'other.registry.net/image'
- self.assertEqual(
- resolve_authconfig(auth_config, resolve_repository_name(image)[0]),
- None,
- )
-
class PortsTest(base.BaseTestCase):
def test_split_port_with_host_ip(self):
@@ -790,3 +632,85 @@ class ExcludePathsTest(base.BaseTestCase):
assert self.exclude(['foo/bar']) == self.all_paths - set([
'foo/bar', 'foo/bar/a.py',
])
+
+
+class TarTest(base.Cleanup, base.BaseTestCase):
+ def test_tar_with_excludes(self):
+ dirs = [
+ 'foo',
+ 'foo/bar',
+ 'bar',
+ ]
+
+ files = [
+ 'Dockerfile',
+ 'Dockerfile.alt',
+ '.dockerignore',
+ 'a.py',
+ 'a.go',
+ 'b.py',
+ 'cde.py',
+ 'foo/a.py',
+ 'foo/b.py',
+ 'foo/bar/a.py',
+ 'bar/a.py',
+ ]
+
+ exclude = [
+ '*.py',
+ '!b.py',
+ '!a.go',
+ 'foo',
+ 'Dockerfile*',
+ '.dockerignore',
+ ]
+
+ expected_names = set([
+ 'Dockerfile',
+ '.dockerignore',
+ 'a.go',
+ 'b.py',
+ 'bar',
+ 'bar/a.py',
+ ])
+
+ base = make_tree(dirs, files)
+ self.addCleanup(shutil.rmtree, base)
+
+ with tar(base, exclude=exclude) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ assert sorted(tar_data.getnames()) == sorted(expected_names)
+
+ def test_tar_with_empty_directory(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ self.assertEqual(sorted(tar_data.getnames()), ['bar', 'foo'])
+
+ def test_tar_with_file_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ with open(os.path.join(base, 'foo'), 'w') as f:
+ f.write("content")
+ os.makedirs(os.path.join(base, 'bar'))
+ os.symlink('../foo', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ self.assertEqual(
+ sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
+ )
+
+ def test_tar_with_directory_symlinks(self):
+ base = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base)
+ for d in ['foo', 'bar']:
+ os.makedirs(os.path.join(base, d))
+ os.symlink('../foo', os.path.join(base, 'bar/foo'))
+ with tar(base) as archive:
+ tar_data = tarfile.open(fileobj=archive)
+ self.assertEqual(
+ sorted(tar_data.getnames()), ['bar', 'bar/foo', 'foo']
+ )
diff --git a/tests/unit/volume_test.py b/tests/unit/volume_test.py
new file mode 100644
index 0000000..4c2f877
--- /dev/null
+++ b/tests/unit/volume_test.py
@@ -0,0 +1,85 @@
+import json
+
+import pytest
+
+from .. import base
+from .api_test import DockerClientTest, url_prefix, fake_request
+
+
+class VolumeTest(DockerClientTest):
+ @base.requires_api_version('1.21')
+ def test_list_volumes(self):
+ volumes = self.client.volumes()
+ self.assertIn('Volumes', volumes)
+ self.assertEqual(len(volumes['Volumes']), 2)
+ args = fake_request.call_args
+
+ self.assertEqual(args[0][0], 'GET')
+ self.assertEqual(args[0][1], url_prefix + 'volumes')
+
+ @base.requires_api_version('1.21')
+ def test_create_volume(self):
+ name = 'perfectcherryblossom'
+ result = self.client.create_volume(name)
+ self.assertIn('Name', result)
+ self.assertEqual(result['Name'], name)
+ self.assertIn('Driver', result)
+ self.assertEqual(result['Driver'], 'local')
+ args = fake_request.call_args
+
+ self.assertEqual(args[0][0], 'POST')
+ self.assertEqual(args[0][1], url_prefix + 'volumes/create')
+ self.assertEqual(json.loads(args[1]['data']), {'Name': name})
+
+ @base.requires_api_version('1.21')
+ def test_create_volume_with_driver(self):
+ name = 'perfectcherryblossom'
+ driver_name = 'sshfs'
+ self.client.create_volume(name, driver=driver_name)
+ args = fake_request.call_args
+
+ self.assertEqual(args[0][0], 'POST')
+ self.assertEqual(args[0][1], url_prefix + 'volumes/create')
+ data = json.loads(args[1]['data'])
+ self.assertIn('Driver', data)
+ self.assertEqual(data['Driver'], driver_name)
+
+ @base.requires_api_version('1.21')
+ def test_create_volume_invalid_opts_type(self):
+ with pytest.raises(TypeError):
+ self.client.create_volume(
+ 'perfectcherryblossom', driver_opts='hello=world'
+ )
+
+ with pytest.raises(TypeError):
+ self.client.create_volume(
+ 'perfectcherryblossom', driver_opts=['hello=world']
+ )
+
+ with pytest.raises(TypeError):
+ self.client.create_volume(
+ 'perfectcherryblossom', driver_opts=''
+ )
+
+ @base.requires_api_version('1.21')
+ def test_inspect_volume(self):
+ name = 'perfectcherryblossom'
+ result = self.client.inspect_volume(name)
+ self.assertIn('Name', result)
+ self.assertEqual(result['Name'], name)
+ self.assertIn('Driver', result)
+ self.assertEqual(result['Driver'], 'local')
+ args = fake_request.call_args
+
+ self.assertEqual(args[0][0], 'GET')
+ self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))
+
+ @base.requires_api_version('1.21')
+ def test_remove_volume(self):
+ name = 'perfectcherryblossom'
+ result = self.client.remove_volume(name)
+ self.assertTrue(result)
+ args = fake_request.call_args
+
+ self.assertEqual(args[0][0], 'DELETE')
+ self.assertEqual(args[0][1], '{0}volumes/{1}'.format(url_prefix, name))
diff --git a/tox.ini b/tox.ini
index eb31bee..96b9177 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,7 +5,7 @@ skipsdist=True
[testenv]
usedevelop=True
commands =
- py.test --cov=docker tests/test.py tests/utils_test.py
+ py.test --cov=docker tests/unit/
deps =
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/requirements.txt