diff options
Diffstat (limited to 'test/dialect/mysql/test_on_duplicate.py')
-rw-r--r-- | test/dialect/mysql/test_on_duplicate.py | 99 |
1 files changed, 67 insertions, 32 deletions
diff --git a/test/dialect/mysql/test_on_duplicate.py b/test/dialect/mysql/test_on_duplicate.py index 376f9a9af..04072d4a9 100644 --- a/test/dialect/mysql/test_on_duplicate.py +++ b/test/dialect/mysql/test_on_duplicate.py @@ -6,84 +6,117 @@ from sqlalchemy import Table, Column, Boolean, Integer, String, func class OnDuplicateTest(fixtures.TablesTest): - __only_on__ = 'mysql', + __only_on__ = ("mysql",) __backend__ = True - run_define_tables = 'each' + run_define_tables = "each" @classmethod def define_tables(cls, metadata): Table( - 'foos', metadata, - Column('id', Integer, primary_key=True, autoincrement=True), - Column('bar', String(10)), - Column('baz', String(10)), - Column('updated_once', Boolean, default=False), + "foos", + metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("bar", String(10)), + Column("baz", String(10)), + Column("updated_once", Boolean, default=False), ) def test_bad_args(self): assert_raises( ValueError, - insert(self.tables.foos, values={}).on_duplicate_key_update + insert(self.tables.foos, values={}).on_duplicate_key_update, ) assert_raises( exc.ArgumentError, insert(self.tables.foos, values={}).on_duplicate_key_update, - {'id': 1, 'bar': 'b'}, + {"id": 1, "bar": "b"}, id=1, - bar='b', + bar="b", ) assert_raises( exc.ArgumentError, insert(self.tables.foos, values={}).on_duplicate_key_update, - {'id': 1, 'bar': 'b'}, - {'id': 2, 'bar': 'baz'}, + {"id": 1, "bar": "b"}, + {"id": 2, "bar": "baz"}, ) def test_on_duplicate_key_update(self): foos = self.tables.foos with testing.db.connect() as conn: - conn.execute(insert(foos, dict(id=1, bar='b', baz='bz'))) + conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) stmt = insert(foos).values( - [dict(id=1, bar='ab'), dict(id=2, bar='b')]) + [dict(id=1, bar="ab"), dict(id=2, bar="b")] + ) stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar) result = conn.execute(stmt) eq_(result.inserted_primary_key, [2]) eq_( conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, 'ab', 'bz', False)] + [(1, "ab", "bz", False)], ) def test_on_duplicate_key_update_preserve_order(self): foos = self.tables.foos with testing.db.connect() as conn: - conn.execute(insert(foos, - [dict(id=1, bar='b', baz='bz'), dict(id=2, bar='b', baz='bz2')])) + conn.execute( + insert( + foos, + [ + dict(id=1, bar="b", baz="bz"), + dict(id=2, bar="b", baz="bz2"), + ], + ) + ) stmt = insert(foos) - update_condition = (foos.c.updated_once == False) + update_condition = foos.c.updated_once == False # The following statements show importance of the columns update ordering # as old values being referenced in UPDATE clause are getting replaced one # by one from left to right with their new values. - stmt1 = stmt.on_duplicate_key_update([ - ('bar', func.if_(update_condition, func.values(foos.c.bar), foos.c.bar)), - ('updated_once', func.if_(update_condition, True, foos.c.updated_once)), - ]) - stmt2 = stmt.on_duplicate_key_update([ - ('updated_once', func.if_(update_condition, True, foos.c.updated_once)), - ('bar', func.if_(update_condition, func.values(foos.c.bar), foos.c.bar)), - ]) + stmt1 = stmt.on_duplicate_key_update( + [ + ( + "bar", + func.if_( + update_condition, + func.values(foos.c.bar), + foos.c.bar, + ), + ), + ( + "updated_once", + func.if_(update_condition, True, foos.c.updated_once), + ), + ] + ) + stmt2 = stmt.on_duplicate_key_update( + [ + ( + "updated_once", + func.if_(update_condition, True, foos.c.updated_once), + ), + ( + "bar", + func.if_( + update_condition, + func.values(foos.c.bar), + foos.c.bar, + ), + ), + ] + ) # First statement should succeed updating column bar - conn.execute(stmt1, dict(id=1, bar='ab')) + conn.execute(stmt1, dict(id=1, bar="ab")) eq_( conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, 'ab', 'bz', True)], + [(1, "ab", "bz", True)], ) # Second statement will do noop update of column bar - conn.execute(stmt2, dict(id=2, bar='ab')) + conn.execute(stmt2, dict(id=2, bar="ab")) eq_( conn.execute(foos.select().where(foos.c.id == 2)).fetchall(), - [(2, 'b', 'bz2', True)] + [(2, "b", "bz2", True)], ) def test_last_inserted_id(self): @@ -92,13 +125,15 @@ class OnDuplicateTest(fixtures.TablesTest): stmt = insert(foos).values({"bar": "b", "baz": "bz"}) result = conn.execute( stmt.on_duplicate_key_update( - bar=stmt.inserted.bar, baz="newbz") + bar=stmt.inserted.bar, baz="newbz" + ) ) eq_(result.inserted_primary_key, [1]) stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"}) result = conn.execute( stmt.on_duplicate_key_update( - bar=stmt.inserted.bar, baz="newbz") + bar=stmt.inserted.bar, baz="newbz" + ) ) eq_(result.inserted_primary_key, [1]) |