summaryrefslogtreecommitdiff
path: root/testrepository/tests/commands/test_load.py
diff options
context:
space:
mode:
Diffstat (limited to 'testrepository/tests/commands/test_load.py')
-rw-r--r--testrepository/tests/commands/test_load.py111
1 files changed, 40 insertions, 71 deletions
diff --git a/testrepository/tests/commands/test_load.py b/testrepository/tests/commands/test_load.py
index a0c2947..89a0589 100644
--- a/testrepository/tests/commands/test_load.py
+++ b/testrepository/tests/commands/test_load.py
@@ -18,8 +18,6 @@ from datetime import datetime, timedelta
from io import BytesIO
from tempfile import NamedTemporaryFile
-from extras import try_import
-v2_avail = try_import('subunit.ByteStreamToStreamResult')
import subunit
from subunit import iso8601
@@ -115,14 +113,11 @@ class TestCommandLoad(ResourcedTestCase):
self.assertEqual(0, cmd.execute())
def test_load_returns_1_on_failed_stream(self):
- if v2_avail:
- buffer = BytesIO()
- stream = subunit.StreamResultToBytes(buffer)
- stream.status(test_id='foo', test_status='inprogress')
- stream.status(test_id='foo', test_status='fail')
- subunit_bytes = buffer.getvalue()
- else:
- subunit_bytes = _b('test: foo\nfailure: foo\n')
+ buffer = BytesIO()
+ stream = subunit.StreamResultToBytes(buffer)
+ stream.status(test_id='foo', test_status='inprogress')
+ stream.status(test_id='foo', test_status='fail')
+ subunit_bytes = buffer.getvalue()
ui = UI([('subunit', subunit_bytes)])
cmd = load.load(ui)
ui.set_command(cmd)
@@ -131,14 +126,11 @@ class TestCommandLoad(ResourcedTestCase):
self.assertEqual(1, cmd.execute())
def test_load_new_shows_test_failures(self):
- if v2_avail:
- buffer = BytesIO()
- stream = subunit.StreamResultToBytes(buffer)
- stream.status(test_id='foo', test_status='inprogress')
- stream.status(test_id='foo', test_status='fail')
- subunit_bytes = buffer.getvalue()
- else:
- subunit_bytes = b'test: foo\nfailure: foo\n'
+ buffer = BytesIO()
+ stream = subunit.StreamResultToBytes(buffer)
+ stream.status(test_id='foo', test_status='inprogress')
+ stream.status(test_id='foo', test_status='fail')
+ subunit_bytes = buffer.getvalue()
ui = UI([('subunit', subunit_bytes)])
cmd = load.load(ui)
ui.set_command(cmd)
@@ -151,16 +143,13 @@ class TestCommandLoad(ResourcedTestCase):
ui.outputs[1:])
def test_load_new_shows_test_failure_details(self):
- if v2_avail:
- buffer = BytesIO()
- stream = subunit.StreamResultToBytes(buffer)
- stream.status(test_id='foo', test_status='inprogress')
- stream.status(test_id='foo', test_status='fail',
- file_name="traceback", mime_type='text/plain;charset=utf8',
- file_bytes=b'arg\n')
- subunit_bytes = buffer.getvalue()
- else:
- subunit_bytes = b'test: foo\nfailure: foo [\narg\n]\n'
+ buffer = BytesIO()
+ stream = subunit.StreamResultToBytes(buffer)
+ stream.status(test_id='foo', test_status='inprogress')
+ stream.status(test_id='foo', test_status='fail',
+ file_name="traceback", mime_type='text/plain;charset=utf8',
+ file_bytes=b'arg\n')
+ subunit_bytes = buffer.getvalue()
ui = UI([('subunit', subunit_bytes)])
cmd = load.load(ui)
ui.set_command(cmd)
@@ -183,14 +172,11 @@ class TestCommandLoad(ResourcedTestCase):
self.assertEqual(1, len(result.errors))
def test_load_new_shows_test_skips(self):
- if v2_avail:
- buffer = BytesIO()
- stream = subunit.StreamResultToBytes(buffer)
- stream.status(test_id='foo', test_status='inprogress')
- stream.status(test_id='foo', test_status='skip')
- subunit_bytes = buffer.getvalue()
- else:
- subunit_bytes = b'test: foo\nskip: foo\n'
+ buffer = BytesIO()
+ stream = subunit.StreamResultToBytes(buffer)
+ stream.status(test_id='foo', test_status='inprogress')
+ stream.status(test_id='foo', test_status='skip')
+ subunit_bytes = buffer.getvalue()
ui = UI([('subunit', subunit_bytes)])
cmd = load.load(ui)
ui.set_command(cmd)
@@ -251,20 +237,13 @@ class TestCommandLoad(ResourcedTestCase):
cmd.repository_factory.repos[ui.here].get_test_run(0)._partial)
def test_load_timed_run(self):
- if v2_avail:
- buffer = BytesIO()
- stream = subunit.StreamResultToBytes(buffer)
- time = datetime(2011, 1, 1, 0, 0, 1, tzinfo=iso8601.Utc())
- stream.status(test_id='foo', test_status='inprogress', timestamp=time)
- stream.status(test_id='foo', test_status='success',
- timestamp=time+timedelta(seconds=2))
- timed_bytes = buffer.getvalue()
- else:
- timed_bytes = _b('time: 2011-01-01 00:00:01.000000Z\n'
- 'test: foo\n'
- 'time: 2011-01-01 00:00:03.000000Z\n'
- 'success: foo\n'
- 'time: 2011-01-01 00:00:06.000000Z\n')
+ buffer = BytesIO()
+ stream = subunit.StreamResultToBytes(buffer)
+ time = datetime(2011, 1, 1, 0, 0, 1, tzinfo=iso8601.Utc())
+ stream.status(test_id='foo', test_status='inprogress', timestamp=time)
+ stream.status(test_id='foo', test_status='success',
+ timestamp=time+timedelta(seconds=2))
+ timed_bytes = buffer.getvalue()
ui = UI(
[('subunit', timed_bytes)])
cmd = load.load(ui)
@@ -284,27 +263,17 @@ class TestCommandLoad(ResourcedTestCase):
# If there's a previous run in the database, then show information
# about the high level differences in the test run: how many more
# tests, how many more failures, how much longer it takes.
- if v2_avail:
- buffer = BytesIO()
- stream = subunit.StreamResultToBytes(buffer)
- time = datetime(2011, 1, 2, 0, 0, 1, tzinfo=iso8601.Utc())
- stream.status(test_id='foo', test_status='inprogress', timestamp=time)
- stream.status(test_id='foo', test_status='fail',
- timestamp=time+timedelta(seconds=2))
- stream.status(test_id='bar', test_status='inprogress',
- timestamp=time+timedelta(seconds=4))
- stream.status(test_id='bar', test_status='fail',
- timestamp=time+timedelta(seconds=6))
- timed_bytes = buffer.getvalue()
- else:
- timed_bytes = _b('time: 2011-01-02 00:00:01.000000Z\n'
- 'test: foo\n'
- 'time: 2011-01-02 00:00:03.000000Z\n'
- 'error: foo\n'
- 'time: 2011-01-02 00:00:05.000000Z\n'
- 'test: bar\n'
- 'time: 2011-01-02 00:00:07.000000Z\n'
- 'error: bar\n')
+ buffer = BytesIO()
+ stream = subunit.StreamResultToBytes(buffer)
+ time = datetime(2011, 1, 2, 0, 0, 1, tzinfo=iso8601.Utc())
+ stream.status(test_id='foo', test_status='inprogress', timestamp=time)
+ stream.status(test_id='foo', test_status='fail',
+ timestamp=time+timedelta(seconds=2))
+ stream.status(test_id='bar', test_status='inprogress',
+ timestamp=time+timedelta(seconds=4))
+ stream.status(test_id='bar', test_status='fail',
+ timestamp=time+timedelta(seconds=6))
+ timed_bytes = buffer.getvalue()
ui = UI(
[('subunit', timed_bytes)])
cmd = load.load(ui)