diff options
Diffstat (limited to 'test/dialect')
| -rw-r--r-- | test/dialect/mssql/test_compiler.py | 44 | ||||
| -rw-r--r-- | test/dialect/mssql/test_engine.py | 7 | ||||
| -rw-r--r-- | test/dialect/mssql/test_query.py | 3 | ||||
| -rw-r--r-- | test/dialect/mysql/test_compiler.py | 47 | ||||
| -rw-r--r-- | test/dialect/mysql/test_dialect.py | 59 | ||||
| -rw-r--r-- | test/dialect/mysql/test_reflection.py | 73 | ||||
| -rw-r--r-- | test/dialect/mysql/test_types.py | 299 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_compiler.py | 123 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_dialect.py | 39 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_reflection.py | 76 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_types.py | 400 | ||||
| -rw-r--r-- | test/dialect/test_firebird.py | 9 | ||||
| -rw-r--r-- | test/dialect/test_informix.py | 25 | ||||
| -rw-r--r-- | test/dialect/test_oracle.py | 635 |
14 files changed, 1358 insertions, 481 deletions
diff --git a/test/dialect/mssql/test_compiler.py b/test/dialect/mssql/test_compiler.py index 87037c6a4..f12ab0330 100644 --- a/test/dialect/mssql/test_compiler.py +++ b/test/dialect/mssql/test_compiler.py @@ -510,6 +510,29 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))" ) + def test_table_pkc_clustering(self): + metadata = MetaData() + tbl = Table('test', metadata, + Column('x', Integer, autoincrement=False), + Column('y', Integer, autoincrement=False), + PrimaryKeyConstraint("x", "y", mssql_clustered=True)) + self.assert_compile(schema.CreateTable(tbl), + "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " + "PRIMARY KEY CLUSTERED (x, y))" + ) + + def test_table_uc_clustering(self): + metadata = MetaData() + tbl = Table('test', metadata, + Column('x', Integer, autoincrement=False), + Column('y', Integer, autoincrement=False), + PrimaryKeyConstraint("x"), + UniqueConstraint("y", mssql_clustered=True)) + self.assert_compile(schema.CreateTable(tbl), + "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NULL, " + "PRIMARY KEY (x), UNIQUE CLUSTERED (y))" + ) + def test_index_clustering(self): metadata = MetaData() tbl = Table('test', metadata, @@ -528,6 +551,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "CREATE INDEX foo ON test (x DESC, y)" ) + def test_create_index_expr(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer) + ) + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x > 5)), + "CREATE INDEX bar ON foo (x > 5)" + ) + + def test_drop_index_w_schema(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer), + schema='bar' + ) + self.assert_compile( + schema.DropIndex(Index("idx_foo", t1.c.x)), + "DROP INDEX idx_foo ON bar.foo" + ) + def test_index_extra_include_1(self): metadata = MetaData() tbl = Table('test', metadata, diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py index 2834f35ec..c07f30040 100644 --- a/test/dialect/mssql/test_engine.py +++ b/test/dialect/mssql/test_engine.py @@ -131,10 +131,11 @@ class ParseConnectTest(fixtures.TestBase): for error in [ 'Adaptive Server connection timed out', + 'Net-Lib error during Connection reset by peer', 'message 20003', - "Error 10054", - "Not connected to any MS SQL server", - "Connection is closed" + 'Error 10054', + 'Not connected to any MS SQL server', + 'Connection is closed' ]: eq_(dialect.is_disconnect(error, None, None), True) diff --git a/test/dialect/mssql/test_query.py b/test/dialect/mssql/test_query.py index bff737145..6a12744a7 100644 --- a/test/dialect/mssql/test_query.py +++ b/test/dialect/mssql/test_query.py @@ -232,9 +232,10 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase): con.execute("""drop trigger paj""") meta.drop_all() + @testing.fails_on_everything_except('mssql+pyodbc', 'pyodbc-specific feature') @testing.provide_metadata def test_disable_scope_identity(self): - engine = engines.testing_engine(options={"use_scope_identity":False}) + engine = engines.testing_engine(options={"use_scope_identity": False}) metadata = self.metadata metadata.bind = engine t1 = Table('t1', metadata, diff --git a/test/dialect/mysql/test_compiler.py b/test/dialect/mysql/test_compiler.py index a77a25cc4..45f8405c8 100644 --- a/test/dialect/mysql/test_compiler.py +++ b/test/dialect/mysql/test_compiler.py @@ -6,6 +6,7 @@ from sqlalchemy import sql, exc, schema, types as sqltypes from sqlalchemy.dialects.mysql import base as mysql from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import testing +from sqlalchemy.sql import table, column class CompileTest(fixtures.TestBase, AssertsCompiledSQL): @@ -94,19 +95,57 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "CREATE TABLE testtbl (data VARCHAR(255), " "PRIMARY KEY (data) USING btree)") - def test_skip_deferrable_kw(self): + def test_create_index_expr(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer) + ) + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x > 5)), + "CREATE INDEX bar ON foo (x > 5)" + ) + + def test_deferrable_initially_kw_not_ignored(self): m = MetaData() t1 = Table('t1', m, Column('id', Integer, primary_key=True)) t2 = Table('t2', m, Column('id', Integer, - ForeignKey('t1.id', deferrable=True), + ForeignKey('t1.id', deferrable=True, initially="XYZ"), primary_key=True)) self.assert_compile( schema.CreateTable(t2), "CREATE TABLE t2 (id INTEGER NOT NULL, " - "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id))" + "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id) DEFERRABLE INITIALLY XYZ)" ) + def test_match_kw_raises(self): + m = MetaData() + t1 = Table('t1', m, Column('id', Integer, primary_key=True)) + t2 = Table('t2', m, Column('id', Integer, + ForeignKey('t1.id', match="XYZ"), + primary_key=True)) + + assert_raises_message( + exc.CompileError, + "MySQL ignores the 'MATCH' keyword while at the same time causes " + "ON UPDATE/ON DELETE clauses to be ignored.", + schema.CreateTable(t2).compile, dialect=mysql.dialect() + ) + + def test_for_update(self): + table1 = table('mytable', + column('myid'), column('name'), column('description')) + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %s FOR UPDATE") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(read=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE") + class SQLTest(fixtures.TestBase, AssertsCompiledSQL): """Tests MySQL-dialect specific compilation.""" @@ -302,8 +341,10 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL): (VARCHAR, "CAST(t.col AS CHAR)"), (NCHAR, "CAST(t.col AS CHAR)"), (CHAR, "CAST(t.col AS CHAR)"), + (m.CHAR(charset='utf8'), "CAST(t.col AS CHAR CHARACTER SET utf8)"), (CLOB, "CAST(t.col AS CHAR)"), (TEXT, "CAST(t.col AS CHAR)"), + (m.TEXT(charset='utf8'), "CAST(t.col AS CHAR CHARACTER SET utf8)"), (String(32), "CAST(t.col AS CHAR(32))"), (Unicode(32), "CAST(t.col AS CHAR(32))"), (CHAR(32), "CAST(t.col AS CHAR(32))"), diff --git a/test/dialect/mysql/test_dialect.py b/test/dialect/mysql/test_dialect.py index 62bdfc81b..2ff17f0f7 100644 --- a/test/dialect/mysql/test_dialect.py +++ b/test/dialect/mysql/test_dialect.py @@ -9,12 +9,17 @@ from sqlalchemy.testing import engines import datetime class DialectTest(fixtures.TestBase): - __only_on__ = 'mysql' + def test_ssl_arguments_mysqldb(self): + from sqlalchemy.dialects.mysql import mysqldb + dialect = mysqldb.dialect() + self._test_ssl_arguments(dialect) - @testing.only_on(['mysql+mysqldb', 'mysql+oursql'], - 'requires particular SSL arguments') - def test_ssl_arguments(self): - dialect = testing.db.dialect + def test_ssl_arguments_oursql(self): + from sqlalchemy.dialects.mysql import oursql + dialect = oursql.dialect() + self._test_ssl_arguments(dialect) + + def _test_ssl_arguments(self, dialect): kwarg = dialect.create_connect_args( make_url("mysql://scott:tiger@localhost:3306/test" "?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem") @@ -33,6 +38,50 @@ class DialectTest(fixtures.TestBase): } ) + def test_mysqlconnector_buffered_arg(self): + from sqlalchemy.dialects.mysql import mysqlconnector + dialect = mysqlconnector.dialect() + kw = dialect.create_connect_args( + make_url("mysql+mysqlconnector://u:p@host/db?buffered=true") + )[1] + eq_(kw['buffered'], True) + + kw = dialect.create_connect_args( + make_url("mysql+mysqlconnector://u:p@host/db?buffered=false") + )[1] + eq_(kw['buffered'], False) + + kw = dialect.create_connect_args( + make_url("mysql+mysqlconnector://u:p@host/db") + )[1] + eq_(kw['buffered'], True) + + def test_mysqlconnector_raise_on_warnings_arg(self): + from sqlalchemy.dialects.mysql import mysqlconnector + dialect = mysqlconnector.dialect() + kw = dialect.create_connect_args( + make_url("mysql+mysqlconnector://u:p@host/db?raise_on_warnings=true") + )[1] + eq_(kw['raise_on_warnings'], True) + + kw = dialect.create_connect_args( + make_url("mysql+mysqlconnector://u:p@host/db?raise_on_warnings=false") + )[1] + eq_(kw['raise_on_warnings'], False) + + kw = dialect.create_connect_args( + make_url("mysql+mysqlconnector://u:p@host/db") + )[1] + eq_(kw['raise_on_warnings'], True) + + @testing.only_on('mysql') + def test_random_arg(self): + dialect = testing.db.dialect + kw = dialect.create_connect_args( + make_url("mysql://u:p@host/db?foo=true") + )[1] + eq_(kw['foo'], "true") + class SQLModeDetectionTest(fixtures.TestBase): __only_on__ = 'mysql' diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index b9e347d41..7494eaf43 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -140,33 +140,33 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults): @testing.uses_deprecated('Manually quoting ENUM value literals') def test_type_reflection(self): # (ask_for, roundtripped_as_if_different) - specs = [( String(1), mysql.MSString(1), ), - ( String(3), mysql.MSString(3), ), - ( Text(), mysql.MSText(), ), - ( Unicode(1), mysql.MSString(1), ), - ( Unicode(3), mysql.MSString(3), ), - ( UnicodeText(), mysql.MSText(), ), - ( mysql.MSChar(1), ), - ( mysql.MSChar(3), ), - ( NCHAR(2), mysql.MSChar(2), ), - ( mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only - ( mysql.MSNVarChar(22), mysql.MSString(22), ), - ( SmallInteger(), mysql.MSSmallInteger(), ), - ( SmallInteger(), mysql.MSSmallInteger(4), ), - ( mysql.MSSmallInteger(), ), - ( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ), - ( mysql.MSMediumInteger(), mysql.MSMediumInteger(), ), - ( mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ), - ( LargeBinary(3), mysql.TINYBLOB(), ), - ( LargeBinary(), mysql.BLOB() ), - ( mysql.MSBinary(3), mysql.MSBinary(3), ), - ( mysql.MSVarBinary(3),), - ( mysql.MSTinyBlob(),), - ( mysql.MSBlob(),), - ( mysql.MSBlob(1234), mysql.MSBlob()), - ( mysql.MSMediumBlob(),), - ( mysql.MSLongBlob(),), - ( mysql.ENUM("''","'fleem'"), ), + specs = [(String(1), mysql.MSString(1), ), + (String(3), mysql.MSString(3), ), + (Text(), mysql.MSText(), ), + (Unicode(1), mysql.MSString(1), ), + (Unicode(3), mysql.MSString(3), ), + (UnicodeText(), mysql.MSText(), ), + (mysql.MSChar(1), ), + (mysql.MSChar(3), ), + (NCHAR(2), mysql.MSChar(2), ), + (mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only + (mysql.MSNVarChar(22), mysql.MSString(22), ), + (SmallInteger(), mysql.MSSmallInteger(), ), + (SmallInteger(), mysql.MSSmallInteger(4), ), + (mysql.MSSmallInteger(), ), + (mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ), + (mysql.MSMediumInteger(), mysql.MSMediumInteger(), ), + (mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ), + (LargeBinary(3), mysql.TINYBLOB(), ), + (LargeBinary(), mysql.BLOB() ), + (mysql.MSBinary(3), mysql.MSBinary(3), ), + (mysql.MSVarBinary(3),), + (mysql.MSTinyBlob(),), + (mysql.MSBlob(),), + (mysql.MSBlob(1234), mysql.MSBlob()), + (mysql.MSMediumBlob(),), + (mysql.MSLongBlob(),), + (mysql.ENUM("''","'fleem'"), ), ] columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] @@ -298,3 +298,22 @@ class RawReflectionTest(fixtures.TestBase): assert regex.match(' PRIMARY KEY USING BTREE (`id`)') assert regex.match(' PRIMARY KEY (`id`) USING BTREE') + def test_fk_reflection(self): + regex = self.parser._re_constraint + + m = regex.match(' CONSTRAINT `addresses_user_id_fkey` ' + 'FOREIGN KEY (`user_id`) ' + 'REFERENCES `users` (`id`) ' + 'ON DELETE CASCADE ON UPDATE CASCADE') + eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`', + '`users`', '`id`', None, 'CASCADE', 'CASCADE')) + + + m = regex.match(' CONSTRAINT `addresses_user_id_fkey` ' + 'FOREIGN KEY (`user_id`) ' + 'REFERENCES `users` (`id`) ' + 'ON DELETE CASCADE ON UPDATE SET NULL') + eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`', + '`users`', '`id`', None, 'CASCADE', 'SET NULL')) + + diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py index b918abe25..acf9c1e2f 100644 --- a/test/dialect/mysql/test_types.py +++ b/test/dialect/mysql/test_types.py @@ -4,12 +4,13 @@ from sqlalchemy.testing import eq_, assert_raises from sqlalchemy import * from sqlalchemy import sql, exc, schema from sqlalchemy.util import u +from sqlalchemy import util from sqlalchemy.dialects.mysql import base as mysql from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults from sqlalchemy import testing from sqlalchemy.testing.engines import utf8_engine import datetime - +import decimal class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): "Test MySQL column types" @@ -141,10 +142,36 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): ] for type_, args, kw, res in columns: + type_inst = type_(*args, **kw) self.assert_compile( - type_(*args, **kw), + type_inst, res ) + # test that repr() copies out all arguments + self.assert_compile( + eval("mysql.%r" % type_inst), + res + ) + + @testing.only_if('mysql') + @testing.provide_metadata + def test_precision_float_roundtrip(self): + t = Table('t', self.metadata, + Column('scale_value', mysql.DOUBLE( + precision=15, scale=12, asdecimal=True)), + Column('unscale_value', mysql.DOUBLE( + decimal_return_scale=12, asdecimal=True)) + ) + t.create(testing.db) + testing.db.execute( + t.insert(), scale_value=45.768392065789, + unscale_value=45.768392065789 + ) + result = testing.db.scalar(select([t.c.scale_value])) + eq_(result, decimal.Decimal("45.768392065789")) + + result = testing.db.scalar(select([t.c.unscale_value])) + eq_(result, decimal.Decimal("45.768392065789")) @testing.exclude('mysql', '<', (4, 1, 1), 'no charset support') def test_charset(self): @@ -212,14 +239,22 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): (mysql.ENUM, ["foo", "bar"], {'unicode':True}, '''ENUM('foo','bar') UNICODE'''), - (String, [20], {"collation":"utf8"}, 'VARCHAR(20) COLLATE utf8') + (String, [20], {"collation": "utf8"}, 'VARCHAR(20) COLLATE utf8') ] for type_, args, kw, res in columns: + type_inst = type_(*args, **kw) self.assert_compile( - type_(*args, **kw), + type_inst, + res + ) + # test that repr() copies out all arguments + self.assert_compile( + eval("mysql.%r" % type_inst) + if type_ is not String + else eval("%r" % type_inst), res ) @@ -229,15 +264,23 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): def test_charset_collate_table(self): t = Table('foo', self.metadata, Column('id', Integer), + Column('data', UnicodeText), mysql_default_charset='utf8', - mysql_collate='utf8_unicode_ci' + mysql_collate='utf8_bin' ) t.create() m2 = MetaData(testing.db) t2 = Table('foo', m2, autoload=True) - eq_(t2.kwargs['mysql_collate'], 'utf8_unicode_ci') + eq_(t2.kwargs['mysql_collate'], 'utf8_bin') eq_(t2.kwargs['mysql_default charset'], 'utf8') + # test [ticket:2906] + # in order to test the condition here, need to use + # MySQLdb 1.2.3 and also need to pass either use_unicode=1 + # or charset=utf8 to the URL. + t.insert().execute(id=1, data=u('some text')) + assert isinstance(testing.db.scalar(select([t.c.data])), util.text_type) + def test_bit_50(self): """Exercise BIT types on 5.0+ (not valid for all engine types)""" @@ -250,7 +293,9 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): @testing.only_if('mysql') @testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature') - @testing.fails_on('mysql+oursql', 'some round trips fail, oursql bug ?') + @testing.fails_if( + lambda: testing.against("mysql+oursql") and util.py3k, + 'some round trips fail, oursql bug ?') @testing.provide_metadata def test_bit_50_roundtrip(self): bit_table = Table('mysql_bits', self.metadata, @@ -474,72 +519,24 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): self.assert_(colspec(table.c.y1).startswith('y1 YEAR')) eq_(colspec(table.c.y5), 'y5 YEAR(4)') - @testing.only_if('mysql') - @testing.provide_metadata - def test_set(self): - """Exercise the SET type.""" - set_table = Table('mysql_set', self.metadata, - Column('s1', - mysql.MSSet("'dq'", "'sq'")), Column('s2', - mysql.MSSet("'a'")), Column('s3', - mysql.MSSet("'5'", "'7'", "'9'"))) - eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')") - eq_(colspec(set_table.c.s2), "s2 SET('a')") - eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')") - set_table.create() - reflected = Table('mysql_set', MetaData(testing.db), - autoload=True) - for table in set_table, reflected: - - def roundtrip(store, expected=None): - expected = expected or store - table.insert(store).execute() - row = table.select().execute().first() - self.assert_(list(row) == expected) - table.delete().execute() - - roundtrip([None, None, None], [None] * 3) - roundtrip(['', '', ''], [set([''])] * 3) - roundtrip([set(['dq']), set(['a']), set(['5'])]) - roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']), - set(['5'])]) - roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5' - ])]) - roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7' - ])]) - set_table.insert().execute({'s3': set(['5'])}, - {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])}, - {'s3': set(['7', '9'])}) - - # NOTE: the string sent to MySQL here is sensitive to ordering. - # for some reason the set ordering is always "5, 7" when we test on - # MySQLdb but in Py3K this is not guaranteed. So basically our - # SET type doesn't do ordering correctly (not sure how it can, - # as we don't know how the SET was configured in the first place.) - rows = select([set_table.c.s3], - set_table.c.s3.in_([set(['5']), ['5', '7']]) - ).execute().fetchall() - found = set([frozenset(row[0]) for row in rows]) - eq_(found, set([frozenset(['5']), frozenset(['5', '7'])])) - -class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): +class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): __only_on__ = 'mysql' __dialect__ = mysql.dialect() - @testing.uses_deprecated('Manually quoting ENUM value literals') @testing.provide_metadata def test_enum(self): """Exercise the ENUM type.""" + with testing.expect_deprecated('Manually quoting ENUM value literals'): + e1, e2 = mysql.ENUM("'a'", "'b'"), mysql.ENUM("'a'", "'b'") + enum_table = Table('mysql_enum', self.metadata, - Column('e1', mysql.ENUM("'a'", "'b'")), - Column('e2', mysql.ENUM("'a'", "'b'"), - nullable=False), - Column('e2generic', Enum("a", "b"), - nullable=False), + Column('e1', e1), + Column('e2', e2, nullable=False), + Column('e2generic', Enum("a", "b"), nullable=False), Column('e3', mysql.ENUM("'a'", "'b'", strict=True)), Column('e4', mysql.ENUM("'a'", "'b'", strict=True), nullable=False), @@ -587,6 +584,106 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): eq_(res, expected) + @testing.provide_metadata + def test_set(self): + + with testing.expect_deprecated('Manually quoting SET value literals'): + e1, e2 = mysql.SET("'a'", "'b'"), mysql.SET("'a'", "'b'") + + set_table = Table('mysql_set', self.metadata, + Column('e1', e1), + Column('e2', e2, nullable=False), + Column('e3', mysql.SET("a", "b")), + Column('e4', mysql.SET("'a'", "b")), + Column('e5', mysql.SET("'a'", "'b'", quoting="quoted")) + ) + + eq_(colspec(set_table.c.e1), + "e1 SET('a','b')") + eq_(colspec(set_table.c.e2), + "e2 SET('a','b') NOT NULL") + eq_(colspec(set_table.c.e3), + "e3 SET('a','b')") + eq_(colspec(set_table.c.e4), + "e4 SET('''a''','b')") + eq_(colspec(set_table.c.e5), + "e5 SET('a','b')") + set_table.create() + + assert_raises(exc.DBAPIError, set_table.insert().execute, + e1=None, e2=None, e3=None, e4=None) + + if testing.against("+oursql"): + assert_raises(exc.StatementError, set_table.insert().execute, + e1='c', e2='c', e3='c', e4='c') + + set_table.insert().execute(e1='a', e2='a', e3='a', e4="'a'", e5="a,b") + set_table.insert().execute(e1='b', e2='b', e3='b', e4='b', e5="a,b") + + res = set_table.select().execute().fetchall() + + if testing.against("+oursql"): + expected = [ + # 1st row with all c's, data truncated + (set(['']), set(['']), set(['']), set(['']), None), + ] + else: + expected = [] + + expected.extend([ + (set(['a']), set(['a']), set(['a']), set(["'a'"]), set(['a', 'b'])), + (set(['b']), set(['b']), set(['b']), set(['b']), set(['a', 'b'])) + ]) + + eq_(res, expected) + + @testing.provide_metadata + def test_set_roundtrip_plus_reflection(self): + set_table = Table('mysql_set', self.metadata, + Column('s1', + mysql.SET("dq", "sq")), + Column('s2', mysql.SET("a")), + Column('s3', mysql.SET("5", "7", "9"))) + + eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')") + eq_(colspec(set_table.c.s2), "s2 SET('a')") + eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')") + set_table.create() + reflected = Table('mysql_set', MetaData(testing.db), + autoload=True) + for table in set_table, reflected: + + def roundtrip(store, expected=None): + expected = expected or store + table.insert(store).execute() + row = table.select().execute().first() + self.assert_(list(row) == expected) + table.delete().execute() + + roundtrip([None, None, None], [None] * 3) + roundtrip(['', '', ''], [set([''])] * 3) + roundtrip([set(['dq']), set(['a']), set(['5'])]) + roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']), + set(['5'])]) + roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5' + ])]) + roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7' + ])]) + set_table.insert().execute({'s3': set(['5'])}, + {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])}, + {'s3': set(['7', '9'])}) + + # NOTE: the string sent to MySQL here is sensitive to ordering. + # for some reason the set ordering is always "5, 7" when we test on + # MySQLdb but in Py3K this is not guaranteed. So basically our + # SET type doesn't do ordering correctly (not sure how it can, + # as we don't know how the SET was configured in the first place.) + rows = select([set_table.c.s3], + set_table.c.s3.in_([set(['5']), ['5', '7']]) + ).execute().fetchall() + found = set([frozenset(row[0]) for row in rows]) + eq_(found, set([frozenset(['5']), frozenset(['5', '7'])])) + def test_unicode_enum(self): unicode_engine = utf8_engine() metadata = MetaData(unicode_engine) @@ -634,38 +731,64 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): "VARCHAR(1), CHECK (somecolumn IN ('x', " "'y', 'z')))") + @testing.provide_metadata @testing.exclude('mysql', '<', (4,), "3.23 can't handle an ENUM of ''") - @testing.uses_deprecated('Manually quoting ENUM value literals') def test_enum_parse(self): - """More exercises for the ENUM type.""" - # MySQL 3.23 can't handle an ENUM of ''.... - - enum_table = Table('mysql_enum', MetaData(testing.db), - Column('e1', mysql.ENUM("'a'")), - Column('e2', mysql.ENUM("''")), - Column('e3', mysql.ENUM('a')), - Column('e4', mysql.ENUM('')), - Column('e5', mysql.ENUM("'a'", "''")), - Column('e6', mysql.ENUM("''", "'a'")), - Column('e7', mysql.ENUM("''", "'''a'''", "'b''b'", "''''"))) + with testing.expect_deprecated('Manually quoting ENUM value literals'): + enum_table = Table('mysql_enum', self.metadata, + Column('e1', mysql.ENUM("'a'")), + Column('e2', mysql.ENUM("''")), + Column('e3', mysql.ENUM('a')), + Column('e4', mysql.ENUM('')), + Column('e5', mysql.ENUM("'a'", "''")), + Column('e6', mysql.ENUM("''", "'a'")), + Column('e7', mysql.ENUM("''", "'''a'''", "'b''b'", "''''"))) for col in enum_table.c: self.assert_(repr(col)) - try: - enum_table.create() - reflected = Table('mysql_enum', MetaData(testing.db), - autoload=True) - for t in enum_table, reflected: - eq_(t.c.e1.type.enums, ("a",)) - eq_(t.c.e2.type.enums, ("",)) - eq_(t.c.e3.type.enums, ("a",)) - eq_(t.c.e4.type.enums, ("",)) - eq_(t.c.e5.type.enums, ("a", "")) - eq_(t.c.e6.type.enums, ("", "a")) - eq_(t.c.e7.type.enums, ("", "'a'", "b'b", "'")) - finally: - enum_table.drop() + + enum_table.create() + reflected = Table('mysql_enum', MetaData(testing.db), + autoload=True) + for t in enum_table, reflected: + eq_(t.c.e1.type.enums, ("a",)) + eq_(t.c.e2.type.enums, ("",)) + eq_(t.c.e3.type.enums, ("a",)) + eq_(t.c.e4.type.enums, ("",)) + eq_(t.c.e5.type.enums, ("a", "")) + eq_(t.c.e6.type.enums, ("", "a")) + eq_(t.c.e7.type.enums, ("", "'a'", "b'b", "'")) + + @testing.provide_metadata + @testing.exclude('mysql', '<', (5,)) + def test_set_parse(self): + with testing.expect_deprecated('Manually quoting SET value literals'): + set_table = Table('mysql_set', self.metadata, + Column('e1', mysql.SET("'a'")), + Column('e2', mysql.SET("''")), + Column('e3', mysql.SET('a')), + Column('e4', mysql.SET('')), + Column('e5', mysql.SET("'a'", "''")), + Column('e6', mysql.SET("''", "'a'")), + Column('e7', mysql.SET("''", "'''a'''", "'b''b'", "''''"))) + + for col in set_table.c: + self.assert_(repr(col)) + + set_table.create() + + # don't want any warnings on reflection + reflected = Table('mysql_set', MetaData(testing.db), + autoload=True) + for t in set_table, reflected: + eq_(t.c.e1.type.values, ("a",)) + eq_(t.c.e2.type.values, ("",)) + eq_(t.c.e3.type.values, ("a",)) + eq_(t.c.e4.type.values, ("",)) + eq_(t.c.e5.type.values, ("a", "")) + eq_(t.c.e6.type.values, ("", "a")) + eq_(t.c.e7.type.values, ("", "'a'", "b'b", "'")) def colspec(c): return testing.db.dialect.ddl_compiler( diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index 11661b11f..e64afb186 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -16,6 +16,7 @@ from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import TSRANGE from sqlalchemy.orm import mapper, aliased, Session from sqlalchemy.sql import table, column, operators +from sqlalchemy.util import u class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): @@ -106,6 +107,45 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'AS length_1', dialect=dialect) + def test_create_drop_enum(self): + # test escaping and unicode within CREATE TYPE for ENUM + typ = postgresql.ENUM( + "val1", "val2", "val's 3", u('méil'), name="myname") + self.assert_compile(postgresql.CreateEnumType(typ), + u("CREATE TYPE myname AS ENUM ('val1', 'val2', 'val''s 3', 'méil')") + ) + + typ = postgresql.ENUM( + "val1", "val2", "val's 3", name="PleaseQuoteMe") + self.assert_compile(postgresql.CreateEnumType(typ), + "CREATE TYPE \"PleaseQuoteMe\" AS ENUM " + "('val1', 'val2', 'val''s 3')" + ) + + def test_generic_enum(self): + e1 = Enum('x', 'y', 'z', name='somename') + e2 = Enum('x', 'y', 'z', name='somename', schema='someschema') + self.assert_compile(postgresql.CreateEnumType(e1), + "CREATE TYPE somename AS ENUM ('x', 'y', 'z')" + ) + self.assert_compile(postgresql.CreateEnumType(e2), + "CREATE TYPE someschema.somename AS ENUM " + "('x', 'y', 'z')") + self.assert_compile(postgresql.DropEnumType(e1), + 'DROP TYPE somename') + self.assert_compile(postgresql.DropEnumType(e2), + 'DROP TYPE someschema.somename') + t1 = Table('sometable', MetaData(), Column('somecolumn', e1)) + self.assert_compile(schema.CreateTable(t1), + 'CREATE TABLE sometable (somecolumn ' + 'somename)') + t1 = Table('sometable', MetaData(), Column('somecolumn', + Enum('x', 'y', 'z', native_enum=False))) + self.assert_compile(schema.CreateTable(t1), + "CREATE TABLE sometable (somecolumn " + "VARCHAR(1), CHECK (somecolumn IN ('x', " + "'y', 'z')))") + def test_create_partial_index(self): m = MetaData() tbl = Table('testtbl', m, Column('data', Integer)) @@ -173,6 +213,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'USING hash (data)', dialect=postgresql.dialect()) + + def test_create_index_expr_gets_parens(self): + m = MetaData() + tbl = Table('testtbl', m, Column('x', Integer), Column('y', Integer)) + + idx1 = Index('test_idx1', 5 / (tbl.c.x + tbl.c.y)) + self.assert_compile( + schema.CreateIndex(idx1), + "CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))" + ) + + def test_create_index_literals(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', Integer)) + + idx1 = Index('test_idx1', tbl.c.data + 5) + self.assert_compile( + schema.CreateIndex(idx1), + "CREATE INDEX test_idx1 ON testtbl ((data + 5))" + ) + def test_exclude_constraint_min(self): m = MetaData() tbl = Table('testtbl', m, @@ -228,6 +289,68 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)') + def test_for_update(self): + table1 = table('mytable', + column('myid'), column('name'), column('description')) + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(nowait=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(read=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(read=True, nowait=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s " + "FOR UPDATE OF mytable") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(read=True, nowait=True, of=table1), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s " + "FOR SHARE OF mytable NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(read=True, nowait=True, of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s " + "FOR SHARE OF mytable NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(read=True, nowait=True, + of=[table1.c.myid, table1.c.name]), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = %(myid_1)s " + "FOR SHARE OF mytable NOWAIT") + + ta = table1.alias() + self.assert_compile( + ta.select(ta.c.myid == 7). + with_for_update(of=[ta.c.myid, ta.c.name]), + "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " + "FROM mytable AS mytable_1 " + "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1" + ) def test_reserved_words(self): diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 1fc239cb7..fd6df2c98 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -17,6 +17,7 @@ from sqlalchemy.dialects.postgresql import base as postgresql import logging import logging.handlers from sqlalchemy.testing.mock import Mock +from sqlalchemy.engine.reflection import Inspector class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): @@ -53,7 +54,11 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): 'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)), ('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, ' 'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), ' - '64-bit', (9, 1, 2))]: + '64-bit', (9, 1, 2)), + ('[PostgreSQL 9.2.4 ] VMware vFabric Postgres 9.2.4.0 ' + 'release build 1080137', (9, 2, 4)) + + ]: eq_(testing.db.dialect._get_server_version_info(mock_conn(string)), version) @@ -63,8 +68,10 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): assert testing.db.dialect.dbapi.__version__.\ startswith(".".join(str(x) for x in v)) + # currently not passing with pg 9.3 that does not seem to generate + # any notices here, woudl rather find a way to mock this @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') - def test_notice_logging(self): + def _test_notice_logging(self): log = logging.getLogger('sqlalchemy.dialects.postgresql') buf = logging.handlers.BufferingHandler(100) lev = log.level @@ -199,18 +206,32 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): assert_raises(exc.InvalidRequestError, testing.db.execute, stmt) def test_serial_integer(self): - for type_, expected in [ - (Integer, 'SERIAL'), - (BigInteger, 'BIGSERIAL'), - (SmallInteger, 'SMALLINT'), - (postgresql.INTEGER, 'SERIAL'), - (postgresql.BIGINT, 'BIGSERIAL'), + + for version, type_, expected in [ + (None, Integer, 'SERIAL'), + (None, BigInteger, 'BIGSERIAL'), + ((9, 1), SmallInteger, 'SMALLINT'), + ((9, 2), SmallInteger, 'SMALLSERIAL'), + (None, postgresql.INTEGER, 'SERIAL'), + (None, postgresql.BIGINT, 'BIGSERIAL'), ]: m = MetaData() t = Table('t', m, Column('c', type_, primary_key=True)) - ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t)) + + if version: + dialect = postgresql.dialect() + dialect._get_server_version_info = Mock(return_value=version) + dialect.initialize(testing.db.connect()) + else: + dialect = testing.db.dialect + + ddl_compiler = dialect.ddl_compiler( + dialect, + schema.CreateTable(t) + ) eq_( ddl_compiler.get_column_specification(t.c.c), "c %s NOT NULL" % expected ) + diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index fb399b546..58f34d5d0 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -5,6 +5,7 @@ from sqlalchemy.testing.assertions import eq_, assert_raises, \ AssertsCompiledSQL, ComparesTables from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing +from sqlalchemy import inspect from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ String, Sequence, ForeignKey, join, Numeric, \ PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ @@ -159,6 +160,17 @@ class ReflectionTest(fixtures.TestBase): subject.join(referer).onclause)) @testing.provide_metadata + def test_reflect_default_over_128_chars(self): + Table('t', self.metadata, + Column('x', String(200), server_default="abcd" * 40) + ).create(testing.db) + + m = MetaData() + t = Table('t', m, autoload=True, autoload_with=testing.db) + eq_( + t.c.x.server_default.arg.text, "'%s'::character varying" % ("abcd" * 40) + ) + @testing.provide_metadata def test_renamed_sequence_reflection(self): metadata = self.metadata t = Table('t', metadata, Column('id', Integer, primary_key=True)) @@ -416,6 +428,70 @@ class ReflectionTest(fixtures.TestBase): eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}]) conn.close() + @testing.provide_metadata + def test_foreign_key_option_inspection(self): + metadata = self.metadata + Table('person', metadata, + Column('id', String(length=32), nullable=False, primary_key=True), + Column('company_id', ForeignKey('company.id', + name='person_company_id_fkey', + match='FULL', onupdate='RESTRICT', ondelete='RESTRICT', + deferrable=True, initially='DEFERRED' + ) + ) + ) + Table('company', metadata, + Column('id', String(length=32), nullable=False, primary_key=True), + Column('name', String(length=255)), + Column('industry_id', ForeignKey('industry.id', + name='company_industry_id_fkey', + onupdate='CASCADE', ondelete='CASCADE', + deferrable=False, # PG default + initially='IMMEDIATE' # PG default + ) + ) + ) + Table('industry', metadata, + Column('id', Integer(), nullable=False, primary_key=True), + Column('name', String(length=255)) + ) + fk_ref = { + 'person_company_id_fkey': { + 'name': 'person_company_id_fkey', + 'constrained_columns': ['company_id'], + 'referred_columns': ['id'], + 'referred_table': 'company', + 'referred_schema': None, + 'options': { + 'onupdate': 'RESTRICT', + 'deferrable': True, + 'ondelete': 'RESTRICT', + 'initially': 'DEFERRED', + 'match': 'FULL' + } + }, + 'company_industry_id_fkey': { + 'name': 'company_industry_id_fkey', + 'constrained_columns': ['industry_id'], + 'referred_columns': ['id'], + 'referred_table': 'industry', + 'referred_schema': None, + 'options': { + 'onupdate': 'CASCADE', + 'deferrable': None, + 'ondelete': 'CASCADE', + 'initially': None, + 'match': None + } + } + } + metadata.create_all() + inspector = inspect(testing.db) + fks = inspector.get_foreign_keys('person') + \ + inspector.get_foreign_keys('company') + for fk in fks: + eq_(fk, fk_ref[fk['name']]) + class CustomTypeReflectionTest(fixtures.TestBase): class CustomType(object): diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py index 784f8bcbf..ba4b63e1a 100644 --- a/test/dialect/postgresql/test_types.py +++ b/test/dialect/postgresql/test_types.py @@ -10,18 +10,22 @@ from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ func, literal_column, literal, bindparam, cast, extract, \ SmallInteger, Enum, REAL, update, insert, Index, delete, \ - and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text, \ + type_coerce from sqlalchemy.orm import Session, mapper, aliased from sqlalchemy import exc, schema, types from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \ - INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE + INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \ + JSON import decimal from sqlalchemy import util from sqlalchemy.testing.util import round_decimal from sqlalchemy.sql import table, column, operators import logging import re +from sqlalchemy import inspect +from sqlalchemy import event class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): __only_on__ = 'postgresql' @@ -96,34 +100,10 @@ class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): ([5], [5], [6], [decimal.Decimal("6.4")]) ) -class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): +class EnumTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'postgresql' - __dialect__ = postgresql.dialect() - def test_compile(self): - e1 = Enum('x', 'y', 'z', name='somename') - e2 = Enum('x', 'y', 'z', name='somename', schema='someschema') - self.assert_compile(postgresql.CreateEnumType(e1), - "CREATE TYPE somename AS ENUM ('x','y','z')" - ) - self.assert_compile(postgresql.CreateEnumType(e2), - "CREATE TYPE someschema.somename AS ENUM " - "('x','y','z')") - self.assert_compile(postgresql.DropEnumType(e1), - 'DROP TYPE somename') - self.assert_compile(postgresql.DropEnumType(e2), - 'DROP TYPE someschema.somename') - t1 = Table('sometable', MetaData(), Column('somecolumn', e1)) - self.assert_compile(schema.CreateTable(t1), - 'CREATE TABLE sometable (somecolumn ' - 'somename)') - t1 = Table('sometable', MetaData(), Column('somecolumn', - Enum('x', 'y', 'z', native_enum=False))) - self.assert_compile(schema.CreateTable(t1), - "CREATE TABLE sometable (somecolumn " - "VARCHAR(1), CHECK (somecolumn IN ('x', " - "'y', 'z')))") @testing.fails_on('postgresql+zxjdbc', 'zxjdbc fails on ENUM: column "XXX" is of type ' @@ -860,7 +840,8 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): Column('plain_interval', postgresql.INTERVAL), Column('year_interval', y2m()), Column('month_interval', d2s()), - Column('precision_interval', postgresql.INTERVAL(precision=3)) + Column('precision_interval', postgresql.INTERVAL(precision=3)), + Column('tsvector_document', postgresql.TSVECTOR) ) metadata.create_all() @@ -893,6 +874,17 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): self.assert_compile(type_, expected) @testing.provide_metadata + def test_tsvector_round_trip(self): + t = Table('t1', self.metadata, Column('data', postgresql.TSVECTOR)) + t.create() + testing.db.execute(t.insert(), data="a fat cat sat") + eq_(testing.db.scalar(select([t.c.data])), "'a' 'cat' 'fat' 'sat'") + + testing.db.execute(t.update(), data="'a' 'cat' 'fat' 'mat' 'sat'") + + eq_(testing.db.scalar(select([t.c.data])), "'a' 'cat' 'fat' 'mat' 'sat'") + + @testing.provide_metadata def test_bit_reflection(self): metadata = self.metadata t1 = Table('t1', metadata, @@ -918,7 +910,6 @@ class UUIDTest(fixtures.TestBase): __only_on__ = 'postgresql' - @testing.requires.python25 @testing.fails_on('postgresql+zxjdbc', 'column "data" is of type uuid but expression is of type character varying') @testing.fails_on('postgresql+pg8000', 'No support for UUID type') @@ -932,7 +923,6 @@ class UUIDTest(fixtures.TestBase): str(uuid.uuid4()) ) - @testing.requires.python25 @testing.fails_on('postgresql+zxjdbc', 'column "data" is of type uuid but expression is of type character varying') @testing.fails_on('postgresql+pg8000', 'No support for UUID type') @@ -978,13 +968,8 @@ class UUIDTest(fixtures.TestBase): -class HStoreTest(fixtures.TestBase): - def _assert_sql(self, construct, expected): - dialect = postgresql.dialect() - compiled = str(construct.compile(dialect=dialect)) - compiled = re.sub(r'\s+', ' ', compiled) - expected = re.sub(r'\s+', ' ', expected) - eq_(compiled, expected) +class HStoreTest(AssertsCompiledSQL, fixtures.TestBase): + __dialect__ = 'postgresql' def setup(self): metadata = MetaData() @@ -996,7 +981,7 @@ class HStoreTest(fixtures.TestBase): def _test_where(self, whereclause, expected): stmt = select([self.test_table]).where(whereclause) - self._assert_sql( + self.assert_compile( stmt, "SELECT test_table.id, test_table.hash FROM test_table " "WHERE %s" % expected @@ -1004,7 +989,7 @@ class HStoreTest(fixtures.TestBase): def _test_cols(self, colclause, expected, from_=True): stmt = select([colclause]) - self._assert_sql( + self.assert_compile( stmt, ( "SELECT %s" + @@ -1013,9 +998,8 @@ class HStoreTest(fixtures.TestBase): ) def test_bind_serialize_default(self): - from sqlalchemy.engine import default - dialect = default.DefaultDialect() + dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_bind_processor(dialect) eq_( proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), @@ -1023,9 +1007,7 @@ class HStoreTest(fixtures.TestBase): ) def test_bind_serialize_with_slashes_and_quotes(self): - from sqlalchemy.engine import default - - dialect = default.DefaultDialect() + dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_bind_processor(dialect) eq_( proc({'\\"a': '\\"1'}), @@ -1033,9 +1015,7 @@ class HStoreTest(fixtures.TestBase): ) def test_parse_error(self): - from sqlalchemy.engine import default - - dialect = default.DefaultDialect() + dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) assert_raises_message( @@ -1048,9 +1028,7 @@ class HStoreTest(fixtures.TestBase): ) def test_result_deserialize_default(self): - from sqlalchemy.engine import default - - dialect = default.DefaultDialect() + dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) eq_( @@ -1059,9 +1037,7 @@ class HStoreTest(fixtures.TestBase): ) def test_result_deserialize_with_slashes_and_quotes(self): - from sqlalchemy.engine import default - - dialect = default.DefaultDialect() + dialect = postgresql.dialect() proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) eq_( @@ -1305,7 +1281,6 @@ class HStoreRoundTripTest(fixtures.TablesTest): return engine def test_reflect(self): - from sqlalchemy import inspect insp = inspect(testing.db) cols = insp.get_columns('data_table') assert isinstance(cols[2]['type'], HSTORE) @@ -1677,3 +1652,320 @@ class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest): def _data_obj(self): return self.extras.DateTimeTZRange(*self.tstzs()) + + +class JSONTest(AssertsCompiledSQL, fixtures.TestBase): + __dialect__ = 'postgresql' + + def setup(self): + metadata = MetaData() + self.test_table = Table('test_table', metadata, + Column('id', Integer, primary_key=True), + Column('test_column', JSON) + ) + self.jsoncol = self.test_table.c.test_column + + def _test_where(self, whereclause, expected): + stmt = select([self.test_table]).where(whereclause) + self.assert_compile( + stmt, + "SELECT test_table.id, test_table.test_column FROM test_table " + "WHERE %s" % expected + ) + + def _test_cols(self, colclause, expected, from_=True): + stmt = select([colclause]) + self.assert_compile( + stmt, + ( + "SELECT %s" + + (" FROM test_table" if from_ else "") + ) % expected + ) + + def test_bind_serialize_default(self): + dialect = postgresql.dialect() + proc = self.test_table.c.test_column.type._cached_bind_processor(dialect) + eq_( + proc({"A": [1, 2, 3, True, False]}), + '{"A": [1, 2, 3, true, false]}' + ) + + def test_result_deserialize_default(self): + dialect = postgresql.dialect() + proc = self.test_table.c.test_column.type._cached_result_processor( + dialect, None) + eq_( + proc('{"A": [1, 2, 3, true, false]}'), + {"A": [1, 2, 3, True, False]} + ) + + # This test is a bit misleading -- in real life you will need to cast to do anything + def test_where_getitem(self): + self._test_where( + self.jsoncol['bar'] == None, + "(test_table.test_column -> %(test_column_1)s) IS NULL" + ) + + def test_where_path(self): + self._test_where( + self.jsoncol[("foo", 1)] == None, + "(test_table.test_column #> %(test_column_1)s) IS NULL" + ) + + def test_where_getitem_as_text(self): + self._test_where( + self.jsoncol['bar'].astext == None, + "(test_table.test_column ->> %(test_column_1)s) IS NULL" + ) + + def test_where_getitem_as_cast(self): + self._test_where( + self.jsoncol['bar'].cast(Integer) == 5, + "CAST(test_table.test_column ->> %(test_column_1)s AS INTEGER) " + "= %(param_1)s" + ) + + def test_where_path_as_text(self): + self._test_where( + self.jsoncol[("foo", 1)].astext == None, + "(test_table.test_column #>> %(test_column_1)s) IS NULL" + ) + + def test_cols_get(self): + self._test_cols( + self.jsoncol['foo'], + "test_table.test_column -> %(test_column_1)s AS anon_1", + True + ) + + +class JSONRoundTripTest(fixtures.TablesTest): + __only_on__ = ('postgresql >= 9.3',) + + @classmethod + def define_tables(cls, metadata): + Table('data_table', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(30), nullable=False), + Column('data', JSON) + ) + + def _fixture_data(self, engine): + data_table = self.tables.data_table + engine.execute( + data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, + {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, + {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, + {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, + {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}}, + ) + + def _assert_data(self, compare): + data = testing.db.execute( + select([self.tables.data_table.c.data]). + order_by(self.tables.data_table.c.name) + ).fetchall() + eq_([d for d, in data], compare) + + def _test_insert(self, engine): + engine.execute( + self.tables.data_table.insert(), + {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}} + ) + self._assert_data([{"k1": "r1v1", "k2": "r1v2"}]) + + def _non_native_engine(self, json_serializer=None, json_deserializer=None): + if json_serializer is not None or json_deserializer is not None: + options = { + "json_serializer": json_serializer, + "json_deserializer": json_deserializer + } + else: + options = {} + + if testing.against("postgresql+psycopg2"): + from psycopg2.extras import register_default_json + engine = engines.testing_engine(options=options) + @event.listens_for(engine, "connect") + def connect(dbapi_connection, connection_record): + engine.dialect._has_native_json = False + def pass_(value): + return value + register_default_json(dbapi_connection, loads=pass_) + elif options: + engine = engines.testing_engine(options=options) + else: + engine = testing.db + engine.connect() + return engine + + def test_reflect(self): + insp = inspect(testing.db) + cols = insp.get_columns('data_table') + assert isinstance(cols[2]['type'], JSON) + + @testing.only_on("postgresql+psycopg2") + def test_insert_native(self): + engine = testing.db + self._test_insert(engine) + + def test_insert_python(self): + engine = self._non_native_engine() + self._test_insert(engine) + + + def _test_custom_serialize_deserialize(self, native): + import json + def loads(value): + value = json.loads(value) + value['x'] = value['x'] + '_loads' + return value + + def dumps(value): + value = dict(value) + value['x'] = 'dumps_y' + return json.dumps(value) + + if native: + engine = engines.testing_engine(options=dict( + json_serializer=dumps, + json_deserializer=loads + )) + else: + engine = self._non_native_engine( + json_serializer=dumps, + json_deserializer=loads + ) + + s = select([ + cast( + { + "key": "value", + "x": "q" + }, + JSON + ) + ]) + eq_( + engine.scalar(s), + { + "key": "value", + "x": "dumps_y_loads" + }, + ) + + @testing.only_on("postgresql+psycopg2") + def test_custom_native(self): + self._test_custom_serialize_deserialize(True) + + @testing.only_on("postgresql+psycopg2") + def test_custom_python(self): + self._test_custom_serialize_deserialize(False) + + + @testing.only_on("postgresql+psycopg2") + def test_criterion_native(self): + engine = testing.db + self._fixture_data(engine) + self._test_criterion(engine) + + def test_criterion_python(self): + engine = self._non_native_engine() + self._fixture_data(engine) + self._test_criterion(engine) + + def test_path_query(self): + engine = testing.db + self._fixture_data(engine) + data_table = self.tables.data_table + result = engine.execute( + select([data_table.c.data]).where( + data_table.c.data[('k1',)].astext == 'r3v1' + ) + ).first() + eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) + + def test_query_returned_as_text(self): + engine = testing.db + self._fixture_data(engine) + data_table = self.tables.data_table + result = engine.execute( + select([data_table.c.data['k1'].astext]) + ).first() + assert isinstance(result[0], util.text_type) + + def test_query_returned_as_int(self): + engine = testing.db + self._fixture_data(engine) + data_table = self.tables.data_table + result = engine.execute( + select([data_table.c.data['k3'].cast(Integer)]).where( + data_table.c.name == 'r5') + ).first() + assert isinstance(result[0], int) + + def _test_criterion(self, engine): + data_table = self.tables.data_table + result = engine.execute( + select([data_table.c.data]).where( + data_table.c.data['k1'].astext == 'r3v1' + ) + ).first() + eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) + + def _test_fixed_round_trip(self, engine): + s = select([ + cast( + { + "key": "value", + "key2": {"k1": "v1", "k2": "v2"} + }, + JSON + ) + ]) + eq_( + engine.scalar(s), + { + "key": "value", + "key2": {"k1": "v1", "k2": "v2"} + }, + ) + + def test_fixed_round_trip_python(self): + engine = self._non_native_engine() + self._test_fixed_round_trip(engine) + + @testing.only_on("postgresql+psycopg2") + def test_fixed_round_trip_native(self): + engine = testing.db + self._test_fixed_round_trip(engine) + + def _test_unicode_round_trip(self, engine): + s = select([ + cast( + { + util.u('réveillé'): util.u('réveillé'), + "data": {"k1": util.u('drôle')} + }, + JSON + ) + ]) + eq_( + engine.scalar(s), + { + util.u('réveillé'): util.u('réveillé'), + "data": {"k1": util.u('drôle')} + }, + ) + + + def test_unicode_round_trip_python(self): + engine = self._non_native_engine() + self._test_unicode_round_trip(engine) + + @testing.only_on("postgresql+psycopg2") + def test_unicode_round_trip_native(self): + engine = testing.db + self._test_unicode_round_trip(engine) diff --git a/test/dialect/test_firebird.py b/test/dialect/test_firebird.py index 4a71b7d05..222e34b93 100644 --- a/test/dialect/test_firebird.py +++ b/test/dialect/test_firebird.py @@ -352,6 +352,15 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): for type_, args, kw, res in columns: self.assert_compile(type_(*args, **kw), res) + def test_quoting_initial_chars(self): + self.assert_compile( + column("_somecol"), + '"_somecol"' + ) + self.assert_compile( + column("$somecol"), + '"$somecol"' + ) class TypesTest(fixtures.TestBase): __only_on__ = 'firebird' diff --git a/test/dialect/test_informix.py b/test/dialect/test_informix.py deleted file mode 100644 index 332edd24e..000000000 --- a/test/dialect/test_informix.py +++ /dev/null @@ -1,25 +0,0 @@ -from sqlalchemy import * -from sqlalchemy.databases import informix -from sqlalchemy.testing import * - - -class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - - __dialect__ = informix.InformixDialect() - - def test_statements(self): - meta = MetaData() - t1 = Table('t1', meta, Column('col1', Integer, - primary_key=True), Column('col2', String(50))) - t2 = Table('t2', meta, Column('col1', Integer, - primary_key=True), Column('col2', String(50)), - Column('col3', Integer, ForeignKey('t1.col1'))) - self.assert_compile(t1.select(), - 'SELECT t1.col1, t1.col2 FROM t1') - self.assert_compile(select([t1, t2]).select_from(t1.join(t2)), - 'SELECT t1.col1, t1.col2, t2.col1, ' - 't2.col2, t2.col3 FROM t1 JOIN t2 ON ' - 't1.col1 = t2.col3') - self.assert_compile(t1.update().values({t1.c.col1: t1.c.col1 - + 1}), 'UPDATE t1 SET col1=(t1.col1 + ?)') - diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 71b2d96cb..8d0ff9776 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -18,7 +18,7 @@ from sqlalchemy.testing.schema import Table, Column import datetime import os from sqlalchemy import sql - +from sqlalchemy.testing.mock import Mock class OutParamTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'oracle+cx_oracle' @@ -26,31 +26,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): testing.db.execute(""" -create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS - retval number; - begin - retval := 6; - x_out := 10; - y_out := x_in * 15; - z_out := NULL; - end; + create or replace procedure foo(x_in IN number, x_out OUT number, + y_out OUT number, z_out OUT varchar) IS + retval number; + begin + retval := 6; + x_out := 10; + y_out := x_in * 15; + z_out := NULL; + end; """) def test_out_params(self): - result = \ - testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' + result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' ':z_out); end;', bindparams=[bindparam('x_in', Float), outparam('x_out', Integer), outparam('y_out', Float), outparam('z_out', String)]), x_in=5) - eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out' - : None}) + eq_(result.out_parameters, + {'x_out': 10, 'y_out': 75, 'z_out': None}) assert isinstance(result.out_parameters['x_out'], int) @classmethod def teardown_class(cls): - testing.db.execute("DROP PROCEDURE foo") + testing.db.execute("DROP PROCEDURE foo") class CXOracleArgsTest(fixtures.TestBase): __only_on__ = 'oracle+cx_oracle' @@ -92,7 +92,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): metadata.create_all() table.insert().execute( - {"option":1, "plain":1, "union":1} + {"option": 1, "plain": 1, "union": 1} ) eq_( testing.db.execute(table.select()).first(), @@ -106,8 +106,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - - __dialect__ = oracle.dialect() + __dialect__ = "oracle" #oracle.dialect() def test_true_false(self): self.assert_compile( @@ -218,6 +217,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ':ROWNUM_1) WHERE ora_rn > :ora_rn_1 FOR ' 'UPDATE') + def test_for_update(self): + table1 = table('mytable', + column('myid'), column('name'), column('description')) + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF mytable.myid") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(nowait=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 " + "FOR UPDATE OF mytable.myid NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7). + with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " + "mytable.myid, mytable.name NOWAIT") + + ta = table1.alias() + self.assert_compile( + ta.select(ta.c.myid == 7). + with_for_update(of=[ta.c.myid, ta.c.name]), + "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " + "FROM mytable mytable_1 " + "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF " + "mytable_1.myid, mytable_1.name" + ) + def test_limit_preserves_typing_information(self): class MyType(TypeDecorator): impl = Integer @@ -250,7 +292,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_use_binds_for_limits_enabled(self): t = table('sometable', column('col1'), column('col2')) - dialect = oracle.OracleDialect(use_binds_for_limits = True) + dialect = oracle.OracleDialect(use_binds_for_limits=True) self.assert_compile(select([t]).limit(10), "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " @@ -348,8 +390,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) query = select([table1, table2], or_(table1.c.name == 'fred', - table1.c.myid == 10, table2.c.othername != 'jack' - , 'EXISTS (select yay from foo where boo = lar)' + table1.c.myid == 10, table2.c.othername != 'jack', + 'EXISTS (select yay from foo where boo = lar)' ), from_obj=[outerjoin(table1, table2, table1.c.myid == table2.c.otherid)]) self.assert_compile(query, @@ -435,8 +477,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'mytable.description AS description FROM ' 'mytable LEFT OUTER JOIN myothertable ON ' 'mytable.myid = myothertable.otherid) ' - 'anon_1 ON thirdtable.userid = anon_1.myid' - , dialect=oracle.dialect(use_ansi=True)) + 'anon_1 ON thirdtable.userid = anon_1.myid', + dialect=oracle.dialect(use_ansi=True)) self.assert_compile(q, 'SELECT thirdtable.userid, ' @@ -549,7 +591,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_returning_insert_labeled(self): t1 = table('t1', column('c1'), column('c2'), column('c3')) self.assert_compile( - t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), + t1.insert().values(c1=1).returning( + t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " "t1.c2, t1.c3 INTO :ret_0, :ret_1" ) @@ -587,33 +630,52 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateIndex(Index("bar", t1.c.x)), "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)" ) + + def test_create_index_expr(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer) + ) + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x > 5)), + "CREATE INDEX bar ON foo (x > 5)" + ) + class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' - def test_ora8_flags(self): - def server_version_info(self): - return (8, 2, 5) + def _dialect(self, server_version, **kw): + def server_version_info(conn): + return server_version - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = oracle.dialect( + dbapi=Mock(version="0.0.0", paramstyle="named"), + **kw) dialect._get_server_version_info = server_version_info + dialect._check_unicode_returns = Mock() + dialect._check_unicode_description = Mock() + dialect._get_default_schema_name = Mock() + return dialect + + + def test_ora8_flags(self): + dialect = self._dialect((8, 2, 5)) # before connect, assume modern DB assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - dialect.initialize(testing.db.connect()) + dialect.initialize(Mock()) assert not dialect.implicit_returning assert not dialect._supports_char_length assert not dialect._supports_nchar assert not dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"CLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) - dialect = oracle.dialect(implicit_returning=True, - dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info + + dialect = self._dialect((8, 2, 5), implicit_returning=True) dialect.initialize(testing.db.connect()) assert dialect.implicit_returning @@ -621,26 +683,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): def test_default_flags(self): """test with no initialization or server version info""" - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = self._dialect(None) + assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) def test_ora10_flags(self): - def server_version_info(self): - return (10, 2, 5) - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info - dialect.initialize(testing.db.connect()) + dialect = self._dialect((10, 2, 5)) + + dialect.initialize(Mock()) assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): @@ -664,9 +725,18 @@ create table test_schema.child( parent_id integer references test_schema.parent(id) ); +create table local_table( + id integer primary key, + data varchar2(50) +); + create synonym test_schema.ptable for test_schema.parent; create synonym test_schema.ctable for test_schema.child; +create synonym test_schema_ptable for test_schema.parent; + +create synonym test_schema.local_table for local_table; + -- can't make a ref from local schema to the -- remote schema's table without this, -- *and* cant give yourself a grant ! @@ -682,15 +752,20 @@ grant references on test_schema.child to public; for stmt in """ drop table test_schema.child; drop table test_schema.parent; +drop table local_table; drop synonym test_schema.ctable; drop synonym test_schema.ptable; +drop synonym test_schema_ptable; +drop synonym test_schema.local_table; + """.split(";"): if stmt.strip(): testing.db.execute(stmt) + @testing.provide_metadata def test_create_same_names_explicit_schema(self): schema = testing.db.dialect.default_schema_name - meta = MetaData(testing.db) + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), schema=schema @@ -701,15 +776,31 @@ drop synonym test_schema.ptable; schema=schema ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) - def test_create_same_names_implicit_schema(self): + def test_reflect_alt_table_owner_local_synonym(self): meta = MetaData(testing.db) + parent = Table('test_schema_ptable', meta, autoload=True, + oracle_resolve_synonyms=True) + self.assert_compile(parent.select(), + "SELECT test_schema_ptable.id, " + "test_schema_ptable.data FROM test_schema_ptable") + select([parent]).execute().fetchall() + + def test_reflect_alt_synonym_owner_local_table(self): + meta = MetaData(testing.db) + parent = Table('local_table', meta, autoload=True, + oracle_resolve_synonyms=True, schema="test_schema") + self.assert_compile(parent.select(), + "SELECT test_schema.local_table.id, " + "test_schema.local_table.data FROM test_schema.local_table") + select([parent]).execute().fetchall() + + @testing.provide_metadata + def test_create_same_names_implicit_schema(self): + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), ) @@ -718,12 +809,9 @@ drop synonym test_schema.ptable; Column('pid', Integer, ForeignKey('parent.pid')), ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): @@ -911,10 +999,17 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): dbapi = FakeDBAPI() b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) + def test_long(self): self.assert_compile(oracle.LONG(), "LONG") @@ -943,14 +1038,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(oracle.RAW(35), "RAW(35)") def test_char_length(self): - self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)") + self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)") oracle8dialect = oracle.dialect() oracle8dialect.server_version_info = (8, 0) - self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect) + self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect) - self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)") - self.assert_compile(CHAR(50),"CHAR(50)") + self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)") + self.assert_compile(CHAR(50), "CHAR(50)") def test_varchar_types(self): dialect = oracle.dialect() @@ -961,6 +1056,12 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): (VARCHAR(50), "VARCHAR(50 CHAR)"), (oracle.NVARCHAR2(50), "NVARCHAR2(50)"), (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"), + (String(), "VARCHAR2"), + (Unicode(), "NVARCHAR2"), + (NVARCHAR(), "NVARCHAR2"), + (VARCHAR(), "VARCHAR"), + (oracle.NVARCHAR2(), "NVARCHAR2"), + (oracle.VARCHAR2(), "VARCHAR2"), ]: self.assert_compile(typ, exp, dialect=dialect) @@ -998,36 +1099,36 @@ class TypesTest(fixtures.TestBase): dict(id=3, data="value 3") ) - eq_(t.select().where(t.c.data=='value 2').execute().fetchall(), + eq_( + t.select().where(t.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) assert type(t2.c.data.type) is CHAR - eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(), + eq_( + t2.select().where(t2.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) finally: t.drop() @testing.requires.returning + @testing.provide_metadata def test_int_not_float(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('foo', Integer)) t1.create() - try: - r = t1.insert().values(foo=5).returning(t1.c.foo).execute() - x = r.scalar() - assert x == 5 - assert isinstance(x, int) - - x = t1.select().scalar() - assert x == 5 - assert isinstance(x, int) - finally: - t1.drop() + r = t1.insert().values(foo=5).returning(t1.c.foo).execute() + x = r.scalar() + assert x == 5 + assert isinstance(x, int) + + x = t1.select().scalar() + assert x == 5 + assert isinstance(x, int) @testing.provide_metadata def test_rowid(self): @@ -1044,7 +1145,7 @@ class TypesTest(fixtures.TestBase): # the ROWID type is not really needed here, # as cx_oracle just treats it as a string, # but we want to make sure the ROWID works... - rowid_col= column('rowid', oracle.ROWID) + rowid_col = column('rowid', oracle.ROWID) s3 = select([t.c.x, rowid_col]).\ where(rowid_col == cast(rowid, oracle.ROWID)) eq_(s3.select().execute().fetchall(), @@ -1070,8 +1171,9 @@ class TypesTest(fixtures.TestBase): eq_(row['day_interval'], datetime.timedelta(days=35, seconds=5743)) + @testing.provide_metadata def test_numerics(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('intcol', Integer), Column('numericcol', Numeric(precision=9, scale=2)), @@ -1084,41 +1186,38 @@ class TypesTest(fixtures.TestBase): ) t1.create() - try: - t1.insert().execute( - intcol=1, - numericcol=5.2, - floatcol1=6.5, - floatcol2 = 8.5, - doubleprec = 9.5, - numbercol1=12, - numbercol2=14.85, - numbercol3=15.76 - ) - - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) + t1.insert().execute( + intcol=1, + numericcol=5.2, + floatcol1=6.5, + floatcol2=8.5, + doubleprec=9.5, + numbercol1=12, + numbercol2=14.85, + numbercol3=15.76 + ) - for row in ( - t1.select().execute().first(), - t2.select().execute().first() - ): - for i, (val, type_) in enumerate(( - (1, int), - (decimal.Decimal("5.2"), decimal.Decimal), - (6.5, float), - (8.5, float), - (9.5, float), - (12, int), - (decimal.Decimal("14.85"), decimal.Decimal), - (15.76, float), - )): - eq_(row[i], val) - assert isinstance(row[i], type_), '%r is not %r' \ - % (row[i], type_) + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + + for row in ( + t1.select().execute().first(), + t2.select().execute().first() + ): + for i, (val, type_) in enumerate(( + (1, int), + (decimal.Decimal("5.2"), decimal.Decimal), + (6.5, float), + (8.5, float), + (9.5, float), + (12, int), + (decimal.Decimal("14.85"), decimal.Decimal), + (15.76, float), + )): + eq_(row[i], val) + assert isinstance(row[i], type_), '%r is not %r' \ + % (row[i], type_) - finally: - t1.drop() def test_numeric_no_decimal_mode(self): @@ -1150,28 +1249,26 @@ class TypesTest(fixtures.TestBase): ) foo.create() - foo.insert().execute( - {'idata':5, 'ndata':decimal.Decimal("45.6"), - 'ndata2':decimal.Decimal("45.0"), - 'nidata':decimal.Decimal('53'), 'fdata':45.68392}, - ) + foo.insert().execute({ + 'idata': 5, + 'ndata': decimal.Decimal("45.6"), + 'ndata2': decimal.Decimal("45.0"), + 'nidata': decimal.Decimal('53'), + 'fdata': 45.68392 + }) - stmt = """ - SELECT - idata, - ndata, - ndata2, - nidata, - fdata - FROM foo - """ + stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo" row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, int, float] + ) eq_( row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001) + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + 53, 45.683920000000001) ) # with a nested subquery, @@ -1195,7 +1292,10 @@ class TypesTest(fixtures.TestBase): FROM dual """ row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) eq_( row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) @@ -1203,15 +1303,20 @@ class TypesTest(fixtures.TestBase): row = testing.db.execute(text(stmt, typemap={ - 'idata':Integer(), - 'ndata':Numeric(20, 2), - 'ndata2':Numeric(20, 2), - 'nidata':Numeric(5, 0), - 'fdata':Float() + 'idata': Integer(), + 'ndata': Numeric(20, 2), + 'ndata2': Numeric(20, 2), + 'nidata': Numeric(5, 0), + 'fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) stmt = """ @@ -1237,39 +1342,55 @@ class TypesTest(fixtures.TestBase): ) WHERE ROWNUM >= 0) anon_1 """ - row =testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) - eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))) + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) + ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2), - 'anon_1_ndata2':Numeric(20, 2), - 'anon_1_nidata':Numeric(5, 0), - 'anon_1_fdata':Float() + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2), + 'anon_1_ndata2': Numeric(20, 2), + 'anon_1_nidata': Numeric(5, 0), + 'anon_1_fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2, asdecimal=False), - 'anon_1_ndata2':Numeric(20, 2, asdecimal=False), - 'anon_1_nidata':Numeric(5, 0, asdecimal=False), - 'anon_1_fdata':Float(asdecimal=True) + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2, asdecimal=False), + 'anon_1_ndata2': Numeric(20, 2, asdecimal=False), + 'anon_1_nidata': Numeric(5, 0, asdecimal=False), + 'anon_1_fdata': Float(asdecimal=True) })).fetchall()[0] - eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal]) - eq_(row, + eq_( + [type(x) for x in row], + [int, float, float, float, decimal.Decimal] + ) + eq_( + row, (5, 45.6, 45, 53, decimal.Decimal('45.68392')) ) + @testing.provide_metadata def test_reflect_dates(self): - metadata = MetaData(testing.db) + metadata = self.metadata Table( "date_types", metadata, Column('d1', DATE), @@ -1278,20 +1399,16 @@ class TypesTest(fixtures.TestBase): Column('d4', oracle.INTERVAL(second_precision=5)), ) metadata.create_all() - try: - m = MetaData(testing.db) - t1 = Table( - "date_types", m, - autoload=True) - assert isinstance(t1.c.d1.type, DATE) - assert isinstance(t1.c.d2.type, TIMESTAMP) - assert not t1.c.d2.type.timezone - assert isinstance(t1.c.d3.type, TIMESTAMP) - assert t1.c.d3.type.timezone - assert isinstance(t1.c.d4.type, oracle.INTERVAL) - - finally: - metadata.drop_all() + m = MetaData(testing.db) + t1 = Table( + "date_types", m, + autoload=True) + assert isinstance(t1.c.d1.type, DATE) + assert isinstance(t1.c.d2.type, TIMESTAMP) + assert not t1.c.d2.type.timezone + assert isinstance(t1.c.d3.type, TIMESTAMP) + assert t1.c.d3.type.timezone + assert isinstance(t1.c.d4.type, oracle.INTERVAL) def test_reflect_all_types_schema(self): types_table = Table('all_types', MetaData(testing.db), @@ -1319,7 +1436,7 @@ class TypesTest(fixtures.TestBase): @testing.provide_metadata def test_reflect_nvarchar(self): metadata = self.metadata - t = Table('t', metadata, + Table('t', metadata, Column('data', sqltypes.NVARCHAR(255)) ) metadata.create_all() @@ -1341,22 +1458,20 @@ class TypesTest(fixtures.TestBase): assert isinstance(res, util.text_type) + @testing.provide_metadata def test_char_length(self): - metadata = MetaData(testing.db) + metadata = self.metadata t1 = Table('t1', metadata, Column("c1", VARCHAR(50)), Column("c2", NVARCHAR(250)), Column("c3", CHAR(200)) ) t1.create() - try: - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - eq_(t2.c.c1.type.length, 50) - eq_(t2.c.c2.type.length, 250) - eq_(t2.c.c3.type.length, 200) - finally: - t1.drop() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.c1.type.length, 50) + eq_(t2.c.c2.type.length, 250) + eq_(t2.c.c3.type.length, 200) @testing.provide_metadata def test_long_type(self): @@ -1372,8 +1487,6 @@ class TypesTest(fixtures.TestBase): "xyz" ) - - def test_longstring(self): metadata = MetaData(testing.db) testing.db.execute(""" @@ -1424,15 +1537,16 @@ class EuroNumericTest(fixtures.TestBase): del os.environ['NLS_LANG'] self.engine.dispose() - @testing.provide_metadata def test_output_type_handler(self): - metadata = self.metadata for stmt, exp, kw in [ ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}), ("SELECT 15 FROM DUAL", 15, {}), - ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}), - ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}), - ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")}) + ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", + decimal.Decimal("15"), {}), + ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", + decimal.Decimal("0.1"), {}), + ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), + {'num': decimal.Decimal("2.5")}) ]: test_exp = self.engine.scalar(stmt, **kw) eq_( @@ -1513,97 +1627,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL): class UnsupportedIndexReflectTest(fixtures.TestBase): __only_on__ = 'oracle' - def setup(self): - global metadata - metadata = MetaData(testing.db) - t1 = Table('test_index_reflect', metadata, + @testing.emits_warning("No column names") + @testing.provide_metadata + def test_reflect_functional_index(self): + metadata = self.metadata + Table('test_index_reflect', metadata, Column('data', String(20), primary_key=True) ) metadata.create_all() - def teardown(self): - metadata.drop_all() - - @testing.emits_warning("No column names") - def test_reflect_functional_index(self): testing.db.execute('CREATE INDEX DATA_IDX ON ' 'TEST_INDEX_REFLECT (UPPER(DATA))') m2 = MetaData(testing.db) - t2 = Table('test_index_reflect', m2, autoload=True) + Table('test_index_reflect', m2, autoload=True) class RoundTripIndexTest(fixtures.TestBase): __only_on__ = 'oracle' + @testing.provide_metadata def test_basic(self): - engine = testing.db - metadata = MetaData(engine) + metadata = self.metadata - table=Table("sometable", metadata, + table = Table("sometable", metadata, Column("id_a", Unicode(255), primary_key=True), Column("id_b", Unicode(255), primary_key=True, unique=True), Column("group", Unicode(255), primary_key=True), Column("col", Unicode(255)), - UniqueConstraint('col','group'), + UniqueConstraint('col', 'group'), ) # "group" is a keyword, so lower case normalind = Index('tableind', table.c.id_b, table.c.group) - # create metadata.create_all() - try: - # round trip, create from reflection - mirror = MetaData(engine) - mirror.reflect() - metadata.drop_all() - mirror.create_all() - - # inspect the reflected creation - inspect = MetaData(engine) - inspect.reflect() - - def obj_definition(obj): - return obj.__class__, tuple([c.name for c in - obj.columns]), getattr(obj, 'unique', None) - - # find what the primary k constraint name should be - primaryconsname = engine.execute( - text("""SELECT constraint_name - FROM all_constraints - WHERE table_name = :table_name - AND owner = :owner - AND constraint_type = 'P' """), - table_name=table.name.upper(), - owner=engine.url.username.upper()).fetchall()[0][0] - - reflectedtable = inspect.tables[table.name] - - # make a dictionary of the reflected objects: - - reflected = dict([(obj_definition(i), i) for i in - reflectedtable.indexes - | reflectedtable.constraints]) - - # assert we got primary key constraint and its name, Error - # if not in dict - - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ - == primaryconsname.upper() - - # Error if not in dict - - assert reflected[(Index, ('id_b', 'group'), False)].name \ - == normalind.name - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected - assert len(reflectedtable.constraints) == 1 - assert len(reflectedtable.indexes) == 3 + mirror = MetaData(testing.db) + mirror.reflect() + metadata.drop_all() + mirror.create_all() - finally: - metadata.drop_all() + inspect = MetaData(testing.db) + inspect.reflect() + def obj_definition(obj): + return obj.__class__, tuple([c.name for c in + obj.columns]), getattr(obj, 'unique', None) + # find what the primary k constraint name should be + primaryconsname = testing.db.execute( + text("""SELECT constraint_name + FROM all_constraints + WHERE table_name = :table_name + AND owner = :owner + AND constraint_type = 'P' """), + table_name=table.name.upper(), + owner=testing.db.url.username.upper()).fetchall()[0][0] + + reflectedtable = inspect.tables[table.name] + + # make a dictionary of the reflected objects: + + reflected = dict([(obj_definition(i), i) for i in + reflectedtable.indexes + | reflectedtable.constraints]) + + # assert we got primary key constraint and its name, Error + # if not in dict + + assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', + 'group'), None)].name.upper() \ + == primaryconsname.upper() + + # Error if not in dict + + eq_( + reflected[(Index, ('id_b', 'group'), False)].name, + normalind.name + ) + assert (Index, ('id_b', ), True) in reflected + assert (Index, ('col', 'group'), True) in reflected + eq_(len(reflectedtable.constraints), 1) + eq_(len(reflectedtable.indexes), 3) class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): @@ -1650,11 +1753,11 @@ class ExecuteTest(fixtures.TestBase): metadata.create_all() t.insert().execute( - {'id':1, 'data':1}, - {'id':2, 'data':7}, - {'id':3, 'data':12}, - {'id':4, 'data':15}, - {'id':5, 'data':32}, + {'id': 1, 'data': 1}, + {'id': 2, 'data': 7}, + {'id': 3, 'data': 12}, + {'id': 4, 'data': 15}, + {'id': 5, 'data': 32}, ) # here, we can't use ORDER BY. @@ -1679,7 +1782,7 @@ class UnicodeSchemaTest(fixtures.TestBase): @testing.provide_metadata def test_quoted_column_non_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column("_underscorecolumn", Unicode(255), primary_key=True), ) metadata.create_all() @@ -1688,14 +1791,14 @@ class UnicodeSchemaTest(fixtures.TestBase): {'_underscorecolumn': u('’é')}, ) result = testing.db.execute( - table.select().where(table.c._underscorecolumn==u('’é')) + table.select().where(table.c._underscorecolumn == u('’é')) ).scalar() eq_(result, u('’é')) @testing.provide_metadata def test_quoted_column_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column(u("méil"), Unicode(255), primary_key=True), ) metadata.create_all() |
