path: root/qpid/cpp/src/tests/
diff options
Diffstat (limited to 'qpid/cpp/src/tests/')
1 files changed, 543 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ b/qpid/cpp/src/tests/
new file mode 100644
index 0000000000..74a0f6d0b3
--- /dev/null
+++ b/qpid/cpp/src/tests/
@@ -0,0 +1,543 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import print_function
+import atexit as _atexit
+import codecs as _codecs
+import fnmatch as _fnmatch
+import getpass as _getpass
+import os as _os
+import random as _random
+import re as _re
+import shutil as _shutil
+import subprocess as _subprocess
+import sys as _sys
+import tarfile as _tarfile
+import tempfile as _tempfile
+import traceback as _traceback
+# See documentation at
+def fail(message, *args):
+ error(message, *args)
+ if isinstance(message, BaseException):
+ raise message
+ raise Exception(message)
+def error(message, *args):
+ _print_message("Error", message, args, _sys.stderr)
+def warn(message, *args):
+ _print_message("Warn", message, args, _sys.stderr)
+def notice(message, *args):
+ _print_message(None, message, args, _sys.stdout)
+def debug(message, *args):
+ _print_message("Debug", message, args, _sys.stdout)
+def exit(message=None, *args):
+ if message is None:
+ _sys.exit()
+ _print_message("Error", message, args, _sys.stderr)
+ _sys.exit(1)
+def _print_message(category, message, args, file):
+ message = _format_message(category, message, args)
+ print(message, file=file)
+ file.flush()
+def _format_message(category, message, args):
+ if isinstance(message, BaseException):
+ message = str(message)
+ if message == "":
+ message = message.__class__.__name__
+ if category:
+ message = "{}: {}".format(category, message)
+ if args:
+ message = message.format(*args)
+ script = split(_sys.argv[0])[1]
+ message = "{}: {}".format(script, message)
+ return message
+def flush():
+ _sys.stdout.flush()
+ _sys.stderr.flush()
+absolute_path = _os.path.abspath
+normalize_path = _os.path.normpath
+exists = _os.path.exists
+is_absolute = _os.path.isabs
+is_dir = _os.path.isdir
+is_file = _os.path.isfile
+is_link = _os.path.islink
+join = _os.path.join
+split = _os.path.split
+split_extension = _os.path.splitext
+LINE_SEP = _os.linesep
+PATH_SEP = _os.sep
+PATH_VAR_SEP = _os.pathsep
+ENV = _os.environ
+ARGS = _sys.argv
+current_dir = _os.getcwd
+def home_dir(user=""):
+ return _os.path.expanduser("~{}".format(user))
+def parent_dir(path):
+ path = normalize_path(path)
+ parent, child = split(path)
+ return parent
+def file_name(file):
+ file = normalize_path(file)
+ dir, name = split(file)
+ return name
+def name_stem(file):
+ name = file_name(file)
+ if name.endswith(".tar.gz"):
+ name = name[:-3]
+ stem, ext = split_extension(name)
+ return stem
+def name_extension(file):
+ name = file_name(file)
+ stem, ext = split_extension(name)
+ return ext
+def read(file):
+ with, encoding="utf-8", mode="r") as f:
+ return
+def write(file, string):
+ with, encoding="utf-8", mode="w") as f:
+ f.write(string)
+ return file
+def append(file, string):
+ with, encoding="utf-8", mode="a") as f:
+ f.write(string)
+ return file
+def prepend(file, string):
+ orig = read(file)
+ prepended = string + orig
+ return write(file, prepended)
+def touch(file):
+ return append(file, "")
+def tail(file, n):
+ return "".join(tail_lines(file, n))
+def read_lines(file):
+ with, encoding="utf-8", mode="r") as f:
+ return f.readlines()
+def write_lines(file, lines):
+ with, encoding="utf-8", mode="r") as f:
+ f.writelines(lines)
+ return file
+def append_lines(file, lines):
+ with, encoding="utf-8", mode="a") as f:
+ f.writelines(string)
+ return file
+def prepend_lines(file, lines):
+ orig_lines = read_lines(file)
+ with, encoding="utf-8", mode="w") as f:
+ f.writelines(lines)
+ f.writelines(orig_lines)
+ return file
+# Derived from
+def tail_lines(file, n):
+ assert n >= 0
+ with, encoding="utf-8", mode="r") as f:
+ pos = n + 1
+ lines = list()
+ while len(lines) <= n:
+ try:
+, 2)
+ except IOError:
+ break
+ finally:
+ lines = f.readlines()
+ pos *= 2
+ return lines[-n:]
+_temp_dir = _tempfile.mkdtemp(prefix="plano.")
+def _get_temp_file(key):
+ assert not key.startswith("_")
+ return join(_temp_dir, "_file_{}".format(key))
+def _remove_temp_dir():
+ _shutil.rmtree(_temp_dir, ignore_errors=True)
+def read_temp(key):
+ file = _get_temp_file(key)
+ return read(file)
+def write_temp(key, string):
+ file = _get_temp_file(key)
+ return write(file, string)
+def append_temp(key, string):
+ file = _get_temp_file(key)
+ return append(file, string)
+def prepend_temp(key, string):
+ file = _get_temp_file(key)
+ return prepend(file, string)
+def make_temp(key):
+ return append_temp(key, "")
+def open_temp(key, mode="r"):
+ file = _get_temp_file(key)
+ return, encoding="utf-8", mode=mode)
+# This one is deleted on process exit
+def make_temp_dir():
+ return _tempfile.mkdtemp(prefix="_dir_", dir=_temp_dir)
+# This one sticks around
+def make_user_temp_dir():
+ temp_dir = _tempfile.gettempdir()
+ user = _getpass.getuser()
+ user_temp_dir = join(temp_dir, user)
+ return make_dir(user_temp_dir)
+def copy(from_path, to_path):
+ notice("Copying '{}' to '{}'", from_path, to_path)
+ to_dir = parent_dir(to_path)
+ if to_dir:
+ make_dir(to_dir)
+ if is_dir(from_path):
+ _copytree(from_path, to_path, symlinks=True)
+ else:
+ _shutil.copy(from_path, to_path)
+ return to_path
+def move(from_path, to_path):
+ notice("Moving '{}' to '{}'", from_path, to_path)
+ _shutil.move(from_path, to_path)
+ return to_path
+def rename(path, expr, replacement):
+ path = normalize_path(path)
+ parent_dir, name = split(path)
+ to_name = string_replace(name, expr, replacement)
+ to_path = join(parent_dir, to_name)
+ notice("Renaming '{}' to '{}'", path, to_path)
+ move(path, to_path)
+ return to_path
+def remove(path):
+ notice("Removing '{}'", path)
+ if not exists(path):
+ return
+ if is_dir(path):
+ _shutil.rmtree(path, ignore_errors=True)
+ else:
+ _os.remove(path)
+ return path
+def make_link(source_path, link_file):
+ if exists(link_file):
+ assert read_link(link_file) == source_path
+ return
+ _os.symlink(source_path, link_file)
+ return link_file
+def read_link(file):
+ return _os.readlink(file)
+def find(dir, *patterns):
+ matched_paths = set()
+ if not patterns:
+ patterns = ("*",)
+ for root, dirs, files in _os.walk(dir):
+ for pattern in patterns:
+ matched_dirs = _fnmatch.filter(dirs, pattern)
+ matched_files = _fnmatch.filter(files, pattern)
+ matched_paths.update([join(root, x) for x in matched_dirs])
+ matched_paths.update([join(root, x) for x in matched_files])
+ return sorted(matched_paths)
+def find_any_one(dir, *patterns):
+ paths = find(dir, *patterns)
+ if len(paths) == 0:
+ return
+ return paths[0]
+def find_only_one(dir, *patterns):
+ paths = find(dir, *patterns)
+ if len(paths) == 0:
+ return
+ assert len(paths) == 1
+ return paths[0]
+# find_via_expr?
+def string_replace(string, expr, replacement, count=0):
+ return _re.sub(expr, replacement, string, count)
+def make_dir(dir):
+ if not exists(dir):
+ _os.makedirs(dir)
+ return dir
+# Returns the current working directory so you can change it back
+def change_dir(dir):
+ notice("Changing directory to '{}'", dir)
+ cwd = current_dir()
+ _os.chdir(dir)
+ return cwd
+def list_dir(dir, *patterns):
+ assert is_dir(dir)
+ names = _os.listdir(dir)
+ if not patterns:
+ return sorted(names)
+ matched_names = set()
+ for pattern in patterns:
+ matched_names.update(_fnmatch.filter(names, pattern))
+ return sorted(matched_names)
+class working_dir(object):
+ def __init__(self, dir):
+ self.dir = dir
+ self.prev_dir = None
+ def __enter__(self):
+ self.prev_dir = change_dir(self.dir)
+ return self.dir
+ def __exit__(self, type, value, traceback):
+ change_dir(self.prev_dir)
+def _init_call(command, args, kwargs):
+ if args:
+ command = command.format(*args)
+ if "shell" not in kwargs:
+ kwargs["shell"] = True
+ notice("Calling '{}'", command)
+ return command, kwargs
+def call(command, *args, **kwargs):
+ command, args = _init_call(command, args, kwargs)
+ _subprocess.check_call(command, **kwargs)
+def call_for_output(command, *args, **kwargs):
+ command, args = _init_call(command, args, kwargs)
+ return _subprocess.check_output(command, **kwargs)
+def make_archive(input_dir, output_dir, archive_stem):
+ temp_dir = make_temp_dir()
+ temp_input_dir = join(temp_dir, archive_stem)
+ copy(input_dir, temp_input_dir)
+ make_dir(output_dir)
+ output_file = "{}.tar.gz".format(join(output_dir, archive_stem))
+ output_file = absolute_path(output_file)
+ with working_dir(temp_dir):
+ call("tar -czf {} {}", output_file, archive_stem)
+ return output_file
+def extract_archive(archive_file, output_dir):
+ assert is_file(archive_file)
+ if not exists(output_dir):
+ make_dir(output_dir)
+ archive_file = absolute_path(archive_file)
+ with working_dir(output_dir):
+ call("tar -xf {}", archive_file)
+ return output_dir
+def rename_archive(archive_file, new_archive_stem):
+ assert is_file(archive_file)
+ if name_stem(archive_file) == new_archive_stem:
+ return
+ temp_dir = make_temp_dir()
+ extract_archive(archive_file, temp_dir)
+ input_name = list_dir(temp_dir)[0]
+ input_dir = join(temp_dir, input_name)
+ output_file = make_archive(input_dir, temp_dir, new_archive_stem)
+ output_name = file_name(output_file)
+ archive_dir = parent_dir(archive_file)
+ new_archive_file = join(archive_dir, output_name)
+ move(output_file, new_archive_file)
+ remove(archive_file)
+ return new_archive_file
+def random_port(min=49152, max=65535):
+ return _random.randint(min, max)
+# Modified copytree impl that allows for already existing destination
+# dirs
+def _copytree(src, dst, symlinks=False, ignore=None):
+ """Recursively copy a directory tree using copy2().
+ If exception(s) occur, an Error is raised with a list of reasons.
+ If the optional symlinks flag is true, symbolic links in the
+ source tree result in symbolic links in the destination tree; if
+ it is false, the contents of the files pointed to by symbolic
+ links are copied.
+ The optional ignore argument is a callable. If given, it
+ is called with the `src` parameter, which is the directory
+ being visited by copytree(), and `names` which is the list of
+ `src` contents, as returned by os.listdir():
+ callable(src, names) -> ignored_names
+ Since copytree() is called recursively, the callable will be
+ called once for each directory that is copied. It returns a
+ list of names relative to the `src` directory that should
+ not be copied.
+ XXX Consider this example code rather than the ultimate tool.
+ """
+ names = _os.listdir(src)
+ if ignore is not None:
+ ignored_names = ignore(src, names)
+ else:
+ ignored_names = set()
+ if not exists(dst):
+ _os.makedirs(dst)
+ errors = []
+ for name in names:
+ if name in ignored_names:
+ continue
+ srcname = _os.path.join(src, name)
+ dstname = _os.path.join(dst, name)
+ try:
+ if symlinks and _os.path.islink(srcname):
+ linkto = _os.readlink(srcname)
+ _os.symlink(linkto, dstname)
+ elif _os.path.isdir(srcname):
+ _copytree(srcname, dstname, symlinks, ignore)
+ else:
+ # Will raise a SpecialFileError for unsupported file types
+ _shutil.copy2(srcname, dstname)
+ # catch the Error from the recursive copytree so that we can
+ # continue with other files
+ except _shutil.Error as err:
+ errors.extend(err.args[0])
+ except EnvironmentError as why:
+ errors.append((srcname, dstname, str(why)))
+ try:
+ _shutil.copystat(src, dst)
+ except OSError as why:
+ if _shutil.WindowsError is not None and isinstance \
+ (why, _shutil.WindowsError):
+ # Copying file access times may fail on Windows
+ pass
+ else:
+ errors.append((src, dst, str(why)))
+ if errors:
+ raise _shutil.Error(errors)