summaryrefslogtreecommitdiff
path: root/test/dialect/mysql/test_on_duplicate.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-11-15 16:58:50 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2020-12-11 13:26:05 -0500
commitba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (patch)
tree038f2263d581d5e49d74731af68febc4bf64eb19 /test/dialect/mysql/test_on_duplicate.py
parent87d58b6d8188ccff808b3207d5f9398bb9adf9b9 (diff)
downloadsqlalchemy-ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41.tar.gz
correct for "autocommit" deprecation warning
Ensure no autocommit warnings occur internally or within tests. Also includes fixes for SQL Server full text tests which apparently have not been working at all for a long time, as it used long removed APIs. CI has not had fulltext running for some years and is now installed. Change-Id: Id806e1856c9da9f0a9eac88cebc7a94ecc95eb96
Diffstat (limited to 'test/dialect/mysql/test_on_duplicate.py')
-rw-r--r--test/dialect/mysql/test_on_duplicate.py263
1 files changed, 126 insertions, 137 deletions
diff --git a/test/dialect/mysql/test_on_duplicate.py b/test/dialect/mysql/test_on_duplicate.py
index ed88121a5..dc86aaeb0 100644
--- a/test/dialect/mysql/test_on_duplicate.py
+++ b/test/dialect/mysql/test_on_duplicate.py
@@ -5,7 +5,6 @@ from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
-from sqlalchemy import testing
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
@@ -47,155 +46,145 @@ class OnDuplicateTest(fixtures.TablesTest):
{"id": 2, "bar": "baz"},
)
- def test_on_duplicate_key_update_multirow(self):
+ def test_on_duplicate_key_update_multirow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(
- [dict(id=1, bar="ab"), dict(id=2, bar="b")]
- )
- stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
-
- result = conn.execute(stmt)
-
- # multirow, so its ambiguous. this is a behavioral change
- # in 1.4
- eq_(result.inserted_primary_key, (None,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "ab", "bz", False)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")])
+ stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
+
+ result = conn.execute(stmt)
+
+ # multirow, so its ambiguous. this is a behavioral change
+ # in 1.4
+ eq_(result.inserted_primary_key, (None,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "ab", "bz", False)],
+ )
- def test_on_duplicate_key_update_singlerow(self):
+ def test_on_duplicate_key_update_singlerow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(dict(id=2, bar="b"))
- stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
-
- result = conn.execute(stmt)
-
- # only one row in the INSERT so we do inserted_primary_key
- eq_(result.inserted_primary_key, (2,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "b", "bz", False)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values(dict(id=2, bar="b"))
+ stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
+
+ result = conn.execute(stmt)
+
+ # only one row in the INSERT so we do inserted_primary_key
+ eq_(result.inserted_primary_key, (2,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "b", "bz", False)],
+ )
- def test_on_duplicate_key_update_null_multirow(self):
+ def test_on_duplicate_key_update_null_multirow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(
- [dict(id=1, bar="ab"), dict(id=2, bar="b")]
- )
- stmt = stmt.on_duplicate_key_update(updated_once=None)
- result = conn.execute(stmt)
-
- # ambiguous
- eq_(result.inserted_primary_key, (None,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "b", "bz", None)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")])
+ stmt = stmt.on_duplicate_key_update(updated_once=None)
+ result = conn.execute(stmt)
+
+ # ambiguous
+ eq_(result.inserted_primary_key, (None,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "b", "bz", None)],
+ )
- def test_on_duplicate_key_update_expression_multirow(self):
+ def test_on_duplicate_key_update_expression_multirow(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
- stmt = insert(foos).values(
- [dict(id=1, bar="ab"), dict(id=2, bar="b")]
- )
- stmt = stmt.on_duplicate_key_update(
- bar=func.concat(stmt.inserted.bar, "_foo")
- )
- result = conn.execute(stmt)
- eq_(result.inserted_primary_key, (None,))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "ab_foo", "bz", False)],
- )
+ conn = connection
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values([dict(id=1, bar="ab"), dict(id=2, bar="b")])
+ stmt = stmt.on_duplicate_key_update(
+ bar=func.concat(stmt.inserted.bar, "_foo")
+ )
+ result = conn.execute(stmt)
+ eq_(result.inserted_primary_key, (None,))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "ab_foo", "bz", False)],
+ )
- def test_on_duplicate_key_update_preserve_order(self):
+ def test_on_duplicate_key_update_preserve_order(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- conn.execute(
- insert(
- foos,
- [
- dict(id=1, bar="b", baz="bz"),
- dict(id=2, bar="b", baz="bz2"),
- ],
- )
- )
-
- stmt = insert(foos)
- update_condition = foos.c.updated_once == False
-
- # The following statements show importance of the columns update
- # ordering as old values being referenced in UPDATE clause are
- # getting replaced one by one from left to right with their new
- # values.
- stmt1 = stmt.on_duplicate_key_update(
+ conn = connection
+ conn.execute(
+ insert(
+ foos,
[
- (
- "bar",
- func.if_(
- update_condition,
- func.values(foos.c.bar),
- foos.c.bar,
- ),
- ),
- (
- "updated_once",
- func.if_(update_condition, True, foos.c.updated_once),
- ),
- ]
+ dict(id=1, bar="b", baz="bz"),
+ dict(id=2, bar="b", baz="bz2"),
+ ],
)
- stmt2 = stmt.on_duplicate_key_update(
- [
- (
- "updated_once",
- func.if_(update_condition, True, foos.c.updated_once),
+ )
+
+ stmt = insert(foos)
+ update_condition = foos.c.updated_once == False
+
+ # The following statements show importance of the columns update
+ # ordering as old values being referenced in UPDATE clause are
+ # getting replaced one by one from left to right with their new
+ # values.
+ stmt1 = stmt.on_duplicate_key_update(
+ [
+ (
+ "bar",
+ func.if_(
+ update_condition,
+ func.values(foos.c.bar),
+ foos.c.bar,
),
- (
- "bar",
- func.if_(
- update_condition,
- func.values(foos.c.bar),
- foos.c.bar,
- ),
+ ),
+ (
+ "updated_once",
+ func.if_(update_condition, True, foos.c.updated_once),
+ ),
+ ]
+ )
+ stmt2 = stmt.on_duplicate_key_update(
+ [
+ (
+ "updated_once",
+ func.if_(update_condition, True, foos.c.updated_once),
+ ),
+ (
+ "bar",
+ func.if_(
+ update_condition,
+ func.values(foos.c.bar),
+ foos.c.bar,
),
- ]
- )
- # First statement should succeed updating column bar
- conn.execute(stmt1, dict(id=1, bar="ab"))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
- [(1, "ab", "bz", True)],
- )
- # Second statement will do noop update of column bar
- conn.execute(stmt2, dict(id=2, bar="ab"))
- eq_(
- conn.execute(foos.select().where(foos.c.id == 2)).fetchall(),
- [(2, "b", "bz2", True)],
- )
+ ),
+ ]
+ )
+ # First statement should succeed updating column bar
+ conn.execute(stmt1, dict(id=1, bar="ab"))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "ab", "bz", True)],
+ )
+ # Second statement will do noop update of column bar
+ conn.execute(stmt2, dict(id=2, bar="ab"))
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 2)).fetchall(),
+ [(2, "b", "bz2", True)],
+ )
- def test_last_inserted_id(self):
+ def test_last_inserted_id(self, connection):
foos = self.tables.foos
- with testing.db.connect() as conn:
- stmt = insert(foos).values({"bar": "b", "baz": "bz"})
- result = conn.execute(
- stmt.on_duplicate_key_update(
- bar=stmt.inserted.bar, baz="newbz"
- )
- )
- eq_(result.inserted_primary_key, (1,))
+ conn = connection
+ stmt = insert(foos).values({"bar": "b", "baz": "bz"})
+ result = conn.execute(
+ stmt.on_duplicate_key_update(bar=stmt.inserted.bar, baz="newbz")
+ )
+ eq_(result.inserted_primary_key, (1,))
- stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
- result = conn.execute(
- stmt.on_duplicate_key_update(
- bar=stmt.inserted.bar, baz="newbz"
- )
- )
- eq_(result.inserted_primary_key, (1,))
+ stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
+ result = conn.execute(
+ stmt.on_duplicate_key_update(bar=stmt.inserted.bar, baz="newbz")
+ )
+ eq_(result.inserted_primary_key, (1,))