diff options
-rw-r--r-- | CHANGES | 18 | ||||
-rw-r--r-- | alembic/__init__.py | 2 | ||||
-rw-r--r-- | alembic/ddl/impl.py | 11 | ||||
-rw-r--r-- | tests/__init__.py | 2 | ||||
-rw-r--r-- | tests/test_bulk_insert.py | 123 |
5 files changed, 147 insertions, 9 deletions
@@ -1,3 +1,21 @@ +0.3.1 +===== +- [bug] bulk_insert() fixes: + + 1. bulk_insert() operation was + not working most likely since the 0.2 series + when used with an engine. #41 + 2. Repaired bulk_insert() to complete when + used against a lower-case-t table and executing + with only one set of parameters, working + around SQLAlchemy bug #2461 in this regard. + 3. bulk_insert() uses "inline=True" so that phrases + like RETURNING and such don't get invoked for + single-row bulk inserts. + 4. bulk_insert() will check that you're passing + a list of dictionaries in, raises TypeError + if not detected. + 0.3.0 ===== - [general] The focus of 0.3 is to clean up diff --git a/alembic/__init__.py b/alembic/__init__.py index c1c253c..4831217 100644 --- a/alembic/__init__.py +++ b/alembic/__init__.py @@ -1,6 +1,6 @@ from os import path -__version__ = '0.3.0' +__version__ = '0.3.1' package_dir = path.abspath(path.dirname(__file__)) diff --git a/alembic/ddl/impl.py b/alembic/ddl/impl.py index c87189b..0c5d794 100644 --- a/alembic/ddl/impl.py +++ b/alembic/ddl/impl.py @@ -159,14 +159,21 @@ class DefaultImpl(object): self._exec(schema.DropIndex(index)) def bulk_insert(self, table, rows): + if not isinstance(rows, list): + raise TypeError("List expected") + elif rows and not isinstance(rows[0], dict): + raise TypeError("List of dictionaries expected") if self.as_sql: for row in rows: - self._exec(table.insert().values(**dict( + self._exec(table.insert(inline=True).values(**dict( (k, _literal_bindparam(k, v, type_=table.c[k].type)) for k, v in row.items() ))) else: - self._exec(table.insert(), *rows) + # work around http://www.sqlalchemy.org/trac/ticket/2461 + if not hasattr(table, '_autoincrement_column'): + table._autoincrement_column = None + self._exec(table.insert(inline=True), multiparams=rows) def compare_type(self, inspector_column, metadata_column): diff --git a/tests/__init__.py b/tests/__init__.py index d3193ae..7f83300 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -78,6 +78,8 @@ def _get_dialect(name): except KeyError: dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name) _dialects[name] = d = dialect_mod.dialect() + if name == 'postgresql': + d.implicit_returning = True return d def assert_compiled(element, assert_string, dialect=None): diff --git a/tests/test_bulk_insert.py b/tests/test_bulk_insert.py index af8dc20..375edd0 100644 --- a/tests/test_bulk_insert.py +++ b/tests/test_bulk_insert.py @@ -1,16 +1,32 @@ -from tests import op_fixture +from tests import op_fixture, _sqlite_testing_config, eq_, assert_raises_message from alembic import op from sqlalchemy import Integer, \ UniqueConstraint, String from sqlalchemy.sql import table, column +from unittest import TestCase +from sqlalchemy import Table, Column, MetaData -def _test_bulk_insert(dialect, as_sql): +def _table_fixture(dialect, as_sql): context = op_fixture(dialect, as_sql) t1 = table("ins_table", column('id', Integer), column('v1', String()), column('v2', String()), ) + return context, t1 + +def _big_t_table_fixture(dialect, as_sql): + context = op_fixture(dialect, as_sql) + t1 = Table("ins_table", MetaData(), + Column('id', Integer, primary_key=True), + Column('v1', String()), + Column('v2', String()), + ) + return context, t1 + +def _test_bulk_insert(dialect, as_sql): + context, t1 = _table_fixture(dialect, as_sql) + op.bulk_insert(t1, [ {'id':1, 'v1':'row v1', 'v2':'row v5'}, {'id':2, 'v1':'row v2', 'v2':'row v6'}, @@ -19,6 +35,22 @@ def _test_bulk_insert(dialect, as_sql): ]) return context +def _test_bulk_insert_single(dialect, as_sql): + context, t1 = _table_fixture(dialect, as_sql) + + op.bulk_insert(t1, [ + {'id':1, 'v1':'row v1', 'v2':'row v5'}, + ]) + return context + +def _test_bulk_insert_single_bigt(dialect, as_sql): + context, t1 = _big_t_table_fixture(dialect, as_sql) + + op.bulk_insert(t1, [ + {'id':1, 'v1':'row v1', 'v2':'row v5'}, + ]) + return context + def test_bulk_insert(): context = _test_bulk_insert('default', False) context.assert_( @@ -35,10 +67,6 @@ def test_bulk_insert_wrong_cols(): op.bulk_insert(t1, [ {'v1':'row v1', }, ]) - # TODO: this is wrong because the test fixture isn't actually - # doing what the real context would do. Sending this to - # PG is going to produce a RETURNING clause. fixture would - # need to be beefed up context.assert_( 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)' ) @@ -49,6 +77,24 @@ def test_bulk_insert_pg(): 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)' ) +def test_bulk_insert_pg_single(): + context = _test_bulk_insert_single('postgresql', False) + context.assert_( + 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)' + ) + +def test_bulk_insert_pg_single_as_sql(): + context = _test_bulk_insert_single('postgresql', True) + context.assert_( + "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')" + ) + +def test_bulk_insert_pg_single_big_t_as_sql(): + context = _test_bulk_insert_single_bigt('postgresql', True) + context.assert_( + "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')" + ) + def test_bulk_insert_mssql(): context = _test_bulk_insert('mssql', False) context.assert_( @@ -86,3 +132,68 @@ def test_bulk_insert_as_sql_mssql(): "INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')", 'SET IDENTITY_INSERT ins_table OFF' ) + +def test_invalid_format(): + context, t1 = _table_fixture("sqlite", False) + assert_raises_message( + TypeError, + "List expected", + op.bulk_insert, t1, {"id":5} + ) + + assert_raises_message( + TypeError, + "List of dictionaries expected", + op.bulk_insert, t1, [(5, )] + ) + +class RoundTripTest(TestCase): + def setUp(self): + from sqlalchemy import create_engine + from alembic.migration import MigrationContext + self.conn = create_engine("sqlite://").connect() + self.conn.execute(""" + create table foo( + id integer primary key, + data varchar(50), + x integer + ) + """) + context = MigrationContext.configure(self.conn) + self.op = op.Operations(context) + self.t1 = table('foo', + column('id'), + column('data'), + column('x') + ) + def tearDown(self): + self.conn.close() + + def test_single_insert_round_trip(self): + self.op.bulk_insert(self.t1, + [{'data':"d1", "x":"x1"}] + ) + + eq_( + self.conn.execute("select id, data, x from foo").fetchall(), + [ + (1, "d1", "x1"), + ] + ) + + def test_bulk_insert_round_trip(self): + self.op.bulk_insert(self.t1, [ + {'data':"d1", "x":"x1"}, + {'data':"d2", "x":"x2"}, + {'data':"d3", "x":"x3"}, + ]) + + eq_( + self.conn.execute("select id, data, x from foo").fetchall(), + [ + (1, "d1", "x1"), + (2, "d2", "x2"), + (3, "d3", "x3") + ] + ) + |