summaryrefslogtreecommitdiff
path: root/test/dialect/mysql/test_on_duplicate.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/mysql/test_on_duplicate.py')
-rw-r--r--test/dialect/mysql/test_on_duplicate.py263
1 files changed, 126 insertions, 137 deletions
diff --git a/test/dialect/mysql/test_on_duplicate.py b/test/dialect/mysql/test_on_duplicate.py
index ed88121a5..dc86aaeb0 100644
--- a/test/dialect/mysql/test_on_duplicate.py
+++ b/test/dialect/mysql/test_on_duplicate.py
@@ -5,7 +5,6 @@ from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
-from sqlalchemy import testing
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
@@ -47,155 +46,145 @@ class OnDuplicateTest(fixtures.TablesTest):
{"id": 2, "bar": "baz"},
)
- def test_on_duplicate_key_update_multirow(self):
+ def test_on_duplicate_key_update_multirow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(
- [dict(id=1, bar="ab"), dict(id=2, bar="b")]
- )
- stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
-
- result = conn.execute(stmt)
-
- # multirow, so its ambiguous. this is a behavioral change
- # in 1.4
- eq_(result.inserted_primary_key, (None,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "ab", "bz", False)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")])
+ stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
+
+ result = conn.execute(stmt)
+
+ # multirow, so its ambiguous. this is a behavioral change
+ # in 1.4
+ eq_(result.inserted_primary_key, (None,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "ab", "bz", False)],
+ )
- def test_on_duplicate_key_update_singlerow(self):
+ def test_on_duplicate_key_update_singlerow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(dict(id=2, bar="b"))
- stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
-
- result = conn.execute(stmt)
-
- # only one row in the INSERT so we do inserted_primary_key
- eq_(result.inserted_primary_key, (2,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "b", "bz", False)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values(dict(id=2, bar="b"))
+ stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
+
+ result = conn.execute(stmt)
+
+ # only one row in the INSERT so we do inserted_primary_key
+ eq_(result.inserted_primary_key, (2,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "b", "bz", False)],
+ )
- def test_on_duplicate_key_update_null_multirow(self):
+ def test_on_duplicate_key_update_null_multirow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(
- [dict(id=1, bar="ab"), dict(id=2, bar="b")]
- )
- stmt = stmt.on_duplicate_key_update(updated_once=None)
- result = conn.execute(stmt)
-
- # ambiguous
- eq_(result.inserted_primary_key, (None,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "b", "bz", None)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")])
+ stmt = stmt.on_duplicate_key_update(updated_once=None)
+ result = conn.execute(stmt)
+
+ # ambiguous
+ eq_(result.inserted_primary_key, (None,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "b", "bz", None)],
+ )
- def test_on_duplicate_key_update_expression_multirow(self):
+ def test_on_duplicate_key_update_expression_multirow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(
- [dict(id=1, bar="ab"), dict(id=2, bar="b")]
- )
- stmt = stmt.on_duplicate_key_update(
- bar=func.concat(stmt.inserted.bar, "_foo")
- )
- result = conn.execute(stmt)
- eq_(result.inserted_primary_key, (None,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "ab_foo", "bz", False)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")])
+ stmt = stmt.on_duplicate_key_update(
+ bar=func.concat(stmt.inserted.bar, "_foo")
+ )
+ result = conn.execute(stmt)
+ eq_(result.inserted_primary_key, (None,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "ab_foo", "bz", False)],
+ )
- def test_on_duplicate_key_update_preserve_order(self):
+ def test_on_duplicate_key_update_preserve_order(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(
- insert(
- foos,
- [
- dict(id=1, bar="b", baz="bz"),
- dict(id=2, bar="b", baz="bz2"),
- ],
- )
- )
-
- stmt = insert(foos)
- update_condition = foos.c.updated_once == False
-
- # The following statements show importance of the columns update
- # ordering as old values being referenced in UPDATE clause are
- # getting replaced one by one from left to right with their new
- # values.
- stmt1 = stmt.on_duplicate_key_update(
+ conn = connection
+ conn.execute(
+ insert(
+ foos,
[
- (
- "bar",
- func.if_(
- update_condition,
- func.values(foos.c.bar),
- foos.c.bar,
- ),
- ),
- (
- "updated_once",
- func.if_(update_condition, True, foos.c.updated_once),
- ),
- ]
+ dict(id=1, bar="b", baz="bz"),
+ dict(id=2, bar="b", baz="bz2"),
+ ],
)
- stmt2 = stmt.on_duplicate_key_update(
- [
- (
- "updated_once",
- func.if_(update_condition, True, foos.c.updated_once),
+ )
+
+ stmt = insert(foos)
+ update_condition = foos.c.updated_once == False
+
+ # The following statements show importance of the columns update
+ # ordering as old values being referenced in UPDATE clause are
+ # getting replaced one by one from left to right with their new
+ # values.
+ stmt1 = stmt.on_duplicate_key_update(
+ [
+ (
+ "bar",
+ func.if_(
+ update_condition,
+ func.values(foos.c.bar),
+ foos.c.bar,
),
- (
- "bar",
- func.if_(
- update_condition,
- func.values(foos.c.bar),
- foos.c.bar,
- ),
+ ),
+ (
+ "updated_once",
+ func.if_(update_condition, True, foos.c.updated_once),
+ ),
+ ]
+ )
+ stmt2 = stmt.on_duplicate_key_update(
+ [
+ (
+ "updated_once",
+ func.if_(update_condition, True, foos.c.updated_once),
+ ),
+ (
+ "bar",
+ func.if_(
+ update_condition,
+ func.values(foos.c.bar),
+ foos.c.bar,
),
- ]
- )
- # First statement should succeed updating column bar
- conn.execute(stmt1, dict(id=1, bar="ab"))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "ab", "bz", True)],
- )
- # Second statement will do noop update of column bar
- conn.execute(stmt2, dict(id=2, bar="ab"))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 2)).fetchall(),
- [(2, "b", "bz2", True)],
- )
+ ),
+ ]
+ )
+ # First statement should succeed updating column bar
+ conn.execute(stmt1, dict(id=1, bar="ab"))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "ab", "bz", True)],
+ )
+ # Second statement will do noop update of column bar
+ conn.execute(stmt2, dict(id=2, bar="ab"))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 2)).fetchall(),
+ [(2, "b", "bz2", True)],
+ )
- def test_last_inserted_id(self):
+ def test_last_inserted_id(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- stmt = insert(foos).values({"bar": "b", "baz": "bz"})
- result = conn.execute(
- stmt.on_duplicate_key_update(
- bar=stmt.inserted.bar, baz="newbz"
- )
- )
- eq_(result.inserted_primary_key, (1,))
+ conn = connection
+ stmt = insert(foos).values({"bar": "b", "baz": "bz"})
+ result = conn.execute(
+ stmt.on_duplicate_key_update(bar=stmt.inserted.bar, baz="newbz")
+ )
+ eq_(result.inserted_primary_key, (1,))
- stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
- result = conn.execute(
- stmt.on_duplicate_key_update(
- bar=stmt.inserted.bar, baz="newbz"
- )
- )
- eq_(result.inserted_primary_key, (1,))
+ stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
+ result = conn.execute(
+ stmt.on_duplicate_key_update(bar=stmt.inserted.bar, baz="newbz")
+ )
+ eq_(result.inserted_primary_key, (1,))