import functools
import os
import os.path
import random
import re
import socket
import tarfile
import tempfile
import time

import docker
import paramiko
import pytest


def make_tree(dirs, files):
    """Create a temporary directory tree and return its root path.

    Args:
        dirs: Relative directory paths to create under the new root.
        files: Relative file paths to create under the new root; each file
            is written with the text ``content``.

    Returns:
        The path of the freshly created temporary root directory.
    """
    base = tempfile.mkdtemp()
    for path in dirs:
        os.makedirs(os.path.join(base, path))

    for path in files:
        with open(os.path.join(base, path), 'w') as f:
            f.write("content")

    return base


def simple_tar(path):
    """Return a temporary file holding a tar archive with a single entry.

    The entry for ``path`` is stored under its basename and is added
    non-recursively, so a directory contributes only its own entry.

    Args:
        path: Filesystem path to add to the archive.

    Returns:
        An open ``NamedTemporaryFile`` rewound to offset 0.
    """
    f = tempfile.NamedTemporaryFile()
    abs_path = os.path.abspath(path)
    # The context manager finalises the archive even when ``add`` raises;
    # the caller-supplied file object itself stays open.
    with tarfile.open(mode='w', fileobj=f) as t:
        t.add(abs_path, arcname=os.path.basename(path), recursive=False)
    f.seek(0)
    return f


def untar_file(tardata, filename):
    """Read one member out of a tar archive.

    Args:
        tardata: Binary file object containing the tar archive.
        filename: Name of the archive member to read.

    Returns:
        The member's content as bytes.
    """
    with tarfile.open(mode='r', fileobj=tardata) as t:
        f = t.extractfile(filename)
        try:
            return f.read()
        finally:
            # Release the member reader even if the read fails.
            f.close()


def requires_api_version(version):
    """Build a pytest marker that skips tests needing a newer API version.

    The version under test is read from the ``DOCKER_TEST_API_VERSION``
    environment variable, falling back to the docker library default.

    Args:
        version: Minimum API version the decorated test needs.

    Returns:
        A ``pytest.mark.skipif`` marker.
    """
    test_version = os.environ.get(
        'DOCKER_TEST_API_VERSION', docker.constants.DEFAULT_DOCKER_API_VERSION
    )

    return pytest.mark.skipif(
        docker.utils.version_lt(test_version, version),
        reason=f"API version is too low (< {version})"
    )


def requires_experimental(until=None):
    """Build a decorator that skips a test on non-experimental engines.

    The decorated callable must be a method whose ``self`` exposes a
    ``client`` with an ``info()`` method.

    Args:
        until: Optional API version. When the version under test (from
            ``DOCKER_TEST_API_VERSION`` or the library default) is greater
            than or equal to it, the test is returned undecorated.

    Returns:
        A decorator for test methods.
    """
    test_version = os.environ.get(
        'DOCKER_TEST_API_VERSION', docker.constants.DEFAULT_DOCKER_API_VERSION
    )

    def req_exp(f):
        @functools.wraps(f)
        def wrapped(self, *args, **kwargs):
            if not self.client.info()['ExperimentalBuild']:
                pytest.skip('Feature requires Docker Engine experimental mode')
            return f(self, *args, **kwargs)

        if until and docker.utils.version_gte(test_version, until):
            return f
        return wrapped

    return req_exp


def wait_on_condition(condition, delay=0.1, timeout=40):
    """Poll ``condition`` until it returns a truthy value.

    Args:
        condition: Zero-argument callable to poll.
        delay: Seconds to sleep between polls.
        timeout: Seconds after which polling is abandoned.

    Raises:
        AssertionError: If ``condition`` is still falsy after ``timeout``
            seconds.
    """
    start_time = time.time()
    while not condition():
        if time.time() - start_time > timeout:
            raise AssertionError("Timeout: %s" % condition)
        time.sleep(delay)


def random_name():
    """Return a name made of ``dockerpytest_`` plus 64 random bits in hex."""
    return f'dockerpytest_{random.getrandbits(64):x}'


def force_leave_swarm(client):
    """Actually force leave a Swarm.

    There seems to be a bug in Swarm that occasionally throws
    "context deadline exceeded" errors when leaving, so the call is retried
    for as long as that specific error is reported.

    Args:
        client: A ``docker.DockerClient``; any other object is expected to
            provide ``leave_swarm`` (the APIClient case).

    Returns:
        The result of the leave call, or ``None`` when a different API
        error occurred.
    """
    while True:
        try:
            if isinstance(client, docker.DockerClient):
                return client.swarm.leave(force=True)
            return client.leave_swarm(force=True)  # elif APIClient
        except docker.errors.APIError as e:
            if e.explanation != "context deadline exceeded":
                # Any other API error ends the attempt without raising.
                return


def swarm_listen_addr():
    """Return ``0.0.0.0:<port>`` with a random port in [10000, 25000)."""
    return f'0.0.0.0:{random.randrange(10000, 25000)}'


def assert_cat_socket_detached_with_keys(sock, inputs):
    """Send ``inputs`` over ``sock`` and assert the socket then looks detached.

    Args:
        sock: Socket-like object; when it has a ``_sock`` attribute, that
            underlying object is used instead.
        inputs: Iterable of byte strings sent one at a time, with a
            0.5 second pause after each.
    """
    if hasattr(sock, '_sock'):
        sock = sock._sock

    for i in inputs:
        sock.sendall(i)
        time.sleep(0.5)

    # If we're using a Unix socket, the sock.send call will fail with a
    # BrokenPipeError ; INET sockets will just stop receiving / sending data
    # but will not raise an error
    if isinstance(sock, paramiko.Channel):
        with pytest.raises(OSError):
            sock.sendall(b'make sure the socket is closed\n')
        return

    # The two getattr defaults differ on purpose, so the comparison is False
    # when either attribute is missing.
    if getattr(sock, 'family', -9) == getattr(socket, 'AF_UNIX', -1):
        # We do not want to use pytest.raises here because future versions
        # of the daemon no longer cause this to raise an error.
        try:
            sock.sendall(b'make sure the socket is closed\n')
        except OSError:
            return

    sock.sendall(b"make sure the socket is closed\n")
    data = sock.recv(128)
    # New in 18.06: error message is broadcast over the socket when reading
    # after detach
    assert data == b'' or data.startswith(
        b'exec attach failed: error on attach stdin: read escape sequence'
    )


def ctrl_with(char):
    """Return the single byte that maps ``a``..``z`` onto values 1..26.

    Args:
        char: Exactly one lowercase ASCII letter.

    Returns:
        A one-byte ``bytes`` object, e.g. ``b'\\x01'`` for ``'a'``.

    Raises:
        Exception: If ``char`` is not exactly one character in ``[a-z]``.
    """
    # fullmatch rejects multi-character input such as 'ab' or 'a\n', which
    # a start-anchored match would let through to ord() and a TypeError.
    if re.fullmatch('[a-z]', char):
        return chr(ord(char) - ord('a') + 1).encode('ascii')
    raise Exception('char must be [a-z]')