diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-04-07 18:00:42 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-04-07 18:00:42 -0400 |
commit | a7ada78d87aeb7383455b3935457950f1070d4b2 (patch) | |
tree | 1ef5720cf920459d4120bab10c868002fa248e0b /tests | |
parent | 456d364626bdbff1d3ded8499242f819b937e7f6 (diff) | |
download | alembic-a7ada78d87aeb7383455b3935457950f1070d4b2.tar.gz |
- [bug] bulk_insert() fixes:rel_0_3_1
1. bulk_insert() operation was
not working most likely since the 0.2 series
when used with an engine. #41
2. Repaired bulk_insert() to complete when
used against a lower-case-t table and executing
with only one set of parameters, working
around SQLAlchemy bug #2461 in this regard.
3. bulk_insert() uses "inline=True" so that phrases
like RETURNING and such don't get invoked for
single-row bulk inserts.
4. bulk_insert() will check that you're passing
a list of dictionaries in, raises TypeError
if not detected.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 2 | ||||
-rw-r--r-- | tests/test_bulk_insert.py | 123 |
2 files changed, 119 insertions, 6 deletions
diff --git a/tests/__init__.py b/tests/__init__.py index d3193ae..7f83300 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -78,6 +78,8 @@ def _get_dialect(name): except KeyError: dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name) _dialects[name] = d = dialect_mod.dialect() + if name == 'postgresql': + d.implicit_returning = True return d def assert_compiled(element, assert_string, dialect=None): diff --git a/tests/test_bulk_insert.py b/tests/test_bulk_insert.py index af8dc20..375edd0 100644 --- a/tests/test_bulk_insert.py +++ b/tests/test_bulk_insert.py @@ -1,16 +1,32 @@ -from tests import op_fixture +from tests import op_fixture, _sqlite_testing_config, eq_, assert_raises_message from alembic import op from sqlalchemy import Integer, \ UniqueConstraint, String from sqlalchemy.sql import table, column +from unittest import TestCase +from sqlalchemy import Table, Column, MetaData -def _test_bulk_insert(dialect, as_sql): +def _table_fixture(dialect, as_sql): context = op_fixture(dialect, as_sql) t1 = table("ins_table", column('id', Integer), column('v1', String()), column('v2', String()), ) + return context, t1 + +def _big_t_table_fixture(dialect, as_sql): + context = op_fixture(dialect, as_sql) + t1 = Table("ins_table", MetaData(), + Column('id', Integer, primary_key=True), + Column('v1', String()), + Column('v2', String()), + ) + return context, t1 + +def _test_bulk_insert(dialect, as_sql): + context, t1 = _table_fixture(dialect, as_sql) + op.bulk_insert(t1, [ {'id':1, 'v1':'row v1', 'v2':'row v5'}, {'id':2, 'v1':'row v2', 'v2':'row v6'}, @@ -19,6 +35,22 @@ def _test_bulk_insert(dialect, as_sql): ]) return context +def _test_bulk_insert_single(dialect, as_sql): + context, t1 = _table_fixture(dialect, as_sql) + + op.bulk_insert(t1, [ + {'id':1, 'v1':'row v1', 'v2':'row v5'}, + ]) + return context + +def _test_bulk_insert_single_bigt(dialect, as_sql): + context, t1 = _big_t_table_fixture(dialect, as_sql) + + op.bulk_insert(t1, [ + {'id':1, 'v1':'row v1', 'v2':'row v5'}, + ]) + return context + def test_bulk_insert(): context = _test_bulk_insert('default', False) context.assert_( @@ -35,10 +67,6 @@ def test_bulk_insert_wrong_cols(): op.bulk_insert(t1, [ {'v1':'row v1', }, ]) - # TODO: this is wrong because the test fixture isn't actually - # doing what the real context would do. Sending this to - # PG is going to produce a RETURNING clause. fixture would - # need to be beefed up context.assert_( 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)' ) @@ -49,6 +77,24 @@ def test_bulk_insert_pg(): 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)' ) +def test_bulk_insert_pg_single(): + context = _test_bulk_insert_single('postgresql', False) + context.assert_( + 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)' + ) + +def test_bulk_insert_pg_single_as_sql(): + context = _test_bulk_insert_single('postgresql', True) + context.assert_( + "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')" + ) + +def test_bulk_insert_pg_single_big_t_as_sql(): + context = _test_bulk_insert_single_bigt('postgresql', True) + context.assert_( + "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')" + ) + def test_bulk_insert_mssql(): context = _test_bulk_insert('mssql', False) context.assert_( @@ -86,3 +132,68 @@ def test_bulk_insert_as_sql_mssql(): "INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')", 'SET IDENTITY_INSERT ins_table OFF' ) + +def test_invalid_format(): + context, t1 = _table_fixture("sqlite", False) + assert_raises_message( + TypeError, + "List expected", + op.bulk_insert, t1, {"id":5} + ) + + assert_raises_message( + TypeError, + "List of dictionaries expected", + op.bulk_insert, t1, [(5, )] + ) + +class RoundTripTest(TestCase): + def setUp(self): + from sqlalchemy import create_engine + from alembic.migration import MigrationContext + self.conn = create_engine("sqlite://").connect() + self.conn.execute(""" + create table foo( + id integer primary key, + data varchar(50), + x integer + ) + """) + context = MigrationContext.configure(self.conn) + self.op = op.Operations(context) + self.t1 = table('foo', + column('id'), + column('data'), + column('x') + ) + def tearDown(self): + self.conn.close() + + def test_single_insert_round_trip(self): + self.op.bulk_insert(self.t1, + [{'data':"d1", "x":"x1"}] + ) + + eq_( + self.conn.execute("select id, data, x from foo").fetchall(), + [ + (1, "d1", "x1"), + ] + ) + + def test_bulk_insert_round_trip(self): + self.op.bulk_insert(self.t1, [ + {'data':"d1", "x":"x1"}, + {'data':"d2", "x":"x2"}, + {'data':"d3", "x":"x3"}, + ]) + + eq_( + self.conn.execute("select id, data, x from foo").fetchall(), + [ + (1, "d1", "x1"), + (2, "d2", "x2"), + (3, "d3", "x3") + ] + ) + |