summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomi Richards <thomi.richards@canonical.com>2013-11-20 11:06:07 +1300
committerThomi Richards <thomi.richards@canonical.com>2013-11-20 11:06:07 +1300
commit8ac7e16b1c5e83a55ee3cd76086b5c2eb74fe09e (patch)
treecbfbd8f47d1fdb62a8d40de588e422328170aba5
parent31de7c17f4f9239be6d8c91302f59633117c2012 (diff)
downloadsubunit-8ac7e16b1c5e83a55ee3cd76086b5c2eb74fe09e.tar.gz
Lots of code cleanup, about to refactor argument parsing.
-rw-r--r--filters/subunit-output2
-rw-r--r--python/subunit/_output.py190
-rw-r--r--python/subunit/tests/test_output_filter.py143
-rwxr-xr-xsetup.py1
4 files changed, 182 insertions, 154 deletions
diff --git a/filters/subunit-output b/filters/subunit-output
index 12c68f5..cb2af2d 100644
--- a/filters/subunit-output
+++ b/filters/subunit-output
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2013 Thomi Richards <thomi.richards@canonical.com>
+# Copyright (C) 2013 'Subunit Contributors'
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
diff --git a/python/subunit/_output.py b/python/subunit/_output.py
index c8b6a21..c65fbe0 100644
--- a/python/subunit/_output.py
+++ b/python/subunit/_output.py
@@ -1,5 +1,5 @@
# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2013 Thomi Richards <thomi.richards@canonical.com>
+# Copyright (C) 2013 'Subunit Contributors'
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
@@ -15,9 +15,11 @@
from argparse import ArgumentParser
import datetime
from functools import partial
-from sys import stdout
+from sys import stdin, stdout
+
from testtools.compat import _b
+from subunit.iso8601 import UTC
from subunit.v2 import StreamResultToBytes
@@ -25,7 +27,6 @@ def output_main():
args = parse_arguments()
output = get_output_stream_writer()
generate_bytestream(args, output)
-
return 0
@@ -36,40 +37,53 @@ def parse_arguments(args=None, ParserClass=ArgumentParser):
ParserClass can be specified to override the class we use to parse the
command-line arguments. This is useful for testing.
-
"""
- parser = ParserClass(
- prog='subunit-output',
- description="A tool to generate a subunit result byte-stream",
- usage="""%(prog)s [-h] action [-h] test [--attach-file ATTACH_FILE]
- [--mimetype MIMETYPE] [--tags TAGS]""",
- epilog="""Additional help can be printed by passing -h to an action
- (e.g.- '%(prog)s pass -h' will show help for the 'pass' action)."""
- )
- common_args = ParserClass(add_help=False)
- common_args.add_argument(
- "test_id",
- help="A string that uniquely identifies this test."
- )
- common_args.add_argument(
+ file_args = ParserClass(add_help=False)
+ file_args.add_argument(
"--attach-file",
- type=open,
- help="Attach a file to the result stream for this test."
+ help="Attach a file to the result stream for this test. If '-' is "
+ "specified, stdin will be read instead. In this case, the file "
+ "name will be set to 'stdin' (but can still be overridden with "
+ "the --file-name option)."
)
- common_args.add_argument(
+ file_args.add_argument(
+ "--file-name",
+ help="The name to give this file attachment. If not specified, the "
+ "name of the file on disk will be used, or 'stdin' in the case "
+ "where '-' was passed to the '--attach-file' argument. This option"
+ " may only be specified when '--attach-file' is specified.",
+ )
+ file_args.add_argument(
"--mimetype",
help="The mime type to send with this file. This is only used if the "
- "--attach-file argument is used. This argument is optional. If it is "
- "not specified, the file will be sent wihtout a mime type.",
+ "--attach-file argument is used. This argument is optional. If it "
+ "is not specified, the file will be sent wihtout a mime type. This "
+ "option may only be specified when '--attach-file' is specified.",
default=None
)
+
+ common_args = ParserClass(add_help=False)
+ common_args.add_argument(
+ "test_id",
+ help="A string that uniquely identifies this test."
+ )
common_args.add_argument(
"--tags",
help="A comma-separated list of tags to associate with this test.",
type=lambda s: s.split(','),
default=None
)
+
+ parser = ParserClass(
+ prog='subunit-output',
+ description="A tool to generate a subunit result byte-stream",
+ usage="%(prog)s [-h] action [-h] test [--attach-file ATTACH_FILE]"
+ "[--mimetype MIMETYPE] [--tags TAGS]",
+ epilog="Additional help can be printed by passing -h to an action"
+ "(e.g.- '%(prog)s pass -h' will show help for the 'pass' action).",
+ parents=[file_args]
+ )
sub_parsers = parser.add_subparsers(
dest="action",
title="actions",
@@ -80,63 +94,65 @@ def parse_arguments(args=None, ParserClass=ArgumentParser):
"for this test id after this one."
sub_parsers.add_parser(
- "start",
- help="Start a test.",
- parents=[common_args]
+ "inprogress",
+ help="Report that a test is in progress.",
+ parents=[common_args, file_args]
)
sub_parsers.add_parser(
- "pass",
- help="Pass a test. " + final_state,
- parents=[common_args],
+ "success",
+ help="Report that a test has succeeded. " + final_state,
+ parents=[common_args, file_args],
)
sub_parsers.add_parser(
"fail",
- help="Fail a test. " + final_state,
- parents=[common_args]
+ help="Report that a test has failed. " + final_state,
+ parents=[common_args, file_args]
)
sub_parsers.add_parser(
"skip",
- help="Skip a test. " + final_state,
- parents=[common_args]
+ help="Report that a test was skipped. " + final_state,
+ parents=[common_args, file_args]
)
sub_parsers.add_parser(
"exists",
- help="Marks a test as existing. " + final_state,
- parents=[common_args]
+ help="Report that a test exists. " + final_state,
+ parents=[common_args, file_args]
)
sub_parsers.add_parser(
- "expected-fail",
- help="Marks a test as failing expectedly (this is not counted as a "
- "failure). " + final_state,
- parents=[common_args],
+ "xfail",
+ help="Report that a test has failed expectedly (this is not counted as "
+ "a failure). " + final_state,
+ parents=[common_args, file_args],
)
sub_parsers.add_parser(
- "unexpected-success",
- help="Marks a test as succeeding unexpectedly (this is counted as a "
- "failure). " + final_state,
- parents=[common_args],
+ "uxsuccess",
+ help="Report that a test has succeeded unexpectedly (this is counted "
+ " as a failure). " + final_state,
+ parents=[common_args, file_args],
)
- return parser.parse_args(args)
-
-
-def translate_command_name(command_name):
- """Turn the friendly command names we show users on the command line into
- something subunit understands.
-
- """
- return {
- 'start': 'inprogress',
- 'pass': 'success',
- 'expected-fail': 'xfail',
- 'unexpected-success': 'uxsuccess',
- }.get(command_name, command_name)
+ args = parser.parse_args(args)
+ if args.mimetype and not args.attach_file:
+ parser.error("Cannot specify --mimetype without --attach_file")
+ if args.file_name and not args.attach_file:
+ parser.error("Cannot specify --file-name without --attach_file")
+ if args.attach_file:
+ if args.attach_file == '-':
+ if not args.file_name:
+ args.file_name = 'stdin'
+ args.attach_file = stdin
+ else:
+ try:
+ args.attach_file = open(args.attach_file)
+ except IOError as e:
+ parser.error("Cannot open %s (%s)" % (args.attach_file, e.strerror))
+ return args
def get_output_stream_writer():
@@ -147,57 +163,49 @@ def generate_bytestream(args, output_writer):
output_writer.startTestRun()
if args.attach_file:
write_chunked_file(
- args.attach_file,
- args.test_id,
- output_writer,
- args.mimetype,
+ file_obj=args.attach_file,
+ test_id=args.test_id,
+ output_writer=output_writer,
+ mime_type=args.mimetype,
)
output_writer.status(
test_id=args.test_id,
- test_status=translate_command_name(args.action),
+ test_status=args.action,
timestamp=create_timestamp(),
test_tags=args.tags,
)
output_writer.stopTestRun()
-def write_chunked_file(file_obj, test_id, output_writer, chunk_size=1024,
- mime_type=None):
+def write_chunked_file(file_obj, output_writer, chunk_size=1024,
+ mime_type=None, test_id=None, file_name=None):
reader = partial(file_obj.read, chunk_size)
+
+ write_status = output_writer.status
+ if mime_type is not None:
+ write_status = partial(
+ write_status,
+ mime_type=mime_type
+ )
+ if test_id is not None:
+ write_status = partial(
+ write_status,
+ test_id=test_id
+ )
+ filename = file_name if file_name else file_obj.name
+
for chunk in iter(reader, _b('')):
- output_writer.status(
- test_id=test_id,
- file_name=file_obj.name,
+ write_status(
+ file_name=filename,
file_bytes=chunk,
- mime_type=mime_type,
eof=False,
)
- output_writer.status(
- test_id=test_id,
- file_name=file_obj.name,
+ write_status(
+ file_name=filename,
file_bytes=_b(''),
- mime_type=mime_type,
eof=True,
)
-_ZERO = datetime.timedelta(0)
-
-
-class UTC(datetime.tzinfo):
-
- def utcoffset(self, dt):
- return _ZERO
-
- def tzname(self, dt):
- return "UTC"
-
- def dst(self, dt):
- return _ZERO
-
-
-utc = UTC()
-
-
def create_timestamp():
- return datetime.datetime.now(utc)
+ return datetime.datetime.now(UTC)
diff --git a/python/subunit/tests/test_output_filter.py b/python/subunit/tests/test_output_filter.py
index be42ea6..8fda9aa 100644
--- a/python/subunit/tests/test_output_filter.py
+++ b/python/subunit/tests/test_output_filter.py
@@ -1,6 +1,6 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Thomi Richards <thomi.richards@canonical.com>
+# Copyright (C) 2013 'Subunit Contributors'
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
@@ -18,10 +18,13 @@
import argparse
import datetime
from functools import partial
-from io import BytesIO
+from io import BytesIO, StringIO
+import sys
from tempfile import NamedTemporaryFile
+
+from testscenarios import WithScenarios
from testtools import TestCase
-from testtools.compat import _b
+from testtools.compat import _b, _u
from testtools.matchers import (
Equals,
IsInstance,
@@ -31,12 +34,11 @@ from testtools.matchers import (
)
from testtools.testresult.doubles import StreamResult
+from subunit.iso8601 import UTC
from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
from subunit._output import (
generate_bytestream,
parse_arguments,
- translate_command_name,
- utc,
write_chunked_file,
)
import subunit._output as _o
@@ -55,17 +57,19 @@ class SafeArgumentParser(argparse.ArgumentParser):
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
-class OutputFilterArgumentParserTests(TestCase):
+class TestStatusArgParserTests(WithScenarios, TestCase):
- _all_supported_commands = (
- 'exists',
- 'expected-fail',
- 'fail',
- 'pass',
- 'skip',
- 'start',
- 'unexpected-success',
- )
+ scenarios = [
+ (cmd, dict(command=cmd)) for cmd in (
+ 'exists',
+ 'xfail',
+ 'fail',
+ 'success',
+ 'skip',
+ 'inprogress',
+ 'uxsuccess',
+ )
+ ]
def _test_command(self, command, test_id):
args = safe_parse_arguments(args=[command, test_id])
@@ -74,55 +78,49 @@ class OutputFilterArgumentParserTests(TestCase):
self.assertThat(args.test_id, Equals(test_id))
def test_can_parse_all_commands_with_test_id(self):
- for command in self._all_supported_commands:
- self._test_command(command, self.getUniqueString())
-
- def test_command_translation(self):
- self.assertThat(
- translate_command_name('start'),
- Equals('inprogress')
- )
- self.assertThat(
- translate_command_name('pass'),
- Equals('success')
- )
- self.assertThat(
- translate_command_name('expected-fail'),
- Equals('xfail')
- )
- self.assertThat(
- translate_command_name('unexpected-success'),
- Equals('uxsuccess')
- )
- for command in ('fail', 'skip', 'exists'):
- self.assertThat(translate_command_name(command), Equals(command))
+ self._test_command(self.command, self.getUniqueString())
def test_all_commands_parse_file_attachment(self):
with NamedTemporaryFile() as tmp_file:
- for command in self._all_supported_commands:
- args = safe_parse_arguments(
- args=[command, 'foo', '--attach-file', tmp_file.name]
- )
- self.assertThat(args.attach_file.name, Equals(tmp_file.name))
+ args = safe_parse_arguments(
+ args=[self.command, 'foo', '--attach-file', tmp_file.name]
+ )
+ self.assertThat(args.attach_file.name, Equals(tmp_file.name))
def test_all_commands_accept_mimetype_argument(self):
- for command in self._all_supported_commands:
+ with NamedTemporaryFile() as tmp_file:
args = safe_parse_arguments(
- args=[command, 'foo', '--mimetype', "text/plain"]
+ args=[self.command, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
)
self.assertThat(args.mimetype, Equals("text/plain"))
def test_all_commands_accept_tags_argument(self):
- for command in self._all_supported_commands:
+ args = safe_parse_arguments(
+ args=[self.command, 'foo', '--tags', "foo,bar,baz"]
+ )
+ self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
+
+ def test_attach_file_with_hyphen_opens_stdin(self):
+ self.patch(_o, 'stdin', StringIO(_u("Hello")))
+ args = safe_parse_arguments(
+ args=[self.command, "foo", "--attach-file", "-"]
+ )
+
+ self.assertThat(args.attach_file.read(), Equals("Hello"))
+
+
+class GlobalFileAttachmentTests(TestCase):
+
+ def test_can_parse_attach_file_without_test_id(self):
+ with NamedTemporaryFile() as tmp_file:
args = safe_parse_arguments(
- args=[command, 'foo', '--tags', "foo,bar,baz"]
+ args=["--attach-file", tmp_file.name]
)
- self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
-
+ self.assertThat(args.attach_file.name, Equals(tmp_file.name))
class ByteStreamCompatibilityTests(TestCase):
- _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)
+ _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
def setUp(self):
super(ByteStreamCompatibilityTests, self).setUp()
@@ -135,7 +133,6 @@ class ByteStreamCompatibilityTests(TestCase):
parsing *commands as if they were specified on the command line. The
resulting bytestream is then converted back into a result object and
returned.
-
"""
stream = BytesIO()
@@ -153,7 +150,7 @@ class ByteStreamCompatibilityTests(TestCase):
def test_start_generates_inprogress(self):
result = self._get_result_for(
- ['start', 'foo'],
+ ['inprogress', 'foo'],
)
self.assertThat(
@@ -168,7 +165,7 @@ class ByteStreamCompatibilityTests(TestCase):
def test_pass_generates_success(self):
result = self._get_result_for(
- ['pass', 'foo'],
+ ['success', 'foo'],
)
self.assertThat(
@@ -228,7 +225,7 @@ class ByteStreamCompatibilityTests(TestCase):
def test_expected_fail_generates_xfail(self):
result = self._get_result_for(
- ['expected-fail', 'foo'],
+ ['xfail', 'foo'],
)
self.assertThat(
@@ -243,7 +240,7 @@ class ByteStreamCompatibilityTests(TestCase):
def test_unexpected_success_generates_uxsuccess(self):
result = self._get_result_for(
- ['unexpected-success', 'foo'],
+ ['uxsuccess', 'foo'],
)
self.assertThat(
@@ -273,21 +270,23 @@ class ByteStreamCompatibilityTests(TestCase):
class FileChunkingTests(TestCase):
- def _write_chunk_file(self, file_data, chunk_size, mimetype=None):
+ def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None):
"""Write file data to a subunit stream, get a StreamResult object."""
stream = BytesIO()
output_writer = StreamResultToBytes(output_stream=stream)
with NamedTemporaryFile() as f:
+ self._tmp_filename = f.name
f.write(file_data)
f.seek(0)
write_chunked_file(
- f,
- 'foo_test',
- output_writer,
- chunk_size,
- mimetype
+ file_obj=f,
+ output_writer=output_writer,
+ chunk_size=chunk_size,
+ mime_type=mimetype,
+ test_id='foo_test',
+ file_name=filename,
)
stream.seek(0)
@@ -298,7 +297,7 @@ class FileChunkingTests(TestCase):
return result
def test_file_chunk_size_is_honored(self):
- result = self._write_chunk_file(_b("Hello"), 1)
+ result = self._write_chunk_file(file_data=_b("Hello"), chunk_size=1)
self.assertThat(
result._events,
MatchesListwise([
@@ -312,7 +311,7 @@ class FileChunkingTests(TestCase):
)
def test_file_mimetype_is_honored(self):
- result = self._write_chunk_file(_b("SomeData"), 1024, "text/plain")
+ result = self._write_chunk_file(file_data=_b("SomeData"), mimetype="text/plain")
self.assertThat(
result._events,
MatchesListwise([
@@ -321,6 +320,26 @@ class FileChunkingTests(TestCase):
])
)
+ def test_file_name_is_honored(self):
+ result = self._write_chunk_file(file_data=_b("data"), filename="/some/name")
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesCall(call='status', file_name='/some/name'),
+ MatchesCall(call='status', file_name='/some/name'),
+ ])
+ )
+
+ def test_default_filename_is_used(self):
+ result = self._write_chunk_file(file_data=_b("data"))
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesCall(call='status', file_name=self._tmp_filename),
+ MatchesCall(call='status', file_name=self._tmp_filename),
+ ])
+ )
+
class MatchesCall(Matcher):
diff --git a/setup.py b/setup.py
index d319d9c..2f22300 100755
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,7 @@ else:
'install_requires': [
'extras',
'testtools>=0.9.30',
+ 'testscenarios',
]
}