diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-11-15 19:37:50 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-11-15 19:37:50 -0500 |
commit | e1402efb198f96090833a9b561cdba8dee937f70 (patch) | |
tree | c20dc33466476bbf394e8e19da7a045c8a008d08 /lib/sqlalchemy/test | |
parent | 756aa2724e495b8a969bca73d133b27615a343e7 (diff) | |
download | sqlalchemy-e1402efb198f96090833a9b561cdba8dee937f70.tar.gz |
- move sqlalchemy.test to test.lib
Diffstat (limited to 'lib/sqlalchemy/test')
-rw-r--r-- | lib/sqlalchemy/test/__init__.py | 27 | ||||
-rw-r--r-- | lib/sqlalchemy/test/assertsql.py | 295 | ||||
-rw-r--r-- | lib/sqlalchemy/test/engines.py | 305 | ||||
-rw-r--r-- | lib/sqlalchemy/test/entities.py | 83 | ||||
-rw-r--r-- | lib/sqlalchemy/test/orm.py | 111 | ||||
-rw-r--r-- | lib/sqlalchemy/test/pickleable.py | 75 | ||||
-rw-r--r-- | lib/sqlalchemy/test/profiling.py | 221 | ||||
-rw-r--r-- | lib/sqlalchemy/test/requires.py | 318 | ||||
-rw-r--r-- | lib/sqlalchemy/test/schema.py | 79 | ||||
-rw-r--r-- | lib/sqlalchemy/test/testing.py | 797 | ||||
-rw-r--r-- | lib/sqlalchemy/test/util.py | 76 |
11 files changed, 0 insertions, 2387 deletions
diff --git a/lib/sqlalchemy/test/__init__.py b/lib/sqlalchemy/test/__init__.py deleted file mode 100644 index e82b3ac91..000000000 --- a/lib/sqlalchemy/test/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Testing environment and utilities. - -This package contains base classes and routines used by -the unit tests. Tests are based on Nose and bootstrapped -by noseplugin.NoseSQLAlchemy. - -""" - -from test.bootstrap import config -from sqlalchemy.test import testing, engines, requires, profiling, pickleable -from sqlalchemy.test.schema import Column, Table -from sqlalchemy.test.testing import \ - AssertsCompiledSQL, \ - AssertsExecutionResults, \ - ComparesTables, \ - TestBase, \ - rowset - - -__all__ = ('testing', - 'Column', 'Table', - 'rowset', - 'TestBase', 'AssertsExecutionResults', - 'AssertsCompiledSQL', 'ComparesTables', - 'engines', 'profiling', 'pickleable') - - diff --git a/lib/sqlalchemy/test/assertsql.py b/lib/sqlalchemy/test/assertsql.py deleted file mode 100644 index a044f9d02..000000000 --- a/lib/sqlalchemy/test/assertsql.py +++ /dev/null @@ -1,295 +0,0 @@ - -from sqlalchemy.interfaces import ConnectionProxy -from sqlalchemy.engine.default import DefaultDialect -from sqlalchemy.engine.base import Connection -from sqlalchemy import util -import re - -class AssertRule(object): - def process_execute(self, clauseelement, *multiparams, **params): - pass - - def process_cursor_execute(self, statement, parameters, context, executemany): - pass - - def is_consumed(self): - """Return True if this rule has been consumed, False if not. - - Should raise an AssertionError if this rule's condition has definitely failed. - - """ - raise NotImplementedError() - - def rule_passed(self): - """Return True if the last test of this rule passed, False if failed, None if no test was applied.""" - - raise NotImplementedError() - - def consume_final(self): - """Return True if this rule has been consumed. - - Should raise an AssertionError if this rule's condition has not been consumed or has failed. - - """ - - if self._result is None: - assert False, "Rule has not been consumed" - - return self.is_consumed() - -class SQLMatchRule(AssertRule): - def __init__(self): - self._result = None - self._errmsg = "" - - def rule_passed(self): - return self._result - - def is_consumed(self): - if self._result is None: - return False - - assert self._result, self._errmsg - - return True - -class ExactSQL(SQLMatchRule): - def __init__(self, sql, params=None): - SQLMatchRule.__init__(self) - self.sql = sql - self.params = params - - def process_cursor_execute(self, statement, parameters, context, executemany): - if not context: - return - - _received_statement = _process_engine_statement(context.unicode_statement, context) - _received_parameters = context.compiled_parameters - - # TODO: remove this step once all unit tests - # are migrated, as ExactSQL should really be *exact* SQL - sql = _process_assertion_statement(self.sql, context) - - equivalent = _received_statement == sql - if self.params: - if util.callable(self.params): - params = self.params(context) - else: - params = self.params - - if not isinstance(params, list): - params = [params] - equivalent = equivalent and params == context.compiled_parameters - else: - params = {} - - - self._result = equivalent - if not self._result: - self._errmsg = "Testing for exact statement %r exact params %r, " \ - "received %r with params %r" % (sql, params, _received_statement, _received_parameters) - - -class RegexSQL(SQLMatchRule): - def __init__(self, regex, params=None): - SQLMatchRule.__init__(self) - self.regex = re.compile(regex) - self.orig_regex = regex - self.params = params - - def process_cursor_execute(self, statement, parameters, context, executemany): - if not context: - return - - _received_statement = _process_engine_statement(context.unicode_statement, context) - _received_parameters = context.compiled_parameters - - equivalent = bool(self.regex.match(_received_statement)) - if self.params: - if util.callable(self.params): - params = self.params(context) - else: - params = self.params - - if not isinstance(params, list): - params = [params] - - # do a positive compare only - for param, received in zip(params, _received_parameters): - for k, v in param.iteritems(): - if k not in received or received[k] != v: - equivalent = False - break - else: - params = {} - - self._result = equivalent - if not self._result: - self._errmsg = "Testing for regex %r partial params %r, "\ - "received %r with params %r" % (self.orig_regex, params, _received_statement, _received_parameters) - -class CompiledSQL(SQLMatchRule): - def __init__(self, statement, params): - SQLMatchRule.__init__(self) - self.statement = statement - self.params = params - - def process_cursor_execute(self, statement, parameters, context, executemany): - if not context: - return - - _received_parameters = list(context.compiled_parameters) - - # recompile from the context, using the default dialect - compiled = context.compiled.statement.\ - compile(dialect=DefaultDialect(), column_keys=context.compiled.column_keys) - - _received_statement = re.sub(r'\n', '', str(compiled)) - - equivalent = self.statement == _received_statement - if self.params: - if util.callable(self.params): - params = self.params(context) - else: - params = self.params - - if not isinstance(params, list): - params = [params] - - all_params = list(params) - all_received = list(_received_parameters) - while params: - param = dict(params.pop(0)) - for k, v in context.compiled.params.iteritems(): - param.setdefault(k, v) - - if param not in _received_parameters: - equivalent = False - break - else: - _received_parameters.remove(param) - if _received_parameters: - equivalent = False - else: - params = {} - - self._result = equivalent - if not self._result: - self._errmsg = "Testing for compiled statement %r partial params %r, " \ - "received %r with params %r" % \ - (self.statement, all_params, _received_statement, all_received) - #print self._errmsg - - -class CountStatements(AssertRule): - def __init__(self, count): - self.count = count - self._statement_count = 0 - - def process_execute(self, clauseelement, *multiparams, **params): - self._statement_count += 1 - - def process_cursor_execute(self, statement, parameters, context, executemany): - pass - - def is_consumed(self): - return False - - def consume_final(self): - assert self.count == self._statement_count, "desired statement count %d does not match %d" % (self.count, self._statement_count) - return True - -class AllOf(AssertRule): - def __init__(self, *rules): - self.rules = set(rules) - - def process_execute(self, clauseelement, *multiparams, **params): - for rule in self.rules: - rule.process_execute(clauseelement, *multiparams, **params) - - def process_cursor_execute(self, statement, parameters, context, executemany): - for rule in self.rules: - rule.process_cursor_execute(statement, parameters, context, executemany) - - def is_consumed(self): - if not self.rules: - return True - - for rule in list(self.rules): - if rule.rule_passed(): # a rule passed, move on - self.rules.remove(rule) - return len(self.rules) == 0 - - assert False, "No assertion rules were satisfied for statement" - - def consume_final(self): - return len(self.rules) == 0 - -def _process_engine_statement(query, context): - if util.jython: - # oracle+zxjdbc passes a PyStatement when returning into - query = unicode(query) - if context.engine.name == 'mssql' and query.endswith('; select scope_identity()'): - query = query[:-25] - - query = re.sub(r'\n', '', query) - - return query - -def _process_assertion_statement(query, context): - paramstyle = context.dialect.paramstyle - if paramstyle == 'named': - pass - elif paramstyle =='pyformat': - query = re.sub(r':([\w_]+)', r"%(\1)s", query) - else: - # positional params - repl = None - if paramstyle=='qmark': - repl = "?" - elif paramstyle=='format': - repl = r"%s" - elif paramstyle=='numeric': - repl = None - query = re.sub(r':([\w_]+)', repl, query) - - return query - -class SQLAssert(ConnectionProxy): - rules = None - - def add_rules(self, rules): - self.rules = list(rules) - - def statement_complete(self): - for rule in self.rules: - if not rule.consume_final(): - assert False, "All statements are complete, but pending assertion rules remain" - - def clear_rules(self): - del self.rules - - def execute(self, conn, execute, clauseelement, *multiparams, **params): - result = execute(clauseelement, *multiparams, **params) - - if self.rules is not None: - if not self.rules: - assert False, "All rules have been exhausted, but further statements remain" - rule = self.rules[0] - rule.process_execute(clauseelement, *multiparams, **params) - if rule.is_consumed(): - self.rules.pop(0) - - return result - - def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): - result = execute(cursor, statement, parameters, context) - - if self.rules: - rule = self.rules[0] - rule.process_cursor_execute(statement, parameters, context, executemany) - - return result - -asserter = SQLAssert() - diff --git a/lib/sqlalchemy/test/engines.py b/lib/sqlalchemy/test/engines.py deleted file mode 100644 index d18b8c8cf..000000000 --- a/lib/sqlalchemy/test/engines.py +++ /dev/null @@ -1,305 +0,0 @@ -import sys, types, weakref -from collections import deque -from test.bootstrap import config -from sqlalchemy.util import function_named, callable -import re -import warnings - -class ConnectionKiller(object): - def __init__(self): - self.proxy_refs = weakref.WeakKeyDictionary() - - def checkout(self, dbapi_con, con_record, con_proxy): - self.proxy_refs[con_proxy] = True - - def _apply_all(self, methods): - # must copy keys atomically - for rec in self.proxy_refs.keys(): - if rec is not None and rec.is_valid: - try: - for name in methods: - if callable(name): - name(rec) - else: - getattr(rec, name)() - except (SystemExit, KeyboardInterrupt): - raise - except Exception, e: - warnings.warn("testing_reaper couldn't close connection: %s" % e) - - def rollback_all(self): - self._apply_all(('rollback',)) - - def close_all(self): - self._apply_all(('rollback', 'close')) - - def assert_all_closed(self): - for rec in self.proxy_refs: - if rec.is_valid: - assert False - -testing_reaper = ConnectionKiller() - -def drop_all_tables(metadata): - testing_reaper.close_all() - metadata.drop_all() - -def assert_conns_closed(fn): - def decorated(*args, **kw): - try: - fn(*args, **kw) - finally: - testing_reaper.assert_all_closed() - return function_named(decorated, fn.__name__) - -def rollback_open_connections(fn): - """Decorator that rolls back all open connections after fn execution.""" - - def decorated(*args, **kw): - try: - fn(*args, **kw) - finally: - testing_reaper.rollback_all() - return function_named(decorated, fn.__name__) - -def close_first(fn): - """Decorator that closes all connections before fn execution.""" - def decorated(*args, **kw): - testing_reaper.close_all() - fn(*args, **kw) - return function_named(decorated, fn.__name__) - - -def close_open_connections(fn): - """Decorator that closes all connections after fn execution.""" - - def decorated(*args, **kw): - try: - fn(*args, **kw) - finally: - testing_reaper.close_all() - return function_named(decorated, fn.__name__) - -def all_dialects(exclude=None): - import sqlalchemy.databases as d - for name in d.__all__: - # TEMPORARY - if exclude and name in exclude: - continue - mod = getattr(d, name, None) - if not mod: - mod = getattr(__import__('sqlalchemy.databases.%s' % name).databases, name) - yield mod.dialect() - -class ReconnectFixture(object): - def __init__(self, dbapi): - self.dbapi = dbapi - self.connections = [] - - def __getattr__(self, key): - return getattr(self.dbapi, key) - - def connect(self, *args, **kwargs): - conn = self.dbapi.connect(*args, **kwargs) - self.connections.append(conn) - return conn - - def shutdown(self): - # TODO: this doesn't cover all cases - # as nicely as we'd like, namely MySQLdb. - # would need to implement R. Brewer's - # proxy server idea to get better - # coverage. - for c in list(self.connections): - c.close() - self.connections = [] - -def reconnecting_engine(url=None, options=None): - url = url or config.db_url - dbapi = config.db.dialect.dbapi - if not options: - options = {} - options['module'] = ReconnectFixture(dbapi) - engine = testing_engine(url, options) - engine.test_shutdown = engine.dialect.dbapi.shutdown - return engine - -def testing_engine(url=None, options=None): - """Produce an engine configured by --options with optional overrides.""" - - from sqlalchemy import create_engine - from sqlalchemy.test.assertsql import asserter - - url = url or config.db_url - options = options or config.db_opts - - options.setdefault('proxy', asserter) - - listeners = options.setdefault('listeners', []) - listeners.append(testing_reaper) - - engine = create_engine(url, **options) - - # may want to call this, results - # in first-connect initializers - #engine.connect() - - return engine - -def utf8_engine(url=None, options=None): - """Hook for dialects or drivers that don't handle utf8 by default.""" - - from sqlalchemy.engine import url as engine_url - - if config.db.driver == 'mysqldb': - dbapi_ver = config.db.dialect.dbapi.version_info - if (dbapi_ver < (1, 2, 1) or - dbapi_ver in ((1, 2, 1, 'gamma', 1), (1, 2, 1, 'gamma', 2), - (1, 2, 1, 'gamma', 3), (1, 2, 1, 'gamma', 5))): - raise RuntimeError('Character set support unavailable with this ' - 'driver version: %s' % repr(dbapi_ver)) - else: - url = url or config.db_url - url = engine_url.make_url(url) - url.query['charset'] = 'utf8' - url.query['use_unicode'] = '0' - url = str(url) - - return testing_engine(url, options) - -def mock_engine(dialect_name=None): - """Provides a mocking engine based on the current testing.db. - - This is normally used to test DDL generation flow as emitted - by an Engine. - - It should not be used in other cases, as assert_compile() and - assert_sql_execution() are much better choices with fewer - moving parts. - - """ - - from sqlalchemy import create_engine - - if not dialect_name: - dialect_name = config.db.name - - buffer = [] - def executor(sql, *a, **kw): - buffer.append(sql) - def assert_sql(stmts): - recv = [re.sub(r'[\n\t]', '', str(s)) for s in buffer] - assert recv == stmts, recv - - engine = create_engine(dialect_name + '://', - strategy='mock', executor=executor) - assert not hasattr(engine, 'mock') - engine.mock = buffer - engine.assert_sql = assert_sql - return engine - -class ReplayableSession(object): - """A simple record/playback tool. - - This is *not* a mock testing class. It only records a session for later - playback and makes no assertions on call consistency whatsoever. It's - unlikely to be suitable for anything other than DB-API recording. - - """ - - Callable = object() - NoAttribute = object() - Natives = set([getattr(types, t) - for t in dir(types) if not t.startswith('_')]). \ - difference([getattr(types, t) - # Py3K - #for t in ('FunctionType', 'BuiltinFunctionType', - # 'MethodType', 'BuiltinMethodType', - # 'LambdaType', )]) - - # Py2K - for t in ('FunctionType', 'BuiltinFunctionType', - 'MethodType', 'BuiltinMethodType', - 'LambdaType', 'UnboundMethodType',)]) - # end Py2K - def __init__(self): - self.buffer = deque() - - def recorder(self, base): - return self.Recorder(self.buffer, base) - - def player(self): - return self.Player(self.buffer) - - class Recorder(object): - def __init__(self, buffer, subject): - self._buffer = buffer - self._subject = subject - - def __call__(self, *args, **kw): - subject, buffer = [object.__getattribute__(self, x) - for x in ('_subject', '_buffer')] - - result = subject(*args, **kw) - if type(result) not in ReplayableSession.Natives: - buffer.append(ReplayableSession.Callable) - return type(self)(buffer, result) - else: - buffer.append(result) - return result - - @property - def _sqla_unwrap(self): - return self._subject - - def __getattribute__(self, key): - try: - return object.__getattribute__(self, key) - except AttributeError: - pass - - subject, buffer = [object.__getattribute__(self, x) - for x in ('_subject', '_buffer')] - try: - result = type(subject).__getattribute__(subject, key) - except AttributeError: - buffer.append(ReplayableSession.NoAttribute) - raise - else: - if type(result) not in ReplayableSession.Natives: - buffer.append(ReplayableSession.Callable) - return type(self)(buffer, result) - else: - buffer.append(result) - return result - - class Player(object): - def __init__(self, buffer): - self._buffer = buffer - - def __call__(self, *args, **kw): - buffer = object.__getattribute__(self, '_buffer') - result = buffer.popleft() - if result is ReplayableSession.Callable: - return self - else: - return result - - @property - def _sqla_unwrap(self): - return None - - def __getattribute__(self, key): - try: - return object.__getattribute__(self, key) - except AttributeError: - pass - buffer = object.__getattribute__(self, '_buffer') - result = buffer.popleft() - if result is ReplayableSession.Callable: - return self - elif result is ReplayableSession.NoAttribute: - raise AttributeError(key) - else: - return result - diff --git a/lib/sqlalchemy/test/entities.py b/lib/sqlalchemy/test/entities.py deleted file mode 100644 index 0ec677eea..000000000 --- a/lib/sqlalchemy/test/entities.py +++ /dev/null @@ -1,83 +0,0 @@ -import sqlalchemy as sa -from sqlalchemy import exc as sa_exc - -_repr_stack = set() -class BasicEntity(object): - def __init__(self, **kw): - for key, value in kw.iteritems(): - setattr(self, key, value) - - def __repr__(self): - if id(self) in _repr_stack: - return object.__repr__(self) - _repr_stack.add(id(self)) - try: - return "%s(%s)" % ( - (self.__class__.__name__), - ', '.join(["%s=%r" % (key, getattr(self, key)) - for key in sorted(self.__dict__.keys()) - if not key.startswith('_')])) - finally: - _repr_stack.remove(id(self)) - -_recursion_stack = set() -class ComparableEntity(BasicEntity): - def __hash__(self): - return hash(self.__class__) - - def __ne__(self, other): - return not self.__eq__(other) - - def __eq__(self, other): - """'Deep, sparse compare. - - Deeply compare two entities, following the non-None attributes of the - non-persisted object, if possible. - - """ - if other is self: - return True - elif not self.__class__ == other.__class__: - return False - - if id(self) in _recursion_stack: - return True - _recursion_stack.add(id(self)) - - try: - # pick the entity thats not SA persisted as the source - try: - self_key = sa.orm.attributes.instance_state(self).key - except sa.orm.exc.NO_STATE: - self_key = None - - if other is None: - a = self - b = other - elif self_key is not None: - a = other - b = self - else: - a = self - b = other - - for attr in a.__dict__.keys(): - if attr.startswith('_'): - continue - value = getattr(a, attr) - - try: - # handle lazy loader errors - battr = getattr(b, attr) - except (AttributeError, sa_exc.UnboundExecutionError): - return False - - if hasattr(value, '__iter__'): - if list(value) != list(battr): - return False - else: - if value is not None and value != battr: - return False - return True - finally: - _recursion_stack.remove(id(self)) diff --git a/lib/sqlalchemy/test/orm.py b/lib/sqlalchemy/test/orm.py deleted file mode 100644 index 7ec13c555..000000000 --- a/lib/sqlalchemy/test/orm.py +++ /dev/null @@ -1,111 +0,0 @@ -import inspect, re -import config, testing -from sqlalchemy import orm - -__all__ = 'mapper', - - -_whitespace = re.compile(r'^(\s+)') - -def _find_pragma(lines, current): - m = _whitespace.match(lines[current]) - basis = m and m.group() or '' - - for line in reversed(lines[0:current]): - if 'testlib.pragma' in line: - return line - m = _whitespace.match(line) - indent = m and m.group() or '' - - # simplistic detection: - - # >> # testlib.pragma foo - # >> center_line() - if indent == basis: - break - # >> # testlib.pragma foo - # >> if fleem: - # >> center_line() - if line.endswith(':'): - break - return None - -def _make_blocker(method_name, fallback): - """Creates tripwired variant of a method, raising when called. - - To excempt an invocation from blockage, there are two options. - - 1) add a pragma in a comment:: - - # testlib.pragma exempt:methodname - offending_line() - - 2) add a magic cookie to the function's namespace:: - __sa_baremethodname_exempt__ = True - ... - offending_line() - another_offending_lines() - - The second is useful for testing and development. - """ - - if method_name.startswith('__') and method_name.endswith('__'): - frame_marker = '__sa_%s_exempt__' % method_name[2:-2] - else: - frame_marker = '__sa_%s_exempt__' % method_name - pragma_marker = 'exempt:' + method_name - - def method(self, *args, **kw): - frame_r = None - try: - frame = inspect.stack()[1][0] - frame_r = inspect.getframeinfo(frame, 9) - - module = frame.f_globals.get('__name__', '') - - type_ = type(self) - - pragma = _find_pragma(*frame_r[3:5]) - - exempt = ( - (not module.startswith('sqlalchemy')) or - (pragma and pragma_marker in pragma) or - (frame_marker in frame.f_locals) or - ('self' in frame.f_locals and - getattr(frame.f_locals['self'], frame_marker, False))) - - if exempt: - supermeth = getattr(super(type_, self), method_name, None) - if (supermeth is None or - getattr(supermeth, 'im_func', None) is method): - return fallback(self, *args, **kw) - else: - return supermeth(*args, **kw) - else: - raise AssertionError( - "%s.%s called in %s, line %s in %s" % ( - type_.__name__, method_name, module, frame_r[1], frame_r[2])) - finally: - del frame - method.__name__ = method_name - return method - -def mapper(type_, *args, **kw): - forbidden = [ - ('__hash__', 'unhashable', lambda s: id(s)), - ('__eq__', 'noncomparable', lambda s, o: s is o), - ('__ne__', 'noncomparable', lambda s, o: s is not o), - ('__cmp__', 'noncomparable', lambda s, o: object.__cmp__(s, o)), - ('__le__', 'noncomparable', lambda s, o: object.__le__(s, o)), - ('__lt__', 'noncomparable', lambda s, o: object.__lt__(s, o)), - ('__ge__', 'noncomparable', lambda s, o: object.__ge__(s, o)), - ('__gt__', 'noncomparable', lambda s, o: object.__gt__(s, o)), - ('__nonzero__', 'truthless', lambda s: 1), ] - - if isinstance(type_, type) and type_.__bases__ == (object,): - for method_name, option, fallback in forbidden: - if (getattr(config.options, option, False) and - method_name not in type_.__dict__): - setattr(type_, method_name, _make_blocker(method_name, fallback)) - - return orm.mapper(type_, *args, **kw) diff --git a/lib/sqlalchemy/test/pickleable.py b/lib/sqlalchemy/test/pickleable.py deleted file mode 100644 index 9794e424d..000000000 --- a/lib/sqlalchemy/test/pickleable.py +++ /dev/null @@ -1,75 +0,0 @@ -""" - -some objects used for pickle tests, declared in their own module so that they -are easily pickleable. - -""" - - -class Foo(object): - def __init__(self, moredata): - self.data = 'im data' - self.stuff = 'im stuff' - self.moredata = moredata - __hash__ = object.__hash__ - def __eq__(self, other): - return other.data == self.data and other.stuff == self.stuff and other.moredata==self.moredata - - -class Bar(object): - def __init__(self, x, y): - self.x = x - self.y = y - __hash__ = object.__hash__ - def __eq__(self, other): - return other.__class__ is self.__class__ and other.x==self.x and other.y==self.y - def __str__(self): - return "Bar(%d, %d)" % (self.x, self.y) - -class OldSchool: - def __init__(self, x, y): - self.x = x - self.y = y - def __eq__(self, other): - return other.__class__ is self.__class__ and other.x==self.x and other.y==self.y - -class OldSchoolWithoutCompare: - def __init__(self, x, y): - self.x = x - self.y = y - -class BarWithoutCompare(object): - def __init__(self, x, y): - self.x = x - self.y = y - def __str__(self): - return "Bar(%d, %d)" % (self.x, self.y) - - -class NotComparable(object): - def __init__(self, data): - self.data = data - - def __hash__(self): - return id(self) - - def __eq__(self, other): - return NotImplemented - - def __ne__(self, other): - return NotImplemented - - -class BrokenComparable(object): - def __init__(self, data): - self.data = data - - def __hash__(self): - return id(self) - - def __eq__(self, other): - raise NotImplementedError - - def __ne__(self, other): - raise NotImplementedError - diff --git a/lib/sqlalchemy/test/profiling.py b/lib/sqlalchemy/test/profiling.py deleted file mode 100644 index 6f839897d..000000000 --- a/lib/sqlalchemy/test/profiling.py +++ /dev/null @@ -1,221 +0,0 @@ -"""Profiling support for unit and performance tests. - -These are special purpose profiling methods which operate -in a more fine-grained way than nose's profiling plugin. - -""" - -import os, sys -from sqlalchemy.test.util import function_named, gc_collect -from nose import SkipTest - -__all__ = 'profiled', 'function_call_count', 'conditional_call_count' - -all_targets = set() -profile_config = { 'targets': set(), - 'report': True, - 'sort': ('time', 'calls'), - 'limit': None } -profiler = None - -def profiled(target=None, **target_opts): - """Optional function profiling. - - @profiled('label') - or - @profiled('label', report=True, sort=('calls',), limit=20) - - Enables profiling for a function when 'label' is targetted for - profiling. Report options can be supplied, and override the global - configuration and command-line options. - """ - - # manual or automatic namespacing by module would remove conflict issues - if target is None: - target = 'anonymous_target' - elif target in all_targets: - print "Warning: redefining profile target '%s'" % target - all_targets.add(target) - - filename = "%s.prof" % target - - def decorator(fn): - def profiled(*args, **kw): - if (target not in profile_config['targets'] and - not target_opts.get('always', None)): - return fn(*args, **kw) - - elapsed, load_stats, result = _profile( - filename, fn, *args, **kw) - - report = target_opts.get('report', profile_config['report']) - if report: - sort_ = target_opts.get('sort', profile_config['sort']) - limit = target_opts.get('limit', profile_config['limit']) - print "Profile report for target '%s' (%s)" % ( - target, filename) - - stats = load_stats() - stats.sort_stats(*sort_) - if limit: - stats.print_stats(limit) - else: - stats.print_stats() - #stats.print_callers() - os.unlink(filename) - return result - return function_named(profiled, fn.__name__) - return decorator - -def function_call_count(count=None, versions={}, variance=0.05): - """Assert a target for a test case's function call count. - - count - Optional, general target function call count. - - versions - Optional, a dictionary of Python version strings to counts, - for example:: - - { '2.5.1': 110, - '2.5': 100, - '2.4': 150 } - - The best match for the current running python will be used. - If none match, 'count' will be used as the fallback. - - variance - An +/- deviation percentage, defaults to 5%. - """ - - # this could easily dump the profile report if --verbose is in effect - - version_info = list(sys.version_info) - py_version = '.'.join([str(v) for v in sys.version_info]) - try: - from sqlalchemy.cprocessors import to_float - cextension = True - except ImportError: - cextension = False - - while version_info: - version = '.'.join([str(v) for v in version_info]) - if cextension: - version += "+cextension" - if version in versions: - count = versions[version] - break - version_info.pop() - - if count is None: - return lambda fn: fn - - def decorator(fn): - def counted(*args, **kw): - try: - filename = "%s.prof" % fn.__name__ - - elapsed, stat_loader, result = _profile( - filename, fn, *args, **kw) - - stats = stat_loader() - calls = stats.total_calls - - stats.sort_stats('calls', 'cumulative') - stats.print_stats() - #stats.print_callers() - deviance = int(count * variance) - if (calls < (count - deviance) or - calls > (count + deviance)): - raise AssertionError( - "Function call count %s not within %s%% " - "of expected %s. (Python version %s)" % ( - calls, (variance * 100), count, py_version)) - - return result - finally: - if os.path.exists(filename): - os.unlink(filename) - return function_named(counted, fn.__name__) - return decorator - -def conditional_call_count(discriminator, categories): - """Apply a function call count conditionally at runtime. - - Takes two arguments, a callable that returns a key value, and a dict - mapping key values to a tuple of arguments to function_call_count. - - The callable is not evaluated until the decorated function is actually - invoked. If the `discriminator` returns a key not present in the - `categories` dictionary, no call count assertion is applied. - - Useful for integration tests, where running a named test in isolation may - have a function count penalty not seen in the full suite, due to lazy - initialization in the DB-API, SA, etc. - """ - - def decorator(fn): - def at_runtime(*args, **kw): - criteria = categories.get(discriminator(), None) - if criteria is None: - return fn(*args, **kw) - - rewrapped = function_call_count(*criteria)(fn) - return rewrapped(*args, **kw) - return function_named(at_runtime, fn.__name__) - return decorator - - -def _profile(filename, fn, *args, **kw): - global profiler - if not profiler: - if sys.version_info > (2, 5): - try: - import cProfile - profiler = 'cProfile' - except ImportError: - pass - if not profiler: - try: - import hotshot - profiler = 'hotshot' - except ImportError: - profiler = 'skip' - - if profiler == 'skip': - raise SkipTest('Profiling not supported on this platform') - elif profiler == 'cProfile': - return _profile_cProfile(filename, fn, *args, **kw) - else: - return _profile_hotshot(filename, fn, *args, **kw) - -def _profile_cProfile(filename, fn, *args, **kw): - import cProfile, gc, pstats, time - - load_stats = lambda: pstats.Stats(filename) - gc_collect() - - began = time.time() - cProfile.runctx('result = fn(*args, **kw)', globals(), locals(), - filename=filename) - ended = time.time() - - return ended - began, load_stats, locals()['result'] - -def _profile_hotshot(filename, fn, *args, **kw): - import gc, hotshot, hotshot.stats, time - load_stats = lambda: hotshot.stats.load(filename) - - gc_collect() - prof = hotshot.Profile(filename) - began = time.time() - prof.start() - try: - result = fn(*args, **kw) - finally: - prof.stop() - ended = time.time() - prof.close() - - return ended - began, load_stats, result - diff --git a/lib/sqlalchemy/test/requires.py b/lib/sqlalchemy/test/requires.py deleted file mode 100644 index d29b7abc2..000000000 --- a/lib/sqlalchemy/test/requires.py +++ /dev/null @@ -1,318 +0,0 @@ -"""Global database feature support policy. - -Provides decorators to mark tests requiring specific feature support from the -target database. - -""" - -from testing import \ - _block_unconditionally as no_support, \ - _chain_decorators_on, \ - exclude, \ - emits_warning_on,\ - skip_if,\ - fails_on,\ - fails_on_everything_except - -import testing -import sys - -def deferrable_constraints(fn): - """Target database must support derferable constraints.""" - return _chain_decorators_on( - fn, - no_support('firebird', 'not supported by database'), - no_support('mysql', 'not supported by database'), - no_support('mssql', 'not supported by database'), - ) - -def foreign_keys(fn): - """Target database must support foreign keys.""" - return _chain_decorators_on( - fn, - no_support('sqlite', 'not supported by database'), - ) - - -def unbounded_varchar(fn): - """Target database must support VARCHAR with no length""" - return _chain_decorators_on( - fn, - no_support('firebird', 'not supported by database'), - no_support('oracle', 'not supported by database'), - no_support('mysql', 'not supported by database'), - ) - -def boolean_col_expressions(fn): - """Target database must support boolean expressions as columns""" - return _chain_decorators_on( - fn, - no_support('firebird', 'not supported by database'), - no_support('oracle', 'not supported by database'), - no_support('mssql', 'not supported by database'), - no_support('sybase', 'not supported by database'), - no_support('maxdb', 'FIXME: verify not supported by database'), - no_support('informix', 'not supported by database'), - ) - -def identity(fn): - """Target database must support GENERATED AS IDENTITY or a facsimile. - - Includes GENERATED AS IDENTITY, AUTOINCREMENT, AUTO_INCREMENT, or other - column DDL feature that fills in a DB-generated identifier at INSERT-time - without requiring pre-execution of a SEQUENCE or other artifact. - - """ - return _chain_decorators_on( - fn, - no_support('firebird', 'not supported by database'), - no_support('oracle', 'not supported by database'), - no_support('postgresql', 'not supported by database'), - no_support('sybase', 'not supported by database'), - ) - -def independent_cursors(fn): - """Target must support simultaneous, independent database cursors on a single connection.""" - - return _chain_decorators_on( - fn, - no_support('mssql+pyodbc', 'no driver support'), - no_support('mssql+mxodbc', 'no driver support'), - ) - -def independent_connections(fn): - """Target must support simultaneous, independent database connections.""" - - # This is also true of some configurations of UnixODBC and probably win32 - # ODBC as well. - return _chain_decorators_on( - fn, - no_support('sqlite', 'no driver support'), - exclude('mssql', '<', (9, 0, 0), - 'SQL Server 2005+ is required for independent connections'), - ) - -def row_triggers(fn): - """Target must support standard statement-running EACH ROW triggers.""" - return _chain_decorators_on( - fn, - # no access to same table - no_support('mysql', 'requires SUPER priv'), - exclude('mysql', '<', (5, 0, 10), 'not supported by database'), - - # huh? TODO: implement triggers for PG tests, remove this - no_support('postgresql', 'PG triggers need to be implemented for tests'), - ) - -def correlated_outer_joins(fn): - """Target must support an outer join to a subquery which correlates to the parent.""" - - return _chain_decorators_on( - fn, - no_support('oracle', 'Raises "ORA-01799: a column may not be outer-joined to a subquery"') - ) - -def savepoints(fn): - """Target database must support savepoints.""" - return _chain_decorators_on( - fn, - emits_warning_on('mssql', 'Savepoint support in mssql is experimental and may lead to data loss.'), - no_support('access', 'not supported by database'), - no_support('sqlite', 'not supported by database'), - no_support('sybase', 'FIXME: guessing, needs confirmation'), - exclude('mysql', '<', (5, 0, 3), 'not supported by database'), - exclude('informix', '<', (11, 55, 'xC3'), 'not supported by database'), - ) - -def denormalized_names(fn): - """Target database must have 'denormalized', i.e. UPPERCASE as case insensitive names.""" - - return skip_if( - lambda: not testing.db.dialect.requires_name_normalize, - "Backend does not require denomralized names." - )(fn) - -def schemas(fn): - """Target database must support external schemas, and have one named 'test_schema'.""" - - return _chain_decorators_on( - fn, - no_support('sqlite', 'no schema support'), - no_support('firebird', 'no schema support') - ) - -def sequences(fn): - """Target database must support SEQUENCEs.""" - return _chain_decorators_on( - fn, - no_support('access', 'no SEQUENCE support'), - no_support('mssql', 'no SEQUENCE support'), - no_support('mysql', 'no SEQUENCE support'), - no_support('sqlite', 'no SEQUENCE support'), - no_support('sybase', 'no SEQUENCE support'), - no_support('informix', 'no SEQUENCE support'), - ) - -def update_nowait(fn): - """Target database must support SELECT...FOR UPDATE NOWAIT""" - return _chain_decorators_on( - fn, - no_support('access', 'no FOR UPDATE NOWAIT support'), - no_support('firebird', 'no FOR UPDATE NOWAIT support'), - no_support('mssql', 'no FOR UPDATE NOWAIT support'), - no_support('mysql', 'no FOR UPDATE NOWAIT support'), - no_support('sqlite', 'no FOR UPDATE NOWAIT support'), - no_support('sybase', 'no FOR UPDATE NOWAIT support'), - ) - -def subqueries(fn): - """Target database must support subqueries.""" - return _chain_decorators_on( - fn, - exclude('mysql', '<', (4, 1, 1), 'no subquery support'), - ) - -def intersect(fn): - """Target database must support INTERSECT or equivlaent.""" - return _chain_decorators_on( - fn, - fails_on('firebird', 'no support for INTERSECT'), - fails_on('mysql', 'no support for INTERSECT'), - fails_on('sybase', 'no support for INTERSECT'), - fails_on('informix', 'no support for INTERSECT'), - ) - -def except_(fn): - """Target database must support EXCEPT or equivlaent (i.e. MINUS).""" - return _chain_decorators_on( - fn, - fails_on('firebird', 'no support for EXCEPT'), - fails_on('mysql', 'no support for EXCEPT'), - fails_on('sybase', 'no support for EXCEPT'), - fails_on('informix', 'no support for EXCEPT'), - ) - -def offset(fn): - """Target database must support some method of adding OFFSET or equivalent to a result set.""" - return _chain_decorators_on( - fn, - fails_on('sybase', 'no support for OFFSET or equivalent'), - ) - -def returning(fn): - return _chain_decorators_on( - fn, - no_support('access', 'not supported by database'), - no_support('sqlite', 'not supported by database'), - no_support('mysql', 'not supported by database'), - no_support('maxdb', 'not supported by database'), - no_support('sybase', 'not supported by database'), - no_support('informix', 'not supported by database'), - ) - -def two_phase_transactions(fn): - """Target database must support two-phase transactions.""" - return _chain_decorators_on( - fn, - no_support('access', 'not supported by database'), - no_support('firebird', 'no SA implementation'), - no_support('maxdb', 'not supported by database'), - no_support('mssql', 'FIXME: guessing, needs confirmation'), - no_support('oracle', 'no SA implementation'), - no_support('sqlite', 'not supported by database'), - no_support('sybase', 'FIXME: guessing, needs confirmation'), - no_support('postgresql+zxjdbc', 'FIXME: JDBC driver confuses the transaction state, may ' - 'need separate XA implementation'), - exclude('mysql', '<', (5, 0, 3), 'not supported by database'), - ) - -def unicode_connections(fn): - """Target driver must support some encoding of Unicode across the wire.""" - # TODO: expand to exclude MySQLdb versions w/ broken unicode - return _chain_decorators_on( - fn, - exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'), - ) - -def unicode_ddl(fn): - """Target driver must support some encoding of Unicode across the wire.""" - # TODO: expand to exclude MySQLdb versions w/ broken unicode - return _chain_decorators_on( - fn, - no_support('maxdb', 'database support flakey'), - no_support('oracle', 'FIXME: no support in database?'), - no_support('sybase', 'FIXME: guessing, needs confirmation'), - no_support('mssql+pymssql', 'no FreeTDS support'), - exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'), - ) - -def sane_rowcount(fn): - return _chain_decorators_on( - fn, - skip_if(lambda: not testing.db.dialect.supports_sane_rowcount) - ) - -def cextensions(fn): - return _chain_decorators_on( - fn, - skip_if(lambda: not _has_cextensions(), "C extensions not installed") - ) - -def dbapi_lastrowid(fn): - return _chain_decorators_on( - fn, - fails_on_everything_except('mysql+mysqldb', 'mysql+oursql', 'sqlite+pysqlite') - ) - -def sane_multi_rowcount(fn): - return _chain_decorators_on( - fn, - skip_if(lambda: not testing.db.dialect.supports_sane_multi_rowcount) - ) - -def reflects_pk_names(fn): - """Target driver reflects the name of primary key constraints.""" - return _chain_decorators_on( - fn, - fails_on_everything_except('postgresql', 'oracle') - ) - -def python2(fn): - return _chain_decorators_on( - fn, - skip_if( - lambda: sys.version_info >= (3,), - "Python version 2.xx is required." - ) - ) - -def python26(fn): - return _chain_decorators_on( - fn, - skip_if( - lambda: sys.version_info < (2, 6), - "Python version 2.6 or greater is required" - ) - ) - -def _has_cextensions(): - try: - from sqlalchemy import cresultproxy, cprocessors - return True - except ImportError: - return False - -def _has_sqlite(): - from sqlalchemy import create_engine - try: - e = create_engine('sqlite://') - return True - except ImportError: - return False - -def sqlite(fn): - return _chain_decorators_on( - fn, - skip_if(lambda: not _has_sqlite()) - ) - diff --git a/lib/sqlalchemy/test/schema.py b/lib/sqlalchemy/test/schema.py deleted file mode 100644 index d33d75e2c..000000000 --- a/lib/sqlalchemy/test/schema.py +++ /dev/null @@ -1,79 +0,0 @@ -"""Enhanced versions of schema.Table and schema.Column which establish -desired state for different backends. -""" - -from sqlalchemy.test import testing -from sqlalchemy import schema - -__all__ = 'Table', 'Column', - -table_options = {} - -def Table(*args, **kw): - """A schema.Table wrapper/hook for dialect-specific tweaks.""" - - test_opts = dict([(k,kw.pop(k)) for k in kw.keys() - if k.startswith('test_')]) - - kw.update(table_options) - - if testing.against('mysql'): - if 'mysql_engine' not in kw and 'mysql_type' not in kw: - if 'test_needs_fk' in test_opts or 'test_needs_acid' in test_opts: - kw['mysql_engine'] = 'InnoDB' - - # Apply some default cascading rules for self-referential foreign keys. - # MySQL InnoDB has some issues around seleting self-refs too. - if testing.against('firebird'): - table_name = args[0] - unpack = (testing.config.db.dialect. - identifier_preparer.unformat_identifiers) - - # Only going after ForeignKeys in Columns. May need to - # expand to ForeignKeyConstraint too. - fks = [fk - for col in args if isinstance(col, schema.Column) - for fk in col.foreign_keys] - - for fk in fks: - # root around in raw spec - ref = fk._colspec - if isinstance(ref, schema.Column): - name = ref.table.name - else: - # take just the table name: on FB there cannot be - # a schema, so the first element is always the - # table name, possibly followed by the field name - name = unpack(ref)[0] - if name == table_name: - if fk.ondelete is None: - fk.ondelete = 'CASCADE' - if fk.onupdate is None: - fk.onupdate = 'CASCADE' - - return schema.Table(*args, **kw) - - -def Column(*args, **kw): - """A schema.Column wrapper/hook for dialect-specific tweaks.""" - - test_opts = dict([(k,kw.pop(k)) for k in kw.keys() - if k.startswith('test_')]) - - col = schema.Column(*args, **kw) - if 'test_needs_autoincrement' in test_opts and \ - kw.get('primary_key', False) and \ - testing.against('firebird', 'oracle'): - def add_seq(tbl, c): - c._init_items( - schema.Sequence(_truncate_name(testing.db.dialect, tbl.name + '_' + c.name + '_seq'), optional=True) - ) - col._on_table_attach(add_seq) - return col - -def _truncate_name(dialect, name): - if len(name) > dialect.max_identifier_length: - return name[0:max(dialect.max_identifier_length - 6, 0)] + "_" + hex(hash(name) % 64)[2:] - else: - return name - diff --git a/lib/sqlalchemy/test/testing.py b/lib/sqlalchemy/test/testing.py deleted file mode 100644 index 46723baa0..000000000 --- a/lib/sqlalchemy/test/testing.py +++ /dev/null @@ -1,797 +0,0 @@ -"""TestCase and TestSuite artifacts and testing decorators.""" - -import itertools -import operator -import re -import sys -import types -import warnings -from cStringIO import StringIO - -from test.bootstrap import config -from sqlalchemy.test import assertsql, util as testutil -from sqlalchemy.util import function_named, py3k -from engines import drop_all_tables - -from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema, pool, orm -from sqlalchemy.engine import default -from nose import SkipTest - - -_ops = { '<': operator.lt, - '>': operator.gt, - '==': operator.eq, - '!=': operator.ne, - '<=': operator.le, - '>=': operator.ge, - 'in': operator.contains, - 'between': lambda val, pair: val >= pair[0] and val <= pair[1], - } - -# sugar ('testing.db'); set here by config() at runtime -db = None - -# more sugar, installed by __init__ -requires = None - -def fails_if(callable_, reason=None): - """Mark a test as expected to fail if callable_ returns True. - - If the callable returns false, the test is run and reported as normal. - However if the callable returns true, the test is expected to fail and the - unit test logic is inverted: if the test fails, a success is reported. If - the test succeeds, a failure is reported. - """ - - docstring = getattr(callable_, '__doc__', None) or callable_.__name__ - description = docstring.split('\n')[0] - - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if not callable_(): - return fn(*args, **kw) - else: - try: - fn(*args, **kw) - except Exception, ex: - print ("'%s' failed as expected (condition: %s): %s " % ( - fn_name, description, str(ex))) - return True - else: - raise AssertionError( - "Unexpected success for '%s' (condition: %s)" % - (fn_name, description)) - return function_named(maybe, fn_name) - return decorate - - -def future(fn): - """Mark a test as expected to unconditionally fail. - - Takes no arguments, omit parens when using as a decorator. - """ - - fn_name = fn.__name__ - def decorated(*args, **kw): - try: - fn(*args, **kw) - except Exception, ex: - print ("Future test '%s' failed as expected: %s " % ( - fn_name, str(ex))) - return True - else: - raise AssertionError( - "Unexpected success for future test '%s'" % fn_name) - return function_named(decorated, fn_name) - -def db_spec(*dbs): - dialects = set([x for x in dbs if '+' not in x]) - drivers = set([x[1:] for x in dbs if x.startswith('+')]) - specs = set([tuple(x.split('+')) for x in dbs if '+' in x and x not in drivers]) - - def check(engine): - return engine.name in dialects or \ - engine.driver in drivers or \ - (engine.name, engine.driver) in specs - - return check - - -def fails_on(dbs, reason): - """Mark a test as expected to fail on the specified database - implementation. - - Unlike ``crashes``, tests marked as ``fails_on`` will be run - for the named databases. The test is expected to fail and the unit test - logic is inverted: if the test fails, a success is reported. If the test - succeeds, a failure is reported. - """ - - spec = db_spec(dbs) - - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if not spec(config.db): - return fn(*args, **kw) - else: - try: - fn(*args, **kw) - except Exception, ex: - print ("'%s' failed as expected on DB implementation " - "'%s+%s': %s" % ( - fn_name, config.db.name, config.db.driver, reason)) - return True - else: - raise AssertionError( - "Unexpected success for '%s' on DB implementation '%s+%s'" % - (fn_name, config.db.name, config.db.driver)) - return function_named(maybe, fn_name) - return decorate - -def fails_on_everything_except(*dbs): - """Mark a test as expected to fail on most database implementations. - - Like ``fails_on``, except failure is the expected outcome on all - databases except those listed. - """ - - spec = db_spec(*dbs) - - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if spec(config.db): - return fn(*args, **kw) - else: - try: - fn(*args, **kw) - except Exception, ex: - print ("'%s' failed as expected on DB implementation " - "'%s+%s': %s" % ( - fn_name, config.db.name, config.db.driver, str(ex))) - return True - else: - raise AssertionError( - "Unexpected success for '%s' on DB implementation '%s+%s'" % - (fn_name, config.db.name, config.db.driver)) - return function_named(maybe, fn_name) - return decorate - -def crashes(db, reason): - """Mark a test as unsupported by a database implementation. - - ``crashes`` tests will be skipped unconditionally. Use for feature tests - that cause deadlocks or other fatal problems. - - """ - carp = _should_carp_about_exclusion(reason) - spec = db_spec(db) - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if spec(config.db): - msg = "'%s' unsupported on DB implementation '%s+%s': %s" % ( - fn_name, config.db.name, config.db.driver, reason) - print msg - if carp: - print >> sys.stderr, msg - return True - else: - return fn(*args, **kw) - return function_named(maybe, fn_name) - return decorate - -def _block_unconditionally(db, reason): - """Mark a test as unsupported by a database implementation. - - Will never run the test against any version of the given database, ever, - no matter what. Use when your assumptions are infallible; past, present - and future. - - """ - carp = _should_carp_about_exclusion(reason) - spec = db_spec(db) - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if spec(config.db): - msg = "'%s' unsupported on DB implementation '%s+%s': %s" % ( - fn_name, config.db.name, config.db.driver, reason) - print msg - if carp: - print >> sys.stderr, msg - return True - else: - return fn(*args, **kw) - return function_named(maybe, fn_name) - return decorate - -def only_on(dbs, reason): - carp = _should_carp_about_exclusion(reason) - spec = db_spec(*util.to_list(dbs)) - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if spec(config.db): - return fn(*args, **kw) - else: - msg = "'%s' unsupported on DB implementation '%s+%s': %s" % ( - fn_name, config.db.name, config.db.driver, reason) - print msg - if carp: - print >> sys.stderr, msg - return True - return function_named(maybe, fn_name) - return decorate - -def exclude(db, op, spec, reason): - """Mark a test as unsupported by specific database server versions. - - Stackable, both with other excludes and other decorators. Examples:: - - # Not supported by mydb versions less than 1, 0 - @exclude('mydb', '<', (1,0)) - # Other operators work too - @exclude('bigdb', '==', (9,0,9)) - @exclude('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3'))) - - """ - carp = _should_carp_about_exclusion(reason) - - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if _is_excluded(db, op, spec): - msg = "'%s' unsupported on DB %s version '%s': %s" % ( - fn_name, config.db.name, _server_version(), reason) - print msg - if carp: - print >> sys.stderr, msg - return True - else: - return fn(*args, **kw) - return function_named(maybe, fn_name) - return decorate - -def _should_carp_about_exclusion(reason): - """Guard against forgotten exclusions.""" - assert reason - for _ in ('todo', 'fixme', 'xxx'): - if _ in reason.lower(): - return True - else: - if len(reason) < 4: - return True - -def _is_excluded(db, op, spec): - """Return True if the configured db matches an exclusion specification. - - db: - A dialect name - op: - An operator or stringified operator, such as '==' - spec: - A value that will be compared to the dialect's server_version_info - using the supplied operator. - - Examples:: - # Not supported by mydb versions less than 1, 0 - _is_excluded('mydb', '<', (1,0)) - # Other operators work too - _is_excluded('bigdb', '==', (9,0,9)) - _is_excluded('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3'))) - """ - - vendor_spec = db_spec(db) - - if not vendor_spec(config.db): - return False - - version = _server_version() - - oper = hasattr(op, '__call__') and op or _ops[op] - return oper(version, spec) - -def _server_version(bind=None): - """Return a server_version_info tuple.""" - - if bind is None: - bind = config.db - - # force metadata to be retrieved - conn = bind.connect() - version = getattr(bind.dialect, 'server_version_info', ()) - conn.close() - return version - -def skip_if(predicate, reason=None): - """Skip a test if predicate is true.""" - reason = reason or predicate.__name__ - carp = _should_carp_about_exclusion(reason) - - def decorate(fn): - fn_name = fn.__name__ - def maybe(*args, **kw): - if predicate(): - msg = "'%s' skipped on DB %s version '%s': %s" % ( - fn_name, config.db.name, _server_version(), reason) - print msg - if carp: - print >> sys.stderr, msg - return True - else: - return fn(*args, **kw) - return function_named(maybe, fn_name) - return decorate - -def emits_warning(*messages): - """Mark a test as emitting a warning. - - With no arguments, squelches all SAWarning failures. Or pass one or more - strings; these will be matched to the root of the warning description by - warnings.filterwarnings(). - """ - - # TODO: it would be nice to assert that a named warning was - # emitted. should work with some monkeypatching of warnings, - # and may work on non-CPython if they keep to the spirit of - # warnings.showwarning's docstring. - # - update: jython looks ok, it uses cpython's module - def decorate(fn): - def safe(*args, **kw): - # todo: should probably be strict about this, too - filters = [dict(action='ignore', - category=sa_exc.SAPendingDeprecationWarning)] - if not messages: - filters.append(dict(action='ignore', - category=sa_exc.SAWarning)) - else: - filters.extend(dict(action='ignore', - message=message, - category=sa_exc.SAWarning) - for message in messages) - for f in filters: - warnings.filterwarnings(**f) - try: - return fn(*args, **kw) - finally: - resetwarnings() - return function_named(safe, fn.__name__) - return decorate - -def emits_warning_on(db, *warnings): - """Mark a test as emitting a warning on a specific dialect. - - With no arguments, squelches all SAWarning failures. Or pass one or more - strings; these will be matched to the root of the warning description by - warnings.filterwarnings(). - """ - spec = db_spec(db) - - def decorate(fn): - def maybe(*args, **kw): - if isinstance(db, basestring): - if not spec(config.db): - return fn(*args, **kw) - else: - wrapped = emits_warning(*warnings)(fn) - return wrapped(*args, **kw) - else: - if not _is_excluded(*db): - return fn(*args, **kw) - else: - wrapped = emits_warning(*warnings)(fn) - return wrapped(*args, **kw) - return function_named(maybe, fn.__name__) - return decorate - -def uses_deprecated(*messages): - """Mark a test as immune from fatal deprecation warnings. - - With no arguments, squelches all SADeprecationWarning failures. - Or pass one or more strings; these will be matched to the root - of the warning description by warnings.filterwarnings(). - - As a special case, you may pass a function name prefixed with // - and it will be re-written as needed to match the standard warning - verbiage emitted by the sqlalchemy.util.deprecated decorator. - """ - - - def decorate(fn): - def safe(*args, **kw): - # todo: should probably be strict about this, too - filters = [dict(action='ignore', - category=sa_exc.SAPendingDeprecationWarning)] - if not messages: - filters.append(dict(action='ignore', - category=sa_exc.SADeprecationWarning)) - else: - filters.extend( - [dict(action='ignore', - message=message, - category=sa_exc.SADeprecationWarning) - for message in - [ (m.startswith('//') and - ('Call to deprecated function ' + m[2:]) or m) - for m in messages] ]) - - for f in filters: - warnings.filterwarnings(**f) - try: - return fn(*args, **kw) - finally: - resetwarnings() - return function_named(safe, fn.__name__) - return decorate - -def resetwarnings(): - """Reset warning behavior to testing defaults.""" - - warnings.filterwarnings('ignore', - category=sa_exc.SAPendingDeprecationWarning) - warnings.filterwarnings('error', category=sa_exc.SADeprecationWarning) - warnings.filterwarnings('error', category=sa_exc.SAWarning) - -# warnings.simplefilter('error') - - -def global_cleanup_assertions(): - """Check things that have to be finalized at the end of a test suite. - - Hardcoded at the moment, a modular system can be built here - to support things like PG prepared transactions, tables all - dropped, etc. - - """ - - testutil.lazy_gc() - assert not pool._refs - - - -def against(*queries): - """Boolean predicate, compares to testing database configuration. - - Given one or more dialect names, returns True if one is the configured - database engine. - - Also supports comparison to database version when provided with one or - more 3-tuples of dialect name, operator, and version specification:: - - testing.against('mysql', 'postgresql') - testing.against(('mysql', '>=', (5, 0, 0)) - """ - - for query in queries: - if isinstance(query, basestring): - if db_spec(query)(config.db): - return True - else: - name, op, spec = query - if not db_spec(name)(config.db): - continue - - have = _server_version() - - oper = hasattr(op, '__call__') and op or _ops[op] - if oper(have, spec): - return True - return False - -def _chain_decorators_on(fn, *decorators): - """Apply a series of decorators to fn, returning a decorated function.""" - for decorator in reversed(decorators): - fn = decorator(fn) - return fn - -def rowset(results): - """Converts the results of sql execution into a plain set of column tuples. - - Useful for asserting the results of an unordered query. - """ - - return set([tuple(row) for row in results]) - - -def eq_(a, b, msg=None): - """Assert a == b, with repr messaging on failure.""" - assert a == b, msg or "%r != %r" % (a, b) - -def ne_(a, b, msg=None): - """Assert a != b, with repr messaging on failure.""" - assert a != b, msg or "%r == %r" % (a, b) - -def is_(a, b, msg=None): - """Assert a is b, with repr messaging on failure.""" - assert a is b, msg or "%r is not %r" % (a, b) - -def is_not_(a, b, msg=None): - """Assert a is not b, with repr messaging on failure.""" - assert a is not b, msg or "%r is %r" % (a, b) - -def startswith_(a, fragment, msg=None): - """Assert a.startswith(fragment), with repr messaging on failure.""" - assert a.startswith(fragment), msg or "%r does not start with %r" % ( - a, fragment) - -def assert_raises(except_cls, callable_, *args, **kw): - try: - callable_(*args, **kw) - success = False - except except_cls, e: - success = True - - # assert outside the block so it works for AssertionError too ! - assert success, "Callable did not raise an exception" - -def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): - try: - callable_(*args, **kwargs) - assert False, "Callable did not raise an exception" - except except_cls, e: - assert re.search(msg, str(e)), "%r !~ %s" % (msg, e) - print str(e) - -def fail(msg): - assert False, msg - -def fixture(table, columns, *rows): - """Insert data into table after creation.""" - def onload(event, schema_item, connection): - insert = table.insert() - column_names = [col.key for col in columns] - connection.execute(insert, [dict(zip(column_names, column_values)) - for column_values in rows]) - table.append_ddl_listener('after-create', onload) - -def provide_metadata(fn): - """Provides a bound MetaData object for a single test, - drops it afterwards.""" - def maybe(*args, **kw): - metadata = schema.MetaData(db) - context = dict(fn.func_globals) - context['metadata'] = metadata - # jython bug #1034 - rebound = types.FunctionType( - fn.func_code, context, fn.func_name, fn.func_defaults, - fn.func_closure) - try: - return rebound(*args, **kw) - finally: - metadata.drop_all() - return function_named(maybe, fn.__name__) - -def resolve_artifact_names(fn): - """Decorator, augment function globals with tables and classes. - - Swaps out the function's globals at execution time. The 'global' statement - will not work as expected inside a decorated function. - - """ - # This could be automatically applied to framework and test_ methods in - # the MappedTest-derived test suites but... *some* explicitness for this - # magic is probably good. Especially as 'global' won't work- these - # rebound functions aren't regular Python.. - # - # Also: it's lame that CPython accepts a dict-subclass for globals, but - # only calls dict methods. That would allow 'global' to pass through to - # the func_globals. - def resolved(*args, **kwargs): - self = args[0] - context = dict(fn.func_globals) - for source in self._artifact_registries: - context.update(getattr(self, source)) - # jython bug #1034 - rebound = types.FunctionType( - fn.func_code, context, fn.func_name, fn.func_defaults, - fn.func_closure) - return rebound(*args, **kwargs) - return function_named(resolved, fn.func_name) - -class adict(dict): - """Dict keys available as attributes. Shadows.""" - def __getattribute__(self, key): - try: - return self[key] - except KeyError: - return dict.__getattribute__(self, key) - - def get_all(self, *keys): - return tuple([self[key] for key in keys]) - - -class TestBase(object): - # A sequence of database names to always run, regardless of the - # constraints below. - __whitelist__ = () - - # A sequence of requirement names matching testing.requires decorators - __requires__ = () - - # A sequence of dialect names to exclude from the test class. - __unsupported_on__ = () - - # If present, test class is only runnable for the *single* specified - # dialect. If you need multiple, use __unsupported_on__ and invert. - __only_on__ = None - - # A sequence of no-arg callables. If any are True, the entire testcase is - # skipped. - __skip_if__ = None - - _artifact_registries = () - - def assert_(self, val, msg=None): - assert val, msg - -class AssertsCompiledSQL(object): - def assert_compile(self, clause, result, params=None, checkparams=None, dialect=None, use_default_dialect=False): - if use_default_dialect: - dialect = default.DefaultDialect() - - if dialect is None: - dialect = getattr(self, '__dialect__', None) - - kw = {} - if params is not None: - kw['column_keys'] = params.keys() - - if isinstance(clause, orm.Query): - context = clause._compile_context() - context.statement.use_labels = True - clause = context.statement - - c = clause.compile(dialect=dialect, **kw) - - param_str = repr(getattr(c, 'params', {})) - # Py3K - #param_str = param_str.encode('utf-8').decode('ascii', 'ignore') - - print "\nSQL String:\n" + str(c) + param_str - - cc = re.sub(r'[\n\t]', '', str(c)) - - eq_(cc, result, "%r != %r on dialect %r" % (cc, result, dialect)) - - if checkparams is not None: - eq_(c.construct_params(params), checkparams) - -class ComparesTables(object): - def assert_tables_equal(self, table, reflected_table, strict_types=False): - assert len(table.c) == len(reflected_table.c) - for c, reflected_c in zip(table.c, reflected_table.c): - eq_(c.name, reflected_c.name) - assert reflected_c is reflected_table.c[c.name] - eq_(c.primary_key, reflected_c.primary_key) - eq_(c.nullable, reflected_c.nullable) - - if strict_types: - assert type(reflected_c.type) is type(c.type), \ - "Type '%s' doesn't correspond to type '%s'" % (reflected_c.type, c.type) - else: - self.assert_types_base(reflected_c, c) - - if isinstance(c.type, sqltypes.String): - eq_(c.type.length, reflected_c.type.length) - - eq_(set([f.column.name for f in c.foreign_keys]), set([f.column.name for f in reflected_c.foreign_keys])) - if c.server_default: - assert isinstance(reflected_c.server_default, - schema.FetchedValue) - - assert len(table.primary_key) == len(reflected_table.primary_key) - for c in table.primary_key: - assert reflected_table.primary_key.columns[c.name] is not None - - def assert_types_base(self, c1, c2): - assert c1.type._compare_type_affinity(c2.type),\ - "On column %r, type '%s' doesn't correspond to type '%s'" % \ - (c1.name, c1.type, c2.type) - -class AssertsExecutionResults(object): - def assert_result(self, result, class_, *objects): - result = list(result) - print repr(result) - self.assert_list(result, class_, objects) - - def assert_list(self, result, class_, list): - self.assert_(len(result) == len(list), - "result list is not the same size as test list, " + - "for class " + class_.__name__) - for i in range(0, len(list)): - self.assert_row(class_, result[i], list[i]) - - def assert_row(self, class_, rowobj, desc): - self.assert_(rowobj.__class__ is class_, - "item class is not " + repr(class_)) - for key, value in desc.iteritems(): - if isinstance(value, tuple): - if isinstance(value[1], list): - self.assert_list(getattr(rowobj, key), value[0], value[1]) - else: - self.assert_row(value[0], getattr(rowobj, key), value[1]) - else: - self.assert_(getattr(rowobj, key) == value, - "attribute %s value %s does not match %s" % ( - key, getattr(rowobj, key), value)) - - def assert_unordered_result(self, result, cls, *expected): - """As assert_result, but the order of objects is not considered. - - The algorithm is very expensive but not a big deal for the small - numbers of rows that the test suite manipulates. - """ - - class frozendict(dict): - def __hash__(self): - return id(self) - - found = util.IdentitySet(result) - expected = set([frozendict(e) for e in expected]) - - for wrong in itertools.ifilterfalse(lambda o: type(o) == cls, found): - fail('Unexpected type "%s", expected "%s"' % ( - type(wrong).__name__, cls.__name__)) - - if len(found) != len(expected): - fail('Unexpected object count "%s", expected "%s"' % ( - len(found), len(expected))) - - NOVALUE = object() - def _compare_item(obj, spec): - for key, value in spec.iteritems(): - if isinstance(value, tuple): - try: - self.assert_unordered_result( - getattr(obj, key), value[0], *value[1]) - except AssertionError: - return False - else: - if getattr(obj, key, NOVALUE) != value: - return False - return True - - for expected_item in expected: - for found_item in found: - if _compare_item(found_item, expected_item): - found.remove(found_item) - break - else: - fail( - "Expected %s instance with attributes %s not found." % ( - cls.__name__, repr(expected_item))) - return True - - def assert_sql_execution(self, db, callable_, *rules): - assertsql.asserter.add_rules(rules) - try: - callable_() - assertsql.asserter.statement_complete() - finally: - assertsql.asserter.clear_rules() - - def assert_sql(self, db, callable_, list_, with_sequences=None): - if with_sequences is not None and config.db.name in ('firebird', 'oracle', 'postgresql'): - rules = with_sequences - else: - rules = list_ - - newrules = [] - for rule in rules: - if isinstance(rule, dict): - newrule = assertsql.AllOf(*[ - assertsql.ExactSQL(k, v) for k, v in rule.iteritems() - ]) - else: - newrule = assertsql.ExactSQL(*rule) - newrules.append(newrule) - - self.assert_sql_execution(db, callable_, *newrules) - - def assert_sql_count(self, db, callable_, count): - self.assert_sql_execution(db, callable_, assertsql.CountStatements(count)) - - diff --git a/lib/sqlalchemy/test/util.py b/lib/sqlalchemy/test/util.py deleted file mode 100644 index f2b6b49ea..000000000 --- a/lib/sqlalchemy/test/util.py +++ /dev/null @@ -1,76 +0,0 @@ -from sqlalchemy.util import jython, function_named - -import gc -import time -import random - -if jython: - def gc_collect(*args): - """aggressive gc.collect for tests.""" - gc.collect() - time.sleep(0.1) - gc.collect() - gc.collect() - return 0 - - # "lazy" gc, for VM's that don't GC on refcount == 0 - lazy_gc = gc_collect - -else: - # assume CPython - straight gc.collect, lazy_gc() is a pass - gc_collect = gc.collect - def lazy_gc(): - pass - -def picklers(): - picklers = set() - # Py2K - try: - import cPickle - picklers.add(cPickle) - except ImportError: - pass - # end Py2K - import pickle - picklers.add(pickle) - - # yes, this thing needs this much testing - for pickle in picklers: - for protocol in -1, 0, 1, 2: - yield pickle.loads, lambda d:pickle.dumps(d, protocol) - - -def round_decimal(value, prec): - if isinstance(value, float): - return round(value, prec) - - import decimal - - # can also use shift() here but that is 2.6 only - return (value * decimal.Decimal("1" + "0" * prec)).to_integral(decimal.ROUND_FLOOR) / \ - pow(10, prec) - -class RandomSet(set): - def __iter__(self): - l = list(set.__iter__(self)) - random.shuffle(l) - return iter(l) - - def pop(self): - index = random.randint(0, len(self) - 1) - item = list(set.__iter__(self))[index] - self.remove(item) - return item - - def union(self, other): - return RandomSet(set.union(self, other)) - - def difference(self, other): - return RandomSet(set.difference(self, other)) - - def intersection(self, other): - return RandomSet(set.intersection(self, other)) - - def copy(self): - return RandomSet(self) -
\ No newline at end of file |