#!/usr/bin/python # Copyright 2013 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # =*= License: GPL-3+ =*= import cliapp import collections import logging import os import re import shutil import sys import tempfile import time import ttystatus import cmdtestlib import yarnlib class YarnRunner(cliapp.Application): def add_settings(self): self.settings.boolean( ['no-act', 'dry-run', 'pretend', 'n'], 'do not actually run any tests, merely print what would be run') self.settings.boolean( ['quiet', 'q'], 'be quiet, avoid progress reporting, only show errors') self.settings.boolean( ['verbose', 'v'], 'make progress reporting be more verbose ("wall of text"), ' 'instead of a one-line status info; this is turned ' 'automatically if there is not terminal') self.settings.string_list( ['shell-library', 's'], 'include a shell library for the IMPLEMENTS sections to use') self.settings.string_list( ['run', 'r'], 'run only TEST (this option can be repeated)', metavar='TEST') self.settings.string( ['tempdir'], 'use DIR as the temporary directory for tests; ' 'it should be empty or not exist', metavar='DIR') self.settings.string_list( ['env'], 'add NAME=VALUE to the environment when tests are run', metavar='NAME=VALUE') self.settings.boolean( ['snapshot'], 'make snapshots of test working directory ' 'after each scenario step; you probably ' 'want to use this with --tempdir') self.settings.boolean( ['timings'], 'report wall clock time for each scenario and step') self.settings.boolean( ['allow-missing-steps'], 'allow scenarios to reference steps that do not exist, ' 'by warning about them, but otherwise ignoring the scenarios') def info(self, msg): if self.settings['verbose']: logging.info(msg) self.output.write('%s\n' % msg) def warning(self, msg): if self.settings['verbose']: logging.warning(msg) self.output.write('WARNING: %s\n' % msg) elif not self.settings['quiet']: self.ts.notify('WARNING: %s' % msg) def error(self, msg): if self.settings['verbose']: logging.info(msg) sys.stderr.write('%s\n' % msg) elif not self.settings['quiet']: self.ts.error(msg) def process_args(self, args): # Do we have tty? If not, turn on --verbose, unless --quiet. if not self.settings['quiet']: try: open('/dev/tty', 'w') except IOError: self.settings['verbose'] = True self.ts = ttystatus.TerminalStatus(period=0.001) if not self.settings['quiet'] and not self.settings['verbose']: self.ts.format( '%ElapsedTime() %Index(current_step,all_steps): ' '%String(scenario_name): ' '%String(step_name)') if self.settings['tempdir']: self.tempdir = os.path.abspath(self.settings['tempdir']) if not os.path.exists(self.tempdir): os.mkdir(self.tempdir) else: self.tempdir = tempfile.mkdtemp() scenarios, implementations = self.parse_scenarios(args) sv = yarnlib.ScenarioValidator(scenarios) sv.validate_all() scenarios = self.connect_implementations(scenarios, implementations) shell_prelude = self.load_shell_libraries() self.info('Found %d scenarios' % len(scenarios)) all_steps = [] for scenario in scenarios: all_steps.extend(scenario.steps) self.ts['all_steps'] = all_steps self.scenarios_run = 0 self.skipped_for_assuming = 0 self.steps_run = 0 self.timings = [] scenario_runner = yarnlib.ScenarioRunner(shell_prelude, os.getcwd(), self.parse_env(), pre_step_cb=self.pre_step, post_step_cb=self.post_step) start_time = time.time() failed_scenarios = [] for scenario in self.select_scenarios(scenarios): if not self.run_scenario(scenario_runner, scenario): failed_scenarios.append(scenario) duration = time.time() - start_time if not self.settings['snapshot']: shutil.rmtree(self.tempdir) if not self.settings['quiet']: self.ts.clear() self.ts.finish() if failed_scenarios: raise cliapp.AppException( 'Test suite FAILED in %s scenarios' % len(failed_scenarios)) if not self.settings['quiet']: print ( 'Scenario test suite PASS, with %d scenarios ' '(%d total steps), ' 'in %.1f seconds' % (self.scenarios_run, self.steps_run, duration)) if self.skipped_for_assuming: print ('Scenarios SKIPPED due to ASSUMING step failing: %d' % self.skipped_for_assuming) if self.settings['timings']: self.report_timings() def parse_scenarios(self, filenames): mdparser = yarnlib.MarkdownParser() for filename in filenames: self.info('Parsing scenario file %s' % filename) blocks = mdparser.parse_file(filename) if not blocks: self.warning('No scenario code blocks in %s' % filename) block_parser = yarnlib.BlockParser() block_parser.parse_blocks(mdparser.blocks) return block_parser.scenarios, block_parser.implementations def connect_implementations(self, scenarios, implementations): if self.settings['allow-missing-steps']: def warn_missing(scenario, step): self.warning( 'Scenario %s has missing step %s %s' % (scenario.name, step.what, step.text)) return True step_connector = yarnlib.ScenarioStepConnector( implementations, missing_step_cb=warn_missing) else: step_connector = yarnlib.ScenarioStepConnector(implementations) return step_connector.connect_implementations(scenarios) def load_shell_libraries(self): if not self.settings['shell-library']: self.info('No shell libraries defined') return '' return yarnlib.shell_libraries.load( self.settings['shell-library'], pre_read_cb=lambda fn: self.info('Loading shell library %s' % fn)) def select_scenarios(self, scenarios): def normalise(s): return ' '.join(s.lower().split()) def matches(a, b): return normalise(a) == normalise(b) if self.settings['run']: result = [] for name in self.settings['run']: for s in scenarios: if matches(s.name, name) and s not in result: result.append(s) break return result return scenarios def run_scenario(self, scenario_runner, scenario): self.start_scenario_timing(scenario.name) started = time.time() self.info('Running scenario %s' % scenario.name) self.ts['scenario_name'] = scenario.name self.scenarios_run += 1 if self.settings['no-act']: self.info('Pretending everything went OK') for step in scenario.steps: self.ts['current_step'] = step self.remember_scenario_timing(time.time() - started) return True os.mkdir(self.scenario_dir(self.tempdir, scenario)) datadir = self.datadir(self.tempdir, scenario) os.mkdir(datadir) self.info('DATADIR is %s' % datadir) homedir = self.homedir(datadir) os.mkdir(homedir) self.info('HOME for tests is %s' % homedir) ok = scenario_runner.run_scenario(scenario, datadir, homedir) self.remember_scenario_timing(time.time() - started) return ok def homedir(self, datadir): return os.path.join(datadir, 'HOME') def parse_env(self): for option_arg in self.settings['env']: if '=' not in option_arg: raise cliapp.AppException( '--env argument must contain "=" ' 'to separate environment variable name and value') key, value = option_arg.split('=', 1) yield key, value def pre_step(self, scenario, step, step_number, scenario_env): started = time.time() self.info('Running step "%s %s"' % (step.what, step.text)) self.ts['current_step'] = step self.ts['step_name'] = '%s %s' % (step.what, step.text) self.steps_run += 1 return (started,) def post_step(self, scenario, step, step_number, scenario_env, exit, stdout, stderr, pre_step_userdata): stopped = time.time() (started,) = pre_step_userdata logging.debug('Exit code: %d' % exit) if stdout: logging.debug('Standard output:\n%s' % self.indent(stdout)) else: logging.debug('Standard output: empty') if stderr: logging.debug('Standard error:\n%s' % self.indent(stderr)) else: logging.debug('Standard error: empty') if exit != 0: if step.what == 'ASSUMING': self.ts.notify( 'Skipping "%s" because "%s %s" failed' % (scenario.name, step.what, step.text)) self.skipped_for_assuming += 1 else: self.error( 'ERROR: In scenario "%s"\nstep "%s %s" failed,\n' 'with exit code %d:\n' 'Standard output from shell command:\n%s' 'Standard error from shell command:\n%s' % (scenario.name, step.what, step.text, exit, self.indent(stdout), self.indent(stderr))) self.remember_step_timing( '%s %s' % (step.what, step.text), stopped - started) self.snapshot_datadir(self.tempdir, scenario_env['DATADIR'], scenario, step_number, step) def scenario_dir(self, tempdir, scenario): return os.path.join(tempdir, self.nice(scenario.name)) def datadir(self, tempdir, scenario): sd = self.scenario_dir(tempdir, scenario) return os.path.join(sd, 'datadir') def snapshot_dir(self, tempdir, scenario, step, step_number): sd = self.scenario_dir(tempdir, scenario) base = '%03d-%s-%s' % (step_number, step.what, self.nice(step.text)) return os.path.join(sd, base) def snapshot_datadir(self, tempdir, datadir, scenario, step_number, step): snapshot = self.snapshot_dir(tempdir, scenario, step, step_number) cliapp.runcmd(['cp', '-a', datadir, snapshot]) def nice(self, name): # Quote a scenario or step name so it forms a nice filename. nice_chars = "abcdefghijklmnopqrstuvwxyz" nice_chars += nice_chars.upper() nice_chars += "0123456789-." nice = [] for c in name: if c in nice_chars: nice.append(c) elif not nice or nice[-1] != '_': nice.append('_') nice = ''.join(nice) return nice def indent(self, s): return ''.join(' %s\n' % line for line in s.splitlines()) def start_scenario_timing(self, scenario_name): self.timings.append((scenario_name, None, [])) def remember_scenario_timing(self, duration): scenario_name, _, step_tuples = self.timings[-1] self.timings[-1] = (scenario_name, duration, step_tuples) def remember_step_timing(self, step_name, step_duration): scenario_name, scenario_duration, step_tuples = self.timings[-1] step_tuples = step_tuples + [(step_name, step_duration)] self.timings[-1] = (scenario_name, scenario_duration, step_tuples) def report_timings(self): for scenario_name, scenario_duration, step_tuples in self.timings: print '%5.1f %s' % (scenario_duration, scenario_name) for step_name, step_duration in step_tuples: print ' %5.1f %s' % (step_duration, step_name) YarnRunner(version=cmdtestlib.__version__).run()