summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pbr/tests/base.py47
-rw-r--r--pbr/tests/test_integration.py60
-rw-r--r--pbr/tests/test_packaging.py30
3 files changed, 72 insertions, 65 deletions
diff --git a/pbr/tests/base.py b/pbr/tests/base.py
index 09bd642..e017b06 100644
--- a/pbr/tests/base.py
+++ b/pbr/tests/base.py
@@ -47,6 +47,7 @@ import sys
import fixtures
import testresources
import testtools
+from testtools import content
from pbr import options
@@ -147,6 +148,48 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase):
return result
+class CapturedSubprocess(fixtures.Fixture):
+ """Run a process and capture its output.
+
+ :attr stdout: The output (a string).
+ :attr stderr: The standard error (a string).
+ :attr returncode: The return code of the process.
+
+ Note that stdout and stderr are decoded from the bytestrings subprocess
+ returns using error=replace.
+ """
+
+ def __init__(self, label, *args, **kwargs):
+ """Create a CapturedSubprocess.
+
+ :param label: A label for the subprocess in the test log. E.g. 'foo'.
+ :param *args: The *args to pass to Popen.
+ :param **kwargs: The **kwargs to pass to Popen.
+ """
+ super(CapturedSubprocess, self).__init__()
+ self.label = label
+ self.args = args
+ self.kwargs = kwargs
+ self.kwargs['stderr'] = subprocess.PIPE
+ self.kwargs['stdin'] = subprocess.PIPE
+ self.kwargs['stdout'] = subprocess.PIPE
+
+ def setUp(self):
+ super(CapturedSubprocess, self).setUp()
+ proc = subprocess.Popen(*self.args, **self.kwargs)
+ out, err = proc.communicate()
+ self.out = out.decode('utf-8', 'replace')
+ self.err = err.decode('utf-8', 'replace')
+ self.addDetail(self.label + '-stdout', content.text_content(self.out))
+ self.addDetail(self.label + '-stderr', content.text_content(self.err))
+ self.returncode = proc.returncode
+ if proc.returncode:
+ raise AssertionError('Failed process %s' % proc.returncode)
+ self.addCleanup(delattr, self, 'out')
+ self.addCleanup(delattr, self, 'err')
+ self.addCleanup(delattr, self, 'returncode')
+
+
def _run_cmd(args, cwd):
"""Run the command args in cwd.
@@ -158,8 +201,8 @@ def _run_cmd(args, cwd):
args, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, cwd=cwd)
streams = tuple(s.decode('latin1').strip() for s in p.communicate())
- for content in streams:
- print(content)
+ for stream_content in streams:
+ print(stream_content)
return (streams) + (p.returncode,)
diff --git a/pbr/tests/test_integration.py b/pbr/tests/test_integration.py
index 30cc5a2..5dff189 100644
--- a/pbr/tests/test_integration.py
+++ b/pbr/tests/test_integration.py
@@ -13,12 +13,10 @@
import os.path
import shlex
-import subprocess
import fixtures
import testscenarios
import testtools
-from testtools import content
import virtualenv
from pbr.tests import base
@@ -52,48 +50,6 @@ def all_projects():
yield (short_name, dict(name=name, short_name=short_name))
-class CapturedSubprocess(fixtures.Fixture):
- """Run a process and capture its output.
-
- :attr stdout: The output (a string).
- :attr stderr: The standard error (a string).
- :attr returncode: The return code of the process.
-
- Note that stdout and stderr are decoded from the bytestrings subprocess
- returns using error=replace.
- """
-
- def __init__(self, label, *args, **kwargs):
- """Create a CapturedSubprocess.
-
- :param label: A label for the subprocess in the test log. E.g. 'foo'.
- :param *args: The *args to pass to Popen.
- :param **kwargs: The **kwargs to pass to Popen.
- """
- super(CapturedSubprocess, self).__init__()
- self.label = label
- self.args = args
- self.kwargs = kwargs
- self.kwargs['stderr'] = subprocess.PIPE
- self.kwargs['stdin'] = subprocess.PIPE
- self.kwargs['stdout'] = subprocess.PIPE
-
- def setUp(self):
- super(CapturedSubprocess, self).setUp()
- proc = subprocess.Popen(*self.args, **self.kwargs)
- out, err = proc.communicate()
- self.out = out.decode('utf-8', 'replace')
- self.err = err.decode('utf-8', 'replace')
- self.addDetail(self.label + '-stdout', content.text_content(self.out))
- self.addDetail(self.label + '-stderr', content.text_content(self.err))
- self.returncode = proc.returncode
- if proc.returncode:
- raise AssertionError('Failed process %s' % proc.returncode)
- self.addCleanup(delattr, self, 'out')
- self.addCleanup(delattr, self, 'err')
- self.addCleanup(delattr, self, 'returncode')
-
-
class TestIntegration(base.BaseTestCase):
scenarios = list(all_projects())
@@ -109,7 +65,7 @@ class TestIntegration(base.BaseTestCase):
path = self.useFixture(fixtures.TempDir()).path
virtualenv.create_environment(path, clear=True)
python = os.path.join(path, 'bin', 'python')
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'mkvenv-' + reason, [python] + PIP_CMD + [
'-U', PIPVERSION, 'wheel', PBRVERSION]))
return path, python
@@ -126,29 +82,29 @@ class TestIntegration(base.BaseTestCase):
# We don't break these into separate tests because we'd need separate
# source dirs to isolate from side effects of running pip, and the
# overheads of setup would start to beat the benefits of parallelism.
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'sync-req',
['python', 'update.py', os.path.join(REPODIR, self.short_name)],
cwd=os.path.join(REPODIR, 'requirements')))
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'commit-requirements',
'git diff --quiet || git commit -amrequirements',
cwd=os.path.join(REPODIR, self.short_name), shell=True))
path = os.path.join(
self.useFixture(fixtures.TempDir()).path, 'project')
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'clone',
['git', 'clone', os.path.join(REPODIR, self.short_name), path]))
_, python = self.venv('sdist')
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'sdist', [python, 'setup.py', 'sdist'], cwd=path))
_, python = self.venv('tarball')
filename = os.path.join(
path, 'dist', os.listdir(os.path.join(path, 'dist'))[0])
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'tarball', [python] + PIP_CMD + [filename]))
root, python = self.venv('install-git')
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'install-git', [python] + PIP_CMD + ['git+file://' + path]))
if self.short_name == 'nova':
found = False
@@ -157,7 +113,7 @@ class TestIntegration(base.BaseTestCase):
found = True
self.assertTrue(found)
_, python = self.venv('install-e')
- self.useFixture(CapturedSubprocess(
+ self.useFixture(base.CapturedSubprocess(
'install-e', [python] + PIP_CMD + ['-e', path]))
diff --git a/pbr/tests/test_packaging.py b/pbr/tests/test_packaging.py
index 9e846d8..81701d1 100644
--- a/pbr/tests/test_packaging.py
+++ b/pbr/tests/test_packaging.py
@@ -99,12 +99,27 @@ class GPGKeyFixture(fixtures.Fixture):
def setUp(self):
super(GPGKeyFixture, self).setUp()
tempdir = self.useFixture(fixtures.TempDir())
+ gnupg_version_re = re.compile('^gpg\s.*\s([\d+])\.([\d+])\.([\d+])')
+ gnupg_version = base._run_cmd(['gpg', '--version'], tempdir.path)
+ for line in gnupg_version[0].split('\n'):
+ gnupg_version = gnupg_version_re.match(line)
+ if gnupg_version:
+ gnupg_version = (int(gnupg_version.group(1)),
+ int(gnupg_version.group(2)),
+ int(gnupg_version.group(3)))
+ break
+ else:
+ if gnupg_version is None:
+ gnupg_version = (0, 0, 0)
config_file = tempdir.path + '/key-config'
f = open(config_file, 'wt')
try:
+ if gnupg_version[0] == 2 and gnupg_version[1] >= 1:
+ f.write("""
+ %no-protection
+ %transient-key
+ """)
f.write("""
- #%no-protection -- these would be ideal but they are documented
- #%transient-key -- but not implemented in gnupg!
%no-ask-passphrase
Key-Type: RSA
Name-Real: Example Key
@@ -119,16 +134,9 @@ class GPGKeyFixture(fixtures.Fixture):
# Note that --quick-random (--debug-quick-random in GnuPG 2.x)
# does not have a corresponding preferences file setting and
# must be passed explicitly on the command line instead
- gnupg_version_re = re.compile('gpg .* ([12])\.')
- gnupg_version = base._run_cmd(['gpg', '--version'], tempdir.path)
- for line in gnupg_version[0].split('\n'):
- gnupg_version = gnupg_version_re.match(line)
- if gnupg_version:
- gnupg_version = gnupg_version.group(1)
- break
- if gnupg_version == '1':
+ if gnupg_version[0] == 1:
gnupg_random = '--quick-random'
- elif gnupg_version == '2':
+ elif gnupg_version[0] >= 2:
gnupg_random = '--debug-quick-random'
else:
gnupg_random = ''