summaryrefslogtreecommitdiff
path: root/testtools/testrunner.py
blob: 6648372099638139b83e903bf329e04d88da47bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/python3
#
# Testcase runner for NetSurf projects
#
# Usage: testrunner <builddir> <testdir> <prefix> [<executable extension>]
#
# Operates upon INDEX files described in the README.
# Locates and executes testcases, feeding data files to programs 
# as appropriate.
# Logs testcase output to file.
# Aborts test sequence on detection of error.
#

import os
import sys
import queue
import threading
import subprocess

# Command line: <builddir> <testdir> <prefix> [<exeext>]
if len(sys.argv) < 4:
    print("Usage: testrunner.py <builddir> <testdir> <prefix> [<exeext>]")
    sys.exit(1)

# Get directories
#   builddir:  joined with each test binary name, and holds the "testlog" file
#   directory: holds the testcase INDEX file and the data/ subdirectories
#   prefix:    prepended to every test name read from INDEX
builddir = sys.argv[1]
directory = sys.argv[2]
prefix = sys.argv[3]

# Get EXE extension (if any); appended to every test binary name
exeext = None
if len(sys.argv) > 4:
    exeext = sys.argv[4]

# Open log file; it is written to by run_test and by the INDEX-processing
# loop, and is left open for the lifetime of the script
LOG = open(os.path.join(builddir, "testlog"), "w")


def run_test(test_exe, test_data=None):
    """Run a single test executable and report whether it passed.

    The executable's stdout is copied line by line to LOG; its stderr is
    collected and written to LOG after the stdout lines (and after the
    exit-status message, if any). The test passes only if the process
    exits with status 0 and the last line it wrote to stdout is "PASS".
    The verdict ("PASS" or "FAIL") is printed to standard output.

    Args:
        test_exe: Path of the test executable to run.
        test_data: Optional path of a data file, passed to the executable
            as its only argument. When None, the executable is run
            without arguments.

    Returns:
        None.
    """
    io_q = queue.Queue()

    def output_collector(identifier, stream):
        # Drain one of the child's pipes into the shared queue. Undecodable
        # bytes are replaced rather than raising, so this thread cannot die
        # part-way and leave the pipe undrained.
        for line in stream:
            io_q.put((identifier,
                      line.decode("utf-8", errors="replace").rstrip()))

        if not stream.closed:
            stream.close()

    # Only pass a data file argument when the testcase has one
    command = [test_exe]
    if test_data is not None:
        command.append(test_data)

    # Run the test
    proc = subprocess.Popen(command,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Threads for sticking output into queue
    tout = threading.Thread(target=output_collector, name='stdout-watcher',
            args=('STDOUT', proc.stdout))
    terr = threading.Thread(target=output_collector, name='stderr-watcher',
            args=('STDERR', proc.stderr))

    tout.start()
    terr.start()

    # Wait for test to finish
    ret = proc.wait()

    # And join the output collector threads
    tout.join()
    terr.join()

    # If last line of STDOUT is "PASS", then the test passed
    last_line = None

    # The STDERR output is just collected up and put at the end of LOG
    error_lines = []

    while not io_q.empty():
        identifier, line = io_q.get()
        if identifier == "STDOUT":
            LOG.write("%s\n" % line)
            last_line = line
        else:
            # Keep each stderr line whole
            error_lines.append(line)

    # Check for test process returning error
    if ret != 0:
        LOG.write("    FAIL: Exit status %i\n" % ret)
        last_line = "FAIL"

    # Dump stderr to log
    for error_line in error_lines:
        LOG.write("%s\n" % error_line)

    # Anything other than a final "PASS" counts as a failure
    if last_line != "PASS":
        last_line = "FAIL"

    # Print test result
    print(last_line)
    return

# Open testcase index
with open(os.path.join(directory, "INDEX"), "r") as TINDEX:
    # Parse testcase index, looking for testcases
    for line in TINDEX:
        # Skip comments and blank lines
        if line[0] == '#' or line.isspace():
            continue

        # Found one; decompose into tab-separated fields. Empty fields are
        # dropped, so a run of consecutive tabs acts as a single separator.
        decomposed = [field for field in line.split('\t') if field]

        # Strip whitespace
        test = decomposed[0].strip()
        desc = decomposed[1].strip()
        data = None
        if len(decomposed) > 2:
            data = decomposed[2].strip()

        # Append prefix & EXE extension to binary name
        test = prefix + test
        if exeext:
            test += exeext

        # Full path of the test binary, shared by every run of this testcase
        test_path = os.path.join(builddir, test)

        print("Test: " + desc)

        if data:
            # Testcase has external data files
            data_dir = "data/" + data

            # Open datafile index
            with open(os.path.join(directory, data_dir + "/INDEX"), "r") as DINDEX:
                # Parse datafile index, looking for datafiles
                for dline in DINDEX:
                    if dline[0] == '#' or dline.isspace():
                        continue

                    # Found one; decompose. Only the first field (the data
                    # file name) is used; any further fields are ignored.
                    ddecomposed = [field for field in dline.split('\t') if field]

                    # Strip whitespace
                    dtest = ddecomposed[0].strip()

                    data_path = os.path.join(directory, data_dir + "/" + dtest)

                    LOG.write("Running %s %s\n" % (test_path, data_path))

                    # Make message fit on an 80 column terminal, leaving
                    # 8 columns free after the dots
                    msg = "    ==> " + test + " [" + data + "/" + dtest + "]"
                    msg += "." * (80 - len(msg) - 8)

                    # The message has no newline, so flush explicitly to
                    # make it visible while the test is still running
                    sys.stdout.write(msg)
                    sys.stdout.flush()

                    # Run testcase
                    run_test(test_path, data_path)
        else:
            # Testcase has no external data files
            LOG.write("Running %s\n" % test_path)

            # Make message fit on an 80 column terminal, leaving
            # 8 columns free after the dots
            msg = "    ==> " + test
            msg += "." * (80 - len(msg) - 8)

            # The message has no newline, so flush explicitly to make it
            # visible while the test is still running
            sys.stdout.write(msg)
            sys.stdout.flush()

            # Run testcase (executable only, no data file argument)
            run_test(test_path)

        print("")