diff options
Diffstat (limited to 'test/dialect/mysql/test_on_duplicate.py')
-rw-r--r-- | test/dialect/mysql/test_on_duplicate.py | 263 |
1 files changed, 126 insertions, 137 deletions
diff --git a/test/dialect/mysql/test_on_duplicate.py b/test/dialect/mysql/test_on_duplicate.py index ed88121a5..dc86aaeb0 100644 --- a/test/dialect/mysql/test_on_duplicate.py +++ b/test/dialect/mysql/test_on_duplicate.py @@ -5,7 +5,6 @@ from sqlalchemy import func from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy import Table -from sqlalchemy import testing from sqlalchemy.dialects.mysql import insert from sqlalchemy.testing import fixtures from sqlalchemy.testing.assertions import assert_raises @@ -47,155 +46,145 @@ class OnDuplicateTest(fixtures.TablesTest): {"id": 2, "bar": "baz"}, ) - def test_on_duplicate_key_update_multirow(self): + def test_on_duplicate_key_update_multirow(self, connection): foos = self.tables.foos - with testing.db.connect() as conn: - conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) - stmt = insert(foos).values( - [dict(id=1, bar="ab"), dict(id=2, bar="b")] - ) - stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar) - - result = conn.execute(stmt) - - # multirow, so its ambiguous. this is a behavioral change - # in 1.4 - eq_(result.inserted_primary_key, (None,)) - eq_( - conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, "ab", "bz", False)], - ) + conn = connection + conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) + stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")]) + stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar) + + result = conn.execute(stmt) + + # multirow, so its ambiguous. this is a behavioral change + # in 1.4 + eq_(result.inserted_primary_key, (None,)) + eq_( + conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), + [(1, "ab", "bz", False)], + ) - def test_on_duplicate_key_update_singlerow(self): + def test_on_duplicate_key_update_singlerow(self, connection): foos = self.tables.foos - with testing.db.connect() as conn: - conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) - stmt = insert(foos).values(dict(id=2, bar="b")) - stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar) - - result = conn.execute(stmt) - - # only one row in the INSERT so we do inserted_primary_key - eq_(result.inserted_primary_key, (2,)) - eq_( - conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, "b", "bz", False)], - ) + conn = connection + conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) + stmt = insert(foos).values(dict(id=2, bar="b")) + stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar) + + result = conn.execute(stmt) + + # only one row in the INSERT so we do inserted_primary_key + eq_(result.inserted_primary_key, (2,)) + eq_( + conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), + [(1, "b", "bz", False)], + ) - def test_on_duplicate_key_update_null_multirow(self): + def test_on_duplicate_key_update_null_multirow(self, connection): foos = self.tables.foos - with testing.db.connect() as conn: - conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) - stmt = insert(foos).values( - [dict(id=1, bar="ab"), dict(id=2, bar="b")] - ) - stmt = stmt.on_duplicate_key_update(updated_once=None) - result = conn.execute(stmt) - - # ambiguous - eq_(result.inserted_primary_key, (None,)) - eq_( - conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, "b", "bz", None)], - ) + conn = connection + conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) + stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")]) + stmt = stmt.on_duplicate_key_update(updated_once=None) + result = conn.execute(stmt) + + # ambiguous + eq_(result.inserted_primary_key, (None,)) + eq_( + conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), + [(1, "b", "bz", None)], + ) - def test_on_duplicate_key_update_expression_multirow(self): + def test_on_duplicate_key_update_expression_multirow(self, connection): foos = self.tables.foos - with testing.db.connect() as conn: - conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) - stmt = insert(foos).values( - [dict(id=1, bar="ab"), dict(id=2, bar="b")] - ) - stmt = stmt.on_duplicate_key_update( - bar=func.concat(stmt.inserted.bar, "_foo") - ) - result = conn.execute(stmt) - eq_(result.inserted_primary_key, (None,)) - eq_( - conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, "ab_foo", "bz", False)], - ) + conn = connection + conn.execute(insert(foos, dict(id=1, bar="b", baz="bz"))) + stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")]) + stmt = stmt.on_duplicate_key_update( + bar=func.concat(stmt.inserted.bar, "_foo") + ) + result = conn.execute(stmt) + eq_(result.inserted_primary_key, (None,)) + eq_( + conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), + [(1, "ab_foo", "bz", False)], + ) - def test_on_duplicate_key_update_preserve_order(self): + def test_on_duplicate_key_update_preserve_order(self, connection): foos = self.tables.foos - with testing.db.connect() as conn: - conn.execute( - insert( - foos, - [ - dict(id=1, bar="b", baz="bz"), - dict(id=2, bar="b", baz="bz2"), - ], - ) - ) - - stmt = insert(foos) - update_condition = foos.c.updated_once == False - - # The following statements show importance of the columns update - # ordering as old values being referenced in UPDATE clause are - # getting replaced one by one from left to right with their new - # values. - stmt1 = stmt.on_duplicate_key_update( + conn = connection + conn.execute( + insert( + foos, [ - ( - "bar", - func.if_( - update_condition, - func.values(foos.c.bar), - foos.c.bar, - ), - ), - ( - "updated_once", - func.if_(update_condition, True, foos.c.updated_once), - ), - ] + dict(id=1, bar="b", baz="bz"), + dict(id=2, bar="b", baz="bz2"), + ], ) - stmt2 = stmt.on_duplicate_key_update( - [ - ( - "updated_once", - func.if_(update_condition, True, foos.c.updated_once), + ) + + stmt = insert(foos) + update_condition = foos.c.updated_once == False + + # The following statements show importance of the columns update + # ordering as old values being referenced in UPDATE clause are + # getting replaced one by one from left to right with their new + # values. + stmt1 = stmt.on_duplicate_key_update( + [ + ( + "bar", + func.if_( + update_condition, + func.values(foos.c.bar), + foos.c.bar, ), - ( - "bar", - func.if_( - update_condition, - func.values(foos.c.bar), - foos.c.bar, - ), + ), + ( + "updated_once", + func.if_(update_condition, True, foos.c.updated_once), + ), + ] + ) + stmt2 = stmt.on_duplicate_key_update( + [ + ( + "updated_once", + func.if_(update_condition, True, foos.c.updated_once), + ), + ( + "bar", + func.if_( + update_condition, + func.values(foos.c.bar), + foos.c.bar, ), - ] - ) - # First statement should succeed updating column bar - conn.execute(stmt1, dict(id=1, bar="ab")) - eq_( - conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), - [(1, "ab", "bz", True)], - ) - # Second statement will do noop update of column bar - conn.execute(stmt2, dict(id=2, bar="ab")) - eq_( - conn.execute(foos.select().where(foos.c.id == 2)).fetchall(), - [(2, "b", "bz2", True)], - ) + ), + ] + ) + # First statement should succeed updating column bar + conn.execute(stmt1, dict(id=1, bar="ab")) + eq_( + conn.execute(foos.select().where(foos.c.id == 1)).fetchall(), + [(1, "ab", "bz", True)], + ) + # Second statement will do noop update of column bar + conn.execute(stmt2, dict(id=2, bar="ab")) + eq_( + conn.execute(foos.select().where(foos.c.id == 2)).fetchall(), + [(2, "b", "bz2", True)], + ) - def test_last_inserted_id(self): + def test_last_inserted_id(self, connection): foos = self.tables.foos - with testing.db.connect() as conn: - stmt = insert(foos).values({"bar": "b", "baz": "bz"}) - result = conn.execute( - stmt.on_duplicate_key_update( - bar=stmt.inserted.bar, baz="newbz" - ) - ) - eq_(result.inserted_primary_key, (1,)) + conn = connection + stmt = insert(foos).values({"bar": "b", "baz": "bz"}) + result = conn.execute( + stmt.on_duplicate_key_update(bar=stmt.inserted.bar, baz="newbz") + ) + eq_(result.inserted_primary_key, (1,)) - stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"}) - result = conn.execute( - stmt.on_duplicate_key_update( - bar=stmt.inserted.bar, baz="newbz" - ) - ) - eq_(result.inserted_primary_key, (1,)) + stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"}) + result = conn.execute( + stmt.on_duplicate_key_update(bar=stmt.inserted.bar, baz="newbz") + ) + eq_(result.inserted_primary_key, (1,)) |