diff options
Diffstat (limited to 'tests/integration_test.py')
-rw-r--r-- | tests/integration_test.py | 2044 |
1 files changed, 4 insertions, 2040 deletions
diff --git a/tests/integration_test.py b/tests/integration_test.py index 85eb8da..73cbf93 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -1,2040 +1,4 @@ -# Copyright 2013 dotCloud inc. - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import contextlib -import errno -import json -import io -import os -import random -import shutil -import signal -import socket -import struct -import tarfile -import tempfile -import threading -import time -import unittest -import warnings - -import pytest -import six -from six.moves import BaseHTTPServer -from six.moves import socketserver - -import docker -from docker.errors import APIError, NotFound -from docker.utils import kwargs_from_env - -from . import helpers -from .base import requires_api_version -from .test import Cleanup - - -# FIXME: missing tests for -# export; history; insert; port; push; tag; get; load; stats - -warnings.simplefilter('error') -compare_version = docker.utils.compare_version - -EXEC_DRIVER = [] -BUSYBOX = 'busybox:buildroot-2014.02' - - -def exec_driver_is_native(): - global EXEC_DRIVER - if not EXEC_DRIVER: - c = docker_client() - EXEC_DRIVER = c.info()['ExecutionDriver'] - c.close() - return EXEC_DRIVER.startswith('native') - - -def docker_client(**kwargs): - return docker.Client(**docker_client_kwargs(**kwargs)) - - -def docker_client_kwargs(**kwargs): - client_kwargs = kwargs_from_env(assert_hostname=False) - client_kwargs.update(kwargs) - return client_kwargs - - -def setup_module(): - c = docker_client() - try: - c.inspect_image(BUSYBOX) - except NotFound: - os.write(2, "\npulling busybox\n".encode('utf-8')) - for data in c.pull(BUSYBOX, stream=True): - data = json.loads(data.decode('utf-8')) - os.write(2, ("%c[2K\r" % 27).encode('utf-8')) - status = data.get("status") - progress = data.get("progress") - detail = "{0} - {1}".format(status, progress).encode('utf-8') - os.write(2, detail) - os.write(2, "\npulled busybox\n".encode('utf-8')) - - # Double make sure we now have busybox - c.inspect_image(BUSYBOX) - c.close() - - -class BaseTestCase(unittest.TestCase): - tmp_imgs = [] - tmp_containers = [] - tmp_folders = [] - tmp_volumes = [] - - def setUp(self): - if six.PY2: - self.assertRegex = self.assertRegexpMatches - self.assertCountEqual = self.assertItemsEqual - self.client = docker_client(timeout=60) - self.tmp_imgs = [] - self.tmp_containers = [] - self.tmp_folders = [] - self.tmp_volumes = [] - self.tmp_networks = [] - - def tearDown(self): - for img in self.tmp_imgs: - try: - self.client.remove_image(img) - except docker.errors.APIError: - pass - for container in self.tmp_containers: - try: - self.client.stop(container, timeout=1) - self.client.remove_container(container) - except docker.errors.APIError: - pass - for network in self.tmp_networks: - try: - self.client.remove_network(network) - except docker.errors.APIError: - pass - for folder in self.tmp_folders: - shutil.rmtree(folder) - - for volume in self.tmp_volumes: - try: - self.client.remove_volume(volume) - except docker.errors.APIError: - pass - - self.client.close() - - def run_container(self, *args, **kwargs): - container = self.client.create_container(*args, **kwargs) - self.tmp_containers.append(container) - self.client.start(container) - exitcode = self.client.wait(container) - - if exitcode != 0: - output = self.client.logs(container) - raise Exception( - "Container exited with code {}:\n{}" - .format(exitcode, output)) - - return container - - -######################### -# INFORMATION TESTS # -######################### - - -class TestVersion(BaseTestCase): - def runTest(self): - res = self.client.version() - self.assertIn('GoVersion', res) - self.assertIn('Version', res) - self.assertEqual(len(res['Version'].split('.')), 3) - - -class TestInfo(BaseTestCase): - def runTest(self): - res = self.client.info() - self.assertIn('Containers', res) - self.assertIn('Images', res) - self.assertIn('Debug', res) - - -class TestSearch(BaseTestCase): - def runTest(self): - self.client = docker_client(timeout=10) - res = self.client.search('busybox') - self.assertTrue(len(res) >= 1) - base_img = [x for x in res if x['name'] == 'busybox'] - self.assertEqual(len(base_img), 1) - self.assertIn('description', base_img[0]) - -################### -# LISTING TESTS # -################### - - -class TestImages(BaseTestCase): - def runTest(self): - res1 = self.client.images(all=True) - self.assertIn('Id', res1[0]) - res10 = res1[0] - self.assertIn('Created', res10) - self.assertIn('RepoTags', res10) - distinct = [] - for img in res1: - if img['Id'] not in distinct: - distinct.append(img['Id']) - self.assertEqual(len(distinct), self.client.info()['Images']) - - -class TestImageIds(BaseTestCase): - def runTest(self): - res1 = self.client.images(quiet=True) - self.assertEqual(type(res1[0]), six.text_type) - - -class TestListContainers(BaseTestCase): - def runTest(self): - res0 = self.client.containers(all=True) - size = len(res0) - res1 = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res1) - self.client.start(res1['Id']) - self.tmp_containers.append(res1['Id']) - res2 = self.client.containers(all=True) - self.assertEqual(size + 1, len(res2)) - retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])] - self.assertEqual(len(retrieved), 1) - retrieved = retrieved[0] - self.assertIn('Command', retrieved) - self.assertEqual(retrieved['Command'], six.text_type('true')) - self.assertIn('Image', retrieved) - self.assertRegex(retrieved['Image'], r'busybox:.*') - self.assertIn('Status', retrieved) - -##################### -# CONTAINER TESTS # -##################### - - -class TestCreateContainer(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - - -class TestCreateContainerWithBinds(BaseTestCase): - def setUp(self): - super(TestCreateContainerWithBinds, self).setUp() - - self.mount_dest = '/mnt' - - # Get a random pathname - we don't need it to exist locally - self.mount_origin = tempfile.mkdtemp() - shutil.rmtree(self.mount_origin) - - self.filename = 'shared.txt' - - self.run_with_volume( - False, - BUSYBOX, - ['touch', os.path.join(self.mount_dest, self.filename)], - ) - - def run_with_volume(self, ro, *args, **kwargs): - return self.run_container( - *args, - volumes={self.mount_dest: {}}, - host_config=self.client.create_host_config( - binds={ - self.mount_origin: { - 'bind': self.mount_dest, - 'ro': ro, - }, - }, - network_mode='none' - ), - **kwargs - ) - - def test_rw(self): - container = self.run_with_volume( - False, - BUSYBOX, - ['ls', self.mount_dest], - ) - logs = self.client.logs(container) - - if six.PY3: - logs = logs.decode('utf-8') - self.assertIn(self.filename, logs) - inspect_data = self.client.inspect_container(container) - self.check_container_data(inspect_data, True) - - def test_ro(self): - container = self.run_with_volume( - True, - BUSYBOX, - ['ls', self.mount_dest], - ) - logs = self.client.logs(container) - - if six.PY3: - logs = logs.decode('utf-8') - self.assertIn(self.filename, logs) - - inspect_data = self.client.inspect_container(container) - self.check_container_data(inspect_data, False) - - def check_container_data(self, inspect_data, rw): - if docker.utils.compare_version('1.20', self.client._version) < 0: - self.assertIn('Volumes', inspect_data) - self.assertIn(self.mount_dest, inspect_data['Volumes']) - self.assertEqual( - self.mount_origin, inspect_data['Volumes'][self.mount_dest] - ) - self.assertIn(self.mount_dest, inspect_data['VolumesRW']) - self.assertFalse(inspect_data['VolumesRW'][self.mount_dest]) - else: - self.assertIn('Mounts', inspect_data) - filtered = list(filter( - lambda x: x['Destination'] == self.mount_dest, - inspect_data['Mounts'] - )) - self.assertEqual(len(filtered), 1) - mount_data = filtered[0] - self.assertEqual(mount_data['Source'], self.mount_origin) - self.assertEqual(mount_data['RW'], rw) - - -@requires_api_version('1.20') -class CreateContainerWithGroupAddTest(BaseTestCase): - def test_group_id_ints(self): - container = self.client.create_container( - BUSYBOX, 'id -G', - host_config=self.client.create_host_config(group_add=[1000, 1001]) - ) - self.tmp_containers.append(container) - self.client.start(container) - self.client.wait(container) - - logs = self.client.logs(container) - if six.PY3: - logs = logs.decode('utf-8') - groups = logs.strip().split(' ') - self.assertIn('1000', groups) - self.assertIn('1001', groups) - - def test_group_id_strings(self): - container = self.client.create_container( - BUSYBOX, 'id -G', host_config=self.client.create_host_config( - group_add=['1000', '1001'] - ) - ) - self.tmp_containers.append(container) - self.client.start(container) - self.client.wait(container) - - logs = self.client.logs(container) - if six.PY3: - logs = logs.decode('utf-8') - - groups = logs.strip().split(' ') - self.assertIn('1000', groups) - self.assertIn('1001', groups) - - -class CreateContainerWithLogConfigTest(BaseTestCase): - def test_valid_log_driver_and_log_opt(self): - log_config = docker.utils.LogConfig( - type='json-file', - config={'max-file': '100'} - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - self.tmp_containers.append(container['Id']) - self.client.start(container) - - info = self.client.inspect_container(container) - container_log_config = info['HostConfig']['LogConfig'] - - self.assertEqual(container_log_config['Type'], log_config.type) - self.assertEqual(container_log_config['Config'], log_config.config) - - def test_invalid_log_driver_raises_exception(self): - log_config = docker.utils.LogConfig( - type='asdf-nope', - config={} - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - - expected_msg = "logger: no log driver named 'asdf-nope' is registered" - - with pytest.raises(APIError) as excinfo: - # raises an internal server error 500 - self.client.start(container) - - assert expected_msg in str(excinfo.value) - - @pytest.mark.skipif(True, - reason="https://github.com/docker/docker/issues/15633") - def test_valid_no_log_driver_specified(self): - log_config = docker.utils.LogConfig( - type="", - config={'max-file': '100'} - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - self.tmp_containers.append(container['Id']) - self.client.start(container) - - info = self.client.inspect_container(container) - container_log_config = info['HostConfig']['LogConfig'] - - self.assertEqual(container_log_config['Type'], "json-file") - self.assertEqual(container_log_config['Config'], log_config.config) - - def test_valid_no_config_specified(self): - log_config = docker.utils.LogConfig( - type="json-file", - config=None - ) - - container = self.client.create_container( - BUSYBOX, ['true'], - host_config=self.client.create_host_config(log_config=log_config) - ) - self.tmp_containers.append(container['Id']) - self.client.start(container) - - info = self.client.inspect_container(container) - container_log_config = info['HostConfig']['LogConfig'] - - self.assertEqual(container_log_config['Type'], "json-file") - self.assertEqual(container_log_config['Config'], {}) - - -@requires_api_version('1.20') -class GetArchiveTest(BaseTestCase): - def test_get_file_archive_from_container(self): - data = 'The Maid and the Pocket Watch of Blood' - ctnr = self.client.create_container( - BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data), - volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - self.client.start(ctnr) - self.client.wait(ctnr) - with tempfile.NamedTemporaryFile() as destination: - strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') - for d in strm: - destination.write(d) - destination.seek(0) - retrieved_data = helpers.untar_file(destination, 'data.txt') - if six.PY3: - retrieved_data = retrieved_data.decode('utf-8') - self.assertEqual(data, retrieved_data.strip()) - - def test_get_file_stat_from_container(self): - data = 'The Maid and the Pocket Watch of Blood' - ctnr = self.client.create_container( - BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data), - volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - self.client.start(ctnr) - self.client.wait(ctnr) - strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') - self.assertIn('name', stat) - self.assertEqual(stat['name'], 'data.txt') - self.assertIn('size', stat) - self.assertEqual(stat['size'], len(data)) - - -@requires_api_version('1.20') -class PutArchiveTest(BaseTestCase): - def test_copy_file_to_container(self): - data = b'Deaf To All But The Song' - with tempfile.NamedTemporaryFile() as test_file: - test_file.write(data) - test_file.seek(0) - ctnr = self.client.create_container( - BUSYBOX, - 'cat {0}'.format( - os.path.join('/vol1', os.path.basename(test_file.name)) - ), - volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - with helpers.simple_tar(test_file.name) as test_tar: - self.client.put_archive(ctnr, '/vol1', test_tar) - self.client.start(ctnr) - self.client.wait(ctnr) - logs = self.client.logs(ctnr) - if six.PY3: - logs = logs.decode('utf-8') - data = data.decode('utf-8') - self.assertEqual(logs.strip(), data) - - def test_copy_directory_to_container(self): - files = ['a.py', 'b.py', 'foo/b.py'] - dirs = ['foo', 'bar'] - base = helpers.make_tree(dirs, files) - ctnr = self.client.create_container( - BUSYBOX, 'ls -p /vol1', volumes=['/vol1'] - ) - self.tmp_containers.append(ctnr) - with docker.utils.tar(base) as test_tar: - self.client.put_archive(ctnr, '/vol1', test_tar) - self.client.start(ctnr) - self.client.wait(ctnr) - logs = self.client.logs(ctnr) - if six.PY3: - logs = logs.decode('utf-8') - results = logs.strip().split() - self.assertIn('a.py', results) - self.assertIn('b.py', results) - self.assertIn('foo/', results) - self.assertIn('bar/', results) - - -class TestCreateContainerReadOnlyFs(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - ctnr = self.client.create_container( - BUSYBOX, ['mkdir', '/shrine'], - host_config=self.client.create_host_config( - read_only=True, network_mode='none' - ) - ) - self.assertIn('Id', ctnr) - self.tmp_containers.append(ctnr['Id']) - self.client.start(ctnr) - res = self.client.wait(ctnr) - self.assertNotEqual(res, 0) - - -class TestCreateContainerWithName(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true', name='foobar') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Name', inspect) - self.assertEqual('/foobar', inspect['Name']) - - -class TestRenameContainer(BaseTestCase): - def runTest(self): - version = self.client.version()['Version'] - name = 'hong_meiling' - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.rename(res, name) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Name', inspect) - if version == '1.5.0': - self.assertEqual(name, inspect['Name']) - else: - self.assertEqual('/{0}'.format(name), inspect['Name']) - - -class TestStartContainer(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.start(res['Id']) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Config', inspect) - self.assertIn('Id', inspect) - self.assertTrue(inspect['Id'].startswith(res['Id'])) - self.assertIn('Image', inspect) - self.assertIn('State', inspect) - self.assertIn('Running', inspect['State']) - if not inspect['State']['Running']: - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], 0) - - -class TestStartContainerWithDictInsteadOfId(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, 'true') - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.start(res) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Config', inspect) - self.assertIn('Id', inspect) - self.assertTrue(inspect['Id'].startswith(res['Id'])) - self.assertIn('Image', inspect) - self.assertIn('State', inspect) - self.assertIn('Running', inspect['State']) - if not inspect['State']['Running']: - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], 0) - - -class TestCreateContainerPrivileged(BaseTestCase): - def runTest(self): - res = self.client.create_container( - BUSYBOX, 'true', host_config=self.client.create_host_config( - privileged=True, network_mode='none' - ) - ) - self.assertIn('Id', res) - self.tmp_containers.append(res['Id']) - self.client.start(res['Id']) - inspect = self.client.inspect_container(res['Id']) - self.assertIn('Config', inspect) - self.assertIn('Id', inspect) - self.assertTrue(inspect['Id'].startswith(res['Id'])) - self.assertIn('Image', inspect) - self.assertIn('State', inspect) - self.assertIn('Running', inspect['State']) - if not inspect['State']['Running']: - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], 0) - # Since Nov 2013, the Privileged flag is no longer part of the - # container's config exposed via the API (safety concerns?). - # - if 'Privileged' in inspect['Config']: - self.assertEqual(inspect['Config']['Privileged'], True) - - -class TestWait(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, ['sleep', '3']) - id = res['Id'] - self.tmp_containers.append(id) - self.client.start(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - inspect = self.client.inspect_container(id) - self.assertIn('Running', inspect['State']) - self.assertEqual(inspect['State']['Running'], False) - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], exitcode) - - -class TestWaitWithDictInsteadOfId(BaseTestCase): - def runTest(self): - res = self.client.create_container(BUSYBOX, ['sleep', '3']) - id = res['Id'] - self.tmp_containers.append(id) - self.client.start(res) - exitcode = self.client.wait(res) - self.assertEqual(exitcode, 0) - inspect = self.client.inspect_container(res) - self.assertIn('Running', inspect['State']) - self.assertEqual(inspect['State']['Running'], False) - self.assertIn('ExitCode', inspect['State']) - self.assertEqual(inspect['State']['ExitCode'], exitcode) - - -class TestLogs(BaseTestCase): - def runTest(self): - snippet = 'Flowering Nights (Sakuya Iyazoi)' - container = self.client.create_container( - BUSYBOX, 'echo {0}'.format(snippet) - ) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - logs = self.client.logs(id) - self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) - - -class TestLogsWithTailOption(BaseTestCase): - def runTest(self): - snippet = '''Line1 -Line2''' - container = self.client.create_container( - BUSYBOX, 'echo "{0}"'.format(snippet) - ) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - logs = self.client.logs(id, tail=1) - self.assertEqual(logs, ('Line2\n').encode(encoding='ascii')) - - -# class TestLogsStreaming(BaseTestCase): -# def runTest(self): -# snippet = 'Flowering Nights (Sakuya Iyazoi)' -# container = self.client.create_container( -# BUSYBOX, 'echo {0}'.format(snippet) -# ) -# id = container['Id'] -# self.client.start(id) -# self.tmp_containers.append(id) -# logs = bytes() if six.PY3 else str() -# for chunk in self.client.logs(id, stream=True): -# logs += chunk - -# exitcode = self.client.wait(id) -# self.assertEqual(exitcode, 0) - -# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) - - -class TestLogsWithDictInsteadOfId(BaseTestCase): - def runTest(self): - snippet = 'Flowering Nights (Sakuya Iyazoi)' - container = self.client.create_container( - BUSYBOX, 'echo {0}'.format(snippet) - ) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - logs = self.client.logs(container) - self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) - - -class TestDiff(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - diff = self.client.diff(id) - test_diff = [x for x in diff if x.get('Path', None) == '/test'] - self.assertEqual(len(test_diff), 1) - self.assertIn('Kind', test_diff[0]) - self.assertEqual(test_diff[0]['Kind'], 1) - - -class TestDiffWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0) - diff = self.client.diff(container) - test_diff = [x for x in diff if x.get('Path', None) == '/test'] - self.assertEqual(len(test_diff), 1) - self.assertIn('Kind', test_diff[0]) - self.assertEqual(test_diff[0]['Kind'], 1) - - -class TestStop(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.stop(id, timeout=2) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestStopWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - self.assertIn('Id', container) - id = container['Id'] - self.client.start(container) - self.tmp_containers.append(id) - self.client.stop(container, timeout=2) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestKill(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.kill(id) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestKillWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.kill(container) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - if exec_driver_is_native(): - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False) - - -class TestKillWithSignal(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '60']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - self.client.kill(id, signal=signal.SIGKILL) - exitcode = self.client.wait(id) - self.assertNotEqual(exitcode, 0) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - self.assertNotEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], False, state) - - -class TestPort(BaseTestCase): - def runTest(self): - - port_bindings = { - '1111': ('127.0.0.1', '4567'), - '2222': ('127.0.0.1', '4568') - } - - container = self.client.create_container( - BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()), - host_config=self.client.create_host_config( - port_bindings=port_bindings, network_mode='bridge' - ) - ) - id = container['Id'] - - self.client.start(container) - - # Call the port function on each biding and compare expected vs actual - for port in port_bindings: - actual_bindings = self.client.port(container, port) - port_binding = actual_bindings.pop() - - ip, host_port = port_binding['HostIp'], port_binding['HostPort'] - - self.assertEqual(ip, port_bindings[port][0]) - self.assertEqual(host_port, port_bindings[port][1]) - - self.client.kill(id) - - -class TestMacAddress(BaseTestCase): - def runTest(self): - mac_address_expected = "02:42:ac:11:00:0a" - container = self.client.create_container( - BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected) - - id = container['Id'] - - self.client.start(container) - res = self.client.inspect_container(container['Id']) - self.assertEqual(mac_address_expected, - res['NetworkSettings']['MacAddress']) - - self.client.kill(id) - - -class TestContainerTop(BaseTestCase): - def runTest(self): - container = self.client.create_container( - BUSYBOX, ['sleep', '60']) - - id = container['Id'] - - self.client.start(container) - res = self.client.top(container['Id']) - self.assertEqual( - res['Titles'], - ['UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD'] - ) - self.assertEqual(len(res['Processes']), 1) - self.assertEqual(res['Processes'][0][7], 'sleep 60') - self.client.kill(id) - - -class TestContainerTopWithPsArgs(BaseTestCase): - def runTest(self): - container = self.client.create_container( - BUSYBOX, ['sleep', '60']) - - id = container['Id'] - - self.client.start(container) - res = self.client.top(container['Id'], 'waux') - self.assertEqual( - res['Titles'], - ['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS', - 'TTY', 'STAT', 'START', 'TIME', 'COMMAND'], - ) - self.assertEqual(len(res['Processes']), 1) - self.assertEqual(res['Processes'][0][10], 'sleep 60') - self.client.kill(id) - - -class TestRestart(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - info = self.client.inspect_container(id) - self.assertIn('State', info) - self.assertIn('StartedAt', info['State']) - start_time1 = info['State']['StartedAt'] - self.client.restart(id, timeout=2) - info2 = self.client.inspect_container(id) - self.assertIn('State', info2) - self.assertIn('StartedAt', info2['State']) - start_time2 = info2['State']['StartedAt'] - self.assertNotEqual(start_time1, start_time2) - self.assertIn('Running', info2['State']) - self.assertEqual(info2['State']['Running'], True) - self.client.kill(id) - - -class TestRestartWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - self.assertIn('Id', container) - id = container['Id'] - self.client.start(container) - self.tmp_containers.append(id) - info = self.client.inspect_container(id) - self.assertIn('State', info) - self.assertIn('StartedAt', info['State']) - start_time1 = info['State']['StartedAt'] - self.client.restart(container, timeout=2) - info2 = self.client.inspect_container(id) - self.assertIn('State', info2) - self.assertIn('StartedAt', info2['State']) - start_time2 = info2['State']['StartedAt'] - self.assertNotEqual(start_time1, start_time2) - self.assertIn('Running', info2['State']) - self.assertEqual(info2['State']['Running'], True) - self.client.kill(id) - - -class TestRemoveContainer(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['true']) - id = container['Id'] - self.client.start(id) - self.client.wait(id) - self.client.remove_container(id) - containers = self.client.containers(all=True) - res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] - self.assertEqual(len(res), 0) - - -class TestRemoveContainerWithDictInsteadOfId(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['true']) - id = container['Id'] - self.client.start(id) - self.client.wait(id) - self.client.remove_container(container) - containers = self.client.containers(all=True) - res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] - self.assertEqual(len(res), 0) - - -class TestCreateContainerWithVolumesFrom(BaseTestCase): - def runTest(self): - vol_names = ['foobar_vol0', 'foobar_vol1'] - - res0 = self.client.create_container( - BUSYBOX, 'true', name=vol_names[0] - ) - container1_id = res0['Id'] - self.tmp_containers.append(container1_id) - self.client.start(container1_id) - - res1 = self.client.create_container( - BUSYBOX, 'true', name=vol_names[1] - ) - container2_id = res1['Id'] - self.tmp_containers.append(container2_id) - self.client.start(container2_id) - with self.assertRaises(docker.errors.DockerException): - self.client.create_container( - BUSYBOX, 'cat', detach=True, stdin_open=True, - volumes_from=vol_names - ) - res2 = self.client.create_container( - BUSYBOX, 'cat', detach=True, stdin_open=True, - host_config=self.client.create_host_config( - volumes_from=vol_names, network_mode='none' - ) - ) - container3_id = res2['Id'] - self.tmp_containers.append(container3_id) - self.client.start(container3_id) - - info = self.client.inspect_container(res2['Id']) - self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names) - - -class TestCreateContainerWithLinks(BaseTestCase): - def runTest(self): - res0 = self.client.create_container( - BUSYBOX, 'cat', - detach=True, stdin_open=True, - environment={'FOO': '1'}) - - container1_id = res0['Id'] - self.tmp_containers.append(container1_id) - - self.client.start(container1_id) - - res1 = self.client.create_container( - BUSYBOX, 'cat', - detach=True, stdin_open=True, - environment={'FOO': '1'}) - - container2_id = res1['Id'] - self.tmp_containers.append(container2_id) - - self.client.start(container2_id) - - # we don't want the first / - link_path1 = self.client.inspect_container(container1_id)['Name'][1:] - link_alias1 = 'mylink1' - link_env_prefix1 = link_alias1.upper() - - link_path2 = self.client.inspect_container(container2_id)['Name'][1:] - link_alias2 = 'mylink2' - link_env_prefix2 = link_alias2.upper() - - res2 = self.client.create_container( - BUSYBOX, 'env', host_config=self.client.create_host_config( - links={link_path1: link_alias1, link_path2: link_alias2}, - network_mode='none' - ) - ) - container3_id = res2['Id'] - self.tmp_containers.append(container3_id) - self.client.start(container3_id) - self.assertEqual(self.client.wait(container3_id), 0) - - logs = self.client.logs(container3_id) - if six.PY3: - logs = logs.decode('utf-8') - self.assertIn('{0}_NAME='.format(link_env_prefix1), logs) - self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs) - self.assertIn('{0}_NAME='.format(link_env_prefix2), logs) - self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs) - - -class TestRestartingContainer(BaseTestCase): - def runTest(self): - container = self.client.create_container( - BUSYBOX, ['sleep', '2'], - host_config=self.client.create_host_config( - restart_policy={"Name": "always", "MaximumRetryCount": 0}, - network_mode='none' - ) - ) - id = container['Id'] - self.client.start(id) - self.client.wait(id) - with self.assertRaises(docker.errors.APIError) as exc: - self.client.remove_container(id) - err = exc.exception.response.text - self.assertIn( - 'You cannot remove a running container', err - ) - self.client.remove_container(id, force=True) - - -class TestExecuteCommand(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, ['echo', 'hello']) - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'hello\n') - - -class TestExecuteCommandString(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, 'echo hello world') - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'hello world\n') - - -class TestExecuteCommandStringAsUser(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, 'whoami', user='default') - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'default\n') - - -class TestExecuteCommandStringAsRoot(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - res = self.client.exec_create(id, 'whoami') - self.assertIn('Id', res) - - exec_log = self.client.exec_start(res) - self.assertEqual(exec_log, b'root\n') - - -class TestExecuteCommandStreaming(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - exec_id = self.client.exec_create(id, ['echo', 'hello\nworld']) - self.assertIn('Id', exec_id) - - res = b'' - for chunk in self.client.exec_start(exec_id, stream=True): - res += chunk - self.assertEqual(res, b'hello\nworld\n') - - -class TestExecInspect(BaseTestCase): - def runTest(self): - if not exec_driver_is_native(): - pytest.skip('Exec driver not native') - - container = self.client.create_container(BUSYBOX, 'cat', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - - exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist']) - self.assertIn('Id', exec_id) - self.client.exec_start(exec_id) - exec_info = self.client.exec_inspect(exec_id) - self.assertIn('ExitCode', exec_info) - self.assertNotEqual(exec_info['ExitCode'], 0) - - -class TestRunContainerStreaming(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, '/bin/sh', - detach=True, stdin_open=True) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - sock = self.client.attach_socket(container, ws=False) - self.assertTrue(sock.fileno() > -1) - - -class TestRunContainerReadingSocket(BaseTestCase): - def runTest(self): - line = 'hi there and stuff and things, words!' - command = "echo '{0}'".format(line) - container = self.client.create_container(BUSYBOX, command, - detach=True, tty=False) - ident = container['Id'] - self.tmp_containers.append(ident) - - opts = {"stdout": 1, "stream": 1, "logs": 1} - pty_stdout = self.client.attach_socket(ident, opts) - self.client.start(ident) - - recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK) - - def read(n=4096): - """Code stolen from dockerpty to read the socket""" - try: - if hasattr(pty_stdout, 'recv'): - return pty_stdout.recv(n) - return os.read(pty_stdout.fileno(), n) - except EnvironmentError as e: - if e.errno not in recoverable_errors: - raise - - def next_packet_size(): - """Code stolen from dockerpty to get the next packet size""" - data = six.binary_type() - while len(data) < 8: - next_data = read(8 - len(data)) - if not next_data: - return 0 - data = data + next_data - - if data is None: - return 0 - - if len(data) == 8: - _, actual = struct.unpack('>BxxxL', data) - return actual - - next_size = next_packet_size() - self.assertEqual(next_size, len(line)+1) - - data = six.binary_type() - while len(data) < next_size: - next_data = read(next_size - len(data)) - if not next_data: - assert False, "Failed trying to read in the dataz" - data += next_data - self.assertEqual(data.decode('utf-8'), "{0}\n".format(line)) - pty_stdout.close() - - # Prevent segfault at the end of the test run - if hasattr(pty_stdout, "_response"): - del pty_stdout._response - - -class TestPauseUnpauseContainer(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['sleep', '9999']) - id = container['Id'] - self.tmp_containers.append(id) - self.client.start(container) - self.client.pause(id) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - self.assertEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], True) - self.assertIn('Paused', state) - self.assertEqual(state['Paused'], True) - - self.client.unpause(id) - container_info = self.client.inspect_container(id) - self.assertIn('State', container_info) - state = container_info['State'] - self.assertIn('ExitCode', state) - self.assertEqual(state['ExitCode'], 0) - self.assertIn('Running', state) - self.assertEqual(state['Running'], True) - self.assertIn('Paused', state) - self.assertEqual(state['Paused'], False) - - -class TestCreateContainerWithHostPidMode(BaseTestCase): - def runTest(self): - ctnr = self.client.create_container( - BUSYBOX, 'true', host_config=self.client.create_host_config( - pid_mode='host', network_mode='none' - ) - ) - self.assertIn('Id', ctnr) - self.tmp_containers.append(ctnr['Id']) - self.client.start(ctnr) - inspect = self.client.inspect_container(ctnr) - self.assertIn('HostConfig', inspect) - host_config = inspect['HostConfig'] - self.assertIn('PidMode', host_config) - self.assertEqual(host_config['PidMode'], 'host') - - -################# -# LINKS TESTS # -################# - - -class TestRemoveLink(BaseTestCase): - def runTest(self): - # Create containers - container1 = self.client.create_container( - BUSYBOX, 'cat', detach=True, stdin_open=True - ) - container1_id = container1['Id'] - self.tmp_containers.append(container1_id) - self.client.start(container1_id) - - # Create Link - # we don't want the first / - link_path = self.client.inspect_container(container1_id)['Name'][1:] - link_alias = 'mylink' - - container2 = self.client.create_container( - BUSYBOX, 'cat', host_config=self.client.create_host_config( - links={link_path: link_alias}, network_mode='none' - ) - ) - container2_id = container2['Id'] - self.tmp_containers.append(container2_id) - self.client.start(container2_id) - - # Remove link - linked_name = self.client.inspect_container(container2_id)['Name'][1:] - link_name = '%s/%s' % (linked_name, link_alias) - self.client.remove_container(link_name, link=True) - - # Link is gone - containers = self.client.containers(all=True) - retrieved = [x for x in containers if link_name in x['Names']] - self.assertEqual(len(retrieved), 0) - - # Containers are still there - retrieved = [ - x for x in containers if x['Id'].startswith(container1_id) or - x['Id'].startswith(container2_id) - ] - self.assertEqual(len(retrieved), 2) - -################## -# IMAGES TESTS # -################## - - -class TestPull(BaseTestCase): - def runTest(self): - try: - self.client.remove_image('hello-world') - except docker.errors.APIError: - pass - res = self.client.pull('hello-world') - self.tmp_imgs.append('hello-world') - self.assertEqual(type(res), six.text_type) - self.assertGreaterEqual( - len(self.client.images('hello-world')), 1 - ) - img_info = self.client.inspect_image('hello-world') - self.assertIn('Id', img_info) - - -class TestPullStream(BaseTestCase): - def runTest(self): - try: - self.client.remove_image('hello-world') - except docker.errors.APIError: - pass - stream = self.client.pull('hello-world', stream=True) - self.tmp_imgs.append('hello-world') - for chunk in stream: - if six.PY3: - chunk = chunk.decode('utf-8') - json.loads(chunk) # ensure chunk is a single, valid JSON blob - self.assertGreaterEqual( - len(self.client.images('hello-world')), 1 - ) - img_info = self.client.inspect_image('hello-world') - self.assertIn('Id', img_info) - - -class TestCommit(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - res = self.client.commit(id) - self.assertIn('Id', res) - img_id = res['Id'] - self.tmp_imgs.append(img_id) - img = self.client.inspect_image(img_id) - self.assertIn('Container', img) - self.assertTrue(img['Container'].startswith(id)) - self.assertIn('ContainerConfig', img) - self.assertIn('Image', img['ContainerConfig']) - self.assertEqual(BUSYBOX, img['ContainerConfig']['Image']) - busybox_id = self.client.inspect_image(BUSYBOX)['Id'] - self.assertIn('Parent', img) - self.assertEqual(img['Parent'], busybox_id) - - -class TestRemoveImage(BaseTestCase): - def runTest(self): - container = self.client.create_container(BUSYBOX, ['touch', '/test']) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - res = self.client.commit(id) - self.assertIn('Id', res) - img_id = res['Id'] - self.tmp_imgs.append(img_id) - self.client.remove_image(img_id, force=True) - images = self.client.images(all=True) - res = [x for x in images if x['Id'].startswith(img_id)] - self.assertEqual(len(res), 0) - - -################## -# IMPORT TESTS # -################## - - -class ImportTestCase(BaseTestCase): - '''Base class for `docker import` test cases.''' - - TAR_SIZE = 512 * 1024 - - def write_dummy_tar_content(self, n_bytes, tar_fd): - def extend_file(f, n_bytes): - f.seek(n_bytes - 1) - f.write(bytearray([65])) - f.seek(0) - - tar = tarfile.TarFile(fileobj=tar_fd, mode='w') - - with tempfile.NamedTemporaryFile() as f: - extend_file(f, n_bytes) - tarinfo = tar.gettarinfo(name=f.name, arcname='testdata') - tar.addfile(tarinfo, fileobj=f) - - tar.close() - - @contextlib.contextmanager - def dummy_tar_stream(self, n_bytes): - '''Yields a stream that is valid tar data of size n_bytes.''' - with tempfile.NamedTemporaryFile() as tar_file: - self.write_dummy_tar_content(n_bytes, tar_file) - tar_file.seek(0) - yield tar_file - - @contextlib.contextmanager - def dummy_tar_file(self, n_bytes): - '''Yields the name of a valid tar file of size n_bytes.''' - with tempfile.NamedTemporaryFile() as tar_file: - self.write_dummy_tar_content(n_bytes, tar_file) - tar_file.seek(0) - yield tar_file.name - - -class TestImportFromBytes(ImportTestCase): - '''Tests importing an image from in-memory byte data.''' - - def runTest(self): - with self.dummy_tar_stream(n_bytes=500) as f: - content = f.read() - - # The generic import_image() function cannot import in-memory bytes - # data that happens to be represented as a string type, because - # import_image() will try to use it as a filename and usually then - # trigger an exception. So we test the import_image_from_data() - # function instead. - statuses = self.client.import_image_from_data( - content, repository='test/import-from-bytes') - - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -class TestImportFromFile(ImportTestCase): - '''Tests importing an image from a tar file on disk.''' - - def runTest(self): - with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename: - # statuses = self.client.import_image( - # src=tar_filename, repository='test/import-from-file') - statuses = self.client.import_image_from_file( - tar_filename, repository='test/import-from-file') - - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - self.assertIn('status', result) - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -class TestImportFromStream(ImportTestCase): - '''Tests importing an image from a stream containing tar data.''' - - def runTest(self): - with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream: - statuses = self.client.import_image( - src=tar_stream, repository='test/import-from-stream') - # statuses = self.client.import_image_from_stream( - # tar_stream, repository='test/import-from-stream') - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - self.assertIn('status', result) - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -class TestImportFromURL(ImportTestCase): - '''Tests downloading an image over HTTP.''' - - @contextlib.contextmanager - def temporary_http_file_server(self, stream): - '''Serve data from an IO stream over HTTP.''' - - class Handler(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - self.send_response(200) - self.send_header('Content-Type', 'application/x-tar') - self.end_headers() - shutil.copyfileobj(stream, self.wfile) - - server = socketserver.TCPServer(('', 0), Handler) - thread = threading.Thread(target=server.serve_forever) - thread.setDaemon(True) - thread.start() - - yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1]) - - server.shutdown() - - @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME") - def runTest(self): - # The crappy test HTTP server doesn't handle large files well, so use - # a small file. - TAR_SIZE = 10240 - - with self.dummy_tar_stream(n_bytes=TAR_SIZE) as tar_data: - with self.temporary_http_file_server(tar_data) as url: - statuses = self.client.import_image( - src=url, repository='test/import-from-url') - - result_text = statuses.splitlines()[-1] - result = json.loads(result_text) - - self.assertNotIn('error', result) - - self.assertIn('status', result) - img_id = result['status'] - self.tmp_imgs.append(img_id) - - -################# -# VOLUMES TESTS # -################# - -@requires_api_version('1.21') -class TestVolumes(BaseTestCase): - def test_create_volume(self): - name = 'perfectcherryblossom' - self.tmp_volumes.append(name) - result = self.client.create_volume(name) - self.assertIn('Name', result) - self.assertEqual(result['Name'], name) - self.assertIn('Driver', result) - self.assertEqual(result['Driver'], 'local') - - def test_create_volume_invalid_driver(self): - driver_name = 'invalid.driver' - - with pytest.raises(docker.errors.NotFound): - self.client.create_volume('perfectcherryblossom', driver_name) - - def test_list_volumes(self): - name = 'imperishablenight' - self.tmp_volumes.append(name) - volume_info = self.client.create_volume(name) - result = self.client.volumes() - self.assertIn('Volumes', result) - volumes = result['Volumes'] - self.assertIn(volume_info, volumes) - - def test_inspect_volume(self): - name = 'embodimentofscarletdevil' - self.tmp_volumes.append(name) - volume_info = self.client.create_volume(name) - result = self.client.inspect_volume(name) - self.assertEqual(volume_info, result) - - def test_inspect_nonexistent_volume(self): - name = 'embodimentofscarletdevil' - with pytest.raises(docker.errors.NotFound): - self.client.inspect_volume(name) - - def test_remove_volume(self): - name = 'shootthebullet' - self.tmp_volumes.append(name) - self.client.create_volume(name) - result = self.client.remove_volume(name) - self.assertTrue(result) - - def test_remove_nonexistent_volume(self): - name = 'shootthebullet' - with pytest.raises(docker.errors.NotFound): - self.client.remove_volume(name) - - -################# -# BUILDER TESTS # -################# - -class TestBuildStream(BaseTestCase): - def runTest(self): - script = io.BytesIO('\n'.join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ]).encode('ascii')) - stream = self.client.build(fileobj=script, stream=True) - logs = '' - for chunk in stream: - if six.PY3: - chunk = chunk.decode('utf-8') - json.loads(chunk) # ensure chunk is a single, valid JSON blob - logs += chunk - self.assertNotEqual(logs, '') - - -class TestBuildFromStringIO(BaseTestCase): - def runTest(self): - if six.PY3: - return - script = io.StringIO(six.text_type('\n').join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'RUN mkdir -p /tmp/test', - 'EXPOSE 8080', - 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz' - ' /tmp/silence.tar.gz' - ])) - stream = self.client.build(fileobj=script, stream=True) - logs = '' - for chunk in stream: - if six.PY3: - chunk = chunk.decode('utf-8') - logs += chunk - self.assertNotEqual(logs, '') - - -@requires_api_version('1.8') -class TestBuildWithDockerignore(Cleanup, BaseTestCase): - def runTest(self): - base_dir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, base_dir) - - with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f: - f.write("\n".join([ - 'FROM busybox', - 'MAINTAINER docker-py', - 'ADD . /test', - ])) - - with open(os.path.join(base_dir, '.dockerignore'), 'w') as f: - f.write("\n".join([ - 'ignored', - 'Dockerfile', - '.dockerignore', - '', # empty line - ])) - - with open(os.path.join(base_dir, 'not-ignored'), 'w') as f: - f.write("this file should not be ignored") - - subdir = os.path.join(base_dir, 'ignored', 'subdir') - os.makedirs(subdir) - with open(os.path.join(subdir, 'file'), 'w') as f: - f.write("this file should be ignored") - - tag = 'docker-py-test-build-with-dockerignore' - stream = self.client.build( - path=base_dir, - tag=tag, - ) - for chunk in stream: - pass - - c = self.client.create_container(tag, ['ls', '-1A', '/test']) - self.client.start(c) - self.client.wait(c) - logs = self.client.logs(c) - - if six.PY3: - logs = logs.decode('utf-8') - - self.assertEqual( - list(filter(None, logs.split('\n'))), - ['not-ignored'], - ) - - -####################### -# NETWORK TESTS # -####################### - - -@requires_api_version('1.21') -class TestNetworks(BaseTestCase): - def create_network(self, *args, **kwargs): - net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14] - net_id = self.client.create_network(net_name, *args, **kwargs)['id'] - self.tmp_networks.append(net_id) - return (net_name, net_id) - - def test_list_networks(self): - networks = self.client.networks() - initial_size = len(networks) - - net_name, net_id = self.create_network() - - networks = self.client.networks() - self.assertEqual(len(networks), initial_size + 1) - self.assertTrue(net_id in [n['id'] for n in networks]) - - networks_by_name = self.client.networks(names=[net_name]) - self.assertEqual([n['id'] for n in networks_by_name], [net_id]) - - networks_by_partial_id = self.client.networks(ids=[net_id[:8]]) - self.assertEqual([n['id'] for n in networks_by_partial_id], [net_id]) - - def test_inspect_network(self): - net_name, net_id = self.create_network() - - net = self.client.inspect_network(net_id) - self.assertEqual(net, { - u'name': net_name, - u'id': net_id, - u'driver': 'bridge', - u'containers': {}, - }) - - def test_create_network_with_host_driver_fails(self): - net_name = 'dockerpy{}'.format(random.getrandbits(24))[:14] - - with pytest.raises(APIError): - self.client.create_network(net_name, driver='host') - - def test_remove_network(self): - initial_size = len(self.client.networks()) - - net_name, net_id = self.create_network() - self.assertEqual(len(self.client.networks()), initial_size + 1) - - self.client.remove_network(net_id) - self.assertEqual(len(self.client.networks()), initial_size) - - def test_connect_and_disconnect_container(self): - net_name, net_id = self.create_network() - - container = self.client.create_container('busybox', 'top') - self.tmp_containers.append(container) - self.client.start(container) - - network_data = self.client.inspect_network(net_id) - self.assertFalse(network_data.get('containers')) - - self.client.connect_container_to_network(container, net_id) - network_data = self.client.inspect_network(net_id) - self.assertEqual( - list(network_data['containers'].keys()), - [container['Id']]) - - self.client.disconnect_container_from_network(container, net_id) - network_data = self.client.inspect_network(net_id) - self.assertFalse(network_data.get('containers')) - - def test_connect_on_container_create(self): - net_name, net_id = self.create_network() - - container = self.client.create_container( - image='busybox', - command='top', - host_config=self.client.create_host_config(network_mode=net_name), - ) - self.tmp_containers.append(container) - self.client.start(container) - - network_data = self.client.inspect_network(net_id) - self.assertEqual( - list(network_data['containers'].keys()), - [container['Id']]) - - self.client.disconnect_container_from_network(container, net_id) - network_data = self.client.inspect_network(net_id) - self.assertFalse(network_data.get('containers')) - - -####################### -# PY SPECIFIC TESTS # -####################### - - -class TestRunShlex(BaseTestCase): - def runTest(self): - commands = [ - 'true', - 'echo "The Young Descendant of Tepes & Septette for the ' - 'Dead Princess"', - 'echo -n "The Young Descendant of Tepes & Septette for the ' - 'Dead Princess"', - '/bin/sh -c "echo Hello World"', - '/bin/sh -c \'echo "Hello World"\'', - 'echo "\"Night of Nights\""', - 'true && echo "Night of Nights"' - ] - for cmd in commands: - container = self.client.create_container(BUSYBOX, cmd) - id = container['Id'] - self.client.start(id) - self.tmp_containers.append(id) - exitcode = self.client.wait(id) - self.assertEqual(exitcode, 0, msg=cmd) - - -class TestLoadConfig(BaseTestCase): - def runTest(self): - folder = tempfile.mkdtemp() - self.tmp_folders.append(folder) - cfg_path = os.path.join(folder, '.dockercfg') - f = open(cfg_path, 'w') - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - f.write('auth = {0}\n'.format(auth_)) - f.write('email = sakuya@scarlet.net') - f.close() - cfg = docker.auth.load_config(cfg_path) - self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None) - cfg = cfg[docker.auth.INDEX_NAME] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('Auth'), None) - - -class TestLoadJSONConfig(BaseTestCase): - def runTest(self): - folder = tempfile.mkdtemp() - self.tmp_folders.append(folder) - cfg_path = os.path.join(folder, '.dockercfg') - f = open(os.path.join(folder, '.dockercfg'), 'w') - auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii') - email_ = 'sakuya@scarlet.net' - f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format( - docker.auth.INDEX_URL, auth_, email_)) - f.close() - cfg = docker.auth.load_config(cfg_path) - self.assertNotEqual(cfg[docker.auth.INDEX_URL], None) - cfg = cfg[docker.auth.INDEX_URL] - self.assertEqual(cfg['username'], 'sakuya') - self.assertEqual(cfg['password'], 'izayoi') - self.assertEqual(cfg['email'], 'sakuya@scarlet.net') - self.assertEqual(cfg.get('Auth'), None) - - -class TestAutoDetectVersion(unittest.TestCase): - def test_client_init(self): - client = docker_client(version='auto') - client_version = client._version - api_version = client.version(api_version=False)['ApiVersion'] - self.assertEqual(client_version, api_version) - api_version_2 = client.version()['ApiVersion'] - self.assertEqual(client_version, api_version_2) - client.close() - - def test_auto_client(self): - client = docker.AutoVersionClient(**docker_client_kwargs()) - client_version = client._version - api_version = client.version(api_version=False)['ApiVersion'] - self.assertEqual(client_version, api_version) - api_version_2 = client.version()['ApiVersion'] - self.assertEqual(client_version, api_version_2) - client.close() - with self.assertRaises(docker.errors.DockerException): - docker.AutoVersionClient(**docker_client_kwargs(version='1.11')) - - -class TestConnectionTimeout(unittest.TestCase): - def setUp(self): - self.timeout = 0.5 - self.client = docker.client.Client(base_url='http://192.168.10.2:4243', - timeout=self.timeout) - - def runTest(self): - start = time.time() - res = None - # This call isn't supposed to complete, and it should fail fast. - try: - res = self.client.inspect_container('id') - except: - pass - end = time.time() - self.assertTrue(res is None) - self.assertTrue(end - start < 2 * self.timeout) - - -class UnixconnTestCase(unittest.TestCase): - """ - Test UNIX socket connection adapter. - """ - - def test_resource_warnings(self): - """ - Test no warnings are produced when using the client. - """ - - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') - - client = docker_client() - client.images() - client.close() - del client - - assert len(w) == 0, \ - "No warnings produced: {0}".format(w[0].message) - - -#################### -# REGRESSION TESTS # -#################### - -class TestRegressions(BaseTestCase): - def test_443(self): - dfile = io.BytesIO() - with self.assertRaises(docker.errors.APIError) as exc: - for line in self.client.build(fileobj=dfile, tag="a/b/c"): - pass - self.assertEqual(exc.exception.response.status_code, 500) - dfile.close() - - def test_542(self): - self.client.start( - self.client.create_container(BUSYBOX, ['true']) - ) - result = self.client.containers(all=True, trunc=True) - self.assertEqual(len(result[0]['Id']), 12) - - def test_647(self): - with self.assertRaises(docker.errors.APIError): - self.client.inspect_image('gensokyo.jp//kirisame') - - def test_649(self): - self.client.timeout = None - ctnr = self.client.create_container(BUSYBOX, ['sleep', '2']) - self.client.start(ctnr) - self.client.stop(ctnr) - - def test_715(self): - ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000) - self.client.start(ctnr) - self.client.wait(ctnr) - logs = self.client.logs(ctnr) - if six.PY3: - logs = logs.decode('utf-8') - assert logs == '1000\n' - - def test_792_explicit_port_protocol(self): - - tcp_port, udp_port = random.sample(range(9999, 32000), 2) - ctnr = self.client.create_container( - BUSYBOX, ['sleep', '9999'], ports=[2000, (2000, 'udp')], - host_config=self.client.create_host_config( - port_bindings={'2000/tcp': tcp_port, '2000/udp': udp_port} - ) - ) - self.tmp_containers.append(ctnr) - self.client.start(ctnr) - self.assertEqual( - self.client.port(ctnr, 2000)[0]['HostPort'], - six.text_type(tcp_port) - ) - self.assertEqual( - self.client.port(ctnr, '2000/tcp')[0]['HostPort'], - six.text_type(tcp_port) - ) - self.assertEqual( - self.client.port(ctnr, '2000/udp')[0]['HostPort'], - six.text_type(udp_port) - ) +# FIXME: placeholder while we transition to the new folder architecture +# Remove when merged in master and Jenkins is updated to find the tests +# in the new location. +from .integration import * # flake8: noqa |