summaryrefslogtreecommitdiff
path: root/oslo_db
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_db')
-rw-r--r--oslo_db/sqlalchemy/test_fixtures.py16
-rw-r--r--oslo_db/sqlalchemy/utils.py127
-rw-r--r--oslo_db/tests/sqlalchemy/test_provision.py76
3 files changed, 0 insertions, 219 deletions
diff --git a/oslo_db/sqlalchemy/test_fixtures.py b/oslo_db/sqlalchemy/test_fixtures.py
index 6b82f05..8b69d3f 100644
--- a/oslo_db/sqlalchemy/test_fixtures.py
+++ b/oslo_db/sqlalchemy/test_fixtures.py
@@ -254,22 +254,6 @@ class DeletesFromSchema(ResetsData):
"""
-class RollsBackTransaction(ResetsData):
- """Fixture class that maintains a database transaction per test.
-
- """
-
- def setup_for_reset(self, engine, facade):
- conn = engine.connect()
- engine = utils.NonCommittingEngine(conn)
- self._reset_engine = enginefacade._TestTransactionFactory.apply_engine(
- engine, facade)
-
- def reset_schema_data(self, engine, facade):
- self._reset_engine()
- engine._dispose()
-
-
class SimpleDbFixture(BaseDbFixture):
"""Fixture which provides an engine from a fixed URL.
diff --git a/oslo_db/sqlalchemy/utils.py b/oslo_db/sqlalchemy/utils.py
index 3c58bd6..3a6a993 100644
--- a/oslo_db/sqlalchemy/utils.py
+++ b/oslo_db/sqlalchemy/utils.py
@@ -1261,130 +1261,3 @@ def suspend_fk_constraints_for_col_alter(
deferrable=fk['options'].get('deferrable'),
initially=fk['options'].get('initially'),
)
-
-
-class NonCommittingConnectable(object):
- """A ``Connectable`` substitute which rolls all operations back.
-
- ``NonCommittingConnectable`` forms the basis of mock
- ``Engine`` and ``Connection`` objects within a test. It provides
- only that part of the API that should reasonably be used within
- a single-connection test environment (e.g. no engine.dispose(),
- connection.invalidate(), etc. ). The connection runs both within
- a transaction as well as a savepoint. The transaction is there
- so that any operations upon the connection can be rolled back.
- If the test calls begin(), a "pseduo" transaction is returned that
- won't actually commit anything. The subtransaction is there to allow
- a test to successfully call rollback(), however, where all operations
- to that point will be rolled back and the operations can continue,
- simulating a real rollback while still remaining within a transaction
- external to the test.
-
- """
-
- _nested_trans = None
-
- def __init__(self, connection):
- self.connection = connection
- self._trans = connection.begin()
- self._restart_nested()
-
- def _restart_nested(self):
- if self._nested_trans is not None:
- self._nested_trans.rollback()
- self._nested_trans = self.connection.begin_nested()
-
- def _dispose(self):
- if not self.connection.closed:
- self._nested_trans.rollback()
- self._trans.rollback()
- self.connection.close()
-
- def execute(self, obj, *multiparams, **params):
- """Executes the given construct and returns a :class:`.ResultProxy`."""
-
- return self.connection.execute(obj, *multiparams, **params)
-
- def scalar(self, obj, *multiparams, **params):
- """Executes and returns the first column of the first row."""
-
- return self.connection.scalar(obj, *multiparams, **params)
-
-
-class NonCommittingEngine(NonCommittingConnectable):
- """``Engine`` -specific non committing connectbale."""
-
- @property
- def url(self):
- return self.connection.engine.url
-
- @property
- def engine(self):
- return self
-
- def connect(self):
- return NonCommittingConnection(self.connection)
-
- @contextlib.contextmanager
- def begin(self):
- conn = self.connect()
- trans = conn.begin()
- try:
- yield conn
- except Exception:
- trans.rollback()
- else:
- trans.commit()
-
-
-class NonCommittingConnection(NonCommittingConnectable):
- """``Connection`` -specific non committing connectbale."""
-
- def close(self):
- """Close the 'Connection'.
-
- In this context, close() is a no-op.
-
- """
- pass
-
- def begin(self):
- return NonCommittingTransaction(self, self.connection.begin())
-
- def __enter__(self):
- return self
-
- def __exit__(self, *arg):
- pass
-
-
-class NonCommittingTransaction(object):
- """A wrapper for ``Transaction``.
-
- This is to accommodate being able to guaranteed start a new
- SAVEPOINT when a transaction is rolled back.
-
- """
- def __init__(self, provisioned, transaction):
- self.provisioned = provisioned
- self.transaction = transaction
-
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, traceback):
- if type is None:
- try:
- self.commit()
- except Exception:
- self.rollback()
- raise
- else:
- self.rollback()
-
- def commit(self):
- self.transaction.commit()
-
- def rollback(self):
- self.transaction.rollback()
- self.provisioned._restart_nested()
diff --git a/oslo_db/tests/sqlalchemy/test_provision.py b/oslo_db/tests/sqlalchemy/test_provision.py
index f0ef944..a6cedce 100644
--- a/oslo_db/tests/sqlalchemy/test_provision.py
+++ b/oslo_db/tests/sqlalchemy/test_provision.py
@@ -22,7 +22,6 @@ from oslo_db import exception
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import provision
from oslo_db.sqlalchemy import test_fixtures
-from oslo_db.sqlalchemy import utils
from oslo_db.tests import base as test_base
from oslo_db.tests.sqlalchemy import base as db_test_base
@@ -149,81 +148,6 @@ class PostgreSQLDropAllObjectsTest(
pass
-class RetainSchemaTest(test_base.BaseTestCase):
- DRIVER = "sqlite"
-
- def setUp(self):
- super(RetainSchemaTest, self).setUp()
-
- metadata = schema.MetaData()
- self.test_table = schema.Table(
- 'test_table', metadata,
- schema.Column('x', types.Integer),
- schema.Column('y', types.Integer),
- mysql_engine='InnoDB'
- )
-
- def gen_schema(engine):
- metadata.create_all(engine, checkfirst=False)
- self._gen_schema = gen_schema
-
- def test_once(self):
- self._run_test()
-
- def test_twice(self):
- self._run_test()
-
- def _run_test(self):
- try:
- database_resource = provision.DatabaseResource(
- self.DRIVER, provision_new_database=True)
- except exception.BackendNotAvailable:
- self.skipTest("database not available")
-
- schema_resource = provision.SchemaResource(
- database_resource, self._gen_schema)
-
- schema = schema_resource.getResource()
-
- conn = schema.database.engine.connect()
- engine = utils.NonCommittingEngine(conn)
-
- with engine.connect() as conn:
- rows = conn.execute(self.test_table.select())
- self.assertEqual([], rows.fetchall())
-
- trans = conn.begin()
- conn.execute(
- self.test_table.insert(),
- {"x": 1, "y": 2}
- )
- trans.rollback()
-
- rows = conn.execute(self.test_table.select())
- self.assertEqual([], rows.fetchall())
-
- trans = conn.begin()
- conn.execute(
- self.test_table.insert(),
- {"x": 2, "y": 3}
- )
- trans.commit()
-
- rows = conn.execute(self.test_table.select())
- self.assertEqual([(2, 3)], rows.fetchall())
-
- engine._dispose()
- schema_resource.finishedWith(schema)
-
-
-class MySQLRetainSchemaTest(RetainSchemaTest):
- DRIVER = "mysql"
-
-
-class PostgresqlRetainSchemaTest(RetainSchemaTest):
- DRIVER = "postgresql"
-
-
class AdHocURLTest(test_base.BaseTestCase):
def test_sqlite_setup_teardown(self):