summaryrefslogtreecommitdiff
path: root/test/dialect/test_mssql.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-06-28 22:30:11 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-06-28 22:30:11 -0400
commit1c23741b8e045d266d0ecbed975952547444a5fa (patch)
tree366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/test_mssql.py
parent83f3dbc83d1066216084a01b32cddcc090f697d5 (diff)
downloadsqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/test_mssql.py')
-rw-r--r--test/dialect/test_mssql.py2143
1 files changed, 0 insertions, 2143 deletions
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py
deleted file mode 100644
index b2eb8609b..000000000
--- a/test/dialect/test_mssql.py
+++ /dev/null
@@ -1,2143 +0,0 @@
-# -*- encoding: utf-8
-from sqlalchemy.testing import eq_, engines, pickleable
-import datetime
-import os
-from sqlalchemy import *
-from sqlalchemy import types, exc, schema, event
-from sqlalchemy.orm import *
-from sqlalchemy.sql import table, column
-from sqlalchemy.databases import mssql
-from sqlalchemy.dialects.mssql import pyodbc, mxodbc, pymssql
-from sqlalchemy.dialects.mssql.base import TIME
-from sqlalchemy.engine import url
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
- AssertsExecutionResults, ComparesTables
-from sqlalchemy import testing
-from sqlalchemy.testing import emits_warning_on, assert_raises_message
-import decimal
-from sqlalchemy.engine.reflection import Inspector
-from sqlalchemy.util import b, u, ue
-from sqlalchemy import sql, util
-
-
-class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = mssql.dialect()
-
- def test_true_false(self):
- self.assert_compile(
- sql.false(), "0"
- )
- self.assert_compile(
- sql.true(),
- "1"
- )
-
- def test_select(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(t.select(),
- 'SELECT sometable.somecolumn FROM sometable')
-
- def test_select_with_nolock(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(t.select().with_hint(t, 'WITH (NOLOCK)'),
- 'SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)')
-
- def test_join_with_hint(self):
- t1 = table('t1',
- column('a', Integer),
- column('b', String),
- column('c', String),
- )
- t2 = table('t2',
- column("a", Integer),
- column("b", Integer),
- column("c", Integer),
- )
- join = t1.join(t2, t1.c.a==t2.c.a).\
- select().with_hint(t1, 'WITH (NOLOCK)')
- self.assert_compile(
- join,
- 'SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c '
- 'FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a'
- )
-
- def test_insert(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(t.insert(),
- 'INSERT INTO sometable (somecolumn) VALUES '
- '(:somecolumn)')
-
- def test_update(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(t.update(t.c.somecolumn == 7),
- 'UPDATE sometable SET somecolumn=:somecolum'
- 'n WHERE sometable.somecolumn = '
- ':somecolumn_1', dict(somecolumn=10))
-
- def test_insert_hint(self):
- t = table('sometable', column('somecolumn'))
- for targ in (None, t):
- for darg in ("*", "mssql"):
- self.assert_compile(
- t.insert().
- values(somecolumn="x").
- with_hint("WITH (PAGLOCK)",
- selectable=targ,
- dialect_name=darg),
- "INSERT INTO sometable WITH (PAGLOCK) "
- "(somecolumn) VALUES (:somecolumn)"
- )
-
- def test_update_hint(self):
- t = table('sometable', column('somecolumn'))
- for targ in (None, t):
- for darg in ("*", "mssql"):
- self.assert_compile(
- t.update().where(t.c.somecolumn=="q").
- values(somecolumn="x").
- with_hint("WITH (PAGLOCK)",
- selectable=targ,
- dialect_name=darg),
- "UPDATE sometable WITH (PAGLOCK) "
- "SET somecolumn=:somecolumn "
- "WHERE sometable.somecolumn = :somecolumn_1"
- )
-
- def test_update_exclude_hint(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(
- t.update().where(t.c.somecolumn=="q").
- values(somecolumn="x").
- with_hint("XYZ", "mysql"),
- "UPDATE sometable SET somecolumn=:somecolumn "
- "WHERE sometable.somecolumn = :somecolumn_1"
- )
-
- def test_delete_hint(self):
- t = table('sometable', column('somecolumn'))
- for targ in (None, t):
- for darg in ("*", "mssql"):
- self.assert_compile(
- t.delete().where(t.c.somecolumn=="q").
- with_hint("WITH (PAGLOCK)",
- selectable=targ,
- dialect_name=darg),
- "DELETE FROM sometable WITH (PAGLOCK) "
- "WHERE sometable.somecolumn = :somecolumn_1"
- )
-
- def test_delete_exclude_hint(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(
- t.delete().\
- where(t.c.somecolumn=="q").\
- with_hint("XYZ", dialect_name="mysql"),
- "DELETE FROM sometable WHERE "
- "sometable.somecolumn = :somecolumn_1"
- )
-
- def test_update_from_hint(self):
- t = table('sometable', column('somecolumn'))
- t2 = table('othertable', column('somecolumn'))
- for darg in ("*", "mssql"):
- self.assert_compile(
- t.update().where(t.c.somecolumn==t2.c.somecolumn).
- values(somecolumn="x").
- with_hint("WITH (PAGLOCK)",
- selectable=t2,
- dialect_name=darg),
- "UPDATE sometable SET somecolumn=:somecolumn "
- "FROM sometable, othertable WITH (PAGLOCK) "
- "WHERE sometable.somecolumn = othertable.somecolumn"
- )
-
- # TODO: not supported yet.
- #def test_delete_from_hint(self):
- # t = table('sometable', column('somecolumn'))
- # t2 = table('othertable', column('somecolumn'))
- # for darg in ("*", "mssql"):
- # self.assert_compile(
- # t.delete().where(t.c.somecolumn==t2.c.somecolumn).
- # with_hint("WITH (PAGLOCK)",
- # selectable=t2,
- # dialect_name=darg),
- # ""
- # )
-
- def test_strict_binds(self):
- """test the 'strict' compiler binds."""
-
- from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler
- mxodbc_dialect = mxodbc.dialect()
- mxodbc_dialect.statement_compiler = MSSQLStrictCompiler
-
- t = table('sometable', column('foo'))
-
- for expr, compile in [
- (
- select([literal("x"), literal("y")]),
- "SELECT 'x' AS anon_1, 'y' AS anon_2",
- ),
- (
- select([t]).where(t.c.foo.in_(['x', 'y', 'z'])),
- "SELECT sometable.foo FROM sometable WHERE sometable.foo "
- "IN ('x', 'y', 'z')",
- ),
- (
- t.c.foo.in_([None]),
- "sometable.foo IN (NULL)"
- )
- ]:
- self.assert_compile(expr, compile, dialect=mxodbc_dialect)
-
- def test_in_with_subqueries(self):
- """Test removal of legacy behavior that converted "x==subquery"
- to use IN.
-
- """
-
- t = table('sometable', column('somecolumn'))
- self.assert_compile(t.select().where(t.c.somecolumn
- == t.select()),
- 'SELECT sometable.somecolumn FROM '
- 'sometable WHERE sometable.somecolumn = '
- '(SELECT sometable.somecolumn FROM '
- 'sometable)')
- self.assert_compile(t.select().where(t.c.somecolumn
- != t.select()),
- 'SELECT sometable.somecolumn FROM '
- 'sometable WHERE sometable.somecolumn != '
- '(SELECT sometable.somecolumn FROM '
- 'sometable)')
-
- def test_count(self):
- t = table('sometable', column('somecolumn'))
- self.assert_compile(t.count(),
- 'SELECT count(sometable.somecolumn) AS '
- 'tbl_row_count FROM sometable')
-
- def test_noorderby_insubquery(self):
- """test that the ms-sql dialect removes ORDER BY clauses from
- subqueries"""
-
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String),
- column('description', String),
- )
-
- q = select([table1.c.myid],
- order_by=[table1.c.myid]).alias('foo')
- crit = q.c.myid == table1.c.myid
- self.assert_compile(select(['*'], crit),
- "SELECT * FROM (SELECT mytable.myid AS "
- "myid FROM mytable) AS foo, mytable WHERE "
- "foo.myid = mytable.myid")
-
-
-
- def test_delete_schema(self):
- metadata = MetaData()
- tbl = Table('test', metadata, Column('id', Integer,
- primary_key=True), schema='paj')
- self.assert_compile(tbl.delete(tbl.c.id == 1),
- 'DELETE FROM paj.test WHERE paj.test.id = '
- ':id_1')
- s = select([tbl.c.id]).where(tbl.c.id == 1)
- self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
- 'DELETE FROM paj.test WHERE paj.test.id IN '
- '(SELECT test_1.id FROM paj.test AS test_1 '
- 'WHERE test_1.id = :id_1)')
-
- def test_delete_schema_multipart(self):
- metadata = MetaData()
- tbl = Table('test', metadata, Column('id', Integer,
- primary_key=True), schema='banana.paj')
- self.assert_compile(tbl.delete(tbl.c.id == 1),
- 'DELETE FROM banana.paj.test WHERE '
- 'banana.paj.test.id = :id_1')
- s = select([tbl.c.id]).where(tbl.c.id == 1)
- self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
- 'DELETE FROM banana.paj.test WHERE '
- 'banana.paj.test.id IN (SELECT test_1.id '
- 'FROM banana.paj.test AS test_1 WHERE '
- 'test_1.id = :id_1)')
-
- def test_delete_schema_multipart_needs_quoting(self):
- metadata = MetaData()
- tbl = Table('test', metadata, Column('id', Integer,
- primary_key=True), schema='banana split.paj')
- self.assert_compile(tbl.delete(tbl.c.id == 1),
- 'DELETE FROM [banana split].paj.test WHERE '
- '[banana split].paj.test.id = :id_1')
- s = select([tbl.c.id]).where(tbl.c.id == 1)
- self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
- 'DELETE FROM [banana split].paj.test WHERE '
- '[banana split].paj.test.id IN (SELECT '
- 'test_1.id FROM [banana split].paj.test AS '
- 'test_1 WHERE test_1.id = :id_1)')
-
- def test_delete_schema_multipart_both_need_quoting(self):
- metadata = MetaData()
- tbl = Table('test', metadata, Column('id', Integer,
- primary_key=True),
- schema='banana split.paj with a space')
- self.assert_compile(tbl.delete(tbl.c.id == 1),
- 'DELETE FROM [banana split].[paj with a '
- 'space].test WHERE [banana split].[paj '
- 'with a space].test.id = :id_1')
- s = select([tbl.c.id]).where(tbl.c.id == 1)
- self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
- 'DELETE FROM [banana split].[paj with a '
- 'space].test WHERE [banana split].[paj '
- 'with a space].test.id IN (SELECT '
- 'test_1.id FROM [banana split].[paj with a '
- 'space].test AS test_1 WHERE test_1.id = '
- ':id_1)')
-
- def test_union(self):
- t1 = table('t1', column('col1'), column('col2'), column('col3'
- ), column('col4'))
- t2 = table('t2', column('col1'), column('col2'), column('col3'
- ), column('col4'))
- s1, s2 = select([t1.c.col3.label('col3'), t1.c.col4.label('col4'
- )], t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \
- select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
- t2.c.col2.in_(['t2col2r2', 't2col2r3']))
- u = union(s1, s2, order_by=['col3', 'col4'])
- self.assert_compile(u,
- 'SELECT t1.col3 AS col3, t1.col4 AS col4 '
- 'FROM t1 WHERE t1.col2 IN (:col2_1, '
- ':col2_2) UNION SELECT t2.col3 AS col3, '
- 't2.col4 AS col4 FROM t2 WHERE t2.col2 IN '
- '(:col2_3, :col2_4) ORDER BY col3, col4')
- self.assert_compile(u.alias('bar').select(),
- 'SELECT bar.col3, bar.col4 FROM (SELECT '
- 't1.col3 AS col3, t1.col4 AS col4 FROM t1 '
- 'WHERE t1.col2 IN (:col2_1, :col2_2) UNION '
- 'SELECT t2.col3 AS col3, t2.col4 AS col4 '
- 'FROM t2 WHERE t2.col2 IN (:col2_3, '
- ':col2_4)) AS bar')
-
- def test_function(self):
- self.assert_compile(func.foo(1, 2), 'foo(:foo_1, :foo_2)')
- self.assert_compile(func.current_time(), 'CURRENT_TIME')
- self.assert_compile(func.foo(), 'foo()')
- m = MetaData()
- t = Table('sometable', m, Column('col1', Integer), Column('col2'
- , Integer))
- self.assert_compile(select([func.max(t.c.col1)]),
- 'SELECT max(sometable.col1) AS max_1 FROM '
- 'sometable')
-
- def test_function_overrides(self):
- self.assert_compile(func.current_date(), "GETDATE()")
- self.assert_compile(func.length(3), "LEN(:length_1)")
-
- def test_extract(self):
- t = table('t', column('col1'))
-
- for field in 'day', 'month', 'year':
- self.assert_compile(
- select([extract(field, t.c.col1)]),
- 'SELECT DATEPART("%s", t.col1) AS anon_1 FROM t' % field)
-
- def test_update_returning(self):
- table1 = table('mytable', column('myid', Integer), column('name'
- , String(128)), column('description',
- String(128)))
- u = update(table1, values=dict(name='foo'
- )).returning(table1.c.myid, table1.c.name)
- self.assert_compile(u,
- 'UPDATE mytable SET name=:name OUTPUT '
- 'inserted.myid, inserted.name')
- u = update(table1, values=dict(name='foo')).returning(table1)
- self.assert_compile(u,
- 'UPDATE mytable SET name=:name OUTPUT '
- 'inserted.myid, inserted.name, '
- 'inserted.description')
- u = update(table1, values=dict(name='foo'
- )).returning(table1).where(table1.c.name == 'bar')
- self.assert_compile(u,
- 'UPDATE mytable SET name=:name OUTPUT '
- 'inserted.myid, inserted.name, '
- 'inserted.description WHERE mytable.name = '
- ':name_1')
- u = update(table1, values=dict(name='foo'
- )).returning(func.length(table1.c.name))
- self.assert_compile(u,
- 'UPDATE mytable SET name=:name OUTPUT '
- 'LEN(inserted.name) AS length_1')
-
- def test_delete_returning(self):
- table1 = table('mytable', column('myid', Integer), column('name'
- , String(128)), column('description',
- String(128)))
- d = delete(table1).returning(table1.c.myid, table1.c.name)
- self.assert_compile(d,
- 'DELETE FROM mytable OUTPUT deleted.myid, '
- 'deleted.name')
- d = delete(table1).where(table1.c.name == 'bar'
- ).returning(table1.c.myid,
- table1.c.name)
- self.assert_compile(d,
- 'DELETE FROM mytable OUTPUT deleted.myid, '
- 'deleted.name WHERE mytable.name = :name_1')
-
- def test_insert_returning(self):
- table1 = table('mytable', column('myid', Integer), column('name'
- , String(128)), column('description',
- String(128)))
- i = insert(table1, values=dict(name='foo'
- )).returning(table1.c.myid, table1.c.name)
- self.assert_compile(i,
- 'INSERT INTO mytable (name) OUTPUT '
- 'inserted.myid, inserted.name VALUES '
- '(:name)')
- i = insert(table1, values=dict(name='foo')).returning(table1)
- self.assert_compile(i,
- 'INSERT INTO mytable (name) OUTPUT '
- 'inserted.myid, inserted.name, '
- 'inserted.description VALUES (:name)')
- i = insert(table1, values=dict(name='foo'
- )).returning(func.length(table1.c.name))
- self.assert_compile(i,
- 'INSERT INTO mytable (name) OUTPUT '
- 'LEN(inserted.name) AS length_1 VALUES '
- '(:name)')
-
- def test_limit_using_top(self):
- t = table('t', column('x', Integer), column('y', Integer))
-
- s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(10)
-
- self.assert_compile(
- s,
- "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
- checkparams={'x_1': 5}
- )
-
- def test_limit_zero_using_top(self):
- t = table('t', column('x', Integer), column('y', Integer))
-
- s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(0)
-
- self.assert_compile(
- s,
- "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
- checkparams={'x_1': 5}
- )
-
- def test_offset_using_window(self):
- t = table('t', column('x', Integer), column('y', Integer))
-
- s = select([t]).where(t.c.x==5).order_by(t.c.y).offset(20)
-
- # test that the select is not altered with subsequent compile
- # calls
- for i in range(2):
- self.assert_compile(
- s,
- "SELECT anon_1.x, anon_1.y FROM (SELECT t.x AS x, t.y "
- "AS y, ROW_NUMBER() OVER (ORDER BY t.y) AS "
- "mssql_rn FROM t WHERE t.x = :x_1) AS "
- "anon_1 WHERE mssql_rn > :mssql_rn_1",
- checkparams={'mssql_rn_1': 20, 'x_1': 5}
- )
-
- def test_limit_offset_using_window(self):
- t = table('t', column('x', Integer), column('y', Integer))
-
- s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(10).offset(20)
-
- self.assert_compile(
- s,
- "SELECT anon_1.x, anon_1.y "
- "FROM (SELECT t.x AS x, t.y AS y, "
- "ROW_NUMBER() OVER (ORDER BY t.y) AS mssql_rn "
- "FROM t "
- "WHERE t.x = :x_1) AS anon_1 "
- "WHERE mssql_rn > :mssql_rn_1 AND mssql_rn <= :mssql_rn_2",
- checkparams={'mssql_rn_1': 20, 'mssql_rn_2': 30, 'x_1': 5}
- )
-
- def test_limit_offset_with_correlated_order_by(self):
- t1 = table('t1', column('x', Integer), column('y', Integer))
- t2 = table('t2', column('x', Integer), column('y', Integer))
-
- order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar()
- s = select([t1]).where(t1.c.x == 5).order_by(order_by) \
- .limit(10).offset(20)
-
- self.assert_compile(
- s,
- "SELECT anon_1.x, anon_1.y "
- "FROM (SELECT t1.x AS x, t1.y AS y, "
- "ROW_NUMBER() OVER (ORDER BY "
- "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)"
- ") AS mssql_rn "
- "FROM t1 "
- "WHERE t1.x = :x_1) AS anon_1 "
- "WHERE mssql_rn > :mssql_rn_1 AND mssql_rn <= :mssql_rn_2",
- checkparams={'mssql_rn_1': 20, 'mssql_rn_2': 30, 'x_1': 5}
- )
-
- def test_limit_zero_offset_using_window(self):
- t = table('t', column('x', Integer), column('y', Integer))
-
- s = select([t]).where(t.c.x==5).order_by(t.c.y).limit(0).offset(0)
-
- # render the LIMIT of zero, but not the OFFSET
- # of zero, so produces TOP 0
- self.assert_compile(
- s,
- "SELECT TOP 0 t.x, t.y FROM t "
- "WHERE t.x = :x_1 ORDER BY t.y",
- checkparams={'x_1': 5}
- )
-
- def test_sequence_start_0(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('id', Integer, Sequence('', 0), primary_key=True))
- self.assert_compile(schema.CreateTable(tbl),
- "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), "
- "PRIMARY KEY (id))"
- )
-
- def test_sequence_non_primary_key(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('id', Integer, Sequence(''), primary_key=False))
- self.assert_compile(schema.CreateTable(tbl),
- "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))"
- )
-
- def test_sequence_ignore_nullability(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('id', Integer, Sequence(''), nullable=True))
- self.assert_compile(schema.CreateTable(tbl),
- "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))"
- )
-
- def test_index_clustering(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('id', Integer))
- idx = Index("foo", tbl.c.id, mssql_clustered=True)
- self.assert_compile(schema.CreateIndex(idx),
- "CREATE CLUSTERED INDEX foo ON test (id)"
- )
-
- def test_index_ordering(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('x', Integer), Column('y', Integer), Column('z', Integer))
- idx = Index("foo", tbl.c.x.desc(), "y")
- self.assert_compile(schema.CreateIndex(idx),
- "CREATE INDEX foo ON test (x DESC, y)"
- )
-
- def test_index_extra_include_1(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('x', Integer), Column('y', Integer), Column('z', Integer))
- idx = Index("foo", tbl.c.x, mssql_include=['y'])
- self.assert_compile(schema.CreateIndex(idx),
- "CREATE INDEX foo ON test (x) INCLUDE (y)"
- )
-
- def test_index_extra_include_2(self):
- metadata = MetaData()
- tbl = Table('test', metadata,
- Column('x', Integer), Column('y', Integer), Column('z', Integer))
- idx = Index("foo", tbl.c.x, mssql_include=[tbl.c.y])
- self.assert_compile(schema.CreateIndex(idx),
- "CREATE INDEX foo ON test (x) INCLUDE (y)"
- )
-
-class SchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
- """SQL server cannot reference schema-qualified tables in a SELECT statement, they
- must be aliased.
- """
- __dialect__ = mssql.dialect()
-
- def setup(self):
- metadata = MetaData()
- self.t1 = table('t1',
- column('a', Integer),
- column('b', String),
- column('c', String),
- )
- self.t2 = Table(
- 't2', metadata,
- Column("a", Integer),
- Column("b", Integer),
- Column("c", Integer),
- schema = 'schema'
- )
-
- def test_result_map(self):
- s = self.t2.select()
- c = s.compile(dialect=self.__dialect__)
- assert self.t2.c.a in set(c.result_map['a'][1])
-
- def test_result_map_use_labels(self):
- s = self.t2.select(use_labels=True)
- c = s.compile(dialect=self.__dialect__)
- assert self.t2.c.a in set(c.result_map['schema_t2_a'][1])
-
- def test_straight_select(self):
- self.assert_compile(self.t2.select(),
- "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1"
- )
-
- def test_straight_select_use_labels(self):
- self.assert_compile(
- self.t2.select(use_labels=True),
- "SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b, "
- "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1"
- )
-
- def test_join_to_schema(self):
- t1, t2 = self.t1, self.t2
- self.assert_compile(
- t1.join(t2, t1.c.a==t2.c.a).select(),
- "SELECT t1.a, t1.b, t1.c, t2_1.a, t2_1.b, t2_1.c FROM t1 "
- "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a"
- )
-
- def test_union_schema_to_non(self):
- t1, t2 = self.t1, self.t2
- s = select([t2.c.a, t2.c.b]).apply_labels().\
- union(
- select([t1.c.a, t1.c.b]).apply_labels()
- ).alias().select()
- self.assert_compile(
- s,
- "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
- "(SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b "
- "FROM [schema].t2 AS t2_1 UNION SELECT t1.a AS t1_a, "
- "t1.b AS t1_b FROM t1) AS anon_1"
- )
-
- def test_column_subquery_to_alias(self):
- a1 = self.t2.alias('a1')
- s = select([self.t2, select([a1.c.a]).as_scalar()])
- self.assert_compile(
- s,
- "SELECT t2_1.a, t2_1.b, t2_1.c, "
- "(SELECT a1.a FROM [schema].t2 AS a1) "
- "AS anon_1 FROM [schema].t2 AS t2_1"
-
- )
-
-class IdentityInsertTest(fixtures.TestBase, AssertsCompiledSQL):
- __only_on__ = 'mssql'
- __dialect__ = mssql.MSDialect()
-
- @classmethod
- def setup_class(cls):
- global metadata, cattable
- metadata = MetaData(testing.db)
-
- cattable = Table('cattable', metadata,
- Column('id', Integer),
- Column('description', String(50)),
- PrimaryKeyConstraint('id', name='PK_cattable'),
- )
-
- def setup(self):
- metadata.create_all()
-
- def teardown(self):
- metadata.drop_all()
-
- def test_compiled(self):
- self.assert_compile(cattable.insert().values(id=9,
- description='Python'),
- 'INSERT INTO cattable (id, description) '
- 'VALUES (:id, :description)')
-
- def test_execute(self):
- cattable.insert().values(id=9, description='Python').execute()
-
- cats = cattable.select().order_by(cattable.c.id).execute()
- eq_([(9, 'Python')], list(cats))
-
- result = cattable.insert().values(description='PHP').execute()
- eq_([10], result.inserted_primary_key)
- lastcat = cattable.select().order_by(desc(cattable.c.id)).execute()
- eq_((10, 'PHP'), lastcat.first())
-
- def test_executemany(self):
- cattable.insert().execute([{'id': 89, 'description': 'Python'},
- {'id': 8, 'description': 'Ruby'},
- {'id': 3, 'description': 'Perl'},
- {'id': 1, 'description': 'Java'}])
- cats = cattable.select().order_by(cattable.c.id).execute()
- eq_([(1, 'Java'), (3, 'Perl'), (8, 'Ruby'), (89, 'Python')],
- list(cats))
- cattable.insert().execute([{'description': 'PHP'},
- {'description': 'Smalltalk'}])
- lastcats = \
- cattable.select().order_by(desc(cattable.c.id)).limit(2).execute()
- eq_([(91, 'Smalltalk'), (90, 'PHP')], list(lastcats))
-
-
-class ReflectionTest(fixtures.TestBase, ComparesTables):
- __only_on__ = 'mssql'
-
- @testing.provide_metadata
- def test_basic_reflection(self):
- meta = self.metadata
-
- users = Table(
- 'engine_users',
- meta,
- Column('user_id', types.INT, primary_key=True),
- Column('user_name', types.VARCHAR(20), nullable=False),
- Column('test1', types.CHAR(5), nullable=False),
- Column('test2', types.Float(5), nullable=False),
- Column('test3', types.Text),
- Column('test4', types.Numeric, nullable=False),
- Column('test5', types.DateTime),
- Column('parent_user_id', types.Integer,
- ForeignKey('engine_users.user_id')),
- Column('test6', types.DateTime, nullable=False),
- Column('test7', types.Text),
- Column('test8', types.LargeBinary),
- Column('test_passivedefault2', types.Integer,
- server_default='5'),
- Column('test9', types.BINARY(100)),
- Column('test_numeric', types.Numeric()),
- )
-
- addresses = Table(
- 'engine_email_addresses',
- meta,
- Column('address_id', types.Integer, primary_key=True),
- Column('remote_user_id', types.Integer,
- ForeignKey(users.c.user_id)),
- Column('email_address', types.String(20)),
- )
- meta.create_all()
-
- meta2 = MetaData()
- reflected_users = Table('engine_users', meta2,
- autoload=True,
- autoload_with=testing.db)
- reflected_addresses = Table('engine_email_addresses',
- meta2, autoload=True, autoload_with=testing.db)
- self.assert_tables_equal(users, reflected_users)
- self.assert_tables_equal(addresses, reflected_addresses)
-
- @testing.provide_metadata
- def test_identity(self):
- metadata = self.metadata
- table = Table(
- 'identity_test', metadata,
- Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True)
- )
- table.create()
-
- meta2 = MetaData(testing.db)
- table2 = Table('identity_test', meta2, autoload=True)
- sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
- and table2.c['col1'].default
- assert sequence.start == 2
- assert sequence.increment == 3
-
- @testing.emits_warning("Did not recognize")
- @testing.provide_metadata
- def test_skip_types(self):
- metadata = self.metadata
- testing.db.execute("""
- create table foo (id integer primary key, data xml)
- """)
- t1 = Table('foo', metadata, autoload=True)
- assert isinstance(t1.c.id.type, Integer)
- assert isinstance(t1.c.data.type, types.NullType)
-
-
- @testing.provide_metadata
- def test_db_qualified_items(self):
- metadata = self.metadata
- Table('foo', metadata, Column('id', Integer, primary_key=True))
- Table('bar', metadata,
- Column('id', Integer, primary_key=True),
- Column('foo_id', Integer, ForeignKey('foo.id', name="fkfoo"))
- )
- metadata.create_all()
-
- dbname = testing.db.scalar("select db_name()")
- owner = testing.db.scalar("SELECT user_name()")
-
- inspector = inspect(testing.db)
- bar_via_db = inspector.get_foreign_keys(
- "bar", schema="%s.%s" % (dbname, owner))
- eq_(
- bar_via_db,
- [{
- 'referred_table': 'foo',
- 'referred_columns': ['id'],
- 'referred_schema': 'test.dbo',
- 'name': 'fkfoo',
- 'constrained_columns': ['foo_id']}]
- )
-
- assert testing.db.has_table("bar", schema="test.dbo")
-
- m2 = MetaData()
- Table('bar', m2, schema="test.dbo", autoload=True,
- autoload_with=testing.db)
- eq_(m2.tables["test.dbo.foo"].schema, "test.dbo")
-
-
- @testing.provide_metadata
- def test_indexes_cols(self):
- metadata = self.metadata
-
- t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer))
- Index('foo', t1.c.x, t1.c.y)
- metadata.create_all()
-
- m2 = MetaData()
- t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
-
- eq_(
- set(list(t2.indexes)[0].columns),
- set([t2.c['x'], t2.c.y])
- )
-
- @testing.provide_metadata
- def test_indexes_cols_with_commas(self):
- metadata = self.metadata
-
- t1 = Table('t', metadata,
- Column('x, col', Integer, key='x'),
- Column('y', Integer)
- )
- Index('foo', t1.c.x, t1.c.y)
- metadata.create_all()
-
- m2 = MetaData()
- t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
-
- eq_(
- set(list(t2.indexes)[0].columns),
- set([t2.c['x, col'], t2.c.y])
- )
-
- @testing.provide_metadata
- def test_indexes_cols_with_spaces(self):
- metadata = self.metadata
-
- t1 = Table('t', metadata, Column('x col', Integer, key='x'),
- Column('y', Integer))
- Index('foo', t1.c.x, t1.c.y)
- metadata.create_all()
-
- m2 = MetaData()
- t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
-
- eq_(
- set(list(t2.indexes)[0].columns),
- set([t2.c['x col'], t2.c.y])
- )
-
-class QueryUnicodeTest(fixtures.TestBase):
-
- __only_on__ = 'mssql'
-
- def test_convert_unicode(self):
- meta = MetaData(testing.db)
- t1 = Table('unitest_table', meta, Column('id', Integer,
- primary_key=True), Column('descr',
- mssql.MSText(convert_unicode=True)))
- meta.create_all()
- con = testing.db.connect()
-
- # encode in UTF-8 (sting object) because this is the default
- # dialect encoding
-
- con.execute(ue("insert into unitest_table values ('bien u\
- umang\xc3\xa9')").encode('UTF-8'))
- try:
- r = t1.select().execute().first()
- assert isinstance(r[1], util.text_type), \
- '%s is %s instead of unicode, working on %s' % (r[1],
- type(r[1]), meta.bind)
- finally:
- meta.drop_all()
-
-from sqlalchemy.testing.assertsql import ExactSQL
-class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
- __only_on__ = 'mssql'
-
- def test_fetchid_trigger(self):
- """
- Verify identity return value on inserting to a trigger table.
-
- MSSQL's OUTPUT INSERTED clause does not work for the
- case of a table having an identity (autoincrement)
- primary key column, and which also has a trigger configured
- to fire upon each insert and subsequently perform an
- insert into a different table.
-
- SQLALchemy's MSSQL dialect by default will attempt to
- use an OUTPUT_INSERTED clause, which in this case will
- raise the following error:
-
- ProgrammingError: (ProgrammingError) ('42000', 334,
- "[Microsoft][SQL Server Native Client 10.0][SQL Server]The
- target table 't1' of the DML statement cannot have any enabled
- triggers if the statement contains an OUTPUT clause without
- INTO clause.", 7748) 'INSERT INTO t1 (descr) OUTPUT inserted.id
- VALUES (?)' ('hello',)
-
- This test verifies a workaround, which is to rely on the
- older SCOPE_IDENTITY() call, which still works for this scenario.
- To enable the workaround, the Table must be instantiated
- with the init parameter 'implicit_returning = False'.
- """
-
- #todo: this same test needs to be tried in a multithreaded context
- # with multiple threads inserting to the same table.
- #todo: check whether this error also occurs with clients other
- # than the SQL Server Native Client. Maybe an assert_raises
- # test should be written.
- meta = MetaData(testing.db)
- t1 = Table('t1', meta,
- Column('id', Integer, Sequence('fred', 100, 1),
- primary_key=True),
- Column('descr', String(200)),
- # the following flag will prevent the
- # MSSQLCompiler.returning_clause from getting called,
- # though the ExecutionContext will still have a
- # _select_lastrowid, so the SELECT SCOPE_IDENTITY() will
- # hopefully be called instead.
- implicit_returning = False
- )
- t2 = Table('t2', meta,
- Column('id', Integer, Sequence('fred', 200, 1),
- primary_key=True),
- Column('descr', String(200)))
- meta.create_all()
- con = testing.db.connect()
- con.execute("""create trigger paj on t1 for insert as
- insert into t2 (descr) select descr from inserted""")
-
- try:
- tr = con.begin()
- r = con.execute(t2.insert(), descr='hello')
- self.assert_(r.inserted_primary_key == [200])
- r = con.execute(t1.insert(), descr='hello')
- self.assert_(r.inserted_primary_key == [100])
-
- finally:
- tr.commit()
- con.execute("""drop trigger paj""")
- meta.drop_all()
-
- @testing.provide_metadata
- def test_disable_scope_identity(self):
- engine = engines.testing_engine(options={"use_scope_identity":False})
- metadata = self.metadata
- metadata.bind = engine
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
- implicit_returning=False
- )
- metadata.create_all()
-
- self.assert_sql_execution(
- testing.db,
- lambda: engine.execute(t1.insert()),
- ExactSQL("INSERT INTO t1 DEFAULT VALUES"),
- # we dont have an event for
- # "SELECT @@IDENTITY" part here.
- # this will be in 0.8 with #2459
- )
- assert not engine.dialect.use_scope_identity
-
- def test_insertid_schema(self):
- meta = MetaData(testing.db)
- con = testing.db.connect()
- con.execute('create schema paj')
- tbl = Table('test', meta,
- Column('id', Integer, primary_key=True), schema='paj')
- tbl.create()
- try:
- tbl.insert().execute({'id':1})
- finally:
- tbl.drop()
- con.execute('drop schema paj')
-
- def test_returning_no_autoinc(self):
- meta = MetaData(testing.db)
- table = Table('t1', meta, Column('id', Integer,
- primary_key=True), Column('data', String(50)))
- table.create()
- try:
- result = table.insert().values(id=1,
- data=func.lower('SomeString'
- )).returning(table.c.id, table.c.data).execute()
- eq_(result.fetchall(), [(1, 'somestring')])
- finally:
-
- # this will hang if the "SET IDENTITY_INSERT t1 OFF" occurs
- # before the result is fetched
-
- table.drop()
-
- def test_delete_schema(self):
- meta = MetaData(testing.db)
- con = testing.db.connect()
- con.execute('create schema paj')
- tbl = Table('test', meta, Column('id', Integer,
- primary_key=True), schema='paj')
- tbl.create()
- try:
- tbl.insert().execute({'id': 1})
- tbl.delete(tbl.c.id == 1).execute()
- finally:
- tbl.drop()
- con.execute('drop schema paj')
-
- def test_insertid_reserved(self):
- meta = MetaData(testing.db)
- table = Table(
- 'select', meta,
- Column('col', Integer, primary_key=True)
- )
- table.create()
-
- meta2 = MetaData(testing.db)
- try:
- table.insert().execute(col=7)
- finally:
- table.drop()
-
-
-class Foo(object):
- def __init__(self, **kw):
- for k in kw:
- setattr(self, k, kw[k])
-
-class GenerativeQueryTest(fixtures.TestBase):
- __only_on__ = 'mssql'
-
- @classmethod
- def setup_class(cls):
- global foo, metadata
- metadata = MetaData(testing.db)
- foo = Table('foo', metadata,
- Column('id', Integer, Sequence('foo_id_seq'),
- primary_key=True),
- Column('bar', Integer),
- Column('range', Integer))
-
- mapper(Foo, foo)
- metadata.create_all()
-
- sess = create_session(bind=testing.db)
- for i in range(100):
- sess.add(Foo(bar=i, range=i%10))
- sess.flush()
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
- clear_mappers()
-
- def test_slice_mssql(self):
- sess = create_session(bind=testing.db)
- query = sess.query(Foo)
- orig = query.all()
- assert list(query[:10]) == orig[:10]
- assert list(query[:10]) == orig[:10]
-
-
-class SchemaTest(fixtures.TestBase):
-
- def setup(self):
- t = Table('sometable', MetaData(),
- Column('pk_column', Integer),
- Column('test_column', String)
- )
- self.column = t.c.test_column
-
- dialect = mssql.dialect()
- self.ddl_compiler = dialect.ddl_compiler(dialect,
- schema.CreateTable(t))
-
- def _column_spec(self):
- return self.ddl_compiler.get_column_specification(self.column)
-
- def test_that_mssql_default_nullability_emits_null(self):
- eq_("test_column VARCHAR(max) NULL", self._column_spec())
-
- def test_that_mssql_none_nullability_does_not_emit_nullability(self):
- self.column.nullable = None
- eq_("test_column VARCHAR(max)", self._column_spec())
-
- def test_that_mssql_specified_nullable_emits_null(self):
- self.column.nullable = True
- eq_("test_column VARCHAR(max) NULL", self._column_spec())
-
- def test_that_mssql_specified_not_nullable_emits_not_null(self):
- self.column.nullable = False
- eq_("test_column VARCHAR(max) NOT NULL", self._column_spec())
-
-
-def full_text_search_missing():
- """Test if full text search is not implemented and return False if
- it is and True otherwise."""
-
- try:
- connection = testing.db.connect()
- try:
- connection.execute('CREATE FULLTEXT CATALOG Catalog AS '
- 'DEFAULT')
- return False
- except:
- return True
- finally:
- connection.close()
-
-class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __only_on__ = 'mssql'
- __skip_if__ = full_text_search_missing,
-
- @classmethod
- def setup_class(cls):
- global metadata, cattable, matchtable
- metadata = MetaData(testing.db)
- cattable = Table('cattable', metadata, Column('id', Integer),
- Column('description', String(50)),
- PrimaryKeyConstraint('id', name='PK_cattable'))
- matchtable = Table(
- 'matchtable',
- metadata,
- Column('id', Integer),
- Column('title', String(200)),
- Column('category_id', Integer, ForeignKey('cattable.id')),
- PrimaryKeyConstraint('id', name='PK_matchtable'),
- )
- DDL("""CREATE FULLTEXT INDEX
- ON cattable (description)
- KEY INDEX PK_cattable""").execute_at('after-create'
- , matchtable)
- DDL("""CREATE FULLTEXT INDEX
- ON matchtable (title)
- KEY INDEX PK_matchtable""").execute_at('after-create'
- , matchtable)
- metadata.create_all()
- cattable.insert().execute([{'id': 1, 'description': 'Python'},
- {'id': 2, 'description': 'Ruby'}])
- matchtable.insert().execute([{'id': 1, 'title'
- : 'Agile Web Development with Rails'
- , 'category_id': 2}, {'id': 2,
- 'title': 'Dive Into Python',
- 'category_id': 1}, {'id': 3, 'title'
- : "Programming Matz's Ruby",
- 'category_id': 2}, {'id': 4, 'title'
- : 'The Definitive Guide to Django',
- 'category_id': 1}, {'id': 5, 'title'
- : 'Python in a Nutshell',
- 'category_id': 1}])
- DDL("WAITFOR DELAY '00:00:05'"
- ).execute(bind=engines.testing_engine())
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
- connection = testing.db.connect()
- connection.execute("DROP FULLTEXT CATALOG Catalog")
- connection.close()
-
- def test_expression(self):
- self.assert_compile(matchtable.c.title.match('somstr'),
- 'CONTAINS (matchtable.title, ?)')
-
- def test_simple_match(self):
- results = \
- matchtable.select().where(matchtable.c.title.match('python'
- )).order_by(matchtable.c.id).execute().fetchall()
- eq_([2, 5], [r.id for r in results])
-
- def test_simple_match_with_apostrophe(self):
- results = \
- matchtable.select().where(matchtable.c.title.match("Matz's"
- )).execute().fetchall()
- eq_([3], [r.id for r in results])
-
- def test_simple_prefix_match(self):
- results = \
- matchtable.select().where(matchtable.c.title.match('"nut*"'
- )).execute().fetchall()
- eq_([5], [r.id for r in results])
-
- def test_simple_inflectional_match(self):
- results = \
- matchtable.select().where(
- matchtable.c.title.match('FORMSOF(INFLECTIONAL, "dives")'
- )).execute().fetchall()
- eq_([2], [r.id for r in results])
-
- def test_or_match(self):
- results1 = \
- matchtable.select().where(or_(matchtable.c.title.match('nutshell'
- ), matchtable.c.title.match('ruby'
- ))).order_by(matchtable.c.id).execute().fetchall()
- eq_([3, 5], [r.id for r in results1])
- results2 = \
- matchtable.select().where(
- matchtable.c.title.match('nutshell OR ruby'
- )).order_by(matchtable.c.id).execute().fetchall()
- eq_([3, 5], [r.id for r in results2])
-
- def test_and_match(self):
- results1 = \
- matchtable.select().where(and_(matchtable.c.title.match('python'
- ), matchtable.c.title.match('nutshell'
- ))).execute().fetchall()
- eq_([5], [r.id for r in results1])
- results2 = \
- matchtable.select().where(
- matchtable.c.title.match('python AND nutshell'
- )).execute().fetchall()
- eq_([5], [r.id for r in results2])
-
- def test_match_across_joins(self):
- results = matchtable.select().where(and_(cattable.c.id
- == matchtable.c.category_id,
- or_(cattable.c.description.match('Ruby'),
- matchtable.c.title.match('nutshell'
- )))).order_by(matchtable.c.id).execute().fetchall()
- eq_([1, 3, 5], [r.id for r in results])
-
-
-class ParseConnectTest(fixtures.TestBase):
-
- def test_pyodbc_connect_dsn_trusted(self):
- dialect = pyodbc.dialect()
- u = url.make_url('mssql://mydsn')
- connection = dialect.create_connect_args(u)
- eq_([['dsn=mydsn;Trusted_Connection=Yes'], {}], connection)
-
- def test_pyodbc_connect_old_style_dsn_trusted(self):
- dialect = pyodbc.dialect()
- u = url.make_url('mssql:///?dsn=mydsn')
- connection = dialect.create_connect_args(u)
- eq_([['dsn=mydsn;Trusted_Connection=Yes'], {}], connection)
-
- def test_pyodbc_connect_dsn_non_trusted(self):
- dialect = pyodbc.dialect()
- u = url.make_url('mssql://username:password@mydsn')
- connection = dialect.create_connect_args(u)
- eq_([['dsn=mydsn;UID=username;PWD=password'], {}], connection)
-
- def test_pyodbc_connect_dsn_extra(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql://username:password@mydsn/?LANGUAGE=us_'
- 'english&foo=bar')
- connection = dialect.create_connect_args(u)
- dsn_string = connection[0][0]
- assert ";LANGUAGE=us_english" in dsn_string
- assert ";foo=bar" in dsn_string
-
- def test_pyodbc_connect(self):
- dialect = pyodbc.dialect()
- u = url.make_url('mssql://username:password@hostspec/database')
- connection = dialect.create_connect_args(u)
- eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI'
- 'D=username;PWD=password'], {}], connection)
-
- def test_pyodbc_connect_comma_port(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql://username:password@hostspec:12345/data'
- 'base')
- connection = dialect.create_connect_args(u)
- eq_([['DRIVER={SQL Server};Server=hostspec,12345;Database=datab'
- 'ase;UID=username;PWD=password'], {}], connection)
-
- def test_pyodbc_connect_config_port(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql://username:password@hostspec/database?p'
- 'ort=12345')
- connection = dialect.create_connect_args(u)
- eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI'
- 'D=username;PWD=password;port=12345'], {}], connection)
-
- def test_pyodbc_extra_connect(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql://username:password@hostspec/database?L'
- 'ANGUAGE=us_english&foo=bar')
- connection = dialect.create_connect_args(u)
- eq_(connection[1], {})
- eq_(connection[0][0]
- in ('DRIVER={SQL Server};Server=hostspec;Database=database;'
- 'UID=username;PWD=password;foo=bar;LANGUAGE=us_english',
- 'DRIVER={SQL Server};Server=hostspec;Database=database;UID='
- 'username;PWD=password;LANGUAGE=us_english;foo=bar'), True)
-
- def test_pyodbc_odbc_connect(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql:///?odbc_connect=DRIVER%3D%7BSQL+Server'
- '%7D%3BServer%3Dhostspec%3BDatabase%3Ddatabase'
- '%3BUID%3Dusername%3BPWD%3Dpassword')
- connection = dialect.create_connect_args(u)
- eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI'
- 'D=username;PWD=password'], {}], connection)
-
- def test_pyodbc_odbc_connect_with_dsn(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase'
- '%3Ddatabase%3BUID%3Dusername%3BPWD%3Dpassword'
- )
- connection = dialect.create_connect_args(u)
- eq_([['dsn=mydsn;Database=database;UID=username;PWD=password'],
- {}], connection)
-
- def test_pyodbc_odbc_connect_ignores_other_values(self):
- dialect = pyodbc.dialect()
- u = \
- url.make_url('mssql://userdiff:passdiff@localhost/dbdiff?od'
- 'bc_connect=DRIVER%3D%7BSQL+Server%7D%3BServer'
- '%3Dhostspec%3BDatabase%3Ddatabase%3BUID%3Duse'
- 'rname%3BPWD%3Dpassword')
- connection = dialect.create_connect_args(u)
- eq_([['DRIVER={SQL Server};Server=hostspec;Database=database;UI'
- 'D=username;PWD=password'], {}], connection)
-
- def test_pymssql_port_setting(self):
- dialect = pymssql.dialect()
-
- u = \
- url.make_url('mssql+pymssql://scott:tiger@somehost/test')
- connection = dialect.create_connect_args(u)
- eq_(
- [[], {'host': 'somehost', 'password': 'tiger',
- 'user': 'scott', 'database': 'test'}], connection
- )
-
- u = \
- url.make_url('mssql+pymssql://scott:tiger@somehost:5000/test')
- connection = dialect.create_connect_args(u)
- eq_(
- [[], {'host': 'somehost:5000', 'password': 'tiger',
- 'user': 'scott', 'database': 'test'}], connection
- )
-
- def test_pymssql_disconnect(self):
- dialect = pymssql.dialect()
-
- for error in [
- 'Adaptive Server connection timed out',
- 'message 20003',
- "Error 10054",
- "Not connected to any MS SQL server",
- "Connection is closed"
- ]:
- eq_(dialect.is_disconnect(error, None, None), True)
-
- eq_(dialect.is_disconnect("not an error", None, None), False)
-
- @testing.only_on(['mssql+pyodbc', 'mssql+pymssql'],
- "FreeTDS specific test")
- def test_bad_freetds_warning(self):
- engine = engines.testing_engine()
-
- def _bad_version(connection):
- return 95, 10, 255
-
- engine.dialect._get_server_version_info = _bad_version
- assert_raises_message(exc.SAWarning,
- 'Unrecognized server version info',
- engine.connect)
-
-class TimeTypeTest(fixtures.TestBase):
-
- def test_result_processor_no_microseconds(self):
- expected = datetime.time(12, 34, 56)
- self._assert_result_processor(expected, '12:34:56')
-
- def test_result_processor_too_many_microseconds(self):
- # microsecond must be in 0..999999, should truncate (6 vs 7 digits)
- expected = datetime.time(12, 34, 56, 123456)
- self._assert_result_processor(expected, '12:34:56.1234567')
-
- def _assert_result_processor(self, expected, value):
- mssql_time_type = TIME()
- result_processor = mssql_time_type.result_processor(None, None)
- eq_(expected, result_processor(value))
-
-
-class TypeDDLTest(fixtures.TestBase):
- def test_boolean(self):
- "Exercise type specification for boolean type."
-
- columns = [
- # column type, args, kwargs, expected ddl
- (Boolean, [], {},
- 'BIT'),
- ]
-
- metadata = MetaData()
- table_args = ['test_mssql_boolean', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res = spec
- table_args.append(
- Column('c%s' % index, type_(*args, **kw), nullable=None))
-
- boolean_table = Table(*table_args)
- dialect = mssql.dialect()
- gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table))
-
- for col in boolean_table.c:
- index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
- self.assert_(repr(col))
-
-
- def test_numeric(self):
- "Exercise type specification and options for numeric types."
-
- columns = [
- # column type, args, kwargs, expected ddl
- (types.NUMERIC, [], {},
- 'NUMERIC'),
- (types.NUMERIC, [None], {},
- 'NUMERIC'),
- (types.NUMERIC, [12, 4], {},
- 'NUMERIC(12, 4)'),
-
- (types.Float, [], {},
- 'FLOAT'),
- (types.Float, [None], {},
- 'FLOAT'),
- (types.Float, [12], {},
- 'FLOAT(12)'),
- (mssql.MSReal, [], {},
- 'REAL'),
-
- (types.Integer, [], {},
- 'INTEGER'),
- (types.BigInteger, [], {},
- 'BIGINT'),
- (mssql.MSTinyInteger, [], {},
- 'TINYINT'),
- (types.SmallInteger, [], {},
- 'SMALLINT'),
- ]
-
- metadata = MetaData()
- table_args = ['test_mssql_numeric', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res = spec
- table_args.append(
- Column('c%s' % index, type_(*args, **kw), nullable=None))
-
- numeric_table = Table(*table_args)
- dialect = mssql.dialect()
- gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table))
-
- for col in numeric_table.c:
- index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
- self.assert_(repr(col))
-
-
- def test_char(self):
- """Exercise COLLATE-ish options on string types."""
-
- columns = [
- (mssql.MSChar, [], {},
- 'CHAR'),
- (mssql.MSChar, [1], {},
- 'CHAR(1)'),
- (mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'},
- 'CHAR(1) COLLATE Latin1_General_CI_AS'),
-
- (mssql.MSNChar, [], {},
- 'NCHAR'),
- (mssql.MSNChar, [1], {},
- 'NCHAR(1)'),
- (mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'},
- 'NCHAR(1) COLLATE Latin1_General_CI_AS'),
-
- (mssql.MSString, [], {},
- 'VARCHAR(max)'),
- (mssql.MSString, [1], {},
- 'VARCHAR(1)'),
- (mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'},
- 'VARCHAR(1) COLLATE Latin1_General_CI_AS'),
-
- (mssql.MSNVarchar, [], {},
- 'NVARCHAR(max)'),
- (mssql.MSNVarchar, [1], {},
- 'NVARCHAR(1)'),
- (mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'},
- 'NVARCHAR(1) COLLATE Latin1_General_CI_AS'),
-
- (mssql.MSText, [], {},
- 'TEXT'),
- (mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'},
- 'TEXT COLLATE Latin1_General_CI_AS'),
-
- (mssql.MSNText, [], {},
- 'NTEXT'),
- (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'},
- 'NTEXT COLLATE Latin1_General_CI_AS'),
- ]
-
- metadata = MetaData()
- table_args = ['test_mssql_charset', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res = spec
- table_args.append(
- Column('c%s' % index, type_(*args, **kw), nullable=None))
-
- charset_table = Table(*table_args)
- dialect = mssql.dialect()
- gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table))
-
- for col in charset_table.c:
- index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
- self.assert_(repr(col))
-
-
- def test_timestamp(self):
- """Exercise TIMESTAMP column."""
-
- dialect = mssql.dialect()
-
- metadata = MetaData()
- spec, expected = (TIMESTAMP, 'TIMESTAMP')
- t = Table('mssql_ts', metadata,
- Column('id', Integer, primary_key=True),
- Column('t', spec, nullable=None))
- gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
- testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
- self.assert_(repr(t.c.t))
-
- def test_money(self):
- """Exercise type specification for money types."""
-
- columns = [(mssql.MSMoney, [], {}, 'MONEY'),
- (mssql.MSSmallMoney, [], {}, 'SMALLMONEY')]
- metadata = MetaData()
- table_args = ['test_mssql_money', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res = spec
- table_args.append(Column('c%s' % index, type_(*args, **kw),
- nullable=None))
- money_table = Table(*table_args)
- dialect = mssql.dialect()
- gen = dialect.ddl_compiler(dialect,
- schema.CreateTable(money_table))
- for col in money_table.c:
- index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col), '%s %s'
- % (col.name, columns[index][3]))
- self.assert_(repr(col))
-
- def test_binary(self):
- "Exercise type specification for binary types."
-
- columns = [
- # column type, args, kwargs, expected ddl
- (mssql.MSBinary, [], {},
- 'BINARY'),
- (mssql.MSBinary, [10], {},
- 'BINARY(10)'),
-
- (types.BINARY, [], {},
- 'BINARY'),
- (types.BINARY, [10], {},
- 'BINARY(10)'),
-
- (mssql.MSVarBinary, [], {},
- 'VARBINARY(max)'),
- (mssql.MSVarBinary, [10], {},
- 'VARBINARY(10)'),
-
- (types.VARBINARY, [10], {},
- 'VARBINARY(10)'),
- (types.VARBINARY, [], {},
- 'VARBINARY(max)'),
-
- (mssql.MSImage, [], {},
- 'IMAGE'),
-
- (mssql.IMAGE, [], {},
- 'IMAGE'),
-
- (types.LargeBinary, [], {},
- 'IMAGE'),
- ]
-
- metadata = MetaData()
- table_args = ['test_mssql_binary', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res = spec
- table_args.append(Column('c%s' % index, type_(*args, **kw),
- nullable=None))
- binary_table = Table(*table_args)
- dialect = mssql.dialect()
- gen = dialect.ddl_compiler(dialect,
- schema.CreateTable(binary_table))
- for col in binary_table.c:
- index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col), '%s %s'
- % (col.name, columns[index][3]))
- self.assert_(repr(col))
-
-class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTables):
- __only_on__ = 'mssql'
-
- @classmethod
- def setup_class(cls):
- global metadata
- metadata = MetaData(testing.db)
-
- def teardown(self):
- metadata.drop_all()
-
- @testing.fails_on_everything_except('mssql+pyodbc',
- 'this is some pyodbc-specific feature')
- def test_decimal_notation(self):
- numeric_table = Table('numeric_table', metadata, Column('id',
- Integer, Sequence('numeric_id_seq',
- optional=True), primary_key=True),
- Column('numericcol',
- Numeric(precision=38, scale=20,
- asdecimal=True)))
- metadata.create_all()
- test_items = [decimal.Decimal(d) for d in (
- '1500000.00000000000000000000',
- '-1500000.00000000000000000000',
- '1500000',
- '0.0000000000000000002',
- '0.2',
- '-0.0000000000000000002',
- '-2E-2',
- '156666.458923543',
- '-156666.458923543',
- '1',
- '-1',
- '-1234',
- '1234',
- '2E-12',
- '4E8',
- '3E-6',
- '3E-7',
- '4.1',
- '1E-1',
- '1E-2',
- '1E-3',
- '1E-4',
- '1E-5',
- '1E-6',
- '1E-7',
- '1E-1',
- '1E-8',
- '0.2732E2',
- '-0.2432E2',
- '4.35656E2',
- '-02452E-2',
- '45125E-2',
- '1234.58965E-2',
- '1.521E+15',
- '-1E-25',
- '1E-25',
- '1254E-25',
- '-1203E-25',
- '0',
- '-0.00',
- '-0',
- '4585E12',
- '000000000000000000012',
- '000000000000.32E12',
- '00000000000000.1E+12',
- '000000000000.2E-32',
- )]
-
- for value in test_items:
- numeric_table.insert().execute(numericcol=value)
-
- for value in select([numeric_table.c.numericcol]).execute():
- assert value[0] in test_items, "%r not in test_items" % value[0]
-
- def test_float(self):
- float_table = Table('float_table', metadata, Column('id',
- Integer, Sequence('numeric_id_seq',
- optional=True), primary_key=True),
- Column('floatcol', Float()))
- metadata.create_all()
- try:
- test_items = [float(d) for d in (
- '1500000.00000000000000000000',
- '-1500000.00000000000000000000',
- '1500000',
- '0.0000000000000000002',
- '0.2',
- '-0.0000000000000000002',
- '156666.458923543',
- '-156666.458923543',
- '1',
- '-1',
- '1234',
- '2E-12',
- '4E8',
- '3E-6',
- '3E-7',
- '4.1',
- '1E-1',
- '1E-2',
- '1E-3',
- '1E-4',
- '1E-5',
- '1E-6',
- '1E-7',
- '1E-8',
- )]
- for value in test_items:
- float_table.insert().execute(floatcol=value)
- except Exception as e:
- raise e
-
-
- # todo this should suppress warnings, but it does not
- @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
- def test_dates(self):
- "Exercise type specification for date types."
-
- columns = [
- # column type, args, kwargs, expected ddl
- (mssql.MSDateTime, [], {},
- 'DATETIME', []),
-
- (types.DATE, [], {},
- 'DATE', ['>=', (10,)]),
- (types.Date, [], {},
- 'DATE', ['>=', (10,)]),
- (types.Date, [], {},
- 'DATETIME', ['<', (10,)], mssql.MSDateTime),
- (mssql.MSDate, [], {},
- 'DATE', ['>=', (10,)]),
- (mssql.MSDate, [], {},
- 'DATETIME', ['<', (10,)], mssql.MSDateTime),
-
- (types.TIME, [], {},
- 'TIME', ['>=', (10,)]),
- (types.Time, [], {},
- 'TIME', ['>=', (10,)]),
- (mssql.MSTime, [], {},
- 'TIME', ['>=', (10,)]),
- (mssql.MSTime, [1], {},
- 'TIME(1)', ['>=', (10,)]),
- (types.Time, [], {},
- 'DATETIME', ['<', (10,)], mssql.MSDateTime),
- (mssql.MSTime, [], {},
- 'TIME', ['>=', (10,)]),
-
- (mssql.MSSmallDateTime, [], {},
- 'SMALLDATETIME', []),
-
- (mssql.MSDateTimeOffset, [], {},
- 'DATETIMEOFFSET', ['>=', (10,)]),
- (mssql.MSDateTimeOffset, [1], {},
- 'DATETIMEOFFSET(1)', ['>=', (10,)]),
-
- (mssql.MSDateTime2, [], {},
- 'DATETIME2', ['>=', (10,)]),
- (mssql.MSDateTime2, [1], {},
- 'DATETIME2(1)', ['>=', (10,)]),
-
- ]
-
- table_args = ['test_mssql_dates', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res, requires = spec[0:5]
- if requires and testing._is_excluded('mssql', *requires) \
- or not requires:
- c = Column('c%s' % index, type_(*args,
- **kw), nullable=None)
- testing.db.dialect.type_descriptor(c.type)
- table_args.append(c)
- dates_table = Table(*table_args)
- gen = testing.db.dialect.ddl_compiler(testing.db.dialect,
- schema.CreateTable(dates_table))
- for col in dates_table.c:
- index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col), '%s %s'
- % (col.name, columns[index][3]))
- self.assert_(repr(col))
- dates_table.create(checkfirst=True)
- reflected_dates = Table('test_mssql_dates',
- MetaData(testing.db), autoload=True)
- for col in reflected_dates.c:
- self.assert_types_base(col, dates_table.c[col.key])
-
- def test_date_roundtrip(self):
- t = Table('test_dates', metadata,
- Column('id', Integer,
- Sequence('datetest_id_seq', optional=True),
- primary_key=True),
- Column('adate', Date),
- Column('atime', Time),
- Column('adatetime', DateTime))
- metadata.create_all()
- d1 = datetime.date(2007, 10, 30)
- t1 = datetime.time(11, 2, 32)
- d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
- t.insert().execute(adate=d2, adatetime=d2, atime=d2)
-
- x = t.select().execute().fetchall()[0]
- self.assert_(x.adate.__class__ == datetime.date)
- self.assert_(x.atime.__class__ == datetime.time)
- self.assert_(x.adatetime.__class__ == datetime.datetime)
-
- t.delete().execute()
-
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
-
- eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate
- == d1).execute().fetchall(), [(d1, t1, d2)])
-
- @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
- @testing.provide_metadata
- def test_binary_reflection(self):
- "Exercise type specification for binary types."
-
- columns = [
- # column type, args, kwargs, expected ddl
- (mssql.MSBinary, [], {},
- 'BINARY'),
- (mssql.MSBinary, [10], {},
- 'BINARY(10)'),
-
- (types.BINARY, [], {},
- 'BINARY'),
- (types.BINARY, [10], {},
- 'BINARY(10)'),
-
- (mssql.MSVarBinary, [], {},
- 'VARBINARY(max)'),
- (mssql.MSVarBinary, [10], {},
- 'VARBINARY(10)'),
-
- (types.VARBINARY, [10], {},
- 'VARBINARY(10)'),
- (types.VARBINARY, [], {},
- 'VARBINARY(max)'),
-
- (mssql.MSImage, [], {},
- 'IMAGE'),
-
- (mssql.IMAGE, [], {},
- 'IMAGE'),
-
- (types.LargeBinary, [], {},
- 'IMAGE'),
- ]
-
- metadata = self.metadata
- table_args = ['test_mssql_binary', metadata]
- for index, spec in enumerate(columns):
- type_, args, kw, res = spec
- table_args.append(Column('c%s' % index, type_(*args, **kw),
- nullable=None))
- binary_table = Table(*table_args)
- metadata.create_all()
- reflected_binary = Table('test_mssql_binary',
- MetaData(testing.db), autoload=True)
- for col in reflected_binary.c:
- c1 = testing.db.dialect.type_descriptor(col.type).__class__
- c2 = \
- testing.db.dialect.type_descriptor(
- binary_table.c[col.name].type).__class__
- assert issubclass(c1, c2), '%r is not a subclass of %r' \
- % (c1, c2)
- if binary_table.c[col.name].type.length:
- testing.eq_(col.type.length,
- binary_table.c[col.name].type.length)
-
-
- def test_autoincrement(self):
- Table('ai_1', metadata,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_2', metadata,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_3', metadata,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_y', Integer, primary_key=True))
- Table('ai_4', metadata,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_n2', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_5', metadata,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_6', metadata,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_7', metadata,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_8', metadata,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True))
- metadata.create_all()
-
- table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
- 'ai_5', 'ai_6', 'ai_7', 'ai_8']
- mr = MetaData(testing.db)
-
- for name in table_names:
- tbl = Table(name, mr, autoload=True)
- tbl = metadata.tables[name]
- for c in tbl.c:
- if c.name.startswith('int_y'):
- assert c.autoincrement, name
- assert tbl._autoincrement_column is c, name
- elif c.name.startswith('int_n'):
- assert not c.autoincrement, name
- assert tbl._autoincrement_column is not c, name
-
- # mxodbc can't handle scope_identity() with DEFAULT VALUES
-
- if testing.db.driver == 'mxodbc':
- eng = \
- [engines.testing_engine(options={'implicit_returning'
- : True})]
- else:
- eng = \
- [engines.testing_engine(options={'implicit_returning'
- : False}),
- engines.testing_engine(options={'implicit_returning'
- : True})]
-
- for counter, engine in enumerate(eng):
- engine.execute(tbl.insert())
- if 'int_y' in tbl.c:
- assert engine.scalar(select([tbl.c.int_y])) \
- == counter + 1
- assert list(engine.execute(tbl.select()).first()).\
- count(counter + 1) == 1
- else:
- assert 1 \
- not in list(engine.execute(tbl.select()).first())
- engine.execute(tbl.delete())
-
-class MonkeyPatchedBinaryTest(fixtures.TestBase):
- __only_on__ = 'mssql+pymssql'
-
- def test_unicode(self):
- module = __import__('pymssql')
- result = module.Binary('foo')
- eq_(result, 'foo')
-
- def test_bytes(self):
- module = __import__('pymssql')
- input = b('\x80\x03]q\x00X\x03\x00\x00\x00oneq\x01a.')
- expected_result = input
- result = module.Binary(input)
- eq_(result, expected_result)
-
-class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
- """Test the Binary and VarBinary types"""
-
- __only_on__ = 'mssql'
-
- @classmethod
- def setup_class(cls):
- global binary_table, MyPickleType
-
- class MyPickleType(types.TypeDecorator):
- impl = PickleType
-
- def process_bind_param(self, value, dialect):
- if value:
- value.stuff = 'this is modified stuff'
- return value
-
- def process_result_value(self, value, dialect):
- if value:
- value.stuff = 'this is the right stuff'
- return value
-
- binary_table = Table(
- 'binary_table',
- MetaData(testing.db),
- Column('primary_id', Integer, Sequence('binary_id_seq',
- optional=True), primary_key=True),
- Column('data', mssql.MSVarBinary(8000)),
- Column('data_image', mssql.MSImage),
- Column('data_slice', types.BINARY(100)),
- Column('misc', String(30)),
- Column('pickled', PickleType),
- Column('mypickle', MyPickleType),
- )
- binary_table.create()
-
- def teardown(self):
- binary_table.delete().execute()
-
- @classmethod
- def teardown_class(cls):
- binary_table.drop()
-
- def test_binary(self):
- testobj1 = pickleable.Foo('im foo 1')
- testobj2 = pickleable.Foo('im foo 2')
- testobj3 = pickleable.Foo('im foo 3')
- stream1 = self.load_stream('binary_data_one.dat')
- stream2 = self.load_stream('binary_data_two.dat')
- binary_table.insert().execute(
- primary_id=1,
- misc='binary_data_one.dat',
- data=stream1,
- data_image=stream1,
- data_slice=stream1[0:100],
- pickled=testobj1,
- mypickle=testobj3,
- )
- binary_table.insert().execute(
- primary_id=2,
- misc='binary_data_two.dat',
- data=stream2,
- data_image=stream2,
- data_slice=stream2[0:99],
- pickled=testobj2,
- )
-
- # TODO: pyodbc does not seem to accept "None" for a VARBINARY
- # column (data=None). error: [Microsoft][ODBC SQL Server
- # Driver][SQL Server]Implicit conversion from data type varchar
- # to varbinary is not allowed. Use the CONVERT function to run
- # this query. (257) binary_table.insert().execute(primary_id=3,
- # misc='binary_data_two.dat', data=None, data_image=None,
- # data_slice=stream2[0:99], pickled=None)
-
- binary_table.insert().execute(primary_id=3,
- misc='binary_data_two.dat', data_image=None,
- data_slice=stream2[0:99], pickled=None)
- for stmt in \
- binary_table.select(order_by=binary_table.c.primary_id), \
- text('select * from binary_table order by '
- 'binary_table.primary_id',
- typemap=dict(data=mssql.MSVarBinary(8000),
- data_image=mssql.MSImage,
- data_slice=types.BINARY(100), pickled=PickleType,
- mypickle=MyPickleType), bind=testing.db):
- l = stmt.execute().fetchall()
- eq_(list(stream1), list(l[0]['data']))
- paddedstream = list(stream1[0:100])
- paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
- eq_(paddedstream, list(l[0]['data_slice']))
- eq_(list(stream2), list(l[1]['data']))
- eq_(list(stream2), list(l[1]['data_image']))
- eq_(testobj1, l[0]['pickled'])
- eq_(testobj2, l[1]['pickled'])
- eq_(testobj3.moredata, l[0]['mypickle'].moredata)
- eq_(l[0]['mypickle'].stuff, 'this is the right stuff')
-
- def load_stream(self, name, len=3000):
- fp = open(os.path.join(os.path.dirname(__file__), "..", name), 'rb')
- stream = fp.read(len)
- fp.close()
- return stream
-
-from sqlalchemy.dialects.mssql.information_schema import CoerceUnicode, tables
-from sqlalchemy.dialects.mssql import base
-
-class InfoCoerceUnicodeTest(fixtures.TestBase, AssertsCompiledSQL):
- def test_info_unicode_coercion(self):
-
- dialect = mssql.dialect()
- value = CoerceUnicode().bind_processor(dialect)('a string')
- assert isinstance(value, util.text_type)
-
- def test_info_unicode_cast_no_2000(self):
- dialect = mssql.dialect()
- dialect.server_version_info = base.MS_2000_VERSION
- stmt = tables.c.table_name == 'somename'
- self.assert_compile(
- stmt,
- "[TABLES_1].[TABLE_NAME] = :TABLE_NAME_1",
- dialect=dialect
- )
-
- def test_info_unicode_cast(self):
- dialect = mssql.dialect()
- dialect.server_version_info = base.MS_2005_VERSION
- stmt = tables.c.table_name == 'somename'
- self.assert_compile(
- stmt,
- "[TABLES_1].[TABLE_NAME] = CAST(:TABLE_NAME_1 AS NVARCHAR(max))",
- dialect=dialect
- )
-
-class ReflectHugeViewTest(fixtures.TestBase):
- __only_on__ = 'mssql'
-
- def setup(self):
- self.col_num = 150
-
- self.metadata = MetaData(testing.db)
- t = Table('base_table', self.metadata,
- *[
- Column("long_named_column_number_%d" % i, Integer)
- for i in range(self.col_num)
- ]
- )
- self.view_str = view_str = \
- "CREATE VIEW huge_named_view AS SELECT %s FROM base_table" % (
- ",".join("long_named_column_number_%d" % i
- for i in range(self.col_num))
- )
- assert len(view_str) > 4000
-
- event.listen(t, 'after_create', DDL(view_str) )
- event.listen(t, 'before_drop', DDL("DROP VIEW huge_named_view") )
-
- self.metadata.create_all()
-
- def teardown(self):
- self.metadata.drop_all()
-
- def test_inspect_view_definition(self):
- inspector = Inspector.from_engine(testing.db)
- view_def = inspector.get_view_definition("huge_named_view")
- eq_(view_def, self.view_str)
-