diff options
Diffstat (limited to 'test/orm/test_session.py')
-rw-r--r-- | test/orm/test_session.py | 382 |
1 files changed, 0 insertions, 382 deletions
diff --git a/test/orm/test_session.py b/test/orm/test_session.py index 9aeea8287..d8b9b073a 100644 --- a/test/orm/test_session.py +++ b/test/orm/test_session.py @@ -5,17 +5,14 @@ import inspect import pickle from sqlalchemy.orm import create_session, sessionmaker, attributes, \ make_transient, Session -from sqlalchemy.orm.attributes import instance_state import sqlalchemy as sa from test.lib import engines, testing, config from sqlalchemy import Integer, String, Sequence from test.lib.schema import Table, Column from sqlalchemy.orm import mapper, relationship, backref, joinedload, \ exc as orm_exc, object_session -from test.lib.testing import eq_ from test.engine import _base as engine_base from test.orm import _base, _fixtures -from sqlalchemy import event class SessionTest(_fixtures.FixtureTest): run_inserts = None @@ -1211,385 +1208,6 @@ class SessionTest(_fixtures.FixtureTest): del u3 gc_collect() -class SessionEventsTest(_fixtures.FixtureTest): - run_inserts = None - - def test_class_listen(self): - def my_listener(*arg, **kw): - pass - - event.listen(Session, 'before_flush', my_listener) - - s = Session() - assert my_listener in s.dispatch.before_flush - - def test_sessionmaker_listen(self): - """test that listen can be applied to individual scoped_session() classes.""" - - def my_listener_one(*arg, **kw): - pass - def my_listener_two(*arg, **kw): - pass - - S1 = sessionmaker() - S2 = sessionmaker() - - event.listen(Session, 'before_flush', my_listener_one) - event.listen(S1, 'before_flush', my_listener_two) - - s1 = S1() - assert my_listener_one in s1.dispatch.before_flush - assert my_listener_two in s1.dispatch.before_flush - - s2 = S2() - assert my_listener_one in s2.dispatch.before_flush - assert my_listener_two not in s2.dispatch.before_flush - - def test_scoped_session_invalid_callable(self): - from sqlalchemy.orm import scoped_session - - def my_listener_one(*arg, **kw): - pass - - scope = scoped_session(lambda:Session()) - - assert_raises_message( - sa.exc.ArgumentError, - "Session event listen on a ScopedSession " - "requries that its creation callable is a Session subclass.", - event.listen, scope, "before_flush", my_listener_one - ) - - def test_scoped_session_invalid_class(self): - from sqlalchemy.orm import scoped_session - - def my_listener_one(*arg, **kw): - pass - - class NotASession(object): - def __call__(self): - return Session() - - scope = scoped_session(NotASession) - - assert_raises_message( - sa.exc.ArgumentError, - "Session event listen on a ScopedSession " - "requries that its creation callable is a Session subclass.", - event.listen, scope, "before_flush", my_listener_one - ) - - def test_scoped_session_listen(self): - from sqlalchemy.orm import scoped_session - - def my_listener_one(*arg, **kw): - pass - - scope = scoped_session(sessionmaker()) - event.listen(scope, "before_flush", my_listener_one) - - assert my_listener_one in scope().dispatch.before_flush - - def _listener_fixture(self, **kw): - canary = [] - def listener(name): - def go(*arg, **kw): - canary.append(name) - return go - - sess = Session(**kw) - - for evt in [ - 'before_commit', - 'after_commit', - 'after_rollback', - 'before_flush', - 'after_flush', - 'after_flush_postexec', - 'after_begin', - 'after_attach', - 'after_bulk_update', - 'after_bulk_delete' - ]: - event.listen(sess, evt, listener(evt)) - - return sess, canary - - @testing.resolve_artifact_names - def test_flush_autocommit_hook(self): - - mapper(User, users) - - sess, canary = self._listener_fixture(autoflush=False, autocommit=True) - - u = User(name='u1') - sess.add(u) - sess.flush() - eq_( - canary, - [ 'after_attach', 'before_flush', 'after_begin', - 'after_flush', 'before_commit', 'after_commit', - 'after_flush_postexec', ] - ) - - @testing.resolve_artifact_names - def test_flush_noautocommit_hook(self): - sess, canary = self._listener_fixture() - - mapper(User, users) - - u = User(name='u1') - sess.add(u) - sess.flush() - eq_(canary, ['after_attach', 'before_flush', 'after_begin', - 'after_flush', 'after_flush_postexec']) - - @testing.resolve_artifact_names - def test_flush_in_commit_hook(self): - sess, canary = self._listener_fixture() - - mapper(User, users) - u = User(name='u1') - sess.add(u) - sess.flush() - canary[:] = [] - - u.name = 'ed' - sess.commit() - eq_(canary, ['before_commit', 'before_flush', 'after_flush', - 'after_flush_postexec', 'after_commit']) - - def test_standalone_on_commit_hook(self): - sess, canary = self._listener_fixture() - sess.commit() - eq_(canary, ['before_commit', 'after_commit']) - - @testing.resolve_artifact_names - def test_on_bulk_update_hook(self): - sess, canary = self._listener_fixture() - mapper(User, users) - sess.query(User).update({'name': 'foo'}) - eq_(canary, ['after_begin', 'after_bulk_update']) - - @testing.resolve_artifact_names - def test_on_bulk_delete_hook(self): - sess, canary = self._listener_fixture() - mapper(User, users) - sess.query(User).delete() - eq_(canary, ['after_begin', 'after_bulk_delete']) - - def test_connection_emits_after_begin(self): - sess, canary = self._listener_fixture(bind=testing.db) - conn = sess.connection() - eq_(canary, ['after_begin']) - - @testing.resolve_artifact_names - def test_reentrant_flush(self): - - mapper(User, users) - - def before_flush(session, flush_context, objects): - session.flush() - - sess = Session() - event.listen(sess, 'before_flush', before_flush) - sess.add(User(name='foo')) - assert_raises_message(sa.exc.InvalidRequestError, - 'already flushing', sess.flush) - - @testing.resolve_artifact_names - def test_before_flush_affects_flush_plan(self): - - mapper(User, users) - - def before_flush(session, flush_context, objects): - for obj in list(session.new) + list(session.dirty): - if isinstance(obj, User): - session.add(User(name='another %s' % obj.name)) - for obj in list(session.deleted): - if isinstance(obj, User): - x = session.query(User).filter(User.name - == 'another %s' % obj.name).one() - session.delete(x) - - sess = Session() - event.listen(sess, 'before_flush', before_flush) - - u = User(name='u1') - sess.add(u) - sess.flush() - eq_(sess.query(User).order_by(User.name).all(), - [ - User(name='another u1'), - User(name='u1') - ] - ) - - sess.flush() - eq_(sess.query(User).order_by(User.name).all(), - [ - User(name='another u1'), - User(name='u1') - ] - ) - - u.name='u2' - sess.flush() - eq_(sess.query(User).order_by(User.name).all(), - [ - User(name='another u1'), - User(name='another u2'), - User(name='u2') - ] - ) - - sess.delete(u) - sess.flush() - eq_(sess.query(User).order_by(User.name).all(), - [ - User(name='another u1'), - ] - ) - - @testing.resolve_artifact_names - def test_before_flush_affects_dirty(self): - mapper(User, users) - - def before_flush(session, flush_context, objects): - for obj in list(session.identity_map.values()): - obj.name += " modified" - - sess = Session(autoflush=True) - event.listen(sess, 'before_flush', before_flush) - - u = User(name='u1') - sess.add(u) - sess.flush() - eq_(sess.query(User).order_by(User.name).all(), - [User(name='u1')] - ) - - sess.add(User(name='u2')) - sess.flush() - sess.expunge_all() - eq_(sess.query(User).order_by(User.name).all(), - [ - User(name='u1 modified'), - User(name='u2') - ] - ) - - def teardown(self): - # TODO: need to get remove() functionality - # going - Session.dispatch._clear() - super(SessionEventsTest, self).teardown() - - -class SessionExtensionTest(_fixtures.FixtureTest): - run_inserts = None - - @testing.resolve_artifact_names - def test_extension(self): - mapper(User, users) - log = [] - class MyExt(sa.orm.session.SessionExtension): - def before_commit(self, session): - log.append('before_commit') - def after_commit(self, session): - log.append('after_commit') - def after_rollback(self, session): - log.append('after_rollback') - def before_flush(self, session, flush_context, objects): - log.append('before_flush') - def after_flush(self, session, flush_context): - log.append('after_flush') - def after_flush_postexec(self, session, flush_context): - log.append('after_flush_postexec') - def after_begin(self, session, transaction, connection): - log.append('after_begin') - def after_attach(self, session, instance): - log.append('after_attach') - def after_bulk_update( - self, - session, - query, - query_context, - result, - ): - log.append('after_bulk_update') - - def after_bulk_delete( - self, - session, - query, - query_context, - result, - ): - log.append('after_bulk_delete') - - sess = create_session(extension = MyExt()) - u = User(name='u1') - sess.add(u) - sess.flush() - assert log == [ - 'after_attach', - 'before_flush', - 'after_begin', - 'after_flush', - 'before_commit', - 'after_commit', - 'after_flush_postexec', - ] - log = [] - sess = create_session(autocommit=False, extension=MyExt()) - u = User(name='u1') - sess.add(u) - sess.flush() - assert log == ['after_attach', 'before_flush', 'after_begin', - 'after_flush', 'after_flush_postexec'] - log = [] - u.name = 'ed' - sess.commit() - assert log == ['before_commit', 'before_flush', 'after_flush', - 'after_flush_postexec', 'after_commit'] - log = [] - sess.commit() - assert log == ['before_commit', 'after_commit'] - log = [] - sess.query(User).delete() - assert log == ['after_begin', 'after_bulk_delete'] - log = [] - sess.query(User).update({'name': 'foo'}) - assert log == ['after_bulk_update'] - log = [] - sess = create_session(autocommit=False, extension=MyExt(), - bind=testing.db) - conn = sess.connection() - assert log == ['after_begin'] - - @testing.resolve_artifact_names - def test_multiple_extensions(self): - log = [] - class MyExt1(sa.orm.session.SessionExtension): - def before_commit(self, session): - log.append('before_commit_one') - - - class MyExt2(sa.orm.session.SessionExtension): - def before_commit(self, session): - log.append('before_commit_two') - - mapper(User, users) - sess = create_session(extension = [MyExt1(), MyExt2()]) - u = User(name='u1') - sess.add(u) - sess.flush() - assert log == [ - 'before_commit_one', - 'before_commit_two', - ] - class DisposedStates(_base.MappedTest): run_setup_mappers = 'once' run_inserts = 'once' |