diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-17 10:55:08 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-06-27 21:30:37 -0400 |
commit | 08c46eea924d23a234bf3feea1a928eb8ae8a00a (patch) | |
tree | 3795e1d04fa0e35c1e93080320b43c8fe0ed792e /test/dialect/postgresql/test_dialect.py | |
parent | 2d9387354f11da322c516412eb5dfe937163c90b (diff) | |
download | sqlalchemy-08c46eea924d23a234bf3feea1a928eb8ae8a00a.tar.gz |
ORM executemany returning
Build on #5401 to allow the ORM to take advanage
of executemany INSERT + RETURNING.
Implemented the feature
updated tests
to support INSERT DEFAULT VALUES, needed to come up with
a new syntax for compiler INSERT INTO table (anycol) VALUES (DEFAULT)
which can then be iterated out for executemany.
Added graceful degrade to plain executemany for PostgreSQL <= 8.2
Renamed EXECUTEMANY_DEFAULT to EXECUTEMANY_PLAIN
Fix issue where unicode identifiers or parameter names wouldn't
work with execute_values() under Py2K, because we have to
encode the statement and therefore have to encode the
insert_single_values_expr too.
Correct issue from #5401 to support executemany + return_defaults
for a PK that is explicitly pre-generated, meaning we aren't actually
getting RETURNING but need to return it from compiled_parameters.
Fixes: #5263
Change-Id: Id68e5c158c4f9ebc33b61c06a448907921c2a657
Diffstat (limited to 'test/dialect/postgresql/test_dialect.py')
-rw-r--r-- | test/dialect/postgresql/test_dialect.py | 155 |
1 files changed, 115 insertions, 40 deletions
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 1fbe870ba..cc0f3b4df 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -1,5 +1,6 @@ # coding: utf-8 import datetime +import itertools import logging import logging.handlers @@ -26,10 +27,11 @@ from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import TypeDecorator +from sqlalchemy import util from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH -from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT +from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_PLAIN from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES from sqlalchemy.engine import cursor as _cursor from sqlalchemy.engine import engine_from_config @@ -45,6 +47,8 @@ from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ from sqlalchemy.testing.assertions import eq_regex from sqlalchemy.testing.assertions import ne_ +from sqlalchemy.util import u +from sqlalchemy.util import ue from ...engine import test_execute if True: @@ -160,6 +164,7 @@ class ExecuteManyMode(object): __backend__ = True run_create_tables = "each" + run_deletes = None options = None @@ -174,6 +179,13 @@ class ExecuteManyMode(object): Column("z", Integer, server_default="5"), ) + Table( + u("Unitéble2"), + metadata, + Column(u("méil"), Integer, primary_key=True), + Column(ue("\u6e2c\u8a66"), Integer), + ) + def setup(self): super(ExecuteManyMode, self).setup() self.engine = engines.testing_engine(options=self.options) @@ -343,6 +355,22 @@ class ExecuteManyMode(object): ], ) + def test_insert_unicode_keys(self, connection): + table = self.tables[u("Unitéble2")] + + stmt = table.insert() + + connection.execute( + stmt, + [ + {u("méil"): 1, ue("\u6e2c\u8a66"): 1}, + {u("méil"): 2, ue("\u6e2c\u8a66"): 2}, + {u("méil"): 3, ue("\u6e2c\u8a66"): 3}, + ], + ) + + eq_(connection.execute(table.select()).all(), [(1, 1), (2, 2), (3, 3)]) + def test_update_fallback(self): from psycopg2 import extras @@ -423,57 +451,104 @@ class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest): class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest): options = {"executemany_mode": "values_only"} - def test_insert_returning_values(self): + def test_insert_returning_values(self, connection): """the psycopg2 dialect needs to assemble a fully buffered result with the return value of execute_values(). """ t = self.tables.data - with self.engine.connect() as conn: - page_size = conn.dialect.executemany_values_page_size or 100 - data = [ - {"x": "x%d" % i, "y": "y%d" % i} - for i in range(1, page_size * 5 + 27) - ] - result = conn.execute(t.insert().returning(t.c.x, t.c.y), data) - - eq_([tup[0] for tup in result.cursor.description], ["x", "y"]) - eq_(result.keys(), ["x", "y"]) - assert t.c.x in result.keys() - assert t.c.id not in result.keys() - assert not result._soft_closed - assert isinstance( - result.cursor_strategy, - _cursor.FullyBufferedCursorFetchStrategy, - ) - assert not result.cursor.closed - assert not result.closed - eq_(result.mappings().all(), data) + conn = connection + page_size = conn.dialect.executemany_values_page_size or 100 + data = [ + {"x": "x%d" % i, "y": "y%d" % i} + for i in range(1, page_size * 5 + 27) + ] + result = conn.execute(t.insert().returning(t.c.x, t.c.y), data) + + eq_([tup[0] for tup in result.cursor.description], ["x", "y"]) + eq_(result.keys(), ["x", "y"]) + assert t.c.x in result.keys() + assert t.c.id not in result.keys() + assert not result._soft_closed + assert isinstance( + result.cursor_strategy, _cursor.FullyBufferedCursorFetchStrategy, + ) + assert not result.cursor.closed + assert not result.closed + eq_(result.mappings().all(), data) + + assert result._soft_closed + # assert result.closed + assert result.cursor is None + + @testing.provide_metadata + def test_insert_returning_preexecute_pk(self, connection): + counter = itertools.count(1) + + t = Table( + "t", + self.metadata, + Column( + "id", + Integer, + primary_key=True, + default=lambda: util.next(counter), + ), + Column("data", Integer), + ) + self.metadata.create_all(connection) + + result = connection.execute( + t.insert().return_defaults(), + [{"data": 1}, {"data": 2}, {"data": 3}], + ) - assert result._soft_closed - # assert result.closed - assert result.cursor is None + eq_(result.inserted_primary_key_rows, [(1,), (2,), (3,)]) - def test_insert_returning_defaults(self): + def test_insert_returning_defaults(self, connection): t = self.tables.data - with self.engine.connect() as conn: + conn = connection - result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) - first_pk = result.inserted_primary_key[0] + result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) + first_pk = result.inserted_primary_key[0] - page_size = conn.dialect.executemany_values_page_size or 100 - total_rows = page_size * 5 + 27 - data = [ - {"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows) - ] - result = conn.execute(t.insert().returning(t.c.id, t.c.z), data) + page_size = conn.dialect.executemany_values_page_size or 100 + total_rows = page_size * 5 + 27 + data = [{"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)] + result = conn.execute(t.insert().returning(t.c.id, t.c.z), data) - eq_( - result.all(), - [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)], - ) + eq_( + result.all(), + [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)], + ) + + def test_insert_return_pks_default_values(self, connection): + """test sending multiple, empty rows into an INSERT and getting primary + key values back. + + This has to use a format that indicates at least one DEFAULT in + multiple parameter sets, i.e. "INSERT INTO table (anycol) VALUES + (DEFAULT) (DEFAULT) (DEFAULT) ... RETURNING col" + + """ + t = self.tables.data + + conn = connection + + result = conn.execute(t.insert(), {"x": "x0", "y": "y0"}) + first_pk = result.inserted_primary_key[0] + + page_size = conn.dialect.executemany_values_page_size or 100 + total_rows = page_size * 5 + 27 + data = [{} for i in range(1, total_rows)] + result = conn.execute(t.insert().returning(t.c.id), data) + + eq_( + result.all(), + [(pk,) for pk in range(1 + first_pk, total_rows + first_pk)], + ) def test_insert_w_newlines(self): from psycopg2 import extras @@ -611,7 +686,7 @@ class ExecutemanyFlagOptionsTest(fixtures.TablesTest): def test_executemany_correct_flag_options(self): for opt, expected in [ - (None, EXECUTEMANY_DEFAULT), + (None, EXECUTEMANY_PLAIN), ("batch", EXECUTEMANY_BATCH), ("values_only", EXECUTEMANY_VALUES), ("values_plus_batch", EXECUTEMANY_VALUES_PLUS_BATCH), |