summaryrefslogtreecommitdiff
path: root/test/orm/test_session.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/test_session.py')
-rw-r--r--test/orm/test_session.py382
1 files changed, 0 insertions, 382 deletions
diff --git a/test/orm/test_session.py b/test/orm/test_session.py
index 9aeea8287..d8b9b073a 100644
--- a/test/orm/test_session.py
+++ b/test/orm/test_session.py
@@ -5,17 +5,14 @@ import inspect
import pickle
from sqlalchemy.orm import create_session, sessionmaker, attributes, \
make_transient, Session
-from sqlalchemy.orm.attributes import instance_state
import sqlalchemy as sa
from test.lib import engines, testing, config
from sqlalchemy import Integer, String, Sequence
from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
exc as orm_exc, object_session
-from test.lib.testing import eq_
from test.engine import _base as engine_base
from test.orm import _base, _fixtures
-from sqlalchemy import event
class SessionTest(_fixtures.FixtureTest):
run_inserts = None
@@ -1211,385 +1208,6 @@ class SessionTest(_fixtures.FixtureTest):
del u3
gc_collect()
-class SessionEventsTest(_fixtures.FixtureTest):
- run_inserts = None
-
- def test_class_listen(self):
- def my_listener(*arg, **kw):
- pass
-
- event.listen(Session, 'before_flush', my_listener)
-
- s = Session()
- assert my_listener in s.dispatch.before_flush
-
- def test_sessionmaker_listen(self):
- """test that listen can be applied to individual scoped_session() classes."""
-
- def my_listener_one(*arg, **kw):
- pass
- def my_listener_two(*arg, **kw):
- pass
-
- S1 = sessionmaker()
- S2 = sessionmaker()
-
- event.listen(Session, 'before_flush', my_listener_one)
- event.listen(S1, 'before_flush', my_listener_two)
-
- s1 = S1()
- assert my_listener_one in s1.dispatch.before_flush
- assert my_listener_two in s1.dispatch.before_flush
-
- s2 = S2()
- assert my_listener_one in s2.dispatch.before_flush
- assert my_listener_two not in s2.dispatch.before_flush
-
- def test_scoped_session_invalid_callable(self):
- from sqlalchemy.orm import scoped_session
-
- def my_listener_one(*arg, **kw):
- pass
-
- scope = scoped_session(lambda:Session())
-
- assert_raises_message(
- sa.exc.ArgumentError,
- "Session event listen on a ScopedSession "
- "requries that its creation callable is a Session subclass.",
- event.listen, scope, "before_flush", my_listener_one
- )
-
- def test_scoped_session_invalid_class(self):
- from sqlalchemy.orm import scoped_session
-
- def my_listener_one(*arg, **kw):
- pass
-
- class NotASession(object):
- def __call__(self):
- return Session()
-
- scope = scoped_session(NotASession)
-
- assert_raises_message(
- sa.exc.ArgumentError,
- "Session event listen on a ScopedSession "
- "requries that its creation callable is a Session subclass.",
- event.listen, scope, "before_flush", my_listener_one
- )
-
- def test_scoped_session_listen(self):
- from sqlalchemy.orm import scoped_session
-
- def my_listener_one(*arg, **kw):
- pass
-
- scope = scoped_session(sessionmaker())
- event.listen(scope, "before_flush", my_listener_one)
-
- assert my_listener_one in scope().dispatch.before_flush
-
- def _listener_fixture(self, **kw):
- canary = []
- def listener(name):
- def go(*arg, **kw):
- canary.append(name)
- return go
-
- sess = Session(**kw)
-
- for evt in [
- 'before_commit',
- 'after_commit',
- 'after_rollback',
- 'before_flush',
- 'after_flush',
- 'after_flush_postexec',
- 'after_begin',
- 'after_attach',
- 'after_bulk_update',
- 'after_bulk_delete'
- ]:
- event.listen(sess, evt, listener(evt))
-
- return sess, canary
-
- @testing.resolve_artifact_names
- def test_flush_autocommit_hook(self):
-
- mapper(User, users)
-
- sess, canary = self._listener_fixture(autoflush=False, autocommit=True)
-
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- eq_(
- canary,
- [ 'after_attach', 'before_flush', 'after_begin',
- 'after_flush', 'before_commit', 'after_commit',
- 'after_flush_postexec', ]
- )
-
- @testing.resolve_artifact_names
- def test_flush_noautocommit_hook(self):
- sess, canary = self._listener_fixture()
-
- mapper(User, users)
-
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- eq_(canary, ['after_attach', 'before_flush', 'after_begin',
- 'after_flush', 'after_flush_postexec'])
-
- @testing.resolve_artifact_names
- def test_flush_in_commit_hook(self):
- sess, canary = self._listener_fixture()
-
- mapper(User, users)
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- canary[:] = []
-
- u.name = 'ed'
- sess.commit()
- eq_(canary, ['before_commit', 'before_flush', 'after_flush',
- 'after_flush_postexec', 'after_commit'])
-
- def test_standalone_on_commit_hook(self):
- sess, canary = self._listener_fixture()
- sess.commit()
- eq_(canary, ['before_commit', 'after_commit'])
-
- @testing.resolve_artifact_names
- def test_on_bulk_update_hook(self):
- sess, canary = self._listener_fixture()
- mapper(User, users)
- sess.query(User).update({'name': 'foo'})
- eq_(canary, ['after_begin', 'after_bulk_update'])
-
- @testing.resolve_artifact_names
- def test_on_bulk_delete_hook(self):
- sess, canary = self._listener_fixture()
- mapper(User, users)
- sess.query(User).delete()
- eq_(canary, ['after_begin', 'after_bulk_delete'])
-
- def test_connection_emits_after_begin(self):
- sess, canary = self._listener_fixture(bind=testing.db)
- conn = sess.connection()
- eq_(canary, ['after_begin'])
-
- @testing.resolve_artifact_names
- def test_reentrant_flush(self):
-
- mapper(User, users)
-
- def before_flush(session, flush_context, objects):
- session.flush()
-
- sess = Session()
- event.listen(sess, 'before_flush', before_flush)
- sess.add(User(name='foo'))
- assert_raises_message(sa.exc.InvalidRequestError,
- 'already flushing', sess.flush)
-
- @testing.resolve_artifact_names
- def test_before_flush_affects_flush_plan(self):
-
- mapper(User, users)
-
- def before_flush(session, flush_context, objects):
- for obj in list(session.new) + list(session.dirty):
- if isinstance(obj, User):
- session.add(User(name='another %s' % obj.name))
- for obj in list(session.deleted):
- if isinstance(obj, User):
- x = session.query(User).filter(User.name
- == 'another %s' % obj.name).one()
- session.delete(x)
-
- sess = Session()
- event.listen(sess, 'before_flush', before_flush)
-
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- eq_(sess.query(User).order_by(User.name).all(),
- [
- User(name='another u1'),
- User(name='u1')
- ]
- )
-
- sess.flush()
- eq_(sess.query(User).order_by(User.name).all(),
- [
- User(name='another u1'),
- User(name='u1')
- ]
- )
-
- u.name='u2'
- sess.flush()
- eq_(sess.query(User).order_by(User.name).all(),
- [
- User(name='another u1'),
- User(name='another u2'),
- User(name='u2')
- ]
- )
-
- sess.delete(u)
- sess.flush()
- eq_(sess.query(User).order_by(User.name).all(),
- [
- User(name='another u1'),
- ]
- )
-
- @testing.resolve_artifact_names
- def test_before_flush_affects_dirty(self):
- mapper(User, users)
-
- def before_flush(session, flush_context, objects):
- for obj in list(session.identity_map.values()):
- obj.name += " modified"
-
- sess = Session(autoflush=True)
- event.listen(sess, 'before_flush', before_flush)
-
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- eq_(sess.query(User).order_by(User.name).all(),
- [User(name='u1')]
- )
-
- sess.add(User(name='u2'))
- sess.flush()
- sess.expunge_all()
- eq_(sess.query(User).order_by(User.name).all(),
- [
- User(name='u1 modified'),
- User(name='u2')
- ]
- )
-
- def teardown(self):
- # TODO: need to get remove() functionality
- # going
- Session.dispatch._clear()
- super(SessionEventsTest, self).teardown()
-
-
-class SessionExtensionTest(_fixtures.FixtureTest):
- run_inserts = None
-
- @testing.resolve_artifact_names
- def test_extension(self):
- mapper(User, users)
- log = []
- class MyExt(sa.orm.session.SessionExtension):
- def before_commit(self, session):
- log.append('before_commit')
- def after_commit(self, session):
- log.append('after_commit')
- def after_rollback(self, session):
- log.append('after_rollback')
- def before_flush(self, session, flush_context, objects):
- log.append('before_flush')
- def after_flush(self, session, flush_context):
- log.append('after_flush')
- def after_flush_postexec(self, session, flush_context):
- log.append('after_flush_postexec')
- def after_begin(self, session, transaction, connection):
- log.append('after_begin')
- def after_attach(self, session, instance):
- log.append('after_attach')
- def after_bulk_update(
- self,
- session,
- query,
- query_context,
- result,
- ):
- log.append('after_bulk_update')
-
- def after_bulk_delete(
- self,
- session,
- query,
- query_context,
- result,
- ):
- log.append('after_bulk_delete')
-
- sess = create_session(extension = MyExt())
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- assert log == [
- 'after_attach',
- 'before_flush',
- 'after_begin',
- 'after_flush',
- 'before_commit',
- 'after_commit',
- 'after_flush_postexec',
- ]
- log = []
- sess = create_session(autocommit=False, extension=MyExt())
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- assert log == ['after_attach', 'before_flush', 'after_begin',
- 'after_flush', 'after_flush_postexec']
- log = []
- u.name = 'ed'
- sess.commit()
- assert log == ['before_commit', 'before_flush', 'after_flush',
- 'after_flush_postexec', 'after_commit']
- log = []
- sess.commit()
- assert log == ['before_commit', 'after_commit']
- log = []
- sess.query(User).delete()
- assert log == ['after_begin', 'after_bulk_delete']
- log = []
- sess.query(User).update({'name': 'foo'})
- assert log == ['after_bulk_update']
- log = []
- sess = create_session(autocommit=False, extension=MyExt(),
- bind=testing.db)
- conn = sess.connection()
- assert log == ['after_begin']
-
- @testing.resolve_artifact_names
- def test_multiple_extensions(self):
- log = []
- class MyExt1(sa.orm.session.SessionExtension):
- def before_commit(self, session):
- log.append('before_commit_one')
-
-
- class MyExt2(sa.orm.session.SessionExtension):
- def before_commit(self, session):
- log.append('before_commit_two')
-
- mapper(User, users)
- sess = create_session(extension = [MyExt1(), MyExt2()])
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- assert log == [
- 'before_commit_one',
- 'before_commit_two',
- ]
-
class DisposedStates(_base.MappedTest):
run_setup_mappers = 'once'
run_inserts = 'once'