summaryrefslogtreecommitdiff
path: root/tests/unit/db/sqlalchemy/test_sqlalchemy.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/db/sqlalchemy/test_sqlalchemy.py')
-rw-r--r--tests/unit/db/sqlalchemy/test_sqlalchemy.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/tests/unit/db/sqlalchemy/test_sqlalchemy.py b/tests/unit/db/sqlalchemy/test_sqlalchemy.py
index 49a11625..54840647 100644
--- a/tests/unit/db/sqlalchemy/test_sqlalchemy.py
+++ b/tests/unit/db/sqlalchemy/test_sqlalchemy.py
@@ -75,6 +75,54 @@ class SessionErrorWrapperTestCase(test_base.DbTestCase):
tbl2.update({'foo': 10})
self.assertRaises(db_exc.DBDuplicateEntry, tbl2.save, _session)
+ def test_nested_session_wrappers(self):
+ _session = self.sessionmaker()
+
+ tbl = TmpTable()
+ tbl.update({'foo': 10})
+ tbl.save(_session)
+
+ tbl2 = TmpTable()
+ tbl2.update({'foo': 10})
+ _session.begin()
+ _session.add(tbl2)
+
+ # commit will call upon flush which raises the exception;
+ # the wrap around commit needs to not interfere with the
+ # already wrapped exception coming from flush
+ self.assertRaises(db_exc.DBDuplicateEntry, _session.commit)
+
+ def test_commit_wrapper(self):
+ # test a commit that raises at the point of commit,
+ # not the flush
+
+ _session = self.sessionmaker()
+
+ with _session.begin():
+ # patch SQLAlchemy connection._commit_impl to
+ # raise a deadlock exception during commit only.
+ deadlock_on_commit = mock.Mock(
+ side_effect=sqla_exc.OperationalError(
+ "simulated deadlock", None, None))
+
+ # patch our own _raise_if_deadlock_error filter to treat
+ # this OperationalError as a DBDeadlock.
+ def _raise_if_deadlock_error(operational_error, engine_name):
+ self.assertIsInstance(
+ operational_error, sqla_exc.OperationalError)
+ self.assertTrue(
+ "simulated deadlock" in str(operational_error))
+ raise db_exc.DBDeadlock(operational_error)
+
+ connection = _session.connection()
+
+ with mock.patch.object(
+ connection, '_commit_impl', deadlock_on_commit):
+ with mock.patch.object(
+ session, "_raise_if_deadlock_error",
+ _raise_if_deadlock_error):
+ self.assertRaises(db_exc.DBDeadlock, _session.commit)
+
def test_execute_wrapper(self):
_session = self.sessionmaker()
with _session.begin():