diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
commit | 1c23741b8e045d266d0ecbed975952547444a5fa (patch) | |
tree | 366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/test_mssql.py | |
parent | 83f3dbc83d1066216084a01b32cddcc090f697d5 (diff) | |
download | sqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz |
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/test_mssql.py')
-rw-r--r-- | test/dialect/test_mssql.py | 2143 |
1 files changed, 0 insertions, 2143 deletions
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py deleted file mode 100644 index b2eb8609b..000000000 --- a/test/dialect/test_mssql.py +++ /dev/null @@ -1,2143 +0,0 @@ -# -*- encoding: utf-8 -from sqlalchemy.testing import eq_, engines, pickleable -import datetime -import os -from sqlalchemy import * -from sqlalchemy import types, exc, schema, event -from sqlalchemy.orm import * -from sqlalchemy.sql import table, column -from sqlalchemy.databases import mssql -from sqlalchemy.dialects.mssql import pyodbc, mxodbc, pymssql -from sqlalchemy.dialects.mssql.base import TIME -from sqlalchemy.engine import url -from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ - AssertsExecutionResults, ComparesTables -from sqlalchemy import testing -from sqlalchemy.testing import emits_warning_on, assert_raises_message -import decimal -from sqlalchemy.engine.reflection import Inspector -from sqlalchemy.util import b, u, ue -from sqlalchemy import sql, util - - -class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = mssql.dialect() - - def test_true_false(self): - self.assert_compile( - sql.false(), "0" - ) - self.assert_compile( - sql.true(), - "1" - ) - - def test_select(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select(), - 'SELECT sometable.somecolumn FROM sometable') - - def test_select_with_nolock(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select().with_hint(t, 'WITH (NOLOCK)'), - 'SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)') - - def test_join_with_hint(self): - t1 = table('t1', - column('a', Integer), - column('b', String), - column('c', String), - ) - t2 = table('t2', - column("a", Integer), - column("b", Integer), - column("c", Integer), - ) - join = t1.join(t2, t1.c.a==t2.c.a).\ - select().with_hint(t1, 'WITH (NOLOCK)') - self.assert_compile( - join, - 'SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c ' - 'FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a' - ) - - def test_insert(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.insert(), - 'INSERT INTO sometable (somecolumn) VALUES ' - '(:somecolumn)') - - def test_update(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.update(t.c.somecolumn == 7), - 'UPDATE sometable SET somecolumn=:somecolum' - 'n WHERE sometable.somecolumn = ' - ':somecolumn_1', dict(somecolumn=10)) - - def test_insert_hint(self): - t = table('sometable', column('somecolumn')) - for targ in (None, t): - for darg in ("*", "mssql"): - self.assert_compile( - t.insert(). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), - "INSERT INTO sometable WITH (PAGLOCK) " - "(somecolumn) VALUES (:somecolumn)" - ) - - def test_update_hint(self): - t = table('sometable', column('somecolumn')) - for targ in (None, t): - for darg in ("*", "mssql"): - self.assert_compile( - t.update().where(t.c.somecolumn=="q"). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), - "UPDATE sometable WITH (PAGLOCK) " - "SET somecolumn=:somecolumn " - "WHERE sometable.somecolumn = :somecolumn_1" - ) - - def test_update_exclude_hint(self): - t = table('sometable', column('somecolumn')) - self.assert_compile( - t.update().where(t.c.somecolumn=="q"). - values(somecolumn="x"). - with_hint("XYZ", "mysql"), - "UPDATE sometable SET somecolumn=:somecolumn " - "WHERE sometable.somecolumn = :somecolumn_1" - ) - - def test_delete_hint(self): - t = table('sometable', column('somecolumn')) - for targ in (None, t): - for darg in ("*", "mssql"): - self.assert_compile( - t.delete().where(t.c.somecolumn=="q"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), - "DELETE FROM sometable WITH (PAGLOCK) " - "WHERE sometable.somecolumn = :somecolumn_1" - ) - - def test_delete_exclude_hint(self): - t = table('sometable', column('somecolumn')) - self.assert_compile( - t.delete().\ - where(t.c.somecolumn=="q").\ - with_hint("XYZ", dialect_name="mysql"), - "DELETE FROM sometable WHERE " - "sometable.somecolumn = :somecolumn_1" - ) - - def test_update_from_hint(self): - t = table('sometable', column('somecolumn')) - t2 = table('othertable', column('somecolumn')) - for darg in ("*", "mssql"): - self.assert_compile( - t.update().where(t.c.somecolumn==t2.c.somecolumn). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=t2, - dialect_name=darg), - "UPDATE sometable SET somecolumn=:somecolumn " - "FROM sometable, othertable WITH (PAGLOCK) " - "WHERE sometable.somecolumn = othertable.somecolumn" - ) - - # TODO: not supported yet. - #def test_delete_from_hint(self): - # t = table('sometable', column('somecolumn')) - # t2 = table('othertable', column('somecolumn')) - # for darg in ("*", "mssql"): - # self.assert_compile( - # t.delete().where(t.c.somecolumn==t2.c.somecolumn). - # with_hint("WITH (PAGLOCK)", - # selectable=t2, - # dialect_name=darg), - # "" - # ) - - def test_strict_binds(self): - """test the 'strict' compiler binds.""" - - from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler - mxodbc_dialect = mxodbc.dialect() - mxodbc_dialect.statement_compiler = MSSQLStrictCompiler - - t = table('sometable', column('foo')) - - for expr, compile in [ - ( - select([literal("x"), literal("y")]), - "SELECT 'x' AS anon_1, 'y' AS anon_2", - ), - ( - select([t]).where(t.c.foo.in_(['x', 'y', 'z'])), - "SELECT sometable.foo FROM sometable WHERE sometable.foo " - "IN ('x', 'y', 'z')", - ), - ( - t.c.foo.in_([None]), - "sometable.foo IN (NULL)" - ) - ]: - self.assert_compile(expr, compile, dialect=mxodbc_dialect) - - def test_in_with_subqueries(self): - """Test removal of legacy behavior that converted "x==subquery" - to use IN. - - """ - - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select().where(t.c.somecolumn - == t.select()), - 'SELECT sometable.somecolumn FROM ' - 'sometable WHERE sometable.somecolumn = ' - '(SELECT sometable.somecolumn FROM ' - 'sometable)') - self.assert_compile(t.select().where(t.c.somecolumn - != t.select()), - 'SELECT sometable.somecolumn FROM ' - 'sometable WHERE sometable.somecolumn != ' - '(SELECT sometable.somecolumn FROM ' - 'sometable)') - - def test_count(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.count(), - 'SELECT count(sometable.somecolumn) AS ' - 'tbl_row_count FROM sometable') - - def test_noorderby_insubquery(self): - """test that the ms-sql dialect removes ORDER BY clauses from - subqueries""" - - table1 = table('mytable', - column('myid', Integer), - column('name', String), - column('description', String), - ) - - q = select([table1.c.myid], - order_by=[table1.c.myid]).alias('foo') - crit = q.c.myid == table1.c.myid - self.assert_compile(select(['*'], crit), - "SELECT * FROM (SELECT mytable.myid AS " - "myid FROM mytable) AS foo, mytable WHERE " - "foo.myid = mytable.myid") - - - - def test_delete_schema(self): - metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), schema='paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM paj.test WHERE paj.test.id = ' - ':id_1') - s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM paj.test WHERE paj.test.id IN ' - '(SELECT test_1.id FROM paj.test AS test_1 ' - 'WHERE test_1.id = :id_1)') - - def test_delete_schema_multipart(self): - metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), schema='banana.paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM banana.paj.test WHERE ' - 'banana.paj.test.id = :id_1') - s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM banana.paj.test WHERE ' - 'banana.paj.test.id IN (SELECT test_1.id ' - 'FROM banana.paj.test AS test_1 WHERE ' - 'test_1.id = :id_1)') - - def test_delete_schema_multipart_needs_quoting(self): - metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), schema='banana split.paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM [banana split].paj.test WHERE ' - '[banana split].paj.test.id = :id_1') - s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM [banana split].paj.test WHERE ' - '[banana split].paj.test.id IN (SELECT ' - 'test_1.id FROM [banana split].paj.test AS ' - 'test_1 WHERE test_1.id = :id_1)') - - def test_delete_schema_multipart_both_need_quoting(self): - metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), - schema='banana split.paj with a space') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM [banana split].[paj with a ' - 'space].test WHERE [banana split].[paj ' - 'with a space].test.id = :id_1') - s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM [banana split].[paj with a ' - 'space].test WHERE [banana split].[paj ' - 'with a space].test.id IN (SELECT ' - 'test_1.id FROM [banana split].[paj with a ' - 'space].test AS test_1 WHERE test_1.id = ' - ':id_1)') - - def test_union(self): - t1 = table('t1', column('col1'), column('col2'), column('col3' - ), column('col4')) - t2 = table('t2', column('col1'), column('col2'), column('col3' - ), column('col4')) - s1, s2 = select([t1.c.col3.label('col3'), t1.c.col4.label('col4' - )], t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \ - select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], - t2.c.col2.in_(['t2col2r2', 't2col2r3'])) - u = union(s1, s2, order_by=['col3', 'col4']) - self.assert_compile(u, - 'SELECT t1.col3 AS col3, t1.col4 AS col4 ' - 'FROM t1 WHERE t1.col2 IN (:col2_1, ' - ':col2_2) UNION SELECT t2.col3 AS col3, ' - 't2.col4 AS col4 FROM t2 WHERE t2.col2 IN ' - '(:col2_3, :col2_4) ORDER BY col3, col4') - self.assert_compile(u.alias('bar').select(), - 'SELECT bar.col3, bar.col4 FROM (SELECT ' - 't1.col3 AS col3, t1.col4 AS col4 FROM t1 ' - 'WHERE t1.col2 IN (:col2_1, :col2_2) UNION ' - 'SELECT t2.col3 AS col3, t2.col4 AS col4 ' - 'FROM t2 WHERE t2.col2 IN (:col2_3, ' - ':col2_4)) AS bar') - - def test_function(self): - self.assert_compile(func.foo(1, 2), 'foo(:foo_1, :foo_2)') - self.assert_compile(func.current_time(), 'CURRENT_TIME') - self.assert_compile(func.foo(), 'foo()') - m = MetaData() - t = Table('sometable', m, Column('col1', Integer), Column('col2' - , Integer)) - self.assert_compile(select([func.max(t.c.col1)]), - 'SELECT max(sometable.col1) AS max_1 FROM ' - 'sometable') - - def test_function_overrides(self): - self.assert_compile(func.current_date(), "GETDATE()") - self.assert_compile(func.length(3), "LEN(:length_1)") - - def test_extract(self): - t = table('t', column('col1')) - - for field in 'day', 'month', 'year': - self.assert_compile( - select([extract(field, t.c.col1)]), - 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field) - - def test_update_returning(self): - table1 = table('mytable', column('myid', Integer), column('name' - , String(128)), column('description', - String(128))) - u = update(table1, values=dict(name='foo' - )).returning(table1.c.myid, table1.c.name) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name') - u = update(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description') - u = update(table1, values=dict(name='foo' - )).returning(table1).where(table1.c.name == 'bar') - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description WHERE mytable.name = ' - ':name_1') - u = update(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'LEN(inserted.name) AS length_1') - - def test_delete_returning(self): - table1 = table('mytable', column('myid', Integer), column('name' - , String(128)), column('description', - String(128))) - d = delete(table1).returning(table1.c.myid, table1.c.name) - self.assert_compile(d, - 'DELETE FROM mytable OUTPUT deleted.myid, ' - 'deleted.name') - d = delete(table1).where(table1.c.name == 'bar' - ).returning(table1.c.myid, - table1.c.name) - self.assert_compile(d, - 'DELETE FROM mytable OUTPUT deleted.myid, ' - 'deleted.name WHERE mytable.name = :name_1') - - def test_insert_returning(self): - table1 = table('mytable', column('myid', Integer), column('name' - , String(128)), column('description', - String(128))) - i = insert(table1, values=dict(name='foo' - )).returning(table1.c.myid, table1.c.name) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'inserted.myid, inserted.name VALUES ' - '(:name)') - i = insert(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description VALUES (:name)') - i = insert(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'LEN(inserted.name) AS length_1 VALUES ' - '(:name)') - - def test_limit_using_top(self): - t = table('t', column('x', Integer), column('y', Integer)) - - s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(10) - - self.assert_compile( - s, - "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} - ) - - def test_limit_zero_using_top(self): - t = table('t', column('x', Integer), column('y', Integer)) - - s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(0) - - self.assert_compile( - s, - "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} - ) - - def test_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) - - s = select([t]).where(t.c.x==5).order_by(t.c.y).offset(20) - - # test that the select is not altered with subsequent compile - # calls - for i in range(2): - self.assert_compile( - s, - "SELECT anon_1.x, anon_1.y FROM (SELECT t.x AS x, t.y " - "AS y, ROW_NUMBER() OVER (ORDER BY t.y) AS " - "mssql_rn FROM t WHERE t.x = :x_1) AS " - "anon_1 WHERE mssql_rn > :mssql_rn_1", - checkparams={'mssql_rn_1': 20, 'x_1': 5} - ) - - def test_limit_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) - - s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(10).offset(20) - - self.assert_compile( - s, - "SELECT anon_1.x, anon_1.y " - "FROM (SELECT t.x AS x, t.y AS y, " - "ROW_NUMBER() OVER (ORDER BY t.y) AS mssql_rn " - "FROM t " - "WHERE t.x = :x_1) AS anon_1 " - "WHERE mssql_rn > :mssql_rn_1 AND mssql_rn <= :mssql_rn_2", - checkparams={'mssql_rn_1': 20, 'mssql_rn_2': 30, 'x_1': 5} - ) - - def test_limit_offset_with_correlated_order_by(self): - t1 = table('t1', column('x', Integer), column('y', Integer)) - t2 = table('t2', column('x', Integer), column('y', Integer)) - - order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar() - s = select([t1]).where(t1.c.x == 5).order_by(order_by) \ - .limit(10).offset(20) - - self.assert_compile( - s, - "SELECT anon_1.x, anon_1.y " - "FROM (SELECT t1.x AS x, t1.y AS y, " - "ROW_NUMBER() OVER (ORDER BY " - "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)" - ") AS mssql_rn " - "FROM t1 " - "WHERE t1.x = :x_1) AS anon_1 " - "WHERE mssql_rn > :mssql_rn_1 AND mssql_rn <= :mssql_rn_2", - checkparams={'mssql_rn_1': 20, 'mssql_rn_2': 30, 'x_1': 5} - ) - - def test_limit_zero_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) - - s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(0).offset(0) - - # render the LIMIT of zero, but not the OFFSET - # of zero, so produces TOP 0 - self.assert_compile( - s, - "SELECT TOP 0 t.x, t.y FROM t " - "WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} - ) - - def test_sequence_start_0(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', 0), primary_key=True)) - self.assert_compile(schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), " - "PRIMARY KEY (id))" - ) - - def test_sequence_non_primary_key(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence(''), primary_key=False)) - self.assert_compile(schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))" - ) - - def test_sequence_ignore_nullability(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence(''), nullable=True)) - self.assert_compile(schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))" - ) - - def test_index_clustering(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer)) - idx = Index("foo", tbl.c.id, mssql_clustered=True) - self.assert_compile(schema.CreateIndex(idx), - "CREATE CLUSTERED INDEX foo ON test (id)" - ) - - def test_index_ordering(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) - idx = Index("foo", tbl.c.x.desc(), "y") - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x DESC, y)" - ) - - def test_index_extra_include_1(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) - idx = Index("foo", tbl.c.x, mssql_include=['y']) - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x) INCLUDE (y)" - ) - - def test_index_extra_include_2(self): - metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) - idx = Index("foo", tbl.c.x, mssql_include=[tbl.c.y]) - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x) INCLUDE (y)" - ) - -class SchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL): - """SQL server cannot reference schema-qualified tables in a SELECT statement, they - must be aliased. - """ - __dialect__ = mssql.dialect() - - def setup(self): - metadata = MetaData() - self.t1 = table('t1', - column('a', Integer), - column('b', String), - column('c', String), - ) - self.t2 = Table( - 't2', metadata, - Column("a", Integer), - Column("b", Integer), - Column("c", Integer), - schema = 'schema' - ) - - def test_result_map(self): - s = self.t2.select() - c = s.compile(dialect=self.__dialect__) - assert self.t2.c.a in set(c.result_map['a'][1]) - - def test_result_map_use_labels(self): - s = self.t2.select(use_labels=True) - c = s.compile(dialect=self.__dialect__) - assert self.t2.c.a in set(c.result_map['schema_t2_a'][1]) - - def test_straight_select(self): - self.assert_compile(self.t2.select(), - "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1" - ) - - def test_straight_select_use_labels(self): - self.assert_compile( - self.t2.select(use_labels=True), - "SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b, " - "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1" - ) - - def test_join_to_schema(self): - t1, t2 = self.t1, self.t2 - self.assert_compile( - t1.join(t2, t1.c.a==t2.c.a).select(), - "SELECT t1.a, t1.b, t1.c, t2_1.a, t2_1.b, t2_1.c FROM t1 " - "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a" - ) - - def test_union_schema_to_non(self): - t1, t2 = self.t1, self.t2 - s = select([t2.c.a, t2.c.b]).apply_labels().\ - union( - select([t1.c.a, t1.c.b]).apply_labels() - ).alias().select() - self.assert_compile( - s, - "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM " - "(SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b " - "FROM [schema].t2 AS t2_1 UNION SELECT t1.a AS t1_a, " - "t1.b AS t1_b FROM t1) AS anon_1" - ) - - def test_column_subquery_to_alias(self): - a1 = self.t2.alias('a1') - s = select([self.t2, select([a1.c.a]).as_scalar()]) - self.assert_compile( - s, - "SELECT t2_1.a, t2_1.b, t2_1.c, " - "(SELECT a1.a FROM [schema].t2 AS a1) " - "AS anon_1 FROM [schema].t2 AS t2_1" - - ) - -class IdentityInsertTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'mssql' - __dialect__ = mssql.MSDialect() - - @classmethod - def setup_class(cls): - global metadata, cattable - metadata = MetaData(testing.db) - - cattable = Table('cattable', metadata, - Column('id', Integer), - Column('description', String(50)), - PrimaryKeyConstraint('id', name='PK_cattable'), - ) - - def setup(self): - metadata.create_all() - - def teardown(self): - metadata.drop_all() - - def test_compiled(self): - self.assert_compile(cattable.insert().values(id=9, - description='Python'), - 'INSERT INTO cattable (id, description) ' - 'VALUES (:id, :description)') - - def test_execute(self): - cattable.insert().values(id=9, description='Python').execute() - - cats = cattable.select().order_by(cattable.c.id).execute() - eq_([(9, 'Python')], list(cats)) - - result = cattable.insert().values(description='PHP').execute() - eq_([10], result.inserted_primary_key) - lastcat = cattable.select().order_by(desc(cattable.c.id)).execute() - eq_((10, 'PHP'), lastcat.first()) - - def test_executemany(self): - cattable.insert().execute([{'id': 89, 'description': 'Python'}, - {'id': 8, 'description': 'Ruby'}, - {'id': 3, 'description': 'Perl'}, - {'id': 1, 'description': 'Java'}]) - cats = cattable.select().order_by(cattable.c.id).execute() - eq_([(1, 'Java'), (3, 'Perl'), (8, 'Ruby'), (89, 'Python')], - list(cats)) - cattable.insert().execute([{'description': 'PHP'}, - {'description': 'Smalltalk'}]) - lastcats = \ - cattable.select().order_by(desc(cattable.c.id)).limit(2).execute() - eq_([(91, 'Smalltalk'), (90, 'PHP')], list(lastcats)) - - -class ReflectionTest(fixtures.TestBase, ComparesTables): - __only_on__ = 'mssql' - - @testing.provide_metadata - def test_basic_reflection(self): - meta = self.metadata - - users = Table( - 'engine_users', - meta, - Column('user_id', types.INT, primary_key=True), - Column('user_name', types.VARCHAR(20), nullable=False), - Column('test1', types.CHAR(5), nullable=False), - Column('test2', types.Float(5), nullable=False), - Column('test3', types.Text), - Column('test4', types.Numeric, nullable=False), - Column('test5', types.DateTime), - Column('parent_user_id', types.Integer, - ForeignKey('engine_users.user_id')), - Column('test6', types.DateTime, nullable=False), - Column('test7', types.Text), - Column('test8', types.LargeBinary), - Column('test_passivedefault2', types.Integer, - server_default='5'), - Column('test9', types.BINARY(100)), - Column('test_numeric', types.Numeric()), - ) - - addresses = Table( - 'engine_email_addresses', - meta, - Column('address_id', types.Integer, primary_key=True), - Column('remote_user_id', types.Integer, - ForeignKey(users.c.user_id)), - Column('email_address', types.String(20)), - ) - meta.create_all() - - meta2 = MetaData() - reflected_users = Table('engine_users', meta2, - autoload=True, - autoload_with=testing.db) - reflected_addresses = Table('engine_email_addresses', - meta2, autoload=True, autoload_with=testing.db) - self.assert_tables_equal(users, reflected_users) - self.assert_tables_equal(addresses, reflected_addresses) - - @testing.provide_metadata - def test_identity(self): - metadata = self.metadata - table = Table( - 'identity_test', metadata, - Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True) - ) - table.create() - - meta2 = MetaData(testing.db) - table2 = Table('identity_test', meta2, autoload=True) - sequence = isinstance(table2.c['col1'].default, schema.Sequence) \ - and table2.c['col1'].default - assert sequence.start == 2 - assert sequence.increment == 3 - - @testing.emits_warning("Did not recognize") - @testing.provide_metadata - def test_skip_types(self): - metadata = self.metadata - testing.db.execute(""" - create table foo (id integer primary key, data xml) - """) - t1 = Table('foo', metadata, autoload=True) - assert isinstance(t1.c.id.type, Integer) - assert isinstance(t1.c.data.type, types.NullType) - - - @testing.provide_metadata - def test_db_qualified_items(self): - metadata = self.metadata - Table('foo', metadata, Column('id', Integer, primary_key=True)) - Table('bar', metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', Integer, ForeignKey('foo.id', name="fkfoo")) - ) - metadata.create_all() - - dbname = testing.db.scalar("select db_name()") - owner = testing.db.scalar("SELECT user_name()") - - inspector = inspect(testing.db) - bar_via_db = inspector.get_foreign_keys( - "bar", schema="%s.%s" % (dbname, owner)) - eq_( - bar_via_db, - [{ - 'referred_table': 'foo', - 'referred_columns': ['id'], - 'referred_schema': 'test.dbo', - 'name': 'fkfoo', - 'constrained_columns': ['foo_id']}] - ) - - assert testing.db.has_table("bar", schema="test.dbo") - - m2 = MetaData() - Table('bar', m2, schema="test.dbo", autoload=True, - autoload_with=testing.db) - eq_(m2.tables["test.dbo.foo"].schema, "test.dbo") - - - @testing.provide_metadata - def test_indexes_cols(self): - metadata = self.metadata - - t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer)) - Index('foo', t1.c.x, t1.c.y) - metadata.create_all() - - m2 = MetaData() - t2 = Table('t', m2, autoload=True, autoload_with=testing.db) - - eq_( - set(list(t2.indexes)[0].columns), - set([t2.c['x'], t2.c.y]) - ) - - @testing.provide_metadata - def test_indexes_cols_with_commas(self): - metadata = self.metadata - - t1 = Table('t', metadata, - Column('x, col', Integer, key='x'), - Column('y', Integer) - ) - Index('foo', t1.c.x, t1.c.y) - metadata.create_all() - - m2 = MetaData() - t2 = Table('t', m2, autoload=True, autoload_with=testing.db) - - eq_( - set(list(t2.indexes)[0].columns), - set([t2.c['x, col'], t2.c.y]) - ) - - @testing.provide_metadata - def test_indexes_cols_with_spaces(self): - metadata = self.metadata - - t1 = Table('t', metadata, Column('x col', Integer, key='x'), - Column('y', Integer)) - Index('foo', t1.c.x, t1.c.y) - metadata.create_all() - - m2 = MetaData() - t2 = Table('t', m2, autoload=True, autoload_with=testing.db) - - eq_( - set(list(t2.indexes)[0].columns), - set([t2.c['x col'], t2.c.y]) - ) - -class QueryUnicodeTest(fixtures.TestBase): - - __only_on__ = 'mssql' - - def test_convert_unicode(self): - meta = MetaData(testing.db) - t1 = Table('unitest_table', meta, Column('id', Integer, - primary_key=True), Column('descr', - mssql.MSText(convert_unicode=True))) - meta.create_all() - con = testing.db.connect() - - # encode in UTF-8 (sting object) because this is the default - # dialect encoding - - con.execute(ue("insert into unitest_table values ('bien u\ - umang\xc3\xa9')").encode('UTF-8')) - try: - r = t1.select().execute().first() - assert isinstance(r[1], util.text_type), \ - '%s is %s instead of unicode, working on %s' % (r[1], - type(r[1]), meta.bind) - finally: - meta.drop_all() - -from sqlalchemy.testing.assertsql import ExactSQL -class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase): - __only_on__ = 'mssql' - - def test_fetchid_trigger(self): - """ - Verify identity return value on inserting to a trigger table. - - MSSQL's OUTPUT INSERTED clause does not work for the - case of a table having an identity (autoincrement) - primary key column, and which also has a trigger configured - to fire upon each insert and subsequently perform an - insert into a different table. - - SQLALchemy's MSSQL dialect by default will attempt to - use an OUTPUT_INSERTED clause, which in this case will - raise the following error: - - ProgrammingError: (ProgrammingError) ('42000', 334, - "[Microsoft][SQL Server Native Client 10.0][SQL Server]The - target table 't1' of the DML statement cannot have any enabled - triggers if the statement contains an OUTPUT clause without - INTO clause.", 7748) 'INSERT INTO t1 (descr) OUTPUT inserted.id - VALUES (?)' ('hello',) - - This test verifies a workaround, which is to rely on the - older SCOPE_IDENTITY() call, which still works for this scenario. - To enable the workaround, the Table must be instantiated - with the init parameter 'implicit_returning = False'. - """ - - #todo: this same test needs to be tried in a multithreaded context - # with multiple threads inserting to the same table. - #todo: check whether this error also occurs with clients other - # than the SQL Server Native Client. Maybe an assert_raises - # test should be written. - meta = MetaData(testing.db) - t1 = Table('t1', meta, - Column('id', Integer, Sequence('fred', 100, 1), - primary_key=True), - Column('descr', String(200)), - # the following flag will prevent the - # MSSQLCompiler.returning_clause from getting called, - # though the ExecutionContext will still have a - # _select_lastrowid, so the SELECT SCOPE_IDENTITY() will - # hopefully be called instead. - implicit_returning = False - ) - t2 = Table('t2', meta, - Column('id', Integer, Sequence('fred', 200, 1), - primary_key=True), - Column('descr', String(200))) - meta.create_all() - con = testing.db.connect() - con.execute("""create trigger paj on t1 for insert as - insert into t2 (descr) select descr from inserted""") - - try: - tr = con.begin() - r = con.execute(t2.insert(), descr='hello') - self.assert_(r.inserted_primary_key == [200]) - r = con.execute(t1.insert(), descr='hello') - self.assert_(r.inserted_primary_key == [100]) - - finally: - tr.commit() - con.execute("""drop trigger paj""") - meta.drop_all() - - @testing.provide_metadata - def test_disable_scope_identity(self): - engine = engines.testing_engine(options={"use_scope_identity":False}) - metadata = self.metadata - metadata.bind = engine - t1 = Table('t1', metadata, - Column('id', Integer, primary_key=True), - implicit_returning=False - ) - metadata.create_all() - - self.assert_sql_execution( - testing.db, - lambda: engine.execute(t1.insert()), - ExactSQL("INSERT INTO t1 DEFAULT VALUES"), - # we dont have an event for - # "SELECT @@IDENTITY" part here. - # this will be in 0.8 with #2459 - ) - assert not engine.dialect.use_scope_identity - - def test_insertid_schema(self): - meta = MetaData(testing.db) - con = testing.db.connect() - con.execute('create schema paj') - tbl = Table('test', meta, - Column('id', Integer, primary_key=True), schema='paj') - tbl.create() - try: - tbl.insert().execute({'id':1}) - finally: - tbl.drop() - con.execute('drop schema paj') - - def test_returning_no_autoinc(self): - meta = MetaData(testing.db) - table = Table('t1', meta, Column('id', Integer, - primary_key=True), Column('data', String(50))) - table.create() - try: - result = table.insert().values(id=1, - data=func.lower('SomeString' - )).returning(table.c.id, table.c.data).execute() - eq_(result.fetchall(), [(1, 'somestring')]) - finally: - - # this will hang if the "SET IDENTITY_INSERT t1 OFF" occurs - # before the result is fetched - - table.drop() - - def test_delete_schema(self): - meta = MetaData(testing.db) - con = testing.db.connect() - con.execute('create schema paj') - tbl = Table('test', meta, Column('id', Integer, - primary_key=True), schema='paj') - tbl.create() - try: - tbl.insert().execute({'id': 1}) - tbl.delete(tbl.c.id == 1).execute() - finally: - tbl.drop() - con.execute('drop schema paj') - - def test_insertid_reserved(self): - meta = MetaData(testing.db) - table = Table( - 'select', meta, - Column('col', Integer, primary_key=True) - ) - table.create() - - meta2 = MetaData(testing.db) - try: - table.insert().execute(col=7) - finally: - table.drop() - - -class Foo(object): - def __init__(self, **kw): - for k in kw: - setattr(self, k, kw[k]) - -class GenerativeQueryTest(fixtures.TestBase): - __only_on__ = 'mssql' - - @classmethod - def setup_class(cls): - global foo, metadata - metadata = MetaData(testing.db) - foo = Table('foo', metadata, - Column('id', Integer, Sequence('foo_id_seq'), - primary_key=True), - Column('bar', Integer), - Column('range', Integer)) - - mapper(Foo, foo) - metadata.create_all() - - sess = create_session(bind=testing.db) - for i in range(100): - sess.add(Foo(bar=i, range=i%10)) - sess.flush() - - @classmethod - def teardown_class(cls): - metadata.drop_all() - clear_mappers() - - def test_slice_mssql(self): - sess = create_session(bind=testing.db) - query = sess.query(Foo) - orig = query.all() - assert list(query[:10]) == orig[:10] - assert list(query[:10]) == orig[:10] - - -class SchemaTest(fixtures.TestBase): - - def setup(self): - t = Table('sometable', MetaData(), - Column('pk_column', Integer), - Column('test_column', String) - ) - self.column = t.c.test_column - - dialect = mssql.dialect() - self.ddl_compiler = dialect.ddl_compiler(dialect, - schema.CreateTable(t)) - - def _column_spec(self): - return self.ddl_compiler.get_column_specification(self.column) - - def test_that_mssql_default_nullability_emits_null(self): - eq_("test_column VARCHAR(max) NULL", self._column_spec()) - - def test_that_mssql_none_nullability_does_not_emit_nullability(self): - self.column.nullable = None - eq_("test_column VARCHAR(max)", self._column_spec()) - - def test_that_mssql_specified_nullable_emits_null(self): - self.column.nullable = True - eq_("test_column VARCHAR(max) NULL", self._column_spec()) - - def test_that_mssql_specified_not_nullable_emits_not_null(self): - self.column.nullable = False - eq_("test_column VARCHAR(max) NOT NULL", self._column_spec()) - - -def full_text_search_missing(): - """Test if full text search is not implemented and return False if - it is and True otherwise.""" - - try: - connection = testing.db.connect() - try: - connection.execute('CREATE FULLTEXT CATALOG Catalog AS ' - 'DEFAULT') - return False - except: - return True - finally: - connection.close() - -class MatchTest(fixtures.TestBase, AssertsCompiledSQL): - - __only_on__ = 'mssql' - __skip_if__ = full_text_search_missing, - - @classmethod - def setup_class(cls): - global metadata, cattable, matchtable - metadata = MetaData(testing.db) - cattable = Table('cattable', metadata, Column('id', Integer), - Column('description', String(50)), - PrimaryKeyConstraint('id', name='PK_cattable')) - matchtable = Table( - 'matchtable', - metadata, - Column('id', Integer), - Column('title', String(200)), - Column('category_id', Integer, ForeignKey('cattable.id')), - PrimaryKeyConstraint('id', name='PK_matchtable'), - ) - DDL("""CREATE FULLTEXT INDEX - ON cattable (description) - KEY INDEX PK_cattable""").execute_at('after-create' - , matchtable) - DDL("""CREATE FULLTEXT INDEX - ON matchtable (title) - KEY INDEX PK_matchtable""").execute_at('after-create' - , matchtable) - metadata.create_all() - cattable.insert().execute([{'id': 1, 'description': 'Python'}, - {'id': 2, 'description': 'Ruby'}]) - matchtable.insert().execute([{'id': 1, 'title' - : 'Agile Web Development with Rails' - , 'category_id': 2}, {'id': 2, - 'title': 'Dive Into Python', - 'category_id': 1}, {'id': 3, 'title' - : "Programming Matz's Ruby", - 'category_id': 2}, {'id': 4, 'title' - : 'The Definitive Guide to Django', - 'category_id': 1}, {'id': 5, 'title' - : 'Python in a Nutshell', - 'category_id': 1}]) - DDL("WAITFOR DELAY '00:00:05'" - ).execute(bind=engines.testing_engine()) - - @classmethod - def teardown_class(cls): - metadata.drop_all() - connection = testing.db.connect() - connection.execute("DROP FULLTEXT CATALOG Catalog") - connection.close() - - def test_expression(self): - self.assert_compile(matchtable.c.title.match('somstr'), - 'CONTAINS (matchtable.title, ?)') - - def test_simple_match(self): - results = \ - matchtable.select().where(matchtable.c.title.match('python' - )).order_by(matchtable.c.id).execute().fetchall() - eq_([2, 5], [r.id for r in results]) - - def test_simple_match_with_apostrophe(self): - results = \ - matchtable.select().where(matchtable.c.title.match("Matz's" - )).execute().fetchall() - eq_([3], [r.id for r in results]) - - def test_simple_prefix_match(self): - results = \ - matchtable.select().where(matchtable.c.title.match('"nut*"' - )).execute().fetchall() - eq_([5], [r.id for r in results]) - - def test_simple_inflectional_match(self): - results = \ - matchtable.select().where( - matchtable.c.title.match('FORMSOF(INFLECTIONAL, "dives")' - )).execute().fetchall() - eq_([2], [r.id for r in results]) - - def test_or_match(self): - results1 = \ - matchtable.select().where(or_(matchtable.c.title.match('nutshell' - ), matchtable.c.title.match('ruby' - ))).order_by(matchtable.c.id).execute().fetchall() - eq_([3, 5], [r.id for r in results1]) - results2 = \ - matchtable.select().where( - matchtable.c.title.match('nutshell OR ruby' - )).order_by(matchtable.c.id).execute().fetchall() - eq_([3, 5], [r.id for r in results2]) - - def test_and_match(self): - results1 = \ - matchtable.select().where(and_(matchtable.c.title.match('python' - ), matchtable.c.title.match('nutshell' - ))).execute().fetchall() - eq_([5], [r.id for r in results1]) - results2 = \ - matchtable.select().where( - matchtable.c.title.match('python AND nutshell' - )).execute().fetchall() - eq_([5], [r.id for r in results2]) - - def test_match_across_joins(self): - results = matchtable.select().where(and_(cattable.c.id - == matchtable.c.category_id, - or_(cattable.c.description.match('Ruby'), - matchtable.c.title.match('nutshell' - )))).order_by(matchtable.c.id).execute().fetchall() - eq_([1, 3, 5], [r.id for r in results]) - - -class ParseConnectTest(fixtures.TestBase): - - def test_pyodbc_connect_dsn_trusted(self): - dialect = pyodbc.dialect() - u = url.make_url('mssql://mydsn') - connection = dialect.create_connect_args(u) - eq_([['dsn=mydsn;Trusted_Connection=Yes'], {}], connection) - - def test_pyodbc_connect_old_style_dsn_trusted(self): - dialect = pyodbc.dialect() - u = url.make_url('mssql:///?dsn=mydsn') - connection = dialect.create_connect_args(u) - eq_([['dsn=mydsn;Trusted_Connection=Yes'], {}], connection) - - def test_pyodbc_connect_dsn_non_trusted(self): - dialect = pyodbc.dialect() - u = url.make_url('mssql://username:password@mydsn') - connection = dialect.create_connect_args(u) - eq_([['dsn=mydsn;UID=username;PWD=password'], {}], connection) - - def test_pyodbc_connect_dsn_extra(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql://username:password@mydsn/?LANGUAGE=us_' - 'english&foo=bar') - connection = dialect.create_connect_args(u) - dsn_string = connection[0][0] - assert ";LANGUAGE=us_english" in dsn_string - assert ";foo=bar" in dsn_string - - def test_pyodbc_connect(self): - dialect = pyodbc.dialect() - u = url.make_url('mssql://username:password@hostspec/database') - connection = dialect.create_connect_args(u) - eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI' - 'D=username;PWD=password'], {}], connection) - - def test_pyodbc_connect_comma_port(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql://username:password@hostspec:12345/data' - 'base') - connection = dialect.create_connect_args(u) - eq_([['DRIVER={SQL Server};Server=hostspec,12345;Database=datab' - 'ase;UID=username;PWD=password'], {}], connection) - - def test_pyodbc_connect_config_port(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql://username:password@hostspec/database?p' - 'ort=12345') - connection = dialect.create_connect_args(u) - eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI' - 'D=username;PWD=password;port=12345'], {}], connection) - - def test_pyodbc_extra_connect(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql://username:password@hostspec/database?L' - 'ANGUAGE=us_english&foo=bar') - connection = dialect.create_connect_args(u) - eq_(connection[1], {}) - eq_(connection[0][0] - in ('DRIVER={SQL Server};Server=hostspec;Database=database;' - 'UID=username;PWD=password;foo=bar;LANGUAGE=us_english', - 'DRIVER={SQL Server};Server=hostspec;Database=database;UID=' - 'username;PWD=password;LANGUAGE=us_english;foo=bar'), True) - - def test_pyodbc_odbc_connect(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server' - '%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase' - '%3BUID%3Dusername%3BPWD%3Dpassword') - connection = dialect.create_connect_args(u) - eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI' - 'D=username;PWD=password'], {}], connection) - - def test_pyodbc_odbc_connect_with_dsn(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase' - '%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword' - ) - connection = dialect.create_connect_args(u) - eq_([['dsn=mydsn;Database=database;UID=username;PWD=password'], - {}], connection) - - def test_pyodbc_odbc_connect_ignores_other_values(self): - dialect = pyodbc.dialect() - u = \ - url.make_url('mssql://userdiff:passdiff@localhost/dbdiff?od' - 'bc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer' - '%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Duse' - 'rname%3BPWD%3Dpassword') - connection = dialect.create_connect_args(u) - eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI' - 'D=username;PWD=password'], {}], connection) - - def test_pymssql_port_setting(self): - dialect = pymssql.dialect() - - u = \ - url.make_url('mssql+pymssql://scott:tiger@somehost/test') - connection = dialect.create_connect_args(u) - eq_( - [[], {'host': 'somehost', 'password': 'tiger', - 'user': 'scott', 'database': 'test'}], connection - ) - - u = \ - url.make_url('mssql+pymssql://scott:tiger@somehost:5000/test') - connection = dialect.create_connect_args(u) - eq_( - [[], {'host': 'somehost:5000', 'password': 'tiger', - 'user': 'scott', 'database': 'test'}], connection - ) - - def test_pymssql_disconnect(self): - dialect = pymssql.dialect() - - for error in [ - 'Adaptive Server connection timed out', - 'message 20003', - "Error 10054", - "Not connected to any MS SQL server", - "Connection is closed" - ]: - eq_(dialect.is_disconnect(error, None, None), True) - - eq_(dialect.is_disconnect("not an error", None, None), False) - - @testing.only_on(['mssql+pyodbc', 'mssql+pymssql'], - "FreeTDS specific test") - def test_bad_freetds_warning(self): - engine = engines.testing_engine() - - def _bad_version(connection): - return 95, 10, 255 - - engine.dialect._get_server_version_info = _bad_version - assert_raises_message(exc.SAWarning, - 'Unrecognized server version info', - engine.connect) - -class TimeTypeTest(fixtures.TestBase): - - def test_result_processor_no_microseconds(self): - expected = datetime.time(12, 34, 56) - self._assert_result_processor(expected, '12:34:56') - - def test_result_processor_too_many_microseconds(self): - # microsecond must be in 0..999999, should truncate (6 vs 7 digits) - expected = datetime.time(12, 34, 56, 123456) - self._assert_result_processor(expected, '12:34:56.1234567') - - def _assert_result_processor(self, expected, value): - mssql_time_type = TIME() - result_processor = mssql_time_type.result_processor(None, None) - eq_(expected, result_processor(value)) - - -class TypeDDLTest(fixtures.TestBase): - def test_boolean(self): - "Exercise type specification for boolean type." - - columns = [ - # column type, args, kwargs, expected ddl - (Boolean, [], {}, - 'BIT'), - ] - - metadata = MetaData() - table_args = ['test_mssql_boolean', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res = spec - table_args.append( - Column('c%s' % index, type_(*args, **kw), nullable=None)) - - boolean_table = Table(*table_args) - dialect = mssql.dialect() - gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table)) - - for col in boolean_table.c: - index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), - "%s %s" % (col.name, columns[index][3])) - self.assert_(repr(col)) - - - def test_numeric(self): - "Exercise type specification and options for numeric types." - - columns = [ - # column type, args, kwargs, expected ddl - (types.NUMERIC, [], {}, - 'NUMERIC'), - (types.NUMERIC, [None], {}, - 'NUMERIC'), - (types.NUMERIC, [12, 4], {}, - 'NUMERIC(12, 4)'), - - (types.Float, [], {}, - 'FLOAT'), - (types.Float, [None], {}, - 'FLOAT'), - (types.Float, [12], {}, - 'FLOAT(12)'), - (mssql.MSReal, [], {}, - 'REAL'), - - (types.Integer, [], {}, - 'INTEGER'), - (types.BigInteger, [], {}, - 'BIGINT'), - (mssql.MSTinyInteger, [], {}, - 'TINYINT'), - (types.SmallInteger, [], {}, - 'SMALLINT'), - ] - - metadata = MetaData() - table_args = ['test_mssql_numeric', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res = spec - table_args.append( - Column('c%s' % index, type_(*args, **kw), nullable=None)) - - numeric_table = Table(*table_args) - dialect = mssql.dialect() - gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table)) - - for col in numeric_table.c: - index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), - "%s %s" % (col.name, columns[index][3])) - self.assert_(repr(col)) - - - def test_char(self): - """Exercise COLLATE-ish options on string types.""" - - columns = [ - (mssql.MSChar, [], {}, - 'CHAR'), - (mssql.MSChar, [1], {}, - 'CHAR(1)'), - (mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'}, - 'CHAR(1) COLLATE Latin1_General_CI_AS'), - - (mssql.MSNChar, [], {}, - 'NCHAR'), - (mssql.MSNChar, [1], {}, - 'NCHAR(1)'), - (mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'}, - 'NCHAR(1) COLLATE Latin1_General_CI_AS'), - - (mssql.MSString, [], {}, - 'VARCHAR(max)'), - (mssql.MSString, [1], {}, - 'VARCHAR(1)'), - (mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'}, - 'VARCHAR(1) COLLATE Latin1_General_CI_AS'), - - (mssql.MSNVarchar, [], {}, - 'NVARCHAR(max)'), - (mssql.MSNVarchar, [1], {}, - 'NVARCHAR(1)'), - (mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'}, - 'NVARCHAR(1) COLLATE Latin1_General_CI_AS'), - - (mssql.MSText, [], {}, - 'TEXT'), - (mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'}, - 'TEXT COLLATE Latin1_General_CI_AS'), - - (mssql.MSNText, [], {}, - 'NTEXT'), - (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'}, - 'NTEXT COLLATE Latin1_General_CI_AS'), - ] - - metadata = MetaData() - table_args = ['test_mssql_charset', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res = spec - table_args.append( - Column('c%s' % index, type_(*args, **kw), nullable=None)) - - charset_table = Table(*table_args) - dialect = mssql.dialect() - gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table)) - - for col in charset_table.c: - index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), - "%s %s" % (col.name, columns[index][3])) - self.assert_(repr(col)) - - - def test_timestamp(self): - """Exercise TIMESTAMP column.""" - - dialect = mssql.dialect() - - metadata = MetaData() - spec, expected = (TIMESTAMP, 'TIMESTAMP') - t = Table('mssql_ts', metadata, - Column('id', Integer, primary_key=True), - Column('t', spec, nullable=None)) - gen = dialect.ddl_compiler(dialect, schema.CreateTable(t)) - testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected) - self.assert_(repr(t.c.t)) - - def test_money(self): - """Exercise type specification for money types.""" - - columns = [(mssql.MSMoney, [], {}, 'MONEY'), - (mssql.MSSmallMoney, [], {}, 'SMALLMONEY')] - metadata = MetaData() - table_args = ['test_mssql_money', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res = spec - table_args.append(Column('c%s' % index, type_(*args, **kw), - nullable=None)) - money_table = Table(*table_args) - dialect = mssql.dialect() - gen = dialect.ddl_compiler(dialect, - schema.CreateTable(money_table)) - for col in money_table.c: - index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), '%s %s' - % (col.name, columns[index][3])) - self.assert_(repr(col)) - - def test_binary(self): - "Exercise type specification for binary types." - - columns = [ - # column type, args, kwargs, expected ddl - (mssql.MSBinary, [], {}, - 'BINARY'), - (mssql.MSBinary, [10], {}, - 'BINARY(10)'), - - (types.BINARY, [], {}, - 'BINARY'), - (types.BINARY, [10], {}, - 'BINARY(10)'), - - (mssql.MSVarBinary, [], {}, - 'VARBINARY(max)'), - (mssql.MSVarBinary, [10], {}, - 'VARBINARY(10)'), - - (types.VARBINARY, [10], {}, - 'VARBINARY(10)'), - (types.VARBINARY, [], {}, - 'VARBINARY(max)'), - - (mssql.MSImage, [], {}, - 'IMAGE'), - - (mssql.IMAGE, [], {}, - 'IMAGE'), - - (types.LargeBinary, [], {}, - 'IMAGE'), - ] - - metadata = MetaData() - table_args = ['test_mssql_binary', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res = spec - table_args.append(Column('c%s' % index, type_(*args, **kw), - nullable=None)) - binary_table = Table(*table_args) - dialect = mssql.dialect() - gen = dialect.ddl_compiler(dialect, - schema.CreateTable(binary_table)) - for col in binary_table.c: - index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), '%s %s' - % (col.name, columns[index][3])) - self.assert_(repr(col)) - -class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTables): - __only_on__ = 'mssql' - - @classmethod - def setup_class(cls): - global metadata - metadata = MetaData(testing.db) - - def teardown(self): - metadata.drop_all() - - @testing.fails_on_everything_except('mssql+pyodbc', - 'this is some pyodbc-specific feature') - def test_decimal_notation(self): - numeric_table = Table('numeric_table', metadata, Column('id', - Integer, Sequence('numeric_id_seq', - optional=True), primary_key=True), - Column('numericcol', - Numeric(precision=38, scale=20, - asdecimal=True))) - metadata.create_all() - test_items = [decimal.Decimal(d) for d in ( - '1500000.00000000000000000000', - '-1500000.00000000000000000000', - '1500000', - '0.0000000000000000002', - '0.2', - '-0.0000000000000000002', - '-2E-2', - '156666.458923543', - '-156666.458923543', - '1', - '-1', - '-1234', - '1234', - '2E-12', - '4E8', - '3E-6', - '3E-7', - '4.1', - '1E-1', - '1E-2', - '1E-3', - '1E-4', - '1E-5', - '1E-6', - '1E-7', - '1E-1', - '1E-8', - '0.2732E2', - '-0.2432E2', - '4.35656E2', - '-02452E-2', - '45125E-2', - '1234.58965E-2', - '1.521E+15', - '-1E-25', - '1E-25', - '1254E-25', - '-1203E-25', - '0', - '-0.00', - '-0', - '4585E12', - '000000000000000000012', - '000000000000.32E12', - '00000000000000.1E+12', - '000000000000.2E-32', - )] - - for value in test_items: - numeric_table.insert().execute(numericcol=value) - - for value in select([numeric_table.c.numericcol]).execute(): - assert value[0] in test_items, "%r not in test_items" % value[0] - - def test_float(self): - float_table = Table('float_table', metadata, Column('id', - Integer, Sequence('numeric_id_seq', - optional=True), primary_key=True), - Column('floatcol', Float())) - metadata.create_all() - try: - test_items = [float(d) for d in ( - '1500000.00000000000000000000', - '-1500000.00000000000000000000', - '1500000', - '0.0000000000000000002', - '0.2', - '-0.0000000000000000002', - '156666.458923543', - '-156666.458923543', - '1', - '-1', - '1234', - '2E-12', - '4E8', - '3E-6', - '3E-7', - '4.1', - '1E-1', - '1E-2', - '1E-3', - '1E-4', - '1E-5', - '1E-6', - '1E-7', - '1E-8', - )] - for value in test_items: - float_table.insert().execute(floatcol=value) - except Exception as e: - raise e - - - # todo this should suppress warnings, but it does not - @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*') - def test_dates(self): - "Exercise type specification for date types." - - columns = [ - # column type, args, kwargs, expected ddl - (mssql.MSDateTime, [], {}, - 'DATETIME', []), - - (types.DATE, [], {}, - 'DATE', ['>=', (10,)]), - (types.Date, [], {}, - 'DATE', ['>=', (10,)]), - (types.Date, [], {}, - 'DATETIME', ['<', (10,)], mssql.MSDateTime), - (mssql.MSDate, [], {}, - 'DATE', ['>=', (10,)]), - (mssql.MSDate, [], {}, - 'DATETIME', ['<', (10,)], mssql.MSDateTime), - - (types.TIME, [], {}, - 'TIME', ['>=', (10,)]), - (types.Time, [], {}, - 'TIME', ['>=', (10,)]), - (mssql.MSTime, [], {}, - 'TIME', ['>=', (10,)]), - (mssql.MSTime, [1], {}, - 'TIME(1)', ['>=', (10,)]), - (types.Time, [], {}, - 'DATETIME', ['<', (10,)], mssql.MSDateTime), - (mssql.MSTime, [], {}, - 'TIME', ['>=', (10,)]), - - (mssql.MSSmallDateTime, [], {}, - 'SMALLDATETIME', []), - - (mssql.MSDateTimeOffset, [], {}, - 'DATETIMEOFFSET', ['>=', (10,)]), - (mssql.MSDateTimeOffset, [1], {}, - 'DATETIMEOFFSET(1)', ['>=', (10,)]), - - (mssql.MSDateTime2, [], {}, - 'DATETIME2', ['>=', (10,)]), - (mssql.MSDateTime2, [1], {}, - 'DATETIME2(1)', ['>=', (10,)]), - - ] - - table_args = ['test_mssql_dates', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res, requires = spec[0:5] - if requires and testing._is_excluded('mssql', *requires) \ - or not requires: - c = Column('c%s' % index, type_(*args, - **kw), nullable=None) - testing.db.dialect.type_descriptor(c.type) - table_args.append(c) - dates_table = Table(*table_args) - gen = testing.db.dialect.ddl_compiler(testing.db.dialect, - schema.CreateTable(dates_table)) - for col in dates_table.c: - index = int(col.name[1:]) - testing.eq_(gen.get_column_specification(col), '%s %s' - % (col.name, columns[index][3])) - self.assert_(repr(col)) - dates_table.create(checkfirst=True) - reflected_dates = Table('test_mssql_dates', - MetaData(testing.db), autoload=True) - for col in reflected_dates.c: - self.assert_types_base(col, dates_table.c[col.key]) - - def test_date_roundtrip(self): - t = Table('test_dates', metadata, - Column('id', Integer, - Sequence('datetest_id_seq', optional=True), - primary_key=True), - Column('adate', Date), - Column('atime', Time), - Column('adatetime', DateTime)) - metadata.create_all() - d1 = datetime.date(2007, 10, 30) - t1 = datetime.time(11, 2, 32) - d2 = datetime.datetime(2007, 10, 30, 11, 2, 32) - t.insert().execute(adate=d1, adatetime=d2, atime=t1) - t.insert().execute(adate=d2, adatetime=d2, atime=d2) - - x = t.select().execute().fetchall()[0] - self.assert_(x.adate.__class__ == datetime.date) - self.assert_(x.atime.__class__ == datetime.time) - self.assert_(x.adatetime.__class__ == datetime.datetime) - - t.delete().execute() - - t.insert().execute(adate=d1, adatetime=d2, atime=t1) - - eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate - == d1).execute().fetchall(), [(d1, t1, d2)]) - - @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*') - @testing.provide_metadata - def test_binary_reflection(self): - "Exercise type specification for binary types." - - columns = [ - # column type, args, kwargs, expected ddl - (mssql.MSBinary, [], {}, - 'BINARY'), - (mssql.MSBinary, [10], {}, - 'BINARY(10)'), - - (types.BINARY, [], {}, - 'BINARY'), - (types.BINARY, [10], {}, - 'BINARY(10)'), - - (mssql.MSVarBinary, [], {}, - 'VARBINARY(max)'), - (mssql.MSVarBinary, [10], {}, - 'VARBINARY(10)'), - - (types.VARBINARY, [10], {}, - 'VARBINARY(10)'), - (types.VARBINARY, [], {}, - 'VARBINARY(max)'), - - (mssql.MSImage, [], {}, - 'IMAGE'), - - (mssql.IMAGE, [], {}, - 'IMAGE'), - - (types.LargeBinary, [], {}, - 'IMAGE'), - ] - - metadata = self.metadata - table_args = ['test_mssql_binary', metadata] - for index, spec in enumerate(columns): - type_, args, kw, res = spec - table_args.append(Column('c%s' % index, type_(*args, **kw), - nullable=None)) - binary_table = Table(*table_args) - metadata.create_all() - reflected_binary = Table('test_mssql_binary', - MetaData(testing.db), autoload=True) - for col in reflected_binary.c: - c1 = testing.db.dialect.type_descriptor(col.type).__class__ - c2 = \ - testing.db.dialect.type_descriptor( - binary_table.c[col.name].type).__class__ - assert issubclass(c1, c2), '%r is not a subclass of %r' \ - % (c1, c2) - if binary_table.c[col.name].type.length: - testing.eq_(col.type.length, - binary_table.c[col.name].type.length) - - - def test_autoincrement(self): - Table('ai_1', metadata, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_2', metadata, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_3', metadata, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_y', Integer, primary_key=True)) - Table('ai_4', metadata, - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False), - Column('int_n2', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_5', metadata, - Column('int_y', Integer, primary_key=True), - Column('int_n', Integer, DefaultClause('0'), - primary_key=True, autoincrement=False)) - Table('ai_6', metadata, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_7', metadata, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True), - Column('int_y', Integer, primary_key=True)) - Table('ai_8', metadata, - Column('o1', String(1), DefaultClause('x'), - primary_key=True), - Column('o2', String(1), DefaultClause('x'), - primary_key=True)) - metadata.create_all() - - table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', - 'ai_5', 'ai_6', 'ai_7', 'ai_8'] - mr = MetaData(testing.db) - - for name in table_names: - tbl = Table(name, mr, autoload=True) - tbl = metadata.tables[name] - for c in tbl.c: - if c.name.startswith('int_y'): - assert c.autoincrement, name - assert tbl._autoincrement_column is c, name - elif c.name.startswith('int_n'): - assert not c.autoincrement, name - assert tbl._autoincrement_column is not c, name - - # mxodbc can't handle scope_identity() with DEFAULT VALUES - - if testing.db.driver == 'mxodbc': - eng = \ - [engines.testing_engine(options={'implicit_returning' - : True})] - else: - eng = \ - [engines.testing_engine(options={'implicit_returning' - : False}), - engines.testing_engine(options={'implicit_returning' - : True})] - - for counter, engine in enumerate(eng): - engine.execute(tbl.insert()) - if 'int_y' in tbl.c: - assert engine.scalar(select([tbl.c.int_y])) \ - == counter + 1 - assert list(engine.execute(tbl.select()).first()).\ - count(counter + 1) == 1 - else: - assert 1 \ - not in list(engine.execute(tbl.select()).first()) - engine.execute(tbl.delete()) - -class MonkeyPatchedBinaryTest(fixtures.TestBase): - __only_on__ = 'mssql+pymssql' - - def test_unicode(self): - module = __import__('pymssql') - result = module.Binary('foo') - eq_(result, 'foo') - - def test_bytes(self): - module = __import__('pymssql') - input = b('\x80\x03]q\x00X\x03\x00\x00\x00oneq\x01a.') - expected_result = input - result = module.Binary(input) - eq_(result, expected_result) - -class BinaryTest(fixtures.TestBase, AssertsExecutionResults): - """Test the Binary and VarBinary types""" - - __only_on__ = 'mssql' - - @classmethod - def setup_class(cls): - global binary_table, MyPickleType - - class MyPickleType(types.TypeDecorator): - impl = PickleType - - def process_bind_param(self, value, dialect): - if value: - value.stuff = 'this is modified stuff' - return value - - def process_result_value(self, value, dialect): - if value: - value.stuff = 'this is the right stuff' - return value - - binary_table = Table( - 'binary_table', - MetaData(testing.db), - Column('primary_id', Integer, Sequence('binary_id_seq', - optional=True), primary_key=True), - Column('data', mssql.MSVarBinary(8000)), - Column('data_image', mssql.MSImage), - Column('data_slice', types.BINARY(100)), - Column('misc', String(30)), - Column('pickled', PickleType), - Column('mypickle', MyPickleType), - ) - binary_table.create() - - def teardown(self): - binary_table.delete().execute() - - @classmethod - def teardown_class(cls): - binary_table.drop() - - def test_binary(self): - testobj1 = pickleable.Foo('im foo 1') - testobj2 = pickleable.Foo('im foo 2') - testobj3 = pickleable.Foo('im foo 3') - stream1 = self.load_stream('binary_data_one.dat') - stream2 = self.load_stream('binary_data_two.dat') - binary_table.insert().execute( - primary_id=1, - misc='binary_data_one.dat', - data=stream1, - data_image=stream1, - data_slice=stream1[0:100], - pickled=testobj1, - mypickle=testobj3, - ) - binary_table.insert().execute( - primary_id=2, - misc='binary_data_two.dat', - data=stream2, - data_image=stream2, - data_slice=stream2[0:99], - pickled=testobj2, - ) - - # TODO: pyodbc does not seem to accept "None" for a VARBINARY - # column (data=None). error: [Microsoft][ODBC SQL Server - # Driver][SQL Server]Implicit conversion from data type varchar - # to varbinary is not allowed. Use the CONVERT function to run - # this query. (257) binary_table.insert().execute(primary_id=3, - # misc='binary_data_two.dat', data=None, data_image=None, - # data_slice=stream2[0:99], pickled=None) - - binary_table.insert().execute(primary_id=3, - misc='binary_data_two.dat', data_image=None, - data_slice=stream2[0:99], pickled=None) - for stmt in \ - binary_table.select(order_by=binary_table.c.primary_id), \ - text('select * from binary_table order by ' - 'binary_table.primary_id', - typemap=dict(data=mssql.MSVarBinary(8000), - data_image=mssql.MSImage, - data_slice=types.BINARY(100), pickled=PickleType, - mypickle=MyPickleType), bind=testing.db): - l = stmt.execute().fetchall() - eq_(list(stream1), list(l[0]['data'])) - paddedstream = list(stream1[0:100]) - paddedstream.extend(['\x00'] * (100 - len(paddedstream))) - eq_(paddedstream, list(l[0]['data_slice'])) - eq_(list(stream2), list(l[1]['data'])) - eq_(list(stream2), list(l[1]['data_image'])) - eq_(testobj1, l[0]['pickled']) - eq_(testobj2, l[1]['pickled']) - eq_(testobj3.moredata, l[0]['mypickle'].moredata) - eq_(l[0]['mypickle'].stuff, 'this is the right stuff') - - def load_stream(self, name, len=3000): - fp = open(os.path.join(os.path.dirname(__file__), "..", name), 'rb') - stream = fp.read(len) - fp.close() - return stream - -from sqlalchemy.dialects.mssql.information_schema import CoerceUnicode, tables -from sqlalchemy.dialects.mssql import base - -class InfoCoerceUnicodeTest(fixtures.TestBase, AssertsCompiledSQL): - def test_info_unicode_coercion(self): - - dialect = mssql.dialect() - value = CoerceUnicode().bind_processor(dialect)('a string') - assert isinstance(value, util.text_type) - - def test_info_unicode_cast_no_2000(self): - dialect = mssql.dialect() - dialect.server_version_info = base.MS_2000_VERSION - stmt = tables.c.table_name == 'somename' - self.assert_compile( - stmt, - "[TABLES_1].[TABLE_NAME] = :TABLE_NAME_1", - dialect=dialect - ) - - def test_info_unicode_cast(self): - dialect = mssql.dialect() - dialect.server_version_info = base.MS_2005_VERSION - stmt = tables.c.table_name == 'somename' - self.assert_compile( - stmt, - "[TABLES_1].[TABLE_NAME] = CAST(:TABLE_NAME_1 AS NVARCHAR(max))", - dialect=dialect - ) - -class ReflectHugeViewTest(fixtures.TestBase): - __only_on__ = 'mssql' - - def setup(self): - self.col_num = 150 - - self.metadata = MetaData(testing.db) - t = Table('base_table', self.metadata, - *[ - Column("long_named_column_number_%d" % i, Integer) - for i in range(self.col_num) - ] - ) - self.view_str = view_str = \ - "CREATE VIEW huge_named_view AS SELECT %s FROM base_table" % ( - ",".join("long_named_column_number_%d" % i - for i in range(self.col_num)) - ) - assert len(view_str) > 4000 - - event.listen(t, 'after_create', DDL(view_str) ) - event.listen(t, 'before_drop', DDL("DROP VIEW huge_named_view") ) - - self.metadata.create_all() - - def teardown(self): - self.metadata.drop_all() - - def test_inspect_view_definition(self): - inspector = Inspector.from_engine(testing.db) - view_def = inspector.get_view_definition("huge_named_view") - eq_(view_def, self.view_str) - |