summaryrefslogtreecommitdiff
path: root/test/dialect
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect')
-rw-r--r--test/dialect/mssql/test_compiler.py44
-rw-r--r--test/dialect/mssql/test_engine.py7
-rw-r--r--test/dialect/mssql/test_query.py3
-rw-r--r--test/dialect/mysql/test_compiler.py47
-rw-r--r--test/dialect/mysql/test_dialect.py59
-rw-r--r--test/dialect/mysql/test_reflection.py73
-rw-r--r--test/dialect/mysql/test_types.py299
-rw-r--r--test/dialect/postgresql/test_compiler.py123
-rw-r--r--test/dialect/postgresql/test_dialect.py39
-rw-r--r--test/dialect/postgresql/test_reflection.py76
-rw-r--r--test/dialect/postgresql/test_types.py400
-rw-r--r--test/dialect/test_firebird.py9
-rw-r--r--test/dialect/test_informix.py25
-rw-r--r--test/dialect/test_oracle.py635
14 files changed, 1358 insertions, 481 deletions
diff --git a/test/dialect/mssql/test_compiler.py b/test/dialect/mssql/test_compiler.py
index 87037c6a4..f12ab0330 100644
--- a/test/dialect/mssql/test_compiler.py
+++ b/test/dialect/mssql/test_compiler.py
@@ -510,6 +510,29 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))"
)
+ def test_table_pkc_clustering(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata,
+ Column('x', Integer, autoincrement=False),
+ Column('y', Integer, autoincrement=False),
+ PrimaryKeyConstraint("x", "y", mssql_clustered=True))
+ self.assert_compile(schema.CreateTable(tbl),
+ "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, "
+ "PRIMARY KEY CLUSTERED (x, y))"
+ )
+
+ def test_table_uc_clustering(self):
+ metadata = MetaData()
+ tbl = Table('test', metadata,
+ Column('x', Integer, autoincrement=False),
+ Column('y', Integer, autoincrement=False),
+ PrimaryKeyConstraint("x"),
+ UniqueConstraint("y", mssql_clustered=True))
+ self.assert_compile(schema.CreateTable(tbl),
+ "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NULL, "
+ "PRIMARY KEY (x), UNIQUE CLUSTERED (y))"
+ )
+
def test_index_clustering(self):
metadata = MetaData()
tbl = Table('test', metadata,
@@ -528,6 +551,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"CREATE INDEX foo ON test (x DESC, y)"
)
+ def test_create_index_expr(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer)
+ )
+ self.assert_compile(
+ schema.CreateIndex(Index("bar", t1.c.x > 5)),
+ "CREATE INDEX bar ON foo (x > 5)"
+ )
+
+ def test_drop_index_w_schema(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer),
+ schema='bar'
+ )
+ self.assert_compile(
+ schema.DropIndex(Index("idx_foo", t1.c.x)),
+ "DROP INDEX idx_foo ON bar.foo"
+ )
+
def test_index_extra_include_1(self):
metadata = MetaData()
tbl = Table('test', metadata,
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py
index 2834f35ec..c07f30040 100644
--- a/test/dialect/mssql/test_engine.py
+++ b/test/dialect/mssql/test_engine.py
@@ -131,10 +131,11 @@ class ParseConnectTest(fixtures.TestBase):
for error in [
'Adaptive Server connection timed out',
+ 'Net-Lib error during Connection reset by peer',
'message 20003',
- "Error 10054",
- "Not connected to any MS SQL server",
- "Connection is closed"
+ 'Error 10054',
+ 'Not connected to any MS SQL server',
+ 'Connection is closed'
]:
eq_(dialect.is_disconnect(error, None, None), True)
diff --git a/test/dialect/mssql/test_query.py b/test/dialect/mssql/test_query.py
index bff737145..6a12744a7 100644
--- a/test/dialect/mssql/test_query.py
+++ b/test/dialect/mssql/test_query.py
@@ -232,9 +232,10 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
con.execute("""drop trigger paj""")
meta.drop_all()
+ @testing.fails_on_everything_except('mssql+pyodbc', 'pyodbc-specific feature')
@testing.provide_metadata
def test_disable_scope_identity(self):
- engine = engines.testing_engine(options={"use_scope_identity":False})
+ engine = engines.testing_engine(options={"use_scope_identity": False})
metadata = self.metadata
metadata.bind = engine
t1 = Table('t1', metadata,
diff --git a/test/dialect/mysql/test_compiler.py b/test/dialect/mysql/test_compiler.py
index a77a25cc4..45f8405c8 100644
--- a/test/dialect/mysql/test_compiler.py
+++ b/test/dialect/mysql/test_compiler.py
@@ -6,6 +6,7 @@ from sqlalchemy import sql, exc, schema, types as sqltypes
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy import testing
+from sqlalchemy.sql import table, column
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -94,19 +95,57 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
"CREATE TABLE testtbl (data VARCHAR(255), "
"PRIMARY KEY (data) USING btree)")
- def test_skip_deferrable_kw(self):
+ def test_create_index_expr(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer)
+ )
+ self.assert_compile(
+ schema.CreateIndex(Index("bar", t1.c.x > 5)),
+ "CREATE INDEX bar ON foo (x > 5)"
+ )
+
+ def test_deferrable_initially_kw_not_ignored(self):
m = MetaData()
t1 = Table('t1', m, Column('id', Integer, primary_key=True))
t2 = Table('t2', m, Column('id', Integer,
- ForeignKey('t1.id', deferrable=True),
+ ForeignKey('t1.id', deferrable=True, initially="XYZ"),
primary_key=True))
self.assert_compile(
schema.CreateTable(t2),
"CREATE TABLE t2 (id INTEGER NOT NULL, "
- "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id))"
+ "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id) DEFERRABLE INITIALLY XYZ)"
)
+ def test_match_kw_raises(self):
+ m = MetaData()
+ t1 = Table('t1', m, Column('id', Integer, primary_key=True))
+ t2 = Table('t2', m, Column('id', Integer,
+ ForeignKey('t1.id', match="XYZ"),
+ primary_key=True))
+
+ assert_raises_message(
+ exc.CompileError,
+ "MySQL ignores the 'MATCH' keyword while at the same time causes "
+ "ON UPDATE/ON DELETE clauses to be ignored.",
+ schema.CreateTable(t2).compile, dialect=mysql.dialect()
+ )
+
+ def test_for_update(self):
+ table1 = table('mytable',
+ column('myid'), column('name'), column('description'))
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %s FOR UPDATE")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(read=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE")
+
class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
"""Tests MySQL-dialect specific compilation."""
@@ -302,8 +341,10 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
(VARCHAR, "CAST(t.col AS CHAR)"),
(NCHAR, "CAST(t.col AS CHAR)"),
(CHAR, "CAST(t.col AS CHAR)"),
+ (m.CHAR(charset='utf8'), "CAST(t.col AS CHAR CHARACTER SET utf8)"),
(CLOB, "CAST(t.col AS CHAR)"),
(TEXT, "CAST(t.col AS CHAR)"),
+ (m.TEXT(charset='utf8'), "CAST(t.col AS CHAR CHARACTER SET utf8)"),
(String(32), "CAST(t.col AS CHAR(32))"),
(Unicode(32), "CAST(t.col AS CHAR(32))"),
(CHAR(32), "CAST(t.col AS CHAR(32))"),
diff --git a/test/dialect/mysql/test_dialect.py b/test/dialect/mysql/test_dialect.py
index 62bdfc81b..2ff17f0f7 100644
--- a/test/dialect/mysql/test_dialect.py
+++ b/test/dialect/mysql/test_dialect.py
@@ -9,12 +9,17 @@ from sqlalchemy.testing import engines
import datetime
class DialectTest(fixtures.TestBase):
- __only_on__ = 'mysql'
+ def test_ssl_arguments_mysqldb(self):
+ from sqlalchemy.dialects.mysql import mysqldb
+ dialect = mysqldb.dialect()
+ self._test_ssl_arguments(dialect)
- @testing.only_on(['mysql+mysqldb', 'mysql+oursql'],
- 'requires particular SSL arguments')
- def test_ssl_arguments(self):
- dialect = testing.db.dialect
+ def test_ssl_arguments_oursql(self):
+ from sqlalchemy.dialects.mysql import oursql
+ dialect = oursql.dialect()
+ self._test_ssl_arguments(dialect)
+
+ def _test_ssl_arguments(self, dialect):
kwarg = dialect.create_connect_args(
make_url("mysql://scott:tiger@localhost:3306/test"
"?ssl_ca=/ca.pem&ssl_cert=/cert.pem&ssl_key=/key.pem")
@@ -33,6 +38,50 @@ class DialectTest(fixtures.TestBase):
}
)
+ def test_mysqlconnector_buffered_arg(self):
+ from sqlalchemy.dialects.mysql import mysqlconnector
+ dialect = mysqlconnector.dialect()
+ kw = dialect.create_connect_args(
+ make_url("mysql+mysqlconnector://u:p@host/db?buffered=true")
+ )[1]
+ eq_(kw['buffered'], True)
+
+ kw = dialect.create_connect_args(
+ make_url("mysql+mysqlconnector://u:p@host/db?buffered=false")
+ )[1]
+ eq_(kw['buffered'], False)
+
+ kw = dialect.create_connect_args(
+ make_url("mysql+mysqlconnector://u:p@host/db")
+ )[1]
+ eq_(kw['buffered'], True)
+
+ def test_mysqlconnector_raise_on_warnings_arg(self):
+ from sqlalchemy.dialects.mysql import mysqlconnector
+ dialect = mysqlconnector.dialect()
+ kw = dialect.create_connect_args(
+ make_url("mysql+mysqlconnector://u:p@host/db?raise_on_warnings=true")
+ )[1]
+ eq_(kw['raise_on_warnings'], True)
+
+ kw = dialect.create_connect_args(
+ make_url("mysql+mysqlconnector://u:p@host/db?raise_on_warnings=false")
+ )[1]
+ eq_(kw['raise_on_warnings'], False)
+
+ kw = dialect.create_connect_args(
+ make_url("mysql+mysqlconnector://u:p@host/db")
+ )[1]
+ eq_(kw['raise_on_warnings'], True)
+
+ @testing.only_on('mysql')
+ def test_random_arg(self):
+ dialect = testing.db.dialect
+ kw = dialect.create_connect_args(
+ make_url("mysql://u:p@host/db?foo=true")
+ )[1]
+ eq_(kw['foo'], "true")
+
class SQLModeDetectionTest(fixtures.TestBase):
__only_on__ = 'mysql'
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py
index b9e347d41..7494eaf43 100644
--- a/test/dialect/mysql/test_reflection.py
+++ b/test/dialect/mysql/test_reflection.py
@@ -140,33 +140,33 @@ class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
@testing.uses_deprecated('Manually quoting ENUM value literals')
def test_type_reflection(self):
# (ask_for, roundtripped_as_if_different)
- specs = [( String(1), mysql.MSString(1), ),
- ( String(3), mysql.MSString(3), ),
- ( Text(), mysql.MSText(), ),
- ( Unicode(1), mysql.MSString(1), ),
- ( Unicode(3), mysql.MSString(3), ),
- ( UnicodeText(), mysql.MSText(), ),
- ( mysql.MSChar(1), ),
- ( mysql.MSChar(3), ),
- ( NCHAR(2), mysql.MSChar(2), ),
- ( mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only
- ( mysql.MSNVarChar(22), mysql.MSString(22), ),
- ( SmallInteger(), mysql.MSSmallInteger(), ),
- ( SmallInteger(), mysql.MSSmallInteger(4), ),
- ( mysql.MSSmallInteger(), ),
- ( mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ),
- ( mysql.MSMediumInteger(), mysql.MSMediumInteger(), ),
- ( mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ),
- ( LargeBinary(3), mysql.TINYBLOB(), ),
- ( LargeBinary(), mysql.BLOB() ),
- ( mysql.MSBinary(3), mysql.MSBinary(3), ),
- ( mysql.MSVarBinary(3),),
- ( mysql.MSTinyBlob(),),
- ( mysql.MSBlob(),),
- ( mysql.MSBlob(1234), mysql.MSBlob()),
- ( mysql.MSMediumBlob(),),
- ( mysql.MSLongBlob(),),
- ( mysql.ENUM("''","'fleem'"), ),
+ specs = [(String(1), mysql.MSString(1), ),
+ (String(3), mysql.MSString(3), ),
+ (Text(), mysql.MSText(), ),
+ (Unicode(1), mysql.MSString(1), ),
+ (Unicode(3), mysql.MSString(3), ),
+ (UnicodeText(), mysql.MSText(), ),
+ (mysql.MSChar(1), ),
+ (mysql.MSChar(3), ),
+ (NCHAR(2), mysql.MSChar(2), ),
+ (mysql.MSNChar(2), mysql.MSChar(2), ), # N is CREATE only
+ (mysql.MSNVarChar(22), mysql.MSString(22), ),
+ (SmallInteger(), mysql.MSSmallInteger(), ),
+ (SmallInteger(), mysql.MSSmallInteger(4), ),
+ (mysql.MSSmallInteger(), ),
+ (mysql.MSSmallInteger(4), mysql.MSSmallInteger(4), ),
+ (mysql.MSMediumInteger(), mysql.MSMediumInteger(), ),
+ (mysql.MSMediumInteger(8), mysql.MSMediumInteger(8), ),
+ (LargeBinary(3), mysql.TINYBLOB(), ),
+ (LargeBinary(), mysql.BLOB() ),
+ (mysql.MSBinary(3), mysql.MSBinary(3), ),
+ (mysql.MSVarBinary(3),),
+ (mysql.MSTinyBlob(),),
+ (mysql.MSBlob(),),
+ (mysql.MSBlob(1234), mysql.MSBlob()),
+ (mysql.MSMediumBlob(),),
+ (mysql.MSLongBlob(),),
+ (mysql.ENUM("''","'fleem'"), ),
]
columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
@@ -298,3 +298,22 @@ class RawReflectionTest(fixtures.TestBase):
assert regex.match(' PRIMARY KEY USING BTREE (`id`)')
assert regex.match(' PRIMARY KEY (`id`) USING BTREE')
+ def test_fk_reflection(self):
+ regex = self.parser._re_constraint
+
+ m = regex.match(' CONSTRAINT `addresses_user_id_fkey` '
+ 'FOREIGN KEY (`user_id`) '
+ 'REFERENCES `users` (`id`) '
+ 'ON DELETE CASCADE ON UPDATE CASCADE')
+ eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`',
+ '`users`', '`id`', None, 'CASCADE', 'CASCADE'))
+
+
+ m = regex.match(' CONSTRAINT `addresses_user_id_fkey` '
+ 'FOREIGN KEY (`user_id`) '
+ 'REFERENCES `users` (`id`) '
+ 'ON DELETE CASCADE ON UPDATE SET NULL')
+ eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`',
+ '`users`', '`id`', None, 'CASCADE', 'SET NULL'))
+
+
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py
index b918abe25..acf9c1e2f 100644
--- a/test/dialect/mysql/test_types.py
+++ b/test/dialect/mysql/test_types.py
@@ -4,12 +4,13 @@ from sqlalchemy.testing import eq_, assert_raises
from sqlalchemy import *
from sqlalchemy import sql, exc, schema
from sqlalchemy.util import u
+from sqlalchemy import util
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults
from sqlalchemy import testing
from sqlalchemy.testing.engines import utf8_engine
import datetime
-
+import decimal
class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
"Test MySQL column types"
@@ -141,10 +142,36 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
]
for type_, args, kw, res in columns:
+ type_inst = type_(*args, **kw)
self.assert_compile(
- type_(*args, **kw),
+ type_inst,
res
)
+ # test that repr() copies out all arguments
+ self.assert_compile(
+ eval("mysql.%r" % type_inst),
+ res
+ )
+
+ @testing.only_if('mysql')
+ @testing.provide_metadata
+ def test_precision_float_roundtrip(self):
+ t = Table('t', self.metadata,
+ Column('scale_value', mysql.DOUBLE(
+ precision=15, scale=12, asdecimal=True)),
+ Column('unscale_value', mysql.DOUBLE(
+ decimal_return_scale=12, asdecimal=True))
+ )
+ t.create(testing.db)
+ testing.db.execute(
+ t.insert(), scale_value=45.768392065789,
+ unscale_value=45.768392065789
+ )
+ result = testing.db.scalar(select([t.c.scale_value]))
+ eq_(result, decimal.Decimal("45.768392065789"))
+
+ result = testing.db.scalar(select([t.c.unscale_value]))
+ eq_(result, decimal.Decimal("45.768392065789"))
@testing.exclude('mysql', '<', (4, 1, 1), 'no charset support')
def test_charset(self):
@@ -212,14 +239,22 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
(mysql.ENUM, ["foo", "bar"], {'unicode':True},
'''ENUM('foo','bar') UNICODE'''),
- (String, [20], {"collation":"utf8"}, 'VARCHAR(20) COLLATE utf8')
+ (String, [20], {"collation": "utf8"}, 'VARCHAR(20) COLLATE utf8')
]
for type_, args, kw, res in columns:
+ type_inst = type_(*args, **kw)
self.assert_compile(
- type_(*args, **kw),
+ type_inst,
+ res
+ )
+ # test that repr() copies out all arguments
+ self.assert_compile(
+ eval("mysql.%r" % type_inst)
+ if type_ is not String
+ else eval("%r" % type_inst),
res
)
@@ -229,15 +264,23 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
def test_charset_collate_table(self):
t = Table('foo', self.metadata,
Column('id', Integer),
+ Column('data', UnicodeText),
mysql_default_charset='utf8',
- mysql_collate='utf8_unicode_ci'
+ mysql_collate='utf8_bin'
)
t.create()
m2 = MetaData(testing.db)
t2 = Table('foo', m2, autoload=True)
- eq_(t2.kwargs['mysql_collate'], 'utf8_unicode_ci')
+ eq_(t2.kwargs['mysql_collate'], 'utf8_bin')
eq_(t2.kwargs['mysql_default charset'], 'utf8')
+ # test [ticket:2906]
+ # in order to test the condition here, need to use
+ # MySQLdb 1.2.3 and also need to pass either use_unicode=1
+ # or charset=utf8 to the URL.
+ t.insert().execute(id=1, data=u('some text'))
+ assert isinstance(testing.db.scalar(select([t.c.data])), util.text_type)
+
def test_bit_50(self):
"""Exercise BIT types on 5.0+ (not valid for all engine types)"""
@@ -250,7 +293,9 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
@testing.only_if('mysql')
@testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature')
- @testing.fails_on('mysql+oursql', 'some round trips fail, oursql bug ?')
+ @testing.fails_if(
+ lambda: testing.against("mysql+oursql") and util.py3k,
+ 'some round trips fail, oursql bug ?')
@testing.provide_metadata
def test_bit_50_roundtrip(self):
bit_table = Table('mysql_bits', self.metadata,
@@ -474,72 +519,24 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
self.assert_(colspec(table.c.y1).startswith('y1 YEAR'))
eq_(colspec(table.c.y5), 'y5 YEAR(4)')
- @testing.only_if('mysql')
- @testing.provide_metadata
- def test_set(self):
- """Exercise the SET type."""
- set_table = Table('mysql_set', self.metadata,
- Column('s1',
- mysql.MSSet("'dq'", "'sq'")), Column('s2',
- mysql.MSSet("'a'")), Column('s3',
- mysql.MSSet("'5'", "'7'", "'9'")))
- eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')")
- eq_(colspec(set_table.c.s2), "s2 SET('a')")
- eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')")
- set_table.create()
- reflected = Table('mysql_set', MetaData(testing.db),
- autoload=True)
- for table in set_table, reflected:
-
- def roundtrip(store, expected=None):
- expected = expected or store
- table.insert(store).execute()
- row = table.select().execute().first()
- self.assert_(list(row) == expected)
- table.delete().execute()
-
- roundtrip([None, None, None], [None] * 3)
- roundtrip(['', '', ''], [set([''])] * 3)
- roundtrip([set(['dq']), set(['a']), set(['5'])])
- roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']),
- set(['5'])])
- roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5'
- ])])
- roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'
- ])])
- set_table.insert().execute({'s3': set(['5'])},
- {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])},
- {'s3': set(['7', '9'])})
-
- # NOTE: the string sent to MySQL here is sensitive to ordering.
- # for some reason the set ordering is always "5, 7" when we test on
- # MySQLdb but in Py3K this is not guaranteed. So basically our
- # SET type doesn't do ordering correctly (not sure how it can,
- # as we don't know how the SET was configured in the first place.)
- rows = select([set_table.c.s3],
- set_table.c.s3.in_([set(['5']), ['5', '7']])
- ).execute().fetchall()
- found = set([frozenset(row[0]) for row in rows])
- eq_(found, set([frozenset(['5']), frozenset(['5', '7'])]))
-
-class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
__only_on__ = 'mysql'
__dialect__ = mysql.dialect()
- @testing.uses_deprecated('Manually quoting ENUM value literals')
@testing.provide_metadata
def test_enum(self):
"""Exercise the ENUM type."""
+ with testing.expect_deprecated('Manually quoting ENUM value literals'):
+ e1, e2 = mysql.ENUM("'a'", "'b'"), mysql.ENUM("'a'", "'b'")
+
enum_table = Table('mysql_enum', self.metadata,
- Column('e1', mysql.ENUM("'a'", "'b'")),
- Column('e2', mysql.ENUM("'a'", "'b'"),
- nullable=False),
- Column('e2generic', Enum("a", "b"),
- nullable=False),
+ Column('e1', e1),
+ Column('e2', e2, nullable=False),
+ Column('e2generic', Enum("a", "b"), nullable=False),
Column('e3', mysql.ENUM("'a'", "'b'", strict=True)),
Column('e4', mysql.ENUM("'a'", "'b'", strict=True),
nullable=False),
@@ -587,6 +584,106 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
eq_(res, expected)
+ @testing.provide_metadata
+ def test_set(self):
+
+ with testing.expect_deprecated('Manually quoting SET value literals'):
+ e1, e2 = mysql.SET("'a'", "'b'"), mysql.SET("'a'", "'b'")
+
+ set_table = Table('mysql_set', self.metadata,
+ Column('e1', e1),
+ Column('e2', e2, nullable=False),
+ Column('e3', mysql.SET("a", "b")),
+ Column('e4', mysql.SET("'a'", "b")),
+ Column('e5', mysql.SET("'a'", "'b'", quoting="quoted"))
+ )
+
+ eq_(colspec(set_table.c.e1),
+ "e1 SET('a','b')")
+ eq_(colspec(set_table.c.e2),
+ "e2 SET('a','b') NOT NULL")
+ eq_(colspec(set_table.c.e3),
+ "e3 SET('a','b')")
+ eq_(colspec(set_table.c.e4),
+ "e4 SET('''a''','b')")
+ eq_(colspec(set_table.c.e5),
+ "e5 SET('a','b')")
+ set_table.create()
+
+ assert_raises(exc.DBAPIError, set_table.insert().execute,
+ e1=None, e2=None, e3=None, e4=None)
+
+ if testing.against("+oursql"):
+ assert_raises(exc.StatementError, set_table.insert().execute,
+ e1='c', e2='c', e3='c', e4='c')
+
+ set_table.insert().execute(e1='a', e2='a', e3='a', e4="'a'", e5="a,b")
+ set_table.insert().execute(e1='b', e2='b', e3='b', e4='b', e5="a,b")
+
+ res = set_table.select().execute().fetchall()
+
+ if testing.against("+oursql"):
+ expected = [
+ # 1st row with all c's, data truncated
+ (set(['']), set(['']), set(['']), set(['']), None),
+ ]
+ else:
+ expected = []
+
+ expected.extend([
+ (set(['a']), set(['a']), set(['a']), set(["'a'"]), set(['a', 'b'])),
+ (set(['b']), set(['b']), set(['b']), set(['b']), set(['a', 'b']))
+ ])
+
+ eq_(res, expected)
+
+ @testing.provide_metadata
+ def test_set_roundtrip_plus_reflection(self):
+ set_table = Table('mysql_set', self.metadata,
+ Column('s1',
+ mysql.SET("dq", "sq")),
+ Column('s2', mysql.SET("a")),
+ Column('s3', mysql.SET("5", "7", "9")))
+
+ eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')")
+ eq_(colspec(set_table.c.s2), "s2 SET('a')")
+ eq_(colspec(set_table.c.s3), "s3 SET('5','7','9')")
+ set_table.create()
+ reflected = Table('mysql_set', MetaData(testing.db),
+ autoload=True)
+ for table in set_table, reflected:
+
+ def roundtrip(store, expected=None):
+ expected = expected or store
+ table.insert(store).execute()
+ row = table.select().execute().first()
+ self.assert_(list(row) == expected)
+ table.delete().execute()
+
+ roundtrip([None, None, None], [None] * 3)
+ roundtrip(['', '', ''], [set([''])] * 3)
+ roundtrip([set(['dq']), set(['a']), set(['5'])])
+ roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']),
+ set(['5'])])
+ roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5'
+ ])])
+ roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'
+ ])])
+ set_table.insert().execute({'s3': set(['5'])},
+ {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])},
+ {'s3': set(['7', '9'])})
+
+ # NOTE: the string sent to MySQL here is sensitive to ordering.
+ # for some reason the set ordering is always "5, 7" when we test on
+ # MySQLdb but in Py3K this is not guaranteed. So basically our
+ # SET type doesn't do ordering correctly (not sure how it can,
+ # as we don't know how the SET was configured in the first place.)
+ rows = select([set_table.c.s3],
+ set_table.c.s3.in_([set(['5']), ['5', '7']])
+ ).execute().fetchall()
+ found = set([frozenset(row[0]) for row in rows])
+ eq_(found, set([frozenset(['5']), frozenset(['5', '7'])]))
+
def test_unicode_enum(self):
unicode_engine = utf8_engine()
metadata = MetaData(unicode_engine)
@@ -634,38 +731,64 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
"VARCHAR(1), CHECK (somecolumn IN ('x', "
"'y', 'z')))")
+ @testing.provide_metadata
@testing.exclude('mysql', '<', (4,), "3.23 can't handle an ENUM of ''")
- @testing.uses_deprecated('Manually quoting ENUM value literals')
def test_enum_parse(self):
- """More exercises for the ENUM type."""
- # MySQL 3.23 can't handle an ENUM of ''....
-
- enum_table = Table('mysql_enum', MetaData(testing.db),
- Column('e1', mysql.ENUM("'a'")),
- Column('e2', mysql.ENUM("''")),
- Column('e3', mysql.ENUM('a')),
- Column('e4', mysql.ENUM('')),
- Column('e5', mysql.ENUM("'a'", "''")),
- Column('e6', mysql.ENUM("''", "'a'")),
- Column('e7', mysql.ENUM("''", "'''a'''", "'b''b'", "''''")))
+ with testing.expect_deprecated('Manually quoting ENUM value literals'):
+ enum_table = Table('mysql_enum', self.metadata,
+ Column('e1', mysql.ENUM("'a'")),
+ Column('e2', mysql.ENUM("''")),
+ Column('e3', mysql.ENUM('a')),
+ Column('e4', mysql.ENUM('')),
+ Column('e5', mysql.ENUM("'a'", "''")),
+ Column('e6', mysql.ENUM("''", "'a'")),
+ Column('e7', mysql.ENUM("''", "'''a'''", "'b''b'", "''''")))
for col in enum_table.c:
self.assert_(repr(col))
- try:
- enum_table.create()
- reflected = Table('mysql_enum', MetaData(testing.db),
- autoload=True)
- for t in enum_table, reflected:
- eq_(t.c.e1.type.enums, ("a",))
- eq_(t.c.e2.type.enums, ("",))
- eq_(t.c.e3.type.enums, ("a",))
- eq_(t.c.e4.type.enums, ("",))
- eq_(t.c.e5.type.enums, ("a", ""))
- eq_(t.c.e6.type.enums, ("", "a"))
- eq_(t.c.e7.type.enums, ("", "'a'", "b'b", "'"))
- finally:
- enum_table.drop()
+
+ enum_table.create()
+ reflected = Table('mysql_enum', MetaData(testing.db),
+ autoload=True)
+ for t in enum_table, reflected:
+ eq_(t.c.e1.type.enums, ("a",))
+ eq_(t.c.e2.type.enums, ("",))
+ eq_(t.c.e3.type.enums, ("a",))
+ eq_(t.c.e4.type.enums, ("",))
+ eq_(t.c.e5.type.enums, ("a", ""))
+ eq_(t.c.e6.type.enums, ("", "a"))
+ eq_(t.c.e7.type.enums, ("", "'a'", "b'b", "'"))
+
+ @testing.provide_metadata
+ @testing.exclude('mysql', '<', (5,))
+ def test_set_parse(self):
+ with testing.expect_deprecated('Manually quoting SET value literals'):
+ set_table = Table('mysql_set', self.metadata,
+ Column('e1', mysql.SET("'a'")),
+ Column('e2', mysql.SET("''")),
+ Column('e3', mysql.SET('a')),
+ Column('e4', mysql.SET('')),
+ Column('e5', mysql.SET("'a'", "''")),
+ Column('e6', mysql.SET("''", "'a'")),
+ Column('e7', mysql.SET("''", "'''a'''", "'b''b'", "''''")))
+
+ for col in set_table.c:
+ self.assert_(repr(col))
+
+ set_table.create()
+
+ # don't want any warnings on reflection
+ reflected = Table('mysql_set', MetaData(testing.db),
+ autoload=True)
+ for t in set_table, reflected:
+ eq_(t.c.e1.type.values, ("a",))
+ eq_(t.c.e2.type.values, ("",))
+ eq_(t.c.e3.type.values, ("a",))
+ eq_(t.c.e4.type.values, ("",))
+ eq_(t.c.e5.type.values, ("a", ""))
+ eq_(t.c.e6.type.values, ("", "a"))
+ eq_(t.c.e7.type.values, ("", "'a'", "b'b", "'"))
def colspec(c):
return testing.db.dialect.ddl_compiler(
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py
index 11661b11f..e64afb186 100644
--- a/test/dialect/postgresql/test_compiler.py
+++ b/test/dialect/postgresql/test_compiler.py
@@ -16,6 +16,7 @@ from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import TSRANGE
from sqlalchemy.orm import mapper, aliased, Session
from sqlalchemy.sql import table, column, operators
+from sqlalchemy.util import u
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -106,6 +107,45 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'AS length_1', dialect=dialect)
+ def test_create_drop_enum(self):
+ # test escaping and unicode within CREATE TYPE for ENUM
+ typ = postgresql.ENUM(
+ "val1", "val2", "val's 3", u('méil'), name="myname")
+ self.assert_compile(postgresql.CreateEnumType(typ),
+ u("CREATE TYPE myname AS ENUM ('val1', 'val2', 'val''s 3', 'méil')")
+ )
+
+ typ = postgresql.ENUM(
+ "val1", "val2", "val's 3", name="PleaseQuoteMe")
+ self.assert_compile(postgresql.CreateEnumType(typ),
+ "CREATE TYPE \"PleaseQuoteMe\" AS ENUM "
+ "('val1', 'val2', 'val''s 3')"
+ )
+
+ def test_generic_enum(self):
+ e1 = Enum('x', 'y', 'z', name='somename')
+ e2 = Enum('x', 'y', 'z', name='somename', schema='someschema')
+ self.assert_compile(postgresql.CreateEnumType(e1),
+ "CREATE TYPE somename AS ENUM ('x', 'y', 'z')"
+ )
+ self.assert_compile(postgresql.CreateEnumType(e2),
+ "CREATE TYPE someschema.somename AS ENUM "
+ "('x', 'y', 'z')")
+ self.assert_compile(postgresql.DropEnumType(e1),
+ 'DROP TYPE somename')
+ self.assert_compile(postgresql.DropEnumType(e2),
+ 'DROP TYPE someschema.somename')
+ t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
+ self.assert_compile(schema.CreateTable(t1),
+ 'CREATE TABLE sometable (somecolumn '
+ 'somename)')
+ t1 = Table('sometable', MetaData(), Column('somecolumn',
+ Enum('x', 'y', 'z', native_enum=False)))
+ self.assert_compile(schema.CreateTable(t1),
+ "CREATE TABLE sometable (somecolumn "
+ "VARCHAR(1), CHECK (somecolumn IN ('x', "
+ "'y', 'z')))")
+
def test_create_partial_index(self):
m = MetaData()
tbl = Table('testtbl', m, Column('data', Integer))
@@ -173,6 +213,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'USING hash (data)',
dialect=postgresql.dialect())
+
+ def test_create_index_expr_gets_parens(self):
+ m = MetaData()
+ tbl = Table('testtbl', m, Column('x', Integer), Column('y', Integer))
+
+ idx1 = Index('test_idx1', 5 / (tbl.c.x + tbl.c.y))
+ self.assert_compile(
+ schema.CreateIndex(idx1),
+ "CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))"
+ )
+
+ def test_create_index_literals(self):
+ m = MetaData()
+ tbl = Table('testtbl', m, Column('data', Integer))
+
+ idx1 = Index('test_idx1', tbl.c.data + 5)
+ self.assert_compile(
+ schema.CreateIndex(idx1),
+ "CREATE INDEX test_idx1 ON testtbl ((data + 5))"
+ )
+
def test_exclude_constraint_min(self):
m = MetaData()
tbl = Table('testtbl', m,
@@ -228,6 +289,68 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'SUBSTRING(%(substring_1)s FROM %(substring_2)s)')
+ def test_for_update(self):
+ table1 = table('mytable',
+ column('myid'), column('name'), column('description'))
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(nowait=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(read=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(read=True, nowait=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s "
+ "FOR UPDATE OF mytable")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(read=True, nowait=True, of=table1),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s "
+ "FOR SHARE OF mytable NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(read=True, nowait=True, of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s "
+ "FOR SHARE OF mytable NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(read=True, nowait=True,
+ of=[table1.c.myid, table1.c.name]),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s "
+ "FOR SHARE OF mytable NOWAIT")
+
+ ta = table1.alias()
+ self.assert_compile(
+ ta.select(ta.c.myid == 7).
+ with_for_update(of=[ta.c.myid, ta.c.name]),
+ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM mytable AS mytable_1 "
+ "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1"
+ )
def test_reserved_words(self):
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index 1fc239cb7..fd6df2c98 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -17,6 +17,7 @@ from sqlalchemy.dialects.postgresql import base as postgresql
import logging
import logging.handlers
from sqlalchemy.testing.mock import Mock
+from sqlalchemy.engine.reflection import Inspector
class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
@@ -53,7 +54,11 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)),
('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, '
'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), '
- '64-bit', (9, 1, 2))]:
+ '64-bit', (9, 1, 2)),
+ ('[PostgreSQL 9.2.4 ] VMware vFabric Postgres 9.2.4.0 '
+ 'release build 1080137', (9, 2, 4))
+
+ ]:
eq_(testing.db.dialect._get_server_version_info(mock_conn(string)),
version)
@@ -63,8 +68,10 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert testing.db.dialect.dbapi.__version__.\
startswith(".".join(str(x) for x in v))
+ # currently not passing with pg 9.3 that does not seem to generate
+ # any notices here, woudl rather find a way to mock this
@testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
- def test_notice_logging(self):
+ def _test_notice_logging(self):
log = logging.getLogger('sqlalchemy.dialects.postgresql')
buf = logging.handlers.BufferingHandler(100)
lev = log.level
@@ -199,18 +206,32 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)
def test_serial_integer(self):
- for type_, expected in [
- (Integer, 'SERIAL'),
- (BigInteger, 'BIGSERIAL'),
- (SmallInteger, 'SMALLINT'),
- (postgresql.INTEGER, 'SERIAL'),
- (postgresql.BIGINT, 'BIGSERIAL'),
+
+ for version, type_, expected in [
+ (None, Integer, 'SERIAL'),
+ (None, BigInteger, 'BIGSERIAL'),
+ ((9, 1), SmallInteger, 'SMALLINT'),
+ ((9, 2), SmallInteger, 'SMALLSERIAL'),
+ (None, postgresql.INTEGER, 'SERIAL'),
+ (None, postgresql.BIGINT, 'BIGSERIAL'),
]:
m = MetaData()
t = Table('t', m, Column('c', type_, primary_key=True))
- ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t))
+
+ if version:
+ dialect = postgresql.dialect()
+ dialect._get_server_version_info = Mock(return_value=version)
+ dialect.initialize(testing.db.connect())
+ else:
+ dialect = testing.db.dialect
+
+ ddl_compiler = dialect.ddl_compiler(
+ dialect,
+ schema.CreateTable(t)
+ )
eq_(
ddl_compiler.get_column_specification(t.c.c),
"c %s NOT NULL" % expected
)
+
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index fb399b546..58f34d5d0 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -5,6 +5,7 @@ from sqlalchemy.testing.assertions import eq_, assert_raises, \
AssertsCompiledSQL, ComparesTables
from sqlalchemy.testing import engines, fixtures
from sqlalchemy import testing
+from sqlalchemy import inspect
from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
String, Sequence, ForeignKey, join, Numeric, \
PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
@@ -159,6 +160,17 @@ class ReflectionTest(fixtures.TestBase):
subject.join(referer).onclause))
@testing.provide_metadata
+ def test_reflect_default_over_128_chars(self):
+ Table('t', self.metadata,
+ Column('x', String(200), server_default="abcd" * 40)
+ ).create(testing.db)
+
+ m = MetaData()
+ t = Table('t', m, autoload=True, autoload_with=testing.db)
+ eq_(
+ t.c.x.server_default.arg.text, "'%s'::character varying" % ("abcd" * 40)
+ )
+ @testing.provide_metadata
def test_renamed_sequence_reflection(self):
metadata = self.metadata
t = Table('t', metadata, Column('id', Integer, primary_key=True))
@@ -416,6 +428,70 @@ class ReflectionTest(fixtures.TestBase):
eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}])
conn.close()
+ @testing.provide_metadata
+ def test_foreign_key_option_inspection(self):
+ metadata = self.metadata
+ Table('person', metadata,
+ Column('id', String(length=32), nullable=False, primary_key=True),
+ Column('company_id', ForeignKey('company.id',
+ name='person_company_id_fkey',
+ match='FULL', onupdate='RESTRICT', ondelete='RESTRICT',
+ deferrable=True, initially='DEFERRED'
+ )
+ )
+ )
+ Table('company', metadata,
+ Column('id', String(length=32), nullable=False, primary_key=True),
+ Column('name', String(length=255)),
+ Column('industry_id', ForeignKey('industry.id',
+ name='company_industry_id_fkey',
+ onupdate='CASCADE', ondelete='CASCADE',
+ deferrable=False, # PG default
+ initially='IMMEDIATE' # PG default
+ )
+ )
+ )
+ Table('industry', metadata,
+ Column('id', Integer(), nullable=False, primary_key=True),
+ Column('name', String(length=255))
+ )
+ fk_ref = {
+ 'person_company_id_fkey': {
+ 'name': 'person_company_id_fkey',
+ 'constrained_columns': ['company_id'],
+ 'referred_columns': ['id'],
+ 'referred_table': 'company',
+ 'referred_schema': None,
+ 'options': {
+ 'onupdate': 'RESTRICT',
+ 'deferrable': True,
+ 'ondelete': 'RESTRICT',
+ 'initially': 'DEFERRED',
+ 'match': 'FULL'
+ }
+ },
+ 'company_industry_id_fkey': {
+ 'name': 'company_industry_id_fkey',
+ 'constrained_columns': ['industry_id'],
+ 'referred_columns': ['id'],
+ 'referred_table': 'industry',
+ 'referred_schema': None,
+ 'options': {
+ 'onupdate': 'CASCADE',
+ 'deferrable': None,
+ 'ondelete': 'CASCADE',
+ 'initially': None,
+ 'match': None
+ }
+ }
+ }
+ metadata.create_all()
+ inspector = inspect(testing.db)
+ fks = inspector.get_foreign_keys('person') + \
+ inspector.get_foreign_keys('company')
+ for fk in fks:
+ eq_(fk, fk_ref[fk['name']])
+
class CustomTypeReflectionTest(fixtures.TestBase):
class CustomType(object):
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index 784f8bcbf..ba4b63e1a 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -10,18 +10,22 @@ from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
func, literal_column, literal, bindparam, cast, extract, \
SmallInteger, Enum, REAL, update, insert, Index, delete, \
- and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
+ and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text, \
+ type_coerce
from sqlalchemy.orm import Session, mapper, aliased
from sqlalchemy import exc, schema, types
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \
- INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE
+ INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \
+ JSON
import decimal
from sqlalchemy import util
from sqlalchemy.testing.util import round_decimal
from sqlalchemy.sql import table, column, operators
import logging
import re
+from sqlalchemy import inspect
+from sqlalchemy import event
class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults):
__only_on__ = 'postgresql'
@@ -96,34 +100,10 @@ class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults):
([5], [5], [6], [decimal.Decimal("6.4")])
)
-class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+class EnumTest(fixtures.TestBase, AssertsExecutionResults):
__only_on__ = 'postgresql'
- __dialect__ = postgresql.dialect()
- def test_compile(self):
- e1 = Enum('x', 'y', 'z', name='somename')
- e2 = Enum('x', 'y', 'z', name='somename', schema='someschema')
- self.assert_compile(postgresql.CreateEnumType(e1),
- "CREATE TYPE somename AS ENUM ('x','y','z')"
- )
- self.assert_compile(postgresql.CreateEnumType(e2),
- "CREATE TYPE someschema.somename AS ENUM "
- "('x','y','z')")
- self.assert_compile(postgresql.DropEnumType(e1),
- 'DROP TYPE somename')
- self.assert_compile(postgresql.DropEnumType(e2),
- 'DROP TYPE someschema.somename')
- t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
- self.assert_compile(schema.CreateTable(t1),
- 'CREATE TABLE sometable (somecolumn '
- 'somename)')
- t1 = Table('sometable', MetaData(), Column('somecolumn',
- Enum('x', 'y', 'z', native_enum=False)))
- self.assert_compile(schema.CreateTable(t1),
- "CREATE TABLE sometable (somecolumn "
- "VARCHAR(1), CHECK (somecolumn IN ('x', "
- "'y', 'z')))")
@testing.fails_on('postgresql+zxjdbc',
'zxjdbc fails on ENUM: column "XXX" is of type '
@@ -860,7 +840,8 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
Column('plain_interval', postgresql.INTERVAL),
Column('year_interval', y2m()),
Column('month_interval', d2s()),
- Column('precision_interval', postgresql.INTERVAL(precision=3))
+ Column('precision_interval', postgresql.INTERVAL(precision=3)),
+ Column('tsvector_document', postgresql.TSVECTOR)
)
metadata.create_all()
@@ -893,6 +874,17 @@ class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
self.assert_compile(type_, expected)
@testing.provide_metadata
+ def test_tsvector_round_trip(self):
+ t = Table('t1', self.metadata, Column('data', postgresql.TSVECTOR))
+ t.create()
+ testing.db.execute(t.insert(), data="a fat cat sat")
+ eq_(testing.db.scalar(select([t.c.data])), "'a' 'cat' 'fat' 'sat'")
+
+ testing.db.execute(t.update(), data="'a' 'cat' 'fat' 'mat' 'sat'")
+
+ eq_(testing.db.scalar(select([t.c.data])), "'a' 'cat' 'fat' 'mat' 'sat'")
+
+ @testing.provide_metadata
def test_bit_reflection(self):
metadata = self.metadata
t1 = Table('t1', metadata,
@@ -918,7 +910,6 @@ class UUIDTest(fixtures.TestBase):
__only_on__ = 'postgresql'
- @testing.requires.python25
@testing.fails_on('postgresql+zxjdbc',
'column "data" is of type uuid but expression is of type character varying')
@testing.fails_on('postgresql+pg8000', 'No support for UUID type')
@@ -932,7 +923,6 @@ class UUIDTest(fixtures.TestBase):
str(uuid.uuid4())
)
- @testing.requires.python25
@testing.fails_on('postgresql+zxjdbc',
'column "data" is of type uuid but expression is of type character varying')
@testing.fails_on('postgresql+pg8000', 'No support for UUID type')
@@ -978,13 +968,8 @@ class UUIDTest(fixtures.TestBase):
-class HStoreTest(fixtures.TestBase):
- def _assert_sql(self, construct, expected):
- dialect = postgresql.dialect()
- compiled = str(construct.compile(dialect=dialect))
- compiled = re.sub(r'\s+', ' ', compiled)
- expected = re.sub(r'\s+', ' ', expected)
- eq_(compiled, expected)
+class HStoreTest(AssertsCompiledSQL, fixtures.TestBase):
+ __dialect__ = 'postgresql'
def setup(self):
metadata = MetaData()
@@ -996,7 +981,7 @@ class HStoreTest(fixtures.TestBase):
def _test_where(self, whereclause, expected):
stmt = select([self.test_table]).where(whereclause)
- self._assert_sql(
+ self.assert_compile(
stmt,
"SELECT test_table.id, test_table.hash FROM test_table "
"WHERE %s" % expected
@@ -1004,7 +989,7 @@ class HStoreTest(fixtures.TestBase):
def _test_cols(self, colclause, expected, from_=True):
stmt = select([colclause])
- self._assert_sql(
+ self.assert_compile(
stmt,
(
"SELECT %s" +
@@ -1013,9 +998,8 @@ class HStoreTest(fixtures.TestBase):
)
def test_bind_serialize_default(self):
- from sqlalchemy.engine import default
- dialect = default.DefaultDialect()
+ dialect = postgresql.dialect()
proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
eq_(
proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
@@ -1023,9 +1007,7 @@ class HStoreTest(fixtures.TestBase):
)
def test_bind_serialize_with_slashes_and_quotes(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
+ dialect = postgresql.dialect()
proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
eq_(
proc({'\\"a': '\\"1'}),
@@ -1033,9 +1015,7 @@ class HStoreTest(fixtures.TestBase):
)
def test_parse_error(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
+ dialect = postgresql.dialect()
proc = self.test_table.c.hash.type._cached_result_processor(
dialect, None)
assert_raises_message(
@@ -1048,9 +1028,7 @@ class HStoreTest(fixtures.TestBase):
)
def test_result_deserialize_default(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
+ dialect = postgresql.dialect()
proc = self.test_table.c.hash.type._cached_result_processor(
dialect, None)
eq_(
@@ -1059,9 +1037,7 @@ class HStoreTest(fixtures.TestBase):
)
def test_result_deserialize_with_slashes_and_quotes(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
+ dialect = postgresql.dialect()
proc = self.test_table.c.hash.type._cached_result_processor(
dialect, None)
eq_(
@@ -1305,7 +1281,6 @@ class HStoreRoundTripTest(fixtures.TablesTest):
return engine
def test_reflect(self):
- from sqlalchemy import inspect
insp = inspect(testing.db)
cols = insp.get_columns('data_table')
assert isinstance(cols[2]['type'], HSTORE)
@@ -1677,3 +1652,320 @@ class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest):
def _data_obj(self):
return self.extras.DateTimeTZRange(*self.tstzs())
+
+
+class JSONTest(AssertsCompiledSQL, fixtures.TestBase):
+ __dialect__ = 'postgresql'
+
+ def setup(self):
+ metadata = MetaData()
+ self.test_table = Table('test_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('test_column', JSON)
+ )
+ self.jsoncol = self.test_table.c.test_column
+
+ def _test_where(self, whereclause, expected):
+ stmt = select([self.test_table]).where(whereclause)
+ self.assert_compile(
+ stmt,
+ "SELECT test_table.id, test_table.test_column FROM test_table "
+ "WHERE %s" % expected
+ )
+
+ def _test_cols(self, colclause, expected, from_=True):
+ stmt = select([colclause])
+ self.assert_compile(
+ stmt,
+ (
+ "SELECT %s" +
+ (" FROM test_table" if from_ else "")
+ ) % expected
+ )
+
+ def test_bind_serialize_default(self):
+ dialect = postgresql.dialect()
+ proc = self.test_table.c.test_column.type._cached_bind_processor(dialect)
+ eq_(
+ proc({"A": [1, 2, 3, True, False]}),
+ '{"A": [1, 2, 3, true, false]}'
+ )
+
+ def test_result_deserialize_default(self):
+ dialect = postgresql.dialect()
+ proc = self.test_table.c.test_column.type._cached_result_processor(
+ dialect, None)
+ eq_(
+ proc('{"A": [1, 2, 3, true, false]}'),
+ {"A": [1, 2, 3, True, False]}
+ )
+
+ # This test is a bit misleading -- in real life you will need to cast to do anything
+ def test_where_getitem(self):
+ self._test_where(
+ self.jsoncol['bar'] == None,
+ "(test_table.test_column -> %(test_column_1)s) IS NULL"
+ )
+
+ def test_where_path(self):
+ self._test_where(
+ self.jsoncol[("foo", 1)] == None,
+ "(test_table.test_column #> %(test_column_1)s) IS NULL"
+ )
+
+ def test_where_getitem_as_text(self):
+ self._test_where(
+ self.jsoncol['bar'].astext == None,
+ "(test_table.test_column ->> %(test_column_1)s) IS NULL"
+ )
+
+ def test_where_getitem_as_cast(self):
+ self._test_where(
+ self.jsoncol['bar'].cast(Integer) == 5,
+ "CAST(test_table.test_column ->> %(test_column_1)s AS INTEGER) "
+ "= %(param_1)s"
+ )
+
+ def test_where_path_as_text(self):
+ self._test_where(
+ self.jsoncol[("foo", 1)].astext == None,
+ "(test_table.test_column #>> %(test_column_1)s) IS NULL"
+ )
+
+ def test_cols_get(self):
+ self._test_cols(
+ self.jsoncol['foo'],
+ "test_table.test_column -> %(test_column_1)s AS anon_1",
+ True
+ )
+
+
+class JSONRoundTripTest(fixtures.TablesTest):
+ __only_on__ = ('postgresql >= 9.3',)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('data_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30), nullable=False),
+ Column('data', JSON)
+ )
+
+ def _fixture_data(self, engine):
+ data_table = self.tables.data_table
+ engine.execute(
+ data_table.insert(),
+ {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}},
+ {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}},
+ {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}},
+ {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}},
+ {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2", "k3": 5}},
+ )
+
+ def _assert_data(self, compare):
+ data = testing.db.execute(
+ select([self.tables.data_table.c.data]).
+ order_by(self.tables.data_table.c.name)
+ ).fetchall()
+ eq_([d for d, in data], compare)
+
+ def _test_insert(self, engine):
+ engine.execute(
+ self.tables.data_table.insert(),
+ {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}
+ )
+ self._assert_data([{"k1": "r1v1", "k2": "r1v2"}])
+
+ def _non_native_engine(self, json_serializer=None, json_deserializer=None):
+ if json_serializer is not None or json_deserializer is not None:
+ options = {
+ "json_serializer": json_serializer,
+ "json_deserializer": json_deserializer
+ }
+ else:
+ options = {}
+
+ if testing.against("postgresql+psycopg2"):
+ from psycopg2.extras import register_default_json
+ engine = engines.testing_engine(options=options)
+ @event.listens_for(engine, "connect")
+ def connect(dbapi_connection, connection_record):
+ engine.dialect._has_native_json = False
+ def pass_(value):
+ return value
+ register_default_json(dbapi_connection, loads=pass_)
+ elif options:
+ engine = engines.testing_engine(options=options)
+ else:
+ engine = testing.db
+ engine.connect()
+ return engine
+
+ def test_reflect(self):
+ insp = inspect(testing.db)
+ cols = insp.get_columns('data_table')
+ assert isinstance(cols[2]['type'], JSON)
+
+ @testing.only_on("postgresql+psycopg2")
+ def test_insert_native(self):
+ engine = testing.db
+ self._test_insert(engine)
+
+ def test_insert_python(self):
+ engine = self._non_native_engine()
+ self._test_insert(engine)
+
+
+ def _test_custom_serialize_deserialize(self, native):
+ import json
+ def loads(value):
+ value = json.loads(value)
+ value['x'] = value['x'] + '_loads'
+ return value
+
+ def dumps(value):
+ value = dict(value)
+ value['x'] = 'dumps_y'
+ return json.dumps(value)
+
+ if native:
+ engine = engines.testing_engine(options=dict(
+ json_serializer=dumps,
+ json_deserializer=loads
+ ))
+ else:
+ engine = self._non_native_engine(
+ json_serializer=dumps,
+ json_deserializer=loads
+ )
+
+ s = select([
+ cast(
+ {
+ "key": "value",
+ "x": "q"
+ },
+ JSON
+ )
+ ])
+ eq_(
+ engine.scalar(s),
+ {
+ "key": "value",
+ "x": "dumps_y_loads"
+ },
+ )
+
+ @testing.only_on("postgresql+psycopg2")
+ def test_custom_native(self):
+ self._test_custom_serialize_deserialize(True)
+
+ @testing.only_on("postgresql+psycopg2")
+ def test_custom_python(self):
+ self._test_custom_serialize_deserialize(False)
+
+
+ @testing.only_on("postgresql+psycopg2")
+ def test_criterion_native(self):
+ engine = testing.db
+ self._fixture_data(engine)
+ self._test_criterion(engine)
+
+ def test_criterion_python(self):
+ engine = self._non_native_engine()
+ self._fixture_data(engine)
+ self._test_criterion(engine)
+
+ def test_path_query(self):
+ engine = testing.db
+ self._fixture_data(engine)
+ data_table = self.tables.data_table
+ result = engine.execute(
+ select([data_table.c.data]).where(
+ data_table.c.data[('k1',)].astext == 'r3v1'
+ )
+ ).first()
+ eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
+
+ def test_query_returned_as_text(self):
+ engine = testing.db
+ self._fixture_data(engine)
+ data_table = self.tables.data_table
+ result = engine.execute(
+ select([data_table.c.data['k1'].astext])
+ ).first()
+ assert isinstance(result[0], util.text_type)
+
+ def test_query_returned_as_int(self):
+ engine = testing.db
+ self._fixture_data(engine)
+ data_table = self.tables.data_table
+ result = engine.execute(
+ select([data_table.c.data['k3'].cast(Integer)]).where(
+ data_table.c.name == 'r5')
+ ).first()
+ assert isinstance(result[0], int)
+
+ def _test_criterion(self, engine):
+ data_table = self.tables.data_table
+ result = engine.execute(
+ select([data_table.c.data]).where(
+ data_table.c.data['k1'].astext == 'r3v1'
+ )
+ ).first()
+ eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
+
+ def _test_fixed_round_trip(self, engine):
+ s = select([
+ cast(
+ {
+ "key": "value",
+ "key2": {"k1": "v1", "k2": "v2"}
+ },
+ JSON
+ )
+ ])
+ eq_(
+ engine.scalar(s),
+ {
+ "key": "value",
+ "key2": {"k1": "v1", "k2": "v2"}
+ },
+ )
+
+ def test_fixed_round_trip_python(self):
+ engine = self._non_native_engine()
+ self._test_fixed_round_trip(engine)
+
+ @testing.only_on("postgresql+psycopg2")
+ def test_fixed_round_trip_native(self):
+ engine = testing.db
+ self._test_fixed_round_trip(engine)
+
+ def _test_unicode_round_trip(self, engine):
+ s = select([
+ cast(
+ {
+ util.u('réveillé'): util.u('réveillé'),
+ "data": {"k1": util.u('drôle')}
+ },
+ JSON
+ )
+ ])
+ eq_(
+ engine.scalar(s),
+ {
+ util.u('réveillé'): util.u('réveillé'),
+ "data": {"k1": util.u('drôle')}
+ },
+ )
+
+
+ def test_unicode_round_trip_python(self):
+ engine = self._non_native_engine()
+ self._test_unicode_round_trip(engine)
+
+ @testing.only_on("postgresql+psycopg2")
+ def test_unicode_round_trip_native(self):
+ engine = testing.db
+ self._test_unicode_round_trip(engine)
diff --git a/test/dialect/test_firebird.py b/test/dialect/test_firebird.py
index 4a71b7d05..222e34b93 100644
--- a/test/dialect/test_firebird.py
+++ b/test/dialect/test_firebird.py
@@ -352,6 +352,15 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
for type_, args, kw, res in columns:
self.assert_compile(type_(*args, **kw), res)
+ def test_quoting_initial_chars(self):
+ self.assert_compile(
+ column("_somecol"),
+ '"_somecol"'
+ )
+ self.assert_compile(
+ column("$somecol"),
+ '"$somecol"'
+ )
class TypesTest(fixtures.TestBase):
__only_on__ = 'firebird'
diff --git a/test/dialect/test_informix.py b/test/dialect/test_informix.py
deleted file mode 100644
index 332edd24e..000000000
--- a/test/dialect/test_informix.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from sqlalchemy import *
-from sqlalchemy.databases import informix
-from sqlalchemy.testing import *
-
-
-class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __dialect__ = informix.InformixDialect()
-
- def test_statements(self):
- meta = MetaData()
- t1 = Table('t1', meta, Column('col1', Integer,
- primary_key=True), Column('col2', String(50)))
- t2 = Table('t2', meta, Column('col1', Integer,
- primary_key=True), Column('col2', String(50)),
- Column('col3', Integer, ForeignKey('t1.col1')))
- self.assert_compile(t1.select(),
- 'SELECT t1.col1, t1.col2 FROM t1')
- self.assert_compile(select([t1, t2]).select_from(t1.join(t2)),
- 'SELECT t1.col1, t1.col2, t2.col1, '
- 't2.col2, t2.col3 FROM t1 JOIN t2 ON '
- 't1.col1 = t2.col3')
- self.assert_compile(t1.update().values({t1.c.col1: t1.c.col1
- + 1}), 'UPDATE t1 SET col1=(t1.col1 + ?)')
-
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index 71b2d96cb..8d0ff9776 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -18,7 +18,7 @@ from sqlalchemy.testing.schema import Table, Column
import datetime
import os
from sqlalchemy import sql
-
+from sqlalchemy.testing.mock import Mock
class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
__only_on__ = 'oracle+cx_oracle'
@@ -26,31 +26,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
testing.db.execute("""
-create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS
- retval number;
- begin
- retval := 6;
- x_out := 10;
- y_out := x_in * 15;
- z_out := NULL;
- end;
+ create or replace procedure foo(x_in IN number, x_out OUT number,
+ y_out OUT number, z_out OUT varchar) IS
+ retval number;
+ begin
+ retval := 6;
+ x_out := 10;
+ y_out := x_in * 15;
+ z_out := NULL;
+ end;
""")
def test_out_params(self):
- result = \
- testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
+ result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
':z_out); end;',
bindparams=[bindparam('x_in', Float),
outparam('x_out', Integer),
outparam('y_out', Float),
outparam('z_out', String)]), x_in=5)
- eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out'
- : None})
+ eq_(result.out_parameters,
+ {'x_out': 10, 'y_out': 75, 'z_out': None})
assert isinstance(result.out_parameters['x_out'], int)
@classmethod
def teardown_class(cls):
- testing.db.execute("DROP PROCEDURE foo")
+ testing.db.execute("DROP PROCEDURE foo")
class CXOracleArgsTest(fixtures.TestBase):
__only_on__ = 'oracle+cx_oracle'
@@ -92,7 +92,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
metadata.create_all()
table.insert().execute(
- {"option":1, "plain":1, "union":1}
+ {"option": 1, "plain": 1, "union": 1}
)
eq_(
testing.db.execute(table.select()).first(),
@@ -106,8 +106,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __dialect__ = oracle.dialect()
+ __dialect__ = "oracle" #oracle.dialect()
def test_true_false(self):
self.assert_compile(
@@ -218,6 +217,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
':ROWNUM_1) WHERE ora_rn > :ora_rn_1 FOR '
'UPDATE')
+ def test_for_update(self):
+ table1 = table('mytable',
+ column('myid'), column('name'), column('description'))
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF mytable.myid")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(nowait=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 "
+ "FOR UPDATE OF mytable.myid NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
+ "mytable.myid, mytable.name NOWAIT")
+
+ ta = table1.alias()
+ self.assert_compile(
+ ta.select(ta.c.myid == 7).
+ with_for_update(of=[ta.c.myid, ta.c.name]),
+ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM mytable mytable_1 "
+ "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF "
+ "mytable_1.myid, mytable_1.name"
+ )
+
def test_limit_preserves_typing_information(self):
class MyType(TypeDecorator):
impl = Integer
@@ -250,7 +292,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
def test_use_binds_for_limits_enabled(self):
t = table('sometable', column('col1'), column('col2'))
- dialect = oracle.OracleDialect(use_binds_for_limits = True)
+ dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(select([t]).limit(10),
"SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
@@ -348,8 +390,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
)
query = select([table1, table2], or_(table1.c.name == 'fred',
- table1.c.myid == 10, table2.c.othername != 'jack'
- , 'EXISTS (select yay from foo where boo = lar)'
+ table1.c.myid == 10, table2.c.othername != 'jack',
+ 'EXISTS (select yay from foo where boo = lar)'
), from_obj=[outerjoin(table1, table2,
table1.c.myid == table2.c.otherid)])
self.assert_compile(query,
@@ -435,8 +477,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'mytable.description AS description FROM '
'mytable LEFT OUTER JOIN myothertable ON '
'mytable.myid = myothertable.otherid) '
- 'anon_1 ON thirdtable.userid = anon_1.myid'
- , dialect=oracle.dialect(use_ansi=True))
+ 'anon_1 ON thirdtable.userid = anon_1.myid',
+ dialect=oracle.dialect(use_ansi=True))
self.assert_compile(q,
'SELECT thirdtable.userid, '
@@ -549,7 +591,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
def test_returning_insert_labeled(self):
t1 = table('t1', column('c1'), column('c2'), column('c3'))
self.assert_compile(
- t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
+ t1.insert().values(c1=1).returning(
+ t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
"INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
"t1.c2, t1.c3 INTO :ret_0, :ret_1"
)
@@ -587,33 +630,52 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
schema.CreateIndex(Index("bar", t1.c.x)),
"CREATE INDEX alt_schema.bar ON alt_schema.foo (x)"
)
+
+ def test_create_index_expr(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer)
+ )
+ self.assert_compile(
+ schema.CreateIndex(Index("bar", t1.c.x > 5)),
+ "CREATE INDEX bar ON foo (x > 5)"
+ )
+
class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
- __only_on__ = 'oracle'
- def test_ora8_flags(self):
- def server_version_info(self):
- return (8, 2, 5)
+ def _dialect(self, server_version, **kw):
+ def server_version_info(conn):
+ return server_version
- dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
+ dialect = oracle.dialect(
+ dbapi=Mock(version="0.0.0", paramstyle="named"),
+ **kw)
dialect._get_server_version_info = server_version_info
+ dialect._check_unicode_returns = Mock()
+ dialect._check_unicode_description = Mock()
+ dialect._get_default_schema_name = Mock()
+ return dialect
+
+
+ def test_ora8_flags(self):
+ dialect = self._dialect((8, 2, 5))
# before connect, assume modern DB
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
- dialect.initialize(testing.db.connect())
+ dialect.initialize(Mock())
assert not dialect.implicit_returning
assert not dialect._supports_char_length
assert not dialect._supports_nchar
assert not dialect.use_ansi
- self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect)
- self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect)
- self.assert_compile(UnicodeText(),"CLOB",dialect=dialect)
+ self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect)
+ self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "CLOB", dialect=dialect)
- dialect = oracle.dialect(implicit_returning=True,
- dbapi=testing.db.dialect.dbapi)
- dialect._get_server_version_info = server_version_info
+
+ dialect = self._dialect((8, 2, 5), implicit_returning=True)
dialect.initialize(testing.db.connect())
assert dialect.implicit_returning
@@ -621,26 +683,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
def test_default_flags(self):
"""test with no initialization or server version info"""
- dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
+ dialect = self._dialect(None)
+
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
- self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect)
- self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect)
- self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect)
+ self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+ self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
def test_ora10_flags(self):
- def server_version_info(self):
- return (10, 2, 5)
- dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
- dialect._get_server_version_info = server_version_info
- dialect.initialize(testing.db.connect())
+ dialect = self._dialect((10, 2, 5))
+
+ dialect.initialize(Mock())
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
- self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect)
- self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect)
- self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect)
+ self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+ self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -664,9 +725,18 @@ create table test_schema.child(
parent_id integer references test_schema.parent(id)
);
+create table local_table(
+ id integer primary key,
+ data varchar2(50)
+);
+
create synonym test_schema.ptable for test_schema.parent;
create synonym test_schema.ctable for test_schema.child;
+create synonym test_schema_ptable for test_schema.parent;
+
+create synonym test_schema.local_table for local_table;
+
-- can't make a ref from local schema to the
-- remote schema's table without this,
-- *and* cant give yourself a grant !
@@ -682,15 +752,20 @@ grant references on test_schema.child to public;
for stmt in """
drop table test_schema.child;
drop table test_schema.parent;
+drop table local_table;
drop synonym test_schema.ctable;
drop synonym test_schema.ptable;
+drop synonym test_schema_ptable;
+drop synonym test_schema.local_table;
+
""".split(";"):
if stmt.strip():
testing.db.execute(stmt)
+ @testing.provide_metadata
def test_create_same_names_explicit_schema(self):
schema = testing.db.dialect.default_schema_name
- meta = MetaData(testing.db)
+ meta = self.metadata
parent = Table('parent', meta,
Column('pid', Integer, primary_key=True),
schema=schema
@@ -701,15 +776,31 @@ drop synonym test_schema.ptable;
schema=schema
)
meta.create_all()
- try:
- parent.insert().execute({'pid':1})
- child.insert().execute({'cid':1, 'pid':1})
- eq_(child.select().execute().fetchall(), [(1, 1)])
- finally:
- meta.drop_all()
+ parent.insert().execute({'pid': 1})
+ child.insert().execute({'cid': 1, 'pid': 1})
+ eq_(child.select().execute().fetchall(), [(1, 1)])
- def test_create_same_names_implicit_schema(self):
+ def test_reflect_alt_table_owner_local_synonym(self):
meta = MetaData(testing.db)
+ parent = Table('test_schema_ptable', meta, autoload=True,
+ oracle_resolve_synonyms=True)
+ self.assert_compile(parent.select(),
+ "SELECT test_schema_ptable.id, "
+ "test_schema_ptable.data FROM test_schema_ptable")
+ select([parent]).execute().fetchall()
+
+ def test_reflect_alt_synonym_owner_local_table(self):
+ meta = MetaData(testing.db)
+ parent = Table('local_table', meta, autoload=True,
+ oracle_resolve_synonyms=True, schema="test_schema")
+ self.assert_compile(parent.select(),
+ "SELECT test_schema.local_table.id, "
+ "test_schema.local_table.data FROM test_schema.local_table")
+ select([parent]).execute().fetchall()
+
+ @testing.provide_metadata
+ def test_create_same_names_implicit_schema(self):
+ meta = self.metadata
parent = Table('parent', meta,
Column('pid', Integer, primary_key=True),
)
@@ -718,12 +809,9 @@ drop synonym test_schema.ptable;
Column('pid', Integer, ForeignKey('parent.pid')),
)
meta.create_all()
- try:
- parent.insert().execute({'pid':1})
- child.insert().execute({'cid':1, 'pid':1})
- eq_(child.select().execute().fetchall(), [(1, 1)])
- finally:
- meta.drop_all()
+ parent.insert().execute({'pid': 1})
+ child.insert().execute({'cid': 1, 'pid': 1})
+ eq_(child.select().execute().fetchall(), [(1, 1)])
def test_reflect_alt_owner_explicit(self):
@@ -911,10 +999,17 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
dbapi = FakeDBAPI()
b = bindparam("foo", "hello world!")
- assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+ eq_(
+ b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+ 'STRING'
+ )
b = bindparam("foo", "hello world!")
- assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+ eq_(
+ b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+ 'STRING'
+ )
+
def test_long(self):
self.assert_compile(oracle.LONG(), "LONG")
@@ -943,14 +1038,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(oracle.RAW(35), "RAW(35)")
def test_char_length(self):
- self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)")
+ self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)")
oracle8dialect = oracle.dialect()
oracle8dialect.server_version_info = (8, 0)
- self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect)
+ self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect)
- self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)")
- self.assert_compile(CHAR(50),"CHAR(50)")
+ self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)")
+ self.assert_compile(CHAR(50), "CHAR(50)")
def test_varchar_types(self):
dialect = oracle.dialect()
@@ -961,6 +1056,12 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
(VARCHAR(50), "VARCHAR(50 CHAR)"),
(oracle.NVARCHAR2(50), "NVARCHAR2(50)"),
(oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"),
+ (String(), "VARCHAR2"),
+ (Unicode(), "NVARCHAR2"),
+ (NVARCHAR(), "NVARCHAR2"),
+ (VARCHAR(), "VARCHAR"),
+ (oracle.NVARCHAR2(), "NVARCHAR2"),
+ (oracle.VARCHAR2(), "VARCHAR2"),
]:
self.assert_compile(typ, exp, dialect=dialect)
@@ -998,36 +1099,36 @@ class TypesTest(fixtures.TestBase):
dict(id=3, data="value 3")
)
- eq_(t.select().where(t.c.data=='value 2').execute().fetchall(),
+ eq_(
+ t.select().where(t.c.data == 'value 2').execute().fetchall(),
[(2, 'value 2 ')]
- )
+ )
m2 = MetaData(testing.db)
t2 = Table('t1', m2, autoload=True)
assert type(t2.c.data.type) is CHAR
- eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(),
+ eq_(
+ t2.select().where(t2.c.data == 'value 2').execute().fetchall(),
[(2, 'value 2 ')]
- )
+ )
finally:
t.drop()
@testing.requires.returning
+ @testing.provide_metadata
def test_int_not_float(self):
- m = MetaData(testing.db)
+ m = self.metadata
t1 = Table('t1', m, Column('foo', Integer))
t1.create()
- try:
- r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
- x = r.scalar()
- assert x == 5
- assert isinstance(x, int)
-
- x = t1.select().scalar()
- assert x == 5
- assert isinstance(x, int)
- finally:
- t1.drop()
+ r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
+ x = r.scalar()
+ assert x == 5
+ assert isinstance(x, int)
+
+ x = t1.select().scalar()
+ assert x == 5
+ assert isinstance(x, int)
@testing.provide_metadata
def test_rowid(self):
@@ -1044,7 +1145,7 @@ class TypesTest(fixtures.TestBase):
# the ROWID type is not really needed here,
# as cx_oracle just treats it as a string,
# but we want to make sure the ROWID works...
- rowid_col= column('rowid', oracle.ROWID)
+ rowid_col = column('rowid', oracle.ROWID)
s3 = select([t.c.x, rowid_col]).\
where(rowid_col == cast(rowid, oracle.ROWID))
eq_(s3.select().execute().fetchall(),
@@ -1070,8 +1171,9 @@ class TypesTest(fixtures.TestBase):
eq_(row['day_interval'], datetime.timedelta(days=35,
seconds=5743))
+ @testing.provide_metadata
def test_numerics(self):
- m = MetaData(testing.db)
+ m = self.metadata
t1 = Table('t1', m,
Column('intcol', Integer),
Column('numericcol', Numeric(precision=9, scale=2)),
@@ -1084,41 +1186,38 @@ class TypesTest(fixtures.TestBase):
)
t1.create()
- try:
- t1.insert().execute(
- intcol=1,
- numericcol=5.2,
- floatcol1=6.5,
- floatcol2 = 8.5,
- doubleprec = 9.5,
- numbercol1=12,
- numbercol2=14.85,
- numbercol3=15.76
- )
-
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
+ t1.insert().execute(
+ intcol=1,
+ numericcol=5.2,
+ floatcol1=6.5,
+ floatcol2=8.5,
+ doubleprec=9.5,
+ numbercol1=12,
+ numbercol2=14.85,
+ numbercol3=15.76
+ )
- for row in (
- t1.select().execute().first(),
- t2.select().execute().first()
- ):
- for i, (val, type_) in enumerate((
- (1, int),
- (decimal.Decimal("5.2"), decimal.Decimal),
- (6.5, float),
- (8.5, float),
- (9.5, float),
- (12, int),
- (decimal.Decimal("14.85"), decimal.Decimal),
- (15.76, float),
- )):
- eq_(row[i], val)
- assert isinstance(row[i], type_), '%r is not %r' \
- % (row[i], type_)
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+
+ for row in (
+ t1.select().execute().first(),
+ t2.select().execute().first()
+ ):
+ for i, (val, type_) in enumerate((
+ (1, int),
+ (decimal.Decimal("5.2"), decimal.Decimal),
+ (6.5, float),
+ (8.5, float),
+ (9.5, float),
+ (12, int),
+ (decimal.Decimal("14.85"), decimal.Decimal),
+ (15.76, float),
+ )):
+ eq_(row[i], val)
+ assert isinstance(row[i], type_), '%r is not %r' \
+ % (row[i], type_)
- finally:
- t1.drop()
def test_numeric_no_decimal_mode(self):
@@ -1150,28 +1249,26 @@ class TypesTest(fixtures.TestBase):
)
foo.create()
- foo.insert().execute(
- {'idata':5, 'ndata':decimal.Decimal("45.6"),
- 'ndata2':decimal.Decimal("45.0"),
- 'nidata':decimal.Decimal('53'), 'fdata':45.68392},
- )
+ foo.insert().execute({
+ 'idata': 5,
+ 'ndata': decimal.Decimal("45.6"),
+ 'ndata2': decimal.Decimal("45.0"),
+ 'nidata': decimal.Decimal('53'),
+ 'fdata': 45.68392
+ })
- stmt = """
- SELECT
- idata,
- ndata,
- ndata2,
- nidata,
- fdata
- FROM foo
- """
+ stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo"
row = testing.db.execute(stmt).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float])
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, int, float]
+ )
eq_(
row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001)
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ 53, 45.683920000000001)
)
# with a nested subquery,
@@ -1195,7 +1292,10 @@ class TypesTest(fixtures.TestBase):
FROM dual
"""
row = testing.db.execute(stmt).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal])
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, int, int, decimal.Decimal]
+ )
eq_(
row,
(5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
@@ -1203,15 +1303,20 @@ class TypesTest(fixtures.TestBase):
row = testing.db.execute(text(stmt,
typemap={
- 'idata':Integer(),
- 'ndata':Numeric(20, 2),
- 'ndata2':Numeric(20, 2),
- 'nidata':Numeric(5, 0),
- 'fdata':Float()
+ 'idata': Integer(),
+ 'ndata': Numeric(20, 2),
+ 'ndata2': Numeric(20, 2),
+ 'nidata': Numeric(5, 0),
+ 'fdata': Float()
})).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float])
- eq_(row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001)
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ decimal.Decimal('53'), 45.683920000000001)
)
stmt = """
@@ -1237,39 +1342,55 @@ class TypesTest(fixtures.TestBase):
)
WHERE ROWNUM >= 0) anon_1
"""
- row =testing.db.execute(stmt).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal])
- eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')))
+ row = testing.db.execute(stmt).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, int, int, decimal.Decimal]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
+ )
row = testing.db.execute(text(stmt,
typemap={
- 'anon_1_idata':Integer(),
- 'anon_1_ndata':Numeric(20, 2),
- 'anon_1_ndata2':Numeric(20, 2),
- 'anon_1_nidata':Numeric(5, 0),
- 'anon_1_fdata':Float()
+ 'anon_1_idata': Integer(),
+ 'anon_1_ndata': Numeric(20, 2),
+ 'anon_1_ndata2': Numeric(20, 2),
+ 'anon_1_nidata': Numeric(5, 0),
+ 'anon_1_fdata': Float()
})).fetchall()[0]
- eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float])
- eq_(row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001)
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ decimal.Decimal('53'), 45.683920000000001)
)
row = testing.db.execute(text(stmt,
typemap={
- 'anon_1_idata':Integer(),
- 'anon_1_ndata':Numeric(20, 2, asdecimal=False),
- 'anon_1_ndata2':Numeric(20, 2, asdecimal=False),
- 'anon_1_nidata':Numeric(5, 0, asdecimal=False),
- 'anon_1_fdata':Float(asdecimal=True)
+ 'anon_1_idata': Integer(),
+ 'anon_1_ndata': Numeric(20, 2, asdecimal=False),
+ 'anon_1_ndata2': Numeric(20, 2, asdecimal=False),
+ 'anon_1_nidata': Numeric(5, 0, asdecimal=False),
+ 'anon_1_fdata': Float(asdecimal=True)
})).fetchall()[0]
- eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal])
- eq_(row,
+ eq_(
+ [type(x) for x in row],
+ [int, float, float, float, decimal.Decimal]
+ )
+ eq_(
+ row,
(5, 45.6, 45, 53, decimal.Decimal('45.68392'))
)
+ @testing.provide_metadata
def test_reflect_dates(self):
- metadata = MetaData(testing.db)
+ metadata = self.metadata
Table(
"date_types", metadata,
Column('d1', DATE),
@@ -1278,20 +1399,16 @@ class TypesTest(fixtures.TestBase):
Column('d4', oracle.INTERVAL(second_precision=5)),
)
metadata.create_all()
- try:
- m = MetaData(testing.db)
- t1 = Table(
- "date_types", m,
- autoload=True)
- assert isinstance(t1.c.d1.type, DATE)
- assert isinstance(t1.c.d2.type, TIMESTAMP)
- assert not t1.c.d2.type.timezone
- assert isinstance(t1.c.d3.type, TIMESTAMP)
- assert t1.c.d3.type.timezone
- assert isinstance(t1.c.d4.type, oracle.INTERVAL)
-
- finally:
- metadata.drop_all()
+ m = MetaData(testing.db)
+ t1 = Table(
+ "date_types", m,
+ autoload=True)
+ assert isinstance(t1.c.d1.type, DATE)
+ assert isinstance(t1.c.d2.type, TIMESTAMP)
+ assert not t1.c.d2.type.timezone
+ assert isinstance(t1.c.d3.type, TIMESTAMP)
+ assert t1.c.d3.type.timezone
+ assert isinstance(t1.c.d4.type, oracle.INTERVAL)
def test_reflect_all_types_schema(self):
types_table = Table('all_types', MetaData(testing.db),
@@ -1319,7 +1436,7 @@ class TypesTest(fixtures.TestBase):
@testing.provide_metadata
def test_reflect_nvarchar(self):
metadata = self.metadata
- t = Table('t', metadata,
+ Table('t', metadata,
Column('data', sqltypes.NVARCHAR(255))
)
metadata.create_all()
@@ -1341,22 +1458,20 @@ class TypesTest(fixtures.TestBase):
assert isinstance(res, util.text_type)
+ @testing.provide_metadata
def test_char_length(self):
- metadata = MetaData(testing.db)
+ metadata = self.metadata
t1 = Table('t1', metadata,
Column("c1", VARCHAR(50)),
Column("c2", NVARCHAR(250)),
Column("c3", CHAR(200))
)
t1.create()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- eq_(t2.c.c1.type.length, 50)
- eq_(t2.c.c2.type.length, 250)
- eq_(t2.c.c3.type.length, 200)
- finally:
- t1.drop()
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+ eq_(t2.c.c1.type.length, 50)
+ eq_(t2.c.c2.type.length, 250)
+ eq_(t2.c.c3.type.length, 200)
@testing.provide_metadata
def test_long_type(self):
@@ -1372,8 +1487,6 @@ class TypesTest(fixtures.TestBase):
"xyz"
)
-
-
def test_longstring(self):
metadata = MetaData(testing.db)
testing.db.execute("""
@@ -1424,15 +1537,16 @@ class EuroNumericTest(fixtures.TestBase):
del os.environ['NLS_LANG']
self.engine.dispose()
- @testing.provide_metadata
def test_output_type_handler(self):
- metadata = self.metadata
for stmt, exp, kw in [
("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}),
("SELECT 15 FROM DUAL", 15, {}),
- ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}),
- ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}),
- ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")})
+ ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL",
+ decimal.Decimal("15"), {}),
+ ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL",
+ decimal.Decimal("0.1"), {}),
+ ("SELECT :num FROM DUAL", decimal.Decimal("2.5"),
+ {'num': decimal.Decimal("2.5")})
]:
test_exp = self.engine.scalar(stmt, **kw)
eq_(
@@ -1513,97 +1627,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL):
class UnsupportedIndexReflectTest(fixtures.TestBase):
__only_on__ = 'oracle'
- def setup(self):
- global metadata
- metadata = MetaData(testing.db)
- t1 = Table('test_index_reflect', metadata,
+ @testing.emits_warning("No column names")
+ @testing.provide_metadata
+ def test_reflect_functional_index(self):
+ metadata = self.metadata
+ Table('test_index_reflect', metadata,
Column('data', String(20), primary_key=True)
)
metadata.create_all()
- def teardown(self):
- metadata.drop_all()
-
- @testing.emits_warning("No column names")
- def test_reflect_functional_index(self):
testing.db.execute('CREATE INDEX DATA_IDX ON '
'TEST_INDEX_REFLECT (UPPER(DATA))')
m2 = MetaData(testing.db)
- t2 = Table('test_index_reflect', m2, autoload=True)
+ Table('test_index_reflect', m2, autoload=True)
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = 'oracle'
+ @testing.provide_metadata
def test_basic(self):
- engine = testing.db
- metadata = MetaData(engine)
+ metadata = self.metadata
- table=Table("sometable", metadata,
+ table = Table("sometable", metadata,
Column("id_a", Unicode(255), primary_key=True),
Column("id_b", Unicode(255), primary_key=True, unique=True),
Column("group", Unicode(255), primary_key=True),
Column("col", Unicode(255)),
- UniqueConstraint('col','group'),
+ UniqueConstraint('col', 'group'),
)
# "group" is a keyword, so lower case
normalind = Index('tableind', table.c.id_b, table.c.group)
- # create
metadata.create_all()
- try:
- # round trip, create from reflection
- mirror = MetaData(engine)
- mirror.reflect()
- metadata.drop_all()
- mirror.create_all()
-
- # inspect the reflected creation
- inspect = MetaData(engine)
- inspect.reflect()
-
- def obj_definition(obj):
- return obj.__class__, tuple([c.name for c in
- obj.columns]), getattr(obj, 'unique', None)
-
- # find what the primary k constraint name should be
- primaryconsname = engine.execute(
- text("""SELECT constraint_name
- FROM all_constraints
- WHERE table_name = :table_name
- AND owner = :owner
- AND constraint_type = 'P' """),
- table_name=table.name.upper(),
- owner=engine.url.username.upper()).fetchall()[0][0]
-
- reflectedtable = inspect.tables[table.name]
-
- # make a dictionary of the reflected objects:
-
- reflected = dict([(obj_definition(i), i) for i in
- reflectedtable.indexes
- | reflectedtable.constraints])
-
- # assert we got primary key constraint and its name, Error
- # if not in dict
-
- assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
- 'group'), None)].name.upper() \
- == primaryconsname.upper()
-
- # Error if not in dict
-
- assert reflected[(Index, ('id_b', 'group'), False)].name \
- == normalind.name
- assert (Index, ('id_b', ), True) in reflected
- assert (Index, ('col', 'group'), True) in reflected
- assert len(reflectedtable.constraints) == 1
- assert len(reflectedtable.indexes) == 3
+ mirror = MetaData(testing.db)
+ mirror.reflect()
+ metadata.drop_all()
+ mirror.create_all()
- finally:
- metadata.drop_all()
+ inspect = MetaData(testing.db)
+ inspect.reflect()
+ def obj_definition(obj):
+ return obj.__class__, tuple([c.name for c in
+ obj.columns]), getattr(obj, 'unique', None)
+ # find what the primary k constraint name should be
+ primaryconsname = testing.db.execute(
+ text("""SELECT constraint_name
+ FROM all_constraints
+ WHERE table_name = :table_name
+ AND owner = :owner
+ AND constraint_type = 'P' """),
+ table_name=table.name.upper(),
+ owner=testing.db.url.username.upper()).fetchall()[0][0]
+
+ reflectedtable = inspect.tables[table.name]
+
+ # make a dictionary of the reflected objects:
+
+ reflected = dict([(obj_definition(i), i) for i in
+ reflectedtable.indexes
+ | reflectedtable.constraints])
+
+ # assert we got primary key constraint and its name, Error
+ # if not in dict
+
+ assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
+ 'group'), None)].name.upper() \
+ == primaryconsname.upper()
+
+ # Error if not in dict
+
+ eq_(
+ reflected[(Index, ('id_b', 'group'), False)].name,
+ normalind.name
+ )
+ assert (Index, ('id_b', ), True) in reflected
+ assert (Index, ('col', 'group'), True) in reflected
+ eq_(len(reflectedtable.constraints), 1)
+ eq_(len(reflectedtable.indexes), 3)
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -1650,11 +1753,11 @@ class ExecuteTest(fixtures.TestBase):
metadata.create_all()
t.insert().execute(
- {'id':1, 'data':1},
- {'id':2, 'data':7},
- {'id':3, 'data':12},
- {'id':4, 'data':15},
- {'id':5, 'data':32},
+ {'id': 1, 'data': 1},
+ {'id': 2, 'data': 7},
+ {'id': 3, 'data': 12},
+ {'id': 4, 'data': 15},
+ {'id': 5, 'data': 32},
)
# here, we can't use ORDER BY.
@@ -1679,7 +1782,7 @@ class UnicodeSchemaTest(fixtures.TestBase):
@testing.provide_metadata
def test_quoted_column_non_unicode(self):
metadata = self.metadata
- table=Table("atable", metadata,
+ table = Table("atable", metadata,
Column("_underscorecolumn", Unicode(255), primary_key=True),
)
metadata.create_all()
@@ -1688,14 +1791,14 @@ class UnicodeSchemaTest(fixtures.TestBase):
{'_underscorecolumn': u('’é')},
)
result = testing.db.execute(
- table.select().where(table.c._underscorecolumn==u('’é'))
+ table.select().where(table.c._underscorecolumn == u('’é'))
).scalar()
eq_(result, u('’é'))
@testing.provide_metadata
def test_quoted_column_unicode(self):
metadata = self.metadata
- table=Table("atable", metadata,
+ table = Table("atable", metadata,
Column(u("méil"), Unicode(255), primary_key=True),
)
metadata.create_all()