# Copyright 2016 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import argparse import json import logging import os import re import socket import shlex import sys import time import traceback import unittest import urlparse sys.path.append(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'third_party', 'webdriver', 'pylib')) from selenium import webdriver from selenium.webdriver.chrome.options import Options def ParseFlags(): """Parses the given command line arguments. Returns: A new Namespace object with class properties for each argument added below. See pydoc for argparse. """ def TestFilter(v): try: # The filtering here allows for any number of * wildcards with a required # . separator between classname and methodname, but no other special # characters. return re.match(r'^([A-Za-z_0-9\*]+\.)?[A-Za-z_0-9\*]+$', v).group(0) except: raise argparse.ArgumentTypeError('Test filter "%s" is not a valid filter' % v) parser = argparse.ArgumentParser() parser.add_argument('--browser_args', type=str, help='Override browser flags ' 'in code with these flags') parser.add_argument('--via_header_value', default='1.1 Chrome-Compression-Proxy', help='What the via should match to ' 'be considered valid') parser.add_argument('--android', help='If given, attempts to run the test on ' 'Android via adb. Ignores usage of --chrome_exec', action='store_true') parser.add_argument('--android_package', default='com.android.chrome', help='Set the android package for Chrome') parser.add_argument('--chrome_exec', type=str, help='The path to ' 'the Chrome or Chromium executable') parser.add_argument('chrome_driver', type=str, help='The path to ' 'the ChromeDriver executable. If not given, the default system chrome ' 'will be used.') parser.add_argument('--disable_buffer', help='Causes stdout and stderr from ' 'tests to output normally. Otherwise, the standard output and standard ' 'error streams are buffered during the test run, and output from a ' 'passing test is discarded. Output will always be echoed normally on test ' 'fail or error and is added to the failure messages.', action='store_true') parser.add_argument('-c', '--catch', help='Control-C during the test run ' 'waits for the current test to end and then reports all the results so ' 'far. A second Control-C raises the normal KeyboardInterrupt exception.', action='store_true') parser.add_argument('-f', '--failfast', help='Stop the test run on the first ' 'error or failure.', action='store_true') parser.add_argument('--test_filter', '--gtest_filter', type=TestFilter, help='The filter to use when discovering tests to run, in the form ' '. Wildcards (*) are accepted. Default=*', default='*') parser.add_argument('--logging_level', choices=['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRIT'], default='WARN', help='The logging verbosity for log ' 'messages, printed to stderr. To see stderr logging output during a ' 'successful test run, also pass --disable_buffer. Default=ERROR') parser.add_argument('--log_file', help='If given, write logging statements ' 'to the given file instead of stderr.') return parser.parse_args(sys.argv[1:]) def GetLogger(name='common'): """Creates a Logger instance with the given name and returns it. If a logger has already been created with the same name, that instance is returned instead. Args: name: The name of the logger to return. Returns: A logger with the given name. """ logger = logging.getLogger(name) if hasattr(logger, "initialized"): return logger logging_level = ParseFlags().logging_level if logging_level == 'DEBUG': logger.setLevel(logging.DEBUG) elif logging_level == 'INFO': logger.setLevel(logging.INFO) elif logging_level == 'WARN': logger.setLevel(logging.WARNING) elif logging_level == 'ERROR': logger.setLevel(logging.ERROR) elif logging_level == 'CRIT': logger.setLevel(logging.CRITICAL) else: logger.setLevel(logging.NOTSET) formatter = logging.Formatter('%(asctime)s %(funcName)s() %(levelname)s: ' '%(message)s') if ParseFlags().log_file: fh = logging.FileHandler(ParseFlags().log_file) fh.setFormatter(formatter) logger.addHandler(fh) else: ch = logging.StreamHandler(sys.stderr) ch.setFormatter(formatter) logger.addHandler(ch) logger.initialized = True return logger class TestDriver: """The main driver for an integration test. This class is the tool that is used by every integration test to interact with the Chromium browser and validate proper functionality. This class sits on top of the Selenium Chrome Webdriver with added utility and helper functions for Chrome-Proxy. This class should be used with Python's 'with' operator. Attributes: _flags: A Namespace object from the call to ParseFlags() _driver: A reference to the driver object from the Chrome Driver library. _chrome_args: A set of string arguments to start Chrome with. _url: The string URL that Chrome will navigate to for this test. _has_logs: Boolean flag set when a page is loaded and cleared when logs are fetched. """ def __init__(self): self._flags = ParseFlags() self._driver = None self._chrome_args = set() self._url = '' self._logger = GetLogger(name='TestDriver') self._has_logs = False def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): if self._driver: self._StopDriver() def _OverrideChromeArgs(self): """Overrides any given arguments in the code with those given on the command line. Arguments that need to be overridden may contain different values for a flag given in the code. In that case, check by the flag whether to override the argument. """ def GetDictKey(argument): return argument.split('=', 1)[0] if self._flags.browser_args and len(self._flags.browser_args) > 0: # Build a dict of flags mapped to the whole argument. original_args = {} for arg in self._chrome_args: original_args[GetDictKey(arg)] = arg # Override flags given in code with any command line arguments. for override_arg in shlex.split(self._flags.browser_args): arg_key = GetDictKey(override_arg) if arg_key in original_args: self._chrome_args.remove(original_args[arg_key]) self._logger.info('Removed Chrome flag. %s', original_args[arg_key]) self._chrome_args.add(override_arg) self._logger.info('Added Chrome flag. %s', override_arg) # Always add the flag that allows histograms to be queried in javascript. self._chrome_args.add('--enable-stats-collection-bindings') def _StartDriver(self): """Parses the flags to pass to Chromium, then starts the ChromeDriver. If running Android, the Android package name is passed to ChromeDriver here. """ self._OverrideChromeArgs() capabilities = { 'loggingPrefs': {'performance': 'INFO'} } chrome_options = Options() for arg in self._chrome_args: chrome_options.add_argument(arg) self._logger.info('Starting Chrome with these flags: %s', str(self._chrome_args)) if self._flags.android: chrome_options.add_experimental_option('androidPackage', self._flags.android_package) self._logger.debug('Will use Chrome on Android') elif self._flags.chrome_exec: chrome_options.binary_location = self._flags.chrome_exec self._logger.info('Using the Chrome binary at this path: %s', self._flags.chrome_exec) self._logger.debug('ChromeOptions will be parsed into these capabilities: ' '%s', json.dumps(chrome_options.to_capabilities())) driver = webdriver.Chrome(executable_path=self._flags.chrome_driver, desired_capabilities=capabilities, chrome_options=chrome_options) driver.command_executor._commands.update({ 'getAvailableLogTypes': ('GET', '/session/$sessionId/log/types'), 'getLog': ('POST', '/session/$sessionId/log')}) self._driver = driver def _StopDriver(self): """Nicely stops the ChromeDriver. """ self._logger.debug('Stopping ChromeDriver') self._driver.quit() self._driver = None def AddChromeArgs(self, args): """Adds multiple arguments that will be passed to Chromium at start. Args: args: An iterable of strings, each an argument to pass to Chrome at start. """ for arg in args: self._chrome_args.add(arg) def AddChromeArg(self, arg): """Adds a single argument that will be passed to Chromium at start. Args: arg: a string argument to pass to Chrome at start """ self._chrome_args.add(arg) self._logger.debug('Adding Chrome arg: %s', arg) def RemoveChromeArgs(self, args): """Removes multiple arguments that will no longer be passed to Chromium at start. Args: args: An iterable of strings to no longer use the next time Chrome starts. """ for arg in args: self._chrome_args.discard(arg) def RemoveChromeArg(self, arg): """Removes a single argument that will no longer be passed to Chromium at start. Args: arg: A string flag to no longer use the next time Chrome starts. """ self._chrome_args.discard(arg) self._logger.debug('Removing Chrome arg: %s', arg) def ClearChromeArgs(self): """Removes all arguments from Chromium at start. """ self._chrome_args.clear() self._logger.debug('Clearing all Chrome args') def ClearCache(self): """Clears the browser cache. Important note: ChromeDriver automatically starts a clean copy of Chrome on every instantiation. """ res = self.ExecuteJavascript('if(window.chrome && chrome.benchmarking && ' 'chrome.benchmarking.clearCache){chrome.benchmarking.clearCache(); ' 'chrome.benchmarking.clearPredictorCache();chrome.benchmarking.' 'clearHostResolverCache();}') self._logger.info('Cleared browser cache. Returned=%s', str(res)) def LoadURL(self, url, timeout=30): """Starts Chromium with any arguments previously given and navigates to the given URL. Args: url: The URL to navigate to. timeout: The time in seconds to load the page before timing out. """ self._url = url if (len(urlparse.urlparse(url).netloc) == 0 and len(urlparse.urlparse(url).scheme) == 0): self._logger.warn('Invalid URL: "%s". Did you forget to prepend ' '"http://"? See RFC 1808 for more information', url) if not self._driver: self._StartDriver() self._driver.set_page_load_timeout(timeout) self._logger.debug('Set page load timeout to %f seconds', timeout) self._driver.get(self._url) self._logger.debug('Loaded page %s', url) self._has_logs = True def ExecuteJavascript(self, script, timeout=30): """Executes the given javascript in the browser's current page in an anonymous function. If you expect a result and don't get one, try adding a return statement or using ExecuteJavascriptStatement() below. Args: script: A string of Javascript code. timeout: Timeout for the Javascript code to return in seconds. Returns: A string of the verbatim output from the Javascript execution. """ if not self._driver: self._StartDriver() # TODO(robertogden): Use 'driver.set_script_timeout(timeout)' instead after # crbug/672114 is fixed. default_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(timeout) self._logger.debug('Set socket timeout to %f seconds', timeout) script_result = self._driver.execute_script(script) self._logger.debug('Executed Javascript in browser: %s', script) socket.setdefaulttimeout(default_timeout) self._logger.debug('Set socket timeout to %s', str(default_timeout)) return script_result def ExecuteJavascriptStatement(self, script, timeout=30): """Wraps ExecuteJavascript() for use with a single statement. Behavior is analogous to 'function(){ return