summaryrefslogtreecommitdiff
path: root/test/testlib/config.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/testlib/config.py')
-rw-r--r--test/testlib/config.py255
1 files changed, 255 insertions, 0 deletions
diff --git a/test/testlib/config.py b/test/testlib/config.py
new file mode 100644
index 000000000..f05cda46d
--- /dev/null
+++ b/test/testlib/config.py
@@ -0,0 +1,255 @@
+import optparse, os, sys, ConfigParser, StringIO
+logging, require = None, None
+
+__all__ = 'parser', 'configure', 'options',
+
+db, db_uri, db_type, db_label = None, None, None, None
+
+options = None
+file_config = None
+
+base_config = """
+[db]
+sqlite=sqlite:///:memory:
+sqlite_file=sqlite:///querytest.db
+postgres=postgres://scott:tiger@127.0.0.1:5432/test
+mysql=mysql://scott:tiger@127.0.0.1:3306/test
+oracle=oracle://scott:tiger@127.0.0.1:1521
+oracle8=oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
+mssql=mssql://scott:tiger@SQUAWK\\SQLEXPRESS/test
+firebird=firebird://sysdba:s@localhost/tmp/test.fdb
+"""
+
+parser = optparse.OptionParser(usage = "usage: %prog [options] [tests...]")
+
+def configure():
+ global options, config
+ global getopts_options, file_config
+
+ file_config = ConfigParser.ConfigParser()
+ file_config.readfp(StringIO.StringIO(base_config))
+ file_config.read(['test.cfg', os.path.expanduser('~/.satest.cfg')])
+
+ # Opt parsing can fire immediate actions, like logging and coverage
+ (options, args) = parser.parse_args()
+ sys.argv[1:] = args
+
+ # Lazy setup of other options (post coverage)
+ for fn in post_configure:
+ fn(options, file_config)
+
+ return options, file_config
+
+def _log(option, opt_str, value, parser):
+ global logging
+ if not logging:
+ import logging
+ logging.basicConfig()
+
+ if opt_str.endswith('-info'):
+ logging.getLogger(value).setLevel(logging.INFO)
+ elif opt_str.endswith('-debug'):
+ logging.getLogger(value).setLevel(logging.DEBUG)
+
+def _start_coverage(option, opt_str, value, parser):
+ import sys, atexit, coverage
+ true_out = sys.stdout
+
+ def _iter_covered_files():
+ import sqlalchemy
+ for rec in os.walk(os.path.dirname(sqlalchemy.__file__)):
+ for x in rec[2]:
+ if x.endswith('.py'):
+ yield os.path.join(rec[0], x)
+ def _stop():
+ coverage.stop()
+ true_out.write("\nPreparing coverage report...\n")
+ coverage.report(list(_iter_covered_files()),
+ show_missing=False, ignore_errors=False,
+ file=true_out)
+ atexit.register(_stop)
+ coverage.erase()
+ coverage.start()
+
+def _list_dbs(*args):
+ print "Available --db options (use --dburi to override)"
+ for macro in sorted(file_config.options('db')):
+ print "%20s\t%s" % (macro, file_config.get('db', macro))
+ sys.exit(0)
+
+opt = parser.add_option
+opt("--verbose", action="store_true", dest="verbose",
+ help="enable stdout echoing/printing")
+opt("--quiet", action="store_true", dest="quiet", help="suppress output")
+opt("--log-info", action="callback", type="string", callback=_log,
+ help="turn on info logging for <LOG> (multiple OK)")
+opt("--log-debug", action="callback", type="string", callback=_log,
+ help="turn on debug logging for <LOG> (multiple OK)")
+opt("--require", action="append", dest="require", default=[],
+ help="require a particular driver or module version (multiple OK)")
+opt("--db", action="store", dest="db", default="sqlite",
+ help="Use prefab database uri")
+opt('--dbs', action='callback', callback=_list_dbs,
+ help="List available prefab dbs")
+opt("--dburi", action="store", dest="dburi",
+ help="Database uri (overrides --db)")
+opt("--mockpool", action="store_true", dest="mockpool",
+ help="Use mock pool (asserts only one connection used)")
+opt("--enginestrategy", action="store", dest="enginestrategy", default=None,
+ help="Engine strategy (plain or threadlocal, defaults toplain)")
+opt("--reversetop", action="store_true", dest="reversetop", default=False,
+ help="Reverse the collection ordering for topological sorts (helps "
+ "reveal dependency issues)")
+opt("--serverside", action="store_true", dest="serverside",
+ help="Turn on server side cursors for PG")
+opt("--mysql-engine", action="store", dest="mysql_engine", default=None,
+ help="Use the specified MySQL storage engine for all tables, default is "
+ "a db-default/InnoDB combo.")
+opt("--table-option", action="append", dest="tableopts", default=[],
+ help="Add a dialect-specific table option, key=value")
+opt("--coverage", action="callback", callback=_start_coverage,
+ help="Dump a full coverage report after running tests")
+opt("--profile", action="append", dest="profile_targets", default=[],
+ help="Enable a named profile target (multiple OK.)")
+opt("--profile-sort", action="store", dest="profile_sort", default=None,
+ help="Sort profile stats with this comma-separated sort order")
+opt("--profile-limit", type="int", action="store", dest="profile_limit",
+ default=None,
+ help="Limit function count in profile stats")
+
+class _ordered_map(object):
+ def __init__(self):
+ self._keys = list()
+ self._data = dict()
+
+ def __setitem__(self, key, value):
+ if key not in self._keys:
+ self._keys.append(key)
+ self._data[key] = value
+
+ def __iter__(self):
+ for key in self._keys:
+ yield self._data[key]
+
+post_configure = _ordered_map()
+
+def _engine_uri(options, file_config):
+ global db_label, db_uri
+ db_label = 'sqlite'
+ if options.dburi:
+ db_uri = options.dburi
+ db_label = db_uri[:db_uri.index(':')]
+ elif options.db:
+ db_label = options.db
+ db_uri = None
+
+ if db_uri is None:
+ if db_label not in file_config.options('db'):
+ raise RuntimeError(
+ "Unknown engine. Specify --dbs for known engines.")
+ db_uri = file_config.get('db', db_label)
+post_configure['engine_uri'] = _engine_uri
+
+def _require(options, file_config):
+ if not(options.require or
+ (file_config.has_section('require') and
+ file_config.items('require'))):
+ return
+
+ try:
+ import pkg_resources
+ except ImportError:
+ raise RuntimeError("setuptools is required for version requirements")
+
+ cmdline = []
+ for requirement in options.require:
+ pkg_resources.require(requirement)
+ cmdline.append(re.split('\s*(<!>=)', requirement, 1)[0])
+
+ if file_config.has_section('require'):
+ for label, requirement in file_config.items('require'):
+ if not label == db_label or label.startswith('%s.' % db_label):
+ continue
+ seen = [c for c in cmdline if requirement.startswith(c)]
+ if seen:
+ continue
+ pkg_resources.require(requirement)
+post_configure['require'] = _require
+
+def _create_testing_engine(options, file_config):
+ from sqlalchemy import engine
+ global db, db_type
+ engine_opts = {}
+ if options.serverside:
+ engine_opts['server_side_cursors'] = True
+
+ if options.enginestrategy is not None:
+ engine_opts['strategy'] = options.enginestrategy
+
+ if options.mockpool:
+ db = engine.create_engine(db_uri, poolclass=pool.AssertionPool,
+ **engine_opts)
+ else:
+ db = engine.create_engine(db_uri, **engine_opts)
+ db_type = db.name
+
+ # decorate the dialect's create_execution_context() method
+ # to produce a wrapper
+ from testlib.testing import ExecutionContextWrapper
+
+ create_context = db.dialect.create_execution_context
+ def create_exec_context(*args, **kwargs):
+ return ExecutionContextWrapper(create_context(*args, **kwargs))
+ db.dialect.create_execution_context = create_exec_context
+post_configure['create_engine'] = _create_testing_engine
+
+def _set_table_options(options, file_config):
+ import testlib.schema
+
+ table_options = testlib.schema.table_options
+ for spec in options.tableopts:
+ key, value = spec.split('=')
+ table_options[key] = value
+
+ if options.mysql_engine:
+ table_options['mysql_engine'] = options.mysql_engine
+post_configure['table_options'] = _set_table_options
+
+def _reverse_topological(options, file_config):
+ if options.reversetop:
+ from sqlalchemy.orm import unitofwork
+ from sqlalchemy import topological
+ class RevQueueDepSort(topological.QueueDependencySorter):
+ def __init__(self, tuples, allitems):
+ self.tuples = list(tuples)
+ self.allitems = list(allitems)
+ self.tuples.reverse()
+ self.allitems.reverse()
+ topological.QueueDependencySorter = RevQueueDepSort
+ unitofwork.DependencySorter = RevQueueDepSort
+post_configure['topological'] = _reverse_topological
+
+def _set_profile_targets(options, file_config):
+ from testlib import profiling
+
+ profile_config = profiling.profile_config
+
+ for target in options.profile_targets:
+ profile_config['targets'].add(target)
+
+ if options.profile_sort:
+ profile_config['sort'] = options.profile_sort.split(',')
+
+ if options.profile_limit:
+ profile_config['limit'] = options.profile_limit
+
+ if options.quiet:
+ profile_config['report'] = False
+
+ # magic "all" target
+ if 'all' in profiling.all_targets:
+ targets = profile_config['targets']
+ if 'all' in targets and len(targets) != 1:
+ targets.clear()
+ targets.add('all')
+post_configure['profile_targets'] = _set_profile_targets