diff options
Diffstat (limited to 'migrate/tests/fixture')
-rw-r--r-- | migrate/tests/fixture/__init__.py | 18 | ||||
-rw-r--r-- | migrate/tests/fixture/base.py | 26 | ||||
-rw-r--r-- | migrate/tests/fixture/database.py | 203 | ||||
-rw-r--r-- | migrate/tests/fixture/models.py | 14 | ||||
-rw-r--r-- | migrate/tests/fixture/pathed.py | 77 | ||||
-rw-r--r-- | migrate/tests/fixture/shell.py | 33 | ||||
-rw-r--r-- | migrate/tests/fixture/warnings.py | 88 |
7 files changed, 0 insertions, 459 deletions
diff --git a/migrate/tests/fixture/__init__.py b/migrate/tests/fixture/__init__.py deleted file mode 100644 index 6b8bc48..0000000 --- a/migrate/tests/fixture/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import testtools - -def main(imports=None): - if imports: - global suite - suite = suite(imports) - defaultTest='fixture.suite' - else: - defaultTest=None - return testtools.TestProgram(defaultTest=defaultTest) - -from .base import Base -from .pathed import Pathed -from .shell import Shell -from .database import DB,usedb diff --git a/migrate/tests/fixture/base.py b/migrate/tests/fixture/base.py deleted file mode 100644 index 38c91af..0000000 --- a/migrate/tests/fixture/base.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import re -import testtools - -class Base(testtools.TestCase): - - def assertEqualIgnoreWhitespace(self, v1, v2): - """Compares two strings that should be\ - identical except for whitespace - """ - def strip_whitespace(s): - return re.sub(r'\s', '', s) - - line1 = strip_whitespace(v1) - line2 = strip_whitespace(v2) - - self.assertEqual(line1, line2, "%s != %s" % (v1, v2)) - - def ignoreErrors(self, func, *p,**k): - """Call a function, ignoring any exceptions""" - try: - func(*p,**k) - except: - pass diff --git a/migrate/tests/fixture/database.py b/migrate/tests/fixture/database.py deleted file mode 100644 index 93bd69b..0000000 --- a/migrate/tests/fixture/database.py +++ /dev/null @@ -1,203 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import logging -import sys - -import six -from decorator import decorator - -from sqlalchemy import create_engine, Table, MetaData -from sqlalchemy import exc as sa_exc -from sqlalchemy.orm import create_session -from sqlalchemy.pool import StaticPool - -from migrate.changeset.schema import ColumnDelta -from migrate.versioning.util import Memoize - -from migrate.tests.fixture.base import Base -from migrate.tests.fixture.pathed import Pathed - - -log = logging.getLogger(__name__) - -@Memoize -def readurls(): - """read URLs from config file return a list""" - # TODO: remove tmpfile since sqlite can store db in memory - filename = 'test_db.cfg' if six.PY2 else "test_db_py3.cfg" - ret = list() - tmpfile = Pathed.tmp() - fullpath = os.path.join(os.curdir, filename) - - try: - fd = open(fullpath) - except IOError: - raise IOError("""You must specify the databases to use for testing! -Copy %(filename)s.tmpl to %(filename)s and edit your database URLs.""" % locals()) - - for line in fd: - if line.startswith('#'): - continue - line = line.replace('__tmp__', tmpfile).strip() - ret.append(line) - fd.close() - return ret - -def is_supported(url, supported, not_supported): - db = url.split(':', 1)[0] - - if supported is not None: - if isinstance(supported, six.string_types): - return supported == db - else: - return db in supported - elif not_supported is not None: - if isinstance(not_supported, six.string_types): - return not_supported != db - else: - return not (db in not_supported) - return True - - -def usedb(supported=None, not_supported=None): - """Decorates tests to be run with a database connection - These tests are run once for each available database - - @param supported: run tests for ONLY these databases - @param not_supported: run tests for all databases EXCEPT these - - If both supported and not_supported are empty, all dbs are assumed - to be supported - """ - if supported is not None and not_supported is not None: - raise AssertionError("Can't specify both supported and not_supported in fixture.db()") - - urls = readurls() - my_urls = [url for url in urls if is_supported(url, supported, not_supported)] - - @decorator - def dec(f, self, *a, **kw): - failed_for = [] - fail = False - for url in my_urls: - try: - log.debug("Running test with engine %s", url) - try: - self._setup(url) - except sa_exc.OperationalError: - log.info('Backend %s is not available, skip it', url) - continue - except Exception as e: - raise RuntimeError('Exception during _setup(): %r' % e) - - try: - f(self, *a, **kw) - finally: - try: - self._teardown() - except Exception as e: - raise RuntimeError('Exception during _teardown(): %r' % e) - except Exception: - failed_for.append(url) - fail = sys.exc_info() - for url in failed_for: - log.error('Failed for %s', url) - if fail: - # cause the failure :-) - six.reraise(*fail) - return dec - - -class DB(Base): - # Constants: connection level - NONE = 0 # No connection; just set self.url - CONNECT = 1 # Connect; no transaction - TXN = 2 # Everything in a transaction - - level = TXN - - def _engineInfo(self, url=None): - if url is None: - url = self.url - return url - - def _setup(self, url): - self._connect(url) - # make sure there are no tables lying around - meta = MetaData(self.engine) - meta.reflect() - meta.drop_all() - - def _teardown(self): - self._disconnect() - - def _connect(self, url): - self.url = url - # TODO: seems like 0.5.x branch does not work with engine.dispose and staticpool - #self.engine = create_engine(url, echo=True, poolclass=StaticPool) - self.engine = create_engine(url, echo=True) - # silence the logger added by SA, nose adds its own! - logging.getLogger('sqlalchemy').handlers=[] - self.meta = MetaData(bind=self.engine) - if self.level < self.CONNECT: - return - #self.session = create_session(bind=self.engine) - if self.level < self.TXN: - return - #self.txn = self.session.begin() - - def _disconnect(self): - if hasattr(self, 'txn'): - self.txn.rollback() - if hasattr(self, 'session'): - self.session.close() - #if hasattr(self,'conn'): - # self.conn.close() - self.engine.dispose() - - def _supported(self, url): - db = url.split(':',1)[0] - func = getattr(self, self._TestCase__testMethodName) - if hasattr(func, 'supported'): - return db in func.supported - if hasattr(func, 'not_supported'): - return not (db in func.not_supported) - # Neither list assigned; assume all are supported - return True - - def _not_supported(self, url): - return not self._supported(url) - - def _select_row(self): - """Select rows, used in multiple tests""" - return self.table.select().execution_options( - autocommit=True).execute().fetchone() - - def refresh_table(self, name=None): - """Reload the table from the database - Assumes we're working with only a single table, self.table, and - metadata self.meta - - Working w/ multiple tables is not possible, as tables can only be - reloaded with meta.clear() - """ - if name is None: - name = self.table.name - self.meta.clear() - self.table = Table(name, self.meta, autoload=True) - - def compare_columns_equal(self, columns1, columns2, ignore=None): - """Loop through all columns and compare them""" - def key(column): - return column.name - for c1, c2 in zip(sorted(columns1, key=key), sorted(columns2, key=key)): - diffs = ColumnDelta(c1, c2).diffs - if ignore: - for key in ignore: - diffs.pop(key, None) - if diffs: - self.fail("Comparing %s to %s failed: %s" % (columns1, columns2, diffs)) - -# TODO: document engine.dispose and write tests diff --git a/migrate/tests/fixture/models.py b/migrate/tests/fixture/models.py deleted file mode 100644 index ee76429..0000000 --- a/migrate/tests/fixture/models.py +++ /dev/null @@ -1,14 +0,0 @@ -from sqlalchemy import * - -# test rundiffs in shell -meta_old_rundiffs = MetaData() -meta_rundiffs = MetaData() -meta = MetaData() - -tmp_account_rundiffs = Table('tmp_account_rundiffs', meta_rundiffs, - Column('id', Integer, primary_key=True), - Column('login', Text()), - Column('passwd', Text()), -) - -tmp_sql_table = Table('tmp_sql_table', meta, Column('id', Integer)) diff --git a/migrate/tests/fixture/pathed.py b/migrate/tests/fixture/pathed.py deleted file mode 100644 index 78cf4cd..0000000 --- a/migrate/tests/fixture/pathed.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import shutil -import tempfile - -from migrate.tests.fixture import base - - -class Pathed(base.Base): - # Temporary files - - _tmpdir = tempfile.mkdtemp() - - def setUp(self): - super(Pathed, self).setUp() - self.temp_usable_dir = tempfile.mkdtemp() - sys.path.append(self.temp_usable_dir) - - def tearDown(self): - super(Pathed, self).tearDown() - try: - sys.path.remove(self.temp_usable_dir) - except: - pass # w00t? - Pathed.purge(self.temp_usable_dir) - - @classmethod - def _tmp(cls, prefix='', suffix=''): - """Generate a temporary file name that doesn't exist - All filenames are generated inside a temporary directory created by - tempfile.mkdtemp(); only the creating user has access to this directory. - It should be secure to return a nonexistant temp filename in this - directory, unless the user is messing with their own files. - """ - file, ret = tempfile.mkstemp(suffix,prefix,cls._tmpdir) - os.close(file) - os.remove(ret) - return ret - - @classmethod - def tmp(cls, *p, **k): - return cls._tmp(*p, **k) - - @classmethod - def tmp_py(cls, *p, **k): - return cls._tmp(suffix='.py', *p, **k) - - @classmethod - def tmp_sql(cls, *p, **k): - return cls._tmp(suffix='.sql', *p, **k) - - @classmethod - def tmp_named(cls, name): - return os.path.join(cls._tmpdir, name) - - @classmethod - def tmp_repos(cls, *p, **k): - return cls._tmp(*p, **k) - - @classmethod - def purge(cls, path): - """Removes this path if it exists, in preparation for tests - Careful - all tests should take place in /tmp. - We don't want to accidentally wipe stuff out... - """ - if os.path.exists(path): - if os.path.isdir(path): - shutil.rmtree(path) - else: - os.remove(path) - if path.endswith('.py'): - pyc = path + 'c' - if os.path.exists(pyc): - os.remove(pyc) diff --git a/migrate/tests/fixture/shell.py b/migrate/tests/fixture/shell.py deleted file mode 100644 index 566d250..0000000 --- a/migrate/tests/fixture/shell.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import sys -import logging - -from scripttest import TestFileEnvironment - -from migrate.tests.fixture.pathed import * - - -log = logging.getLogger(__name__) - -class Shell(Pathed): - """Base class for command line tests""" - - def setUp(self): - super(Shell, self).setUp() - migrate_path = os.path.dirname(sys.executable) - # PATH to migrate development script folder - log.debug('PATH for ScriptTest: %s', migrate_path) - self.env = TestFileEnvironment( - base_path=os.path.join(self.temp_usable_dir, 'env'), - ) - - def run_version(self, repos_path): - result = self.env.run('migrate version %s' % repos_path) - return int(result.stdout.strip()) - - def run_db_version(self, url, repos_path): - result = self.env.run('migrate db_version %s %s' % (url, repos_path)) - return int(result.stdout.strip()) diff --git a/migrate/tests/fixture/warnings.py b/migrate/tests/fixture/warnings.py deleted file mode 100644 index 8d99c0f..0000000 --- a/migrate/tests/fixture/warnings.py +++ /dev/null @@ -1,88 +0,0 @@ -# lifted from Python 2.6, so we can use it in Python 2.5 -import sys - -class WarningMessage(object): - - """Holds the result of a single showwarning() call.""" - - _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", - "line") - - def __init__(self, message, category, filename, lineno, file=None, - line=None): - local_values = locals() - for attr in self._WARNING_DETAILS: - setattr(self, attr, local_values[attr]) - if category: - self._category_name = category.__name__ - else: - self._category_name = None - - def __str__(self): - return ("{message : %r, category : %r, filename : %r, lineno : %s, " - "line : %r}" % (self.message, self._category_name, - self.filename, self.lineno, self.line)) - - -class catch_warnings(object): - - """A context manager that copies and restores the warnings filter upon - exiting the context. - - The 'record' argument specifies whether warnings should be captured by a - custom implementation of warnings.showwarning() and be appended to a list - returned by the context manager. Otherwise None is returned by the context - manager. The objects appended to the list are arguments whose attributes - mirror the arguments to showwarning(). - - The 'module' argument is to specify an alternative module to the module - named 'warnings' and imported under that name. This argument is only useful - when testing the warnings module itself. - - """ - - def __init__(self, record=False, module=None): - """Specify whether to record warnings and if an alternative module - should be used other than sys.modules['warnings']. - - For compatibility with Python 3.0, please consider all arguments to be - keyword-only. - - """ - self._record = record - if module is None: - self._module = sys.modules['warnings'] - else: - self._module = module - self._entered = False - - def __repr__(self): - args = [] - if self._record: - args.append("record=True") - if self._module is not sys.modules['warnings']: - args.append("module=%r" % self._module) - name = type(self).__name__ - return "%s(%s)" % (name, ", ".join(args)) - - def __enter__(self): - if self._entered: - raise RuntimeError("Cannot enter %r twice" % self) - self._entered = True - self._filters = self._module.filters - self._module.filters = self._filters[:] - self._showwarning = self._module.showwarning - if self._record: - log = [] - def showwarning(*args, **kwargs): - log.append(WarningMessage(*args, **kwargs)) - self._module.showwarning = showwarning - return log - else: - return None - - def __exit__(self, *exc_info): - if not self._entered: - raise RuntimeError("Cannot exit %r without entering first" % self) - self._module.filters = self._filters - self._module.showwarning = self._showwarning |