diff options
Diffstat (limited to 'test/sql/test_update.py')
-rw-r--r-- | test/sql/test_update.py | 646 |
1 files changed, 409 insertions, 237 deletions
diff --git a/test/sql/test_update.py b/test/sql/test_update.py index b46489cd2..a8df86cd2 100644 --- a/test/sql/test_update.py +++ b/test/sql/test_update.py @@ -1,55 +1,53 @@ -from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL -import datetime from sqlalchemy import * -from sqlalchemy import exc, sql, util -from sqlalchemy.engine import default, base from sqlalchemy import testing -from sqlalchemy.testing import fixtures -from sqlalchemy.testing.schema import Table, Column from sqlalchemy.dialects import mysql +from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures +from sqlalchemy.testing.schema import Table, Column + class _UpdateFromTestBase(object): @classmethod def define_tables(cls, metadata): + Table('mytable', metadata, + Column('myid', Integer), + Column('name', String(30)), + Column('description', String(50))) + Table('myothertable', metadata, + Column('otherid', Integer), + Column('othername', String(30))) Table('users', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('name', String(30), nullable=False), - ) - + test_needs_autoincrement=True), + Column('name', String(30), nullable=False)) Table('addresses', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), Column('name', String(30), nullable=False), - Column('email_address', String(50), nullable=False), - ) - - Table("dingalings", metadata, + Column('email_address', String(50), nullable=False)) + Table('dingalings', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('address_id', None, ForeignKey('addresses.id')), - Column('data', String(30)), - ) + Column('data', String(30))) @classmethod def fixtures(cls): return dict( - users = ( + users=( ('id', 'name'), (7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck') ), - addresses = ( ('id', 'user_id', 'name', 'email_address'), - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed@wood.com"), - (3, 8, 'x', "ed@bettyboop.com"), - (4, 8, 'x', "ed@lala.com"), - (5, 9, 'x', "fred@fred.com") + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed@wood.com'), + (3, 8, 'x', 'ed@bettyboop.com'), + (4, 8, 'x', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com') ), dingalings = ( ('id', 'address_id', 'data'), @@ -59,288 +57,462 @@ class _UpdateFromTestBase(object): ) -class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL): +class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_update_1(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 7), + 'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1', + params={table1.c.name: 'fred'}) + + def test_update_2(self): + table1 = self.tables.mytable + + self.assert_compile( + table1.update(). + where(table1.c.myid == 7). + values({table1.c.myid: 5}), + 'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1', + checkparams={'myid': 5, 'myid_1': 7}) + + def test_update_3(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 7), + 'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1', + params={'name': 'fred'}) + + def test_update_4(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, values={table1.c.name: table1.c.myid}), + 'UPDATE mytable SET name=mytable.myid') + + def test_update_5(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, + whereclause=table1.c.name == bindparam('crit'), + values={table1.c.name: 'hi'}), + 'UPDATE mytable SET name=:name WHERE mytable.name = :crit', + params={'crit': 'notthere'}, + checkparams={'crit': 'notthere', 'name': 'hi'}) + + def test_update_6(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, + table1.c.myid == 12, + values={table1.c.name: table1.c.myid}), + 'UPDATE mytable ' + 'SET name=mytable.myid, description=:description ' + 'WHERE mytable.myid = :myid_1', + params={'description': 'test'}, + checkparams={'description': 'test', 'myid_1': 12}) + + def test_update_7(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 12, values={table1.c.myid: 9}), + 'UPDATE mytable ' + 'SET myid=:myid, description=:description ' + 'WHERE mytable.myid = :myid_1', + params={'myid_1': 12, 'myid': 9, 'description': 'test'}) + + def test_update_8(self): + table1 = self.tables.mytable + + self.assert_compile( + update(table1, table1.c.myid == 12), + 'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1', + params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12}) + + def test_update_9(self): + table1 = self.tables.mytable + + s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'}) + c = s.compile(column_keys=['id', 'name']) + eq_(str(s), str(c)) + + def test_update_10(self): + table1 = self.tables.mytable + + v1 = {table1.c.name: table1.c.myid} + v2 = {table1.c.name: table1.c.name + 'foo'} + self.assert_compile( + update(table1, table1.c.myid == 12, values=v1).values(v2), + 'UPDATE mytable ' + 'SET ' + 'name=(mytable.name || :name_1), ' + 'description=:description ' + 'WHERE mytable.myid = :myid_1', + params={'description': 'test'}) + + def test_update_11(self): + table1 = self.tables.mytable + + values = { + table1.c.name: table1.c.name + 'lala', + table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho')) + } + self.assert_compile(update(table1, + (table1.c.myid == func.hoho(4)) & + (table1.c.name == literal('foo') + + table1.c.name + literal('lala')), + values=values), + 'UPDATE mytable ' + 'SET ' + 'myid=do_stuff(mytable.myid, :param_1), ' + 'name=(mytable.name || :name_1) ' + 'WHERE ' + 'mytable.myid = hoho(:hoho_1) AND ' + 'mytable.name = :param_2 || mytable.name || :param_3') + + def test_prefix_with(self): + table1 = self.tables.mytable + + stmt = table1.update().\ + prefix_with('A', 'B', dialect='mysql').\ + prefix_with('C', 'D') + + self.assert_compile(stmt, + 'UPDATE C D mytable SET myid=:myid, name=:name, ' + 'description=:description') + + self.assert_compile(stmt, + 'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s', + dialect=mysql.dialect()) + + def test_alias(self): + table1 = self.tables.mytable + talias1 = table1.alias('t1') + + self.assert_compile(update(talias1, talias1.c.myid == 7), + 'UPDATE mytable AS t1 ' + 'SET name=:name ' + 'WHERE t1.myid = :myid_1', + params={table1.c.name: 'fred'}) + + self.assert_compile(update(talias1, table1.c.myid == 7), + 'UPDATE mytable AS t1 ' + 'SET name=:name ' + 'FROM mytable ' + 'WHERE mytable.myid = :myid_1', + params={table1.c.name: 'fred'}) + + def test_update_to_expression(self): + """test update from an expression. + + this logic is triggered currently by a left side that doesn't + have a key. The current supported use case is updating the index + of a Postgresql ARRAY type. + + """ + table1 = self.tables.mytable + expr = func.foo(table1.c.myid) + assert not hasattr(expr, 'key') + self.assert_compile(table1.update().values({expr: 'bar'}), + 'UPDATE mytable SET foo(myid)=:param_1') + + +class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, + AssertsCompiledSQL): __dialect__ = 'default' run_create_tables = run_inserts = run_deletes = None def test_render_table(self): users, addresses = self.tables.users, self.tables.addresses + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1'), - "UPDATE users SET name=:name FROM addresses " - "WHERE users.id = addresses.user_id AND " - "addresses.email_address = :email_address_1", - checkparams={u'email_address_1': 'e1', 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'), + 'UPDATE users ' + 'SET name=:name FROM addresses ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = :email_address_1', + checkparams={u'email_address_1': 'e1', 'name': 'newname'}) def test_render_multi_table(self): - users, addresses, dingalings = \ - self.tables.users, \ - self.tables.addresses, \ - self.tables.dingalings + users = self.tables.users + addresses = self.tables.addresses + dingalings = self.tables.dingalings + + checkparams = { + u'email_address_1': 'e1', + u'id_1': 2, + 'name': 'newname' + } + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1').\ - where(addresses.c.id==dingalings.c.address_id).\ - where(dingalings.c.id==2), - "UPDATE users SET name=:name FROM addresses, " - "dingalings WHERE users.id = addresses.user_id " - "AND addresses.email_address = :email_address_1 " - "AND addresses.id = dingalings.address_id AND " - "dingalings.id = :id_1", - checkparams={u'email_address_1': 'e1', u'id_1': 2, - 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'). + where(addresses.c.id == dingalings.c.address_id). + where(dingalings.c.id == 2), + 'UPDATE users ' + 'SET name=:name ' + 'FROM addresses, dingalings ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = :email_address_1 AND ' + 'addresses.id = dingalings.address_id AND ' + 'dingalings.id = :id_1', + checkparams=checkparams) def test_render_table_mysql(self): users, addresses = self.tables.users, self.tables.addresses + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1'), - "UPDATE users, addresses SET users.name=%s " - "WHERE users.id = addresses.user_id AND " - "addresses.email_address = %s", + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'), + 'UPDATE users, addresses ' + 'SET users.name=%s ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = %s', checkparams={u'email_address_1': 'e1', 'name': 'newname'}, - dialect=mysql.dialect() - ) + dialect=mysql.dialect()) def test_render_subquery(self): users, addresses = self.tables.users, self.tables.addresses - subq = select([addresses.c.id, - addresses.c.user_id, - addresses.c.email_address]).\ - where(addresses.c.id==7).alias() + + checkparams = { + u'email_address_1': 'e1', + u'id_1': 7, + 'name': 'newname' + } + + cols = [ + addresses.c.id, + addresses.c.user_id, + addresses.c.email_address + ] + + subq = select(cols).where(addresses.c.id == 7).alias() self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==subq.c.user_id).\ - where(subq.c.email_address=='e1'), - "UPDATE users SET name=:name FROM " - "(SELECT addresses.id AS id, addresses.user_id " - "AS user_id, addresses.email_address AS " - "email_address FROM addresses WHERE addresses.id = " - ":id_1) AS anon_1 WHERE users.id = anon_1.user_id " - "AND anon_1.email_address = :email_address_1", - checkparams={u'email_address_1': 'e1', - u'id_1': 7, 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == subq.c.user_id). + where(subq.c.email_address == 'e1'), + 'UPDATE users ' + 'SET name=:name FROM (' + 'SELECT ' + 'addresses.id AS id, ' + 'addresses.user_id AS user_id, ' + 'addresses.email_address AS email_address ' + 'FROM addresses ' + 'WHERE addresses.id = :id_1' + ') AS anon_1 ' + 'WHERE users.id = anon_1.user_id ' + 'AND anon_1.email_address = :email_address_1', + checkparams=checkparams) + class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest): @testing.requires.update_from def test_exec_two_table(self): users, addresses = self.tables.users, self.tables.addresses + testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - testing.db.execute( - addresses.select().\ - order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] - ) + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) @testing.requires.update_from def test_exec_two_table_plus_alias(self): users, addresses = self.tables.users, self.tables.addresses - a1 = addresses.alias() + a1 = addresses.alias() testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==a1.c.user_id).\ - where(users.c.name=='ed').\ - where(a1.c.id==addresses.c.id) - ) - eq_( - testing.db.execute( - addresses.select().\ - order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == a1.c.user_id). + where(users.c.name == 'ed'). + where(a1.c.id == addresses.c.id) ) + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + @testing.requires.update_from def test_exec_three_table(self): - users, addresses, dingalings = \ - self.tables.users, \ - self.tables.addresses, \ - self.tables.dingalings + users = self.tables.users + addresses = self.tables.addresses + dingalings = self.tables.dingalings + testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed'). - where(addresses.c.id==dingalings.c.address_id).\ - where(dingalings.c.id==1), - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id) - ).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed@bettyboop.com"), - (4, 8, 'x', "ed@lala.com"), - (5, 9, 'x', "fred@fred.com") - ] - ) + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed'). + where(addresses.c.id == dingalings.c.address_id). + where(dingalings.c.id == 1)) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed@bettyboop.com'), + (4, 8, 'x', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) @testing.only_on('mysql', 'Multi table update') def test_exec_multitable(self): users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.email_address: users.c.name, + users.c.name: 'ed2' + } + testing.db.execute( - addresses.update().\ - values({ - addresses.c.email_address:users.c.name, - users.c.name:'ed2' - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] - ) - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (7, 'jack'), - (8, 'ed2'), - (9, 'fred'), - (10, 'chuck') - ] - ) + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (7, 'jack'), + (8, 'ed2'), + (9, 'fred'), + (10, 'chuck')] + self._assert_users(users, expected) -class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest): + def _assert_addresses(self, addresses, expected): + stmt = addresses.select().order_by(addresses.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + def _assert_users(self, users, expected): + stmt = users.select().order_by(users.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + +class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, + fixtures.TablesTest): @classmethod def define_tables(cls, metadata): Table('users', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('name', String(30), nullable=False), - Column('some_update', String(30), onupdate="im the update") - ) + Column('some_update', String(30), onupdate='im the update')) Table('addresses', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), - Column('email_address', String(50), nullable=False), - ) + Column('email_address', String(50), nullable=False)) @classmethod def fixtures(cls): return dict( - users = ( + users=( ('id', 'name', 'some_update'), (8, 'ed', 'value'), (9, 'fred', 'value'), ), - - addresses = ( + addresses=( ('id', 'user_id', 'email_address'), - (2, 8, "ed@wood.com"), - (3, 8, "ed@bettyboop.com"), - (4, 9, "fred@fred.com") + (2, 8, 'ed@wood.com'), + (3, 8, 'ed@bettyboop.com'), + (4, 9, 'fred@fred.com') ), ) @testing.only_on('mysql', 'Multi table update') def test_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.email_address: users.c.name, + users.c.name: 'ed2' + } + ret = testing.db.execute( - addresses.update().\ - values({ - addresses.c.email_address:users.c.name, - users.c.name:'ed2' - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - set(ret.prefetch_cols()), - set([users.c.some_update]) - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (2, 8, "ed"), - (3, 8, "ed"), - (4, 9, "fred@fred.com") - ] - ) - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (8, 'ed2', 'im the update'), - (9, 'fred', 'value'), - ] - ) + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + eq_(set(ret.prefetch_cols()), set([users.c.some_update])) + + expected = [ + (2, 8, 'ed'), + (3, 8, 'ed'), + (4, 9, 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (8, 'ed2', 'im the update'), + (9, 'fred', 'value')] + self._assert_users(users, expected) @testing.only_on('mysql', 'Multi table update') def test_no_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses + ret = testing.db.execute( - addresses.update().\ - values({ - 'email_address':users.c.name, - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - ret.prefetch_cols(),[] - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (2, 8, "ed"), - (3, 8, "ed"), - (4, 9, "fred@fred.com") - ] - ) - # users table not actually updated, - # so no onupdate - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (8, 'ed', 'value'), - (9, 'fred', 'value'), - ] - ) + addresses.update(). + values({'email_address': users.c.name}). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + eq_(ret.prefetch_cols(), []) + + expected = [ + (2, 8, 'ed'), + (3, 8, 'ed'), + (4, 9, 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + # users table not actually updated, so no onupdate + expected = [ + (8, 'ed', 'value'), + (9, 'fred', 'value')] + self._assert_users(users, expected) + + def _assert_addresses(self, addresses, expected): + stmt = addresses.select().order_by(addresses.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + def _assert_users(self, users, expected): + stmt = users.select().order_by(users.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) |