summaryrefslogtreecommitdiff
path: root/test/dialect/mysql/test_on_duplicate.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/mysql/test_on_duplicate.py')
-rw-r--r--test/dialect/mysql/test_on_duplicate.py99
1 files changed, 67 insertions, 32 deletions
diff --git a/test/dialect/mysql/test_on_duplicate.py b/test/dialect/mysql/test_on_duplicate.py
index 376f9a9af..04072d4a9 100644
--- a/test/dialect/mysql/test_on_duplicate.py
+++ b/test/dialect/mysql/test_on_duplicate.py
@@ -6,84 +6,117 @@ from sqlalchemy import Table, Column, Boolean, Integer, String, func
class OnDuplicateTest(fixtures.TablesTest):
- __only_on__ = 'mysql',
+ __only_on__ = ("mysql",)
__backend__ = True
- run_define_tables = 'each'
+ run_define_tables = "each"
@classmethod
def define_tables(cls, metadata):
Table(
- 'foos', metadata,
- Column('id', Integer, primary_key=True, autoincrement=True),
- Column('bar', String(10)),
- Column('baz', String(10)),
- Column('updated_once', Boolean, default=False),
+ "foos",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("bar", String(10)),
+ Column("baz", String(10)),
+ Column("updated_once", Boolean, default=False),
)
def test_bad_args(self):
assert_raises(
ValueError,
- insert(self.tables.foos, values={}).on_duplicate_key_update
+ insert(self.tables.foos, values={}).on_duplicate_key_update,
)
assert_raises(
exc.ArgumentError,
insert(self.tables.foos, values={}).on_duplicate_key_update,
- {'id': 1, 'bar': 'b'},
+ {"id": 1, "bar": "b"},
id=1,
- bar='b',
+ bar="b",
)
assert_raises(
exc.ArgumentError,
insert(self.tables.foos, values={}).on_duplicate_key_update,
- {'id': 1, 'bar': 'b'},
- {'id': 2, 'bar': 'baz'},
+ {"id": 1, "bar": "b"},
+ {"id": 2, "bar": "baz"},
)
def test_on_duplicate_key_update(self):
foos = self.tables.foos
with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar='b', baz='bz')))
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
stmt = insert(foos).values(
- [dict(id=1, bar='ab'), dict(id=2, bar='b')])
+ [dict(id=1, bar="ab"), dict(id=2, bar="b")]
+ )
stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
result = conn.execute(stmt)
eq_(result.inserted_primary_key, [2])
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, 'ab', 'bz', False)]
+ [(1, "ab", "bz", False)],
)
def test_on_duplicate_key_update_preserve_order(self):
foos = self.tables.foos
with testing.db.connect() as conn:
- conn.execute(insert(foos,
- [dict(id=1, bar='b', baz='bz'), dict(id=2, bar='b', baz='bz2')]))
+ conn.execute(
+ insert(
+ foos,
+ [
+ dict(id=1, bar="b", baz="bz"),
+ dict(id=2, bar="b", baz="bz2"),
+ ],
+ )
+ )
stmt = insert(foos)
- update_condition = (foos.c.updated_once == False)
+ update_condition = foos.c.updated_once == False
# The following statements show importance of the columns update ordering
# as old values being referenced in UPDATE clause are getting replaced one
# by one from left to right with their new values.
- stmt1 = stmt.on_duplicate_key_update([
- ('bar', func.if_(update_condition, func.values(foos.c.bar), foos.c.bar)),
- ('updated_once', func.if_(update_condition, True, foos.c.updated_once)),
- ])
- stmt2 = stmt.on_duplicate_key_update([
- ('updated_once', func.if_(update_condition, True, foos.c.updated_once)),
- ('bar', func.if_(update_condition, func.values(foos.c.bar), foos.c.bar)),
- ])
+ stmt1 = stmt.on_duplicate_key_update(
+ [
+ (
+ "bar",
+ func.if_(
+ update_condition,
+ func.values(foos.c.bar),
+ foos.c.bar,
+ ),
+ ),
+ (
+ "updated_once",
+ func.if_(update_condition, True, foos.c.updated_once),
+ ),
+ ]
+ )
+ stmt2 = stmt.on_duplicate_key_update(
+ [
+ (
+ "updated_once",
+ func.if_(update_condition, True, foos.c.updated_once),
+ ),
+ (
+ "bar",
+ func.if_(
+ update_condition,
+ func.values(foos.c.bar),
+ foos.c.bar,
+ ),
+ ),
+ ]
+ )
# First statement should succeed updating column bar
- conn.execute(stmt1, dict(id=1, bar='ab'))
+ conn.execute(stmt1, dict(id=1, bar="ab"))
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, 'ab', 'bz', True)],
+ [(1, "ab", "bz", True)],
)
# Second statement will do noop update of column bar
- conn.execute(stmt2, dict(id=2, bar='ab'))
+ conn.execute(stmt2, dict(id=2, bar="ab"))
eq_(
conn.execute(foos.select().where(foos.c.id == 2)).fetchall(),
- [(2, 'b', 'bz2', True)]
+ [(2, "b", "bz2", True)],
)
def test_last_inserted_id(self):
@@ -92,13 +125,15 @@ class OnDuplicateTest(fixtures.TablesTest):
stmt = insert(foos).values({"bar": "b", "baz": "bz"})
result = conn.execute(
stmt.on_duplicate_key_update(
- bar=stmt.inserted.bar, baz="newbz")
+ bar=stmt.inserted.bar, baz="newbz"
+ )
)
eq_(result.inserted_primary_key, [1])
stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
result = conn.execute(
stmt.on_duplicate_key_update(
- bar=stmt.inserted.bar, baz="newbz")
+ bar=stmt.inserted.bar, baz="newbz"
+ )
)
eq_(result.inserted_primary_key, [1])