summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
author    Mike Bayer <mike_mp@zzzcomputing.com>  2012-04-07 18:00:42 -0400
committer Mike Bayer <mike_mp@zzzcomputing.com>  2012-04-07 18:00:42 -0400
commit   a7ada78d87aeb7383455b3935457950f1070d4b2 (patch)
tree     1ef5720cf920459d4120bab10c868002fa248e0b /tests
parent   456d364626bdbff1d3ded8499242f819b937e7f6 (diff)
download alembic-a7ada78d87aeb7383455b3935457950f1070d4b2.tar.gz
- [bug] bulk_insert() fixes (tag: rel_0_3_1)
1. The bulk_insert() operation was not working, most likely since the 0.2 series, when used with an engine (#41). 2. Repaired bulk_insert() to complete when used against a lower-case-t table and executing with only one set of parameters, working around SQLAlchemy bug #2461 in this regard. 3. bulk_insert() uses "inline=True" so that phrases like RETURNING and such don't get invoked for single-row bulk inserts. 4. bulk_insert() will check that you're passing a list of dictionaries in, and raises TypeError if not detected.
Diffstat (limited to 'tests')
-rw-r--r--  tests/__init__.py          |   2
-rw-r--r--  tests/test_bulk_insert.py  | 123
2 files changed, 119 insertions(+), 6 deletions(-)
diff --git a/tests/__init__.py b/tests/__init__.py
index d3193ae..7f83300 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -78,6 +78,8 @@ def _get_dialect(name):
except KeyError:
dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name)
_dialects[name] = d = dialect_mod.dialect()
+ if name == 'postgresql':
+ d.implicit_returning = True
return d
def assert_compiled(element, assert_string, dialect=None):
diff --git a/tests/test_bulk_insert.py b/tests/test_bulk_insert.py
index af8dc20..375edd0 100644
--- a/tests/test_bulk_insert.py
+++ b/tests/test_bulk_insert.py
@@ -1,16 +1,32 @@
-from tests import op_fixture
+from tests import op_fixture, _sqlite_testing_config, eq_, assert_raises_message
from alembic import op
from sqlalchemy import Integer, \
UniqueConstraint, String
from sqlalchemy.sql import table, column
+from unittest import TestCase
+from sqlalchemy import Table, Column, MetaData
-def _test_bulk_insert(dialect, as_sql):
+def _table_fixture(dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
column('v2', String()),
)
+ return context, t1
+
+def _big_t_table_fixture(dialect, as_sql):
+ context = op_fixture(dialect, as_sql)
+ t1 = Table("ins_table", MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('v1', String()),
+ Column('v2', String()),
+ )
+ return context, t1
+
+def _test_bulk_insert(dialect, as_sql):
+ context, t1 = _table_fixture(dialect, as_sql)
+
op.bulk_insert(t1, [
{'id':1, 'v1':'row v1', 'v2':'row v5'},
{'id':2, 'v1':'row v2', 'v2':'row v6'},
@@ -19,6 +35,22 @@ def _test_bulk_insert(dialect, as_sql):
])
return context
+def _test_bulk_insert_single(dialect, as_sql):
+ context, t1 = _table_fixture(dialect, as_sql)
+
+ op.bulk_insert(t1, [
+ {'id':1, 'v1':'row v1', 'v2':'row v5'},
+ ])
+ return context
+
+def _test_bulk_insert_single_bigt(dialect, as_sql):
+ context, t1 = _big_t_table_fixture(dialect, as_sql)
+
+ op.bulk_insert(t1, [
+ {'id':1, 'v1':'row v1', 'v2':'row v5'},
+ ])
+ return context
+
def test_bulk_insert():
context = _test_bulk_insert('default', False)
context.assert_(
@@ -35,10 +67,6 @@ def test_bulk_insert_wrong_cols():
op.bulk_insert(t1, [
{'v1':'row v1', },
])
- # TODO: this is wrong because the test fixture isn't actually
- # doing what the real context would do. Sending this to
- # PG is going to produce a RETURNING clause. fixture would
- # need to be beefed up
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
@@ -49,6 +77,24 @@ def test_bulk_insert_pg():
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
+def test_bulk_insert_pg_single():
+ context = _test_bulk_insert_single('postgresql', False)
+ context.assert_(
+ 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
+ )
+
+def test_bulk_insert_pg_single_as_sql():
+ context = _test_bulk_insert_single('postgresql', True)
+ context.assert_(
+ "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
+ )
+
+def test_bulk_insert_pg_single_big_t_as_sql():
+ context = _test_bulk_insert_single_bigt('postgresql', True)
+ context.assert_(
+ "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
+ )
+
def test_bulk_insert_mssql():
context = _test_bulk_insert('mssql', False)
context.assert_(
@@ -86,3 +132,68 @@ def test_bulk_insert_as_sql_mssql():
"INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')",
'SET IDENTITY_INSERT ins_table OFF'
)
+
+def test_invalid_format():
+ context, t1 = _table_fixture("sqlite", False)
+ assert_raises_message(
+ TypeError,
+ "List expected",
+ op.bulk_insert, t1, {"id":5}
+ )
+
+ assert_raises_message(
+ TypeError,
+ "List of dictionaries expected",
+ op.bulk_insert, t1, [(5, )]
+ )
+
+class RoundTripTest(TestCase):
+ def setUp(self):
+ from sqlalchemy import create_engine
+ from alembic.migration import MigrationContext
+ self.conn = create_engine("sqlite://").connect()
+ self.conn.execute("""
+ create table foo(
+ id integer primary key,
+ data varchar(50),
+ x integer
+ )
+ """)
+ context = MigrationContext.configure(self.conn)
+ self.op = op.Operations(context)
+ self.t1 = table('foo',
+ column('id'),
+ column('data'),
+ column('x')
+ )
+ def tearDown(self):
+ self.conn.close()
+
+ def test_single_insert_round_trip(self):
+ self.op.bulk_insert(self.t1,
+ [{'data':"d1", "x":"x1"}]
+ )
+
+ eq_(
+ self.conn.execute("select id, data, x from foo").fetchall(),
+ [
+ (1, "d1", "x1"),
+ ]
+ )
+
+ def test_bulk_insert_round_trip(self):
+ self.op.bulk_insert(self.t1, [
+ {'data':"d1", "x":"x1"},
+ {'data':"d2", "x":"x2"},
+ {'data':"d3", "x":"x3"},
+ ])
+
+ eq_(
+ self.conn.execute("select id, data, x from foo").fetchall(),
+ [
+ (1, "d1", "x1"),
+ (2, "d2", "x2"),
+ (3, "d3", "x3")
+ ]
+ )
+