diff options
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/pg8000.py | 19 | ||||
-rw-r--r-- | test/orm/test_dynamic.py | 199 | ||||
-rw-r--r-- | test/orm/test_froms.py | 1999 | ||||
-rw-r--r-- | test/orm/test_naturalpks.py | 448 | ||||
-rw-r--r-- | test/orm/test_query.py | 1074 | ||||
-rw-r--r-- | test/orm/test_transaction.py | 250 | ||||
-rw-r--r-- | test/orm/test_versioning.py | 327 |
7 files changed, 2313 insertions, 2003 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py index 589567d9e..68da5b6d7 100644 --- a/lib/sqlalchemy/dialects/postgresql/pg8000.py +++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py @@ -171,4 +171,23 @@ class PGDialect_pg8000(PGDialect): (level, self.name, ", ".join(self._isolation_lookup)) ) + def do_begin_twophase(self, connection, xid): + print("begin twophase", xid) + connection.connection.tpc_begin((0, xid, '')) + + def do_prepare_twophase(self, connection, xid): + print("prepare twophase", xid) + connection.connection.tpc_prepare() + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False): + connection.connection.tpc_rollback((0, xid, '')) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False): + connection.connection.tpc_commit((0, xid, '')) + + def do_recover_twophase(self, connection): + return [row[1] for row in connection.connection.tpc_recover()] + dialect = PGDialect_pg8000 diff --git a/test/orm/test_dynamic.py b/test/orm/test_dynamic.py index 54ea3c2f1..bc47ba3f3 100644 --- a/test/orm/test_dynamic.py +++ b/test/orm/test_dynamic.py @@ -1,18 +1,14 @@ -from sqlalchemy.testing import eq_, is_ -from sqlalchemy.orm import backref, configure_mappers -from sqlalchemy import testing -from sqlalchemy import desc, select, func, exc -from sqlalchemy.orm import mapper, relationship, create_session, Query, \ - attributes, exc as orm_exc, Session +from sqlalchemy import testing, desc, select, func, exc, cast, Integer +from sqlalchemy.orm import ( + mapper, relationship, create_session, Query, attributes, exc as orm_exc, + Session, backref, configure_mappers) from sqlalchemy.orm.dynamic import AppenderMixin -from sqlalchemy.testing import AssertsCompiledSQL, \ - assert_raises_message, assert_raises +from sqlalchemy.testing import ( + AssertsCompiledSQL, assert_raises_message, assert_raises, eq_, is_) from test.orm import _fixtures - from sqlalchemy.testing.assertsql import CompiledSQL - class _DynamicFixture(object): def _user_address_fixture(self, addresses_args={}): users, Address, addresses, User = (self.tables.users, @@ -20,10 +16,10 @@ class _DynamicFixture(object): self.tables.addresses, self.classes.User) - mapper(User, users, properties={ - 'addresses': relationship(Address, lazy="dynamic", - **addresses_args) - }) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, lazy="dynamic", **addresses_args)}) mapper(Address, addresses) return User, Address @@ -34,16 +30,15 @@ class _DynamicFixture(object): self.tables.order_items, self.classes.Item) - mapper(Order, orders, properties={ - 'items': relationship(Item, - secondary=order_items, - lazy="dynamic", - **items_args - ) - }) + mapper( + Order, orders, properties={ + 'items': relationship( + Item, secondary=order_items, lazy="dynamic", + **items_args)}) mapper(Item, items) return Order, Item + class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): def test_basic(self): @@ -86,7 +81,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): def test_no_uselist_false(self): User, Address = self._user_address_fixture( - addresses_args={"uselist": False}) + addresses_args={"uselist": False}) assert_raises_message( exc.InvalidRequestError, "On relationship User.addresses, 'dynamic' loaders cannot be " @@ -100,9 +95,9 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): self.classes.Address, self.tables.addresses, self.classes.User) - mapper(Address, addresses, properties={ - 'user': relationship(User, lazy='dynamic') - }) + mapper( + Address, addresses, properties={ + 'user': relationship(User, lazy='dynamic')}) mapper(User, users) assert_raises_message( exc.InvalidRequestError, @@ -118,7 +113,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): u = sess.query(User).get(8) eq_( list(u.addresses.order_by(desc(Address.email_address))), - [ + [ Address(email_address='ed@wood.com'), Address(email_address='ed@lala.com'), Address(email_address='ed@bettyboop.com') @@ -128,9 +123,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): def test_configured_order_by(self): addresses = self.tables.addresses User, Address = self._user_address_fixture( - addresses_args={ - "order_by": - addresses.c.email_address.desc()}) + addresses_args={"order_by": addresses.c.email_address.desc()}) sess = create_session() u = sess.query(User).get(8) @@ -183,6 +176,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): sess = create_session() ad = sess.query(Address).get(1) + def go(): ad.user = None self.assert_sql_count(testing.db, go, 0) @@ -202,12 +196,9 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): eq_( q.filter(User.id == 7).all(), [ - User(id=7, - addresses=[ - Address(id=1, email_address='jack@bean.com') - ]) - ] - ) + User( + id=7, addresses=[ + Address(id=1, email_address='jack@bean.com')])]) self.assert_sql_count(testing.db, go, 2) def test_no_populate(self): @@ -220,9 +211,8 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): ) def test_m2m(self): - Order, Item = self._order_item_fixture(items_args={ - "backref": backref("orders", lazy="dynamic") - }) + Order, Item = self._order_item_fixture( + items_args={"backref": backref("orders", lazy="dynamic")}) sess = create_session() o1 = Order(id=15, description="order 10") @@ -234,9 +224,9 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): assert o1 in i1.orders.all() assert i1 in o1.items.all() - @testing.exclude('mysql', 'between', - ((5, 1, 49), (5, 1, 52)), - 'https://bugs.launchpad.net/ubuntu/+source/mysql-5.1/+bug/706988') + @testing.exclude( + 'mysql', 'between', ((5, 1, 49), (5, 1, 52)), + 'https://bugs.launchpad.net/ubuntu/+source/mysql-5.1/+bug/706988') def test_association_nonaliased(self): items, Order, orders, order_items, Item = (self.tables.items, self.classes.Order, @@ -288,7 +278,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): class MyQuery(Query): pass User, Address = self._user_address_fixture( - addresses_args={"query_class": MyQuery}) + addresses_args={"query_class": MyQuery}) sess = create_session() u = User() @@ -322,9 +312,7 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): query_class = MyQuery User, Address = self._user_address_fixture( - addresses_args={ - "query_class": MyAppenderQuery}) - + addresses_args={"query_class": MyAppenderQuery}) sess = create_session() u = User() @@ -345,8 +333,10 @@ class DynamicTest(_DynamicFixture, _fixtures.FixtureTest, AssertsCompiledSQL): eq_(type(q).__name__, 'MyQuery') -class UOWTest(_DynamicFixture, _fixtures.FixtureTest, - testing.AssertsExecutionResults): +class UOWTest( + _DynamicFixture, _fixtures.FixtureTest, + testing.AssertsExecutionResults): + run_inserts = None def test_persistence(self): @@ -361,17 +351,17 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, eq_( testing.db.scalar( - select([func.count(1)]).where(addresses.c.user_id != None) - ), - 0 - ) + select( + [func.count(cast(1, Integer))]). + where(addresses.c.user_id != None)), + 0) u1 = sess.query(User).get(u1.id) u1.addresses.append(a1) sess.flush() eq_( testing.db.execute( - select([addresses]).where(addresses.c.user_id != None) + select([addresses]).where(addresses.c.user_id != None) ).fetchall(), [(a1.id, u1.id, 'foo')] ) @@ -380,8 +370,9 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, sess.flush() eq_( testing.db.scalar( - select([func.count(1)]).where(addresses.c.user_id != None) - ), + select( + [func.count(cast(1, Integer))]). + where(addresses.c.user_id != None)), 0 ) @@ -405,12 +396,10 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, [(a2.id, u1.id, 'bar')] ) - def test_merge(self): addresses = self.tables.addresses User, Address = self._user_address_fixture( - addresses_args={ - "order_by": addresses.c.email_address}) + addresses_args={"order_by": addresses.c.email_address}) sess = create_session() u1 = User(name='jack') a1 = Address(email_address='a1') @@ -452,8 +441,7 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, def test_collection_set(self): addresses = self.tables.addresses User, Address = self._user_address_fixture( - addresses_args={ - "order_by": addresses.c.email_address}) + addresses_args={"order_by": addresses.c.email_address}) sess = create_session(autoflush=True, autocommit=False) u1 = User(name='jack') a1 = Address(email_address='a1') @@ -541,7 +529,7 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, def test_rollback(self): User, Address = self._user_address_fixture() sess = create_session( - expire_on_commit=False, autocommit=False, autoflush=True) + expire_on_commit=False, autocommit=False, autoflush=True) u1 = User(name='jack') u1.addresses.append(Address(email_address='lala@hoho.com')) sess.add(u1) @@ -563,12 +551,11 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, def _test_delete_cascade(self, expected): addresses = self.tables.addresses - User, Address = self._user_address_fixture(addresses_args={ - "order_by": addresses.c.id, - "backref": "user", - "cascade": "save-update" if expected \ - else "all, delete" - }) + User, Address = self._user_address_fixture( + addresses_args={ + "order_by": addresses.c.id, + "backref": "user", + "cascade": "save-update" if expected else "all, delete"}) sess = create_session(autoflush=True, autocommit=False) u = User(name='ed') @@ -577,19 +564,19 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, ) sess.add(u) sess.commit() - eq_(testing.db.scalar( - addresses.count(addresses.c.user_id == None)), 0) - eq_(testing.db.scalar( - addresses.count(addresses.c.user_id != None)), 6) + eq_(testing.db.scalar(addresses.count(addresses.c.user_id == None)), 0) + eq_(testing.db.scalar(addresses.count(addresses.c.user_id != None)), 6) sess.delete(u) sess.commit() if expected: - eq_(testing.db.scalar( + eq_( + testing.db.scalar( addresses.count(addresses.c.user_id == None)), 6) - eq_(testing.db.scalar( + eq_( + testing.db.scalar( addresses.count(addresses.c.user_id != None)), 0) else: eq_(testing.db.scalar(addresses.count()), 0) @@ -603,10 +590,10 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, def test_self_referential(self): Node, nodes = self.classes.Node, self.tables.nodes - - mapper(Node, nodes, properties={ - 'children': relationship(Node, lazy="dynamic", order_by=nodes.c.id) - }) + mapper( + Node, nodes, properties={ + 'children': relationship( + Node, lazy="dynamic", order_by=nodes.c.id)}) sess = Session() n2, n3 = Node(), Node() @@ -616,14 +603,13 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, eq_(n1.children.all(), [n2, n3]) - def test_remove_orphans(self): addresses = self.tables.addresses - User, Address = self._user_address_fixture(addresses_args={ - "order_by": addresses.c.id, - "backref": "user", - "cascade": "all, delete-orphan" - }) + User, Address = self._user_address_fixture( + addresses_args={ + "order_by": addresses.c.id, + "backref": "user", + "cascade": "all, delete-orphan"}) sess = create_session(autoflush=True, autocommit=False) u = User(name='ed') @@ -642,9 +628,8 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, ) def _backref_test(self, autoflush, saveuser): - User, Address = self._user_address_fixture(addresses_args={ - "backref": "user", - }) + User, Address = self._user_address_fixture( + addresses_args={"backref": "user"}) sess = create_session(autoflush=autoflush, autocommit=False) u = User(name='buffy') @@ -686,9 +671,8 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, self._backref_test(False, False) def test_backref_events(self): - User, Address = self._user_address_fixture(addresses_args={ - "backref": "user", - }) + User, Address = self._user_address_fixture( + addresses_args={"backref": "user"}) u1 = User() a1 = Address() @@ -696,9 +680,8 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, is_(a1.user, u1) def test_no_deref(self): - User, Address = self._user_address_fixture(addresses_args={ - "backref": "user", - }) + User, Address = self._user_address_fixture( + addresses_args={"backref": "user", }) session = create_session() user = User() @@ -723,19 +706,19 @@ class UOWTest(_DynamicFixture, _fixtures.FixtureTest, def query3(): session = create_session(testing.db) - user = session.query(User).first() return session.query(User).first().addresses.all() eq_(query1(), [Address(email_address='joe@joesdomain.example')]) eq_(query2(), [Address(email_address='joe@joesdomain.example')]) eq_(query3(), [Address(email_address='joe@joesdomain.example')]) + class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): run_inserts = None def _transient_fixture(self, addresses_args={}): User, Address = self._user_address_fixture( - addresses_args=addresses_args) + addresses_args=addresses_args) u1 = User() a1 = Address() @@ -743,7 +726,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): def _persistent_fixture(self, autoflush=True, addresses_args={}): User, Address = self._user_address_fixture( - addresses_args=addresses_args) + addresses_args=addresses_args) u1 = User(name='u1') a1 = Address(email_address='a1') @@ -836,8 +819,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): def test_backref_pop_persistent_autoflush_o2m_active_hist(self): u1, a1, s = self._persistent_fixture( - addresses_args={"backref": - backref("user", active_history=True)}) + addresses_args={"backref": backref("user", active_history=True)}) u1.addresses.append(a1) s.flush() s.expire_all() @@ -850,7 +832,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): def test_backref_pop_persistent_autoflush_m2m(self): o1, i1, s = self._persistent_m2m_fixture( - items_args={"backref": "orders"}) + items_args={"backref": "orders"}) o1.items.append(i1) s.flush() s.expire_all() @@ -863,7 +845,7 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): def test_backref_pop_persistent_noflush_m2m(self): o1, i1, s = self._persistent_m2m_fixture( - items_args={"backref": "orders"}, autoflush=False) + items_args={"backref": "orders"}, autoflush=False) o1.items.append(i1) s.flush() s.expire_all() @@ -897,9 +879,8 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): u1, a1 = self._transient_fixture() a2, a3, a4, a5 = Address(email_address='a2'), \ - Address(email_address='a3'), \ - Address(email_address='a4'), \ - Address(email_address='a5') + Address(email_address='a3'), Address(email_address='a4'), \ + Address(email_address='a5') u1.addresses = [a1, a2] u1.addresses = [a2, a3, a4, a5] @@ -913,9 +894,8 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): u1, a1, s = self._persistent_fixture(autoflush=False) a2, a3, a4, a5 = Address(email_address='a2'), \ - Address(email_address='a3'), \ - Address(email_address='a4'), \ - Address(email_address='a5') + Address(email_address='a3'), Address(email_address='a4'), \ + Address(email_address='a5') u1.addresses = [a1, a2] u1.addresses = [a2, a3, a4, a5] @@ -929,9 +909,8 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): u1, a1, s = self._persistent_fixture(autoflush=True) a2, a3, a4, a5 = Address(email_address='a2'), \ - Address(email_address='a3'), \ - Address(email_address='a4'), \ - Address(email_address='a5') + Address(email_address='a3'), Address(email_address='a4'), \ + Address(email_address='a5') u1.addresses = [a1, a2] u1.addresses = [a2, a3, a4, a5] @@ -941,7 +920,6 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): compare_passive=([a3, a4, a5], [], [a1]) ) - def test_persistent_but_readded_noflush(self): u1, a1, s = self._persistent_fixture(autoflush=False) u1.addresses.append(a1) @@ -971,7 +949,4 @@ class HistoryTest(_DynamicFixture, _fixtures.FixtureTest): u1.addresses.remove(a1) - self._assert_history(u1, - ([], [], []), - compare_passive=([], [], [a1]) - ) + self._assert_history(u1, ([], [], []), compare_passive=([], [], [a1])) diff --git a/test/orm/test_froms.py b/test/orm/test_froms.py index 7c06836d2..7259132fe 100644 --- a/test/orm/test_froms.py +++ b/test/orm/test_froms.py @@ -1,24 +1,22 @@ -from sqlalchemy.testing import eq_, assert_raises, assert_raises_message -import operator -from sqlalchemy import * -from sqlalchemy import exc as sa_exc, util -from sqlalchemy.sql import compiler, table, column +from sqlalchemy import testing +from sqlalchemy.testing import ( + fixtures, eq_, assert_raises, assert_raises_message, AssertsCompiledSQL) +from sqlalchemy import ( + exc as sa_exc, util, Integer, Table, String, ForeignKey, select, func, + and_, asc, desc, inspect, literal_column, cast, exists) +from sqlalchemy.orm import ( + configure_mappers, Session, mapper, create_session, relationship, + column_property, joinedload_all, contains_eager, contains_alias, + joinedload, clear_mappers, backref, relation, aliased) +from sqlalchemy.sql import table, column from sqlalchemy.engine import default -from sqlalchemy.orm import * -from sqlalchemy.orm import attributes - -from sqlalchemy.testing import eq_ - import sqlalchemy as sa -from sqlalchemy import testing -from sqlalchemy.testing import AssertsCompiledSQL, engines from sqlalchemy.testing.schema import Column from test.orm import _fixtures -from sqlalchemy.testing import fixtures +from sqlalchemy.orm.util import join -from sqlalchemy.orm.util import join, outerjoin, with_parent class QueryTest(_fixtures.FixtureTest): run_setup_mappers = 'once' @@ -40,33 +38,42 @@ class QueryTest(_fixtures.FixtureTest): cls.classes.CompositePk, cls.tables.nodes, \ cls.classes.Order, cls.tables.orders, cls.tables.addresses - mapper(User, users, properties={ - 'addresses':relationship(Address, backref='user', order_by=addresses.c.id), - 'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o - }) - mapper(Address, addresses, properties={ - 'dingaling':relationship(Dingaling, uselist=False, backref="address") #o2o - }) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, backref='user', order_by=addresses.c.id), + 'orders': relationship( + Order, backref='user', order_by=orders.c.id), # o2m, m2o + }) + mapper( + Address, addresses, properties={ + 'dingaling': relationship( + Dingaling, uselist=False, backref="address") # o2o + }) mapper(Dingaling, dingalings) - mapper(Order, orders, properties={ - 'items':relationship(Item, secondary=order_items, order_by=items.c.id), #m2m - 'address':relationship(Address), # m2o - }) - mapper(Item, items, properties={ - 'keywords':relationship(Keyword, secondary=item_keywords) #m2m - }) + mapper( + Order, orders, properties={ + 'items': relationship( + Item, secondary=order_items, order_by=items.c.id), # m2m + 'address': relationship(Address), # m2o + }) + mapper( + Item, items, properties={ + 'keywords': relationship( + Keyword, secondary=item_keywords)}) # m2m mapper(Keyword, keywords) - mapper(Node, nodes, properties={ - 'children':relationship(Node, - backref=backref('parent', remote_side=[nodes.c.id]) - ) - }) + mapper( + Node, nodes, properties={ + 'children': relationship( + Node, backref=backref('parent', remote_side=[nodes.c.id])) + }) mapper(CompositePk, composite_pk_table) configure_mappers() + class QueryCorrelatesLikeSelect(QueryTest, AssertsCompiledSQL): query_correlated = "SELECT users.name AS users_name, " \ @@ -81,69 +88,57 @@ class QueryCorrelatesLikeSelect(QueryTest, AssertsCompiledSQL): addresses, users = self.tables.addresses, self.tables.users query = select( [func.count(addresses.c.id)], - addresses.c.user_id==users.c.id - ).as_scalar() + addresses.c.user_id == users.c.id).as_scalar() query = select([users.c.name.label('users_name'), query]) - self.assert_compile(query, self.query_correlated, - dialect=default.DefaultDialect() - ) + self.assert_compile( + query, self.query_correlated, dialect=default.DefaultDialect()) def test_as_scalar_select_explicit_correlate(self): addresses, users = self.tables.addresses, self.tables.users query = select( [func.count(addresses.c.id)], - addresses.c.user_id==users.c.id - ).correlate(users).as_scalar() + addresses.c.user_id == users.c.id).correlate(users).as_scalar() query = select([users.c.name.label('users_name'), query]) - self.assert_compile(query, self.query_correlated, - dialect=default.DefaultDialect() - ) + self.assert_compile( + query, self.query_correlated, dialect=default.DefaultDialect()) def test_as_scalar_select_correlate_off(self): addresses, users = self.tables.addresses, self.tables.users query = select( [func.count(addresses.c.id)], - addresses.c.user_id==users.c.id - ).correlate(None).as_scalar() - query = select([ users.c.name.label('users_name'), query]) - self.assert_compile(query, self.query_not_correlated, - dialect=default.DefaultDialect() - ) + addresses.c.user_id == users.c.id).correlate(None).as_scalar() + query = select([users.c.name.label('users_name'), query]) + self.assert_compile( + query, self.query_not_correlated, dialect=default.DefaultDialect()) def test_as_scalar_query_auto_correlate(self): sess = create_session() Address, User = self.classes.Address, self.classes.User query = sess.query(func.count(Address.id))\ - .filter(Address.user_id==User.id)\ + .filter(Address.user_id == User.id)\ .as_scalar() query = sess.query(User.name, query) - self.assert_compile(query, self.query_correlated, - dialect=default.DefaultDialect() - ) + self.assert_compile( + query, self.query_correlated, dialect=default.DefaultDialect()) def test_as_scalar_query_explicit_correlate(self): sess = create_session() Address, User = self.classes.Address, self.classes.User - query = sess.query(func.count(Address.id))\ - .filter(Address.user_id==User.id)\ - .correlate(self.tables.users)\ - .as_scalar() + query = sess.query(func.count(Address.id)). \ + filter(Address.user_id == User.id). \ + correlate(self.tables.users).as_scalar() query = sess.query(User.name, query) - self.assert_compile(query, self.query_correlated, - dialect=default.DefaultDialect() - ) + self.assert_compile( + query, self.query_correlated, dialect=default.DefaultDialect()) def test_as_scalar_query_correlate_off(self): sess = create_session() Address, User = self.classes.Address, self.classes.User - query = sess.query(func.count(Address.id))\ - .filter(Address.user_id==User.id)\ - .correlate(None)\ - .as_scalar() + query = sess.query(func.count(Address.id)). \ + filter(Address.user_id == User.id).correlate(None).as_scalar() query = sess.query(User.name, query) - self.assert_compile(query, self.query_not_correlated, - dialect=default.DefaultDialect() - ) + self.assert_compile( + query, self.query_not_correlated, dialect=default.DefaultDialect()) class RawSelectTest(QueryTest, AssertsCompiledSQL): @@ -161,35 +156,39 @@ class RawSelectTest(QueryTest, AssertsCompiledSQL): sess = create_session() - self.assert_compile(sess.query(users).select_entity_from( - users.select()).with_labels().statement, - "SELECT users.id AS users_id, users.name AS users_name FROM users, " + self.assert_compile( + sess.query(users).select_entity_from(users.select()). + with_labels().statement, + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users, " "(SELECT users.id AS id, users.name AS name FROM users) AS anon_1", ) - self.assert_compile(sess.query(users, exists([1], from_obj=addresses) - ).with_labels().statement, + self.assert_compile( + sess.query(users, exists([1], from_obj=addresses)). + with_labels().statement, "SELECT users.id AS users_id, users.name AS users_name, EXISTS " "(SELECT 1 FROM addresses) AS anon_1 FROM users", ) # a little tedious here, adding labels to work around Query's # auto-labelling. - s = sess.query(addresses.c.id.label('id'), - addresses.c.email_address.label('email')).\ + s = sess.query( + addresses.c.id.label('id'), + addresses.c.email_address.label('email')).\ filter(addresses.c.user_id == users.c.id).correlate(users).\ - statement.alias() - - self.assert_compile(sess.query(users, s.c.email).select_entity_from( - users.join(s, s.c.id == users.c.id) - ).with_labels().statement, - "SELECT users.id AS users_id, users.name AS users_name, " - "anon_1.email AS anon_1_email " - "FROM users JOIN (SELECT addresses.id AS id, " - "addresses.email_address AS email FROM addresses, users " - "WHERE addresses.user_id = users.id) AS anon_1 " - "ON anon_1.id = users.id", - ) + statement.alias() + + self.assert_compile( + sess.query(users, s.c.email).select_entity_from( + users.join(s, s.c.id == users.c.id) + ).with_labels().statement, + "SELECT users.id AS users_id, users.name AS users_name, " + "anon_1.email AS anon_1_email " + "FROM users JOIN (SELECT addresses.id AS id, " + "addresses.email_address AS email FROM addresses, users " + "WHERE addresses.user_id = users.id) AS anon_1 " + "ON anon_1.id = users.id",) x = func.lala(users.c.id).label('foo') self.assert_compile(sess.query(x).filter(x == 5).statement, @@ -202,69 +201,53 @@ class RawSelectTest(QueryTest, AssertsCompiledSQL): class FromSelfTest(QueryTest, AssertsCompiledSQL): __dialect__ = 'default' + def test_filter(self): User = self.classes.User eq_( [User(id=8), User(id=9)], - create_session(). - query(User). - filter(User.id.in_([8,9])). - from_self().all() - ) + create_session().query(User).filter(User.id.in_([8, 9])). + from_self().all()) eq_( [User(id=8), User(id=9)], - create_session().query(User). - order_by(User.id).slice(1,3). - from_self().all() - ) + create_session().query(User).order_by(User.id).slice(1, 3). + from_self().all()) eq_( [User(id=8)], list( - create_session(). - query(User). - filter(User.id.in_([8,9])). - from_self().order_by(User.id)[0:1] - ) - ) + create_session().query(User).filter(User.id.in_([8, 9])). + from_self().order_by(User.id)[0:1])) def test_join(self): User, Address = self.classes.User, self.classes.Address eq_( - [ - (User(id=8), Address(id=2)), - (User(id=8), Address(id=3)), - (User(id=8), Address(id=4)), - (User(id=9), Address(id=5)) - ], - create_session(). - query(User). - filter(User.id.in_([8,9])). - from_self(). - join('addresses'). - add_entity(Address). - order_by(User.id, Address.id).all() + [ + (User(id=8), Address(id=2)), + (User(id=8), Address(id=3)), + (User(id=8), Address(id=4)), + (User(id=9), Address(id=5))], + create_session().query(User).filter(User.id.in_([8, 9])). + from_self().join('addresses').add_entity(Address). + order_by(User.id, Address.id).all() ) def test_group_by(self): Address = self.classes.Address eq_( - create_session().query(Address.user_id, - func.count(Address.id).label('count')).\ - group_by(Address.user_id). - order_by(Address.user_id).all(), + create_session(). + query(Address.user_id, func.count(Address.id).label('count')). + group_by(Address.user_id).order_by(Address.user_id).all(), [(7, 1), (8, 3), (9, 1)] ) eq_( - create_session().query(Address.user_id, Address.id).\ - from_self(Address.user_id, - func.count(Address.id)).\ - group_by(Address.user_id). - order_by(Address.user_id).all(), + create_session().query(Address.user_id, Address.id). + from_self(Address.user_id, func.count(Address.id)). + group_by(Address.user_id).order_by(Address.user_id).all(), [(7, 1), (8, 3), (9, 1)] ) @@ -274,8 +257,8 @@ class FromSelfTest(QueryTest, AssertsCompiledSQL): s = create_session() self.assert_compile( - s.query(User.id).group_by(User.id).having(User.id>5). - from_self(), + s.query(User.id).group_by(User.id).having(User.id > 5). + from_self(), "SELECT anon_1.users_id AS anon_1_users_id FROM " "(SELECT users.id AS users_id FROM users GROUP " "BY users.id HAVING users.id > :id_1) AS anon_1" @@ -287,12 +270,11 @@ class FromSelfTest(QueryTest, AssertsCompiledSQL): User = self.classes.User - s = create_session() self.assert_compile( s.query(User).options(joinedload(User.addresses)). - from_self().statement, + from_self().statement, "SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, " "addresses_1.user_id, addresses_1.email_address FROM " "(SELECT users.id AS users_id, users.name AS " @@ -302,18 +284,18 @@ class FromSelfTest(QueryTest, AssertsCompiledSQL): ) def test_aliases(self): - """test that aliased objects are accessible externally to a from_self() call.""" + """test that aliased objects are accessible externally to a from_self() + call.""" User, Address = self.classes.User, self.classes.Address - s = create_session() ualias = aliased(User) eq_( s.query(User, ualias).filter(User.id > ualias.id). - from_self(User.name, ualias.name). - order_by(User.name, ualias.name).all(), + from_self(User.name, ualias.name). + order_by(User.name, ualias.name).all(), [ ('chuck', 'ed'), ('chuck', 'fred'), @@ -325,28 +307,22 @@ class FromSelfTest(QueryTest, AssertsCompiledSQL): ) eq_( - s.query(User, ualias). - filter(User.id > ualias.id). - from_self(User.name, ualias.name). - filter(ualias.name=='ed')\ - .order_by(User.name, ualias.name).all(), - [('chuck', 'ed'), ('fred', 'ed')] - ) + s.query(User, ualias).filter(User.id > ualias.id). + from_self(User.name, ualias.name).filter(ualias.name == 'ed'). + order_by(User.name, ualias.name).all(), + [('chuck', 'ed'), ('fred', 'ed')]) eq_( - s.query(User, ualias). - filter(User.id > ualias.id). - from_self(ualias.name, Address.email_address). - join(ualias.addresses). - order_by(ualias.name, Address.email_address).all(), + s.query(User, ualias).filter(User.id > ualias.id). + from_self(ualias.name, Address.email_address). + join(ualias.addresses). + order_by(ualias.name, Address.email_address).all(), [ ('ed', 'fred@fred.com'), ('jack', 'ed@bettyboop.com'), ('jack', 'ed@lala.com'), ('jack', 'ed@wood.com'), - ('jack', 'fred@fred.com')] - ) - + ('jack', 'fred@fred.com')]) def test_multiple_entities(self): User, Address = self.classes.User, self.classes.Address @@ -354,26 +330,21 @@ class FromSelfTest(QueryTest, AssertsCompiledSQL): sess = create_session() eq_( - sess.query(User, Address).\ - filter(User.id==Address.user_id).\ - filter(Address.id.in_([2, 5])).from_self().all(), + sess.query(User, Address). + filter(User.id == Address.user_id). + filter(Address.id.in_([2, 5])).from_self().all(), [ (User(id=8), Address(id=2)), - (User(id=9), Address(id=5)) - ] - ) + (User(id=9), Address(id=5))]) eq_( - sess.query(User, Address).\ - filter(User.id==Address.user_id).\ - filter(Address.id.in_([2, 5])).\ - from_self().\ - options(joinedload('addresses')).first(), - - (User(id=8, - addresses=[Address(), Address(), Address()]), - Address(id=2)), - ) + sess.query(User, Address).filter(User.id == Address.user_id). + filter(Address.id.in_([2, 5])).from_self(). + options(joinedload('addresses')).first(), + ( + User( + id=8, addresses=[Address(), Address(), Address()]), + Address(id=2)),) def test_multiple_with_column_entities(self): User = self.classes.User @@ -381,16 +352,11 @@ class FromSelfTest(QueryTest, AssertsCompiledSQL): sess = create_session() eq_( - sess.query(User.id).from_self().\ - add_column(func.count().label('foo')).\ - group_by(User.id).\ - order_by(User.id).\ - from_self().all(), - [ - (7,1), (8, 1), (9, 1), (10, 1) - ] + sess.query(User.id).from_self(). + add_column(func.count().label('foo')).group_by(User.id). + order_by(User.id).from_self().all(), [ + (7, 1), (8, 1), (9, 1), (10, 1)]) - ) class ColumnAccessTest(QueryTest, AssertsCompiledSQL): """test access of columns after _from_selectable has been applied""" @@ -403,7 +369,7 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q = sess.query(User).from_self() self.assert_compile( - q.filter(User.name=='ed'), + q.filter(User.name == 'ed'), "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " "anon_1_users_name FROM (SELECT users.id AS users_id, users.name " "AS users_name FROM users) AS anon_1 WHERE anon_1.users_name = " @@ -416,7 +382,7 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q = sess.query(User).from_self(User.id, User.name).from_self() self.assert_compile( - q.filter(User.name=='ed'), + q.filter(User.name == 'ed'), "SELECT anon_1.anon_2_users_id AS anon_1_anon_2_users_id, " "anon_1.anon_2_users_name AS anon_1_anon_2_users_name FROM " "(SELECT anon_2.users_id AS anon_2_users_id, anon_2.users_name " @@ -432,7 +398,7 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) q = sess.query(User).select_entity_from(q.statement) self.assert_compile( - q.filter(User.name=='ed'), + q.filter(User.name == 'ed'), "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name " "FROM (SELECT users.id AS id, users.name AS name FROM " "users) AS anon_1 WHERE anon_1.name = :name_1" @@ -442,13 +408,11 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): User = self.classes.User sess = create_session() - q = sess.query(User) assert_raises_message( sa.exc.ArgumentError, r"A selectable \(FromClause\) instance is " "expected when the base alias is being set", - sess.query(User).select_entity_from, User - ) + sess.query(User).select_entity_from, User) def test_select_from_no_aliasing(self): User = self.classes.User @@ -457,7 +421,7 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) q = sess.query(User).select_from(q.statement) self.assert_compile( - q.filter(User.name=='ed'), + q.filter(User.name == 'ed'), "SELECT users.id AS users_id, users.name AS users_name " "FROM users, (SELECT users.id AS id, users.name AS name FROM " "users) AS anon_1 WHERE users.name = :name_1" @@ -490,7 +454,8 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q1.order_by(c1), "SELECT anon_1.anon_2_c1 AS anon_1_anon_2_c1, anon_1.anon_2_c2 AS " "anon_1_anon_2_c2 FROM (SELECT anon_2.c1 AS anon_2_c1, anon_2.c2 " - "AS anon_2_c2 FROM (SELECT c1 AS c1, c2 AS c2 WHERE c1 = :c1_1) AS " + "AS anon_2_c2 " + "FROM (SELECT c1 AS c1, c2 AS c2 WHERE c1 = :c1_1) AS " "anon_2) AS anon_1 ORDER BY anon_1.anon_2_c1" ) @@ -511,7 +476,7 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): ) def test_table_anonymous_expression_from_self_twice(self): - from sqlalchemy.sql import column, table + from sqlalchemy.sql import column sess = create_session() t1 = table('t1', column('c1'), column('c2')) @@ -519,16 +484,16 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q1 = q1.from_self().from_self() self.assert_compile( q1.order_by(t1.c.c1), - "SELECT anon_1.anon_2_t1_c1 AS anon_1_anon_2_t1_c1, anon_1.anon_2_t1_c2 " - "AS anon_1_anon_2_t1_c2 FROM (SELECT anon_2.t1_c1 AS anon_2_t1_c1, " + "SELECT anon_1.anon_2_t1_c1 " + "AS anon_1_anon_2_t1_c1, anon_1.anon_2_t1_c2 " + "AS anon_1_anon_2_t1_c2 " + "FROM (SELECT anon_2.t1_c1 AS anon_2_t1_c1, " "anon_2.t1_c2 AS anon_2_t1_c2 FROM (SELECT t1.c1 AS t1_c1, t1.c2 " - "AS t1_c2 FROM t1 WHERE t1.c1 = :c1_1) AS anon_2) AS anon_1 ORDER BY " - "anon_1.anon_2_t1_c1" + "AS t1_c2 FROM t1 WHERE t1.c1 = :c1_1) AS anon_2) AS anon_1 " + "ORDER BY anon_1.anon_2_t1_c1" ) def test_anonymous_labeled_expression(self): - from sqlalchemy.sql import column - sess = create_session() c1, c2 = column('c1'), column('c2') q1 = sess.query(c1.label('foo'), c2.label('bar')).filter(c1 == 'dog') @@ -538,8 +503,8 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q3.order_by(c1), "SELECT anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar FROM " "(SELECT c1 AS foo, c2 AS bar WHERE c1 = :c1_1 UNION SELECT " - "c1 AS foo, c2 AS bar WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.foo" - ) + "c1 AS foo, c2 AS bar " + "WHERE c1 = :c1_2) AS anon_1 ORDER BY anon_1.foo") def test_anonymous_expression_plus_aliased_join(self): """test that the 'dont alias non-ORM' rule remains for other @@ -553,7 +518,7 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): q1 = sess.query(User.id).filter(User.id > 5) q1 = q1.from_self() q1 = q1.join(User.addresses, aliased=True).\ - order_by(User.id, Address.id, addresses.c.id) + order_by(User.id, Address.id, addresses.c.id) self.assert_compile( q1, "SELECT anon_1.users_id AS anon_1_users_id " @@ -563,40 +528,44 @@ class ColumnAccessTest(QueryTest, AssertsCompiledSQL): "ORDER BY anon_1.users_id, addresses_1.id, addresses.id" ) + class AddEntityEquivalenceTest(fixtures.MappedTest, AssertsCompiledSQL): run_setup_mappers = 'once' @classmethod def define_tables(cls, metadata): - Table('a', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table( + 'a', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)), Column('type', String(20)), Column('bid', Integer, ForeignKey('b.id')) ) - Table('b', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table( + 'b', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)), Column('type', String(20)) ) - Table('c', metadata, + Table( + 'c', metadata, Column('id', Integer, ForeignKey('b.id'), primary_key=True), - Column('age', Integer) - ) + Column('age', Integer)) - Table('d', metadata, + Table( + 'd', metadata, Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('dede', Integer) - ) + Column('dede', Integer)) @classmethod def setup_classes(cls): - a, c, b, d = (cls.tables.a, - cls.tables.c, - cls.tables.b, - cls.tables.d) + a, c, b, d = (cls.tables.a, cls.tables.c, cls.tables.b, cls.tables.d) class A(cls.Comparable): pass @@ -610,46 +579,37 @@ class AddEntityEquivalenceTest(fixtures.MappedTest, AssertsCompiledSQL): class D(A): pass - mapper(A, a, - polymorphic_identity='a', - polymorphic_on=a.c.type, - with_polymorphic= ('*', None), - properties={ - 'link':relation( B, uselist=False, backref='back') - }) - mapper(B, b, - polymorphic_identity='b', - polymorphic_on=b.c.type, - with_polymorphic= ('*', None) - ) + mapper( + A, a, polymorphic_identity='a', polymorphic_on=a.c.type, + with_polymorphic=('*', None), properties={ + 'link': relation(B, uselist=False, backref='back')}) + mapper( + B, b, polymorphic_identity='b', polymorphic_on=b.c.type, + with_polymorphic=('*', None)) mapper(C, c, inherits=B, polymorphic_identity='c') mapper(D, d, inherits=A, polymorphic_identity='d') @classmethod def insert_data(cls): - A, C, B = (cls.classes.A, - cls.classes.C, - cls.classes.B) + A, C, B = (cls.classes.A, cls.classes.C, cls.classes.B) sess = create_session() - sess.add_all([ - B(name='b1'), - A(name='a1', link= C(name='c1',age=3)), - C(name='c2',age=6), - A(name='a2') - ]) + sess.add_all( + [ + B(name='b1'), + A(name='a1', link=C(name='c1', age=3)), + C(name='c2', age=6), + A(name='a2')]) sess.flush() def test_add_entity_equivalence(self): - A, C, B = (self.classes.A, - self.classes.C, - self.classes.B) + A, C, B = (self.classes.A, self.classes.C, self.classes.B) sess = create_session() for q in [ - sess.query( A,B).join( A.link), - sess.query( A).join( A.link).add_entity(B), + sess.query(A, B).join(A.link), + sess.query(A).join(A.link).add_entity(B), ]: eq_( q.all(), @@ -660,9 +620,9 @@ class AddEntityEquivalenceTest(fixtures.MappedTest, AssertsCompiledSQL): ) for q in [ - sess.query( B,A).join( B.back), - sess.query( B).join( B.back).add_entity(A), - sess.query( B).add_entity(A).join( B.back) + sess.query(B, A).join(B.back), + sess.query(B).join(B.back).add_entity(A), + sess.query(B).add_entity(A).join(B.back) ]: eq_( q.all(), @@ -681,18 +641,17 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.tables.users) query = users.select(users.c.id == 7).\ - union(users.select(users.c.id > 7)).\ - alias('ulist').\ - outerjoin(addresses).\ - select(use_labels=True, - order_by=['ulist.id', addresses.c.id]) + union(users.select(users.c.id > 7)).alias('ulist').\ + outerjoin(addresses).\ + select(use_labels=True, order_by=['ulist.id', addresses.c.id]) sess = create_session() q = sess.query(User) def go(): - l = list(q.options(contains_alias('ulist'), - contains_eager('addresses')).\ - instances(query.execute())) + l = list( + q.options( + contains_alias('ulist'), contains_eager('addresses')). + instances(query.execute())) assert self.static.user_address_result == l self.assert_sql_count(testing.db, go, 1) @@ -702,18 +661,16 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.tables.users) query = users.select(users.c.id == 7).\ - union(users.select(users.c.id > 7)).\ - alias('ulist').\ - outerjoin(addresses).\ - select(use_labels=True, - order_by=['ulist.id', addresses.c.id]) + union(users.select(users.c.id > 7)).alias('ulist').\ + outerjoin(addresses). \ + select(use_labels=True, order_by=['ulist.id', addresses.c.id]) sess = create_session() q = sess.query(User) def go(): - l = q.options(contains_alias('ulist'), - contains_eager('addresses')).\ - from_statement(query).all() + l = q.options( + contains_alias('ulist'), contains_eager('addresses')).\ + from_statement(query).all() assert self.static.user_address_result == l self.assert_sql_count(testing.db, go, 1) @@ -723,17 +680,15 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.tables.users) query = users.select(users.c.id == 7).\ - union(users.select(users.c.id > 7)).\ - alias('ulist').\ - outerjoin(addresses).\ - select(use_labels=True, - order_by=['ulist.id', addresses.c.id]) + union(users.select(users.c.id > 7)).alias('ulist').\ + outerjoin(addresses). \ + select(use_labels=True, order_by=['ulist.id', addresses.c.id]) sess = create_session() # better way. use select_entity_from() def go(): l = sess.query(User).select_entity_from(query).\ - options(contains_eager('addresses')).all() + options(contains_eager('addresses')).all() assert self.static.user_address_result == l self.assert_sql_count(testing.db, go, 1) @@ -749,14 +704,13 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): # the adapter created by contains_eager() adalias = addresses.alias() query = users.select(users.c.id == 7).\ - union(users.select(users.c.id > 7)).\ - alias('ulist').\ - outerjoin(adalias).\ - select(use_labels=True, - order_by=['ulist.id', adalias.c.id]) + union(users.select(users.c.id > 7)).\ + alias('ulist').outerjoin(adalias).\ + select(use_labels=True, order_by=['ulist.id', adalias.c.id]) + def go(): l = sess.query(User).select_entity_from(query).\ - options(contains_eager('addresses', alias=adalias)).all() + options(contains_eager('addresses', alias=adalias)).all() assert self.static.user_address_result == l self.assert_sql_count(testing.db, go, 1) @@ -769,8 +723,8 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): # test that contains_eager suppresses the normal outer join rendering q = sess.query(User).outerjoin(User.addresses).\ - options(contains_eager(User.addresses)).\ - order_by(User.id, addresses.c.id) + options(contains_eager(User.addresses)).\ + order_by(User.id, addresses.c.id) self.assert_compile(q.with_labels().statement, 'SELECT addresses.id AS addresses_id, ' 'addresses.user_id AS addresses_user_id, ' @@ -789,42 +743,41 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): adalias = addresses.alias() q = sess.query(User).\ - select_entity_from(users.outerjoin(adalias)).\ - options(contains_eager(User.addresses, alias=adalias)).\ - order_by(User.id, adalias.c.id) + select_entity_from(users.outerjoin(adalias)).\ + options(contains_eager(User.addresses, alias=adalias)).\ + order_by(User.id, adalias.c.id) + def go(): eq_(self.static.user_address_result, q.order_by(User.id).all()) self.assert_sql_count(testing.db, go, 1) sess.expunge_all() - selectquery = users.\ - outerjoin(addresses).\ - select(users.c.id<10, - use_labels=True, - order_by=[users.c.id, addresses.c.id]) + selectquery = users.outerjoin(addresses). \ + select( + users.c.id < 10, use_labels=True, + order_by=[users.c.id, addresses.c.id]) q = sess.query(User) def go(): - l = list(q.options( - contains_eager('addresses') - ).instances(selectquery.execute())) + l = list( + q.options(contains_eager('addresses')). + instances(selectquery.execute())) assert self.static.user_address_result[0:3] == l self.assert_sql_count(testing.db, go, 1) sess.expunge_all() def go(): - l = list(q.options( - contains_eager(User.addresses) - ).instances(selectquery.execute())) + l = list( + q.options(contains_eager(User.addresses)). + instances(selectquery.execute())) assert self.static.user_address_result[0:3] == l self.assert_sql_count(testing.db, go, 1) sess.expunge_all() def go(): l = q.options( - contains_eager('addresses') - ).from_statement(selectquery).all() + contains_eager('addresses')).from_statement(selectquery).all() assert self.static.user_address_result[0:3] == l self.assert_sql_count(testing.db, go, 1) @@ -837,15 +790,15 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) adalias = addresses.alias('adalias') - selectquery = users.outerjoin(adalias).\ - select(use_labels=True, - order_by=[users.c.id, adalias.c.id]) + selectquery = users.outerjoin(adalias). \ + select(use_labels=True, order_by=[users.c.id, adalias.c.id]) # string alias name def go(): - l = list(q.options( - contains_eager('addresses', alias="adalias") - ).instances(selectquery.execute())) + l = list( + q.options( + contains_eager('addresses', alias="adalias")). + instances(selectquery.execute())) assert self.static.user_address_result == l self.assert_sql_count(testing.db, go, 1) @@ -859,14 +812,14 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): adalias = addresses.alias('adalias') selectquery = users.outerjoin(adalias).\ - select(use_labels=True, - order_by=[users.c.id, adalias.c.id]) + select(use_labels=True, order_by=[users.c.id, adalias.c.id]) # expression.Alias object def go(): - l = list(q.options( - contains_eager('addresses', alias=adalias) - ).instances(selectquery.execute())) + l = list( + q.options( + contains_eager('addresses', alias=adalias)). + instances(selectquery.execute())) assert self.static.user_address_result == l self.assert_sql_count(testing.db, go, 1) @@ -878,12 +831,12 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): # Aliased object adalias = aliased(Address) + def go(): l = q.options( - contains_eager('addresses', alias=adalias) - ).\ - outerjoin(adalias, User.addresses).\ - order_by(User.id, adalias.id) + contains_eager('addresses', alias=adalias) + ).outerjoin(adalias, User.addresses).\ + order_by(User.id, adalias.id) assert self.static.user_address_result == l.all() self.assert_sql_count(testing.db, go, 1) @@ -899,18 +852,17 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): oalias = orders.alias('o1') ialias = items.alias('i1') - query = users.outerjoin(oalias).\ - outerjoin(order_items).\ - outerjoin(ialias).\ - select(use_labels=True).\ - order_by(users.c.id, oalias.c.id, ialias.c.id) + query = users.outerjoin(oalias).outerjoin(order_items).\ + outerjoin(ialias).select(use_labels=True).\ + order_by(users.c.id, oalias.c.id, ialias.c.id) # test using string alias with more than one level deep def go(): - l = list(q.options( - contains_eager('orders', alias='o1'), - contains_eager('orders.items', alias='i1') - ).instances(query.execute())) + l = list( + q.options( + contains_eager('orders', alias='o1'), + contains_eager('orders.items', alias='i1') + ).instances(query.execute())) assert self.static.user_order_result == l self.assert_sql_count(testing.db, go, 1) @@ -926,30 +878,29 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): oalias = orders.alias('o1') ialias = items.alias('i1') - query = users.outerjoin(oalias).\ - outerjoin(order_items).\ - outerjoin(ialias).\ - select(use_labels=True).\ - order_by(users.c.id, oalias.c.id, ialias.c.id) + query = users.outerjoin(oalias).outerjoin(order_items).\ + outerjoin(ialias).select(use_labels=True).\ + order_by(users.c.id, oalias.c.id, ialias.c.id) # test using Alias with more than one level deep # new way: - #from sqlalchemy.orm.strategy_options import Load - #opt = Load(User).contains_eager('orders', alias=oalias).contains_eager('items', alias=ialias) + # from sqlalchemy.orm.strategy_options import Load + # opt = Load(User).contains_eager('orders', alias=oalias). + # contains_eager('items', alias=ialias) def go(): - l = list(q.options( + l = list( + q.options( contains_eager('orders', alias=oalias), - contains_eager('orders.items', alias=ialias) - ).instances(query.execute())) + contains_eager('orders.items', alias=ialias)). + instances(query.execute())) assert self.static.user_order_result == l self.assert_sql_count(testing.db, go, 1) def test_contains_eager_multi_aliased(self): - Item, User, Order = (self.classes.Item, - self.classes.User, - self.classes.Order) + Item, User, Order = ( + self.classes.Item, self.classes.User, self.classes.Order) sess = create_session() q = sess.query(User) @@ -957,11 +908,11 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): # test using Aliased with more than one level deep oalias = aliased(Order) ialias = aliased(Item) + def go(): l = q.options( - contains_eager(User.orders, alias=oalias), - contains_eager(User.orders, Order.items, alias=ialias) - ).\ + contains_eager(User.orders, alias=oalias), + contains_eager(User.orders, Order.items, alias=ialias)).\ outerjoin(oalias, User.orders).\ outerjoin(ialias, oalias.items).\ order_by(User.id, oalias.id, ialias.id) @@ -975,14 +926,10 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.classes.User, self.classes.Address) - sess = create_session() - q = sess.query(User).\ - join(User.addresses).\ - join(Address.dingaling).\ - options( - contains_eager(User.addresses, Address.dingaling), - ) + q = sess.query(User).join(User.addresses).join(Address.dingaling).\ + options(contains_eager(User.addresses, Address.dingaling),) + def go(): eq_( q.all(), @@ -1010,15 +957,13 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): self.classes.User, self.classes.Address) - sess = create_session() da = aliased(Dingaling, name="foob") - q = sess.query(User).\ - join(User.addresses).\ - join(da, Address.dingaling).\ - options( - contains_eager(User.addresses, Address.dingaling, alias=da), - ) + q = sess.query(User).join(User.addresses).\ + join(da, Address.dingaling).\ + options( + contains_eager(User.addresses, Address.dingaling, alias=da),) + def go(): eq_( q.all(), @@ -1046,6 +991,7 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): sess = create_session() q = sess.query(User) + def go(): # outerjoin to User.orders, offset 1/limit 2 so we get user # 7 + second two orders. then joinedload the addresses. @@ -1054,17 +1000,25 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): # applies context.adapter to result rows. This was # [ticket:1180]. - l = \ - q.outerjoin(User.orders).options(joinedload(User.addresses), - contains_eager(User.orders)).order_by(User.id, - Order.id).offset(1).limit(2).all() - eq_(l, [User(id=7, - addresses=[Address(email_address='jack@bean.com', - user_id=7, id=1)], name='jack', - orders=[Order(address_id=1, user_id=7, - description='order 3', isopen=1, id=3), - Order(address_id=None, user_id=7, description='order 5' - , isopen=0, id=5)])]) + l = q.outerjoin(User.orders).options( + joinedload(User.addresses), contains_eager(User.orders)). \ + order_by(User.id, Order.id).offset(1).limit(2).all() + eq_( + l, [ + User( + id=7, + addresses=[ + Address( + email_address='jack@bean.com', + user_id=7, id=1)], + name='jack', + orders=[ + Order( + address_id=1, user_id=7, description='order 3', + isopen=1, id=3), + Order( + address_id=None, user_id=7, + description='order 5', isopen=0, id=5)])]) self.assert_sql_count(testing.db, go, 1) sess.expunge_all() @@ -1075,18 +1029,28 @@ class InstancesTest(QueryTest, AssertsCompiledSQL): # are applied by the eager loader oalias = aliased(Order) - l = q.outerjoin(oalias, User.orders).\ - options(joinedload(User.addresses), - contains_eager(User.orders, alias=oalias)).\ - order_by(User.id, oalias.id).\ - offset(1).limit(2).all() - eq_(l, [User(id=7, - addresses=[Address(email_address='jack@bean.com', - user_id=7, id=1)], name='jack', - orders=[Order(address_id=1, user_id=7, - description='order 3', isopen=1, id=3), - Order(address_id=None, user_id=7, description='order 5' - , isopen=0, id=5)])]) + l = q.outerjoin(oalias, User.orders).options( + joinedload(User.addresses), + contains_eager(User.orders, alias=oalias)). \ + order_by(User.id, oalias.id).\ + offset(1).limit(2).all() + eq_( + l, + [ + User( + id=7, + addresses=[ + Address( + email_address='jack@bean.com', + user_id=7, id=1)], + name='jack', + orders=[ + Order( + address_id=1, user_id=7, description='order 3', + isopen=1, id=3), + Order( + address_id=None, user_id=7, + description='order 5', isopen=0, id=5)])]) self.assert_sql_count(testing.db, go, 1) @@ -1110,50 +1074,51 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) q2 = q.order_by(User.id).\ - values(User.name, User.name + " " + cast(User.id, String(50))) + values(User.name, User.name + " " + cast(User.id, String(50))) eq_( list(q2), - [('jack', 'jack 7'), ('ed', 'ed 8'), - ('fred', 'fred 9'), ('chuck', 'chuck 10')] + [ + ('jack', 'jack 7'), ('ed', 'ed 8'), + ('fred', 'fred 9'), ('chuck', 'chuck 10')] ) - q2 = q.join('addresses').\ - filter(User.name.like('%e%')).\ - order_by(User.id, Address.id).\ - values(User.name, Address.email_address) - eq_(list(q2), - [('ed', 'ed@wood.com'), ('ed', 'ed@bettyboop.com'), + q2 = q.join('addresses').filter(User.name.like('%e%')).\ + order_by(User.id, Address.id).\ + values(User.name, Address.email_address) + eq_( + list(q2), + [ + ('ed', 'ed@wood.com'), ('ed', 'ed@bettyboop.com'), ('ed', 'ed@lala.com'), ('fred', 'fred@fred.com')]) - q2 = q.join('addresses').\ - filter(User.name.like('%e%')).\ - order_by(desc(Address.email_address)).\ - slice(1, 3).values(User.name, Address.email_address) + q2 = q.join('addresses').filter(User.name.like('%e%')).\ + order_by(desc(Address.email_address)).\ + slice(1, 3).values(User.name, Address.email_address) eq_(list(q2), [('ed', 'ed@wood.com'), ('ed', 'ed@lala.com')]) adalias = aliased(Address) - q2 = q.join(adalias, 'addresses').\ - filter(User.name.like('%e%')).order_by(adalias.email_address).\ - values(User.name, adalias.email_address) + q2 = q.join(adalias, 'addresses'). \ + filter(User.name.like('%e%')).order_by(adalias.email_address).\ + values(User.name, adalias.email_address) eq_(list(q2), [('ed', 'ed@bettyboop.com'), ('ed', 'ed@lala.com'), ('ed', 'ed@wood.com'), ('fred', 'fred@fred.com')]) q2 = q.values(func.count(User.name)) assert next(q2) == (4,) - q2 = q.select_entity_from(sel).filter(User.id==8).values(User.name, sel.c.name, User.name) + q2 = q.select_entity_from(sel).filter(User.id == 8). \ + values(User.name, sel.c.name, User.name) eq_(list(q2), [('ed', 'ed', 'ed')]) # using User.xxx is alised against "sel", so this query returns nothing - q2 = q.select_entity_from(sel).\ - filter(User.id==8).\ - filter(User.id>sel.c.id).values(User.name, sel.c.name, User.name) + q2 = q.select_entity_from(sel).filter(User.id == 8).\ + filter(User.id > sel.c.id).values(User.name, sel.c.name, User.name) eq_(list(q2), []) # whereas this uses users.c.xxx, is not aliased and creates a new join - q2 = q.select_entity_from(sel).\ - filter(users.c.id==8).\ - filter(users.c.id>sel.c.id).values(users.c.name, sel.c.name, User.name) + q2 = q.select_entity_from(sel).filter(users.c.id == 8).\ + filter(users.c.id > sel.c.id). \ + values(users.c.name, sel.c.name, User.name) eq_(list(q2), [('ed', 'jack', 'jack')]) def test_alias_naming(self): @@ -1162,7 +1127,7 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sess = create_session() ua = aliased(User, name="foobar") - q= sess.query(ua) + q = sess.query(ua) self.assert_compile( q, "SELECT foobar.id AS foobar_id, " @@ -1180,14 +1145,16 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sel = users.select(User.id.in_([7, 8])).alias() q = sess.query(User) u2 = aliased(User) - q2 = q.select_entity_from(sel).\ - filter(u2.id>1).\ - order_by(User.id, sel.c.id, u2.id).\ - values(User.name, sel.c.name, u2.name) - eq_(list(q2), [('jack', 'jack', 'jack'), ('jack', 'jack', 'ed'), - ('jack', 'jack', 'fred'), ('jack', 'jack', 'chuck'), - ('ed', 'ed', 'jack'), ('ed', 'ed', 'ed'), - ('ed', 'ed', 'fred'), ('ed', 'ed', 'chuck')]) + q2 = q.select_entity_from(sel).filter(u2.id > 1).\ + order_by(User.id, sel.c.id, u2.id).\ + values(User.name, sel.c.name, u2.name) + eq_( + list(q2), + [ + ('jack', 'jack', 'jack'), ('jack', 'jack', 'ed'), + ('jack', 'jack', 'fred'), ('jack', 'jack', 'chuck'), + ('ed', 'ed', 'jack'), ('ed', 'ed', 'ed'), + ('ed', 'ed', 'fred'), ('ed', 'ed', 'chuck')]) @testing.fails_on('mssql', 'FIXME: unknown') @testing.fails_on('oracle', @@ -1210,51 +1177,44 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) q2 = q.group_by(User.name.like('%j%')).\ - order_by(desc(User.name.like('%j%'))).\ - values(User.name.like('%j%'), func.count(User.name.like('%j%'))) + order_by(desc(User.name.like('%j%'))).\ + values(User.name.like('%j%'), func.count(User.name.like('%j%'))) eq_(list(q2), [(True, 1), (False, 3)]) - q2 = q.order_by(desc(User.name.like('%j%'))).values(User.name.like('%j%')) + q2 = q.order_by(desc(User.name.like('%j%'))). \ + values(User.name.like('%j%')) eq_(list(q2), [(True,), (False,), (False,), (False,)]) - def test_correlated_subquery(self): - """test that a subquery constructed from ORM attributes doesn't leak out - those entities to the outermost query. + """test that a subquery constructed from ORM attributes doesn't leak + out those entities to the outermost query.""" - """ - - Address, users, User = (self.classes.Address, - self.tables.users, - self.classes.User) + Address, users, User = ( + self.classes.Address, self.tables.users, self.classes.User) sess = create_session() - subq = select([func.count()]).\ - where(User.id==Address.user_id).\ - correlate(users).\ - label('count') + subq = select([func.count()]).where(User.id == Address.user_id).\ + correlate(users).label('count') # we don't want Address to be outside of the subquery here eq_( list(sess.query(User, subq)[0:3]), - [(User(id=7,name='jack'), 1), (User(id=8,name='ed'), 3), - (User(id=9,name='fred'), 1)] - ) + [ + (User(id=7, name='jack'), 1), (User(id=8, name='ed'), 3), + (User(id=9, name='fred'), 1)]) # same thing without the correlate, as it should # not be needed - subq = select([func.count()]).\ - where(User.id==Address.user_id).\ + subq = select([func.count()]).where(User.id == Address.user_id).\ label('count') # we don't want Address to be outside of the subquery here eq_( list(sess.query(User, subq)[0:3]), - [(User(id=7,name='jack'), 1), (User(id=8,name='ed'), 3), - (User(id=9,name='fred'), 1)] - ) - + [ + (User(id=7, name='jack'), 1), (User(id=8, name='ed'), 3), + (User(id=9, name='fred'), 1)]) def test_column_queries(self): Address, users, User = (self.classes.Address, @@ -1263,119 +1223,165 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sess = create_session() - eq_(sess.query(User.name).all(), [('jack',), ('ed',), ('fred',), ('chuck',)]) + eq_( + sess.query(User.name).all(), + [('jack',), ('ed',), ('fred',), ('chuck',)]) sel = users.select(User.id.in_([7, 8])).alias() q = sess.query(User.name) q2 = q.select_entity_from(sel).all() eq_(list(q2), [('jack',), ('ed',)]) - eq_(sess.query(User.name, Address.email_address).filter(User.id==Address.user_id).all(), [ - ('jack', 'jack@bean.com'), ('ed', 'ed@wood.com'), - ('ed', 'ed@bettyboop.com'), ('ed', 'ed@lala.com'), - ('fred', 'fred@fred.com') - ]) + eq_( + sess.query(User.name, Address.email_address). + filter(User.id == Address.user_id).all(), + [ + ('jack', 'jack@bean.com'), ('ed', 'ed@wood.com'), + ('ed', 'ed@bettyboop.com'), ('ed', 'ed@lala.com'), + ('fred', 'fred@fred.com')]) - eq_(sess.query(User.name, func.count(Address.email_address)).\ - outerjoin(User.addresses).group_by(User.id, User.name).\ - order_by(User.id).all(), - [('jack', 1), ('ed', 3), ('fred', 1), ('chuck', 0)] - ) + eq_( + sess.query(User.name, func.count(Address.email_address)). + outerjoin(User.addresses).group_by(User.id, User.name). + order_by(User.id).all(), + [('jack', 1), ('ed', 3), ('fred', 1), ('chuck', 0)]) - eq_(sess.query(User, func.count(Address.email_address)).\ - outerjoin(User.addresses).group_by(User).\ - order_by(User.id).all(), - [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), - (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)] - ) + eq_( + sess.query(User, func.count(Address.email_address)). + outerjoin(User.addresses).group_by(User). + order_by(User.id).all(), + [ + (User(name='jack', id=7), 1), (User(name='ed', id=8), 3), + (User(name='fred', id=9), 1), (User(name='chuck', id=10), 0)]) - eq_(sess.query(func.count(Address.email_address), User).\ - outerjoin(User.addresses).group_by(User).\ - order_by(User.id).all(), - [(1, User(name='jack',id=7)), (3, User(name='ed',id=8)), - (1, User(name='fred',id=9)), (0, User(name='chuck',id=10))] - ) + eq_( + sess.query(func.count(Address.email_address), User). + outerjoin(User.addresses).group_by(User). + order_by(User.id).all(), + [ + (1, User(name='jack', id=7)), (3, User(name='ed', id=8)), + (1, User(name='fred', id=9)), (0, User(name='chuck', id=10))]) adalias = aliased(Address) - eq_(sess.query(User, func.count(adalias.email_address)).\ - outerjoin(adalias, 'addresses').group_by(User).\ - order_by(User.id).all(), - [(User(name='jack',id=7), 1), (User(name='ed',id=8), 3), - (User(name='fred',id=9), 1), (User(name='chuck',id=10), 0)] - ) + eq_( + sess.query(User, func.count(adalias.email_address)). + outerjoin(adalias, 'addresses').group_by(User). + order_by(User.id).all(), + [ + (User(name='jack', id=7), 1), (User(name='ed', id=8), 3), + (User(name='fred', id=9), 1), (User(name='chuck', id=10), 0)]) - eq_(sess.query(func.count(adalias.email_address), User).\ - outerjoin(adalias, User.addresses).group_by(User).\ - order_by(User.id).all(), - [(1, User(name='jack',id=7)), (3, User(name='ed',id=8)), - (1, User(name='fred',id=9)), (0, User(name='chuck',id=10))] + eq_( + sess.query(func.count(adalias.email_address), User). + outerjoin(adalias, User.addresses).group_by(User). + order_by(User.id).all(), + [ + (1, User(name='jack', id=7)), (3, User(name='ed', id=8)), + (1, User(name='fred', id=9)), (0, User(name='chuck', id=10))] ) # select from aliasing + explicit aliasing eq_( - sess.query(User, adalias.email_address, adalias.id).\ - outerjoin(adalias, User.addresses).\ - from_self(User, adalias.email_address).\ - order_by(User.id, adalias.id).all(), + sess.query(User, adalias.email_address, adalias.id). + outerjoin(adalias, User.addresses). + from_self(User, adalias.email_address). + order_by(User.id, adalias.id).all(), [ - (User(name='jack',id=7), 'jack@bean.com'), - (User(name='ed',id=8), 'ed@wood.com'), - (User(name='ed',id=8), 'ed@bettyboop.com'), - (User(name='ed',id=8), 'ed@lala.com'), - (User(name='fred',id=9), 'fred@fred.com'), - (User(name='chuck',id=10), None) + (User(name='jack', id=7), 'jack@bean.com'), + (User(name='ed', id=8), 'ed@wood.com'), + (User(name='ed', id=8), 'ed@bettyboop.com'), + (User(name='ed', id=8), 'ed@lala.com'), + (User(name='fred', id=9), 'fred@fred.com'), + (User(name='chuck', id=10), None) ] ) # anon + select from aliasing eq_( - sess.query(User).join(User.addresses, aliased=True).\ - filter(Address.email_address.like('%ed%')).\ - from_self().all(), + sess.query(User).join(User.addresses, aliased=True). + filter(Address.email_address.like('%ed%')). + from_self().all(), [ - User(name='ed',id=8), - User(name='fred',id=9), + User(name='ed', id=8), + User(name='fred', id=9), ] ) # test eager aliasing, with/without select_entity_from aliasing for q in [ - sess.query(User, adalias.email_address).\ - outerjoin(adalias, User.addresses).\ - options(joinedload(User.addresses)).\ - order_by(User.id, adalias.id).limit(10), - sess.query(User, adalias.email_address, adalias.id).\ - outerjoin(adalias, User.addresses).\ - from_self(User, adalias.email_address).\ - options(joinedload(User.addresses)).\ - order_by(User.id, adalias.id).limit(10), + sess.query(User, adalias.email_address). + outerjoin(adalias, User.addresses). + options(joinedload(User.addresses)). + order_by(User.id, adalias.id).limit(10), + sess.query(User, adalias.email_address, adalias.id). + outerjoin(adalias, User.addresses). + from_self(User, adalias.email_address). + options(joinedload(User.addresses)). + order_by(User.id, adalias.id).limit(10), ]: eq_( - q.all(), - [(User(addresses=[ - Address(user_id=7,email_address='jack@bean.com',id=1)], - name='jack',id=7), 'jack@bean.com'), - (User(addresses=[ - Address(user_id=8,email_address='ed@wood.com',id=2), - Address(user_id=8,email_address='ed@bettyboop.com',id=3), - Address(user_id=8,email_address='ed@lala.com',id=4)], - name='ed',id=8), 'ed@wood.com'), - (User(addresses=[ - Address(user_id=8,email_address='ed@wood.com',id=2), - Address(user_id=8,email_address='ed@bettyboop.com',id=3), - Address(user_id=8,email_address='ed@lala.com',id=4)],name='ed',id=8), - 'ed@bettyboop.com'), - (User(addresses=[ - Address(user_id=8,email_address='ed@wood.com',id=2), - Address(user_id=8,email_address='ed@bettyboop.com',id=3), - Address(user_id=8,email_address='ed@lala.com',id=4)],name='ed',id=8), - 'ed@lala.com'), - (User(addresses=[Address(user_id=9,email_address='fred@fred.com',id=5)],name='fred',id=9), - 'fred@fred.com'), - - (User(addresses=[],name='chuck',id=10), None)] - ) + [ + ( + User( + addresses=[ + Address( + user_id=7, email_address='jack@bean.com', + id=1)], + name='jack', id=7), + 'jack@bean.com'), + ( + User( + addresses=[ + Address( + user_id=8, email_address='ed@wood.com', + id=2), + Address( + user_id=8, + email_address='ed@bettyboop.com', id=3), + Address( + user_id=8, email_address='ed@lala.com', + id=4)], + name='ed', id=8), + 'ed@wood.com'), + ( + User( + addresses=[ + Address( + user_id=8, email_address='ed@wood.com', + id=2), + Address( + user_id=8, + email_address='ed@bettyboop.com', id=3), + Address( + user_id=8, email_address='ed@lala.com', + id=4)], + name='ed', id=8), + 'ed@bettyboop.com'), + ( + User( + addresses=[ + Address( + user_id=8, email_address='ed@wood.com', + id=2), + Address( + user_id=8, + email_address='ed@bettyboop.com', id=3), + Address( + user_id=8, email_address='ed@lala.com', + id=4)], + name='ed', id=8), + 'ed@lala.com'), + ( + User( + addresses=[ + Address( + user_id=9, email_address='fred@fred.com', + id=5)], + name='fred', id=9), + 'fred@fred.com'), + + (User(addresses=[], name='chuck', id=10), None)]) def test_column_from_limited_joinedload(self): User = self.classes.User @@ -1384,91 +1390,104 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): def go(): results = sess.query(User).limit(1).\ - options(joinedload('addresses')).\ - add_column(User.name).all() + options(joinedload('addresses')).add_column(User.name).all() eq_(results, [(User(name='jack'), 'jack')]) self.assert_sql_count(testing.db, go, 1) @testing.fails_on("firebird", "unknown") - @testing.fails_on('postgresql+pg8000', "'type oid 705 not mapped to py type' (due to literal)") def test_self_referential(self): Order = self.classes.Order - sess = create_session() oalias = aliased(Order) for q in [ - sess.query(Order, oalias).\ - filter(Order.user_id==oalias.user_id).filter(Order.user_id==7).\ - filter(Order.id>oalias.id).order_by(Order.id, oalias.id), - sess.query(Order, oalias).from_self().filter(Order.user_id==oalias.user_id).\ - filter(Order.user_id==7).filter(Order.id>oalias.id).\ - order_by(Order.id, oalias.id), + sess.query(Order, oalias).filter(Order.user_id == oalias.user_id). + filter(Order.user_id == 7). + filter(Order.id > oalias.id).order_by(Order.id, oalias.id), + sess.query(Order, oalias).from_self(). + filter(Order.user_id == oalias.user_id).filter(Order.user_id == 7). + filter(Order.id > oalias.id).order_by(Order.id, oalias.id), # same thing, but reversed. - sess.query(oalias, Order).from_self().filter(oalias.user_id==Order.user_id).\ - filter(oalias.user_id==7).filter(Order.id<oalias.id).\ - order_by(oalias.id, Order.id), + sess.query(oalias, Order).from_self(). + filter(oalias.user_id == Order.user_id). + filter(oalias.user_id == 7).filter(Order.id < oalias.id). + order_by(oalias.id, Order.id), # here we go....two layers of aliasing - sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).\ - filter(Order.user_id==7).filter(Order.id>oalias.id).\ - from_self().order_by(Order.id, oalias.id).\ - limit(10).options(joinedload(Order.items)), + sess.query(Order, oalias).filter(Order.user_id == oalias.user_id). + filter(Order.user_id == 7).filter(Order.id > oalias.id). + from_self().order_by(Order.id, oalias.id). + limit(10).options(joinedload(Order.items)), # gratuitous four layers - sess.query(Order, oalias).filter(Order.user_id==oalias.user_id).\ - filter(Order.user_id==7).filter(Order.id>oalias.id).from_self().\ - from_self().from_self().order_by(Order.id, oalias.id).\ - limit(10).options(joinedload(Order.items)), - + sess.query(Order, oalias).filter(Order.user_id == oalias.user_id). + filter(Order.user_id == 7).filter(Order.id > oalias.id). + from_self().from_self().from_self().order_by(Order.id, oalias.id). + limit(10).options(joinedload(Order.items)), ]: eq_( - q.all(), - [ - (Order(address_id=1,description='order 3',isopen=1,user_id=7,id=3), - Order(address_id=1,description='order 1',isopen=0,user_id=7,id=1)), - (Order(address_id=None,description='order 5',isopen=0,user_id=7,id=5), - Order(address_id=1,description='order 1',isopen=0,user_id=7,id=1)), - (Order(address_id=None,description='order 5',isopen=0,user_id=7,id=5), - Order(address_id=1,description='order 3',isopen=1,user_id=7,id=3)) - ] - ) - + q.all(), + [ + ( + Order( + address_id=1, description='order 3', isopen=1, + user_id=7, id=3), + Order( + address_id=1, description='order 1', isopen=0, + user_id=7, id=1)), + ( + Order( + address_id=None, description='order 5', isopen=0, + user_id=7, id=5), + Order( + address_id=1, description='order 1', isopen=0, + user_id=7, id=1)), + ( + Order( + address_id=None, description='order 5', isopen=0, + user_id=7, id=5), + Order( + address_id=1, description='order 3', isopen=1, + user_id=7, id=3)) + ] + ) - # ensure column expressions are taken from inside the subquery, not restated at the top - q = sess.query(Order.id, Order.description, literal_column("'q'").label('foo')).\ + # ensure column expressions are taken from inside the subquery, not + # restated at the top + q = sess.query( + Order.id, Order.description, + literal_column("'q'").label('foo')).\ filter(Order.description == 'order 3').from_self() - self.assert_compile(q, - "SELECT anon_1.orders_id AS " - "anon_1_orders_id, anon_1.orders_descriptio" - "n AS anon_1_orders_description, " - "anon_1.foo AS anon_1_foo FROM (SELECT " - "orders.id AS orders_id, " - "orders.description AS orders_description, " - "'q' AS foo FROM orders WHERE " - "orders.description = :description_1) AS " - "anon_1") + self.assert_compile( + q, + "SELECT anon_1.orders_id AS " + "anon_1_orders_id, anon_1.orders_descriptio" + "n AS anon_1_orders_description, " + "anon_1.foo AS anon_1_foo FROM (SELECT " + "orders.id AS orders_id, " + "orders.description AS orders_description, " + "'q' AS foo FROM orders WHERE " + "orders.description = :description_1) AS " + "anon_1") eq_( q.all(), [(3, 'order 3', 'q')] ) - def test_multi_mappers(self): Address, addresses, users, User = (self.classes.Address, self.tables.addresses, self.tables.users, self.classes.User) - test_session = create_session() (user7, user8, user9, user10) = test_session.query(User).all() (address1, address2, address3, address4, address5) = \ - test_session.query(Address).all() + test_session.query(Address).all() expected = [(user7, address1), (user8, address2), @@ -1479,14 +1498,17 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sess = create_session() - selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id]) - eq_(list(sess.query(User, Address).instances(selectquery.execute())), expected) + selectquery = users.outerjoin(addresses). \ + select(use_labels=True, order_by=[users.c.id, addresses.c.id]) + eq_( + list(sess.query(User, Address).instances(selectquery.execute())), + expected) sess.expunge_all() for address_entity in (Address, aliased(Address)): q = sess.query(User).add_entity(address_entity).\ - outerjoin(address_entity, 'addresses').\ - order_by(User.id, address_entity.id) + outerjoin(address_entity, 'addresses').\ + order_by(User.id, address_entity.id) eq_(q.all(), expected) sess.expunge_all() @@ -1496,14 +1518,16 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): eq_(q.all(), [(user8, address3)]) sess.expunge_all() - q = sess.query(User, address_entity).join(address_entity, 'addresses').\ - filter_by(email_address='ed@bettyboop.com') + q = sess.query(User, address_entity). \ + join(address_entity, 'addresses'). \ + filter_by(email_address='ed@bettyboop.com') eq_(q.all(), [(user8, address3)]) sess.expunge_all() - q = sess.query(User, address_entity).join(address_entity, 'addresses').\ - options(joinedload('addresses')).\ - filter_by(email_address='ed@bettyboop.com') + q = sess.query(User, address_entity). \ + join(address_entity, 'addresses').\ + options(joinedload('addresses')).\ + filter_by(email_address='ed@bettyboop.com') eq_(list(util.OrderedSet(q.all())), [(user8, address3)]) sess.expunge_all() @@ -1516,7 +1540,8 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sess = create_session() (user7, user8, user9, user10) = sess.query(User).all() - (address1, address2, address3, address4, address5) = sess.query(Address).all() + (address1, address2, address3, address4, address5) = \ + sess.query(Address).all() expected = [(user7, address1), (user8, address2), @@ -1527,14 +1552,16 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) adalias = addresses.alias('adalias') - q = q.add_entity(Address, alias=adalias).select_entity_from(users.outerjoin(adalias)) + q = q.add_entity(Address, alias=adalias). \ + select_entity_from(users.outerjoin(adalias)) l = q.order_by(User.id, adalias.c.id).all() assert l == expected sess.expunge_all() q = sess.query(User).add_entity(Address, alias=adalias) - l = q.select_entity_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all() + l = q.select_entity_from(users.outerjoin(adalias)). \ + filter(adalias.c.email_address == 'ed@bettyboop.com').all() assert l == [(user8, address3)] def test_with_entities(self): @@ -1542,19 +1569,18 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): sess = create_session() - q = sess.query(User).filter(User.id==7).order_by(User.name) + q = sess.query(User).filter(User.id == 7).order_by(User.name) self.assert_compile( - q.with_entities(User.id,Address).\ - filter(Address.user_id == User.id), - 'SELECT users.id AS users_id, addresses.id ' - 'AS addresses_id, addresses.user_id AS ' - 'addresses_user_id, addresses.email_address' - ' AS addresses_email_address FROM users, ' - 'addresses WHERE users.id = :id_1 AND ' - 'addresses.user_id = users.id ORDER BY ' - 'users.name') - + q.with_entities(User.id, Address). + filter(Address.user_id == User.id), + 'SELECT users.id AS users_id, addresses.id ' + 'AS addresses_id, addresses.user_id AS ' + 'addresses_user_id, addresses.email_address' + ' AS addresses_email_address FROM users, ' + 'addresses WHERE users.id = :id_1 AND ' + 'addresses.user_id = users.id ORDER BY ' + 'users.name') def test_multi_columns(self): users, User = self.tables.users, self.classes.User @@ -1567,14 +1593,14 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): assert sess.query(User).add_column(add_col).all() == expected sess.expunge_all() - assert_raises(sa_exc.InvalidRequestError, sess.query(User).add_column, object()) + assert_raises( + sa_exc.InvalidRequestError, sess.query(User).add_column, object()) def test_add_multi_columns(self): """test that add_column accepts a FROM clause.""" users, User = self.tables.users, self.classes.User - sess = create_session() eq_( @@ -1601,26 +1627,26 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): q = sess.query(User) q = q.group_by(users).order_by(User.id).outerjoin('addresses').\ - add_column(func.count(Address.id).label('count')) + add_column(func.count(Address.id).label('count')) eq_(q.all(), expected) sess.expunge_all() adalias = aliased(Address) q = sess.query(User) - q = q.group_by(users).order_by(User.id).outerjoin(adalias, 'addresses').\ - add_column(func.count(adalias.id).label('count')) + q = q.group_by(users).order_by(User.id). \ + outerjoin(adalias, 'addresses').\ + add_column(func.count(adalias.id).label('count')) eq_(q.all(), expected) sess.expunge_all() # TODO: figure out why group_by(users) doesn't work here - s = select([users, func.count(addresses.c.id).label('count')]).\ - select_from(users.outerjoin(addresses)).\ - group_by(*[c for c in users.c]).order_by(User.id) + s = select([users, func.count(addresses.c.id).label('count')]). \ + select_from(users.outerjoin(addresses)). \ + group_by(*[c for c in users.c]).order_by(User.id) q = sess.query(User) l = q.add_column("count").from_statement(s).all() assert l == expected - def test_raw_columns(self): addresses, users, User = (self.tables.addresses, self.tables.users, @@ -1642,10 +1668,12 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): assert q.all() == expected # test with a straight statement - s = select([users, func.count(addresses.c.id).label('count'), - ("Name:" + users.c.name).label('concat')], - from_obj=[users.outerjoin(addresses)], - group_by=[c for c in users.c], order_by=[users.c.id]) + s = select( + [ + users, func.count(addresses.c.id).label('count'), + ("Name:" + users.c.name).label('concat')], + from_obj=[users.outerjoin(addresses)], + group_by=[c for c in users.c], order_by=[users.c.id]) q = create_session().query(User) l = q.add_column("count").add_column("concat").from_statement(s).all() assert l == expected @@ -1681,26 +1709,26 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): aa = aliased(Address) s = create_session() for crit, j, exp in [ - (User.id + Address.id, User.addresses, - "SELECT users.id + addresses.id AS anon_1 " - "FROM users JOIN addresses ON users.id = " - "addresses.user_id" - ), - (User.id + Address.id, Address.user, - "SELECT users.id + addresses.id AS anon_1 " - "FROM addresses JOIN users ON users.id = " - "addresses.user_id" - ), - (Address.id + User.id, User.addresses, - "SELECT addresses.id + users.id AS anon_1 " - "FROM users JOIN addresses ON users.id = " - "addresses.user_id" - ), - (User.id + aa.id, (aa, User.addresses), - "SELECT users.id + addresses_1.id AS anon_1 " - "FROM users JOIN addresses AS addresses_1 " - "ON users.id = addresses_1.user_id" - ), + ( + User.id + Address.id, User.addresses, + "SELECT users.id + addresses.id AS anon_1 " + "FROM users JOIN addresses ON users.id = " + "addresses.user_id"), + ( + User.id + Address.id, Address.user, + "SELECT users.id + addresses.id AS anon_1 " + "FROM addresses JOIN users ON users.id = " + "addresses.user_id"), + ( + Address.id + User.id, User.addresses, + "SELECT addresses.id + users.id AS anon_1 " + "FROM users JOIN addresses ON users.id = " + "addresses.user_id"), + ( + User.id + aa.id, (aa, User.addresses), + "SELECT users.id + addresses_1.id AS anon_1 " + "FROM users JOIN addresses AS addresses_1 " + "ON users.id = addresses_1.user_id"), ]: q = s.query(crit) mzero = q._mapper_zero() @@ -1709,19 +1737,22 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): self.assert_compile(q, exp) for crit, j, exp in [ - (ua.id + Address.id, ua.addresses, - "SELECT users_1.id + addresses.id AS anon_1 " - "FROM users AS users_1 JOIN addresses " - "ON users_1.id = addresses.user_id"), - (ua.id + aa.id, (aa, ua.addresses), - "SELECT users_1.id + addresses_1.id AS anon_1 " - "FROM users AS users_1 JOIN addresses AS " - "addresses_1 ON users_1.id = addresses_1.user_id"), - (ua.id + aa.id, (ua, aa.user), - "SELECT users_1.id + addresses_1.id AS anon_1 " - "FROM addresses AS addresses_1 JOIN " - "users AS users_1 " - "ON users_1.id = addresses_1.user_id") + ( + ua.id + Address.id, ua.addresses, + "SELECT users_1.id + addresses.id AS anon_1 " + "FROM users AS users_1 JOIN addresses " + "ON users_1.id = addresses.user_id"), + ( + ua.id + aa.id, (aa, ua.addresses), + "SELECT users_1.id + addresses_1.id AS anon_1 " + "FROM users AS users_1 JOIN addresses AS " + "addresses_1 ON users_1.id = addresses_1.user_id"), + ( + ua.id + aa.id, (ua, aa.user), + "SELECT users_1.id + addresses_1.id AS anon_1 " + "FROM addresses AS addresses_1 JOIN " + "users AS users_1 " + "ON users_1.id = addresses_1.user_id") ]: q = s.query(crit) mzero = q._mapper_zero() @@ -1733,87 +1764,99 @@ class MixedEntitiesTest(QueryTest, AssertsCompiledSQL): User, Address = self.classes.User, self.classes.Address sess = Session() - agg_address = sess.query(Address.id, - func.sum(func.length(Address.email_address)).label('email_address') - ).group_by(Address.user_id) + agg_address = sess.query( + Address.id, + func.sum(func.length(Address.email_address)). + label('email_address')).group_by(Address.user_id) ag1 = aliased(Address, agg_address.subquery()) ag2 = aliased(Address, agg_address.subquery(), adapt_on_names=True) - # first, without adapt on names, 'email_address' isn't matched up - we get the raw "address" - # element in the SELECT + # first, without adapt on names, 'email_address' isn't matched up - we + # get the raw "address" element in the SELECT self.assert_compile( - sess.query(User, ag1.email_address).join(ag1, User.addresses).filter(ag1.email_address > 5), - "SELECT users.id AS users_id, users.name AS users_name, addresses.email_address " + sess.query(User, ag1.email_address).join(ag1, User.addresses). + filter(ag1.email_address > 5), + "SELECT users.id " + "AS users_id, users.name AS users_name, addresses.email_address " "AS addresses_email_address FROM addresses, users JOIN " "(SELECT addresses.id AS id, sum(length(addresses.email_address)) " "AS email_address FROM addresses GROUP BY addresses.user_id) AS " - "anon_1 ON users.id = addresses.user_id WHERE addresses.email_address > :email_address_1" - ) + "anon_1 ON users.id = addresses.user_id " + "WHERE addresses.email_address > :email_address_1") - # second, 'email_address' matches up to the aggreagte, and we get a smooth JOIN - # from users->subquery and that's it + # second, 'email_address' matches up to the aggreagte, and we get a + # smooth JOIN from users->subquery and that's it self.assert_compile( - sess.query(User, ag2.email_address).join(ag2, User.addresses).filter(ag2.email_address > 5), + sess.query(User, ag2.email_address).join(ag2, User.addresses). + filter(ag2.email_address > 5), "SELECT users.id AS users_id, users.name AS users_name, " "anon_1.email_address AS anon_1_email_address FROM users " - "JOIN (SELECT addresses.id AS id, sum(length(addresses.email_address)) " + "JOIN (" + "SELECT addresses.id AS id, sum(length(addresses.email_address)) " "AS email_address FROM addresses GROUP BY addresses.user_id) AS " - "anon_1 ON users.id = addresses.user_id WHERE anon_1.email_address > :email_address_1", - ) + "anon_1 ON users.id = addresses.user_id " + "WHERE anon_1.email_address > :email_address_1",) + class SelectFromTest(QueryTest, AssertsCompiledSQL): run_setup_mappers = None __dialect__ = 'default' def test_replace_with_select(self): - users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + users, Address, addresses, User = ( + self.tables.users, self.classes.Address, self.tables.addresses, + self.classes.User) - mapper(User, users, properties = { - 'addresses':relationship(Address) - }) + mapper( + User, users, properties={ + 'addresses': relationship(Address)}) mapper(Address, addresses) sel = users.select(users.c.id.in_([7, 8])).alias() sess = create_session() - eq_(sess.query(User).select_entity_from(sel).all(), [User(id=7), User(id=8)]) + eq_( + sess.query(User).select_entity_from(sel).all(), + [User(id=7), User(id=8)]) - eq_(sess.query(User).select_entity_from(sel).filter(User.id==8).all(), [User(id=8)]) + eq_( + sess.query(User).select_entity_from(sel). + filter(User.id == 8).all(), + [User(id=8)]) - eq_(sess.query(User).select_entity_from(sel).order_by(desc(User.name)).all(), [ - User(name='jack',id=7), User(name='ed',id=8) - ]) + eq_( + sess.query(User).select_entity_from(sel). + order_by(desc(User.name)).all(), [ + User(name='jack', id=7), User(name='ed', id=8)]) - eq_(sess.query(User).select_entity_from(sel).order_by(asc(User.name)).all(), [ - User(name='ed',id=8), User(name='jack',id=7) - ]) + eq_( + sess.query(User).select_entity_from(sel). + order_by(asc(User.name)).all(), [ + User(name='ed', id=8), User(name='jack', id=7)]) - eq_(sess.query(User).select_entity_from(sel).options(joinedload('addresses')).first(), - User(name='jack', addresses=[Address(id=1)]) - ) + eq_( + sess.query(User).select_entity_from(sel). + options(joinedload('addresses')).first(), + User(name='jack', addresses=[Address(id=1)])) def test_join_mapper_order_by(self): """test that mapper-level order_by is adapted to a selectable.""" User, users = self.classes.User, self.tables.users - mapper(User, users, order_by=users.c.id) sel = users.select(users.c.id.in_([7, 8])) sess = create_session() - eq_(sess.query(User).select_entity_from(sel).all(), + eq_( + sess.query(User).select_entity_from(sel).all(), [ - User(name='jack',id=7), User(name='ed',id=8) - ] - ) + User(name='jack', id=7), User(name='ed', id=8)]) def test_differentiate_self_external(self): - """test some different combinations of joining a table to a subquery of itself.""" + """test some different combinations of joining a table to a subquery of + itself.""" users, User = self.tables.users, self.classes.User @@ -1827,48 +1870,52 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): self.assert_compile( sess.query(User).join(sel, User.id > sel.c.id), "SELECT users.id AS users_id, users.name AS users_name FROM " - "users JOIN (SELECT users.id AS id, users.name AS name FROM " - "users WHERE users.id IN (:id_1, :id_2)) AS anon_1 ON users.id > anon_1.id", - ) + "users JOIN (SELECT users.id AS id, users.name AS name FROM users " + "WHERE users.id IN (:id_1, :id_2)) " + "AS anon_1 ON users.id > anon_1.id",) self.assert_compile( - sess.query(ualias).select_entity_from(sel).filter(ualias.id > sel.c.id), - "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM " - "users AS users_1, (SELECT users.id AS id, users.name AS name FROM " - "users WHERE users.id IN (:id_1, :id_2)) AS anon_1 WHERE users_1.id > anon_1.id", - ) + sess.query(ualias).select_entity_from(sel). + filter(ualias.id > sel.c.id), + "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name " + "FROM users AS users_1, (" + "SELECT users.id AS id, users.name AS name FROM users " + "WHERE users.id IN (:id_1, :id_2)) AS anon_1 " + "WHERE users_1.id > anon_1.id",) self.assert_compile( - sess.query(ualias).select_entity_from(sel).join(ualias, ualias.id > sel.c.id), + sess.query(ualias).select_entity_from(sel). + join(ualias, ualias.id > sel.c.id), "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name " "FROM (SELECT users.id AS id, users.name AS name " "FROM users WHERE users.id IN (:id_1, :id_2)) AS anon_1 " - "JOIN users AS users_1 ON users_1.id > anon_1.id" - ) + "JOIN users AS users_1 ON users_1.id > anon_1.id") self.assert_compile( - sess.query(ualias).select_entity_from(sel).join(ualias, ualias.id > User.id), + sess.query(ualias).select_entity_from(sel). + join(ualias, ualias.id > User.id), "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name " "FROM (SELECT users.id AS id, users.name AS name FROM " "users WHERE users.id IN (:id_1, :id_2)) AS anon_1 " - "JOIN users AS users_1 ON users_1.id > anon_1.id" - ) + "JOIN users AS users_1 ON users_1.id > anon_1.id") salias = aliased(User, sel) self.assert_compile( sess.query(salias).join(ualias, ualias.id > salias.id), "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name FROM " - "(SELECT users.id AS id, users.name AS name FROM users WHERE users.id " - "IN (:id_1, :id_2)) AS anon_1 JOIN users AS users_1 ON users_1.id > anon_1.id", - ) + "(SELECT users.id AS id, users.name AS name " + "FROM users WHERE users.id IN (:id_1, :id_2)) AS anon_1 " + "JOIN users AS users_1 ON users_1.id > anon_1.id",) self.assert_compile( - sess.query(ualias).select_entity_from(join(sel, ualias, ualias.id > sel.c.id)), - "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name FROM " - "(SELECT users.id AS id, users.name AS name FROM users WHERE users.id " - "IN (:id_1, :id_2)) AS anon_1 JOIN users AS users_1 ON users_1.id > anon_1.id" - ) - + sess.query(ualias).select_entity_from( + join(sel, ualias, ualias.id > sel.c.id)), + "SELECT users_1.id AS users_1_id, users_1.name AS users_1_name " + "FROM " + "(SELECT users.id AS id, users.name AS name " + "FROM users WHERE users.id " + "IN (:id_1, :id_2)) AS anon_1 " + "JOIN users AS users_1 ON users_1.id > anon_1.id") def test_aliased_class_vs_nonaliased(self): User, users = self.classes.User, self.tables.users @@ -1884,13 +1931,15 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): ) self.assert_compile( - sess.query(User.name).select_from(ua).join(User, ua.name > User.name), + sess.query(User.name).select_from(ua). + join(User, ua.name > User.name), "SELECT users.name AS users_name FROM users AS users_1 " "JOIN users ON users_1.name > users.name" ) self.assert_compile( - sess.query(ua.name).select_from(ua).join(User, ua.name > User.name), + sess.query(ua.name).select_from(ua). + join(User, ua.name > User.name), "SELECT users_1.name AS users_1_name FROM users AS users_1 " "JOIN users ON users_1.name > users.name" ) @@ -1910,11 +1959,10 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): # this is tested in many other places here, just adding it # here for comparison self.assert_compile( - sess.query(User.name).\ - select_entity_from(users.select().where(users.c.id > 5)), + sess.query(User.name).select_entity_from( + users.select().where(users.c.id > 5)), "SELECT anon_1.name AS anon_1_name FROM (SELECT users.id AS id, " - "users.name AS name FROM users WHERE users.id > :id_1) AS anon_1" - ) + "users.name AS name FROM users WHERE users.id > :id_1) AS anon_1") def test_join_no_order_by(self): User, users = self.classes.User, self.tables.users @@ -1924,11 +1972,9 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): sel = users.select(users.c.id.in_([7, 8])) sess = create_session() - eq_(sess.query(User).select_entity_from(sel).all(), - [ - User(name='jack',id=7), User(name='ed',id=8) - ] - ) + eq_( + sess.query(User).select_entity_from(sel).all(), + [User(name='jack', id=7), User(name='ed', id=8)]) def test_join_relname_from_selected_from(self): User, Address = self.classes.User, self.classes.Address @@ -1977,110 +2023,116 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): def test_join(self): - users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + users, Address, addresses, User = ( + self.tables.users, self.classes.Address, self.tables.addresses, + self.classes.User) - mapper(User, users, properties = { - 'addresses':relationship(Address) - }) + mapper(User, users, properties={'addresses': relationship(Address)}) mapper(Address, addresses) sel = users.select(users.c.id.in_([7, 8])) sess = create_session() - eq_(sess.query(User).select_entity_from(sel).join('addresses'). - add_entity(Address).order_by(User.id).order_by(Address.id).all(), + eq_( + sess.query(User).select_entity_from(sel).join('addresses'). + add_entity(Address).order_by(User.id).order_by(Address.id).all(), [ - (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4)) - ] - ) + ( + User(name='jack', id=7), + Address(user_id=7, email_address='jack@bean.com', id=1)), + ( + User(name='ed', id=8), + Address(user_id=8, email_address='ed@wood.com', id=2)), + ( + User(name='ed', id=8), + Address( + user_id=8, email_address='ed@bettyboop.com', id=3)), + ( + User(name='ed', id=8), + Address(user_id=8, email_address='ed@lala.com', id=4))]) adalias = aliased(Address) - eq_(sess.query(User).select_entity_from(sel).join(adalias, 'addresses'). - add_entity(adalias).order_by(User.id).order_by(adalias.id).all(), + eq_( + sess.query(User).select_entity_from(sel). + join(adalias, 'addresses').add_entity(adalias).order_by(User.id). + order_by(adalias.id).all(), [ - (User(name='jack',id=7), Address(user_id=7,email_address='jack@bean.com',id=1)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@wood.com',id=2)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@bettyboop.com',id=3)), - (User(name='ed',id=8), Address(user_id=8,email_address='ed@lala.com',id=4)) - ] - ) - + ( + User(name='jack', id=7), + Address(user_id=7, email_address='jack@bean.com', id=1)), + ( + User(name='ed', id=8), + Address(user_id=8, email_address='ed@wood.com', id=2)), + ( + User(name='ed', id=8), + Address( + user_id=8, email_address='ed@bettyboop.com', id=3)), + ( + User(name='ed', id=8), + Address(user_id=8, email_address='ed@lala.com', id=4))]) def test_more_joins(self): - users, Keyword, orders, items, order_items, Order, Item, \ - User, keywords, item_keywords = (self.tables.users, - self.classes.Keyword, - self.tables.orders, - self.tables.items, - self.tables.order_items, - self.classes.Order, - self.classes.Item, - self.classes.User, - self.tables.keywords, - self.tables.item_keywords) - - mapper(User, users, properties={ - 'orders':relationship(Order, backref='user'), # o2m, m2o - }) - mapper(Order, orders, properties={ - 'items':relationship(Item, secondary=order_items, - order_by=items.c.id), #m2m - }) - mapper(Item, items, properties={ - 'keywords':relationship(Keyword, secondary=item_keywords, - order_by=keywords.c.id) #m2m - }) + ( + users, Keyword, orders, items, order_items, Order, Item, User, + keywords, item_keywords) = \ + ( + self.tables.users, self.classes.Keyword, self.tables.orders, + self.tables.items, self.tables.order_items, self.classes.Order, + self.classes.Item, self.classes.User, self.tables.keywords, + self.tables.item_keywords) + + mapper( + User, users, properties={ + 'orders': relationship(Order, backref='user')}) # o2m, m2o + mapper( + Order, orders, properties={ + 'items': relationship( + Item, secondary=order_items, order_by=items.c.id)}) # m2m + + mapper( + Item, items, properties={ + 'keywords': relationship( + Keyword, secondary=item_keywords, + order_by=keywords.c.id)}) # m2m mapper(Keyword, keywords) sess = create_session() sel = users.select(users.c.id.in_([7, 8])) - eq_(sess.query(User).select_entity_from(sel).\ - join('orders', 'items', 'keywords').\ - filter(Keyword.name.in_(['red', 'big', 'round'])).\ - all(), - [ - User(name='jack',id=7) - ]) + eq_( + sess.query(User).select_entity_from(sel). + join('orders', 'items', 'keywords'). + filter(Keyword.name.in_(['red', 'big', 'round'])).all(), + [User(name='jack', id=7)]) - eq_(sess.query(User).select_entity_from(sel).\ - join('orders', 'items', 'keywords', aliased=True).\ - filter(Keyword.name.in_(['red', 'big', 'round'])).\ - all(), - [ - User(name='jack',id=7) - ]) + eq_( + sess.query(User).select_entity_from(sel). + join('orders', 'items', 'keywords', aliased=True). + filter(Keyword.name.in_(['red', 'big', 'round'])).all(), + [User(name='jack', id=7)]) def test_very_nested_joins_with_joinedload(self): - users, Keyword, orders, items, order_items, Order, Item, \ - User, keywords, item_keywords = (self.tables.users, - self.classes.Keyword, - self.tables.orders, - self.tables.items, - self.tables.order_items, - self.classes.Order, - self.classes.Item, - self.classes.User, - self.tables.keywords, - self.tables.item_keywords) - - mapper(User, users, properties={ - 'orders':relationship(Order, backref='user'), # o2m, m2o - }) - mapper(Order, orders, properties={ - 'items':relationship(Item, secondary=order_items, - order_by=items.c.id), #m2m - }) - mapper(Item, items, properties={ - 'keywords':relationship(Keyword, secondary=item_keywords, - order_by=keywords.c.id) #m2m - }) + ( + users, Keyword, orders, items, order_items, Order, Item, User, + keywords, item_keywords) = \ + ( + self.tables.users, self.classes.Keyword, self.tables.orders, + self.tables.items, self.tables.order_items, self.classes.Order, + self.classes.Item, self.classes.User, self.tables.keywords, + self.tables.item_keywords) + + mapper( + User, users, properties={ + 'orders': relationship(Order, backref='user')}) # o2m, m2o + mapper( + Order, orders, properties={ + 'items': relationship( + Item, secondary=order_items, order_by=items.c.id)}) # m2m + mapper( + Item, items, properties={ + 'keywords': relationship( + Keyword, secondary=item_keywords, + order_by=keywords.c.id)}) # m2m mapper(Keyword, keywords) sess = create_session() @@ -2090,65 +2142,61 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): def go(): eq_( sess.query(User).select_entity_from(sel). - options(joinedload_all('orders.items.keywords')). - join('orders', 'items', 'keywords', aliased=True). - filter(Keyword.name.in_(['red', 'big', 'round'])).\ - all(), + options(joinedload_all('orders.items.keywords')). + join('orders', 'items', 'keywords', aliased=True). + filter(Keyword.name.in_(['red', 'big', 'round'])). + all(), [ - User(name='jack',orders=[ - Order(description='order 1',items=[ - Item(description='item 1', - keywords=[ - Keyword(name='red'), - Keyword(name='big'), - Keyword(name='round') - ]), - Item(description='item 2', - keywords=[ - Keyword(name='red',id=2), - Keyword(name='small',id=5), - Keyword(name='square') - ]), - Item(description='item 3', - keywords=[ - Keyword(name='green',id=3), - Keyword(name='big',id=4), - Keyword(name='round',id=6)]) - ]), - Order(description='order 3',items=[ - Item(description='item 3', - keywords=[ - Keyword(name='green',id=3), - Keyword(name='big',id=4), - Keyword(name='round',id=6) - ]), - Item(description='item 4',keywords=[],id=4), - Item(description='item 5',keywords=[],id=5) - ]), - Order(description='order 5', - items=[ - Item(description='item 5',keywords=[])]) - ]) - ]) + User(name='jack', orders=[ + Order( + description='order 1', items=[ + Item( + description='item 1', keywords=[ + Keyword(name='red'), + Keyword(name='big'), + Keyword(name='round')]), + Item( + description='item 2', keywords=[ + Keyword(name='red', id=2), + Keyword(name='small', id=5), + Keyword(name='square')]), + Item( + description='item 3', keywords=[ + Keyword(name='green', id=3), + Keyword(name='big', id=4), + Keyword(name='round', id=6)])]), + Order( + description='order 3', items=[ + Item( + description='item 3', keywords=[ + Keyword(name='green', id=3), + Keyword(name='big', id=4), + Keyword(name='round', id=6)]), + Item(description='item 4', keywords=[], id=4), + Item( + description='item 5', keywords=[], id=5)]), + Order( + description='order 5', + items=[ + Item(description='item 5', keywords=[])])])]) self.assert_sql_count(testing.db, go, 1) sess.expunge_all() - sel2 = orders.select(orders.c.id.in_([1,2,3])) - eq_(sess.query(Order).select_entity_from(sel2).\ - join('items', 'keywords').\ - filter(Keyword.name == 'red').\ - order_by(Order.id).all(), [ - Order(description='order 1',id=1), - Order(description='order 2',id=2), - ]) - eq_(sess.query(Order).select_entity_from(sel2).\ - join('items', 'keywords', aliased=True).\ - filter(Keyword.name == 'red').\ - order_by(Order.id).all(), [ - Order(description='order 1',id=1), - Order(description='order 2',id=2), - ]) - + sel2 = orders.select(orders.c.id.in_([1, 2, 3])) + eq_( + sess.query(Order).select_entity_from(sel2). + join('items', 'keywords').filter(Keyword.name == 'red'). + order_by(Order.id).all(), + [ + Order(description='order 1', id=1), + Order(description='order 2', id=2)]) + eq_( + sess.query(Order).select_entity_from(sel2). + join('items', 'keywords', aliased=True). + filter(Keyword.name == 'red').order_by(Order.id).all(), + [ + Order(description='order 1', id=1), + Order(description='order 2', id=2)]) def test_replace_with_eager(self): users, Address, addresses, User = (self.tables.users, @@ -2156,76 +2204,93 @@ class SelectFromTest(QueryTest, AssertsCompiledSQL): self.tables.addresses, self.classes.User) - mapper(User, users, properties = { - 'addresses':relationship(Address, order_by=addresses.c.id) - }) + mapper( + User, users, properties={ + 'addresses': relationship(Address, order_by=addresses.c.id)}) mapper(Address, addresses) sel = users.select(users.c.id.in_([7, 8])) sess = create_session() def go(): - eq_(sess.query(User).options( - joinedload('addresses') - ).select_entity_from(sel).order_by(User.id).all(), + eq_( + sess.query(User).options(joinedload('addresses')). + select_entity_from(sel).order_by(User.id).all(), [ User(id=7, addresses=[Address(id=1)]), - User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]) - ] - ) + User( + id=8, addresses=[Address(id=2), Address(id=3), + Address(id=4)])]) self.assert_sql_count(testing.db, go, 1) sess.expunge_all() def go(): - eq_(sess.query(User).options( - joinedload('addresses') - ).select_entity_from(sel).filter(User.id==8).order_by(User.id).all(), - [User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])] - ) + eq_( + sess.query(User).options(joinedload('addresses')). + select_entity_from(sel).filter(User.id == 8).order_by(User.id). + all(), + [ + User( + id=8, addresses=[Address(id=2), Address(id=3), + Address(id=4)])]) self.assert_sql_count(testing.db, go, 1) sess.expunge_all() def go(): - eq_(sess.query(User).options( - joinedload('addresses') - ).select_entity_from(sel).order_by(User.id)[1], - User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])) + eq_( + sess.query(User).options(joinedload('addresses')). + select_entity_from(sel).order_by(User.id)[1], + User( + id=8, addresses=[Address(id=2), Address(id=3), + Address(id=4)])) self.assert_sql_count(testing.db, go, 1) + class CustomJoinTest(QueryTest): run_setup_mappers = None def test_double_same_mappers(self): """test aliasing of joins with a custom join condition""" - addresses, items, order_items, orders, Item, User, Address, Order, users = (self.tables.addresses, - self.tables.items, - self.tables.order_items, - self.tables.orders, - self.classes.Item, - self.classes.User, - self.classes.Address, - self.classes.Order, - self.tables.users) + ( + addresses, items, order_items, orders, Item, User, Address, Order, + users) = \ + ( + self.tables.addresses, self.tables.items, + self.tables.order_items, self.tables.orders, self.classes.Item, + self.classes.User, self.classes.Address, self.classes.Order, + self.tables.users) mapper(Address, addresses) - mapper(Order, orders, properties={ - 'items':relationship(Item, secondary=order_items, lazy='select', order_by=items.c.id), - }) + mapper( + Order, orders, properties={ + 'items': relationship( + Item, secondary=order_items, lazy='select', + order_by=items.c.id)}) mapper(Item, items) - mapper(User, users, properties = dict( - addresses = relationship(Address, lazy='select'), - open_orders = relationship(Order, primaryjoin = and_(orders.c.isopen == 1, users.c.id==orders.c.user_id), lazy='select'), - closed_orders = relationship(Order, primaryjoin = and_(orders.c.isopen == 0, users.c.id==orders.c.user_id), lazy='select') - )) + mapper( + User, users, properties=dict( + addresses=relationship(Address, lazy='select'), + open_orders=relationship( + Order, + primaryjoin=and_( + orders.c.isopen == 1, users.c.id == orders.c.user_id), + lazy='select'), + closed_orders=relationship( + Order, + primaryjoin=and_( + orders.c.isopen == 0, users.c.id == orders.c.user_id), + lazy='select'))) q = create_session().query(User) eq_( - q.join('open_orders', 'items', aliased=True).filter(Item.id==4).\ - join('closed_orders', 'items', aliased=True).filter(Item.id==3).all(), + q.join('open_orders', 'items', aliased=True).filter(Item.id == 4). + join('closed_orders', 'items', aliased=True).filter(Item.id == 3). + all(), [User(id=7)] ) + class ExternalColumnsTest(QueryTest): """test mappers with SQL-expressions added as column properties.""" @@ -2234,45 +2299,48 @@ class ExternalColumnsTest(QueryTest): def test_external_columns_bad(self): users, User = self.tables.users, self.classes.User - - assert_raises_message(sa_exc.ArgumentError, "not represented in the mapper's table", mapper, User, users, properties={ - 'concat': (users.c.id * 2), - }) + assert_raises_message( + sa_exc.ArgumentError, + "not represented in the mapper's table", mapper, User, users, + properties={ + 'concat': (users.c.id * 2), + }) clear_mappers() def test_external_columns(self): - """test querying mappings that reference external columns or selectables.""" + """test querying mappings that reference external columns or + selectables.""" users, Address, addresses, User = (self.tables.users, self.classes.Address, self.tables.addresses, self.classes.User) - - mapper(User, users, properties={ - 'concat': column_property((users.c.id * 2)), - 'count': column_property( - select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).\ - correlate(users).\ - as_scalar()) - }) + mapper( + User, users, properties={ + 'concat': column_property((users.c.id * 2)), + 'count': column_property( + select( + [func.count(addresses.c.id)], + users.c.id == addresses.c.user_id).correlate(users). + as_scalar())}) mapper(Address, addresses, properties={ - 'user':relationship(User) + 'user': relationship(User) }) sess = create_session() sess.query(Address).options(joinedload('user')).all() - eq_(sess.query(User).all(), + eq_( + sess.query(User).all(), [ User(id=7, concat=14, count=1), User(id=8, concat=16, count=3), User(id=9, concat=18, count=1), User(id=10, concat=20, count=0), - ] - ) + ]) address_result = [ Address(id=1, user=User(id=7, concat=14, count=1)), @@ -2286,10 +2354,11 @@ class ExternalColumnsTest(QueryTest): # run the eager version twice to test caching of aliased clauses for x in range(2): sess.expunge_all() + def go(): - eq_(sess.query(Address).\ - options(joinedload('user')).\ - order_by(Address.id).all(), + eq_( + sess.query(Address).options(joinedload('user')). + order_by(Address.id).all(), address_result) self.assert_sql_count(testing.db, go, 1) @@ -2300,22 +2369,21 @@ class ExternalColumnsTest(QueryTest): ) eq_( - sess.query(Address, ualias.count).\ - join(ualias, 'user').\ - join('user', aliased=True).\ - order_by(Address.id).all(), - [ - (Address(id=1), 1), - (Address(id=2), 3), - (Address(id=3), 3), - (Address(id=4), 3), - (Address(id=5), 1) - ] - ) + sess.query(Address, ualias.count).join(ualias, 'user'). + join('user', aliased=True).order_by(Address.id).all(), + [ + (Address(id=1), 1), + (Address(id=2), 3), + (Address(id=3), 3), + (Address(id=4), 3), + (Address(id=5), 1) + ] + ) - eq_(sess.query(Address, ualias.concat, ualias.count). - join(ualias, 'user'). - join('user', aliased=True).order_by(Address.id).all(), + eq_( + sess.query(Address, ualias.concat, ualias.count). + join(ualias, 'user'). + join('user', aliased=True).order_by(Address.id).all(), [ (Address(id=1), 14, 1), (Address(id=2), 16, 3), @@ -2326,25 +2394,34 @@ class ExternalColumnsTest(QueryTest): ) ua = aliased(User) - eq_(sess.query(Address, ua.concat, ua.count). - select_entity_from(join(Address, ua, 'user')). - options(joinedload(Address.user)).order_by(Address.id).all(), + eq_( + sess.query(Address, ua.concat, ua.count). + select_entity_from(join(Address, ua, 'user')). + options(joinedload(Address.user)).order_by(Address.id).all(), [ (Address(id=1, user=User(id=7, concat=14, count=1)), 14, 1), (Address(id=2, user=User(id=8, concat=16, count=3)), 16, 3), (Address(id=3, user=User(id=8, concat=16, count=3)), 16, 3), (Address(id=4, user=User(id=8, concat=16, count=3)), 16, 3), (Address(id=5, user=User(id=9, concat=18, count=1)), 18, 1) - ] - ) + ]) - eq_(list(sess.query(Address).join('user').values(Address.id, User.id, User.concat, User.count)), - [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)] - ) + eq_( + list( + sess.query(Address).join('user'). + values(Address.id, User.id, User.concat, User.count)), + [ + (1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), + (5, 9, 18, 1)]) - eq_(list(sess.query(Address, ua).select_entity_from(join(Address,ua, 'user')).values(Address.id, ua.id, ua.concat, ua.count)), - [(1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), (5, 9, 18, 1)] - ) + eq_( + list( + sess.query(Address, ua). + select_entity_from(join(Address, ua, 'user')). + values(Address.id, ua.id, ua.concat, ua.count)), + [ + (1, 7, 14, 1), (2, 8, 16, 3), (3, 8, 16, 3), (4, 8, 16, 3), + (5, 9, 18, 1)]) def test_external_columns_joinedload(self): users, orders, User, Address, Order, addresses = (self.tables.users, @@ -2354,49 +2431,57 @@ class ExternalColumnsTest(QueryTest): self.classes.Order, self.tables.addresses) - # in this test, we have a subquery on User that accesses "addresses", underneath - # an joinedload for "addresses". So the "addresses" alias adapter needs to *not* hit - # the "addresses" table within the "user" subquery, but "user" still needs to be adapted. - # therefore the long standing practice of eager adapters being "chained" has been removed + # in this test, we have a subquery on User that accesses "addresses", + # underneath an joinedload for "addresses". So the "addresses" alias + # adapter needs to *not* hit the "addresses" table within the "user" + # subquery, but "user" still needs to be adapted. therefore the long + # standing practice of eager adapters being "chained" has been removed # since its unnecessary and breaks this exact condition. - mapper(User, users, properties={ - 'addresses':relationship(Address, backref='user', order_by=addresses.c.id), - 'concat': column_property((users.c.id * 2)), - 'count': column_property(select([func.count(addresses.c.id)], users.c.id==addresses.c.user_id).correlate(users)) - }) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, backref='user', order_by=addresses.c.id), + 'concat': column_property((users.c.id * 2)), + 'count': column_property( + select( + [func.count(addresses.c.id)], + users.c.id == addresses.c.user_id).correlate(users))}) mapper(Address, addresses) - mapper(Order, orders, properties={ - 'address':relationship(Address), # m2o - }) + mapper( + Order, orders, properties={ + 'address': relationship(Address)}) # m2o sess = create_session() + def go(): - o1 = sess.query(Order).options(joinedload_all('address.user')).get(1) + o1 = sess.query(Order).options(joinedload_all('address.user')). \ + get(1) eq_(o1.address.user.count, 1) self.assert_sql_count(testing.db, go, 1) sess = create_session() + def go(): - o1 = sess.query(Order).options(joinedload_all('address.user')).first() + o1 = sess.query(Order).options(joinedload_all('address.user')). \ + first() eq_(o1.address.user.count, 1) self.assert_sql_count(testing.db, go, 1) def test_external_columns_compound(self): # see [ticket:2167] for background - users, Address, addresses, User = (self.tables.users, - self.classes.Address, - self.tables.addresses, - self.classes.User) + users, Address, addresses, User = ( + self.tables.users, self.classes.Address, self.tables.addresses, + self.classes.User) - mapper(User, users, properties={ - 'fullname':column_property(users.c.name.label('x')) - }) + mapper( + User, users, properties={ + 'fullname': column_property(users.c.name.label('x'))}) - mapper(Address, addresses, properties={ - 'username':column_property( - select([User.fullname]).\ - where(User.id==addresses.c.user_id).label('y')) - }) + mapper( + Address, addresses, properties={ + 'username': column_property( + select([User.fullname]). + where(User.id == addresses.c.user_id).label('y'))}) sess = create_session() a1 = sess.query(Address).first() eq_(a1.username, "jack") @@ -2409,36 +2494,44 @@ class ExternalColumnsTest(QueryTest): class TestOverlyEagerEquivalentCols(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): - base = Table('base', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table( + 'base', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50)) ) - sub1 = Table('sub1', metadata, + Table( + 'sub1', metadata, Column('id', Integer, ForeignKey('base.id'), primary_key=True), Column('data', String(50)) ) - sub2 = Table('sub2', metadata, - Column('id', Integer, ForeignKey('base.id'), ForeignKey('sub1.id'), primary_key=True), + Table( + 'sub2', metadata, + Column( + 'id', Integer, ForeignKey('base.id'), ForeignKey('sub1.id'), + primary_key=True), Column('data', String(50)) ) def test_equivs(self): - base, sub2, sub1 = (self.tables.base, - self.tables.sub2, - self.tables.sub1) + base, sub2, sub1 = ( + self.tables.base, self.tables.sub2, self.tables.sub1) class Base(fixtures.ComparableEntity): pass + class Sub1(fixtures.ComparableEntity): pass + class Sub2(fixtures.ComparableEntity): pass mapper(Base, base, properties={ - 'sub1':relationship(Sub1), - 'sub2':relationship(Sub2) + 'sub1': relationship(Sub1), + 'sub2': relationship(Sub2) }) mapper(Sub1, sub1) @@ -2463,11 +2556,12 @@ class TestOverlyEagerEquivalentCols(fixtures.MappedTest): assert sub1.c.id not in q._filter_aliases.equivalents eq_( - sess.query(Base).join('sub1').outerjoin('sub2', aliased=True).\ - filter(Sub1.id==1).one(), - b1 + sess.query(Base).join('sub1').outerjoin('sub2', aliased=True). + filter(Sub1.id == 1).one(), + b1 ) + class LabelCollideTest(fixtures.MappedTest): """Test handling for a label collision. This collision is handled by core, see ticket:2702 as well as @@ -2478,18 +2572,19 @@ class LabelCollideTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): - Table('foo', metadata, - Column('id', Integer, primary_key=True), - Column('bar_id', Integer) + Table( + 'foo', metadata, + Column('id', Integer, primary_key=True), + Column('bar_id', Integer) ) - Table('foo_bar', metadata, - Column('id', Integer, primary_key=True), - ) + Table('foo_bar', metadata, Column('id', Integer, primary_key=True)) @classmethod def setup_classes(cls): + class Foo(cls.Basic): pass + class Bar(cls.Basic): pass @@ -2510,6 +2605,7 @@ class LabelCollideTest(fixtures.MappedTest): def test_overlap_plain(self): s = Session() row = s.query(self.classes.Foo, self.classes.Bar).all()[0] + def go(): eq_(row.Foo.id, 1) eq_(row.Foo.bar_id, 2) @@ -2521,10 +2617,11 @@ class LabelCollideTest(fixtures.MappedTest): def test_overlap_subquery(self): s = Session() row = s.query(self.classes.Foo, self.classes.Bar).from_self().all()[0] + def go(): eq_(row.Foo.id, 1) eq_(row.Foo.bar_id, 2) eq_(row.Bar.id, 3) # all three columns are loaded independently without # overlap, no additional SQL to load all attributes - self.assert_sql_count(testing.db, go, 0)
\ No newline at end of file + self.assert_sql_count(testing.db, go, 0) diff --git a/test/orm/test_naturalpks.py b/test/orm/test_naturalpks.py index 9f35b001a..5a72881a0 100644 --- a/test/orm/test_naturalpks.py +++ b/test/orm/test_naturalpks.py @@ -3,18 +3,15 @@ Primary key changing capabilities and passive/non-passive cascading updates. """ -from sqlalchemy.testing import eq_, ne_, \ - assert_raises, assert_raises_message +from sqlalchemy.testing import fixtures, eq_, ne_, assert_raises import sqlalchemy as sa -from sqlalchemy import testing -from sqlalchemy import Integer, String, ForeignKey, Unicode +from sqlalchemy import testing, Integer, String, ForeignKey from sqlalchemy.testing.schema import Table, Column -from sqlalchemy.orm import mapper, relationship, create_session, backref, Session +from sqlalchemy.orm import mapper, relationship, create_session, Session from sqlalchemy.orm.session import make_transient -from sqlalchemy.testing import eq_ -from sqlalchemy.testing import fixtures from test.orm import _fixtures + def _backend_specific_fk_args(): if testing.requires.deferrable_fks.enabled: fk_args = dict(deferrable=True, initially='deferred') @@ -24,6 +21,7 @@ def _backend_specific_fk_args(): fk_args = dict(onupdate='cascade') return fk_args + class NaturalPKTest(fixtures.MappedTest): # MySQL 5.5 on Windows crashes (the entire server, not the client) # if you screw around with ON UPDATE CASCADE type of stuff. @@ -34,37 +32,44 @@ class NaturalPKTest(fixtures.MappedTest): def define_tables(cls, metadata): fk_args = _backend_specific_fk_args() - users = Table('users', metadata, + Table('users', metadata, Column('username', String(50), primary_key=True), Column('fullname', String(100)), test_needs_fk=True) - addresses = Table('addresses', metadata, + Table( + 'addresses', metadata, Column('email', String(50), primary_key=True), - Column('username', String(50), - ForeignKey('users.username', **fk_args)), + Column( + 'username', String(50), + ForeignKey('users.username', **fk_args)), test_needs_fk=True) - items = Table('items', metadata, + Table( + 'items', metadata, Column('itemname', String(50), primary_key=True), Column('description', String(100)), test_needs_fk=True) - users_to_items = Table('users_to_items', metadata, - Column('username', String(50), - ForeignKey('users.username', **fk_args), - primary_key=True), - Column('itemname', String(50), - ForeignKey('items.itemname', **fk_args), - primary_key=True), + Table( + 'users_to_items', metadata, + Column( + 'username', String(50), + ForeignKey('users.username', **fk_args), primary_key=True), + Column( + 'itemname', String(50), + ForeignKey('items.itemname', **fk_args), primary_key=True), test_needs_fk=True) @classmethod def setup_classes(cls): + class User(cls.Comparable): pass + class Address(cls.Comparable): pass + class Item(cls.Comparable): pass @@ -105,7 +110,7 @@ class NaturalPKTest(fixtures.MappedTest): sess.flush() assert sess.query(User).get('jack') is u1 - users.update(values={User.username:'jack'}).execute(username='ed') + users.update(values={User.username: 'jack'}).execute(username='ed') # expire/refresh works off of primary key. the PK is gone # in this case so there's no way to look it up. criterion- @@ -134,7 +139,6 @@ class NaturalPKTest(fixtures.MappedTest): sess.expunge_all() assert sess.query(User).get('ed').fullname == 'jack' - @testing.requires.on_update_cascade def test_onetomany_passive(self): self._test_onetomany(True) @@ -148,9 +152,10 @@ class NaturalPKTest(fixtures.MappedTest): self.tables.addresses, self.classes.User) - mapper(User, users, properties={ - 'addresses':relationship(Address, passive_updates=passive_updates) - }) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, passive_updates=passive_updates)}) mapper(Address, addresses) sess = create_session() @@ -167,11 +172,13 @@ class NaturalPKTest(fixtures.MappedTest): assert u1.addresses[0].username == 'ed' sess.expunge_all() - eq_([Address(username='ed'), Address(username='ed')], - sess.query(Address).all()) + eq_( + [Address(username='ed'), Address(username='ed')], + sess.query(Address).all()) u1 = sess.query(User).get('ed') u1.username = 'jack' + def go(): sess.flush() if not passive_updates: @@ -182,10 +189,10 @@ class NaturalPKTest(fixtures.MappedTest): # test passive_updates=True; update user self.assert_sql_count(testing.db, go, 1) sess.expunge_all() - assert User(username='jack', addresses=[ - Address(username='jack'), - Address(username='jack')]) == \ - sess.query(User).get('jack') + assert User( + username='jack', addresses=[ + Address(username='jack'), + Address(username='jack')]) == sess.query(User).get('jack') u1 = sess.query(User).get('jack') u1.addresses = [] @@ -215,15 +222,9 @@ class NaturalPKTest(fixtures.MappedTest): self.classes.User) with testing.db.begin() as conn: - conn.execute(users.insert(), - username='jack', fullname='jack' - ) - conn.execute(addresses.insert(), - email='jack1', username='jack' - ) - conn.execute(addresses.insert(), - email='jack2', username='jack' - ) + conn.execute(users.insert(), username='jack', fullname='jack') + conn.execute(addresses.insert(), email='jack1', username='jack') + conn.execute(addresses.insert(), email='jack2', username='jack') mapper(User, users) mapper(Address, addresses, properties={ @@ -277,9 +278,9 @@ class NaturalPKTest(fixtures.MappedTest): assert a1.username == a2.username == 'ed' sess.expunge_all() - eq_([Address(username='ed'), Address(username='ed')], - sess.query(Address).all()) - + eq_( + [Address(username='ed'), Address(username='ed')], + sess.query(Address).all()) @testing.requires.on_update_cascade def test_onetoone_passive(self): @@ -294,10 +295,10 @@ class NaturalPKTest(fixtures.MappedTest): self.tables.addresses, self.classes.User) - mapper(User, users, properties={ - "address":relationship(Address, passive_updates=passive_updates, - uselist=False) - }) + mapper( + User, users, properties={ + "address": relationship( + Address, passive_updates=passive_updates, uselist=False)}) mapper(Address, addresses) sess = create_session() @@ -359,6 +360,7 @@ class NaturalPKTest(fixtures.MappedTest): u1.username = 'ed' (ad1, ad2) = sess.query(Address).all() eq_([Address(username='jack'), Address(username='jack')], [ad1, ad2]) + def go(): sess.flush() if passive_updates: @@ -367,12 +369,14 @@ class NaturalPKTest(fixtures.MappedTest): self.assert_sql_count(testing.db, go, 3) eq_([Address(username='ed'), Address(username='ed')], [ad1, ad2]) sess.expunge_all() - eq_([Address(username='ed'), Address(username='ed')], - sess.query(Address).all()) + eq_( + [Address(username='ed'), Address(username='ed')], + sess.query(Address).all()) u1 = sess.query(User).get('ed') assert len(u1.addresses) == 2 # load addresses u1.username = 'fred' + def go(): sess.flush() # check that the passive_updates is on on the other side @@ -381,15 +385,14 @@ class NaturalPKTest(fixtures.MappedTest): else: self.assert_sql_count(testing.db, go, 3) sess.expunge_all() - eq_([Address(username='fred'), Address(username='fred')], - sess.query(Address).all()) - + eq_( + [Address(username='fred'), Address(username='fred')], + sess.query(Address).all()) @testing.requires.on_update_cascade def test_manytomany_passive(self): self._test_manytomany(True) - @testing.requires.non_updating_cascade def test_manytomany_nonpassive(self): self._test_manytomany(False) @@ -400,10 +403,11 @@ class NaturalPKTest(fixtures.MappedTest): self.classes.User, self.tables.users_to_items) - mapper(User, users, properties={ - 'items':relationship(Item, secondary=users_to_items, - backref='users', - passive_updates=passive_updates)}) + mapper( + User, users, properties={ + 'items': relationship( + Item, secondary=users_to_items, backref='users', + passive_updates=passive_updates)}) mapper(Item, items) sess = create_session() @@ -427,10 +431,12 @@ class NaturalPKTest(fixtures.MappedTest): eq_(Item(itemname='item2'), r[1]) eq_(['jack', 'fred'], [u.username for u in r[1].users]) - u2.username='ed' + u2.username = 'ed' + def go(): sess.flush() go() + def go(): sess.flush() self.assert_sql_count(testing.db, go, 0) @@ -444,11 +450,12 @@ class NaturalPKTest(fixtures.MappedTest): sess.expunge_all() u2 = sess.query(User).get(u2.username) - u2.username='wendy' + u2.username = 'wendy' sess.flush() r = sess.query(Item).with_parent(u2).all() eq_(Item(itemname='item2'), r[0]) + class TransientExceptionTesst(_fixtures.FixtureTest): run_inserts = None __backend__ = True @@ -465,7 +472,7 @@ class TransientExceptionTesst(_fixtures.FixtureTest): self.classes.User) mapper(User, users) - mapper(Address, addresses, properties={'user':relationship(User)}) + mapper(Address, addresses, properties={'user': relationship(User)}) sess = create_session() u1 = User(id=5, name='u1') @@ -475,7 +482,7 @@ class TransientExceptionTesst(_fixtures.FixtureTest): make_transient(u1) u1.id = None - u1.username='u2' + u1.username = 'u2' sess.add(u1) sess.flush() @@ -487,6 +494,7 @@ class TransientExceptionTesst(_fixtures.FixtureTest): ne_(u1.id, None) eq_(sess.query(User).count(), 2) + class ReversePKsTest(fixtures.MappedTest): """reverse the primary keys of two entities and ensure bookkeeping succeeds.""" @@ -578,6 +586,7 @@ class ReversePKsTest(fixtures.MappedTest): eq_(a_published.status, PUBLISHED) eq_(a_editable.status, EDITABLE) + class SelfReferentialTest(fixtures.MappedTest): # mssql, mysql don't allow # ON UPDATE on self-referential keys @@ -590,12 +599,11 @@ class SelfReferentialTest(fixtures.MappedTest): def define_tables(cls, metadata): fk_args = _backend_specific_fk_args() - Table('nodes', metadata, - Column('name', String(50), primary_key=True), - Column('parent', String(50), - ForeignKey('nodes.name', **fk_args)), - test_needs_fk=True - ) + Table( + 'nodes', metadata, + Column('name', String(50), primary_key=True), + Column('parent', String(50), ForeignKey('nodes.name', **fk_args)), + test_needs_fk=True) @classmethod def setup_classes(cls): @@ -605,12 +613,14 @@ class SelfReferentialTest(fixtures.MappedTest): def test_one_to_many_on_m2o(self): Node, nodes = self.classes.Node, self.tables.nodes - mapper(Node, nodes, properties={ - 'children': relationship(Node, - backref=sa.orm.backref('parentnode', - remote_side=nodes.c.name, - passive_updates=False), - )}) + mapper( + Node, nodes, properties={ + 'children': relationship( + Node, + backref=sa.orm.backref( + 'parentnode', remote_side=nodes.c.name, + passive_updates=False), + )}) sess = Session() n1 = Node(name='n1') @@ -631,12 +641,13 @@ class SelfReferentialTest(fixtures.MappedTest): def test_one_to_many_on_o2m(self): Node, nodes = self.classes.Node, self.tables.nodes - mapper(Node, nodes, properties={ - 'children': relationship(Node, - backref=sa.orm.backref('parentnode', - remote_side=nodes.c.name), - passive_updates=False - )}) + mapper( + Node, nodes, properties={ + 'children': relationship( + Node, + backref=sa.orm.backref( + 'parentnode', remote_side=nodes.c.name), + passive_updates=False)}) sess = Session() n1 = Node(name='n1') @@ -664,11 +675,10 @@ class SelfReferentialTest(fixtures.MappedTest): def _test_many_to_one(self, passive): Node, nodes = self.classes.Node, self.tables.nodes - mapper(Node, nodes, properties={ - 'parentnode':relationship(Node, - remote_side=nodes.c.name, - passive_updates=passive) - } + mapper( + Node, nodes, properties={ + 'parentnode': relationship( + Node, remote_side=nodes.c.name, passive_updates=passive)} ) sess = Session() @@ -681,10 +691,11 @@ class SelfReferentialTest(fixtures.MappedTest): n1.name = 'new n1' sess.commit() - eq_(['new n1', 'new n1', 'new n1'], - [n.parent - for n in sess.query(Node).filter( - Node.name.in_(['n11', 'n12', 'n13']))]) + eq_( + ['new n1', 'new n1', 'new n1'], + [ + n.parent for n in sess.query(Node).filter( + Node.name.in_(['n11', 'n12', 'n13']))]) class NonPKCascadeTest(fixtures.MappedTest): @@ -695,26 +706,32 @@ class NonPKCascadeTest(fixtures.MappedTest): def define_tables(cls, metadata): fk_args = _backend_specific_fk_args() - Table('users', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + Table( + 'users', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('username', String(50), unique=True), Column('fullname', String(100)), test_needs_fk=True) - Table('addresses', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('email', String(50)), - Column('username', String(50), - ForeignKey('users.username', **fk_args)), - test_needs_fk=True - ) + Table( + 'addresses', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('email', String(50)), + Column( + 'username', String(50), + ForeignKey('users.username', **fk_args)), + test_needs_fk=True) @classmethod def setup_classes(cls): + class User(cls.Comparable): pass + class Address(cls.Comparable): pass @@ -731,9 +748,10 @@ class NonPKCascadeTest(fixtures.MappedTest): self.tables.users, self.tables.addresses) - mapper(User, users, properties={ - 'addresses':relationship(Address, - passive_updates=passive_updates)}) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, passive_updates=passive_updates)}) mapper(Address, addresses) sess = create_session() @@ -744,37 +762,41 @@ class NonPKCascadeTest(fixtures.MappedTest): sess.flush() a1 = u1.addresses[0] - eq_(sa.select([addresses.c.username]).execute().fetchall(), - [('jack',), ('jack',)]) + eq_( + sa.select([addresses.c.username]).execute().fetchall(), + [('jack',), ('jack',)]) assert sess.query(Address).get(a1.id) is u1.addresses[0] u1.username = 'ed' sess.flush() assert u1.addresses[0].username == 'ed' - eq_(sa.select([addresses.c.username]).execute().fetchall(), - [('ed',), ('ed',)]) + eq_( + sa.select([addresses.c.username]).execute().fetchall(), + [('ed',), ('ed',)]) sess.expunge_all() - eq_([Address(username='ed'), Address(username='ed')], - sess.query(Address).all()) + eq_( + [Address(username='ed'), Address(username='ed')], + sess.query(Address).all()) u1 = sess.query(User).get(u1.id) u1.username = 'jack' + def go(): sess.flush() if not passive_updates: # test passive_updates=False; load addresses, - # update user, update 2 addresses + # update user, update 2 addresses self.assert_sql_count(testing.db, go, 4) else: - # test passive_updates=True; update user + # test passive_updates=True; update user self.assert_sql_count(testing.db, go, 1) sess.expunge_all() - assert User(username='jack', - addresses=[Address(username='jack'), - Address(username='jack')]) == \ - sess.query(User).get(u1.id) + assert User( + username='jack', addresses=[ + Address(username='jack'), + Address(username='jack')]) == sess.query(User).get(u1.id) sess.expunge_all() u1 = sess.query(User).get(u1.id) @@ -785,8 +807,9 @@ class NonPKCascadeTest(fixtures.MappedTest): a1 = sess.query(Address).get(a1.id) eq_(a1.username, None) - eq_(sa.select([addresses.c.username]).execute().fetchall(), - [(None,), (None,)]) + eq_( + sa.select([addresses.c.username]).execute().fetchall(), + [(None,), (None,)]) u1 = sess.query(User).get(u1.id) eq_(User(username='fred', fullname='jack'), u1) @@ -805,20 +828,22 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): Column('username', String(50), primary_key=True), test_needs_fk=True) - Table('addresses', metadata, - Column('username', String(50), - ForeignKey('users.username', **fk_args), - primary_key=True - ), - Column('email', String(50), primary_key=True), - Column('etc', String(50)), - test_needs_fk=True - ) + Table( + 'addresses', metadata, + Column( + 'username', String(50), + ForeignKey('users.username', **fk_args), + primary_key=True), + Column('email', String(50), primary_key=True), + Column('etc', String(50)), + test_needs_fk=True) @classmethod def setup_classes(cls): + class User(cls.Comparable): pass + class Address(cls.Comparable): pass @@ -849,9 +874,10 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): self.tables.users, self.tables.addresses) - mapper(User, users, properties={ - 'addresses':relationship(Address, - passive_updates=passive_updates)}) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, passive_updates=passive_updates)}) mapper(Address, addresses) sess = create_session() @@ -882,9 +908,10 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): self.tables.users, self.tables.addresses) - mapper(User, users, properties={ - 'addresses':relationship(Address, - passive_updates=passive_updates)}) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, passive_updates=passive_updates)}) mapper(Address, addresses) sess = create_session() @@ -915,7 +942,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): mapper(User, users) mapper(Address, addresses, properties={ - 'user':relationship(User, passive_updates=passive_updates) + 'user': relationship(User, passive_updates=passive_updates) }) sess = create_session() @@ -924,7 +951,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): sess.add_all([u1, a1]) sess.flush() - u1.username='edmodified' + u1.username = 'edmodified' sess.flush() eq_(a1.username, 'edmodified') @@ -945,9 +972,9 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): # tests [ticket:1856] mapper(User, users) - mapper(Address, addresses, properties={ - 'user':relationship(User, passive_updates=passive_updates) - }) + mapper( + Address, addresses, properties={ + 'user': relationship(User, passive_updates=passive_updates)}) sess = create_session() u1 = User(username='jack') @@ -959,7 +986,6 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): a1.user = u2 sess.flush() - def test_rowswitch_doesntfire(self): User, Address, users, addresses = (self.classes.User, self.classes.Address, @@ -968,7 +994,7 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): mapper(User, users) mapper(Address, addresses, properties={ - 'user':relationship(User, passive_updates=True) + 'user': relationship(User, passive_updates=True) }) sess = create_session() @@ -991,16 +1017,14 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): # test that the primary key columns of addresses are not # being updated as well, since this is a row switch. - self.assert_sql_execution(testing.db, - sess.flush, - CompiledSQL( - "UPDATE addresses SET etc=:etc WHERE " - "addresses.username = :addresses_username AND" - " addresses.email = :addresses_email", - {'etc': 'foo', 'addresses_username':'ed', - 'addresses_email':'ed@host1'} ), - ) - + self.assert_sql_execution( + testing.db, sess.flush, CompiledSQL( + "UPDATE addresses SET etc=:etc WHERE " + "addresses.username = :addresses_username AND" + " addresses.email = :addresses_email", { + 'etc': 'foo', 'addresses_username': 'ed', + 'addresses_email': 'ed@host1'}), + ) def _test_onetomany(self, passive_updates): """Change the PK of a related entity via foreign key cascade. @@ -1016,30 +1040,33 @@ class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL): self.tables.users, self.tables.addresses) - mapper(User, users, properties={ - 'addresses':relationship(Address, - passive_updates=passive_updates)}) + mapper( + User, users, properties={ + 'addresses': relationship( + Address, passive_updates=passive_updates)}) mapper(Address, addresses) sess = create_session() - a1, a2 = Address(username='ed', email='ed@host1'),\ - Address(username='ed', email='ed@host2') + a1, a2 = Address(username='ed', email='ed@host1'), \ + Address(username='ed', email='ed@host2') u1 = User(username='ed', addresses=[a1, a2]) sess.add(u1) sess.flush() eq_(a1.username, 'ed') eq_(a2.username, 'ed') - eq_(sa.select([addresses.c.username]).execute().fetchall(), - [('ed',), ('ed',)]) + eq_( + sa.select([addresses.c.username]).execute().fetchall(), + [('ed',), ('ed',)]) u1.username = 'jack' - a2.email='ed@host3' + a2.email = 'ed@host3' sess.flush() eq_(a1.username, 'jack') eq_(a2.username, 'jack') - eq_(sa.select([addresses.c.username]).execute().fetchall(), - [('jack',), ('jack', )]) + eq_( + sa.select([addresses.c.username]).execute().fetchall(), + [('jack',), ('jack', )]) class JoinedInheritanceTest(fixtures.MappedTest): @@ -1055,34 +1082,39 @@ class JoinedInheritanceTest(fixtures.MappedTest): def define_tables(cls, metadata): fk_args = _backend_specific_fk_args() - Table('person', metadata, + Table( + 'person', metadata, Column('name', String(50), primary_key=True), Column('type', String(50), nullable=False), test_needs_fk=True) - Table('engineer', metadata, - Column('name', String(50), ForeignKey('person.name', **fk_args), - primary_key=True), + Table( + 'engineer', metadata, + Column( + 'name', String(50), ForeignKey('person.name', **fk_args), + primary_key=True), Column('primary_language', String(50)), - Column('boss_name', String(50), - ForeignKey('manager.name', **fk_args)), - test_needs_fk=True + Column( + 'boss_name', String(50), + ForeignKey('manager.name', **fk_args)), + test_needs_fk=True ) - Table('manager', metadata, - Column('name', String(50), - ForeignKey('person.name', **fk_args), - primary_key=True), - Column('paperwork', String(50)), - test_needs_fk=True + Table( + 'manager', metadata, Column('name', String(50), + ForeignKey('person.name', **fk_args), primary_key=True), + Column('paperwork', String(50)), test_needs_fk=True ) @classmethod def setup_classes(cls): + class Person(cls.Comparable): pass + class Engineer(Person): pass + class Manager(Person): pass @@ -1104,25 +1136,22 @@ class JoinedInheritanceTest(fixtures.MappedTest): self._test_fk(False) def _test_pk(self, passive_updates): - Person, Manager, person, manager, Engineer, engineer = (self.classes.Person, - self.classes.Manager, - self.tables.person, - self.tables.manager, - self.classes.Engineer, - self.tables.engineer) - - mapper(Person, person, polymorphic_on=person.c.type, - polymorphic_identity='person', - passive_updates=passive_updates) - mapper(Engineer, engineer, inherits=Person, + Person, Manager, person, manager, Engineer, engineer = ( + self.classes.Person, self.classes.Manager, self.tables.person, + self.tables.manager, self.classes.Engineer, self.tables.engineer) + + mapper( + Person, person, polymorphic_on=person.c.type, + polymorphic_identity='person', passive_updates=passive_updates) + mapper( + Engineer, engineer, inherits=Person, polymorphic_identity='engineer', properties={ - 'boss':relationship(Manager, - primaryjoin=manager.c.name==engineer.c.boss_name, - passive_updates=passive_updates - ) - }) - mapper(Manager, manager, inherits=Person, - polymorphic_identity='manager') + 'boss': relationship( + Manager, + primaryjoin=manager.c.name == engineer.c.boss_name, + passive_updates=passive_updates)}) + mapper( + Manager, manager, inherits=Person, polymorphic_identity='manager') sess = sa.orm.sessionmaker()() @@ -1134,32 +1163,28 @@ class JoinedInheritanceTest(fixtures.MappedTest): sess.commit() def _test_fk(self, passive_updates): - Person, Manager, person, manager, Engineer, engineer = (self.classes.Person, - self.classes.Manager, - self.tables.person, - self.tables.manager, - self.classes.Engineer, - self.tables.engineer) - - mapper(Person, person, polymorphic_on=person.c.type, - polymorphic_identity='person', - passive_updates=passive_updates) - mapper(Engineer, engineer, inherits=Person, - polymorphic_identity='engineer', properties={ - 'boss':relationship(Manager, - primaryjoin=manager.c.name==engineer.c.boss_name, - passive_updates=passive_updates - ) - }) - mapper(Manager, manager, inherits=Person, - polymorphic_identity='manager') + Person, Manager, person, manager, Engineer, engineer = ( + self.classes.Person, self.classes.Manager, self.tables.person, + self.tables.manager, self.classes.Engineer, self.tables.engineer) + + mapper( + Person, person, polymorphic_on=person.c.type, + polymorphic_identity='person', passive_updates=passive_updates) + mapper( + Engineer, engineer, inherits=Person, + polymorphic_identity='engineer', properties={ + 'boss': relationship( + Manager, + primaryjoin=manager.c.name == engineer.c.boss_name, + passive_updates=passive_updates)}) + mapper( + Manager, manager, inherits=Person, polymorphic_identity='manager') sess = sa.orm.sessionmaker()() m1 = Manager(name='dogbert', paperwork='lots') - e1, e2 = \ - Engineer(name='dilbert', primary_language='java', boss=m1),\ - Engineer(name='wally', primary_language='c++', boss=m1) + e1, e2 = Engineer(name='dilbert', primary_language='java', boss=m1),\ + Engineer(name='wally', primary_language='c++', boss=m1) sess.add_all([ e1, e2, m1 ]) @@ -1176,6 +1201,3 @@ class JoinedInheritanceTest(fixtures.MappedTest): eq_(e1.boss_name, 'pointy haired') eq_(e2.boss_name, 'pointy haired') - - - diff --git a/test/orm/test_query.py b/test/orm/test_query.py index f7cd732ee..984a3f93f 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -1,37 +1,32 @@ -from sqlalchemy.sql import operators -from sqlalchemy import MetaData, null, exists, text, union, literal, \ - literal_column, func, between, Unicode, desc, and_, bindparam, \ - select, distinct, or_, collate, insert, Integer, String, Boolean -from sqlalchemy import inspect -from sqlalchemy import exc as sa_exc, util -from sqlalchemy.sql import compiler, table, column -from sqlalchemy.sql import expression +from sqlalchemy import ( + testing, null, exists, text, union, literal, literal_column, func, between, + Unicode, desc, and_, bindparam, select, distinct, or_, collate, insert, + Integer, String, Boolean, exc as sa_exc, util, cast) +from sqlalchemy.sql import operators, column, expression from sqlalchemy.engine import default -from sqlalchemy.orm import attributes, mapper, relationship, backref, \ - configure_mappers, create_session, synonym, Session, class_mapper, \ - aliased, column_property, joinedload_all, joinedload, Query,\ - util as orm_util +from sqlalchemy.orm import ( + attributes, mapper, relationship, create_session, synonym, Session, + aliased, column_property, joinedload_all, joinedload, Query, Bundle) from sqlalchemy.testing.assertsql import CompiledSQL from sqlalchemy.testing.schema import Table, Column import sqlalchemy as sa -from sqlalchemy import testing -from sqlalchemy.testing.assertions import eq_, assert_raises, assert_raises_message -from sqlalchemy.testing import AssertsCompiledSQL +from sqlalchemy.testing.assertions import ( + eq_, assert_raises, assert_raises_message) +from sqlalchemy.testing import fixtures, AssertsCompiledSQL from test.orm import _fixtures -from sqlalchemy.testing import fixtures, engines -from sqlalchemy.orm import Bundle -from sqlalchemy.orm.util import join, outerjoin, with_parent +from sqlalchemy.orm.util import join, with_parent + class QueryTest(_fixtures.FixtureTest): run_setup_mappers = 'once' run_inserts = 'once' run_deletes = None - @classmethod def setup_mappers(cls): cls._setup_stock_mapping() + class MiscTest(QueryTest): run_create_tables = None run_inserts = None @@ -45,19 +40,17 @@ class MiscTest(QueryTest): assert q2.session is s2 assert q1.session is s1 + class RowTupleTest(QueryTest): run_setup_mappers = None def test_custom_names(self): User, users = self.classes.User, self.tables.users - mapper(User, users, properties={ - 'uname': users.c.name - }) + mapper(User, users, properties={'uname': users.c.name}) - row = create_session().\ - query(User.id, User.uname).\ - filter(User.id == 7).first() + row = create_session().query(User.id, User.uname).\ + filter(User.id == 7).first() assert row.id == 7 assert row.uname == 'jack' @@ -78,40 +71,50 @@ class RowTupleTest(QueryTest): for q, asserted in [ ( sess.query(User), - [{'name':'User', 'type':User, 'aliased':False, 'expr':User}] + [ + { + 'name': 'User', 'type': User, 'aliased': False, + 'expr': User}] ), ( sess.query(User.id, User), [ - {'name':'id', 'type':users.c.id.type, 'aliased':False, - 'expr':User.id}, - {'name':'User', 'type':User, 'aliased':False, 'expr':User} + { + 'name': 'id', 'type': users.c.id.type, + 'aliased': False, 'expr': User.id}, + { + 'name': 'User', 'type': User, 'aliased': False, + 'expr': User} ] ), ( sess.query(User.id, user_alias), [ - {'name':'id', 'type':users.c.id.type, 'aliased':False, - 'expr':User.id}, - {'name':None, 'type':User, 'aliased':True, - 'expr':user_alias} + { + 'name': 'id', 'type': users.c.id.type, + 'aliased': False, 'expr': User.id}, + { + 'name': None, 'type': User, 'aliased': True, + 'expr': user_alias} ] ), ( sess.query(address_alias), [ - {'name':'aalias', 'type':Address, 'aliased':True, - 'expr':address_alias} + { + 'name': 'aalias', 'type': Address, 'aliased': True, + 'expr': address_alias} ] ), ( sess.query(name_label, fn), [ - {'name':'uname', 'type':users.c.name.type, - 'aliased':False, 'expr':name_label}, - {'name':None, 'type':fn.type, 'aliased':False, - 'expr':fn - }, + { + 'name': 'uname', 'type': users.c.name.type, + 'aliased': False, 'expr': name_label}, + { + 'name': None, 'type': fn.type, 'aliased': False, + 'expr': fn}, ] ), ( @@ -129,7 +132,6 @@ class RowTupleTest(QueryTest): asserted ) - def test_unhashable_type(self): from sqlalchemy.types import TypeDecorator, Integer from sqlalchemy.sql import type_coerce @@ -137,6 +139,7 @@ class RowTupleTest(QueryTest): class MyType(TypeDecorator): impl = Integer hashable = False + def process_result_value(self, value, dialect): return [value] @@ -145,14 +148,14 @@ class RowTupleTest(QueryTest): mapper(User, users) s = Session() - q = s.\ - query(User, type_coerce(users.c.id, MyType).label('foo')).\ - filter(User.id == 7) + q = s.query(User, type_coerce(users.c.id, MyType).label('foo')).\ + filter(User.id == 7) row = q.first() eq_( row, (User(id=7), [7]) ) + class RawSelectTest(QueryTest, AssertsCompiledSQL): __dialect__ = 'default' @@ -205,14 +208,15 @@ class RawSelectTest(QueryTest, AssertsCompiledSQL): Address = self.classes.Address self.assert_compile( - select([User.name, Address.id, - select([func.count(Address.id)]).\ - where(User.id == Address.user_id).\ - correlate(User).as_scalar() - ]), + select( + [ + User.name, Address.id, + select([func.count(Address.id)]). + where(User.id == Address.user_id). + correlate(User).as_scalar()]), "SELECT users.name, addresses.id, " "(SELECT count(addresses.id) AS count_1 " - "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 " + "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 " "FROM users, addresses" ) @@ -222,18 +226,19 @@ class RawSelectTest(QueryTest, AssertsCompiledSQL): uu = aliased(User, name="uu") self.assert_compile( - select([uu.name, Address.id, - select([func.count(Address.id)]).\ - where(uu.id == Address.user_id).\ - correlate(uu).as_scalar() - ]), + select( + [ + uu.name, Address.id, + select([func.count(Address.id)]). + where(uu.id == Address.user_id). + correlate(uu).as_scalar()]), # for a long time, "uu.id = address.user_id" was reversed; # this was resolved as of #2872 and had to do with # InstrumentedAttribute.__eq__() taking precedence over # QueryableAttribute.__eq__() "SELECT uu.name, addresses.id, " "(SELECT count(addresses.id) AS count_1 " - "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 " + "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 " "FROM users AS uu, addresses" ) @@ -300,7 +305,7 @@ class RawSelectTest(QueryTest, AssertsCompiledSQL): q = s.query(User.id, User.name).filter_by(name='ed') self.assert_compile( insert(Address).from_select( - (Address.id, Address.email_address), q), + (Address.id, Address.email_address), q), "INSERT INTO addresses (id, email_address) " "SELECT users.id AS users_id, users.name AS users_name " "FROM users WHERE users.name = :name_1" @@ -352,16 +357,20 @@ class RawSelectTest(QueryTest, AssertsCompiledSQL): class Foo(object): pass - mapper(Foo, self.tables.users, properties={ - 'foob': column_property(func.coalesce(self.tables.users.c.name)) + mapper( + Foo, self.tables.users, properties={ + 'foob': column_property( + func.coalesce(self.tables.users.c.name)) }) self.assert_compile( select([Foo]).where(Foo.foob == 'somename').order_by(Foo.foob), "SELECT users.id, users.name FROM users " - "WHERE coalesce(users.name) = :coalesce_1 ORDER BY coalesce(users.name)" + "WHERE coalesce(users.name) = :coalesce_1 " + "ORDER BY coalesce(users.name)" ) + class GetTest(QueryTest): def test_get(self): User = self.classes.User @@ -379,13 +388,13 @@ class GetTest(QueryTest): CompositePk = self.classes.CompositePk s = Session() - assert s.query(CompositePk).get((100,100)) is None + assert s.query(CompositePk).get((100, 100)) is None def test_get_composite_pk_result(self): CompositePk = self.classes.CompositePk s = Session() - one_two = s.query(CompositePk).get((1,2)) + one_two = s.query(CompositePk).get((1, 2)) assert one_two.i == 1 assert one_two.j == 2 assert one_two.k == 3 @@ -424,48 +433,48 @@ class GetTest(QueryTest): users, addresses = self.tables.users, self.tables.addresses - s = users.outerjoin(addresses) class UserThing(fixtures.ComparableEntity): pass - mapper(UserThing, s, properties={ - 'id':(users.c.id, addresses.c.user_id), - 'address_id':addresses.c.id, - }) + mapper( + UserThing, s, properties={ + 'id': (users.c.id, addresses.c.user_id), + 'address_id': addresses.c.id, + }) sess = create_session() u10 = sess.query(UserThing).get((10, None)) - eq_(u10, - UserThing(id=10) - ) + eq_(u10, UserThing(id=10)) def test_no_criterion(self): - """test that get()/load() does not use preexisting filter/etc. criterion""" + """test that get()/load() does not use preexisting filter/etc. + criterion""" User, Address = self.classes.User, self.classes.Address - s = create_session() - q = s.query(User).join('addresses').filter(Address.user_id==8) + q = s.query(User).join('addresses').filter(Address.user_id == 8) assert_raises(sa_exc.InvalidRequestError, q.get, 7) - assert_raises(sa_exc.InvalidRequestError, s.query(User).filter(User.id==7).get, 19) + assert_raises( + sa_exc.InvalidRequestError, + s.query(User).filter(User.id == 7).get, 19) # order_by()/get() doesn't raise s.query(User).order_by(User.id).get(8) def test_no_criterion_when_already_loaded(self): - """test that get()/load() does not use preexisting filter/etc. criterion, - even when we're only using the identity map.""" + """test that get()/load() does not use preexisting filter/etc. + criterion, even when we're only using the identity map.""" User, Address = self.classes.User, self.classes.Address s = create_session() - u1 = s.query(User).get(7) + s.query(User).get(7) - q = s.query(User).join('addresses').filter(Address.user_id==8) + q = s.query(User).join('addresses').filter(Address.user_id == 8) assert_raises(sa_exc.InvalidRequestError, q.get, 7) def test_unique_param_names(self): @@ -473,7 +482,7 @@ class GetTest(QueryTest): class SomeUser(object): pass - s = users.select(users.c.id!=12).alias('users') + s = users.select(users.c.id != 12).alias('users') m = mapper(SomeUser, s) assert s.primary_key == m.primary_key @@ -502,7 +511,7 @@ class GetTest(QueryTest): s.query(User).populate_existing().get(7) assert u2 not in s.dirty - assert u2.name =='jack' + assert u2.name == 'jack' assert a not in u2.addresses @testing.provide_metadata @@ -513,18 +522,23 @@ class GetTest(QueryTest): oracle unless it is converted to an encoded string""" metadata = self.metadata - table = Table('unicode_data', metadata, - Column('id', Unicode(40), primary_key=True, test_needs_autoincrement=True), + table = Table( + 'unicode_data', metadata, + Column( + 'id', Unicode(40), primary_key=True, + test_needs_autoincrement=True), Column('data', Unicode(40))) metadata.create_all() ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8') table.insert().execute(id=ustring, data=ustring) + class LocalFoo(self.classes.Base): pass mapper(LocalFoo, table) - eq_(create_session().query(LocalFoo).get(ustring), - LocalFoo(id=ustring, data=ustring)) + eq_( + create_session().query(LocalFoo).get(ustring), + LocalFoo(id=ustring, data=ustring)) def test_populate_existing(self): User, Address = self.classes.User, self.classes.Address @@ -556,7 +570,9 @@ class GetTest(QueryTest): assert u.orders[1].items[2].description == 'item 12' # eager load does - s.query(User).options(joinedload('addresses'), joinedload_all('orders.items')).populate_existing().all() + s.query(User). \ + options(joinedload('addresses'), joinedload_all('orders.items')). \ + populate_existing().all() assert u.addresses[0].email_address == 'jack@bean.com' assert u.orders[1].items[2].description == 'item 5' @@ -574,7 +590,8 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): ): assert_raises(sa_exc.InvalidRequestError, q.join, "addresses") - assert_raises(sa_exc.InvalidRequestError, q.filter, User.name=='ed') + assert_raises( + sa_exc.InvalidRequestError, q.filter, User.name == 'ed') assert_raises(sa_exc.InvalidRequestError, q.filter_by, name='ed') @@ -585,7 +602,7 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): assert_raises(sa_exc.InvalidRequestError, q.having, 'foo') q.enable_assertions(False).join("addresses") - q.enable_assertions(False).filter(User.name=='ed') + q.enable_assertions(False).filter(User.name == 'ed') q.enable_assertions(False).order_by('foo') q.enable_assertions(False).group_by('foo') @@ -615,7 +632,7 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): s = create_session() q = s.query(User) - assert_raises(sa_exc.ArgumentError, q.select_from, User.id==5) + assert_raises(sa_exc.ArgumentError, q.select_from, User.id == 5) assert_raises(sa_exc.ArgumentError, q.select_from, User.id) def test_invalid_from_statement(self): @@ -625,8 +642,9 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): s = create_session() q = s.query(User) - assert_raises(sa_exc.ArgumentError, q.from_statement, User.id==5) - assert_raises(sa_exc.ArgumentError, q.from_statement, users.join(addresses)) + assert_raises(sa_exc.ArgumentError, q.from_statement, User.id == 5) + assert_raises( + sa_exc.ArgumentError, q.from_statement, users.join(addresses)) def test_invalid_column(self): User = self.classes.User @@ -643,27 +661,31 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): assert_raises(sa_exc.InvalidRequestError, q.add_column, (1, 1)) def test_distinct(self): - """test that a distinct() call is not valid before 'clauseelement' conditions.""" + """test that a distinct() call is not valid before 'clauseelement' + conditions.""" User = self.classes.User - s = create_session() q = s.query(User).distinct() assert_raises(sa_exc.InvalidRequestError, q.select_from, User) - assert_raises(sa_exc.InvalidRequestError, q.from_statement, text("select * from table")) + assert_raises( + sa_exc.InvalidRequestError, q.from_statement, + text("select * from table")) assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User) def test_order_by(self): - """test that an order_by() call is not valid before 'clauseelement' conditions.""" + """test that an order_by() call is not valid before 'clauseelement' + conditions.""" User = self.classes.User - s = create_session() q = s.query(User).order_by(User.id) assert_raises(sa_exc.InvalidRequestError, q.select_from, User) - assert_raises(sa_exc.InvalidRequestError, q.from_statement, text("select * from table")) + assert_raises( + sa_exc.InvalidRequestError, q.from_statement, + text("select * from table")) assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User) def test_cancel_order_by(self): @@ -672,23 +694,29 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): s = create_session() q = s.query(User).order_by(User.id) - self.assert_compile(q, - "SELECT users.id AS users_id, users.name AS users_name FROM users ORDER BY users.id", + self.assert_compile( + q, + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users ORDER BY users.id", use_default_dialect=True) - assert_raises(sa_exc.InvalidRequestError, q._no_select_modifiers, "foo") + assert_raises( + sa_exc.InvalidRequestError, q._no_select_modifiers, "foo") q = q.order_by(None) - self.assert_compile(q, - "SELECT users.id AS users_id, users.name AS users_name FROM users", - use_default_dialect=True) + self.assert_compile( + q, + "SELECT users.id AS users_id, users.name AS users_name FROM users", + use_default_dialect=True) - assert_raises(sa_exc.InvalidRequestError, q._no_select_modifiers, "foo") + assert_raises( + sa_exc.InvalidRequestError, q._no_select_modifiers, "foo") q = q.order_by(False) - self.assert_compile(q, - "SELECT users.id AS users_id, users.name AS users_name FROM users", - use_default_dialect=True) + self.assert_compile( + q, + "SELECT users.id AS users_id, users.name AS users_name FROM users", + use_default_dialect=True) # after False was set, this should pass q._no_select_modifiers("foo") @@ -707,8 +735,8 @@ class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): s = create_session() for meth, arg, kw in [ - (Query.filter, (User.id==5,), {}), - (Query.filter_by, (), {'id':5}), + (Query.filter, (User.id == 5,), {}), + (Query.filter_by, (), {'id': 5}), (Query.limit, (5, ), {}), (Query.group_by, (User.name,), {}), (Query.order_by, (User.name,), {}) @@ -781,23 +809,24 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): (operators.le, '<=', '>='), (operators.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( - ('a', User.id, ':id_1', 'users.id'), - ('a', literal('b'), ':param_2', ':param_1'), # note swap! - (User.id, 'b', 'users.id', ':id_1'), - (User.id, literal('b'), 'users.id', ':param_1'), - (User.id, User.id, 'users.id', 'users.id'), - (literal('a'), 'b', ':param_1', ':param_2'), - (literal('a'), User.id, ':param_1', 'users.id'), - (literal('a'), literal('b'), ':param_1', ':param_2'), - (ualias.id, literal('b'), 'users_1.id', ':param_1'), - (User.id, ualias.name, 'users.id', 'users_1.name'), - (User.name, ualias.name, 'users.name', 'users_1.name'), - (ualias.name, User.name, 'users_1.name', 'users.name'), - ): + ('a', User.id, ':id_1', 'users.id'), + ('a', literal('b'), ':param_2', ':param_1'), # note swap! + (User.id, 'b', 'users.id', ':id_1'), + (User.id, literal('b'), 'users.id', ':param_1'), + (User.id, User.id, 'users.id', 'users.id'), + (literal('a'), 'b', ':param_1', ':param_2'), + (literal('a'), User.id, ':param_1', 'users.id'), + (literal('a'), literal('b'), ':param_1', ':param_2'), + (ualias.id, literal('b'), 'users_1.id', ':param_1'), + (User.id, ualias.name, 'users.id', 'users_1.name'), + (User.name, ualias.name, 'users.name', 'users_1.name'), + (ualias.name, User.name, 'users_1.name', 'users.name'), + ): # the compiled clause should match either (e.g.): # 'a' < 'b' -or- 'b' > 'a'. - compiled = str(py_op(lhs, rhs).compile(dialect=default.DefaultDialect())) + compiled = str(py_op(lhs, rhs).compile( + dialect=default.DefaultDialect())) fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql) rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql) @@ -809,16 +838,16 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): User, Address = self.classes.User, self.classes.Address self._test(User.id == None, "users.id IS NULL") - self._test(~(User.id==None), "users.id IS NOT NULL") + self._test(~(User.id == None), "users.id IS NOT NULL") self._test(None == User.id, "users.id IS NULL") self._test(~(None == User.id), "users.id IS NOT NULL") self._test(Address.user == None, "addresses.user_id IS NULL") - self._test(~(Address.user==None), "addresses.user_id IS NOT NULL") + self._test(~(Address.user == None), "addresses.user_id IS NOT NULL") self._test(None == Address.user, "addresses.user_id IS NULL") self._test(~(None == Address.user), "addresses.user_id IS NOT NULL") def test_relationship_unimplemented(self): - User, Address = self.classes.User, self.classes.Address + User = self.classes.User for op in [ User.addresses.like, User.addresses.ilike, @@ -830,12 +859,12 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): def test_relationship(self): User, Address = self.classes.User, self.classes.Address - self._test(User.addresses.any(Address.id==17), - "EXISTS (SELECT 1 " - "FROM addresses " - "WHERE users.id = addresses.user_id AND addresses.id = :id_1)", - entity=User - ) + self._test( + User.addresses.any(Address.id == 17), + "EXISTS (SELECT 1 FROM addresses " + "WHERE users.id = addresses.user_id AND addresses.id = :id_1)", + entity=User + ) u7 = User(id=7) attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) @@ -849,7 +878,6 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): self._test(Address.user != None, "addresses.user_id IS NOT NULL") - def test_selfref_relationship(self): Node = self.classes.Node @@ -858,9 +886,9 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): # auto self-referential aliasing self._test( - Node.children.any(Node.data=='n1'), - "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " - "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)", + Node.children.any(Node.data == 'n1'), + "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " + "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)", entity=Node ) @@ -868,7 +896,7 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): self._test( Node.children == None, "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 " - "WHERE nodes.id = nodes_1.parent_id))", + "WHERE nodes.id = nodes_1.parent_id))", entity=Node ) @@ -884,15 +912,16 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): self._test( nalias.children == None, - "NOT (EXISTS (SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))", + "NOT (EXISTS (" + "SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))", entity=nalias ) self._test( - nalias.children.any(Node.data=='some data'), - "EXISTS (SELECT 1 FROM nodes WHERE " - "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)", - entity=nalias) + nalias.children.any(Node.data == 'some data'), + "EXISTS (SELECT 1 FROM nodes WHERE " + "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)", + entity=nalias) # this fails because self-referential any() is auto-aliasing; # the fact that we use "nalias" here means we get two aliases. @@ -905,15 +934,14 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): self._test( nalias.parent.has(Node.data == 'some data'), - "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id " + "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id " "AND nodes.data = :data_1)", entity=nalias ) - self._test( Node.parent.has(Node.data == 'some data'), - "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " + "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " "nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)", entity=Node ) @@ -943,12 +971,13 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): sess = Session() q = sess.query(User).filter( - User.addresses.any( - and_(Address.id == Dingaling.address_id, - Dingaling.data == 'x'))) + User.addresses.any( + and_(Address.id == Dingaling.address_id, + Dingaling.data == 'x'))) # new since #2746 - correlate_except() now takes context into account # so its usage in any() is not as disrupting. - self.assert_compile(q, + self.assert_compile( + q, "SELECT users.id AS users_id, users.name AS users_name " "FROM users " "WHERE EXISTS (SELECT 1 " @@ -966,8 +995,7 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): def test_in(self): User = self.classes.User - self._test(User.id.in_(['a', 'b']), - "users.id IN (:id_1, :id_2)") + self._test(User.id.in_(['a', 'b']), "users.id IN (:id_1, :id_2)") def test_in_on_relationship_not_supported(self): User, Address = self.classes.User, self.classes.Address @@ -983,24 +1011,26 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): def test_between(self): User = self.classes.User - self._test(User.id.between('a', 'b'), - "users.id BETWEEN :id_1 AND :id_2") + self._test( + User.id.between('a', 'b'), "users.id BETWEEN :id_1 AND :id_2") def test_collate(self): User = self.classes.User - self._test(collate(User.id, 'binary'), - "users.id COLLATE binary") + self._test(collate(User.id, 'binary'), "users.id COLLATE binary") - self._test(User.id.collate('binary'), - "users.id COLLATE binary") + self._test(User.id.collate('binary'), "users.id COLLATE binary") def test_selfref_between(self): User = self.classes.User ualias = aliased(User) - self._test(User.id.between(ualias.id, ualias.id), "users.id BETWEEN users_1.id AND users_1.id") - self._test(ualias.id.between(User.id, User.id), "users_1.id BETWEEN users.id AND users.id") + self._test( + User.id.between(ualias.id, ualias.id), + "users.id BETWEEN users_1.id AND users_1.id") + self._test( + ualias.id.between(User.id, User.id), + "users_1.id BETWEEN users.id AND users.id") def test_clauses(self): User, Address = self.classes.User, self.classes.Address @@ -1008,9 +1038,11 @@ class OperatorTest(QueryTest, AssertsCompiledSQL): for (expr, compare) in ( (func.max(User.id), "max(users.id)"), (User.id.desc(), "users.id DESC"), - (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"), - # this one would require adding compile() to InstrumentedScalarAttribute. do we want this ? - #(User.id, "users.id") + (between(5, User.id, Address.id), + ":param_1 BETWEEN users.id AND addresses.id"), + # this one would require adding compile() to + # InstrumentedScalarAttribute. do we want this ? + # (User.id, "users.id") ): c = expr.compile(dialect=default.DefaultDialect()) assert str(c) == compare, "%s != %s" % (str(c), compare) @@ -1025,10 +1057,12 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): self.classes.Address) session = create_session() - s = session.query(User).filter(and_(addresses.c.email_address == bindparam('emailad'), - Address.user_id==User.id)).statement + s = session.query(User).filter( + and_(addresses.c.email_address == bindparam('emailad'), + Address.user_id == User.id)).statement - l = list(session.query(User).instances(s.execute(emailad = 'jack@bean.com'))) + l = list( + session.query(User).instances(s.execute(emailad='jack@bean.com'))) eq_([User(id=7)], l) def test_aliased_sql_construct(self): @@ -1063,9 +1097,9 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): session = create_session() - q = session.query(User.id).filter(User.id==7) + q = session.query(User.id).filter(User.id == 7) - q = session.query(Address).filter(Address.user_id==q) + q = session.query(Address).filter(Address.user_id == q) assert isinstance(q._criterion.right, expression.ColumnElement) self.assert_compile( q, @@ -1080,9 +1114,9 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): User = self.classes.User session = create_session() - a1 = session.query(User.id).filter(User.id==7).subquery('foo1') - a2 = session.query(User.id).filter(User.id==7).subquery(name='foo2') - a3 = session.query(User.id).filter(User.id==7).subquery() + a1 = session.query(User.id).filter(User.id == 7).subquery('foo1') + a2 = session.query(User.id).filter(User.id == 7).subquery(name='foo2') + a3 = session.query(User.id).filter(User.id == 7).subquery() eq_(a1.name, 'foo1') eq_(a2.name, 'foo2') @@ -1092,7 +1126,8 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): User = self.classes.User session = create_session() - a1 = session.query(User.id).filter(User.id == 7).subquery(with_labels=True) + a1 = session.query(User.id).filter(User.id == 7). \ + subquery(with_labels=True) assert a1.c.users_id is not None def test_reduced_subquery(self): @@ -1101,7 +1136,7 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): session = create_session() a1 = session.query(User.id, ua.id, ua.name).\ - filter(User.id == ua.id).subquery(reduce_columns=True) + filter(User.id == ua.id).subquery(reduce_columns=True) self.assert_compile(a1, "SELECT users.id, users_1.name FROM " "users, users AS users_1 WHERE users.id = users_1.id") @@ -1111,7 +1146,7 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): session = create_session() - q = session.query(User.id).filter(User.id==7).label('foo') + q = session.query(User.id).filter(User.id == 7).label('foo') self.assert_compile( session.query(q), "SELECT (SELECT users.id FROM users WHERE users.id = :id_1) AS foo" @@ -1122,7 +1157,7 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): session = create_session() - q = session.query(User.id).filter(User.id==7).as_scalar() + q = session.query(User.id).filter(User.id == 7).as_scalar() self.assert_compile(session.query(User).filter(User.id.in_(q)), 'SELECT users.id AS users_id, users.name ' @@ -1136,7 +1171,7 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): session = create_session() q = session.query(User.id).filter(User.id == bindparam('foo')).\ - params(foo=7).subquery() + params(foo=7).subquery() q = session.query(User).filter(User.id.in_(q)) @@ -1147,22 +1182,19 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): session = create_session() s = session.query(User.id).join(User.addresses).group_by(User.id).\ - having(func.count(Address.id) > 2) - eq_( - session.query(User).filter(User.id.in_(s)).all(), - [User(id=8)] - ) + having(func.count(Address.id) > 2) + eq_(session.query(User).filter(User.id.in_(s)).all(), [User(id=8)]) def test_union(self): User = self.classes.User s = create_session() - q1 = s.query(User).filter(User.name=='ed').with_labels() - q2 = s.query(User).filter(User.name=='fred').with_labels() + q1 = s.query(User).filter(User.name == 'ed').with_labels() + q2 = s.query(User).filter(User.name == 'fred').with_labels() eq_( - s.query(User).from_statement(union(q1, q2).order_by('users_name')).all(), - [User(name='ed'), User(name='fred')] + s.query(User).from_statement(union(q1, q2). + order_by('users_name')).all(), [User(name='ed'), User(name='fred')] ) def test_select(self): @@ -1170,9 +1202,9 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): s = create_session() - # this is actually not legal on most DBs since the subquery has no alias - q1 = s.query(User).filter(User.name=='ed') - + # this is actually not legal on most DBs since the subquery has no + # alias + q1 = s.query(User).filter(User.name == 'ed') self.assert_compile( select([q1]), @@ -1185,23 +1217,28 @@ class ExpressionTest(QueryTest, AssertsCompiledSQL): s = create_session() - # TODO: do we want aliased() to detect a query and convert to subquery() - # automatically ? - q1 = s.query(Address).filter(Address.email_address=='jack@bean.com') + # TODO: do we want aliased() to detect a query and convert to + # subquery() automatically ? + q1 = s.query(Address).filter(Address.email_address == 'jack@bean.com') adalias = aliased(Address, q1.subquery()) eq_( - s.query(User, adalias).join(adalias, User.id==adalias.user_id).all(), - [(User(id=7,name='jack'), Address(email_address='jack@bean.com',user_id=7,id=1))] - ) + s.query(User, adalias).join(adalias, User.id == adalias.user_id). + all(), + [ + ( + User(id=7, name='jack'), + Address(email_address='jack@bean.com', user_id=7, id=1))]) + # more slice tests are available in test/orm/generative.py class SliceTest(QueryTest): def test_first(self): User = self.classes.User - assert User(id=7) == create_session().query(User).first() + assert User(id=7) == create_session().query(User).first() - assert create_session().query(User).filter(User.id==27).first() is None + assert create_session().query(User).filter(User.id == 27). \ + first() is None @testing.only_on('sqlite', 'testing execution but db-specific syntax') def test_limit_offset_applies(self): @@ -1218,34 +1255,48 @@ class SliceTest(QueryTest): sess = create_session() q = sess.query(User) - self.assert_sql(testing.db, lambda: q[10:20], [ - ("SELECT users.id AS users_id, users.name AS users_name FROM users LIMIT :param_1 OFFSET :param_2", {'param_1':10, 'param_2':10}) - ]) - - self.assert_sql(testing.db, lambda: q[:20], [ - ("SELECT users.id AS users_id, users.name AS users_name FROM users LIMIT :param_1 OFFSET :param_2", {'param_1':20, 'param_2':0}) - ]) - - self.assert_sql(testing.db, lambda: q[5:], [ - ("SELECT users.id AS users_id, users.name AS users_name FROM users LIMIT :param_1 OFFSET :param_2", {'param_1':-1, 'param_2':5}) - ]) + self.assert_sql( + testing.db, lambda: q[10:20], [ + ( + "SELECT users.id AS users_id, users.name " + "AS users_name FROM users LIMIT :param_1 OFFSET :param_2", + {'param_1': 10, 'param_2': 10})]) + + self.assert_sql( + testing.db, lambda: q[:20], [ + ( + "SELECT users.id AS users_id, users.name " + "AS users_name FROM users LIMIT :param_1 OFFSET :param_2", + {'param_1': 20, 'param_2': 0})]) + + self.assert_sql( + testing.db, lambda: q[5:], [ + ( + "SELECT users.id AS users_id, users.name " + "AS users_name FROM users LIMIT :param_1 OFFSET :param_2", + {'param_1': -1, 'param_2': 5})]) self.assert_sql(testing.db, lambda: q[2:2], []) self.assert_sql(testing.db, lambda: q[-2:-5], []) - self.assert_sql(testing.db, lambda: q[-5:-2], [ - ("SELECT users.id AS users_id, users.name AS users_name FROM users", {}) - ]) + self.assert_sql( + testing.db, lambda: q[-5:-2], [ + ( + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users", {})]) - self.assert_sql(testing.db, lambda: q[-5:], [ - ("SELECT users.id AS users_id, users.name AS users_name FROM users", {}) - ]) - - self.assert_sql(testing.db, lambda: q[:], [ - ("SELECT users.id AS users_id, users.name AS users_name FROM users", {}) - ]) + self.assert_sql( + testing.db, lambda: q[-5:], [ + ( + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users", {})]) + self.assert_sql( + testing.db, lambda: q[:], [ + ( + "SELECT users.id AS users_id, users.name AS users_name " + "FROM users", {})]) class FilterTest(QueryTest, AssertsCompiledSQL): @@ -1255,10 +1306,7 @@ class FilterTest(QueryTest, AssertsCompiledSQL): User = self.classes.User users = create_session().query(User).all() - eq_( - [User(id=7), User(id=8), User(id=9),User(id=10)], - users - ) + eq_([User(id=7), User(id=8), User(id=9), User(id=10)], users) @testing.requires.offset def test_limit_offset(self): @@ -1266,9 +1314,11 @@ class FilterTest(QueryTest, AssertsCompiledSQL): sess = create_session() - assert [User(id=8), User(id=9)] == sess.query(User).order_by(User.id).limit(2).offset(1).all() + assert [User(id=8), User(id=9)] == \ + sess.query(User).order_by(User.id).limit(2).offset(1).all() - assert [User(id=8), User(id=9)] == list(sess.query(User).order_by(User.id)[1:3]) + assert [User(id=8), User(id=9)] == \ + list(sess.query(User).order_by(User.id)[1:3]) assert User(id=8) == sess.query(User).order_by(User.id)[1] @@ -1280,24 +1330,21 @@ class FilterTest(QueryTest, AssertsCompiledSQL): User = self.classes.User sess = create_session() q1 = sess.query(self.classes.User).\ - order_by(self.classes.User.id).limit(bindparam('n')) + order_by(self.classes.User.id).limit(bindparam('n')) for n in range(1, 4): result = q1.params(n=n).all() eq_(len(result), n) eq_( - sess.query(User).order_by(User.id). - limit(bindparam('limit')). - offset(bindparam('offset')). - params(limit=2, offset=1).all(), + sess.query(User).order_by(User.id).limit(bindparam('limit')). + offset(bindparam('offset')).params(limit=2, offset=1).all(), [User(id=8), User(id=9)] ) eq_( list( - sess.query(User).params(a=1, b=3). - order_by(User.id)[bindparam('a'):bindparam('b')] - ), + sess.query(User).params(a=1, b=3).order_by(User.id) + [cast(bindparam('a'), Integer):cast(bindparam('b'), Integer)]), [User(id=8), User(id=9)] ) @@ -1307,23 +1354,24 @@ class FilterTest(QueryTest, AssertsCompiledSQL): sess = create_session(testing.db) - assert sess.query(exists().where(User.id==9)).scalar() - assert not sess.query(exists().where(User.id==29)).scalar() + assert sess.query(exists().where(User.id == 9)).scalar() + assert not sess.query(exists().where(User.id == 29)).scalar() def test_one_filter(self): User = self.classes.User - assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all() + assert [User(id=8), User(id=9)] == \ + create_session().query(User).filter(User.name.endswith('ed')).all() def test_contains(self): """test comparing a collection to an object instance.""" User, Address = self.classes.User, self.classes.Address - sess = create_session() address = sess.query(Address).get(3) - assert [User(id=8)] == sess.query(User).filter(User.addresses.contains(address)).all() + assert [User(id=8)] == \ + sess.query(User).filter(User.addresses.contains(address)).all() try: sess.query(User).filter(User.addresses == address) @@ -1331,15 +1379,18 @@ class FilterTest(QueryTest, AssertsCompiledSQL): except sa_exc.InvalidRequestError: assert True - assert [User(id=10)] == sess.query(User).filter(User.addresses==None).all() + assert [User(id=10)] == \ + sess.query(User).filter(User.addresses == None).all() try: - assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all() + assert [User(id=7), User(id=9), User(id=10)] == \ + sess.query(User).filter(User.addresses != address).all() assert False except sa_exc.InvalidRequestError: assert True - #assert [User(id=7), User(id=9), User(id=10)] == sess.query(User).filter(User.addresses!=address).all() + # assert [User(id=7), User(id=9), User(id=10)] == + # sess.query(User).filter(User.addresses!=address).all() def test_clause_element_ok(self): User = self.classes.User @@ -1351,7 +1402,8 @@ class FilterTest(QueryTest, AssertsCompiledSQL): ) def test_unique_binds_join_cond(self): - """test that binds used when the lazyclause is used in criterion are unique""" + """test that binds used when the lazyclause is used in criterion are + unique""" User, Address = self.classes.User, self.classes.Address sess = Session() @@ -1365,7 +1417,7 @@ class FilterTest(QueryTest, AssertsCompiledSQL): "users.name AS users_name FROM users WHERE users.id = :param_1 " "UNION SELECT users.id AS users_id, users.name AS users_name " "FROM users WHERE users.id = :param_2) AS anon_1", - checkparams = {'param_1': 7, 'param_2': 8} + checkparams={'param_1': 7, 'param_2': 8} ) def test_any(self): @@ -1373,47 +1425,78 @@ class FilterTest(QueryTest, AssertsCompiledSQL): sess = create_session() - assert [User(id=8), User(id=9)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).all() + assert [User(id=8), User(id=9)] == \ + sess.query(User). \ + filter( + User.addresses.any(Address.email_address.like('%ed%'))).all() - assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'), id=4)).all() + assert [User(id=8)] == \ + sess.query(User). \ + filter( + User.addresses.any( + Address.email_address.like('%ed%'), id=4)).all() - assert [User(id=8)] == sess.query(User).filter(User.addresses.any(Address.email_address.like('%ed%'))).\ + assert [User(id=8)] == \ + sess.query(User). \ + filter(User.addresses.any(Address.email_address.like('%ed%'))).\ filter(User.addresses.any(id=4)).all() - assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all() + assert [User(id=9)] == \ + sess.query(User). \ + filter(User.addresses.any(email_address='fred@fred.com')).all() # test that any() doesn't overcorrelate - assert [User(id=7), User(id=8)] == sess.query(User).join("addresses").filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all() + assert [User(id=7), User(id=8)] == \ + sess.query(User).join("addresses"). \ + filter( + ~User.addresses.any( + Address.email_address == 'fred@fred.com')).all() # test that the contents are not adapted by the aliased join - assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all() + assert [User(id=7), User(id=8)] == \ + sess.query(User).join("addresses", aliased=True). \ + filter( + ~User.addresses.any( + Address.email_address == 'fred@fred.com')).all() - assert [User(id=10)] == sess.query(User).outerjoin("addresses", aliased=True).filter(~User.addresses.any()).all() + assert [User(id=10)] == \ + sess.query(User).outerjoin("addresses", aliased=True). \ + filter(~User.addresses.any()).all() def test_has(self): - Dingaling, User, Address = (self.classes.Dingaling, - self.classes.User, - self.classes.Address) + Dingaling, User, Address = ( + self.classes.Dingaling, self.classes.User, self.classes.Address) sess = create_session() - assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all() + assert [Address(id=5)] == \ + sess.query(Address).filter(Address.user.has(name='fred')).all() - assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == \ - sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).order_by(Address.id).all() + assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] \ + == sess.query(Address). \ + filter(Address.user.has(User.name.like('%ed%'))). \ + order_by(Address.id).all() assert [Address(id=2), Address(id=3), Address(id=4)] == \ - sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all() + sess.query(Address). \ + filter(Address.user.has(User.name.like('%ed%'), id=8)). \ + order_by(Address.id).all() # test has() doesn't overcorrelate assert [Address(id=2), Address(id=3), Address(id=4)] == \ - sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all() + sess.query(Address).join("user"). \ + filter(Address.user.has(User.name.like('%ed%'), id=8)). \ + order_by(Address.id).all() # test has() doesn't get subquery contents adapted by aliased join assert [Address(id=2), Address(id=3), Address(id=4)] == \ - sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all() + sess.query(Address).join("user", aliased=True). \ + filter(Address.user.has(User.name.like('%ed%'), id=8)). \ + order_by(Address.id).all() dingaling = sess.query(Dingaling).get(2) - assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all() + assert [User(id=9)] == \ + sess.query(User). \ + filter(User.addresses.any(Address.dingaling == dingaling)).all() def test_contains_m2m(self): Item, Order = self.classes.Item, self.classes.Order @@ -1439,52 +1522,58 @@ class FilterTest(QueryTest, AssertsCompiledSQL): [Order(id=3)] ) - def test_comparison(self): """test scalar comparison to an object instance""" - Item, Order, Dingaling, User, Address = (self.classes.Item, - self.classes.Order, - self.classes.Dingaling, - self.classes.User, - self.classes.Address) - + Item, Order, Dingaling, User, Address = ( + self.classes.Item, self.classes.Order, self.classes.Dingaling, + self.classes.User, self.classes.Address) sess = create_session() user = sess.query(User).get(8) - assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user==user).all() + assert [Address(id=2), Address(id=3), Address(id=4)] == \ + sess.query(Address).filter(Address.user == user).all() - assert [Address(id=1), Address(id=5)] == sess.query(Address).filter(Address.user!=user).all() + assert [Address(id=1), Address(id=5)] == \ + sess.query(Address).filter(Address.user != user).all() # generates an IS NULL assert [] == sess.query(Address).filter(Address.user == None).all() assert [] == sess.query(Address).filter(Address.user == null()).all() - assert [Order(id=5)] == sess.query(Order).filter(Order.address == None).all() + assert [Order(id=5)] == \ + sess.query(Order).filter(Order.address == None).all() # o2o dingaling = sess.query(Dingaling).get(2) - assert [Address(id=5)] == sess.query(Address).filter(Address.dingaling==dingaling).all() + assert [Address(id=5)] == \ + sess.query(Address).filter(Address.dingaling == dingaling).all() # m2m - eq_(sess.query(Item).filter(Item.keywords==None).order_by(Item.id).all(), [Item(id=4), Item(id=5)]) - eq_(sess.query(Item).filter(Item.keywords!=None).order_by(Item.id).all(), [Item(id=1),Item(id=2), Item(id=3)]) - + eq_( + sess.query(Item).filter(Item.keywords == None). + order_by(Item.id).all(), [Item(id=4), Item(id=5)]) + eq_( + sess.query(Item).filter(Item.keywords != None). + order_by(Item.id).all(), [Item(id=1), Item(id=2), Item(id=3)]) def test_filter_by(self): User, Address = self.classes.User, self.classes.Address sess = create_session() user = sess.query(User).get(8) - assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter_by(user=user).all() + assert [Address(id=2), Address(id=3), Address(id=4)] == \ + sess.query(Address).filter_by(user=user).all() # many to one generates IS NULL - assert [] == sess.query(Address).filter_by(user = None).all() - assert [] == sess.query(Address).filter_by(user = null()).all() + assert [] == sess.query(Address).filter_by(user=None).all() + assert [] == sess.query(Address).filter_by(user=null()).all() # one to many generates WHERE NOT EXISTS - assert [User(name='chuck')] == sess.query(User).filter_by(addresses = None).all() - assert [User(name='chuck')] == sess.query(User).filter_by(addresses = null()).all() + assert [User(name='chuck')] == \ + sess.query(User).filter_by(addresses=None).all() + assert [User(name='chuck')] == \ + sess.query(User).filter_by(addresses=null()).all() def test_filter_by_tables(self): @@ -1492,10 +1581,9 @@ class FilterTest(QueryTest, AssertsCompiledSQL): addresses = self.tables.addresses sess = create_session() self.assert_compile( - sess.query(users).\ - filter_by(name='ed').\ - join(addresses, users.c.id==addresses.c.user_id).\ - filter_by(email_address='ed@ed.com'), + sess.query(users).filter_by(name='ed'). + join(addresses, users.c.id == addresses.c.user_id). + filter_by(email_address='ed@ed.com'), "SELECT users.id AS users_id, users.name AS users_name " "FROM users JOIN addresses ON users.id = addresses.user_id " "WHERE users.name = :name_1 AND " @@ -1509,42 +1597,60 @@ class FilterTest(QueryTest, AssertsCompiledSQL): assert_raises_message( sa.exc.InvalidRequestError, "Entity 'addresses' has no property 'name'", - sess.query(addresses).\ - filter_by, name='ed' + sess.query(addresses).filter_by, name='ed' ) def test_none_comparison(self): - Order, User, Address = (self.classes.Order, - self.classes.User, - self.classes.Address) + Order, User, Address = ( + self.classes.Order, self.classes.User, self.classes.Address) sess = create_session() # scalar eq_( [Order(description="order 5")], - sess.query(Order).filter(Order.address_id==None).all() + sess.query(Order).filter(Order.address_id == None).all() ) eq_( [Order(description="order 5")], - sess.query(Order).filter(Order.address_id==null()).all() + sess.query(Order).filter(Order.address_id == null()).all() ) # o2o - eq_([Address(id=1), Address(id=3), Address(id=4)], - sess.query(Address).filter(Address.dingaling==None).order_by(Address.id).all()) - eq_([Address(id=1), Address(id=3), Address(id=4)], - sess.query(Address).filter(Address.dingaling==null()).order_by(Address.id).all()) - eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != None).order_by(Address.id).all()) - eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != null()).order_by(Address.id).all()) + eq_( + [Address(id=1), Address(id=3), Address(id=4)], + sess.query(Address).filter(Address.dingaling == None). + order_by(Address.id).all()) + eq_( + [Address(id=1), Address(id=3), Address(id=4)], + sess.query(Address).filter(Address.dingaling == null()). + order_by(Address.id).all()) + eq_( + [Address(id=2), Address(id=5)], + sess.query(Address).filter(Address.dingaling != None). + order_by(Address.id).all()) + eq_( + [Address(id=2), Address(id=5)], + sess.query(Address).filter(Address.dingaling != null()). + order_by(Address.id).all()) # m2o - eq_([Order(id=5)], sess.query(Order).filter(Order.address==None).all()) - eq_([Order(id=1), Order(id=2), Order(id=3), Order(id=4)], sess.query(Order).order_by(Order.id).filter(Order.address!=None).all()) + eq_( + [Order(id=5)], + sess.query(Order).filter(Order.address == None).all()) + eq_( + [Order(id=1), Order(id=2), Order(id=3), Order(id=4)], + sess.query(Order).order_by(Order.id). + filter(Order.address != None).all()) # o2m - eq_([User(id=10)], sess.query(User).filter(User.addresses==None).all()) - eq_([User(id=7),User(id=8),User(id=9)], sess.query(User).filter(User.addresses!=None).order_by(User.id).all()) + eq_( + [User(id=10)], + sess.query(User).filter(User.addresses == None).all()) + eq_( + [User(id=7), User(id=8), User(id=9)], + sess.query(User).filter(User.addresses != None). + order_by(User.id).all()) def test_blank_filter_by(self): User = self.classes.User @@ -1555,7 +1661,8 @@ class FilterTest(QueryTest, AssertsCompiledSQL): ) eq_( [(7,), (8,), (9,), (10,)], - create_session().query(User.id).filter_by(**{}).order_by(User.id).all() + create_session().query(User.id).filter_by(**{}). + order_by(User.id).all() ) def test_text_coerce(self): @@ -1567,6 +1674,7 @@ class FilterTest(QueryTest, AssertsCompiledSQL): "AS users_name FROM users WHERE name='ed'" ) + class SetOpsTest(QueryTest, AssertsCompiledSQL): __dialect__ = 'default' @@ -1575,15 +1683,17 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): s = create_session() - fred = s.query(User).filter(User.name=='fred') - ed = s.query(User).filter(User.name=='ed') - jack = s.query(User).filter(User.name=='jack') + fred = s.query(User).filter(User.name == 'fred') + ed = s.query(User).filter(User.name == 'ed') + jack = s.query(User).filter(User.name == 'jack') - eq_(fred.union(ed).order_by(User.name).all(), + eq_( + fred.union(ed).order_by(User.name).all(), [User(name='ed'), User(name='fred')] ) - eq_(fred.union(ed, jack).order_by(User.name).all(), + eq_( + fred.union(ed, jack).order_by(User.name).all(), [User(name='ed'), User(name='fred'), User(name='jack')] ) @@ -1592,12 +1702,11 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): User, Address = self.classes.User, self.classes.Address - s = create_session() q1 = s.query(User, Address).join(User.addresses).\ - filter(Address.email_address=="ed@wood.com") + filter(Address.email_address == "ed@wood.com") q2 = s.query(User, Address).join(User.addresses).\ - filter(Address.email_address=="jack@bean.com") + filter(Address.email_address == "jack@bean.com") q3 = q1.union(q2).order_by(User.name) eq_( @@ -1614,7 +1723,6 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): User = self.classes.User - s = Session() q1 = s.query(User, literal("x")) q2 = s.query(User, literal_column("'y'")) @@ -1622,9 +1730,12 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): self.assert_compile( q3, - "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS anon_1_users_name," - " anon_1.param_1 AS anon_1_param_1 FROM (SELECT users.id AS users_id, users.name AS" - " users_name, :param_1 AS param_1 FROM users UNION SELECT users.id AS users_id, " + "SELECT anon_1.users_id AS anon_1_users_id, " + "anon_1.users_name AS anon_1_users_name, " + "anon_1.param_1 AS anon_1_param_1 " + "FROM (SELECT users.id AS users_id, users.name AS " + "users_name, :param_1 AS param_1 " + "FROM users UNION SELECT users.id AS users_id, " "users.name AS users_name, 'y' FROM users) AS anon_1" ) @@ -1646,8 +1757,11 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): ['User', 'foo'] ) - for q in (q3.order_by(User.id, "anon_1_param_1"), q6.order_by(User.id, "foo")): - eq_(q.all(), + for q in ( + q3.order_by(User.id, "anon_1_param_1"), + q6.order_by(User.id, "foo")): + eq_( + q.all(), [ (User(id=7, name='jack'), 'x'), (User(id=7, name='jack'), 'y'), @@ -1702,7 +1816,8 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS " "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS " - "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id " + "users_name, c1 AS foo, c2 AS bar " + "FROM users UNION SELECT users.id " "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar " "FROM users) AS anon_1 ORDER BY anon_1.foo" ) @@ -1712,7 +1827,8 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS " "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS " - "users_name, c1 AS foo, c2 AS bar FROM users UNION SELECT users.id " + "users_name, c1 AS foo, c2 AS bar " + "FROM users UNION SELECT users.id " "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar " "FROM users) AS anon_1 ORDER BY anon_1.foo" ) @@ -1739,40 +1855,35 @@ class SetOpsTest(QueryTest, AssertsCompiledSQL): ['name'] ) - @testing.requires.intersect def test_intersect(self): User = self.classes.User s = create_session() - fred = s.query(User).filter(User.name=='fred') - ed = s.query(User).filter(User.name=='ed') - jack = s.query(User).filter(User.name=='jack') - eq_(fred.intersect(ed, jack).all(), - [] - ) + fred = s.query(User).filter(User.name == 'fred') + ed = s.query(User).filter(User.name == 'ed') + jack = s.query(User).filter(User.name == 'jack') + eq_(fred.intersect(ed, jack).all(), []) - eq_(fred.union(ed).intersect(ed.union(jack)).all(), - [User(name='ed')] - ) + eq_(fred.union(ed).intersect(ed.union(jack)).all(), [User(name='ed')]) def test_eager_load(self): User, Address = self.classes.User, self.classes.Address s = create_session() - fred = s.query(User).filter(User.name=='fred') - ed = s.query(User).filter(User.name=='ed') - jack = s.query(User).filter(User.name=='jack') + fred = s.query(User).filter(User.name == 'fred') + ed = s.query(User).filter(User.name == 'ed') def go(): eq_( - fred.union(ed).order_by(User.name).options(joinedload(User.addresses)).all(), - [ - User(name='ed', addresses=[Address(), Address(), Address()]), - User(name='fred', addresses=[Address()]) - ] + fred.union(ed).order_by(User.name). + options(joinedload(User.addresses)).all(), [ + User( + name='ed', addresses=[Address(), Address(), + Address()]), + User(name='fred', addresses=[Address()])] ) self.assert_sql_count(testing.db, go, 1) @@ -1784,22 +1895,29 @@ class AggregateTest(QueryTest): sess = create_session() orders = sess.query(Order).filter(Order.id.in_([2, 3, 4])) - eq_(next(orders.values(func.sum(Order.user_id * Order.address_id))), (79,)) + eq_( + next(orders.values(func.sum(Order.user_id * Order.address_id))), + (79,)) eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79) def test_apply(self): Order = self.classes.Order sess = create_session() - assert sess.query(func.sum(Order.user_id * Order.address_id)).filter(Order.id.in_([2, 3, 4])).one() == (79,) + assert sess.query(func.sum(Order.user_id * Order.address_id)). \ + filter(Order.id.in_([2, 3, 4])).one() == (79,) def test_having(self): User, Address = self.classes.User, self.classes.Address sess = create_session() - assert [User(name='ed',id=8)] == sess.query(User).order_by(User.id).group_by(User).join('addresses').having(func.count(Address.id)> 2).all() + assert [User(name='ed', id=8)] == \ + sess.query(User).order_by(User.id).group_by(User). \ + join('addresses').having(func.count(Address.id) > 2).all() - assert [User(name='jack',id=7), User(name='fred',id=9)] == sess.query(User).order_by(User.id).group_by(User).join('addresses').having(func.count(Address.id)< 2).all() + assert [User(name='jack', id=7), User(name='fred', id=9)] == \ + sess.query(User).order_by(User.id).group_by(User). \ + join('addresses').having(func.count(Address.id) < 2).all() class ExistsTest(QueryTest, AssertsCompiledSQL): @@ -1810,16 +1928,18 @@ class ExistsTest(QueryTest, AssertsCompiledSQL): sess = create_session() q1 = sess.query(User) - self.assert_compile(sess.query(q1.exists()), + self.assert_compile( + sess.query(q1.exists()), 'SELECT EXISTS (' - 'SELECT 1 FROM users' + 'SELECT 1 FROM users' ') AS anon_1' ) q2 = sess.query(User).filter(User.name == 'fred') - self.assert_compile(sess.query(q2.exists()), + self.assert_compile( + sess.query(q2.exists()), 'SELECT EXISTS (' - 'SELECT 1 FROM users WHERE users.name = :name_1' + 'SELECT 1 FROM users WHERE users.name = :name_1' ') AS anon_1' ) @@ -1829,10 +1949,11 @@ class ExistsTest(QueryTest, AssertsCompiledSQL): sess = create_session() q1 = sess.query(User, Address).filter(User.id == Address.user_id) - self.assert_compile(sess.query(q1.exists()), + self.assert_compile( + sess.query(q1.exists()), 'SELECT EXISTS (' - 'SELECT 1 FROM users, addresses ' - 'WHERE users.id = addresses.user_id' + 'SELECT 1 FROM users, addresses ' + 'WHERE users.id = addresses.user_id' ') AS anon_1' ) @@ -1841,10 +1962,9 @@ class ExistsTest(QueryTest, AssertsCompiledSQL): sess = create_session() q1 = sess.query().select_from(User).exists() - self.assert_compile(sess.query(q1), - 'SELECT EXISTS (' - 'SELECT 1 FROM users' - ') AS anon_1' + self.assert_compile( + sess.query(q1), + 'SELECT EXISTS (SELECT 1 FROM users) AS anon_1' ) @@ -1866,14 +1986,11 @@ class CountTest(QueryTest): # rumors about Oracle preferring count(1) don't appear # to be well founded. self.assert_sql_execution( - testing.db, - s.query(User).count, - CompiledSQL( - "SELECT count(*) AS count_1 FROM " - "(SELECT users.id AS users_id, users.name " - "AS users_name FROM users) AS anon_1", - {} - ) + testing.db, s.query(User).count, CompiledSQL( + "SELECT count(*) AS count_1 FROM " + "(SELECT users.id AS users_id, users.name " + "AS users_name FROM users) AS anon_1", {} + ) ) def test_multiple_entity(self): @@ -1904,7 +2021,6 @@ class CountTest(QueryTest): User, Address = self.classes.User, self.classes.Address - s = create_session() q = s.query(func.count(distinct(User.name))) @@ -1929,29 +2045,33 @@ class DistinctTest(QueryTest): User = self.classes.User eq_( - [User(id=7), User(id=8), User(id=9),User(id=10)], + [User(id=7), User(id=8), User(id=9), User(id=10)], create_session().query(User).order_by(User.id).distinct().all() ) eq_( - [User(id=7), User(id=9), User(id=8),User(id=10)], - create_session().query(User).distinct().order_by(desc(User.name)).all() + [User(id=7), User(id=9), User(id=8), User(id=10)], + create_session().query(User).distinct(). + order_by(desc(User.name)).all() ) def test_joined(self): - """test that orderbys from a joined table get placed into the columns clause when DISTINCT is used""" + """test that orderbys from a joined table get placed into the columns + clause when DISTINCT is used""" User, Address = self.classes.User, self.classes.Address - sess = create_session() - q = sess.query(User).join('addresses').distinct().order_by(desc(Address.email_address)) + q = sess.query(User).join('addresses').distinct(). \ + order_by(desc(Address.email_address)) assert [User(id=7), User(id=9), User(id=8)] == q.all() sess.expunge_all() # test that it works on embedded joinedload/LIMIT subquery - q = sess.query(User).join('addresses').distinct().options(joinedload('addresses')).order_by(desc(Address.email_address)).limit(2) + q = sess.query(User).join('addresses').distinct(). \ + options(joinedload('addresses')).\ + order_by(desc(Address.email_address)).limit(2) def go(): assert [ @@ -1974,20 +2094,15 @@ class PrefixWithTest(QueryTest, AssertsCompiledSQL): .prefix_with('PREFIX_1') expected = "SELECT PREFIX_1 "\ "users.name AS users_name FROM users" - self.assert_compile(query, expected, - dialect=default.DefaultDialect() - ) + self.assert_compile(query, expected, dialect=default.DefaultDialect()) def test_many_prefixes(self): User = self.classes.User sess = create_session() - query = sess.query(User.name)\ - .prefix_with('PREFIX_1', 'PREFIX_2') + query = sess.query(User.name).prefix_with('PREFIX_1', 'PREFIX_2') expected = "SELECT PREFIX_1 PREFIX_2 "\ "users.name AS users_name FROM users" - self.assert_compile(query, expected, - dialect=default.DefaultDialect() - ) + self.assert_compile(query, expected, dialect=default.DefaultDialect()) def test_chained_prefixes(self): User = self.classes.User @@ -1997,9 +2112,7 @@ class PrefixWithTest(QueryTest, AssertsCompiledSQL): .prefix_with('PREFIX_2', 'PREFIX_3') expected = "SELECT PREFIX_1 PREFIX_2 PREFIX_3 "\ "users.name AS users_name FROM users" - self.assert_compile(query, expected, - dialect=default.DefaultDialect() - ) + self.assert_compile(query, expected, dialect=default.DefaultDialect()) class YieldTest(QueryTest): @@ -2007,7 +2120,9 @@ class YieldTest(QueryTest): User = self.classes.User sess = create_session() - q = iter(sess.query(User).yield_per(1).from_statement("select * from users")) + q = iter( + sess.query(User).yield_per(1).from_statement( + "select * from users")) ret = [] eq_(len(sess.identity_map), 0) @@ -2033,7 +2148,6 @@ class YieldTest(QueryTest): eq_(q._execution_options, {"stream_results": True, "foo": "bar"}) - class HintsTest(QueryTest, AssertsCompiledSQL): def test_hints(self): User = self.classes.User @@ -2044,28 +2158,30 @@ class HintsTest(QueryTest, AssertsCompiledSQL): sess = create_session() self.assert_compile( - sess.query(User).with_hint(User, 'USE INDEX (col1_index,col2_index)'), + sess.query(User).with_hint( + User, 'USE INDEX (col1_index,col2_index)'), "SELECT users.id AS users_id, users.name AS users_name " "FROM users USE INDEX (col1_index,col2_index)", dialect=dialect ) self.assert_compile( - sess.query(User).with_hint(User, 'WITH INDEX col1_index', 'sybase'), + sess.query(User).with_hint( + User, 'WITH INDEX col1_index', 'sybase'), "SELECT users.id AS users_id, users.name AS users_name " - "FROM users", - dialect=dialect + "FROM users", dialect=dialect ) ualias = aliased(User) self.assert_compile( - sess.query(User, ualias).with_hint(ualias, 'USE INDEX (col1_index,col2_index)'). - join(ualias, ualias.id > User.id), + sess.query(User, ualias).with_hint( + ualias, 'USE INDEX (col1_index,col2_index)'). + join(ualias, ualias.id > User.id), "SELECT users.id AS users_id, users.name AS users_name, " "users_1.id AS users_1_id, users_1.name AS users_1_name " - "FROM users INNER JOIN users AS users_1 USE INDEX (col1_index,col2_index) " - "ON users_1.id > users.id", - dialect=dialect + "FROM users INNER JOIN users AS users_1 " + "USE INDEX (col1_index,col2_index) " + "ON users_1.id > users.id", dialect=dialect ) @@ -2080,16 +2196,12 @@ class TextTest(QueryTest): ) eq_( - create_session().query(User). - from_statement("select * from users order by id").first(), - User(id=7) + create_session().query(User).from_statement( + "select * from users order by id").first(), User(id=7) ) eq_( - create_session().query(User). - from_statement( - "select * from users where name='nonexistent'").first(), - None - ) + create_session().query(User).from_statement( + "select * from users where name='nonexistent'").first(), None) def test_fragment(self): User = self.classes.User @@ -2102,22 +2214,19 @@ class TextTest(QueryTest): eq_( create_session().query(User).filter("name='fred'"). - filter("id=9").all(), - [User(id=9)] + filter("id=9").all(), [User(id=9)] ) eq_( create_session().query(User).filter("name='fred'"). - filter(User.id == 9).all(), - [User(id=9)] + filter(User.id == 9).all(), [User(id=9)] ) def test_binds(self): User = self.classes.User eq_( - create_session().query(User).filter("id in (:id1, :id2)").\ - params(id1=8, id2=9).all(), - [User(id=8), User(id=9)] + create_session().query(User).filter("id in (:id1, :id2)"). + params(id1=8, id2=9).all(), [User(id=8), User(id=9)] ) def test_as_column(self): @@ -2127,8 +2236,9 @@ class TextTest(QueryTest): assert_raises(sa_exc.InvalidRequestError, s.query, User.id, text("users.name")) - eq_(s.query(User.id, "name").order_by(User.id).all(), - [(7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck')]) + eq_( + s.query(User.id, "name").order_by(User.id).all(), + [(7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck')]) def test_via_select(self): User = self.classes.User @@ -2146,9 +2256,8 @@ class TextTest(QueryTest): eq_( s.query(User).from_statement( - text("select * from users order by id").\ - columns(id=Integer, name=String) - ).all(), + text("select * from users order by id"). + columns(id=Integer, name=String)).all(), [User(id=7), User(id=8), User(id=9), User(id=10)] ) @@ -2158,9 +2267,8 @@ class TextTest(QueryTest): eq_( s.query(User).from_statement( - text("select * from users order by id").\ - columns(User.id, User.name) - ).all(), + text("select * from users order by id"). + columns(User.id, User.name)).all(), [User(id=7), User(id=8), User(id=9), User(id=10)] ) @@ -2170,19 +2278,18 @@ class TextTest(QueryTest): eq_( s.query(User).select_from( - text("select * from users").\ - columns(id=Integer, name=String) + text("select * from users").columns(id=Integer, name=String) ).order_by(User.id).all(), [User(id=7), User(id=8), User(id=9), User(id=10)] ) + class ParentTest(QueryTest, AssertsCompiledSQL): __dialect__ = 'default' def test_o2m(self): - User, orders, Order = (self.classes.User, - self.tables.orders, - self.classes.Order) + User, orders, Order = ( + self.classes.User, self.tables.orders, self.classes.Order) sess = create_session() q = sess.query(User) @@ -2191,25 +2298,33 @@ class ParentTest(QueryTest, AssertsCompiledSQL): # test auto-lookup of property o = sess.query(Order).with_parent(u1).all() - assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + assert [Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")] == o # test with explicit property o = sess.query(Order).with_parent(u1, property='orders').all() - assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + assert [Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")] == o o = sess.query(Order).with_parent(u1, property=User.orders).all() - assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + assert [Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")] == o o = sess.query(Order).filter(with_parent(u1, User.orders)).all() - assert [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")] == o + assert [ + Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")] == o # test generative criterion - o = sess.query(Order).with_parent(u1).filter(orders.c.id>2).all() - assert [Order(description="order 3"), Order(description="order 5")] == o + o = sess.query(Order).with_parent(u1).filter(orders.c.id > 2).all() + assert [ + Order(description="order 3"), Order(description="order 5")] == o - # test against None for parent? this can't be done with the current API since we don't know - # what mapper to use - #assert sess.query(Order).with_parent(None, property='addresses').all() == [Order(description="order 5")] + # test against None for parent? this can't be done with the current + # API since we don't know what mapper to use + # assert + # sess.query(Order).with_parent(None, property='addresses').all() + # == [Order(description="order 5")] def test_noparent(self): Item, User = self.classes.Item, self.classes.User @@ -2233,7 +2348,9 @@ class ParentTest(QueryTest, AssertsCompiledSQL): sess = create_session() i1 = sess.query(Item).filter_by(id=2).one() k = sess.query(Keyword).with_parent(i1).all() - assert [Keyword(name='red'), Keyword(name='small'), Keyword(name='square')] == k + assert [ + Keyword(name='red'), Keyword(name='small'), + Keyword(name='square')] == k def test_with_transient(self): User, Order = self.classes.User, self.classes.Order @@ -2245,13 +2362,17 @@ class ParentTest(QueryTest, AssertsCompiledSQL): utrans = User(id=u1.id) o = sess.query(Order).with_parent(utrans, 'orders') eq_( - [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")], + [ + Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")], o.all() ) o = sess.query(Order).filter(with_parent(utrans, 'orders')) eq_( - [Order(description="order 1"), Order(description="order 3"), Order(description="order 5")], + [ + Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")], o.all() ) @@ -2305,7 +2426,8 @@ class ParentTest(QueryTest, AssertsCompiledSQL): "addresses.email_address AS addresses_email_address FROM " "addresses WHERE :param_1 = addresses.user_id UNION SELECT " "addresses.id AS addresses_id, addresses.user_id AS " - "addresses_user_id, addresses.email_address AS addresses_email_address " + "addresses_user_id, addresses.email_address " + "AS addresses_email_address " "FROM addresses WHERE :param_2 = addresses.user_id) AS anon_1", checkparams={'param_1': 7, 'param_2': 8}, ) @@ -2327,6 +2449,7 @@ class ParentTest(QueryTest, AssertsCompiledSQL): checkparams={'param_1': 7, 'param_2': 8}, ) + class SynonymTest(QueryTest): @classmethod @@ -2340,20 +2463,21 @@ class SynonymTest(QueryTest): cls.tables.item_keywords, cls.tables.addresses mapper(User, users, properties={ - 'name_syn':synonym('name'), - 'addresses':relationship(Address), - 'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o - 'orders_syn':synonym('orders'), - 'orders_syn_2':synonym('orders_syn') + 'name_syn': synonym('name'), + 'addresses': relationship(Address), + 'orders': relationship( + Order, backref='user', order_by=orders.c.id), # o2m, m2o + 'orders_syn': synonym('orders'), + 'orders_syn_2': synonym('orders_syn') }) mapper(Address, addresses) mapper(Order, orders, properties={ - 'items':relationship(Item, secondary=order_items), #m2m - 'address':relationship(Address), # m2o - 'items_syn':synonym('items') + 'items': relationship(Item, secondary=order_items), # m2m + 'address': relationship(Address), # m2o + 'items_syn': synonym('items') }) mapper(Item, items, properties={ - 'keywords':relationship(Keyword, secondary=item_keywords) #m2m + 'keywords': relationship(Keyword, secondary=item_keywords) # m2m }) mapper(Keyword, keywords) @@ -2361,9 +2485,10 @@ class SynonymTest(QueryTest): User, Order = self.classes.User, self.classes.Order s = create_session() + def go(): result = s.query(User).filter_by(name='jack').\ - options(joinedload(User.orders_syn)).all() + options(joinedload(User.orders_syn)).all() eq_(result, [ User(id=7, name='jack', orders=[ Order(description='order 1'), @@ -2377,9 +2502,10 @@ class SynonymTest(QueryTest): User, Order = self.classes.User, self.classes.Order s = create_session() + def go(): result = s.query(User).filter_by(name='jack').\ - options(joinedload(User.orders_syn_2)).all() + options(joinedload(User.orders_syn_2)).all() eq_(result, [ User(id=7, name='jack', orders=[ Order(description='order 1'), @@ -2393,9 +2519,10 @@ class SynonymTest(QueryTest): User, Order = self.classes.User, self.classes.Order s = create_session() + def go(): result = s.query(User).filter_by(name='jack').\ - options(joinedload('orders_syn_2')).all() + options(joinedload('orders_syn_2')).all() eq_(result, [ User(id=7, name='jack', orders=[ Order(description='order 1'), @@ -2418,7 +2545,8 @@ class SynonymTest(QueryTest): ['orders_syn', 'items_syn'], ['orders_syn_2', 'items_syn'], ): - result = create_session().query(User).join(*j).filter_by(id=3).all() + result = create_session().query(User).join(*j).filter_by(id=3). \ + all() assert [User(id=7, name='jack'), User(id=9, name='fred')] == result def test_with_parent(self): @@ -2435,11 +2563,12 @@ class SynonymTest(QueryTest): sess = create_session() q = sess.query(User) - u1 = q.filter_by(**{nameprop:'jack'}).one() + u1 = q.filter_by(**{nameprop: 'jack'}).one() o = sess.query(Order).with_parent(u1, property=orderprop).all() - assert [Order(description="order 1"), - Order(description="order 3"), Order(description="order 5")] == o + assert [ + Order(description="order 1"), Order(description="order 3"), + Order(description="order 5")] == o class ImmediateTest(_fixtures.FixtureTest): @@ -2463,13 +2592,13 @@ class ImmediateTest(_fixtures.FixtureTest): sess = create_session() - assert_raises(sa.orm.exc.NoResultFound, - sess.query(User).filter(User.id == 99).one) + assert_raises( + sa.orm.exc.NoResultFound, + sess.query(User).filter(User.id == 99).one) eq_(sess.query(User).filter(User.id == 7).one().id, 7) - assert_raises(sa.orm.exc.MultipleResultsFound, - sess.query(User).one) + assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).one) assert_raises( sa.orm.exc.NoResultFound, @@ -2478,44 +2607,41 @@ class ImmediateTest(_fixtures.FixtureTest): eq_(sess.query(User.id, User.name).filter(User.id == 7).one(), (7, 'jack')) - assert_raises(sa.orm.exc.MultipleResultsFound, - sess.query(User.id, User.name).one) + assert_raises( + sa.orm.exc.MultipleResultsFound, + sess.query(User.id, User.name).one) - assert_raises(sa.orm.exc.NoResultFound, - (sess.query(User, Address). - join(User.addresses). - filter(Address.id == 99)).one) + assert_raises( + sa.orm.exc.NoResultFound, + (sess.query(User, Address).join(User.addresses). + filter(Address.id == 99)).one) eq_((sess.query(User, Address). join(User.addresses). filter(Address.id == 4)).one(), (User(id=8), Address(id=4))) - assert_raises(sa.orm.exc.MultipleResultsFound, - sess.query(User, Address).join(User.addresses).one) + assert_raises( + sa.orm.exc.MultipleResultsFound, + sess.query(User, Address).join(User.addresses).one) # this result returns multiple rows, the first # two rows being the same. but uniquing is # not applied for a column based result. - assert_raises(sa.orm.exc.MultipleResultsFound, - sess.query(User.id). - join(User.addresses). - filter(User.id.in_([8, 9])). - order_by(User.id). - one) + assert_raises( + sa.orm.exc.MultipleResultsFound, + sess.query(User.id).join(User.addresses). + filter(User.id.in_([8, 9])).order_by(User.id).one) # test that a join which ultimately returns # multiple identities across many rows still # raises, even though the first two rows are of # the same identity and unique filtering # is applied ([ticket:1688]) - assert_raises(sa.orm.exc.MultipleResultsFound, - sess.query(User). - join(User.addresses). - filter(User.id.in_([8, 9])). - order_by(User.id). - one) - + assert_raises( + sa.orm.exc.MultipleResultsFound, + sess.query(User).join(User.addresses).filter(User.id.in_([8, 9])). + order_by(User.id).one) @testing.future def test_getslice(self): @@ -2533,7 +2659,9 @@ class ImmediateTest(_fixtures.FixtureTest): sess.query(User).filter_by(id=7).one()) assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar) - assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User.id, User.name).scalar) + assert_raises( + sa.orm.exc.MultipleResultsFound, + sess.query(User.id, User.name).scalar) def test_value(self): User = self.classes.User @@ -2547,6 +2675,7 @@ class ImmediateTest(_fixtures.FixtureTest): sess.bind = testing.db eq_(sess.query().value(sa.literal_column('1').label('x')), 1) + class ExecutionOptionsTest(QueryTest): def test_option_building(self): @@ -2567,23 +2696,23 @@ class ExecutionOptionsTest(QueryTest): q3_options = dict(foo='not bar', stream_results=True, answer=42) assert q3._execution_options == q3_options - def test_options_in_connection(self): User = self.classes.User execution_options = dict(foo='bar', stream_results=True) + class TQuery(Query): def instances(self, result, ctx): try: eq_( - result.connection._execution_options, - execution_options - ) + result.connection._execution_options, + execution_options) finally: result.close() return iter([]) - sess = create_session(bind=testing.db, autocommit=False, query_cls=TQuery) + sess = create_session( + bind=testing.db, autocommit=False, query_cls=TQuery) q1 = sess.query(User).execution_options(**execution_options) q1.all() @@ -2641,4 +2770,3 @@ class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL): "SELECT x AS x HAVING x = 1", dialect=self._dialect(False) ) - diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py index 32bc27c92..ba31e4c7d 100644 --- a/test/orm/test_transaction.py +++ b/test/orm/test_transaction.py @@ -1,15 +1,13 @@ from __future__ import with_statement -from sqlalchemy.testing import eq_, assert_raises, \ - assert_raises_message, assert_warnings -from sqlalchemy import * -from sqlalchemy.orm import attributes -from sqlalchemy import exc as sa_exc, event -from sqlalchemy.orm import exc as orm_exc -from sqlalchemy.orm import * +from sqlalchemy import ( + testing, exc as sa_exc, event, String, Column, Table, select, func) +from sqlalchemy.testing import ( + fixtures, engines, eq_, assert_raises, assert_raises_message, + assert_warnings) +from sqlalchemy.orm import ( + exc as orm_exc, Session, mapper, sessionmaker, create_session, + relationship, attributes) from sqlalchemy.testing.util import gc_collect -from sqlalchemy import testing -from sqlalchemy.testing import fixtures -from sqlalchemy.testing import engines from test.orm._fixtures import FixtureTest @@ -52,8 +50,8 @@ class SessionTransactionTest(FixtureTest): u = User(name='ed') sess.add(u) sess.flush() - sess.commit() # commit does nothing - trans.rollback() # rolls back + sess.commit() # commit does nothing + trans.rollback() # rolls back assert len(sess.query(User).all()) == 0 sess.close() @@ -84,7 +82,6 @@ class SessionTransactionTest(FixtureTest): conn.close() raise - @testing.requires.savepoints def test_nested_accounting_new_items_removed(self): User, users = self.classes.User, self.tables.users @@ -129,22 +126,22 @@ class SessionTransactionTest(FixtureTest): session = create_session(bind=testing.db) session.begin() - session.connection().execute(users.insert().values(name='user1' - )) + session.connection().execute(users.insert().values( + name='user1')) session.begin(subtransactions=True) session.begin_nested() - session.connection().execute(users.insert().values(name='user2' - )) - assert session.connection().execute('select count(1) from users' - ).scalar() == 2 + session.connection().execute(users.insert().values( + name='user2')) + assert session.connection().execute( + 'select count(1) from users').scalar() == 2 session.rollback() - assert session.connection().execute('select count(1) from users' - ).scalar() == 1 - session.connection().execute(users.insert().values(name='user3' - )) + assert session.connection().execute( + 'select count(1) from users').scalar() == 1 + session.connection().execute(users.insert().values( + name='user3')) session.commit() - assert session.connection().execute('select count(1) from users' - ).scalar() == 2 + assert session.connection().execute( + 'select count(1) from users').scalar() == 2 @testing.requires.independent_connections def test_transactions_isolated(self): @@ -196,8 +193,8 @@ class SessionTransactionTest(FixtureTest): u = User(name='u1') sess.add(u) sess.flush() - sess.commit() # commit does nothing - sess.rollback() # rolls back + sess.commit() # commit does nothing + sess.rollback() # rolls back assert len(sess.query(User).all()) == 0 sess.close() @@ -276,7 +273,7 @@ class SessionTransactionTest(FixtureTest): u3 = User(name='u3') sess.add(u3) - sess.commit() # commit the nested transaction + sess.commit() # commit the nested transaction sess.rollback() eq_(set(sess.query(User).all()), set([u2])) @@ -346,13 +343,15 @@ class SessionTransactionTest(FixtureTest): sess = Session() to_flush = [User(name='ed'), User(name='jack'), User(name='wendy')] + @event.listens_for(sess, "after_flush_postexec") def add_another_user(session, ctx): if to_flush: session.add(to_flush.pop(0)) x = [1] - @event.listens_for(sess, "after_commit") + + @event.listens_for(sess, "after_commit") # noqa def add_another_user(session): x[0] += 1 @@ -379,7 +378,6 @@ class SessionTransactionTest(FixtureTest): sess.commit ) - def test_error_on_using_inactive_session_commands(self): users, User = self.tables.users, self.classes.User @@ -432,15 +430,11 @@ class SessionTransactionTest(FixtureTest): trans = sess.begin() trans.rollback() assert_raises_message( - sa_exc.ResourceClosedError, - "This transaction is closed", - trans.rollback - ) + sa_exc.ResourceClosedError, "This transaction is closed", + trans.rollback) assert_raises_message( - sa_exc.ResourceClosedError, - "This transaction is closed", - trans.commit - ) + sa_exc.ResourceClosedError, "This transaction is closed", + trans.commit) def test_deactive_status_check(self): sess = create_session() @@ -493,6 +487,7 @@ class SessionTransactionTest(FixtureTest): sess, u1 = self._inactive_flushed_session_fixture() u2 = User(name='u2') sess.add(u2) + def go(): sess.rollback() assert_warnings(go, @@ -506,6 +501,7 @@ class SessionTransactionTest(FixtureTest): def test_warning_on_using_inactive_session_dirty(self): sess, u1 = self._inactive_flushed_session_fixture() u1.name = 'newname' + def go(): sess.rollback() assert_warnings(go, @@ -519,6 +515,7 @@ class SessionTransactionTest(FixtureTest): def test_warning_on_using_inactive_session_delete(self): sess, u1 = self._inactive_flushed_session_fixture() sess.delete(u1) + def go(): sess.rollback() assert_warnings(go, @@ -558,6 +555,7 @@ class SessionTransactionTest(FixtureTest): assert session.transaction is not None, \ 'autocommit=False should start a new transaction' + class _LocalFixture(FixtureTest): run_setup_mappers = 'once' run_inserts = None @@ -567,10 +565,11 @@ class _LocalFixture(FixtureTest): def setup_mappers(cls): User, Address = cls.classes.User, cls.classes.Address users, addresses = cls.tables.users, cls.tables.addresses - mapper(User, users, properties={ - 'addresses':relationship(Address, backref='user', - cascade="all, delete-orphan", - order_by=addresses.c.id), + mapper( + User, users, properties={ + 'addresses': relationship( + Address, backref='user', cascade="all, delete-orphan", + order_by=addresses.c.id), }) mapper(Address, addresses) @@ -611,6 +610,7 @@ class FixtureDataTest(_LocalFixture): assert u1.name == 'will' + class CleanSavepointTest(FixtureTest): """test the behavior for [ticket:2452] - rollback on begin_nested() only expires objects tracked as being modified in that transaction. @@ -641,27 +641,30 @@ class CleanSavepointTest(FixtureTest): @testing.requires.savepoints def test_rollback_ignores_clean_on_savepoint(self): - User, users = self.classes.User, self.tables.users + def update_fn(s, u2): u2.name = 'u2modified' self._run_test(update_fn) @testing.requires.savepoints def test_rollback_ignores_clean_on_savepoint_agg_upd_eval(self): - User, users = self.classes.User, self.tables.users + User = self.classes.User + def update_fn(s, u2): - s.query(User).filter_by(name='u2').update(dict(name='u2modified'), - synchronize_session='evaluate') + s.query(User).filter_by(name='u2').update( + dict(name='u2modified'), synchronize_session='evaluate') self._run_test(update_fn) @testing.requires.savepoints def test_rollback_ignores_clean_on_savepoint_agg_upd_fetch(self): - User, users = self.classes.User, self.tables.users + User = self.classes.User + def update_fn(s, u2): s.query(User).filter_by(name='u2').update(dict(name='u2modified'), synchronize_session='fetch') self._run_test(update_fn) + class ContextManagerTest(FixtureTest): run_inserts = None __backend__ = True @@ -674,6 +677,7 @@ class ContextManagerTest(FixtureTest): mapper(User, users) sess = Session() + def go(): with sess.begin_nested(): sess.add(User()) # name can't be null @@ -708,6 +712,7 @@ class ContextManagerTest(FixtureTest): mapper(User, users) sess = Session(autocommit=True) + def go(): with sess.begin(): sess.add(User()) # name can't be null @@ -729,7 +734,7 @@ class AutoExpireTest(_LocalFixture): def test_expunge_pending_on_rollback(self): User = self.classes.User sess = self.session() - u2= User(name='newuser') + u2 = User(name='newuser') sess.add(u2) assert u2 in sess sess.rollback() @@ -738,7 +743,7 @@ class AutoExpireTest(_LocalFixture): def test_trans_pending_cleared_on_commit(self): User = self.classes.User sess = self.session() - u2= User(name='newuser') + u2 = User(name='newuser') sess.add(u2) assert u2 in sess sess.commit() @@ -836,8 +841,7 @@ class AutoExpireTest(_LocalFixture): u1.addresses.remove(a1) s.flush() - eq_(s.query(Address).filter(Address.email_address=='foo').all(), - []) + eq_(s.query(Address).filter(Address.email_address == 'foo').all(), []) s.rollback() assert a1 not in s.deleted assert u1.addresses == [a1] @@ -851,7 +855,6 @@ class AutoExpireTest(_LocalFixture): sess.commit() eq_(u1.name, 'newuser') - def test_concurrent_commit_pending(self): User = self.classes.User s1 = self.session() @@ -860,12 +863,13 @@ class AutoExpireTest(_LocalFixture): s1.commit() s2 = self.session() - u2 = s2.query(User).filter(User.name=='edward').one() + u2 = s2.query(User).filter(User.name == 'edward').one() u2.name = 'will' s2.commit() assert u1.name == 'will' + class TwoPhaseTest(_LocalFixture): __backend__ = True @@ -881,6 +885,7 @@ class TwoPhaseTest(_LocalFixture): assert u not in s + class RollbackRecoverTest(_LocalFixture): __backend__ = True @@ -943,9 +948,10 @@ class RollbackRecoverTest(_LocalFixture): s.commit() eq_( s.query(User).all(), - [User(id=1, name='edward', - addresses=[Address(email_address='foober')])] - ) + [ + User( + id=1, name='edward', + addresses=[Address(email_address='foober')])]) class SavepointTest(_LocalFixture): @@ -965,18 +971,19 @@ class SavepointTest(_LocalFixture): u1.name = 'edward' u2.name = 'jackward' s.add_all([u3, u4]) - eq_(s.query(User.name).order_by(User.id).all(), - [('edward',), ('jackward',), ('wendy',), ('foo',)]) + eq_( + s.query(User.name).order_by(User.id).all(), + [('edward',), ('jackward',), ('wendy',), ('foo',)]) s.rollback() assert u1.name == 'ed' assert u2.name == 'jack' - eq_(s.query(User.name).order_by(User.id).all(), - [('ed',), ('jack',)]) + eq_( + s.query(User.name).order_by(User.id).all(), + [('ed',), ('jack',)]) s.commit() assert u1.name == 'ed' assert u2.name == 'jack' - eq_(s.query(User.name).order_by(User.id).all(), - [('ed',), ('jack',)]) + eq_(s.query(User.name).order_by(User.id).all(), [('ed',), ('jack',)]) @testing.requires.savepoints def test_savepoint_delete(self): @@ -1006,19 +1013,23 @@ class SavepointTest(_LocalFixture): u1.name = 'edward' u2.name = 'jackward' s.add_all([u3, u4]) - eq_(s.query(User.name).order_by(User.id).all(), - [('edward',), ('jackward',), ('wendy',), ('foo',)]) + eq_( + s.query(User.name).order_by(User.id).all(), + [('edward',), ('jackward',), ('wendy',), ('foo',)]) s.commit() + def go(): assert u1.name == 'edward' assert u2.name == 'jackward' - eq_(s.query(User.name).order_by(User.id).all(), - [('edward',), ('jackward',), ('wendy',), ('foo',)]) + eq_( + s.query(User.name).order_by(User.id).all(), + [('edward',), ('jackward',), ('wendy',), ('foo',)]) self.assert_sql_count(testing.db, go, 1) s.commit() - eq_(s.query(User.name).order_by(User.id).all(), - [('edward',), ('jackward',), ('wendy',), ('foo',)]) + eq_( + s.query(User.name).order_by(User.id).all(), + [('edward',), ('jackward',), ('wendy',), ('foo',)]) @testing.requires.savepoints def test_savepoint_rollback_collections(self): @@ -1028,30 +1039,40 @@ class SavepointTest(_LocalFixture): s.add(u1) s.commit() - u1.name='edward' + u1.name = 'edward' u1.addresses.append(Address(email_address='bar')) s.begin_nested() u2 = User(name='jack', addresses=[Address(email_address='bat')]) s.add(u2) - eq_(s.query(User).order_by(User.id).all(), + eq_( + s.query(User).order_by(User.id).all(), [ - User(name='edward', addresses=[Address(email_address='foo'), - Address(email_address='bar')]), + User( + name='edward', + addresses=[ + Address(email_address='foo'), + Address(email_address='bar')]), User(name='jack', addresses=[Address(email_address='bat')]) - ] - ) + ]) s.rollback() - eq_(s.query(User).order_by(User.id).all(), + eq_( + s.query(User).order_by(User.id).all(), [ - User(name='edward', addresses=[Address(email_address='foo'), - Address(email_address='bar')]), - ] - ) + User( + name='edward', + addresses=[ + Address(email_address='foo'), + Address(email_address='bar')]), + ]) s.commit() - eq_(s.query(User).order_by(User.id).all(), + eq_( + s.query(User).order_by(User.id).all(), [ - User(name='edward', addresses=[Address(email_address='foo'), - Address(email_address='bar')]), + User( + name='edward', + addresses=[ + Address(email_address='foo'), + Address(email_address='bar')]), ] ) @@ -1063,28 +1084,43 @@ class SavepointTest(_LocalFixture): s.add(u1) s.commit() - u1.name='edward' + u1.name = 'edward' u1.addresses.append(Address(email_address='bar')) s.begin_nested() u2 = User(name='jack', addresses=[Address(email_address='bat')]) s.add(u2) - eq_(s.query(User).order_by(User.id).all(), + eq_( + s.query(User).order_by(User.id).all(), [ - User(name='edward', addresses=[Address(email_address='foo'), Address(email_address='bar')]), + User( + name='edward', + addresses=[ + Address(email_address='foo'), + Address(email_address='bar')]), User(name='jack', addresses=[Address(email_address='bat')]) ] ) s.commit() - eq_(s.query(User).order_by(User.id).all(), + eq_( + s.query(User).order_by(User.id).all(), [ - User(name='edward', addresses=[Address(email_address='foo'), Address(email_address='bar')]), + User( + name='edward', + addresses=[ + Address(email_address='foo'), + Address(email_address='bar')]), User(name='jack', addresses=[Address(email_address='bat')]) ] ) s.commit() - eq_(s.query(User).order_by(User.id).all(), + eq_( + s.query(User).order_by(User.id).all(), [ - User(name='edward', addresses=[Address(email_address='foo'), Address(email_address='bar')]), + User( + name='edward', + addresses=[ + Address(email_address='foo'), + Address(email_address='bar')]), User(name='jack', addresses=[Address(email_address='bat')]) ] ) @@ -1095,7 +1131,7 @@ class SavepointTest(_LocalFixture): sess = self.session() sess.begin_nested() - u2= User(name='newuser') + u2 = User(name='newuser') sess.add(u2) assert u2 in sess sess.rollback() @@ -1128,7 +1164,8 @@ class AccountingFlagsTest(_LocalFixture): sess.add(u1) sess.commit() - testing.db.execute(users.update(users.c.name=='ed').values(name='edward')) + testing.db.execute( + users.update(users.c.name == 'ed').values(name='edward')) assert u1.name == 'ed' sess.expire_all() @@ -1145,7 +1182,8 @@ class AccountingFlagsTest(_LocalFixture): u1.name = 'edwardo' sess.rollback() - testing.db.execute(users.update(users.c.name=='ed').values(name='edward')) + testing.db.execute( + users.update(users.c.name == 'ed').values(name='edward')) assert u1.name == 'edwardo' sess.expire_all() @@ -1162,12 +1200,14 @@ class AccountingFlagsTest(_LocalFixture): u1.name = 'edwardo' sess.rollback() - testing.db.execute(users.update(users.c.name=='ed').values(name='edward')) + testing.db.execute( + users.update(users.c.name == 'ed').values(name='edward')) assert u1.name == 'edwardo' sess.commit() - assert testing.db.execute(select([users.c.name])).fetchall() == [('edwardo',)] + assert testing.db.execute(select([users.c.name])).fetchall() == \ + [('edwardo',)] assert u1.name == 'edwardo' sess.delete(u1) @@ -1176,8 +1216,9 @@ class AccountingFlagsTest(_LocalFixture): def test_preflush_no_accounting(self): User, users = self.classes.User, self.tables.users - sess = Session(_enable_transaction_accounting=False, - autocommit=True, autoflush=False) + sess = Session( + _enable_transaction_accounting=False, autocommit=True, + autoflush=False) u1 = User(name='ed') sess.add(u1) sess.flush() @@ -1190,7 +1231,8 @@ class AccountingFlagsTest(_LocalFixture): sess.rollback() sess.begin() - assert testing.db.execute(select([users.c.name])).fetchall() == [('ed',)] + assert testing.db.execute(select([users.c.name])).fetchall() == \ + [('ed',)] class AutoCommitTest(_LocalFixture): @@ -1220,6 +1262,7 @@ class AutoCommitTest(_LocalFixture): sess = create_session(autocommit=True) fail = False + def fail_fn(*arg, **kw): if fail: raise Exception("commit fails") @@ -1251,6 +1294,7 @@ class AutoCommitTest(_LocalFixture): sess = create_session(autocommit=True) fail = False + def fail_fn(*arg, **kw): if fail: raise Exception("commit fails") @@ -1296,14 +1340,13 @@ class AutoCommitTest(_LocalFixture): assert 'id' not in u1.__dict__ eq_(u1.id, 3) + class NaturalPKRollbackTest(fixtures.MappedTest): __backend__ = True @classmethod def define_tables(cls, metadata): - Table('users', metadata, - Column('name', String(50), primary_key=True) - ) + Table('users', metadata, Column('name', String(50), primary_key=True)) @classmethod def setup_classes(cls): @@ -1317,10 +1360,7 @@ class NaturalPKRollbackTest(fixtures.MappedTest): session = sessionmaker()() - u1, u2, u3= \ - User(name='u1'),\ - User(name='u2'),\ - User(name='u3') + u1, u2, u3 = User(name='u1'), User(name='u2'), User(name='u3') session.add_all([u1, u2, u3]) @@ -1424,5 +1464,3 @@ class NaturalPKRollbackTest(fixtures.MappedTest): assert u2 not in s assert s.identity_map[(User, ('u1',))] is u1 - - diff --git a/test/orm/test_versioning.py b/test/orm/test_versioning.py index 7a6dc106e..614909f6a 100644 --- a/test/orm/test_versioning.py +++ b/test/orm/test_versioning.py @@ -2,21 +2,22 @@ import datetime import sqlalchemy as sa from sqlalchemy.testing import engines from sqlalchemy import testing -from sqlalchemy import Integer, String, Date, ForeignKey, literal_column, \ - orm, exc, select, TypeDecorator +from sqlalchemy import ( + Integer, String, Date, ForeignKey, orm, exc, select, TypeDecorator) from sqlalchemy.testing.schema import Table, Column -from sqlalchemy.orm import mapper, relationship, Session, \ - create_session, column_property, sessionmaker,\ - exc as orm_exc -from sqlalchemy.testing import eq_, ne_, assert_raises, assert_raises_message -from sqlalchemy.testing import fixtures -from test.orm import _fixtures -from sqlalchemy.testing.assertsql import AllOf, CompiledSQL +from sqlalchemy.orm import ( + mapper, relationship, Session, create_session, sessionmaker, + exc as orm_exc) +from sqlalchemy.testing import ( + eq_, assert_raises, assert_raises_message, fixtures) +from sqlalchemy.testing.assertsql import CompiledSQL import uuid + def make_uuid(): return uuid.uuid4().hex + class VersioningTest(fixtures.MappedTest): __backend__ = True @@ -36,8 +37,7 @@ class VersioningTest(fixtures.MappedTest): def _fixture(self): Foo, version_table = self.classes.Foo, self.tables.version_table - mapper(Foo, version_table, - version_id_col=version_table.c.version_id) + mapper(Foo, version_table, version_id_col=version_table.c.version_id) s1 = Session() return s1 @@ -54,12 +54,13 @@ class VersioningTest(fixtures.MappedTest): s1.add_all((f1, f2)) s1.commit() - f1.value='f1rev2' + f1.value = 'f1rev2' assert_raises(sa.exc.SAWarning, s1.commit) finally: testing.db.dialect.supports_sane_rowcount = save - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_basic(self): Foo = self.classes.Foo @@ -69,23 +70,23 @@ class VersioningTest(fixtures.MappedTest): s1.add_all((f1, f2)) s1.commit() - f1.value='f1rev2' + f1.value = 'f1rev2' s1.commit() s2 = create_session(autocommit=False) f1_s = s2.query(Foo).get(f1.id) - f1_s.value='f1rev3' + f1_s.value = 'f1rev3' s2.commit() - f1.value='f1rev3mine' + f1.value = 'f1rev3mine' # Only dialects with a sane rowcount can detect the # StaleDataError if testing.db.dialect.supports_sane_rowcount: - assert_raises_message(sa.orm.exc.StaleDataError, - r"UPDATE statement on table 'version_table' expected " - r"to update 1 row\(s\); 0 were matched.", - s1.commit), + assert_raises_message( + sa.orm.exc.StaleDataError, + r"UPDATE statement on table 'version_table' expected " + r"to update 1 row\(s\); 0 were matched.", s1.commit), s1.rollback() else: s1.commit() @@ -94,7 +95,7 @@ class VersioningTest(fixtures.MappedTest): f1 = s1.query(Foo).get(f1.id) f2 = s1.query(Foo).get(f2.id) - f1_s.value='f1rev4' + f1_s.value = 'f1rev4' s2.commit() s1.delete(f1) @@ -109,7 +110,8 @@ class VersioningTest(fixtures.MappedTest): else: s1.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_bump_version(self): """test that version number can be bumped. @@ -145,11 +147,11 @@ class VersioningTest(fixtures.MappedTest): @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections def test_versioncheck(self): - """query.with_lockmode performs a 'version check' on an already loaded instance""" + """query.with_lockmode performs a 'version check' on an already loaded + instance""" Foo = self.classes.Foo - s1 = self._fixture() f1s1 = Foo(value='f1 value') s1.add(f1s1) @@ -157,16 +159,16 @@ class VersioningTest(fixtures.MappedTest): s2 = create_session(autocommit=False) f1s2 = s2.query(Foo).get(f1s1.id) - f1s2.value='f1 new value' + f1s2.value = 'f1 new value' s2.commit() # load, version is wrong assert_raises_message( - sa.orm.exc.StaleDataError, - r"Instance .* has version id '\d+' which does not " - r"match database-loaded version id '\d+'", - s1.query(Foo).with_lockmode('read').get, f1s1.id - ) + sa.orm.exc.StaleDataError, + r"Instance .* has version id '\d+' which does not " + r"match database-loaded version id '\d+'", + s1.query(Foo).with_lockmode('read').get, f1s1.id + ) # reload it - this expires the old version first s1.refresh(f1s1, lockmode='read') @@ -178,16 +180,15 @@ class VersioningTest(fixtures.MappedTest): s1.close() s1.query(Foo).with_lockmode('read').get(f1s1.id) - @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections @testing.requires.update_nowait def test_versioncheck_for_update(self): - """query.with_lockmode performs a 'version check' on an already loaded instance""" + """query.with_lockmode performs a 'version check' on an already loaded + instance""" Foo = self.classes.Foo - s1 = self._fixture() f1s1 = Foo(value='f1 value') s1.add(f1s1) @@ -196,7 +197,7 @@ class VersioningTest(fixtures.MappedTest): s2 = create_session(autocommit=False) f1s2 = s2.query(Foo).get(f1s1.id) s2.refresh(f1s2, lockmode='update') - f1s2.value='f1 new value' + f1s2.value = 'f1 new value' assert_raises( exc.DBAPIError, @@ -211,7 +212,8 @@ class VersioningTest(fixtures.MappedTest): @testing.emits_warning(r'.*does not support updated rowcount') @engines.close_open_connections def test_noversioncheck(self): - """test query.with_lockmode works when the mapper has no version id col""" + """test query.with_lockmode works when the mapper has no version id + col""" Foo, version_table = self.classes.Foo, self.tables.version_table @@ -226,7 +228,8 @@ class VersioningTest(fixtures.MappedTest): assert f1s2.id == f1s1.id assert f1s2.value == f1s1.value - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_no_version(self): Foo = self.classes.Foo @@ -244,7 +247,8 @@ class VersioningTest(fixtures.MappedTest): s1.commit() eq_(f3.version_id, 3) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_correct_version(self): Foo = self.classes.Foo @@ -262,7 +266,8 @@ class VersioningTest(fixtures.MappedTest): s1.commit() eq_(f3.version_id, 3) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_incorrect_version(self): Foo = self.classes.Foo @@ -284,7 +289,8 @@ class VersioningTest(fixtures.MappedTest): s1.merge, f2 ) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_merge_incorrect_version_not_in_session(self): Foo = self.classes.Foo @@ -308,6 +314,7 @@ class VersioningTest(fixtures.MappedTest): s1.merge, f2 ) + class ColumnTypeTest(fixtures.MappedTest): __backend__ = True @@ -315,6 +322,7 @@ class ColumnTypeTest(fixtures.MappedTest): def define_tables(cls, metadata): class SpecialType(TypeDecorator): impl = Date + def process_bind_param(self, value, dialect): assert isinstance(value, datetime.date) return value @@ -332,8 +340,7 @@ class ColumnTypeTest(fixtures.MappedTest): def _fixture(self): Foo, version_table = self.classes.Foo, self.tables.version_table - mapper(Foo, version_table, - version_id_col=version_table.c.version_id) + mapper(Foo, version_table, version_id_col=version_table.c.version_id) s1 = Session() return s1 @@ -346,19 +353,23 @@ class ColumnTypeTest(fixtures.MappedTest): s1.add(f1) s1.commit() - f1.value='f1rev2' + f1.value = 'f1rev2' s1.commit() + class RowSwitchTest(fixtures.MappedTest): __backend__ = True + @classmethod def define_tables(cls, metadata): - Table('p', metadata, + Table( + 'p', metadata, Column('id', String(10), primary_key=True), Column('version_id', Integer, default=1, nullable=False), Column('data', String(50)) ) - Table('c', metadata, + Table( + 'c', metadata, Column('id', String(10), ForeignKey('p.id'), primary_key=True), Column('version_id', Integer, default=1, nullable=False), Column('data', String(50)) @@ -366,25 +377,25 @@ class RowSwitchTest(fixtures.MappedTest): @classmethod def setup_classes(cls): + class P(cls.Basic): pass + class C(cls.Basic): pass @classmethod def setup_mappers(cls): - p, c, C, P = (cls.tables.p, - cls.tables.c, - cls.classes.C, - cls.classes.P) + p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P - mapper(P, p, version_id_col=p.c.version_id, - properties={ - 'c':relationship(C, uselist=False, cascade='all, delete-orphan') - }) + mapper( + P, p, version_id_col=p.c.version_id, properties={ + 'c': relationship( + C, uselist=False, cascade='all, delete-orphan')}) mapper(C, c, version_id_col=c.c.version_id) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_row_switch(self): P = self.classes.P @@ -398,7 +409,8 @@ class RowSwitchTest(fixtures.MappedTest): session.add(P(id='P1', data="really a row-switch")) session.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_child_row_switch(self): P, C = self.classes.P, self.classes.C @@ -417,16 +429,20 @@ class RowSwitchTest(fixtures.MappedTest): p.c = C(data='child row-switch') session.commit() + class AlternateGeneratorTest(fixtures.MappedTest): __backend__ = True + @classmethod def define_tables(cls, metadata): - Table('p', metadata, + Table( + 'p', metadata, Column('id', String(10), primary_key=True), Column('version_id', String(32), nullable=False), Column('data', String(50)) ) - Table('c', metadata, + Table( + 'c', metadata, Column('id', String(10), ForeignKey('p.id'), primary_key=True), Column('version_id', String(32), nullable=False), Column('data', String(50)) @@ -434,28 +450,31 @@ class AlternateGeneratorTest(fixtures.MappedTest): @classmethod def setup_classes(cls): + class P(cls.Basic): pass + class C(cls.Basic): pass @classmethod def setup_mappers(cls): - p, c, C, P = (cls.tables.p, - cls.tables.c, - cls.classes.C, - cls.classes.P) + p, c, C, P = cls.tables.p, cls.tables.c, cls.classes.C, cls.classes.P - mapper(P, p, version_id_col=p.c.version_id, + mapper( + P, p, version_id_col=p.c.version_id, version_id_generator=lambda x: make_uuid(), properties={ - 'c': relationship(C, uselist=False, cascade='all, delete-orphan') - }) - mapper(C, c, version_id_col=c.c.version_id, - version_id_generator=lambda x: make_uuid(), + 'c': relationship( + C, uselist=False, cascade='all, delete-orphan') + }) + mapper( + C, c, version_id_col=c.c.version_id, + version_id_generator=lambda x: make_uuid(), ) - @testing.emits_warning_on('+zxjdbc', r'.*does not support updated rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support updated rowcount') def test_row_switch(self): P = self.classes.P @@ -469,7 +488,8 @@ class AlternateGeneratorTest(fixtures.MappedTest): session.add(P(id='P1', data="really a row-switch")) session.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_child_row_switch_one(self): P, C = self.classes.P, self.classes.C @@ -488,7 +508,8 @@ class AlternateGeneratorTest(fixtures.MappedTest): p.c = C(data='child row-switch') session.commit() - @testing.emits_warning_on('+zxjdbc', r'.*does not support (update|delete)d rowcount') + @testing.emits_warning_on( + '+zxjdbc', r'.*does not support (update|delete)d rowcount') def test_child_row_switch_two(self): P = self.classes.P @@ -525,20 +546,26 @@ class AlternateGeneratorTest(fixtures.MappedTest): else: sess2.commit + class InheritanceTwoVersionIdsTest(fixtures.MappedTest): """Test versioning where both parent/child table have a versioning column. """ __backend__ = True + @classmethod def define_tables(cls, metadata): - Table('base', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table( + 'base', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('version_id', Integer, nullable=True), Column('data', String(50)) ) - Table('sub', metadata, + Table( + 'sub', metadata, Column('id', Integer, ForeignKey('base.id'), primary_key=True), Column('version_id', Integer, nullable=False), Column('sub_data', String(50)) @@ -546,19 +573,19 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): @classmethod def setup_classes(cls): + class Base(cls.Basic): pass + class Sub(Base): pass def test_base_both(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) - mapper(Base, base, - version_id_col=base.c.version_id) + mapper(Base, base, version_id_col=base.c.version_id) mapper(Sub, sub, inherits=Base) session = Session() @@ -570,13 +597,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): eq_(select([base.c.version_id]).scalar(), 1) def test_sub_both(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) - mapper(Base, base, - version_id_col=base.c.version_id) + mapper(Base, base, version_id_col=base.c.version_id) mapper(Sub, sub, inherits=Base) session = Session() @@ -591,14 +616,12 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): eq_(select([base.c.version_id]).scalar(), 1) def test_sub_only(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) mapper(Base, base) - mapper(Sub, sub, inherits=Base, - version_id_col=sub.c.version_id) + mapper(Sub, sub, inherits=Base, version_id_col=sub.c.version_id) session = Session() s1 = Sub(data='s1', sub_data='s1') @@ -612,13 +635,11 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): eq_(select([base.c.version_id]).scalar(), None) def test_mismatch_version_col_warning(self): - Base, sub, base, Sub = (self.classes.Base, - self.tables.sub, - self.tables.base, - self.classes.Sub) + Base, sub, base, Sub = ( + self.classes.Base, self.tables.sub, self.tables.base, + self.classes.Sub) - mapper(Base, base, - version_id_col=base.c.version_id) + mapper(Base, base, version_id_col=base.c.version_id) assert_raises_message( exc.SAWarning, @@ -627,9 +648,8 @@ class InheritanceTwoVersionIdsTest(fixtures.MappedTest): "automatically populate the inherited versioning column. " "version_id_col should only be specified on " "the base-most mapper that includes versioning.", - mapper, - Sub, sub, inherits=Base, - version_id_col=sub.c.version_id) + mapper, Sub, sub, inherits=Base, + version_id_col=sub.c.version_id) class ServerVersioningTest(fixtures.MappedTest): @@ -659,27 +679,32 @@ class ServerVersioningTest(fixtures.MappedTest): stmt._counter = str(next(counter)) return stmt._counter - Table('version_table', metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('version_id', Integer, nullable=False, - default=IncDefault(), onupdate=IncDefault()), - Column('value', String(40), nullable=False)) + Table( + 'version_table', metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column( + 'version_id', Integer, nullable=False, + default=IncDefault(), onupdate=IncDefault()), + Column('value', String(40), nullable=False)) @classmethod def setup_classes(cls): + class Foo(cls.Basic): pass + class Bar(cls.Basic): pass def _fixture(self, expire_on_commit=True): Foo, version_table = self.classes.Foo, self.tables.version_table - mapper(Foo, version_table, - version_id_col=version_table.c.version_id, - version_id_generator=False, - ) + mapper( + Foo, version_table, version_id_col=version_table.c.version_id, + version_id_generator=False, + ) s1 = Session(expire_on_commit=expire_on_commit) return s1 @@ -691,21 +716,22 @@ class ServerVersioningTest(fixtures.MappedTest): sess.add(f1) statements = [ - # note that the assertsql tests the rule against - # "default" - on a "returning" backend, the statement - # includes "RETURNING" - CompiledSQL( - "INSERT INTO version_table (version_id, value) " - "VALUES (1, :value)", - lambda ctx: [{'value': 'f1'}] - ) + # note that the assertsql tests the rule against + # "default" - on a "returning" backend, the statement + # includes "RETURNING" + CompiledSQL( + "INSERT INTO version_table (version_id, value) " + "VALUES (1, :value)", + lambda ctx: [{'value': 'f1'}] + ) ] if not testing.db.dialect.implicit_returning: # DBs without implicit returning, we must immediately # SELECT for the new version id statements.append( CompiledSQL( - "SELECT version_table.version_id AS version_table_version_id " + "SELECT version_table.version_id " + " AS version_table_version_id " "FROM version_table WHERE version_table.id = :param_1", lambda ctx: [{"param_1": 1}] ) @@ -722,30 +748,32 @@ class ServerVersioningTest(fixtures.MappedTest): f1.value = 'f2' statements = [ - # note that the assertsql tests the rule against - # "default" - on a "returning" backend, the statement - # includes "RETURNING" - CompiledSQL( - "UPDATE version_table SET version_id=2, value=:value " - "WHERE version_table.id = :version_table_id AND " - "version_table.version_id = :version_table_version_id", - lambda ctx: [{"version_table_id": 1, - "version_table_version_id": 1, "value": "f2"}] - ) + # note that the assertsql tests the rule against + # "default" - on a "returning" backend, the statement + # includes "RETURNING" + CompiledSQL( + "UPDATE version_table SET version_id=2, value=:value " + "WHERE version_table.id = :version_table_id AND " + "version_table.version_id = :version_table_version_id", + lambda ctx: [ + { + "version_table_id": 1, + "version_table_version_id": 1, "value": "f2"}] + ) ] if not testing.db.dialect.implicit_returning: # DBs without implicit returning, we must immediately # SELECT for the new version id statements.append( CompiledSQL( - "SELECT version_table.version_id AS version_table_version_id " + "SELECT version_table.version_id " + " AS version_table_version_id " "FROM version_table WHERE version_table.id = :param_1", lambda ctx: [{"param_1": 1}] ) ) self.assert_sql_execution(testing.db, sess.flush, *statements) - def test_delete_col(self): sess = self._fixture() @@ -756,15 +784,15 @@ class ServerVersioningTest(fixtures.MappedTest): sess.delete(f1) statements = [ - # note that the assertsql tests the rule against - # "default" - on a "returning" backend, the statement - # includes "RETURNING" - CompiledSQL( - "DELETE FROM version_table " - "WHERE version_table.id = :id AND " - "version_table.version_id = :version_id", - lambda ctx: [{"id": 1, "version_id": 1}] - ) + # note that the assertsql tests the rule against + # "default" - on a "returning" backend, the statement + # includes "RETURNING" + CompiledSQL( + "DELETE FROM version_table " + "WHERE version_table.id = :id AND " + "version_table.version_id = :version_id", + lambda ctx: [{"id": 1, "version_id": 1}] + ) ] self.assert_sql_execution(testing.db, sess.flush, *statements) @@ -817,29 +845,32 @@ class ServerVersioningTest(fixtures.MappedTest): sess.commit ) + class ManualVersionTest(fixtures.MappedTest): run_define_tables = 'each' __backend__ = True @classmethod def define_tables(cls, metadata): - Table("a", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(30)), - Column('vid', Integer) - ) + Table( + "a", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(30)), + Column('vid', Integer) + ) @classmethod def setup_classes(cls): class A(cls.Basic): pass - @classmethod def setup_mappers(cls): - mapper(cls.classes.A, cls.tables.a, - version_id_col=cls.tables.a.c.vid, - version_id_generator=False) + mapper( + cls.classes.A, cls.tables.a, version_id_col=cls.tables.a.c.vid, + version_id_generator=False) def test_insert(self): sess = Session() @@ -904,4 +935,4 @@ class ManualVersionTest(fixtures.MappedTest): a1.vid = 2 sess.commit() - eq_(a1.vid, 2)
\ No newline at end of file + eq_(a1.vid, 2) |