summaryrefslogtreecommitdiff
path: root/test/lib/testing.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-09-26 17:21:21 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-09-26 17:21:21 -0400
commit1a7778632d8039bd687e23522ce6c027e859d487 (patch)
tree1ea58d81c25db7d65f6244d1e12344c1ef8dd97f /test/lib/testing.py
parent0895e34d21c9818ec73e5c87e35ad6ba5c05acbd (diff)
downloadsqlalchemy-1a7778632d8039bd687e23522ce6c027e859d487.tar.gz
- further reorganization of test suite:
- bootstrap and lib move to all absolute imports - testing.py is no longer internally referenced. - requirements move to be a pluggable class which can be overridden. - cleanup in the interests of third party testing, test/lib and test/bootstrap may move to be an independent package.
Diffstat (limited to 'test/lib/testing.py')
-rw-r--r--test/lib/testing.py452
1 files changed, 12 insertions, 440 deletions
diff --git a/test/lib/testing.py b/test/lib/testing.py
index 1de446f91..f244dfecc 100644
--- a/test/lib/testing.py
+++ b/test/lib/testing.py
@@ -1,453 +1,25 @@
-"""TestCase and TestSuite artifacts and testing decorators."""
+from __future__ import absolute_import
-import itertools
-import re
-import sys
-import types
-import warnings
-from cStringIO import StringIO
+from .warnings import testing_warn, assert_warnings, resetwarnings
-from test.bootstrap import config
-from test.lib import assertsql, util as testutil
+from ..bootstrap import config
+from . import assertsql, util as testutil
from sqlalchemy.util import decorator
-from engines import drop_all_tables
-from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema, \
- pool, orm
-from sqlalchemy.engine import default
-from exclusions import db_spec, _is_excluded, fails_if, skip_if, future,\
+from .exclusions import db_spec, _is_excluded, fails_if, skip_if, future,\
fails_on, fails_on_everything_except, skip, only_on, exclude, against,\
_server_version
+from .assertions import emits_warning, emits_warning_on, uses_deprecated, \
+ eq_, ne_, is_, is_not_, startswith_, assert_raises, \
+ assert_raises_message, AssertsCompiledSQL, ComparesTables, AssertsExecutionResults
+
+from .util import run_as_contextmanager, rowset, fail, provide_metadata, adict
+
crashes = skip
-# sugar ('testing.db'); set here by config() at runtime
+# various sugar installed by config.py
db = None
-
-# more sugar, installed by __init__
requires = None
-def emits_warning(*messages):
- """Mark a test as emitting a warning.
-
- With no arguments, squelches all SAWarning failures. Or pass one or more
- strings; these will be matched to the root of the warning description by
- warnings.filterwarnings().
- """
- # TODO: it would be nice to assert that a named warning was
- # emitted. should work with some monkeypatching of warnings,
- # and may work on non-CPython if they keep to the spirit of
- # warnings.showwarning's docstring.
- # - update: jython looks ok, it uses cpython's module
-
- @decorator
- def decorate(fn, *args, **kw):
- # todo: should probably be strict about this, too
- filters = [dict(action='ignore',
- category=sa_exc.SAPendingDeprecationWarning)]
- if not messages:
- filters.append(dict(action='ignore',
- category=sa_exc.SAWarning))
- else:
- filters.extend(dict(action='ignore',
- message=message,
- category=sa_exc.SAWarning)
- for message in messages)
- for f in filters:
- warnings.filterwarnings(**f)
- try:
- return fn(*args, **kw)
- finally:
- resetwarnings()
- return decorate
-
-def emits_warning_on(db, *warnings):
- """Mark a test as emitting a warning on a specific dialect.
-
- With no arguments, squelches all SAWarning failures. Or pass one or more
- strings; these will be matched to the root of the warning description by
- warnings.filterwarnings().
- """
- spec = db_spec(db)
-
- @decorator
- def decorate(fn, *args, **kw):
- if isinstance(db, basestring):
- if not spec(config.db):
- return fn(*args, **kw)
- else:
- wrapped = emits_warning(*warnings)(fn)
- return wrapped(*args, **kw)
- else:
- if not _is_excluded(*db):
- return fn(*args, **kw)
- else:
- wrapped = emits_warning(*warnings)(fn)
- return wrapped(*args, **kw)
- return decorate
-
-def assert_warnings(fn, warnings):
- """Assert that each of the given warnings are emitted by fn."""
-
- canary = []
- orig_warn = util.warn
- def capture_warnings(*args, **kw):
- orig_warn(*args, **kw)
- popwarn = warnings.pop(0)
- canary.append(popwarn)
- eq_(args[0], popwarn)
- util.warn = util.langhelpers.warn = capture_warnings
-
- result = emits_warning()(fn)()
- assert canary, "No warning was emitted"
- return result
-
-def uses_deprecated(*messages):
- """Mark a test as immune from fatal deprecation warnings.
-
- With no arguments, squelches all SADeprecationWarning failures.
- Or pass one or more strings; these will be matched to the root
- of the warning description by warnings.filterwarnings().
-
- As a special case, you may pass a function name prefixed with //
- and it will be re-written as needed to match the standard warning
- verbiage emitted by the sqlalchemy.util.deprecated decorator.
- """
-
- @decorator
- def decorate(fn, *args, **kw):
- # todo: should probably be strict about this, too
- filters = [dict(action='ignore',
- category=sa_exc.SAPendingDeprecationWarning)]
- if not messages:
- filters.append(dict(action='ignore',
- category=sa_exc.SADeprecationWarning))
- else:
- filters.extend(
- [dict(action='ignore',
- message=message,
- category=sa_exc.SADeprecationWarning)
- for message in
- [ (m.startswith('//') and
- ('Call to deprecated function ' + m[2:]) or m)
- for m in messages] ])
-
- for f in filters:
- warnings.filterwarnings(**f)
- try:
- return fn(*args, **kw)
- finally:
- resetwarnings()
- return decorate
-
-def testing_warn(msg, stacklevel=3):
- """Replaces sqlalchemy.util.warn during tests."""
-
- filename = "test.lib.testing"
- lineno = 1
- if isinstance(msg, basestring):
- warnings.warn_explicit(msg, sa_exc.SAWarning, filename, lineno)
- else:
- warnings.warn_explicit(msg, filename, lineno)
-
-def resetwarnings():
- """Reset warning behavior to testing defaults."""
-
- util.warn = util.langhelpers.warn = testing_warn
-
- warnings.filterwarnings('ignore',
- category=sa_exc.SAPendingDeprecationWarning)
- warnings.filterwarnings('error', category=sa_exc.SADeprecationWarning)
- warnings.filterwarnings('error', category=sa_exc.SAWarning)
-
-
-def global_cleanup_assertions():
- """Check things that have to be finalized at the end of a test suite.
-
- Hardcoded at the moment, a modular system can be built here
- to support things like PG prepared transactions, tables all
- dropped, etc.
-
- """
-
- testutil.lazy_gc()
- assert not pool._refs, str(pool._refs)
-
-
-def run_as_contextmanager(ctx, fn, *arg, **kw):
- """Run the given function under the given contextmanager,
- simulating the behavior of 'with' to support older
- Python versions.
-
- """
-
- obj = ctx.__enter__()
- try:
- result = fn(obj, *arg, **kw)
- ctx.__exit__(None, None, None)
- return result
- except:
- exc_info = sys.exc_info()
- raise_ = ctx.__exit__(*exc_info)
- if raise_ is None:
- raise
- else:
- return raise_
-
-def rowset(results):
- """Converts the results of sql execution into a plain set of column tuples.
-
- Useful for asserting the results of an unordered query.
- """
-
- return set([tuple(row) for row in results])
-
-
-def eq_(a, b, msg=None):
- """Assert a == b, with repr messaging on failure."""
- assert a == b, msg or "%r != %r" % (a, b)
-
-def ne_(a, b, msg=None):
- """Assert a != b, with repr messaging on failure."""
- assert a != b, msg or "%r == %r" % (a, b)
-
-def is_(a, b, msg=None):
- """Assert a is b, with repr messaging on failure."""
- assert a is b, msg or "%r is not %r" % (a, b)
-
-def is_not_(a, b, msg=None):
- """Assert a is not b, with repr messaging on failure."""
- assert a is not b, msg or "%r is %r" % (a, b)
-
-def startswith_(a, fragment, msg=None):
- """Assert a.startswith(fragment), with repr messaging on failure."""
- assert a.startswith(fragment), msg or "%r does not start with %r" % (
- a, fragment)
-
-def assert_raises(except_cls, callable_, *args, **kw):
- try:
- callable_(*args, **kw)
- success = False
- except except_cls, e:
- success = True
-
- # assert outside the block so it works for AssertionError too !
- assert success, "Callable did not raise an exception"
-
-def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
- try:
- callable_(*args, **kwargs)
- assert False, "Callable did not raise an exception"
- except except_cls, e:
- assert re.search(msg, unicode(e), re.UNICODE), u"%r !~ %s" % (msg, e)
- print unicode(e).encode('utf-8')
-
-def fail(msg):
- assert False, msg
-
-
-@decorator
-def provide_metadata(fn, *args, **kw):
- """Provide bound MetaData for a single test, dropping afterwards."""
-
- metadata = schema.MetaData(db)
- self = args[0]
- prev_meta = getattr(self, 'metadata', None)
- self.metadata = metadata
- try:
- return fn(*args, **kw)
- finally:
- metadata.drop_all()
- self.metadata = prev_meta
-
-class adict(dict):
- """Dict keys available as attributes. Shadows."""
- def __getattribute__(self, key):
- try:
- return self[key]
- except KeyError:
- return dict.__getattribute__(self, key)
-
- def get_all(self, *keys):
- return tuple([self[key] for key in keys])
-
-
-class AssertsCompiledSQL(object):
- def assert_compile(self, clause, result, params=None,
- checkparams=None, dialect=None,
- checkpositional=None,
- use_default_dialect=False,
- allow_dialect_select=False):
- if use_default_dialect:
- dialect = default.DefaultDialect()
- elif dialect == None and not allow_dialect_select:
- dialect = getattr(self, '__dialect__', None)
- if dialect == 'default':
- dialect = default.DefaultDialect()
- elif dialect is None:
- dialect = db.dialect
-
- kw = {}
- if params is not None:
- kw['column_keys'] = params.keys()
-
- if isinstance(clause, orm.Query):
- context = clause._compile_context()
- context.statement.use_labels = True
- clause = context.statement
-
- c = clause.compile(dialect=dialect, **kw)
-
- param_str = repr(getattr(c, 'params', {}))
- # Py3K
- #param_str = param_str.encode('utf-8').decode('ascii', 'ignore')
-
- print "\nSQL String:\n" + str(c) + param_str
-
- cc = re.sub(r'[\n\t]', '', str(c))
-
- eq_(cc, result, "%r != %r on dialect %r" % (cc, result, dialect))
-
- if checkparams is not None:
- eq_(c.construct_params(params), checkparams)
- if checkpositional is not None:
- p = c.construct_params(params)
- eq_(tuple([p[x] for x in c.positiontup]), checkpositional)
-
-class ComparesTables(object):
- def assert_tables_equal(self, table, reflected_table, strict_types=False):
- assert len(table.c) == len(reflected_table.c)
- for c, reflected_c in zip(table.c, reflected_table.c):
- eq_(c.name, reflected_c.name)
- assert reflected_c is reflected_table.c[c.name]
- eq_(c.primary_key, reflected_c.primary_key)
- eq_(c.nullable, reflected_c.nullable)
-
- if strict_types:
- assert type(reflected_c.type) is type(c.type), \
- "Type '%s' doesn't correspond to type '%s'" % (reflected_c.type, c.type)
- else:
- self.assert_types_base(reflected_c, c)
-
- if isinstance(c.type, sqltypes.String):
- eq_(c.type.length, reflected_c.type.length)
-
- eq_(set([f.column.name for f in c.foreign_keys]), set([f.column.name for f in reflected_c.foreign_keys]))
- if c.server_default:
- assert isinstance(reflected_c.server_default,
- schema.FetchedValue)
-
- assert len(table.primary_key) == len(reflected_table.primary_key)
- for c in table.primary_key:
- assert reflected_table.primary_key.columns[c.name] is not None
-
- def assert_types_base(self, c1, c2):
- assert c1.type._compare_type_affinity(c2.type),\
- "On column %r, type '%s' doesn't correspond to type '%s'" % \
- (c1.name, c1.type, c2.type)
-
-class AssertsExecutionResults(object):
- def assert_result(self, result, class_, *objects):
- result = list(result)
- print repr(result)
- self.assert_list(result, class_, objects)
-
- def assert_list(self, result, class_, list):
- self.assert_(len(result) == len(list),
- "result list is not the same size as test list, " +
- "for class " + class_.__name__)
- for i in range(0, len(list)):
- self.assert_row(class_, result[i], list[i])
-
- def assert_row(self, class_, rowobj, desc):
- self.assert_(rowobj.__class__ is class_,
- "item class is not " + repr(class_))
- for key, value in desc.iteritems():
- if isinstance(value, tuple):
- if isinstance(value[1], list):
- self.assert_list(getattr(rowobj, key), value[0], value[1])
- else:
- self.assert_row(value[0], getattr(rowobj, key), value[1])
- else:
- self.assert_(getattr(rowobj, key) == value,
- "attribute %s value %s does not match %s" % (
- key, getattr(rowobj, key), value))
-
- def assert_unordered_result(self, result, cls, *expected):
- """As assert_result, but the order of objects is not considered.
-
- The algorithm is very expensive but not a big deal for the small
- numbers of rows that the test suite manipulates.
- """
-
- class immutabledict(dict):
- def __hash__(self):
- return id(self)
-
- found = util.IdentitySet(result)
- expected = set([immutabledict(e) for e in expected])
-
- for wrong in itertools.ifilterfalse(lambda o: type(o) == cls, found):
- fail('Unexpected type "%s", expected "%s"' % (
- type(wrong).__name__, cls.__name__))
-
- if len(found) != len(expected):
- fail('Unexpected object count "%s", expected "%s"' % (
- len(found), len(expected)))
-
- NOVALUE = object()
- def _compare_item(obj, spec):
- for key, value in spec.iteritems():
- if isinstance(value, tuple):
- try:
- self.assert_unordered_result(
- getattr(obj, key), value[0], *value[1])
- except AssertionError:
- return False
- else:
- if getattr(obj, key, NOVALUE) != value:
- return False
- return True
-
- for expected_item in expected:
- for found_item in found:
- if _compare_item(found_item, expected_item):
- found.remove(found_item)
- break
- else:
- fail(
- "Expected %s instance with attributes %s not found." % (
- cls.__name__, repr(expected_item)))
- return True
-
- def assert_sql_execution(self, db, callable_, *rules):
- assertsql.asserter.add_rules(rules)
- try:
- callable_()
- assertsql.asserter.statement_complete()
- finally:
- assertsql.asserter.clear_rules()
-
- def assert_sql(self, db, callable_, list_, with_sequences=None):
- if with_sequences is not None and config.db.name in ('firebird', 'oracle', 'postgresql'):
- rules = with_sequences
- else:
- rules = list_
-
- newrules = []
- for rule in rules:
- if isinstance(rule, dict):
- newrule = assertsql.AllOf(*[
- assertsql.ExactSQL(k, v) for k, v in rule.iteritems()
- ])
- else:
- newrule = assertsql.ExactSQL(*rule)
- newrules.append(newrule)
-
- self.assert_sql_execution(db, callable_, *newrules)
-
- def assert_sql_count(self, db, callable_, count):
- self.assert_sql_execution(db, callable_, assertsql.CountStatements(count))
-
-