summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/databases/mssql.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
commit8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch)
treeae9e27d12c9fbf8297bb90469509e1cb6a206242 /lib/sqlalchemy/databases/mssql.py
parent7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff)
downloadsqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz
merge 0.6 series to trunk.
Diffstat (limited to 'lib/sqlalchemy/databases/mssql.py')
-rw-r--r--lib/sqlalchemy/databases/mssql.py1771
1 files changed, 0 insertions, 1771 deletions
diff --git a/lib/sqlalchemy/databases/mssql.py b/lib/sqlalchemy/databases/mssql.py
deleted file mode 100644
index d963b7477..000000000
--- a/lib/sqlalchemy/databases/mssql.py
+++ /dev/null
@@ -1,1771 +0,0 @@
-# mssql.py
-
-"""Support for the Microsoft SQL Server database.
-
-Driver
-------
-
-The MSSQL dialect will work with three different available drivers:
-
-* *pyodbc* - http://pyodbc.sourceforge.net/. This is the recommeded
- driver.
-
-* *pymssql* - http://pymssql.sourceforge.net/
-
-* *adodbapi* - http://adodbapi.sourceforge.net/
-
-Drivers are loaded in the order listed above based on availability.
-
-If you need to load a specific driver pass ``module_name`` when
-creating the engine::
-
- engine = create_engine('mssql://dsn', module_name='pymssql')
-
-``module_name`` currently accepts: ``pyodbc``, ``pymssql``, and
-``adodbapi``.
-
-Currently the pyodbc driver offers the greatest level of
-compatibility.
-
-Connecting
-----------
-
-Connecting with create_engine() uses the standard URL approach of
-``mssql://user:pass@host/dbname[?key=value&key=value...]``.
-
-If the database name is present, the tokens are converted to a
-connection string with the specified values. If the database is not
-present, then the host token is taken directly as the DSN name.
-
-Examples of pyodbc connection string URLs:
-
-* *mssql://mydsn* - connects using the specified DSN named ``mydsn``.
- The connection string that is created will appear like::
-
- dsn=mydsn;TrustedConnection=Yes
-
-* *mssql://user:pass@mydsn* - connects using the DSN named
- ``mydsn`` passing in the ``UID`` and ``PWD`` information. The
- connection string that is created will appear like::
-
- dsn=mydsn;UID=user;PWD=pass
-
-* *mssql://user:pass@mydsn/?LANGUAGE=us_english* - connects
- using the DSN named ``mydsn`` passing in the ``UID`` and ``PWD``
- information, plus the additional connection configuration option
- ``LANGUAGE``. The connection string that is created will appear
- like::
-
- dsn=mydsn;UID=user;PWD=pass;LANGUAGE=us_english
-
-* *mssql://user:pass@host/db* - connects using a connection string
- dynamically created that would appear like::
-
- DRIVER={SQL Server};Server=host;Database=db;UID=user;PWD=pass
-
-* *mssql://user:pass@host:123/db* - connects using a connection
- string that is dynamically created, which also includes the port
- information using the comma syntax. If your connection string
- requires the port information to be passed as a ``port`` keyword
- see the next example. This will create the following connection
- string::
-
- DRIVER={SQL Server};Server=host,123;Database=db;UID=user;PWD=pass
-
-* *mssql://user:pass@host/db?port=123* - connects using a connection
- string that is dynamically created that includes the port
- information as a separate ``port`` keyword. This will create the
- following connection string::
-
- DRIVER={SQL Server};Server=host;Database=db;UID=user;PWD=pass;port=123
-
-If you require a connection string that is outside the options
-presented above, use the ``odbc_connect`` keyword to pass in a
-urlencoded connection string. What gets passed in will be urldecoded
-and passed directly.
-
-For example::
-
- mssql:///?odbc_connect=dsn%3Dmydsn%3BDatabase%3Ddb
-
-would create the following connection string::
-
- dsn=mydsn;Database=db
-
-Encoding your connection string can be easily accomplished through
-the python shell. For example::
-
- >>> import urllib
- >>> urllib.quote_plus('dsn=mydsn;Database=db')
- 'dsn%3Dmydsn%3BDatabase%3Ddb'
-
-Additional arguments which may be specified either as query string
-arguments on the URL, or as keyword argument to
-:func:`~sqlalchemy.create_engine()` are:
-
-* *auto_identity_insert* - enables support for IDENTITY inserts by
- automatically turning IDENTITY INSERT ON and OFF as required.
- Defaults to ``True``.
-
-* *query_timeout* - allows you to override the default query timeout.
- Defaults to ``None``. This is only supported on pymssql.
-
-* *text_as_varchar* - if enabled this will treat all TEXT column
- types as their equivalent VARCHAR(max) type. This is often used if
- you need to compare a VARCHAR to a TEXT field, which is not
- supported directly on MSSQL. Defaults to ``False``.
-
-* *use_scope_identity* - allows you to specify that SCOPE_IDENTITY
- should be used in place of the non-scoped version @@IDENTITY.
- Defaults to ``False``. On pymssql this defaults to ``True``, and on
- pyodbc this defaults to ``True`` if the version of pyodbc being
- used supports it.
-
-* *has_window_funcs* - indicates whether or not window functions
- (LIMIT and OFFSET) are supported on the version of MSSQL being
- used. If you're running MSSQL 2005 or later turn this on to get
- OFFSET support. Defaults to ``False``.
-
-* *max_identifier_length* - allows you to se the maximum length of
- identfiers supported by the database. Defaults to 128. For pymssql
- the default is 30.
-
-* *schema_name* - use to set the schema name. Defaults to ``dbo``.
-
-Auto Increment Behavior
------------------------
-
-``IDENTITY`` columns are supported by using SQLAlchemy
-``schema.Sequence()`` objects. In other words::
-
- Table('test', mss_engine,
- Column('id', Integer,
- Sequence('blah',100,10), primary_key=True),
- Column('name', String(20))
- ).create()
-
-would yield::
-
- CREATE TABLE test (
- id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
- name VARCHAR(20) NULL,
- )
-
-Note that the ``start`` and ``increment`` values for sequences are
-optional and will default to 1,1.
-
-* Support for ``SET IDENTITY_INSERT ON`` mode (automagic on / off for
- ``INSERT`` s)
-
-* Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on
- ``INSERT``
-
-Collation Support
------------------
-
-MSSQL specific string types support a collation parameter that
-creates a column-level specific collation for the column. The
-collation parameter accepts a Windows Collation Name or a SQL
-Collation Name. Supported types are MSChar, MSNChar, MSString,
-MSNVarchar, MSText, and MSNText. For example::
-
- Column('login', String(32, collation='Latin1_General_CI_AS'))
-
-will yield::
-
- login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
-
-LIMIT/OFFSET Support
---------------------
-
-MSSQL has no support for the LIMIT or OFFSET keysowrds. LIMIT is
-supported directly through the ``TOP`` Transact SQL keyword::
-
- select.limit
-
-will yield::
-
- SELECT TOP n
-
-If the ``has_window_funcs`` flag is set then LIMIT with OFFSET
-support is available through the ``ROW_NUMBER OVER`` construct. This
-construct requires an ``ORDER BY`` to be specified as well and is
-only available on MSSQL 2005 and later.
-
-Nullability
------------
-MSSQL has support for three levels of column nullability. The default
-nullability allows nulls and is explicit in the CREATE TABLE
-construct::
-
- name VARCHAR(20) NULL
-
-If ``nullable=None`` is specified then no specification is made. In
-other words the database's configured default is used. This will
-render::
-
- name VARCHAR(20)
-
-If ``nullable`` is ``True`` or ``False`` then the column will be
-``NULL` or ``NOT NULL`` respectively.
-
-Date / Time Handling
---------------------
-For MSSQL versions that support the ``DATE`` and ``TIME`` types
-(MSSQL 2008+) the data type is used. For versions that do not
-support the ``DATE`` and ``TIME`` types a ``DATETIME`` type is used
-instead and the MSSQL dialect handles converting the results
-properly. This means ``Date()`` and ``Time()`` are fully supported
-on all versions of MSSQL. If you do not desire this behavior then
-do not use the ``Date()`` or ``Time()`` types.
-
-Compatibility Levels
---------------------
-MSSQL supports the notion of setting compatibility levels at the
-database level. This allows, for instance, to run a database that
-is compatibile with SQL2000 while running on a SQL2005 database
-server. ``server_version_info`` will always retrun the database
-server version information (in this case SQL2005) and not the
-compatibiility level information. Because of this, if running under
-a backwards compatibility mode SQAlchemy may attempt to use T-SQL
-statements that are unable to be parsed by the database server.
-
-Known Issues
-------------
-
-* No support for more than one ``IDENTITY`` column per table
-
-* pymssql has problems with binary and unicode data that this module
- does **not** work around
-
-"""
-import datetime, decimal, inspect, operator, re, sys, urllib
-
-from sqlalchemy import sql, schema, exc, util
-from sqlalchemy import Table, MetaData, Column, ForeignKey, String, Integer
-from sqlalchemy.sql import select, compiler, expression, operators as sql_operators, functions as sql_functions
-from sqlalchemy.engine import default, base
-from sqlalchemy import types as sqltypes
-from decimal import Decimal as _python_Decimal
-
-
-RESERVED_WORDS = set(
- ['add', 'all', 'alter', 'and', 'any', 'as', 'asc', 'authorization',
- 'backup', 'begin', 'between', 'break', 'browse', 'bulk', 'by', 'cascade',
- 'case', 'check', 'checkpoint', 'close', 'clustered', 'coalesce',
- 'collate', 'column', 'commit', 'compute', 'constraint', 'contains',
- 'containstable', 'continue', 'convert', 'create', 'cross', 'current',
- 'current_date', 'current_time', 'current_timestamp', 'current_user',
- 'cursor', 'database', 'dbcc', 'deallocate', 'declare', 'default',
- 'delete', 'deny', 'desc', 'disk', 'distinct', 'distributed', 'double',
- 'drop', 'dump', 'else', 'end', 'errlvl', 'escape', 'except', 'exec',
- 'execute', 'exists', 'exit', 'external', 'fetch', 'file', 'fillfactor',
- 'for', 'foreign', 'freetext', 'freetexttable', 'from', 'full',
- 'function', 'goto', 'grant', 'group', 'having', 'holdlock', 'identity',
- 'identity_insert', 'identitycol', 'if', 'in', 'index', 'inner', 'insert',
- 'intersect', 'into', 'is', 'join', 'key', 'kill', 'left', 'like',
- 'lineno', 'load', 'merge', 'national', 'nocheck', 'nonclustered', 'not',
- 'null', 'nullif', 'of', 'off', 'offsets', 'on', 'open', 'opendatasource',
- 'openquery', 'openrowset', 'openxml', 'option', 'or', 'order', 'outer',
- 'over', 'percent', 'pivot', 'plan', 'precision', 'primary', 'print',
- 'proc', 'procedure', 'public', 'raiserror', 'read', 'readtext',
- 'reconfigure', 'references', 'replication', 'restore', 'restrict',
- 'return', 'revert', 'revoke', 'right', 'rollback', 'rowcount',
- 'rowguidcol', 'rule', 'save', 'schema', 'securityaudit', 'select',
- 'session_user', 'set', 'setuser', 'shutdown', 'some', 'statistics',
- 'system_user', 'table', 'tablesample', 'textsize', 'then', 'to', 'top',
- 'tran', 'transaction', 'trigger', 'truncate', 'tsequal', 'union',
- 'unique', 'unpivot', 'update', 'updatetext', 'use', 'user', 'values',
- 'varying', 'view', 'waitfor', 'when', 'where', 'while', 'with',
- 'writetext',
- ])
-
-
-class _StringType(object):
- """Base for MSSQL string types."""
-
- def __init__(self, collation=None, **kwargs):
- self.collation = kwargs.get('collate', collation)
-
- def _extend(self, spec):
- """Extend a string-type declaration with standard SQL
- COLLATE annotations.
- """
-
- if self.collation:
- collation = 'COLLATE %s' % self.collation
- else:
- collation = None
-
- return ' '.join([c for c in (spec, collation)
- if c is not None])
-
- def __repr__(self):
- attributes = inspect.getargspec(self.__init__)[0][1:]
- attributes.extend(inspect.getargspec(_StringType.__init__)[0][1:])
-
- params = {}
- for attr in attributes:
- val = getattr(self, attr)
- if val is not None and val is not False:
- params[attr] = val
-
- return "%s(%s)" % (self.__class__.__name__,
- ', '.join(['%s=%r' % (k, params[k]) for k in params]))
-
- def bind_processor(self, dialect):
- if self.convert_unicode or dialect.convert_unicode:
- if self.assert_unicode is None:
- assert_unicode = dialect.assert_unicode
- else:
- assert_unicode = self.assert_unicode
-
- if not assert_unicode:
- return None
-
- def process(value):
- if not isinstance(value, (unicode, sqltypes.NoneType)):
- if assert_unicode == 'warn':
- util.warn("Unicode type received non-unicode bind "
- "param value %r" % value)
- return value
- else:
- raise exc.InvalidRequestError("Unicode type received non-unicode bind param value %r" % value)
- else:
- return value
- return process
- else:
- return None
-
-
-class MSNumeric(sqltypes.Numeric):
- def result_processor(self, dialect):
- if self.asdecimal:
- def process(value):
- if value is not None:
- return _python_Decimal(str(value))
- else:
- return value
- return process
- else:
- def process(value):
- return float(value)
- return process
-
- def bind_processor(self, dialect):
- def process(value):
- if value is None:
- # Not sure that this exception is needed
- return value
-
- elif isinstance(value, decimal.Decimal):
- if value.adjusted() < 0:
- result = "%s0.%s%s" % (
- (value < 0 and '-' or ''),
- '0' * (abs(value.adjusted()) - 1),
- "".join([str(nint) for nint in value._int]))
-
- else:
- if 'E' in str(value):
- result = "%s%s%s" % (
- (value < 0 and '-' or ''),
- "".join([str(s) for s in value._int]),
- "0" * (value.adjusted() - (len(value._int)-1)))
- else:
- if (len(value._int) - 1) > value.adjusted():
- result = "%s%s.%s" % (
- (value < 0 and '-' or ''),
- "".join([str(s) for s in value._int][0:value.adjusted() + 1]),
- "".join([str(s) for s in value._int][value.adjusted() + 1:]))
- else:
- result = "%s%s" % (
- (value < 0 and '-' or ''),
- "".join([str(s) for s in value._int][0:value.adjusted() + 1]))
-
- return result
-
- else:
- return value
-
- return process
-
- def get_col_spec(self):
- if self.precision is None:
- return "NUMERIC"
- else:
- return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': self.precision, 'scale' : self.scale}
-
-
-class MSFloat(sqltypes.Float):
- def get_col_spec(self):
- if self.precision is None:
- return "FLOAT"
- else:
- return "FLOAT(%(precision)s)" % {'precision': self.precision}
-
-
-class MSReal(MSFloat):
- """A type for ``real`` numbers."""
-
- def __init__(self):
- """
- Construct a Real.
-
- """
- super(MSReal, self).__init__(precision=24)
-
- def adapt(self, impltype):
- return impltype()
-
- def get_col_spec(self):
- return "REAL"
-
-
-class MSInteger(sqltypes.Integer):
- def get_col_spec(self):
- return "INTEGER"
-
-
-class MSBigInteger(MSInteger):
- def get_col_spec(self):
- return "BIGINT"
-
-
-class MSTinyInteger(MSInteger):
- def get_col_spec(self):
- return "TINYINT"
-
-
-class MSSmallInteger(MSInteger):
- def get_col_spec(self):
- return "SMALLINT"
-
-
-class _DateTimeType(object):
- """Base for MSSQL datetime types."""
-
- def bind_processor(self, dialect):
- # if we receive just a date we can manipulate it
- # into a datetime since the db-api may not do this.
- def process(value):
- if type(value) is datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- return value
- return process
-
-
-class MSDateTime(_DateTimeType, sqltypes.DateTime):
- def get_col_spec(self):
- return "DATETIME"
-
-
-class MSDate(sqltypes.Date):
- def get_col_spec(self):
- return "DATE"
-
-
-class MSTime(sqltypes.Time):
- def __init__(self, precision=None, **kwargs):
- self.precision = precision
- super(MSTime, self).__init__()
-
- def get_col_spec(self):
- if self.precision:
- return "TIME(%s)" % self.precision
- else:
- return "TIME"
-
-
-class MSSmallDateTime(_DateTimeType, sqltypes.TypeEngine):
- def get_col_spec(self):
- return "SMALLDATETIME"
-
-
-class MSDateTime2(_DateTimeType, sqltypes.TypeEngine):
- def __init__(self, precision=None, **kwargs):
- self.precision = precision
-
- def get_col_spec(self):
- if self.precision:
- return "DATETIME2(%s)" % self.precision
- else:
- return "DATETIME2"
-
-
-class MSDateTimeOffset(_DateTimeType, sqltypes.TypeEngine):
- def __init__(self, precision=None, **kwargs):
- self.precision = precision
-
- def get_col_spec(self):
- if self.precision:
- return "DATETIMEOFFSET(%s)" % self.precision
- else:
- return "DATETIMEOFFSET"
-
-
-class MSDateTimeAsDate(_DateTimeType, MSDate):
- """ This is an implementation of the Date type for versions of MSSQL that
- do not support that specific type. In order to make it work a ``DATETIME``
- column specification is used and the results get converted back to just
- the date portion.
-
- """
-
- def get_col_spec(self):
- return "DATETIME"
-
- def result_processor(self, dialect):
- def process(value):
- # If the DBAPI returns the value as datetime.datetime(), truncate
- # it back to datetime.date()
- if type(value) is datetime.datetime:
- return value.date()
- return value
- return process
-
-
-class MSDateTimeAsTime(MSTime):
- """ This is an implementation of the Time type for versions of MSSQL that
- do not support that specific type. In order to make it work a ``DATETIME``
- column specification is used and the results get converted back to just
- the time portion.
-
- """
-
- __zero_date = datetime.date(1900, 1, 1)
-
- def get_col_spec(self):
- return "DATETIME"
-
- def bind_processor(self, dialect):
- def process(value):
- if type(value) is datetime.datetime:
- value = datetime.datetime.combine(self.__zero_date, value.time())
- elif type(value) is datetime.time:
- value = datetime.datetime.combine(self.__zero_date, value)
- return value
- return process
-
- def result_processor(self, dialect):
- def process(value):
- if type(value) is datetime.datetime:
- return value.time()
- elif type(value) is datetime.date:
- return datetime.time(0, 0, 0)
- return value
- return process
-
-
-class MSDateTime_adodbapi(MSDateTime):
- def result_processor(self, dialect):
- def process(value):
- # adodbapi will return datetimes with empty time values as datetime.date() objects.
- # Promote them back to full datetime.datetime()
- if type(value) is datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- return value
- return process
-
-
-class MSText(_StringType, sqltypes.Text):
- """MSSQL TEXT type, for variable-length text up to 2^31 characters."""
-
- def __init__(self, *args, **kwargs):
- """Construct a TEXT.
-
- :param collation: Optional, a column-level collation for this string
- value. Accepts a Windows Collation Name or a SQL Collation Name.
-
- """
- _StringType.__init__(self, **kwargs)
- sqltypes.Text.__init__(self, None,
- convert_unicode=kwargs.get('convert_unicode', False),
- assert_unicode=kwargs.get('assert_unicode', None))
-
- def get_col_spec(self):
- if self.dialect.text_as_varchar:
- return self._extend("VARCHAR(max)")
- else:
- return self._extend("TEXT")
-
-
-class MSNText(_StringType, sqltypes.UnicodeText):
- """MSSQL NTEXT type, for variable-length unicode text up to 2^30
- characters."""
-
- def __init__(self, *args, **kwargs):
- """Construct a NTEXT.
-
- :param collation: Optional, a column-level collation for this string
- value. Accepts a Windows Collation Name or a SQL Collation Name.
-
- """
- _StringType.__init__(self, **kwargs)
- sqltypes.UnicodeText.__init__(self, None,
- convert_unicode=kwargs.get('convert_unicode', True),
- assert_unicode=kwargs.get('assert_unicode', 'warn'))
-
- def get_col_spec(self):
- if self.dialect.text_as_varchar:
- return self._extend("NVARCHAR(max)")
- else:
- return self._extend("NTEXT")
-
-
-class MSString(_StringType, sqltypes.String):
- """MSSQL VARCHAR type, for variable-length non-Unicode data with a maximum
- of 8,000 characters."""
-
- def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs):
- """Construct a VARCHAR.
-
- :param length: Optinal, maximum data length, in characters.
-
- :param convert_unicode: defaults to False. If True, convert
- ``unicode`` data sent to the database to a ``str``
- bytestring, and convert bytestrings coming back from the
- database into ``unicode``.
-
- Bytestrings are encoded using the dialect's
- :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
- defaults to `utf-8`.
-
- If False, may be overridden by
- :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
-
- :param assert_unicode:
-
- If None (the default), no assertion will take place unless
- overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`.
-
- If 'warn', will issue a runtime warning if a ``str``
- instance is used as a bind value.
-
- If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
-
- :param collation: Optional, a column-level collation for this string
- value. Accepts a Windows Collation Name or a SQL Collation Name.
-
- """
- _StringType.__init__(self, **kwargs)
- sqltypes.String.__init__(self, length=length,
- convert_unicode=convert_unicode,
- assert_unicode=assert_unicode)
-
- def get_col_spec(self):
- if self.length:
- return self._extend("VARCHAR(%s)" % self.length)
- else:
- return self._extend("VARCHAR")
-
-
-class MSNVarchar(_StringType, sqltypes.Unicode):
- """MSSQL NVARCHAR type.
-
- For variable-length unicode character data up to 4,000 characters."""
-
- def __init__(self, length=None, **kwargs):
- """Construct a NVARCHAR.
-
- :param length: Optional, Maximum data length, in characters.
-
- :param collation: Optional, a column-level collation for this string
- value. Accepts a Windows Collation Name or a SQL Collation Name.
-
- """
- _StringType.__init__(self, **kwargs)
- sqltypes.Unicode.__init__(self, length=length,
- convert_unicode=kwargs.get('convert_unicode', True),
- assert_unicode=kwargs.get('assert_unicode', 'warn'))
-
- def adapt(self, impltype):
- return impltype(length=self.length,
- convert_unicode=self.convert_unicode,
- assert_unicode=self.assert_unicode,
- collation=self.collation)
-
- def get_col_spec(self):
- if self.length:
- return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length})
- else:
- return self._extend("NVARCHAR")
-
-
-class MSChar(_StringType, sqltypes.CHAR):
- """MSSQL CHAR type, for fixed-length non-Unicode data with a maximum
- of 8,000 characters."""
-
- def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs):
- """Construct a CHAR.
-
- :param length: Optinal, maximum data length, in characters.
-
- :param convert_unicode: defaults to False. If True, convert
- ``unicode`` data sent to the database to a ``str``
- bytestring, and convert bytestrings coming back from the
- database into ``unicode``.
-
- Bytestrings are encoded using the dialect's
- :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
- defaults to `utf-8`.
-
- If False, may be overridden by
- :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
-
- :param assert_unicode:
-
- If None (the default), no assertion will take place unless
- overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`.
-
- If 'warn', will issue a runtime warning if a ``str``
- instance is used as a bind value.
-
- If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
-
- :param collation: Optional, a column-level collation for this string
- value. Accepts a Windows Collation Name or a SQL Collation Name.
-
- """
- _StringType.__init__(self, **kwargs)
- sqltypes.CHAR.__init__(self, length=length,
- convert_unicode=convert_unicode,
- assert_unicode=assert_unicode)
-
- def get_col_spec(self):
- if self.length:
- return self._extend("CHAR(%s)" % self.length)
- else:
- return self._extend("CHAR")
-
-
-class MSNChar(_StringType, sqltypes.NCHAR):
- """MSSQL NCHAR type.
-
- For fixed-length unicode character data up to 4,000 characters."""
-
- def __init__(self, length=None, **kwargs):
- """Construct an NCHAR.
-
- :param length: Optional, Maximum data length, in characters.
-
- :param collation: Optional, a column-level collation for this string
- value. Accepts a Windows Collation Name or a SQL Collation Name.
-
- """
- _StringType.__init__(self, **kwargs)
- sqltypes.NCHAR.__init__(self, length=length,
- convert_unicode=kwargs.get('convert_unicode', True),
- assert_unicode=kwargs.get('assert_unicode', 'warn'))
-
- def get_col_spec(self):
- if self.length:
- return self._extend("NCHAR(%(length)s)" % {'length' : self.length})
- else:
- return self._extend("NCHAR")
-
-
-class MSGenericBinary(sqltypes.Binary):
- """The Binary type assumes that a Binary specification without a length
- is an unbound Binary type whereas one with a length specification results
- in a fixed length Binary type.
-
- If you want standard MSSQL ``BINARY`` behavior use the ``MSBinary`` type.
-
- """
-
- def get_col_spec(self):
- if self.length:
- return "BINARY(%s)" % self.length
- else:
- return "IMAGE"
-
-
-class MSBinary(MSGenericBinary):
- def get_col_spec(self):
- if self.length:
- return "BINARY(%s)" % self.length
- else:
- return "BINARY"
-
-
-class MSVarBinary(MSGenericBinary):
- def get_col_spec(self):
- if self.length:
- return "VARBINARY(%s)" % self.length
- else:
- return "VARBINARY"
-
-
-class MSImage(MSGenericBinary):
- def get_col_spec(self):
- return "IMAGE"
-
-
-class MSBoolean(sqltypes.Boolean):
- def get_col_spec(self):
- return "BIT"
-
- def result_processor(self, dialect):
- def process(value):
- if value is None:
- return None
- return value and True or False
- return process
-
- def bind_processor(self, dialect):
- def process(value):
- if value is True:
- return 1
- elif value is False:
- return 0
- elif value is None:
- return None
- else:
- return value and True or False
- return process
-
-
-class MSTimeStamp(sqltypes.TIMESTAMP):
- def get_col_spec(self):
- return "TIMESTAMP"
-
-
-class MSMoney(sqltypes.TypeEngine):
- def get_col_spec(self):
- return "MONEY"
-
-
-class MSSmallMoney(MSMoney):
- def get_col_spec(self):
- return "SMALLMONEY"
-
-
-class MSUniqueIdentifier(sqltypes.TypeEngine):
- def get_col_spec(self):
- return "UNIQUEIDENTIFIER"
-
-
-class MSVariant(sqltypes.TypeEngine):
- def get_col_spec(self):
- return "SQL_VARIANT"
-
-ischema = MetaData()
-
-schemata = Table("SCHEMATA", ischema,
- Column("CATALOG_NAME", String, key="catalog_name"),
- Column("SCHEMA_NAME", String, key="schema_name"),
- Column("SCHEMA_OWNER", String, key="schema_owner"),
- schema="INFORMATION_SCHEMA")
-
-tables = Table("TABLES", ischema,
- Column("TABLE_CATALOG", String, key="table_catalog"),
- Column("TABLE_SCHEMA", String, key="table_schema"),
- Column("TABLE_NAME", String, key="table_name"),
- Column("TABLE_TYPE", String, key="table_type"),
- schema="INFORMATION_SCHEMA")
-
-columns = Table("COLUMNS", ischema,
- Column("TABLE_SCHEMA", String, key="table_schema"),
- Column("TABLE_NAME", String, key="table_name"),
- Column("COLUMN_NAME", String, key="column_name"),
- Column("IS_NULLABLE", Integer, key="is_nullable"),
- Column("DATA_TYPE", String, key="data_type"),
- Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
- Column("CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"),
- Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
- Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
- Column("COLUMN_DEFAULT", Integer, key="column_default"),
- Column("COLLATION_NAME", String, key="collation_name"),
- schema="INFORMATION_SCHEMA")
-
-constraints = Table("TABLE_CONSTRAINTS", ischema,
- Column("TABLE_SCHEMA", String, key="table_schema"),
- Column("TABLE_NAME", String, key="table_name"),
- Column("CONSTRAINT_NAME", String, key="constraint_name"),
- Column("CONSTRAINT_TYPE", String, key="constraint_type"),
- schema="INFORMATION_SCHEMA")
-
-column_constraints = Table("CONSTRAINT_COLUMN_USAGE", ischema,
- Column("TABLE_SCHEMA", String, key="table_schema"),
- Column("TABLE_NAME", String, key="table_name"),
- Column("COLUMN_NAME", String, key="column_name"),
- Column("CONSTRAINT_NAME", String, key="constraint_name"),
- schema="INFORMATION_SCHEMA")
-
-key_constraints = Table("KEY_COLUMN_USAGE", ischema,
- Column("TABLE_SCHEMA", String, key="table_schema"),
- Column("TABLE_NAME", String, key="table_name"),
- Column("COLUMN_NAME", String, key="column_name"),
- Column("CONSTRAINT_NAME", String, key="constraint_name"),
- Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
- schema="INFORMATION_SCHEMA")
-
-ref_constraints = Table("REFERENTIAL_CONSTRAINTS", ischema,
- Column("CONSTRAINT_CATALOG", String, key="constraint_catalog"),
- Column("CONSTRAINT_SCHEMA", String, key="constraint_schema"),
- Column("CONSTRAINT_NAME", String, key="constraint_name"),
- Column("UNIQUE_CONSTRAINT_CATLOG", String, key="unique_constraint_catalog"),
- Column("UNIQUE_CONSTRAINT_SCHEMA", String, key="unique_constraint_schema"),
- Column("UNIQUE_CONSTRAINT_NAME", String, key="unique_constraint_name"),
- Column("MATCH_OPTION", String, key="match_option"),
- Column("UPDATE_RULE", String, key="update_rule"),
- Column("DELETE_RULE", String, key="delete_rule"),
- schema="INFORMATION_SCHEMA")
-
-def _has_implicit_sequence(column):
- return column.primary_key and \
- column.autoincrement and \
- isinstance(column.type, sqltypes.Integer) and \
- not column.foreign_keys and \
- (
- column.default is None or
- (
- isinstance(column.default, schema.Sequence) and
- column.default.optional)
- )
-
-def _table_sequence_column(tbl):
- if not hasattr(tbl, '_ms_has_sequence'):
- tbl._ms_has_sequence = None
- for column in tbl.c:
- if getattr(column, 'sequence', False) or _has_implicit_sequence(column):
- tbl._ms_has_sequence = column
- break
- return tbl._ms_has_sequence
-
-class MSSQLExecutionContext(default.DefaultExecutionContext):
- IINSERT = False
- HASIDENT = False
-
- def pre_exec(self):
- """Activate IDENTITY_INSERT if needed."""
-
- if self.compiled.isinsert:
- tbl = self.compiled.statement.table
- seq_column = _table_sequence_column(tbl)
- self.HASIDENT = bool(seq_column)
- if self.dialect.auto_identity_insert and self.HASIDENT:
- self.IINSERT = tbl._ms_has_sequence.key in self.compiled_parameters[0]
- else:
- self.IINSERT = False
-
- if self.IINSERT:
- self.cursor.execute("SET IDENTITY_INSERT %s ON" %
- self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
-
- def handle_dbapi_exception(self, e):
- if self.IINSERT:
- try:
- self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
- except:
- pass
-
- def post_exec(self):
- """Disable IDENTITY_INSERT if enabled."""
-
- if self.compiled.isinsert and not self.executemany and self.HASIDENT and not self.IINSERT:
- if not self._last_inserted_ids or self._last_inserted_ids[0] is None:
- if self.dialect.use_scope_identity:
- self.cursor.execute("SELECT scope_identity() AS lastrowid")
- else:
- self.cursor.execute("SELECT @@identity AS lastrowid")
- row = self.cursor.fetchone()
- self._last_inserted_ids = [int(row[0])] + self._last_inserted_ids[1:]
-
- if self.IINSERT:
- self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table))
-
-
-class MSSQLExecutionContext_pyodbc (MSSQLExecutionContext):
- def pre_exec(self):
- """where appropriate, issue "select scope_identity()" in the same statement"""
- super(MSSQLExecutionContext_pyodbc, self).pre_exec()
- if self.compiled.isinsert and self.HASIDENT and not self.IINSERT \
- and len(self.parameters) == 1 and self.dialect.use_scope_identity:
- self.statement += "; select scope_identity()"
-
- def post_exec(self):
- if self.HASIDENT and not self.IINSERT and self.dialect.use_scope_identity and not self.executemany:
- import pyodbc
- # Fetch the last inserted id from the manipulated statement
- # We may have to skip over a number of result sets with no data (due to triggers, etc.)
- while True:
- try:
- row = self.cursor.fetchone()
- break
- except pyodbc.Error, e:
- self.cursor.nextset()
- self._last_inserted_ids = [int(row[0])]
- else:
- super(MSSQLExecutionContext_pyodbc, self).post_exec()
-
-class MSSQLDialect(default.DefaultDialect):
- name = 'mssql'
- supports_default_values = True
- supports_empty_insert = False
- auto_identity_insert = True
- execution_ctx_cls = MSSQLExecutionContext
- text_as_varchar = False
- use_scope_identity = False
- has_window_funcs = False
- max_identifier_length = 128
- schema_name = "dbo"
-
- colspecs = {
- sqltypes.Unicode : MSNVarchar,
- sqltypes.Integer : MSInteger,
- sqltypes.Smallinteger: MSSmallInteger,
- sqltypes.Numeric : MSNumeric,
- sqltypes.Float : MSFloat,
- sqltypes.DateTime : MSDateTime,
- sqltypes.Date : MSDate,
- sqltypes.Time : MSTime,
- sqltypes.String : MSString,
- sqltypes.Binary : MSGenericBinary,
- sqltypes.Boolean : MSBoolean,
- sqltypes.Text : MSText,
- sqltypes.UnicodeText : MSNText,
- sqltypes.CHAR: MSChar,
- sqltypes.NCHAR: MSNChar,
- sqltypes.TIMESTAMP: MSTimeStamp,
- }
-
- ischema_names = {
- 'int' : MSInteger,
- 'bigint': MSBigInteger,
- 'smallint' : MSSmallInteger,
- 'tinyint' : MSTinyInteger,
- 'varchar' : MSString,
- 'nvarchar' : MSNVarchar,
- 'char' : MSChar,
- 'nchar' : MSNChar,
- 'text' : MSText,
- 'ntext' : MSNText,
- 'decimal' : MSNumeric,
- 'numeric' : MSNumeric,
- 'float' : MSFloat,
- 'datetime' : MSDateTime,
- 'datetime2' : MSDateTime2,
- 'datetimeoffset' : MSDateTimeOffset,
- 'date': MSDate,
- 'time': MSTime,
- 'smalldatetime' : MSSmallDateTime,
- 'binary' : MSBinary,
- 'varbinary' : MSVarBinary,
- 'bit': MSBoolean,
- 'real' : MSFloat,
- 'image' : MSImage,
- 'timestamp': MSTimeStamp,
- 'money': MSMoney,
- 'smallmoney': MSSmallMoney,
- 'uniqueidentifier': MSUniqueIdentifier,
- 'sql_variant': MSVariant,
- }
-
- def __new__(cls, *args, **kwargs):
- if cls is not MSSQLDialect:
- # this gets called with the dialect specific class
- return super(MSSQLDialect, cls).__new__(cls)
- dbapi = kwargs.get('dbapi', None)
- if dbapi:
- dialect = dialect_mapping.get(dbapi.__name__)
- return dialect(**kwargs)
- else:
- return object.__new__(cls)
-
- def __init__(self,
- auto_identity_insert=True, query_timeout=None,
- text_as_varchar=False, use_scope_identity=False,
- has_window_funcs=False, max_identifier_length=None,
- schema_name="dbo", **opts):
- self.auto_identity_insert = bool(auto_identity_insert)
- self.query_timeout = int(query_timeout or 0)
- self.schema_name = schema_name
-
- # to-do: the options below should use server version introspection to set themselves on connection
- self.text_as_varchar = bool(text_as_varchar)
- self.use_scope_identity = bool(use_scope_identity)
- self.has_window_funcs = bool(has_window_funcs)
- self.max_identifier_length = int(max_identifier_length or 0) or \
- self.max_identifier_length
- super(MSSQLDialect, self).__init__(**opts)
-
- @classmethod
- def dbapi(cls, module_name=None):
- if module_name:
- try:
- dialect_cls = dialect_mapping[module_name]
- return dialect_cls.import_dbapi()
- except KeyError:
- raise exc.InvalidRequestError("Unsupported MSSQL module '%s' requested (must be adodbpi, pymssql or pyodbc)" % module_name)
- else:
- for dialect_cls in [MSSQLDialect_pyodbc, MSSQLDialect_pymssql, MSSQLDialect_adodbapi]:
- try:
- return dialect_cls.import_dbapi()
- except ImportError, e:
- pass
- else:
- raise ImportError('No DBAPI module detected for MSSQL - please install pyodbc, pymssql, or adodbapi')
-
- @base.connection_memoize(('mssql', 'server_version_info'))
- def server_version_info(self, connection):
- """A tuple of the database server version.
-
- Formats the remote server version as a tuple of version values,
- e.g. ``(9, 0, 1399)``. If there are strings in the version number
- they will be in the tuple too, so don't count on these all being
- ``int`` values.
-
- This is a fast check that does not require a round trip. It is also
- cached per-Connection.
- """
- return connection.dialect._server_version_info(connection.connection)
-
- def _server_version_info(self, dbapi_con):
- """Return a tuple of the database's version number."""
- raise NotImplementedError()
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username='user')
- opts.update(url.query)
- if 'auto_identity_insert' in opts:
- self.auto_identity_insert = bool(int(opts.pop('auto_identity_insert')))
- if 'query_timeout' in opts:
- self.query_timeout = int(opts.pop('query_timeout'))
- if 'text_as_varchar' in opts:
- self.text_as_varchar = bool(int(opts.pop('text_as_varchar')))
- if 'use_scope_identity' in opts:
- self.use_scope_identity = bool(int(opts.pop('use_scope_identity')))
- if 'has_window_funcs' in opts:
- self.has_window_funcs = bool(int(opts.pop('has_window_funcs')))
- return self.make_connect_string(opts, url.query)
-
- def type_descriptor(self, typeobj):
- newobj = sqltypes.adapt_type(typeobj, self.colspecs)
- # Some types need to know about the dialect
- if isinstance(newobj, (MSText, MSNText)):
- newobj.dialect = self
- return newobj
-
- def do_savepoint(self, connection, name):
- util.warn("Savepoint support in mssql is experimental and may lead to data loss.")
- connection.execute("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
- connection.execute("SAVE TRANSACTION %s" % name)
-
- def do_release_savepoint(self, connection, name):
- pass
-
- @base.connection_memoize(('dialect', 'default_schema_name'))
- def get_default_schema_name(self, connection):
- query = "SELECT user_name() as user_name;"
- user_name = connection.scalar(sql.text(query))
- if user_name is not None:
- # now, get the default schema
- query = """
- SELECT default_schema_name FROM
- sys.database_principals
- WHERE name = :user_name
- AND type = 'S'
- """
- try:
- default_schema_name = connection.scalar(sql.text(query),
- user_name=user_name)
- if default_schema_name is not None:
- return default_schema_name
- except:
- pass
- return self.schema_name
-
- def table_names(self, connection, schema):
- s = select([tables.c.table_name], tables.c.table_schema==schema)
- return [row[0] for row in connection.execute(s)]
-
-
- def has_table(self, connection, tablename, schema=None):
-
- current_schema = schema or self.get_default_schema_name(connection)
- s = sql.select([columns],
- current_schema
- and sql.and_(columns.c.table_name==tablename, columns.c.table_schema==current_schema)
- or columns.c.table_name==tablename,
- )
-
- c = connection.execute(s)
- row = c.fetchone()
- return row is not None
-
- def reflecttable(self, connection, table, include_columns):
- # Get base columns
- if table.schema is not None:
- current_schema = table.schema
- else:
- current_schema = self.get_default_schema_name(connection)
-
- s = sql.select([columns],
- current_schema
- and sql.and_(columns.c.table_name==table.name, columns.c.table_schema==current_schema)
- or columns.c.table_name==table.name,
- order_by=[columns.c.ordinal_position])
-
- c = connection.execute(s)
- found_table = False
- while True:
- row = c.fetchone()
- if row is None:
- break
- found_table = True
- (name, type, nullable, charlen, numericprec, numericscale, default, collation) = (
- row[columns.c.column_name],
- row[columns.c.data_type],
- row[columns.c.is_nullable] == 'YES',
- row[columns.c.character_maximum_length],
- row[columns.c.numeric_precision],
- row[columns.c.numeric_scale],
- row[columns.c.column_default],
- row[columns.c.collation_name]
- )
- if include_columns and name not in include_columns:
- continue
-
- coltype = self.ischema_names.get(type, None)
-
- kwargs = {}
- if coltype in (MSString, MSChar, MSNVarchar, MSNChar, MSText, MSNText, MSBinary, MSVarBinary, sqltypes.Binary):
- kwargs['length'] = charlen
- if collation:
- kwargs['collation'] = collation
- if coltype == MSText or (coltype in (MSString, MSNVarchar) and charlen == -1):
- kwargs.pop('length')
-
- if issubclass(coltype, sqltypes.Numeric):
- kwargs['scale'] = numericscale
- kwargs['precision'] = numericprec
-
- if coltype is None:
- util.warn("Did not recognize type '%s' of column '%s'" % (type, name))
- coltype = sqltypes.NULLTYPE
-
- coltype = coltype(**kwargs)
- colargs = []
- if default is not None:
- colargs.append(schema.DefaultClause(sql.text(default)))
- table.append_column(schema.Column(name, coltype, nullable=nullable, autoincrement=False, *colargs))
-
- if not found_table:
- raise exc.NoSuchTableError(table.name)
-
- # We also run an sp_columns to check for identity columns:
- cursor = connection.execute("sp_columns @table_name = '%s', @table_owner = '%s'" % (table.name, current_schema))
- ic = None
- while True:
- row = cursor.fetchone()
- if row is None:
- break
- col_name, type_name = row[3], row[5]
- if type_name.endswith("identity") and col_name in table.c:
- ic = table.c[col_name]
- ic.autoincrement = True
- # setup a psuedo-sequence to represent the identity attribute - we interpret this at table.create() time as the identity attribute
- ic.sequence = schema.Sequence(ic.name + '_identity', 1, 1)
- # MSSQL: only one identity per table allowed
- cursor.close()
- break
- if not ic is None:
- try:
- cursor = connection.execute("select ident_seed(?), ident_incr(?)", table.fullname, table.fullname)
- row = cursor.fetchone()
- cursor.close()
- if not row is None:
- ic.sequence.start = int(row[0])
- ic.sequence.increment = int(row[1])
- except:
- # ignoring it, works just like before
- pass
-
- # Add constraints
- RR = ref_constraints
- TC = constraints
- C = key_constraints.alias('C') #information_schema.constraint_column_usage: the constrained column
- R = key_constraints.alias('R') #information_schema.constraint_column_usage: the referenced column
-
- # Primary key constraints
- s = sql.select([C.c.column_name, TC.c.constraint_type], sql.and_(TC.c.constraint_name == C.c.constraint_name,
- C.c.table_name == table.name,
- C.c.table_schema == (table.schema or current_schema)))
- c = connection.execute(s)
- for row in c:
- if 'PRIMARY' in row[TC.c.constraint_type.name] and row[0] in table.c:
- table.primary_key.add(table.c[row[0]])
-
- # Foreign key constraints
- s = sql.select([C.c.column_name,
- R.c.table_schema, R.c.table_name, R.c.column_name,
- RR.c.constraint_name, RR.c.match_option, RR.c.update_rule, RR.c.delete_rule],
- sql.and_(C.c.table_name == table.name,
- C.c.table_schema == (table.schema or current_schema),
- C.c.constraint_name == RR.c.constraint_name,
- R.c.constraint_name == RR.c.unique_constraint_name,
- C.c.ordinal_position == R.c.ordinal_position
- ),
- order_by = [RR.c.constraint_name, R.c.ordinal_position])
- rows = connection.execute(s).fetchall()
-
- def _gen_fkref(table, rschema, rtbl, rcol):
- if rschema == current_schema and not table.schema:
- return '.'.join([rtbl, rcol])
- else:
- return '.'.join([rschema, rtbl, rcol])
-
- # group rows by constraint ID, to handle multi-column FKs
- fknm, scols, rcols = (None, [], [])
- for r in rows:
- scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r
- # if the reflected schema is the default schema then don't set it because this will
- # play into the metadata key causing duplicates.
- if rschema == current_schema and not table.schema:
- schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
- else:
- schema.Table(rtbl, table.metadata, schema=rschema, autoload=True, autoload_with=connection)
- if rfknm != fknm:
- if fknm:
- table.append_constraint(schema.ForeignKeyConstraint(scols, [_gen_fkref(table, s, t, c) for s, t, c in rcols], fknm, link_to_name=True))
- fknm, scols, rcols = (rfknm, [], [])
- if not scol in scols:
- scols.append(scol)
- if not (rschema, rtbl, rcol) in rcols:
- rcols.append((rschema, rtbl, rcol))
-
- if fknm and scols:
- table.append_constraint(schema.ForeignKeyConstraint(scols, [_gen_fkref(table, s, t, c) for s, t, c in rcols], fknm, link_to_name=True))
-
-
-class MSSQLDialect_pymssql(MSSQLDialect):
- supports_sane_rowcount = False
- max_identifier_length = 30
-
- @classmethod
- def import_dbapi(cls):
- import pymssql as module
- # pymmsql doesn't have a Binary method. we use string
- # TODO: monkeypatching here is less than ideal
- module.Binary = lambda st: str(st)
- try:
- module.version_info = tuple(map(int, module.__version__.split('.')))
- except:
- module.version_info = (0, 0, 0)
- return module
-
- def __init__(self, **params):
- super(MSSQLDialect_pymssql, self).__init__(**params)
- self.use_scope_identity = True
-
- # pymssql understands only ascii
- if self.convert_unicode:
- util.warn("pymssql does not support unicode")
- self.encoding = params.get('encoding', 'ascii')
-
- self.colspecs = MSSQLDialect.colspecs.copy()
- self.ischema_names = MSSQLDialect.ischema_names.copy()
- self.ischema_names['date'] = MSDateTimeAsDate
- self.colspecs[sqltypes.Date] = MSDateTimeAsDate
- self.ischema_names['time'] = MSDateTimeAsTime
- self.colspecs[sqltypes.Time] = MSDateTimeAsTime
-
- def create_connect_args(self, url):
- r = super(MSSQLDialect_pymssql, self).create_connect_args(url)
- if hasattr(self, 'query_timeout'):
- if self.dbapi.version_info > (0, 8, 0):
- r[1]['timeout'] = self.query_timeout
- else:
- self.dbapi._mssql.set_query_timeout(self.query_timeout)
- return r
-
- def make_connect_string(self, keys, query):
- if keys.get('port'):
- # pymssql expects port as host:port, not a separate arg
- keys['host'] = ''.join([keys.get('host', ''), ':', str(keys['port'])])
- del keys['port']
- return [[], keys]
-
- def is_disconnect(self, e):
- return isinstance(e, self.dbapi.DatabaseError) and "Error 10054" in str(e)
-
- def do_begin(self, connection):
- pass
-
-
-class MSSQLDialect_pyodbc(MSSQLDialect):
- supports_sane_rowcount = False
- supports_sane_multi_rowcount = False
- # PyODBC unicode is broken on UCS-4 builds
- supports_unicode = sys.maxunicode == 65535
- supports_unicode_statements = supports_unicode
- execution_ctx_cls = MSSQLExecutionContext_pyodbc
-
- def __init__(self, description_encoding='latin-1', **params):
- super(MSSQLDialect_pyodbc, self).__init__(**params)
- self.description_encoding = description_encoding
-
- if self.server_version_info < (10,):
- self.colspecs = MSSQLDialect.colspecs.copy()
- self.ischema_names = MSSQLDialect.ischema_names.copy()
- self.ischema_names['date'] = MSDateTimeAsDate
- self.colspecs[sqltypes.Date] = MSDateTimeAsDate
- self.ischema_names['time'] = MSDateTimeAsTime
- self.colspecs[sqltypes.Time] = MSDateTimeAsTime
-
- # FIXME: scope_identity sniff should look at server version, not the ODBC driver
- # whether use_scope_identity will work depends on the version of pyodbc
- try:
- import pyodbc
- self.use_scope_identity = hasattr(pyodbc.Cursor, 'nextset')
- except:
- pass
-
- @classmethod
- def import_dbapi(cls):
- import pyodbc as module
- return module
-
- def make_connect_string(self, keys, query):
- if 'max_identifier_length' in keys:
- self.max_identifier_length = int(keys.pop('max_identifier_length'))
-
- if 'odbc_connect' in keys:
- connectors = [urllib.unquote_plus(keys.pop('odbc_connect'))]
- else:
- dsn_connection = 'dsn' in keys or ('host' in keys and 'database' not in keys)
- if dsn_connection:
- connectors= ['dsn=%s' % (keys.pop('host', '') or keys.pop('dsn', ''))]
- else:
- port = ''
- if 'port' in keys and not 'port' in query:
- port = ',%d' % int(keys.pop('port'))
-
- connectors = ["DRIVER={%s}" % keys.pop('driver', 'SQL Server'),
- 'Server=%s%s' % (keys.pop('host', ''), port),
- 'Database=%s' % keys.pop('database', '') ]
-
- user = keys.pop("user", None)
- if user:
- connectors.append("UID=%s" % user)
- connectors.append("PWD=%s" % keys.pop('password', ''))
- else:
- connectors.append("TrustedConnection=Yes")
-
- # if set to 'Yes', the ODBC layer will try to automagically convert
- # textual data from your database encoding to your client encoding
- # This should obviously be set to 'No' if you query a cp1253 encoded
- # database from a latin1 client...
- if 'odbc_autotranslate' in keys:
- connectors.append("AutoTranslate=%s" % keys.pop("odbc_autotranslate"))
-
- connectors.extend(['%s=%s' % (k,v) for k,v in keys.iteritems()])
-
- return [[";".join (connectors)], {}]
-
- def is_disconnect(self, e):
- if isinstance(e, self.dbapi.ProgrammingError):
- return "The cursor's connection has been closed." in str(e) or 'Attempt to use a closed connection.' in str(e)
- elif isinstance(e, self.dbapi.Error):
- return '[08S01]' in str(e)
- else:
- return False
-
-
- def _server_version_info(self, dbapi_con):
- """Convert a pyodbc SQL_DBMS_VER string into a tuple."""
- version = []
- r = re.compile('[.\-]')
- for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)):
- try:
- version.append(int(n))
- except ValueError:
- version.append(n)
- return tuple(version)
-
-class MSSQLDialect_adodbapi(MSSQLDialect):
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = True
- supports_unicode = sys.maxunicode == 65535
- supports_unicode_statements = True
-
- @classmethod
- def import_dbapi(cls):
- import adodbapi as module
- return module
-
- colspecs = MSSQLDialect.colspecs.copy()
- colspecs[sqltypes.DateTime] = MSDateTime_adodbapi
-
- ischema_names = MSSQLDialect.ischema_names.copy()
- ischema_names['datetime'] = MSDateTime_adodbapi
-
- def make_connect_string(self, keys, query):
- connectors = ["Provider=SQLOLEDB"]
- if 'port' in keys:
- connectors.append ("Data Source=%s, %s" % (keys.get("host"), keys.get("port")))
- else:
- connectors.append ("Data Source=%s" % keys.get("host"))
- connectors.append ("Initial Catalog=%s" % keys.get("database"))
- user = keys.get("user")
- if user:
- connectors.append("User Id=%s" % user)
- connectors.append("Password=%s" % keys.get("password", ""))
- else:
- connectors.append("Integrated Security=SSPI")
- return [[";".join (connectors)], {}]
-
- def is_disconnect(self, e):
- return isinstance(e, self.dbapi.adodbapi.DatabaseError) and "'connection failure'" in str(e)
-
-
-dialect_mapping = {
- 'pymssql': MSSQLDialect_pymssql,
- 'pyodbc': MSSQLDialect_pyodbc,
- 'adodbapi': MSSQLDialect_adodbapi
- }
-
-
-class MSSQLCompiler(compiler.DefaultCompiler):
- operators = compiler.OPERATORS.copy()
- operators.update({
- sql_operators.concat_op: '+',
- sql_operators.match_op: lambda x, y: "CONTAINS (%s, %s)" % (x, y)
- })
-
- functions = compiler.DefaultCompiler.functions.copy()
- functions.update (
- {
- sql_functions.now: 'CURRENT_TIMESTAMP',
- sql_functions.current_date: 'GETDATE()',
- 'length': lambda x: "LEN(%s)" % x,
- sql_functions.char_length: lambda x: "LEN(%s)" % x
- }
- )
-
- extract_map = compiler.DefaultCompiler.extract_map.copy()
- extract_map.update ({
- 'doy': 'dayofyear',
- 'dow': 'weekday',
- 'milliseconds': 'millisecond',
- 'microseconds': 'microsecond'
- })
-
- def __init__(self, *args, **kwargs):
- super(MSSQLCompiler, self).__init__(*args, **kwargs)
- self.tablealiases = {}
-
- def get_select_precolumns(self, select):
- """ MS-SQL puts TOP, it's version of LIMIT here """
- if select._distinct or select._limit:
- s = select._distinct and "DISTINCT " or ""
-
- if select._limit:
- if not select._offset:
- s += "TOP %s " % (select._limit,)
- else:
- if not self.dialect.has_window_funcs:
- raise exc.InvalidRequestError('MSSQL does not support LIMIT with an offset')
- return s
- return compiler.DefaultCompiler.get_select_precolumns(self, select)
-
- def limit_clause(self, select):
- # Limit in mssql is after the select keyword
- return ""
-
- def visit_select(self, select, **kwargs):
- """Look for ``LIMIT`` and OFFSET in a select statement, and if
- so tries to wrap it in a subquery with ``row_number()`` criterion.
-
- """
- if self.dialect.has_window_funcs and not getattr(select, '_mssql_visit', None) and select._offset:
- # to use ROW_NUMBER(), an ORDER BY is required.
- orderby = self.process(select._order_by_clause)
- if not orderby:
- raise exc.InvalidRequestError('MSSQL requires an order_by when using an offset.')
-
- _offset = select._offset
- _limit = select._limit
- select._mssql_visit = True
- select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" % orderby).label("mssql_rn")).order_by(None).alias()
-
- limitselect = sql.select([c for c in select.c if c.key!='mssql_rn'])
- limitselect.append_whereclause("mssql_rn>%d" % _offset)
- if _limit is not None:
- limitselect.append_whereclause("mssql_rn<=%d" % (_limit + _offset))
- return self.process(limitselect, iswrapper=True, **kwargs)
- else:
- return compiler.DefaultCompiler.visit_select(self, select, **kwargs)
-
- def _schema_aliased_table(self, table):
- if getattr(table, 'schema', None) is not None:
- if table not in self.tablealiases:
- self.tablealiases[table] = table.alias()
- return self.tablealiases[table]
- else:
- return None
-
- def visit_table(self, table, mssql_aliased=False, **kwargs):
- if mssql_aliased:
- return super(MSSQLCompiler, self).visit_table(table, **kwargs)
-
- # alias schema-qualified tables
- alias = self._schema_aliased_table(table)
- if alias is not None:
- return self.process(alias, mssql_aliased=True, **kwargs)
- else:
- return super(MSSQLCompiler, self).visit_table(table, **kwargs)
-
- def visit_alias(self, alias, **kwargs):
- # translate for schema-qualified table aliases
- self.tablealiases[alias.original] = alias
- kwargs['mssql_aliased'] = True
- return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
-
- def visit_extract(self, extract):
- field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
-
- def visit_rollback_to_savepoint(self, savepoint_stmt):
- return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(savepoint_stmt)
-
- def visit_column(self, column, result_map=None, **kwargs):
- if column.table is not None and \
- (not self.isupdate and not self.isdelete) or self.is_subquery():
- # translate for schema-qualified table aliases
- t = self._schema_aliased_table(column.table)
- if t is not None:
- converted = expression._corresponding_column_or_error(t, column)
-
- if result_map is not None:
- result_map[column.name.lower()] = (column.name, (column, ), column.type)
-
- return super(MSSQLCompiler, self).visit_column(converted, result_map=None, **kwargs)
-
- return super(MSSQLCompiler, self).visit_column(column, result_map=result_map, **kwargs)
-
- def visit_binary(self, binary, **kwargs):
- """Move bind parameters to the right-hand side of an operator, where
- possible.
-
- """
- if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq \
- and not isinstance(binary.right, expression._BindParamClause):
- return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator), **kwargs)
- else:
- if (binary.operator is operator.eq or binary.operator is operator.ne) and (
- (isinstance(binary.left, expression._FromGrouping) and isinstance(binary.left.element, expression._ScalarSelect)) or \
- (isinstance(binary.right, expression._FromGrouping) and isinstance(binary.right.element, expression._ScalarSelect)) or \
- isinstance(binary.left, expression._ScalarSelect) or isinstance(binary.right, expression._ScalarSelect)):
- op = binary.operator == operator.eq and "IN" or "NOT IN"
- return self.process(expression._BinaryExpression(binary.left, binary.right, op), **kwargs)
- return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)
-
- def visit_insert(self, insert_stmt):
- insert_select = False
- if insert_stmt.parameters:
- insert_select = [p for p in insert_stmt.parameters.values() if isinstance(p, sql.Select)]
- if insert_select:
- self.isinsert = True
- colparams = self._get_colparams(insert_stmt)
- preparer = self.preparer
-
- insert = ' '.join(["INSERT"] +
- [self.process(x) for x in insert_stmt._prefixes])
-
- if not colparams and not self.dialect.supports_default_values and not self.dialect.supports_empty_insert:
- raise exc.CompileError(
- "The version of %s you are using does not support empty inserts." % self.dialect.name)
- elif not colparams and self.dialect.supports_default_values:
- return (insert + " INTO %s DEFAULT VALUES" % (
- (preparer.format_table(insert_stmt.table),)))
- else:
- return (insert + " INTO %s (%s) SELECT %s" %
- (preparer.format_table(insert_stmt.table),
- ', '.join([preparer.format_column(c[0])
- for c in colparams]),
- ', '.join([c[1] for c in colparams])))
- else:
- return super(MSSQLCompiler, self).visit_insert(insert_stmt)
-
- def label_select_column(self, select, column, asfrom):
- if isinstance(column, expression.Function):
- return column.label(None)
- else:
- return super(MSSQLCompiler, self).label_select_column(select, column, asfrom)
-
- def for_update_clause(self, select):
- # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
- return ''
-
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
-
- # MSSQL only allows ORDER BY in subqueries if there is a LIMIT
- if order_by and (not self.is_subquery() or select._limit):
- return " ORDER BY " + order_by
- else:
- return ""
-
-
-class MSSQLSchemaGenerator(compiler.SchemaGenerator):
- def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column) + " " + column.type.dialect_impl(self.dialect).get_col_spec()
-
- if column.nullable is not None:
- if not column.nullable or column.primary_key:
- colspec += " NOT NULL"
- else:
- colspec += " NULL"
-
- if not column.table:
- raise exc.InvalidRequestError("mssql requires Table-bound columns in order to generate DDL")
-
- seq_col = _table_sequence_column(column.table)
-
- # install a IDENTITY Sequence if we have an implicit IDENTITY column
- if seq_col is column:
- sequence = getattr(column, 'sequence', None)
- if sequence:
- start, increment = sequence.start or 1, sequence.increment or 1
- else:
- start, increment = 1, 1
- colspec += " IDENTITY(%s,%s)" % (start, increment)
- else:
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
-
- return colspec
-
-class MSSQLSchemaDropper(compiler.SchemaDropper):
- def visit_index(self, index):
- self.append("\nDROP INDEX %s.%s" % (
- self.preparer.quote_identifier(index.table.name),
- self.preparer.quote(self._validate_identifier(index.name, False), index.quote)
- ))
- self.execute()
-
-
-class MSSQLIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = RESERVED_WORDS
-
- def __init__(self, dialect):
- super(MSSQLIdentifierPreparer, self).__init__(dialect, initial_quote='[', final_quote=']')
-
- def _escape_identifier(self, value):
- #TODO: determine MSSQL's escaping rules
- return value
-
- def quote_schema(self, schema, force=True):
- """Prepare a quoted table and schema name."""
- result = '.'.join([self.quote(x, force) for x in schema.split('.')])
- return result
-
-dialect = MSSQLDialect
-dialect.statement_compiler = MSSQLCompiler
-dialect.schemagenerator = MSSQLSchemaGenerator
-dialect.schemadropper = MSSQLSchemaDropper
-dialect.preparer = MSSQLIdentifierPreparer
-