summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/suite/test_insert.py
diff options
context:
space:
mode:
authorGord Thompson <gord@gordthompson.com>2020-04-11 10:52:15 -0600
committerGord Thompson <gord@gordthompson.com>2020-04-11 11:25:13 -0600
commitaa07b7a8144cc10dede80efbafcc19cbe7af8d01 (patch)
tree15a17c506a84c7122dfd3c15591bf6c6016478cd /lib/sqlalchemy/testing/suite/test_insert.py
parent6e1cbe426479438f1fc12ce6ae9424cf5d191ddf (diff)
downloadsqlalchemy-aa07b7a8144cc10dede80efbafcc19cbe7af8d01.tar.gz
Clean up .execute calls in test_insert.py
Change-Id: I87e144d00577b53baab203c675caeaf8ad46cb5c
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_insert.py')
-rw-r--r--lib/sqlalchemy/testing/suite/test_insert.py100
1 files changed, 48 insertions, 52 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py
index 931b0ef65..f449b2fe6 100644
--- a/lib/sqlalchemy/testing/suite/test_insert.py
+++ b/lib/sqlalchemy/testing/suite/test_insert.py
@@ -41,28 +41,28 @@ class LastrowidTest(fixtures.TablesTest):
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (config.db.dialect.default_sequence_base, "some data"))
+ eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
- def test_autoincrement_on_insert(self):
+ def test_autoincrement_on_insert(self, connection):
- config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
- self._assert_round_trip(self.tables.autoinc_pk, config.db)
+ connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+ self._assert_round_trip(self.tables.autoinc_pk, connection)
- def test_last_inserted_id(self):
+ def test_last_inserted_id(self, connection):
- r = config.db.execute(
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
- pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])
@requirements.dbapi_lastrowid
- def test_native_lastrowid_autoinc(self):
- r = config.db.execute(
+ def test_native_lastrowid_autoinc(self, connection):
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
lastrowid = r.lastrowid
- pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(lastrowid, pk)
@@ -117,8 +117,8 @@ class InsertBehaviorTest(fixtures.TablesTest):
assert not r.returns_rows
@requirements.returning
- def test_autoclose_on_insert_implicit_returning(self):
- r = config.db.execute(
+ def test_autoclose_on_insert_implicit_returning(self, connection):
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
assert r._soft_closed
@@ -127,12 +127,12 @@ class InsertBehaviorTest(fixtures.TablesTest):
assert not r.returns_rows
@requirements.empty_inserts
- def test_empty_insert(self):
- r = config.db.execute(self.tables.autoinc_pk.insert())
+ def test_empty_insert(self, connection):
+ r = connection.execute(self.tables.autoinc_pk.insert())
assert r._soft_closed
assert not r.closed
- r = config.db.execute(
+ r = connection.execute(
self.tables.autoinc_pk.select().where(
self.tables.autoinc_pk.c.id != None
)
@@ -141,10 +141,10 @@ class InsertBehaviorTest(fixtures.TablesTest):
assert len(r.fetchall())
@requirements.insert_from_select
- def test_insert_from_select_autoinc(self):
+ def test_insert_from_select_autoinc(self, connection):
src_table = self.tables.manual_pk
dest_table = self.tables.autoinc_pk
- config.db.execute(
+ connection.execute(
src_table.insert(),
[
dict(id=1, data="data1"),
@@ -153,7 +153,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
],
)
- result = config.db.execute(
+ result = connection.execute(
dest_table.insert().from_select(
("data",),
select([src_table.c.data]).where(
@@ -164,17 +164,17 @@ class InsertBehaviorTest(fixtures.TablesTest):
eq_(result.inserted_primary_key, [None])
- result = config.db.execute(
+ result = connection.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
eq_(result.fetchall(), [("data2",), ("data3",)])
@requirements.insert_from_select
- def test_insert_from_select_autoinc_no_rows(self):
+ def test_insert_from_select_autoinc_no_rows(self, connection):
src_table = self.tables.manual_pk
dest_table = self.tables.autoinc_pk
- result = config.db.execute(
+ result = connection.execute(
dest_table.insert().from_select(
("data",),
select([src_table.c.data]).where(
@@ -184,16 +184,16 @@ class InsertBehaviorTest(fixtures.TablesTest):
)
eq_(result.inserted_primary_key, [None])
- result = config.db.execute(
+ result = connection.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
eq_(result.fetchall(), [])
@requirements.insert_from_select
- def test_insert_from_select(self):
+ def test_insert_from_select(self, connection):
table = self.tables.manual_pk
- config.db.execute(
+ connection.execute(
table.insert(),
[
dict(id=1, data="data1"),
@@ -202,7 +202,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
],
)
- config.db.execute(
+ connection.execute(
table.insert(inline=True).from_select(
("id", "data"),
select([table.c.id + 5, table.c.data]).where(
@@ -212,16 +212,16 @@ class InsertBehaviorTest(fixtures.TablesTest):
)
eq_(
- config.db.execute(
+ connection.execute(
select([table.c.data]).order_by(table.c.data)
).fetchall(),
[("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
)
@requirements.insert_from_select
- def test_insert_from_select_with_defaults(self):
+ def test_insert_from_select_with_defaults(self, connection):
table = self.tables.includes_defaults
- config.db.execute(
+ connection.execute(
table.insert(),
[
dict(id=1, data="data1"),
@@ -230,7 +230,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
],
)
- config.db.execute(
+ connection.execute(
table.insert(inline=True).from_select(
("id", "data"),
select([table.c.id + 5, table.c.data]).where(
@@ -240,7 +240,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
)
eq_(
- config.db.execute(
+ connection.execute(
select([table]).order_by(table.c.data, table.c.id)
).fetchall(),
[
@@ -262,7 +262,7 @@ class ReturningTest(fixtures.TablesTest):
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (config.db.dialect.default_sequence_base, "some data"))
+ eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
@classmethod
def define_tables(cls, metadata):
@@ -276,39 +276,35 @@ class ReturningTest(fixtures.TablesTest):
)
@requirements.fetch_rows_post_commit
- def test_explicit_returning_pk_autocommit(self):
- engine = config.db
+ def test_explicit_returning_pk_autocommit(self, connection):
table = self.tables.autoinc_pk
- with engine.begin() as conn:
- r = conn.execute(
- table.insert().returning(table.c.id), data="some data"
- )
+ r = connection.execute(
+ table.insert().returning(table.c.id), data="some data"
+ )
pk = r.first()[0]
- fetched_pk = config.db.scalar(select([table.c.id]))
+ fetched_pk = connection.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
- def test_explicit_returning_pk_no_autocommit(self):
- engine = config.db
+ def test_explicit_returning_pk_no_autocommit(self, connection):
table = self.tables.autoinc_pk
- with engine.begin() as conn:
- r = conn.execute(
- table.insert().returning(table.c.id), data="some data"
- )
- pk = r.first()[0]
- fetched_pk = config.db.scalar(select([table.c.id]))
+ r = connection.execute(
+ table.insert().returning(table.c.id), data="some data"
+ )
+ pk = r.first()[0]
+ fetched_pk = connection.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
- def test_autoincrement_on_insert_implicit_returning(self):
+ def test_autoincrement_on_insert_implicit_returning(self, connection):
- config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
- self._assert_round_trip(self.tables.autoinc_pk, config.db)
+ connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+ self._assert_round_trip(self.tables.autoinc_pk, connection)
- def test_last_inserted_id_implicit_returning(self):
+ def test_last_inserted_id_implicit_returning(self, connection):
- r = config.db.execute(
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
- pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])