diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2017-09-01 00:11:10 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2017-09-11 14:17:10 -0400 |
commit | 31f80b9eaeb3c3435b7f6679b41e434478b1d11c (patch) | |
tree | 9802d4470d78768ba2a8812b47fae0f91e689d5c | |
parent | 4c97ea116c3686cb03f566f16b0a0e9a9fd33968 (diff) | |
download | sqlalchemy-oracle_numeric.tar.gz |
Refactor for cx_Oracle version 6oracle_numeric
Drops support for cx_Oracle prior to version 5.x, reworks
numeric and binary support.
Fixes: #4064
Change-Id: Ib9ae9aba430c15cd2a6eeb4e5e3fd8e97b5fe480
25 files changed, 3073 insertions, 2906 deletions
diff --git a/doc/build/changelog/migration_12.rst b/doc/build/changelog/migration_12.rst index 87c08dc21..a670bb394 100644 --- a/doc/build/changelog/migration_12.rst +++ b/doc/build/changelog/migration_12.rst @@ -1358,6 +1358,72 @@ The above will render:: Dialect Improvements and Changes - Oracle ========================================= +.. _change_cxoracle_12: + +Major Refactor to cx_Oracle Dialect, Typing System +-------------------------------------------------- + +With the introduction of the 6.x series of the cx_Oracle DBAPI, SQLAlchemy's +cx_Oracle dialect has been reworked and simplified to take advantage of recent +improvements in cx_Oracle as well as dropping support for patterns that were +more relevant before the 5.x series of cx_Oracle. + +* The minimum cx_Oracle version supported is now 5.1.3; 5.3 or the most recent + 6.x series are recommended. + +* The handling of datatypes has been refactored. The ``cursor.setinputsizes()`` + method is no longer used for any datatype except LOB types, per advice from + cx_Oracle's developers. As a result, the parameters ``auto_setinputsizes`` + and ``exclude_setinputsizes`` are deprecated and no longer have any effect. + +* The ``coerce_to_decimal`` flag, when set to False to indicate that coercion + of numeric types with precision and scale to ``Decimal`` should not occur, + only impacts untyped (e.g. plain string with no :class:`.TypeEngine` objects) + statements. A Core expression that includes a :class:`.Numeric` type or + subtype will now follow the decimal coercion rules of that type. + +* The "two phase" transaction support in the dialect, already dropped for the + 6.x series of cx_Oracle, has now been removed entirely as this feature has + never worked correctly and is unlikely to have been in production use. + As a result, the ``allow_twophase`` dialect flag is deprecated and also has no + effect. + +* Fixed a bug involving the column keys present with RETURNING. Given + a statement as follows:: + + result = conn.execute(table.insert().values(x=5).returning(table.c.a, table.c.b)) + + Previously, the keys in each row of the result would be ``ret_0`` and ``ret_1``, + which are identifiers internal to the cx_Oracle RETURNING implementation. + The keys will now be ``a`` and ``b`` as is expected for other dialects. + +* cx_Oracle's LOB datatype represents return values as a ``cx_Oracle.LOB`` + object, which is a cursor-associated proxy that returns the ultimate data + value via a ``.read()`` method. Historically, if more rows were read before + these LOB objects were consumed (specifically, more rows than the value of + cursor.arraysize which causes a new batch of rows to be read), these LOB + objects would raise the error "LOB variable no longer valid after subsequent + fetch". SQLAlchemy worked around this by both automatically calling + ``.read()`` upon these LOBs within its typing system, as well as using a + special ``BufferedColumnResultSet`` which would ensure this data was buffered + in case a call like ``cursor.fetchmany()`` or ``cursor.fetchall()`` were + used. + + The dialect now makes use of a cx_Oracle outpttypehandler to handle these + ``.read()`` calls, so that they are always called up front regardless of how + many rows are being fetched, so that this error can no longer occur. As a + result, the use of the ``BufferedColumnResultSet``, as well as some other + internals to the Core ``ResultSet`` that were specific to this use case, + have been removed. The type objects are also simplified as they no longer + need to process a binary column result. + + Additionally, cx_Oracle 6.x has removed the conditions under which this error + occurs in any case, so the error is no longer possible. The error + can occur on SQLAlchemy in the case that the seldom (if ever) used + ``auto_convert_lobs=False`` option is in use, in conjunction with the + previous 5.x series of cx_Oracle, and more rows are read before the LOB + objects can be consumed. Upgrading to cx_Oracle 6.x will resolve that issue. + .. _change_4003: Oracle Unique, Check constraints now reflected diff --git a/doc/build/changelog/unreleased_12/oracle_refactor.rst b/doc/build/changelog/unreleased_12/oracle_refactor.rst new file mode 100644 index 000000000..2a30645fe --- /dev/null +++ b/doc/build/changelog/unreleased_12/oracle_refactor.rst @@ -0,0 +1,61 @@ +.. change:: + :tags: bug, oracle + :tickets: 4064 + + Partial support for persisting and retrieving the Oracle value + "infinity" is implemented with cx_Oracle, using Python float values + only, e.g. ``float("inf")``. Decimal support is not yet fulfilled by + the cx_Oracle DBAPI driver. + +.. change:: + :tags: bug, oracle + + The cx_Oracle dialect has been reworked and modernized to take advantage of + new patterns that weren't present in the old 4.x series of cx_Oracle. This + includes that the minimum cx_Oracle version is the 5.x series and that + cx_Oracle 6.x is now fully tested. The most significant change involves + type conversions, primarily regarding the numeric / floating point and LOB + datatypes, making more effective use of cx_Oracle type handling hooks to + simplify how bind parameter and result data is processed. + + .. seealso:: + + :ref:`change_cxoracle_12` + +.. change:: + :tags: bug, oracle + :tickets: 3997 + + two phase support for cx_Oracle has been completely removed for all + versions of cx_Oracle, whereas in 1.2.0b1 this change only took effect for + the 6.x series of cx_Oracle. This feature never worked correctly + in any version of cx_Oracle and in cx_Oracle 6.x, the API which SQLAlchemy + relied upon was removed. + + .. seealso:: + + :ref:`change_cxoracle_12` + +.. change:: + :tags: bug, oracle + + The column keys present in a result set when using :meth:`.Insert.returning` + with the cx_Oracle backend now use the correct column / label names + like that of all other dialects. Previously, these came out as + ``ret_nnn``. + + .. seealso:: + + :ref:`change_cxoracle_12` + +.. change:: + :tags: bug, oracle + + Several parameters to the cx_Oracle dialect are now deprecated and will + have no effect: ``auto_setinputsizes``, ``exclude_setinputsizes``, + ``allow_twophase``. + + .. seealso:: + + :ref:`change_cxoracle_12` + diff --git a/lib/sqlalchemy/dialects/oracle/__init__.py b/lib/sqlalchemy/dialects/oracle/__init__.py index 210fe501f..b081b134a 100644 --- a/lib/sqlalchemy/dialects/oracle/__init__.py +++ b/lib/sqlalchemy/dialects/oracle/__init__.py @@ -19,6 +19,7 @@ from sqlalchemy.dialects.oracle.base import \ __all__ = ( 'VARCHAR', 'NVARCHAR', 'CHAR', 'DATE', 'NUMBER', 'BLOB', 'BFILE', 'CLOB', 'NCLOB', 'TIMESTAMP', 'RAW', - 'FLOAT', 'DOUBLE_PRECISION', 'LONG', 'dialect', 'INTERVAL', + 'FLOAT', 'DOUBLE_PRECISION', 'BINARY_DOUBLE' 'BINARY_FLOAT', + 'LONG', 'dialect', 'INTERVAL', 'VARCHAR2', 'NVARCHAR2', 'ROWID' ) diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index 9478f6531..b2eb44b5b 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -422,6 +422,28 @@ class DOUBLE_PRECISION(sqltypes.Numeric): precision=precision, scale=scale, asdecimal=asdecimal) +class BINARY_DOUBLE(sqltypes.Numeric): + __visit_name__ = 'BINARY_DOUBLE' + + def __init__(self, precision=None, scale=None, asdecimal=None): + if asdecimal is None: + asdecimal = False + + super(BINARY_DOUBLE, self).__init__( + precision=precision, scale=scale, asdecimal=asdecimal) + + +class BINARY_FLOAT(sqltypes.Numeric): + __visit_name__ = 'BINARY_FLOAT' + + def __init__(self, precision=None, scale=None, asdecimal=None): + if asdecimal is None: + asdecimal = False + + super(BINARY_FLOAT, self).__init__( + precision=precision, scale=scale, asdecimal=asdecimal) + + class BFILE(sqltypes.LargeBinary): __visit_name__ = 'BFILE' @@ -557,6 +579,12 @@ class OracleTypeCompiler(compiler.GenericTypeCompiler): def visit_DOUBLE_PRECISION(self, type_, **kw): return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) + def visit_BINARY_DOUBLE(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) + + def visit_BINARY_FLOAT(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_FLOAT", **kw) + def visit_NUMBER(self, type_, **kw): return self._generate_numeric(type_, "NUMBER", **kw) @@ -746,12 +774,14 @@ class OracleCompiler(compiler.SQLCompiler): def returning_clause(self, stmt, returning_cols): columns = [] binds = [] + for i, column in enumerate( expression._select_iterables(returning_cols)): if column.type._has_column_expression: col_expr = column.type.column_expression(column) else: col_expr = column + outparam = sql.outparam("ret_%d" % i, type_=column.type) self.binds[outparam.key] = outparam binds.append( @@ -760,7 +790,8 @@ class OracleCompiler(compiler.SQLCompiler): self.process(col_expr, within_columns_clause=False)) self._add_to_result_map( - outparam.key, outparam.key, + getattr(col_expr, 'name', col_expr.anon_label), + getattr(col_expr, 'name', col_expr.anon_label), (column, getattr(column, 'name', None), getattr(column, 'key', None)), column.type diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py index 86562dfd0..56a0425c8 100644 --- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -25,11 +25,6 @@ directly as a TNS name. Additional arguments which may be specified either as query string arguments on the URL, or as keyword arguments to :func:`.create_engine()` are: -* ``allow_twophase`` - enable two-phase transactions. This argument is - **deprecated** as of the cx_Oracle 5.x series, two phase transactions are - not supported under cx_Oracle and as of cx_Oracle 6.0b1 this feature is - removed entirely. - * ``arraysize`` - set the cx_oracle.arraysize value on cursors, defaulted to 50. This setting is significant with cx_Oracle as the contents of LOB objects are only readable within a "live" row (e.g. within a batch of @@ -37,25 +32,10 @@ on the URL, or as keyword arguments to :func:`.create_engine()` are: * ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`. -* ``auto_setinputsizes`` - the cx_oracle.setinputsizes() call is issued for - all bind parameters. This is required for LOB datatypes but can be - disabled to reduce overhead. Defaults to ``True``. Specific types - can be excluded from this process using the ``exclude_setinputsizes`` - parameter. - * ``coerce_to_unicode`` - see :ref:`cx_oracle_unicode` for detail. * ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail. -* ``exclude_setinputsizes`` - a tuple or list of string DBAPI type names to - be excluded from the "auto setinputsizes" feature. The type names here - must match DBAPI types that are found in the "cx_Oracle" module namespace, - such as cx_Oracle.UNICODE, cx_Oracle.NCLOB, etc. Defaults to - ``(STRING, UNICODE)``. - - .. versionadded:: 0.8 specific DBAPI types can be excluded from the - auto_setinputsizes feature via the exclude_setinputsizes attribute. - * ``mode`` - This is given the string value of SYSDBA or SYSOPER, or alternatively an integer value. This value is only available as a URL query string argument. @@ -86,89 +66,43 @@ unicode those column values that are of type ``NVARCHAR`` or ``NCLOB``. For column values that are of type ``VARCHAR`` or other non-unicode string types, it will return values as Python strings (e.g. bytestrings). -The cx_Oracle SQLAlchemy dialect presents two different options for the use -case of returning ``VARCHAR`` column values as Python unicode objects under +The cx_Oracle SQLAlchemy dialect presents several different options for the use +case of receiving ``VARCHAR`` column values as Python unicode objects under Python 2: -* the cx_Oracle DBAPI has the ability to coerce all string results to Python - unicode objects unconditionally using output type handlers. This has - the advantage that the unicode conversion is global to all statements - at the cx_Oracle driver level, meaning it works with raw textual SQL - statements that have no typing information associated. However, this system - has been observed to incur signfiicant performance overhead, not only - because it takes effect for all string values unconditionally, but also - because cx_Oracle under Python 2 seems to use a pure-Python function call in - order to do the decode operation, which under cPython can orders of - magnitude slower than doing it using C functions alone. - -* SQLAlchemy has unicode-decoding services built in, and when using - SQLAlchemy's C extensions, these functions do not use any Python function - calls and are very fast. The disadvantage to this approach is that the - unicode conversion only takes effect for statements where the - :class:`.Unicode` type or :class:`.String` type with - ``convert_unicode=True`` is explicitly associated with the result column. - This is the case for any ORM or Core query or SQL expression as well as for - a :func:`.text` construct that specifies output column types, so in the vast - majority of cases this is not an issue. However, when sending a completely - raw string to :meth:`.Connection.execute`, this typing information isn't - present, unless the string is handled within a :func:`.text` construct that - adds typing information. - -As of version 0.9.2 of SQLAlchemy, the default approach is to use SQLAlchemy's -typing system. This keeps cx_Oracle's expensive Python 2 approach -disabled unless the user explicitly wants it. Under Python 3, SQLAlchemy -detects that cx_Oracle is returning unicode objects natively and cx_Oracle's -system is used. - -To re-enable cx_Oracle's output type handler under Python 2, the -``coerce_to_unicode=True`` flag (new in 0.9.4) can be passed to -:func:`.create_engine`:: - - engine = create_engine("oracle+cx_oracle://dsn", coerce_to_unicode=True) +* When using Core expression objects as well as the ORM, SQLAlchemy's + unicode-decoding services are available, which are established by + using either the :class:`.Unicode` datatype or by using the + :class:`.String` datatype with :paramref:`.String.convert_unicode` set + to True. -Alternatively, to run a pure string SQL statement and get ``VARCHAR`` results -as Python unicode under Python 2 without using cx_Oracle's native handlers, -the :func:`.text` feature can be used:: +* When using raw SQL strings, typing behavior can be added for unicode + conversion using the :func:`.text` construct:: from sqlalchemy import text, Unicode result = conn.execute( text("select username from user").columns(username=Unicode)) -.. versionchanged:: 0.9.2 cx_Oracle's outputtypehandlers are no longer used - for unicode results of non-unicode datatypes in Python 2, after they were - identified as a major performance bottleneck. SQLAlchemy's own unicode - facilities are used instead. +* Otherwise, when using raw SQL strings sent directly to an ``.execute()`` + method without any Core typing behavior added, the flag + ``coerce_to_unicode=True`` flag can be passed to :func:`.create_engine` + which will add an unconditional unicode processor to cx_Oracle for all + string values:: + + engine = create_engine("oracle+cx_oracle://dsn", coerce_to_unicode=True) + + The above approach will add significant latency to result-set fetches + of plain string values. -.. versionadded:: 0.9.4 Added the ``coerce_to_unicode`` flag, to re-enable - cx_Oracle's outputtypehandler and revert to pre-0.9.2 behavior. .. _cx_oracle_returning: RETURNING Support ----------------- -The cx_oracle DBAPI supports a limited subset of Oracle's already limited -RETURNING support. Typically, results can only be guaranteed for at most one -column being returned; this is the typical case when SQLAlchemy uses RETURNING -to get just the value of a primary-key-associated sequence value. -Additional column expressions will cause problems in a non-determinative way, -due to cx_oracle's lack of support for the OCI_DATA_AT_EXEC API which is -required for more complex RETURNING scenarios. - -For this reason, stability may be enhanced by disabling RETURNING support -completely; SQLAlchemy otherwise will use RETURNING to fetch newly -sequence-generated primary keys. As illustrated in :ref:`oracle_returning`:: - - engine = create_engine("oracle://scott:tiger@dsn", - implicit_returning=False) - -.. seealso:: - - http://docs.oracle.com/cd/B10501_01/appdev.920/a96584/oci05bnd.htm#420693 - - OCI documentation for RETURNING - - http://sourceforge.net/mailarchive/message.php?msg_id=31338136 - - cx_oracle developer commentary +The cx_Oracle dialect implements RETURNING using OUT parameters. +The dialect supports RETURNING fully, however cx_Oracle 6 is recommended +for complete support. .. _cx_oracle_lob: @@ -177,16 +111,26 @@ LOB Objects cx_oracle returns oracle LOBs using the cx_oracle.LOB object. SQLAlchemy converts these to strings so that the interface of the Binary type is -consistent with that of other backends, and so that the linkage to a live -cursor is not needed in scenarios like result.fetchmany() and -result.fetchall(). This means that by default, LOB objects are fully fetched -unconditionally by SQLAlchemy, and the linkage to a live cursor is broken. +consistent with that of other backends, which takes place within a cx_Oracle +outputtypehandler. + +cx_Oracle prior to version 6 would require that LOB objects be read before +a new batch of rows would be read, as determined by the ``cursor.arraysize``. +As of the 6 series, this limitation has been lifted. Nevertheless, because +SQLAlchemy pre-reads these LOBs up front, this issue is avoided in any case. -To disable this processing, pass ``auto_convert_lobs=False`` to -:func:`.create_engine()`. +To disable the auto "read()" feature of the dialect, the flag +``auto_convert_lobs=False`` may be passed to :func:`.create_engine`. Under +the cx_Oracle 5 series, having this flag turned off means there is the chance +of reading from a stale LOB object if not read as it is fetched. With +cx_Oracle 6, this issue is resolved. -Two Phase Transaction Support ------------------------------ +.. versionchanged:: 1.2 the LOB handling system has been greatly simplified + internally to make use of outputtypehandlers, and no longer makes use + of alternate "buffered" result set objects. + +Two Phase Transactions Not Supported +------------------------------------- Two phase transactions are **not supported** under cx_Oracle due to poor driver support. As of cx_Oracle 6.0b1, the interface for @@ -194,72 +138,45 @@ two phase transactions has been changed to be more of a direct pass-through to the underlying OCI layer with less automation. The additional logic to support this system is not implemented in SQLAlchemy. - .. _cx_oracle_numeric: Precision Numerics ------------------ -The SQLAlchemy dialect goes through a lot of steps to ensure -that decimal numbers are sent and received with full accuracy. -An "outputtypehandler" callable is associated with each -cx_oracle connection object which detects numeric types and -receives them as string values, instead of receiving a Python -``float`` directly, which is then passed to the Python -``Decimal`` constructor. The :class:`.Numeric` and -:class:`.Float` types under the cx_oracle dialect are aware of -this behavior, and will coerce the ``Decimal`` to ``float`` if -the ``asdecimal`` flag is ``False`` (default on :class:`.Float`, -optional on :class:`.Numeric`). - -Because the handler coerces to ``Decimal`` in all cases first, -the feature can detract significantly from performance. -If precision numerics aren't required, the decimal handling -can be disabled by passing the flag ``coerce_to_decimal=False`` -to :func:`.create_engine`:: +SQLAlchemy's numeric types can handle receiving and returning values as Python +``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a +subclass such as :class:`.Float`, :class:`.oracle.DOUBLE_PRECISION` etc. is in +use, the :paramref:`.Numeric.asdecimal` flag determines if values should be +coerced to ``Decimal`` upon return, or returned as float objects. To make +matters more complicated under Oracle, Oracle's ``NUMBER`` type can also +represent integer values if the "scale" is zero, so the Oracle-specific +:class:`.oracle.NUMBER` type takes this into account as well. + +The cx_Oracle dialect makes extensive use of connection- and cursor-level +"outputtypehandler" callables in order to coerce numeric values as requested. +These callables are specific to the specific flavor of :class:`.Numeric` in +use, as well as if no SQLAlchemy typing objects are present. There are +observed scenarios where Oracle may sends incomplete or ambiguous information +about the numeric types being returned, such as a query where the numeric types +are buried under multiple levels of subquery. The type handlers do their best +to make the right decision in all cases, deferring to the underlying cx_Oracle +DBAPI for all those cases where the driver can make the best decision. + +When no typing objects are present, as when executing plain SQL strings, a +default "outputtypehandler" is present which will generally return numeric +values which specify precision and scale as Python ``Decimal`` objects. To +disable this coercion to decimal for performance reasons, pass the flag +``coerce_to_decimal=False`` to :func:`.create_engine`:: engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False) -.. versionadded:: 0.7.6 - Add the ``coerce_to_decimal`` flag. - -Another alternative to performance is to use the -`cdecimal <http://pypi.python.org/pypi/cdecimal/>`_ library; -see :class:`.Numeric` for additional notes. - -The handler attempts to use the "precision" and "scale" -attributes of the result set column to best determine if -subsequent incoming values should be received as ``Decimal`` as -opposed to int (in which case no processing is added). There are -several scenarios where OCI_ does not provide unambiguous data -as to the numeric type, including some situations where -individual rows may return a combination of floating point and -integer values. Certain values for "precision" and "scale" have -been observed to determine this scenario. When it occurs, the -outputtypehandler receives as string and then passes off to a -processing function which detects, for each returned value, if a -decimal point is present, and if so converts to ``Decimal``, -otherwise to int. The intention is that simple int-based -statements like "SELECT my_seq.nextval() FROM DUAL" continue to -return ints and not ``Decimal`` objects, and that any kind of -floating point value is received as a string so that there is no -floating point loss of precision. - -The "decimal point is present" logic itself is also sensitive to -locale. Under OCI_, this is controlled by the NLS_LANG -environment variable. Upon first connection, the dialect runs a -test to determine the current "decimal" character, which can be -a comma "," for European locales. From that point forward the -outputtypehandler uses that character to represent a decimal -point. Note that cx_oracle 5.0.3 or greater is required -when dealing with numerics with locale settings that don't use -a period "." as the decimal character. - -.. versionchanged:: 0.6.6 - The outputtypehandler supports the case where the locale uses a - comma "," character to represent a decimal point. - -.. _OCI: http://www.oracle.com/technetwork/database/features/oci/index.html +The ``coerce_to_decimal`` flag only impacts the results of plain string +SQL staements that are not otherwise associated with a :class:`.Numeric` +SQLAlchemy type (or a subclass of such). + +.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been + reworked to take advantage of newer cx_Oracle features as well + as better integration of outputtypehandlers. """ @@ -269,7 +186,6 @@ from .base import OracleCompiler, OracleDialect, OracleExecutionContext from . import base as oracle from ...engine import result as _result from sqlalchemy import types as sqltypes, util, exc, processors -from sqlalchemy import util import random import collections import decimal @@ -277,47 +193,99 @@ import re import time +class _OracleInteger(sqltypes.Integer): + def _cx_oracle_var(self, dialect, cursor): + cx_Oracle = dialect.dbapi + return cursor.var( + cx_Oracle.STRING, + 255, + arraysize=cursor.arraysize, + outconverter=int + ) + + def _cx_oracle_outputtypehandler(self, dialect): + def handler(cursor, name, + default_type, size, precision, scale): + return self._cx_oracle_var(dialect, cursor) + return handler + + class _OracleNumeric(sqltypes.Numeric): + is_number = False + def bind_processor(self, dialect): - # cx_oracle accepts Decimal objects and floats - return None + if self.scale == 0: + return None + elif self.asdecimal: + processor = processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale) + + def process(value): + if isinstance(value, (int, float)): + return processor(value) + else: + return value + return process + else: + return processors.to_float def result_processor(self, dialect, coltype): - # we apply a cx_oracle type handler to all connections - # that converts floating point strings to Decimal(). - # However, in some subquery situations, Oracle doesn't - # give us enough information to determine int or Decimal. - # It could even be int/Decimal differently on each row, - # regardless of the scale given for the originating type. - # So we still need an old school isinstance() handler - # here for decimals. - - if dialect.supports_native_decimal: - if self.asdecimal: - fstring = "%%.%df" % self._effective_decimal_return_scale - - def to_decimal(value): - if value is None: - return None - elif isinstance(value, decimal.Decimal): - return value - else: - return decimal.Decimal(fstring % value) + return None + + def _cx_oracle_outputtypehandler(self, dialect): + cx_Oracle = dialect.dbapi + + is_cx_oracle_6 = dialect._is_cx_oracle_6 + has_native_int = dialect._has_native_int - return to_decimal + def handler(cursor, name, default_type, size, precision, scale): + outconverter = None + if precision: + if self.asdecimal: + if is_cx_oracle_6: + type_ = decimal.Decimal + else: + type_ = cx_Oracle.STRING + outconverter = dialect._to_decimal + else: + if self.is_number and scale == 0: + if has_native_int: + type_ = cx_Oracle.NATIVE_INT + else: + type_ = cx_Oracle.NUMBER + outconverter = int + else: + type_ = cx_Oracle.NATIVE_FLOAT else: - if self.precision is None and self.scale is None: - return processors.to_float - elif not getattr(self, '_is_oracle_number', False) \ - and self.scale is not None: - return processors.to_float + if self.asdecimal: + if is_cx_oracle_6: + type_ = decimal.Decimal + else: + type_ = cx_Oracle.STRING + outconverter = dialect._to_decimal else: - return None - else: - # cx_oracle 4 behavior, will assume - # floats - return super(_OracleNumeric, self).\ - result_processor(dialect, coltype) + if self.is_number and scale == 0: + if has_native_int: + type_ = cx_Oracle.NATIVE_INT + else: + type_ = cx_Oracle.NUMBER + outconverter = int + else: + type_ = cx_Oracle.NATIVE_FLOAT + + return cursor.var( + type_, 255, + arraysize=cursor.arraysize, + outconverter=outconverter + ) + + return handler + + +class _OracleNUMBER(_OracleNumeric): + is_number = True + + class _OracleDate(sqltypes.Date): @@ -333,118 +301,59 @@ class _OracleDate(sqltypes.Date): return process -class _LOBMixin(object): - def result_processor(self, dialect, coltype): - if not dialect.auto_convert_lobs: - # return the cx_oracle.LOB directly. - return None - - def process(value): - if value is not None: - return value.read() - else: - return value - return process - - -class _NativeUnicodeMixin(object): - if util.py2k: - def bind_processor(self, dialect): - if dialect._cx_oracle_with_unicode: - def process(value): - if value is None: - return value - else: - return unicode(value) - return process - else: - return super( - _NativeUnicodeMixin, self).bind_processor(dialect) - - # we apply a connection output handler that returns - # unicode in all cases, so the "native_unicode" flag - # will be set for the default String.result_processor. - - -class _OracleChar(_NativeUnicodeMixin, sqltypes.CHAR): +class _OracleChar(sqltypes.CHAR): def get_dbapi_type(self, dbapi): return dbapi.FIXED_CHAR -class _OracleNVarChar(_NativeUnicodeMixin, sqltypes.NVARCHAR): +class _OracleNVarChar(sqltypes.NVARCHAR): def get_dbapi_type(self, dbapi): return getattr(dbapi, 'UNICODE', dbapi.STRING) -class _OracleText(_LOBMixin, sqltypes.Text): +class _OracleText(sqltypes.Text): def get_dbapi_type(self, dbapi): return dbapi.CLOB class _OracleLong(oracle.LONG): - # a raw LONG is a text type, but does *not* - # get the LobMixin with cx_oracle. - def get_dbapi_type(self, dbapi): return dbapi.LONG_STRING -class _OracleString(_NativeUnicodeMixin, sqltypes.String): +class _OracleString(sqltypes.String): pass -class _OracleEnum(_NativeUnicodeMixin, sqltypes.Enum): + +class _OracleEnum(sqltypes.Enum): def bind_processor(self, dialect): enum_proc = sqltypes.Enum.bind_processor(self, dialect) - if util.py2k: - unicode_proc = _NativeUnicodeMixin.bind_processor(self, dialect) - else: - unicode_proc = None def process(value): raw_str = enum_proc(value) - if unicode_proc: - raw_str = unicode_proc(raw_str) return raw_str return process -class _OracleUnicodeText( - _LOBMixin, _NativeUnicodeMixin, sqltypes.UnicodeText): +class _OracleUnicodeText(sqltypes.UnicodeText): def get_dbapi_type(self, dbapi): return dbapi.NCLOB - def result_processor(self, dialect, coltype): - lob_processor = _LOBMixin.result_processor(self, dialect, coltype) - if lob_processor is None: - return None - - string_processor = sqltypes.UnicodeText.result_processor( - self, dialect, coltype) - - if string_processor is None: - return lob_processor - else: - def process(value): - return string_processor(lob_processor(value)) - return process - - -class _OracleInteger(sqltypes.Integer): - def result_processor(self, dialect, coltype): - def to_int(val): - if val is not None: - val = int(val) - return val - return to_int - -class _OracleBinary(_LOBMixin, sqltypes.LargeBinary): +class _OracleBinary(sqltypes.LargeBinary): def get_dbapi_type(self, dbapi): return dbapi.BLOB def bind_processor(self, dialect): return None + def result_processor(self, dialect, coltype): + if not dialect.auto_convert_lobs: + return None + else: + return super(_OracleBinary, self).result_processor( + dialect, coltype) + class _OracleInterval(oracle.INTERVAL): def get_dbapi_type(self, dbapi): @@ -461,6 +370,8 @@ class _OracleRowid(oracle.ROWID): class OracleCompiler_cx_oracle(OracleCompiler): + _oracle_cx_sql_compiler = True + def bindparam_string(self, name, **kw): quote = getattr(name, 'quote', None) if quote is True or quote is not False and \ @@ -473,57 +384,92 @@ class OracleCompiler_cx_oracle(OracleCompiler): class OracleExecutionContext_cx_oracle(OracleExecutionContext): + out_parameters = None - def pre_exec(self): - quoted_bind_names = \ - getattr(self.compiled, '_quoted_bind_names', None) + def _setup_quoted_bind_names(self): + quoted_bind_names = self.compiled._quoted_bind_names if quoted_bind_names: - if not self.dialect.supports_unicode_statements: - # if DBAPI doesn't accept unicode statements, - # keys in self.parameters would have been encoded - # here. so convert names in quoted_bind_names - # to encoded as well. - quoted_bind_names = \ - dict( - (fromname.encode(self.dialect.encoding), - toname.encode(self.dialect.encoding)) - for fromname, toname in - quoted_bind_names.items() - ) for param in self.parameters: for fromname, toname in quoted_bind_names.items(): param[toname] = param[fromname] del param[fromname] - if self.dialect.auto_setinputsizes: - # cx_oracle really has issues when you setinputsizes - # on String, including that outparams/RETURNING - # breaks for varchars - self.set_input_sizes( - quoted_bind_names, - exclude_types=self.dialect.exclude_setinputsizes - ) - + def _handle_out_parameters(self): # if a single execute, check for outparams if len(self.compiled_parameters) == 1: + quoted_bind_names = self.compiled._quoted_bind_names for bindparam in self.compiled.binds.values(): if bindparam.isoutparam: - dbtype = bindparam.type.dialect_impl(self.dialect).\ - get_dbapi_type(self.dialect.dbapi) - if not hasattr(self, 'out_parameters'): - self.out_parameters = {} - if dbtype is None: - raise exc.InvalidRequestError( - "Cannot create out parameter for parameter " - "%r - its type %r is not supported by" - " cx_oracle" % - (bindparam.key, bindparam.type) - ) name = self.compiled.bind_names[bindparam] - self.out_parameters[name] = self.cursor.var(dbtype) + type_impl = bindparam.type.dialect_impl(self.dialect) + if hasattr(type_impl, '_cx_oracle_var'): + self.out_parameters[name] = type_impl._cx_oracle_var( + self.dialect, self.cursor) + else: + dbtype = type_impl.get_dbapi_type(self.dialect.dbapi) + if dbtype is None: + raise exc.InvalidRequestError( + "Cannot create out parameter for parameter " + "%r - its type %r is not supported by" + " cx_oracle" % + (bindparam.key, bindparam.type) + ) + self.out_parameters[name] = self.cursor.var(dbtype) self.parameters[0][quoted_bind_names.get(name, name)] = \ self.out_parameters[name] + def _generate_cursor_outputtype_handler(self): + output_handlers = {} + + for (keyname, name, objects, type_) in self.compiled._result_columns: + handler = type_._cached_custom_processor( + self.dialect, + 'cx_oracle_outputtypehandler', + self._get_cx_oracle_type_handler) + + if handler: + denormalized_name = self.dialect.denormalize_name(keyname) + output_handlers[denormalized_name] = handler + + if output_handlers: + default_handler = self._dbapi_connection.outputtypehandler + + def output_type_handler(cursor, name, default_type, + size, precision, scale): + if name in output_handlers: + return output_handlers[name]( + cursor, name, + default_type, size, precision, scale) + else: + return default_handler( + cursor, name, default_type, size, precision, scale + ) + self.cursor.outputtypehandler = output_type_handler + + def _get_cx_oracle_type_handler(self, impl): + if hasattr(impl, "_cx_oracle_outputtypehandler"): + return impl._cx_oracle_outputtypehandler(self.dialect) + else: + return None + + def pre_exec(self): + if not getattr(self.compiled, "_oracle_cx_sql_compiler", False): + return + + self.out_parameters = {} + + if self.compiled._quoted_bind_names: + self._setup_quoted_bind_names() + + self.set_input_sizes( + self.compiled._quoted_bind_names, + include_types=self.dialect._include_setinputsizes + ) + + self._handle_out_parameters() + + self._generate_cursor_outputtype_handler() + def create_cursor(self): c = self._dbapi_connection.cursor() if self.dialect.arraysize: @@ -532,24 +478,16 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): return c def get_result_proxy(self): - if hasattr(self, 'out_parameters') and self.compiled.returning: - returning_params = dict( - (k, v.getvalue()) - for k, v in self.out_parameters.items() - ) + if self.out_parameters and self.compiled.returning: + returning_params = [ + self.out_parameters["ret_%d" % i].getvalue() + for i in range(len(self.out_parameters)) + ] return ReturningResultProxy(self, returning_params) - result = None - if self.cursor.description is not None: - for column in self.cursor.description: - type_code = column[1] - if type_code in self.dialect._cx_oracle_binary_types: - result = _result.BufferedColumnResultProxy(self) + result = _result.ResultProxy(self) - if result is None: - result = _result.ResultProxy(self) - - if hasattr(self, 'out_parameters'): + if self.out_parameters: if self.compiled_parameters is not None and \ len(self.compiled_parameters) == 1: result.out_parameters = out_parameters = {} @@ -579,30 +517,6 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): return result -class OracleExecutionContext_cx_oracle_with_unicode( - OracleExecutionContext_cx_oracle): - """Support WITH_UNICODE in Python 2.xx. - - WITH_UNICODE allows cx_Oracle's Python 3 unicode handling - behavior under Python 2.x. This mode in some cases disallows - and in other cases silently passes corrupted data when - non-Python-unicode strings (a.k.a. plain old Python strings) - are passed as arguments to connect(), the statement sent to execute(), - or any of the bind parameter keys or values sent to execute(). - This optional context therefore ensures that all statements are - passed as Python unicode objects. - - """ - - def __init__(self, *arg, **kw): - OracleExecutionContext_cx_oracle.__init__(self, *arg, **kw) - self.statement = util.text_type(self.statement) - - def _execute_scalar(self, stmt, type_): - return super(OracleExecutionContext_cx_oracle_with_unicode, self).\ - _execute_scalar(util.text_type(stmt), type_) - - class ReturningResultProxy(_result.FullyBufferedResultProxy): """Result proxy which stuffs the _returning clause + outparams into the fetch.""" @@ -614,14 +528,13 @@ class ReturningResultProxy(_result.FullyBufferedResultProxy): def _cursor_description(self): returning = self.context.compiled.returning return [ - ("ret_%d" % i, None) - for i, col in enumerate(returning) + (getattr(col, 'name', col.anon_label), None) + for col in returning ] def _buffer_rows(self): return collections.deque( - [tuple(self._returning_params["ret_%d" % i] - for i, c in enumerate(self._returning_params))] + [tuple(self._returning_params)] ) @@ -632,11 +545,17 @@ class OracleDialect_cx_oracle(OracleDialect): supports_sane_rowcount = True supports_sane_multi_rowcount = True + supports_unicode_statements = True + supports_unicode_binds = True + driver = "cx_oracle" colspecs = colspecs = { sqltypes.Numeric: _OracleNumeric, - # generic type, assume datetime.date is desired + sqltypes.Float: _OracleNumeric, + sqltypes.Integer: _OracleInteger, + oracle.NUMBER: _OracleNUMBER, + sqltypes.Date: _OracleDate, sqltypes.LargeBinary: _OracleBinary, sqltypes.Boolean: oracle._OracleBoolean, @@ -648,14 +567,7 @@ class OracleDialect_cx_oracle(OracleDialect): sqltypes.CHAR: _OracleChar, sqltypes.Enum: _OracleEnum, - # a raw LONG is a text type, but does *not* - # get the LobMixin with cx_oracle. oracle.LONG: _OracleLong, - - # this is only needed for OUT parameters. - # it would be nice if we could not use it otherwise. - sqltypes.Integer: _OracleInteger, - oracle.RAW: _OracleRaw, sqltypes.Unicode: _OracleNVarChar, sqltypes.NVARCHAR: _OracleNVarChar, @@ -665,106 +577,58 @@ class OracleDialect_cx_oracle(OracleDialect): execute_sequence_format = list def __init__(self, - auto_setinputsizes=True, - exclude_setinputsizes=("STRING", "UNICODE"), auto_convert_lobs=True, threaded=True, - allow_twophase=True, - coerce_to_decimal=True, coerce_to_unicode=False, - arraysize=50, _retry_on_12516=False, + coerce_to_decimal=True, + arraysize=50, **kwargs): + + self._pop_deprecated_kwargs(kwargs) + OracleDialect.__init__(self, **kwargs) self.threaded = threaded self.arraysize = arraysize - self.allow_twophase = allow_twophase - self.supports_timestamp = self.dbapi is None or \ - hasattr(self.dbapi, 'TIMESTAMP') - self.auto_setinputsizes = auto_setinputsizes self.auto_convert_lobs = auto_convert_lobs - self._retry_on_12516 = _retry_on_12516 + self.coerce_to_unicode = coerce_to_unicode + self.coerce_to_decimal = coerce_to_decimal - if hasattr(self.dbapi, 'version'): - self.cx_oracle_ver = self._parse_cx_oracle_ver(self.dbapi.version) + cx_Oracle = self.dbapi - else: + if cx_Oracle is None: + self._include_setinputsizes = {} self.cx_oracle_ver = (0, 0, 0) - - def types(*names): - return set( - getattr(self.dbapi, name, None) for name in names - ).difference([None]) - - self.exclude_setinputsizes = types(*(exclude_setinputsizes or ())) - self._cx_oracle_string_types = types("STRING", "UNICODE", - "NCLOB", "CLOB") - self._cx_oracle_unicode_types = types("UNICODE", "NCLOB") - self._cx_oracle_binary_types = types("BFILE", "CLOB", "NCLOB", "BLOB") - self.supports_unicode_binds = self.cx_oracle_ver >= (5, 0) - - self._enable_twophase = self.cx_oracle_ver < (6, 0) - - self.supports_sane_multi_rowcount = self.cx_oracle_ver >= (5, 0) - - self.coerce_to_unicode = ( - self.cx_oracle_ver >= (5, 0) and - coerce_to_unicode - ) - - self.supports_native_decimal = ( - self.cx_oracle_ver >= (5, 0) and - coerce_to_decimal - ) - - self._cx_oracle_native_nvarchar = self.cx_oracle_ver >= (5, 0) - - if self.cx_oracle_ver is None: - # this occurs in tests with mock DBAPIs - self._cx_oracle_string_types = set() - self._cx_oracle_with_unicode = False - elif util.py3k or ( - self.cx_oracle_ver >= (5,) and - self.cx_oracle_ver < (5, 1) and not - hasattr(self.dbapi, 'UNICODE') - ): - # cx_Oracle WITH_UNICODE mode. *only* python - # unicode objects accepted for anything. This - # mode of operation is implicit for Python 3, - # however under Python 2 it existed as a never-used build-time - # option for cx_Oracle 5.0 only and was removed in 5.1. - self.supports_unicode_statements = True - self.supports_unicode_binds = True - self._cx_oracle_with_unicode = True - - if util.py2k: - # There's really no reason to run with WITH_UNICODE under - # Python 2.x. Give the user a hint. - util.warn( - "cx_Oracle is compiled under Python 2.xx using the " - "WITH_UNICODE flag. Consider recompiling cx_Oracle " - "without this flag, which is in no way necessary for " - "full support of Unicode and causes significant " - "performance issues.") - self.execution_ctx_cls = \ - OracleExecutionContext_cx_oracle_with_unicode else: - self._cx_oracle_with_unicode = False + self.cx_oracle_ver = self._parse_cx_oracle_ver(cx_Oracle.version) + if self.cx_oracle_ver < (5, 0) and self.cx_oracle_ver > (0, 0, 0): + raise exc.InvalidRequestError( + "cx_Oracle version 5.0 and above are supported") - if self.cx_oracle_ver is None or \ - not self.auto_convert_lobs or \ - not hasattr(self.dbapi, 'CLOB'): - self.dbapi_type_map = {} - else: - # only use this for LOB objects. using it for strings, dates - # etc. leads to a little too much magic, reflection doesn't know - # if it should expect encoded strings or unicodes, etc. - self.dbapi_type_map = { - self.dbapi.CLOB: oracle.CLOB(), - self.dbapi.NCLOB: oracle.NCLOB(), - self.dbapi.BLOB: oracle.BLOB(), - self.dbapi.BINARY: oracle.RAW(), + self._has_native_int = hasattr(cx_Oracle, "NATIVE_INT") + + self._include_setinputsizes = { + cx_Oracle.NCLOB, cx_Oracle.CLOB, cx_Oracle.LOB, + cx_Oracle.BLOB, cx_Oracle.FIXED_CHAR, } + self._is_cx_oracle_6 = self.cx_oracle_ver >= (6, ) + + def _pop_deprecated_kwargs(self, kwargs): + auto_setinputsizes = kwargs.pop('auto_setinputsizes', None) + exclude_setinputsizes = kwargs.pop('exclude_setinputsizes', None) + if auto_setinputsizes or exclude_setinputsizes: + util.warn_deprecated( + "auto_setinputsizes and exclude_setinputsizes are deprecated. " + "Modern cx_Oracle only requires that LOB types are part " + "of this behavior, and these parameters no longer have any " + "effect.") + allow_twophase = kwargs.pop('allow_twophase', None) + if allow_twophase is not None: + util.warn.deprecated( + "allow_twophase is deprecated. The cx_Oracle dialect no " + "longer supports two-phase transaction mode." + ) + def _parse_cx_oracle_ver(self, version): m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?', version) if m: @@ -780,115 +644,94 @@ class OracleDialect_cx_oracle(OracleDialect): import cx_Oracle return cx_Oracle - def connect(self, *cargs, **cparams): - if self._retry_on_12516: - # emergency flag for the SQLAlchemy test suite, which has - # decreased in stability since cx_oracle 5.3; generalized - # "retry on connect" functionality is part of an upcoming - # SQLAlchemy feature - try: - return self.dbapi.connect(*cargs, **cparams) - except self.dbapi.DatabaseError as err: - if "ORA-12516" in str(err): - time.sleep(2) - return self.dbapi.connect(*cargs, **cparams) - else: - raise - else: - return super(OracleDialect_cx_oracle, self).connect( - *cargs, **cparams) - def initialize(self, connection): super(OracleDialect_cx_oracle, self).initialize(connection) if self._is_oracle_8: self.supports_unicode_binds = False + self._detect_decimal_char(connection) def _detect_decimal_char(self, connection): - """detect if the decimal separator character is not '.', as - is the case with European locale settings for NLS_LANG. - - cx_oracle itself uses similar logic when it formats Python - Decimal objects to strings on the bind side (as of 5.0.3), - as Oracle sends/receives string numerics only in the - current locale. - - """ - if self.cx_oracle_ver < (5,): - # no output type handlers before version 5 - return - - cx_Oracle = self.dbapi - conn = connection.connection - - # override the output_type_handler that's - # on the cx_oracle connection with a plain - # one on the cursor - - def output_type_handler(cursor, name, defaultType, - size, precision, scale): - return cursor.var( - cx_Oracle.STRING, - 255, arraysize=cursor.arraysize) - - cursor = conn.cursor() - cursor.outputtypehandler = output_type_handler - cursor.execute("SELECT 0.1 FROM DUAL") - val = cursor.fetchone()[0] - cursor.close() - char = re.match(r"([\.,])", val).group(1) - if char != '.': + # we have the option to change this setting upon connect, + # or just look at what it is upon connect and convert. + # to minimize the chance of interference with changes to + # NLS_TERRITORY or formatting behavior of the DB, we opt + # to just look at it + + self._decimal_char = connection.scalar( + "select value from nls_session_parameters " + "where parameter = 'NLS_NUMERIC_CHARACTERS'")[0] + if self._decimal_char != '.': _detect_decimal = self._detect_decimal - self._detect_decimal = \ - lambda value: _detect_decimal(value.replace(char, '.')) - self._to_decimal = \ - lambda value: decimal.Decimal(value.replace(char, '.')) + _to_decimal = self._to_decimal + + self._detect_decimal = lambda value: _detect_decimal( + value.replace(self._decimal_char, ".")) + self._to_decimal = lambda value: _to_decimal( + value.replace(self._decimal_char, ".")) def _detect_decimal(self, value): if "." in value: - return decimal.Decimal(value) + return self._to_decimal(value) else: return int(value) _to_decimal = decimal.Decimal - def on_connect(self): - if self.cx_oracle_ver < (5,): - # no output type handlers before version 5 - return + def _generate_connection_outputtype_handler(self): + """establish the default outputtypehandler established at the + connection level. - cx_Oracle = self.dbapi + """ - def output_type_handler(cursor, name, defaultType, + dialect = self + cx_Oracle = dialect.dbapi + + number_handler = _OracleNUMBER(asdecimal=True).\ + _cx_oracle_outputtypehandler(dialect) + float_handler = _OracleNUMBER(asdecimal=False).\ + _cx_oracle_outputtypehandler(dialect) + + def output_type_handler(cursor, name, default_type, size, precision, scale): - # convert all NUMBER with precision + positive scale to Decimal - # this almost allows "native decimal" mode. + if default_type == cx_Oracle.NUMBER: + if not dialect.coerce_to_decimal: + return None + elif precision == 0 and scale in (0, -127): + # ambiguous type, this occurs when selecting + # numbers from deep subqueries + return cursor.var( + cx_Oracle.STRING, + 255, + outconverter=dialect._detect_decimal, + arraysize=cursor.arraysize) + elif precision and scale > 0: + return number_handler( + cursor, name, default_type, size, precision, scale + ) + else: + return float_handler( + cursor, name, default_type, size, precision, scale + ) - if self.supports_native_decimal and \ - defaultType == cx_Oracle.NUMBER and \ - precision and scale > 0: + # allow all strings to come back natively as Unicode + elif dialect.coerce_to_unicode and \ + default_type in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR): return cursor.var( - cx_Oracle.STRING, - 255, - outconverter=self._to_decimal, - arraysize=cursor.arraysize) - # if NUMBER with zero precision and 0 or neg scale, this appears - # to indicate "ambiguous". Use a slower converter that will - # make a decision based on each value received - the type - # may change from row to row (!). This kills - # off "native decimal" mode, handlers still needed. - elif self.supports_native_decimal and \ - defaultType == cx_Oracle.NUMBER \ - and not precision and scale <= 0: + util.text_type, size, cursor.arraysize + ) + elif dialect.auto_convert_lobs and default_type in ( + cx_Oracle.CLOB, cx_Oracle.NCLOB, cx_Oracle.BLOB + ): return cursor.var( - cx_Oracle.STRING, - 255, - outconverter=self._detect_decimal, - arraysize=cursor.arraysize) - # allow all strings to come back natively as Unicode - elif self.coerce_to_unicode and \ - defaultType in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR): - return cursor.var(util.text_type, size, cursor.arraysize) + default_type, size, cursor.arraysize, + outconverter=lambda value: value.read() + ) + return output_type_handler + + def on_connect(self): + + output_type_handler = self._generate_connection_outputtype_handler() def on_connect(conn): conn.outputtypehandler = output_type_handler @@ -932,9 +775,6 @@ class OracleDialect_cx_oracle(OracleDialect): threaded=self.threaded, ) - if self._enable_twophase: - opts['twophase'] = self.allow_twophase - if dsn is not None: opts['dsn'] = dsn if url.password is not None: @@ -942,16 +782,6 @@ class OracleDialect_cx_oracle(OracleDialect): if url.username is not None: opts['user'] = url.username - if util.py2k: - if self._cx_oracle_with_unicode: - for k, v in opts.items(): - if isinstance(v, str): - opts[k] = unicode(v) - else: - for k, v in opts.items(): - if isinstance(v, unicode): - opts[k] = str(v) - if 'mode' in url.query: opts['mode'] = url.query['mode'] if isinstance(opts['mode'], util.string_types): diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 8b72c0001..227ff0845 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -108,7 +108,6 @@ class DefaultDialect(interfaces.Dialect): supports_sane_rowcount = True supports_sane_multi_rowcount = True - dbapi_type_map = {} colspecs = {} default_paramstyle = 'named' supports_default_values = False @@ -1112,7 +1111,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext): return (self.isinsert or self.isupdate) and \ bool(self.compiled.postfetch) - def set_input_sizes(self, translate=None, exclude_types=None): + def set_input_sizes( + self, translate=None, include_types=None, exclude_types=None): """Given a cursor and ClauseParameters, call the appropriate style of ``setinputsizes()`` on the cursor, using DB-API types from the bind parameter's ``TypeEngine`` objects. @@ -1136,7 +1136,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext): dbtype = typeengine.dialect_impl(self.dialect).\ get_dbapi_type(self.dialect.dbapi) if dbtype is not None and \ - (not exclude_types or dbtype not in exclude_types): + (not exclude_types or dbtype not in exclude_types) and \ + (not include_types or dbtype in include_types): if key in self._expanded_parameters: inputsizes.extend( [dbtype] * len(self._expanded_parameters[key])) @@ -1154,7 +1155,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext): dbtype = typeengine.dialect_impl(self.dialect).\ get_dbapi_type(self.dialect.dbapi) if dbtype is not None and \ - (not exclude_types or dbtype not in exclude_types): + (not exclude_types or dbtype not in exclude_types) and \ + (not include_types or dbtype in include_types): if translate: # TODO: this part won't work w/ the # expanded_parameters feature, e.g. for cx_oracle diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 3e09e4971..518038d29 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -110,16 +110,6 @@ class Dialect(object): the "implicit" functionality is not used and inserted_primary_key will not be available. - dbapi_type_map - A mapping of DB-API type objects present in this Dialect's - DB-API implementation mapped to TypeEngine implementations used - by the dialect. - - This is used to apply types to result sets based on the DB-API - types present in cursor.description; it only takes effect for - result sets against textual statements where no explicit - typemap was present. - colspecs A dictionary of TypeEngine classes from sqlalchemy.types mapped to subclasses that are specific to the dialect class. This diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 907dc7bd2..3aae932f2 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -447,7 +447,6 @@ class ResultMetaData(object): def _merge_textual_cols_by_position( self, context, cursor_description, result_columns): dialect = context.dialect - typemap = dialect.dbapi_type_map num_ctx_cols = len(result_columns) if result_columns else None if num_ctx_cols > len(cursor_description): @@ -470,14 +469,13 @@ class ResultMetaData(object): "in textual SQL: %r" % obj[0]) seen.add(obj[0]) else: - mapped_type = typemap.get(coltype, sqltypes.NULLTYPE) + mapped_type = sqltypes.NULLTYPE obj = None yield idx, colname, mapped_type, coltype, obj, untranslated def _merge_cols_by_name(self, context, cursor_description, result_columns): dialect = context.dialect - typemap = dialect.dbapi_type_map case_sensitive = dialect.case_sensitive result_map = self._create_result_map(result_columns, case_sensitive) @@ -487,7 +485,7 @@ class ResultMetaData(object): try: ctx_rec = result_map[colname] except KeyError: - mapped_type = typemap.get(coltype, sqltypes.NULLTYPE) + mapped_type = sqltypes.NULLTYPE obj = None else: obj = ctx_rec[1] @@ -496,11 +494,9 @@ class ResultMetaData(object): def _merge_cols_by_none(self, context, cursor_description): dialect = context.dialect - typemap = dialect.dbapi_type_map for idx, colname, untranslated, coltype in \ self._colnames_from_description(context, cursor_description): - mapped_type = typemap.get(coltype, sqltypes.NULLTYPE) - yield idx, colname, mapped_type, coltype, None, untranslated + yield idx, colname, sqltypes.NULLTYPE, coltype, None, untranslated @classmethod def _create_result_map(cls, result_columns, case_sensitive=True): @@ -1385,8 +1381,10 @@ class BufferedColumnResultProxy(ResultProxy): fetchone() is called. If fetchmany() or fetchall() are called, the full grid of results is fetched. This is to operate with databases where result rows contain "live" results that fall out - of scope unless explicitly fetched. Currently this includes - cx_Oracle LOB objects. + of scope unless explicitly fetched. + + .. versionchanged:: 1.2 This :class:`.ResultProxy` is not used by + any SQLAlchemy-included dialects. """ diff --git a/lib/sqlalchemy/sql/type_api.py b/lib/sqlalchemy/sql/type_api.py index 69dd80938..89c2c9de5 100644 --- a/lib/sqlalchemy/sql/type_api.py +++ b/lib/sqlalchemy/sql/type_api.py @@ -477,6 +477,15 @@ class TypeEngine(Visitable): d[coltype] = rp = d['impl'].result_processor(dialect, coltype) return rp + def _cached_custom_processor(self, dialect, key, fn): + try: + return dialect._type_memos[self][key] + except KeyError: + d = self._dialect_info(dialect) + impl = d['impl'] + d[key] = result = fn(impl) + return result + def _dialect_info(self, dialect): """Return a dialect-specific registry which caches a dialect-specific implementation, bind processing diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 08d0f0aac..a51705001 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -295,7 +295,6 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): msg, util.text_type(e), re.UNICODE), "%r !~ %s" % (msg, e) print(util.text_type(e).encode('utf-8')) - class AssertsCompiledSQL(object): def assert_compile(self, clause, result, params=None, checkparams=None, dialect=None, diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py index 17ddbb567..687f84b18 100644 --- a/lib/sqlalchemy/testing/provision.py +++ b/lib/sqlalchemy/testing/provision.py @@ -284,7 +284,7 @@ def _oracle_drop_db(cfg, eng, ident): @_update_db_opts.for_db("oracle") def _oracle_update_db_opts(db_url, db_opts): - db_opts['_retry_on_12516'] = True + pass def reap_dbs(idents_file): diff --git a/lib/sqlalchemy/testing/suite/test_types.py b/lib/sqlalchemy/testing/suite/test_types.py index 022e7b92d..b9bb179ab 100644 --- a/lib/sqlalchemy/testing/suite/test_types.py +++ b/lib/sqlalchemy/testing/suite/test_types.py @@ -511,8 +511,7 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase): numbers ) - @testing.skip_if( - "oracle", "temporary skip until cx_oracle refactor is merged") + @testing.fails_if(testing.requires.broken_cx_oracle6_numerics) @testing.requires.precision_numerics_enotation_large def test_enotation_decimal_large(self): """test exceedingly large decimals. @@ -9,7 +9,7 @@ first-package-wins = true where = test [tool:pytest] -addopts= --tb native -v -r fxX --maxfail=25 -p no:warnings +addopts= --tb native -v -r sfxX --maxfail=25 -p no:warnings python_files=test/*test_*.py [upload] diff --git a/test/aaa_profiling/test_resultset.py b/test/aaa_profiling/test_resultset.py index 9ffa21cb6..2b0e8de9e 100644 --- a/test/aaa_profiling/test_resultset.py +++ b/test/aaa_profiling/test_resultset.py @@ -51,6 +51,20 @@ class ResultSetTest(fixtures.TestBase, AssertsExecutionResults): def test_unicode(self): [tuple(row) for row in t2.select().execute().fetchall()] + @profiling.function_call_count() + def test_raw_string(self): + stmt = 'SELECT %s FROM "table"' % ( + ", ".join("field%d" % fnum for fnum in range(NUM_FIELDS)) + ) + [tuple(row) for row in testing.db.execute(stmt).fetchall()] + + @profiling.function_call_count() + def test_raw_unicode(self): + stmt = "SELECT %s FROM table2" % ( + ", ".join("field%d" % fnum for fnum in range(NUM_FIELDS)) + ) + [tuple(row) for row in testing.db.execute(stmt).fetchall()] + def test_contains_doesnt_compile(self): row = t.select().execute().first() c1 = Column('some column', Integer) + \ diff --git a/test/dialect/oracle/__init__.py b/test/dialect/oracle/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/test/dialect/oracle/__init__.py diff --git a/test/dialect/oracle/test_compiler.py b/test/dialect/oracle/test_compiler.py new file mode 100644 index 000000000..305359085 --- /dev/null +++ b/test/dialect/oracle/test_compiler.py @@ -0,0 +1,792 @@ +# coding: utf-8 + + +from sqlalchemy.testing import eq_ +from sqlalchemy import types as sqltypes, exc, schema +from sqlalchemy.sql import table, column +from sqlalchemy.testing import (fixtures, + AssertsExecutionResults, + AssertsCompiledSQL) +from sqlalchemy import testing +from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ + Index, MetaData, select, inspect, ForeignKey, String, func, \ + TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ + literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ + ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ + union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ + PrimaryKeyConstraint, FLOAT +from sqlalchemy.util import u, b +from sqlalchemy import util +from sqlalchemy.testing import assert_raises, assert_raises_message +from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.dialects.oracle import cx_oracle, base as oracle +from sqlalchemy.engine import default +import decimal +from sqlalchemy.engine import url +from sqlalchemy.testing.schema import Table, Column +import datetime +import os +from sqlalchemy import sql +from sqlalchemy.testing.mock import Mock + + +class CompileTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = "oracle" + + def test_true_false(self): + self.assert_compile( + sql.false(), "0" + ) + self.assert_compile( + sql.true(), + "1" + ) + + def test_owner(self): + meta = MetaData() + parent = Table('parent', meta, Column('id', Integer, + primary_key=True), Column('name', String(50)), + schema='ed') + child = Table('child', meta, Column('id', Integer, + primary_key=True), Column('parent_id', Integer, + ForeignKey('ed.parent.id')), schema='ed') + self.assert_compile(parent.join(child), + 'ed.parent JOIN ed.child ON ed.parent.id = ' + 'ed.child.parent_id') + + def test_subquery(self): + t = table('sometable', column('col1'), column('col2')) + s = select([t]) + s = select([s.c.col1, s.c.col2]) + + self.assert_compile(s, "SELECT col1, col2 FROM (SELECT " + "sometable.col1 AS col1, sometable.col2 " + "AS col2 FROM sometable)") + + def test_bindparam_quote(self): + """test that bound parameters take on quoting for reserved words, + column names quote flag enabled.""" + # note: this is only in cx_oracle at the moment. not sure + # what other hypothetical oracle dialects might need + + self.assert_compile( + bindparam("option"), ':"option"' + ) + self.assert_compile( + bindparam("plain"), ':plain' + ) + t = Table("s", MetaData(), Column('plain', Integer, quote=True)) + self.assert_compile( + t.insert().values(plain=5), + 'INSERT INTO s ("plain") VALUES (:"plain")' + ) + self.assert_compile( + t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"' + ) + + def test_cte(self): + part = table( + 'part', + column('part'), + column('sub_part'), + column('quantity') + ) + + included_parts = select([ + part.c.sub_part, part.c.part, part.c.quantity + ]).where(part.c.part == "p1").\ + cte(name="included_parts", recursive=True).\ + suffix_with( + "search depth first by part set ord1", + "cycle part set y_cycle to 1 default 0", dialect='oracle') + + incl_alias = included_parts.alias("pr1") + parts_alias = part.alias("p") + included_parts = included_parts.union_all( + select([ + parts_alias.c.sub_part, + parts_alias.c.part, parts_alias.c.quantity + ]).where(parts_alias.c.part == incl_alias.c.sub_part) + ) + + q = select([ + included_parts.c.sub_part, + func.sum(included_parts.c.quantity).label('total_quantity')]).\ + group_by(included_parts.c.sub_part) + + self.assert_compile( + q, + "WITH included_parts(sub_part, part, quantity) AS " + "(SELECT part.sub_part AS sub_part, part.part AS part, " + "part.quantity AS quantity FROM part WHERE part.part = :part_1 " + "UNION ALL SELECT p.sub_part AS sub_part, p.part AS part, " + "p.quantity AS quantity FROM part p, included_parts pr1 " + "WHERE p.part = pr1.sub_part) " + "search depth first by part set ord1 cycle part set " + "y_cycle to 1 default 0 " + "SELECT included_parts.sub_part, sum(included_parts.quantity) " + "AS total_quantity FROM included_parts " + "GROUP BY included_parts.sub_part" + ) + + def test_limit(self): + t = table('sometable', column('col1'), column('col2')) + s = select([t]) + c = s.compile(dialect=oracle.OracleDialect()) + assert t.c.col1 in set(c._create_result_map()['col1'][1]) + s = select([t]).limit(10).offset(20) + self.assert_compile(s, + 'SELECT col1, col2 FROM (SELECT col1, ' + 'col2, ROWNUM AS ora_rn FROM (SELECT ' + 'sometable.col1 AS col1, sometable.col2 AS ' + 'col2 FROM sometable) WHERE ROWNUM <= ' + ':param_1 + :param_2) WHERE ora_rn > :param_2', + checkparams={'param_1': 10, 'param_2': 20}) + + c = s.compile(dialect=oracle.OracleDialect()) + eq_(len(c._result_columns), 2) + assert t.c.col1 in set(c._create_result_map()['col1'][1]) + + s2 = select([s.c.col1, s.c.col2]) + self.assert_compile(s2, + 'SELECT col1, col2 FROM (SELECT col1, col2 ' + 'FROM (SELECT col1, col2, ROWNUM AS ora_rn ' + 'FROM (SELECT sometable.col1 AS col1, ' + 'sometable.col2 AS col2 FROM sometable) ' + 'WHERE ROWNUM <= :param_1 + :param_2) ' + 'WHERE ora_rn > :param_2)', + checkparams={'param_1': 10, 'param_2': 20}) + + self.assert_compile(s2, + 'SELECT col1, col2 FROM (SELECT col1, col2 ' + 'FROM (SELECT col1, col2, ROWNUM AS ora_rn ' + 'FROM (SELECT sometable.col1 AS col1, ' + 'sometable.col2 AS col2 FROM sometable) ' + 'WHERE ROWNUM <= :param_1 + :param_2) ' + 'WHERE ora_rn > :param_2)') + c = s2.compile(dialect=oracle.OracleDialect()) + eq_(len(c._result_columns), 2) + assert s.c.col1 in set(c._create_result_map()['col1'][1]) + + s = select([t]).limit(10).offset(20).order_by(t.c.col2) + self.assert_compile(s, + 'SELECT col1, col2 FROM (SELECT col1, ' + 'col2, ROWNUM AS ora_rn FROM (SELECT ' + 'sometable.col1 AS col1, sometable.col2 AS ' + 'col2 FROM sometable ORDER BY ' + 'sometable.col2) WHERE ROWNUM <= ' + ':param_1 + :param_2) WHERE ora_rn > :param_2', + checkparams={'param_1': 10, 'param_2': 20} + ) + c = s.compile(dialect=oracle.OracleDialect()) + eq_(len(c._result_columns), 2) + assert t.c.col1 in set(c._create_result_map()['col1'][1]) + + s = select([t], for_update=True).limit(10).order_by(t.c.col2) + self.assert_compile(s, + 'SELECT col1, col2 FROM (SELECT ' + 'sometable.col1 AS col1, sometable.col2 AS ' + 'col2 FROM sometable ORDER BY ' + 'sometable.col2) WHERE ROWNUM <= :param_1 ' + 'FOR UPDATE') + + s = select([t], + for_update=True).limit(10).offset(20).order_by(t.c.col2) + self.assert_compile(s, + 'SELECT col1, col2 FROM (SELECT col1, ' + 'col2, ROWNUM AS ora_rn FROM (SELECT ' + 'sometable.col1 AS col1, sometable.col2 AS ' + 'col2 FROM sometable ORDER BY ' + 'sometable.col2) WHERE ROWNUM <= ' + ':param_1 + :param_2) WHERE ora_rn > :param_2 FOR ' + 'UPDATE') + + def test_for_update(self): + table1 = table('mytable', + column('myid'), column('name'), column('description')) + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + self.assert_compile( + table1 + .select(table1.c.myid == 7) + .with_for_update(of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 " + "FOR UPDATE OF mytable.myid") + + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(nowait=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT") + + self.assert_compile( + table1 + .select(table1.c.myid == 7) + .with_for_update(nowait=True, of=table1.c.myid), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 " + "FOR UPDATE OF mytable.myid NOWAIT") + + self.assert_compile( + table1 + .select(table1.c.myid == 7) + .with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " + "mytable.myid, mytable.name NOWAIT") + + self.assert_compile( + table1.select(table1.c.myid == 7) + .with_for_update(skip_locked=True, + of=[table1.c.myid, table1.c.name]), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " + "mytable.myid, mytable.name SKIP LOCKED") + + # key_share has no effect + self.assert_compile( + table1.select(table1.c.myid == 7).with_for_update(key_share=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + # read has no effect + self.assert_compile( + table1 + .select(table1.c.myid == 7) + .with_for_update(read=True, key_share=True), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + + ta = table1.alias() + self.assert_compile( + ta + .select(ta.c.myid == 7) + .with_for_update(of=[ta.c.myid, ta.c.name]), + "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " + "FROM mytable mytable_1 " + "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF " + "mytable_1.myid, mytable_1.name" + ) + + def test_for_update_of_w_limit_adaption_col_present(self): + table1 = table('mytable', column('myid'), column('name')) + + self.assert_compile( + select([table1.c.myid, table1.c.name]). + where(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.name). + limit(10), + "SELECT myid, name FROM " + "(SELECT mytable.myid AS myid, mytable.name AS name " + "FROM mytable WHERE mytable.myid = :myid_1) " + "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT", + ) + + def test_for_update_of_w_limit_adaption_col_unpresent(self): + table1 = table('mytable', column('myid'), column('name')) + + self.assert_compile( + select([table1.c.myid]). + where(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.name). + limit(10), + "SELECT myid FROM " + "(SELECT mytable.myid AS myid, mytable.name AS name " + "FROM mytable WHERE mytable.myid = :myid_1) " + "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT", + ) + + def test_for_update_of_w_limit_offset_adaption_col_present(self): + table1 = table('mytable', column('myid'), column('name')) + + self.assert_compile( + select([table1.c.myid, table1.c.name]). + where(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.name). + limit(10).offset(50), + "SELECT myid, name FROM (SELECT myid, name, ROWNUM AS ora_rn " + "FROM (SELECT mytable.myid AS myid, mytable.name AS name " + "FROM mytable WHERE mytable.myid = :myid_1) " + "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 " + "FOR UPDATE OF name NOWAIT", + ) + + def test_for_update_of_w_limit_offset_adaption_col_unpresent(self): + table1 = table('mytable', column('myid'), column('name')) + + self.assert_compile( + select([table1.c.myid]). + where(table1.c.myid == 7). + with_for_update(nowait=True, of=table1.c.name). + limit(10).offset(50), + "SELECT myid FROM (SELECT myid, ROWNUM AS ora_rn, name " + "FROM (SELECT mytable.myid AS myid, mytable.name AS name " + "FROM mytable WHERE mytable.myid = :myid_1) " + "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 " + "FOR UPDATE OF name NOWAIT", + ) + + def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self): + table1 = table('mytable', column('myid'), column('foo'), column('bar')) + + self.assert_compile( + select([table1.c.myid, table1.c.bar]). + where(table1.c.myid == 7). + with_for_update(nowait=True, of=[table1.c.foo, table1.c.bar]). + limit(10).offset(50), + "SELECT myid, bar FROM (SELECT myid, bar, ROWNUM AS ora_rn, " + "foo FROM (SELECT mytable.myid AS myid, mytable.bar AS bar, " + "mytable.foo AS foo FROM mytable WHERE mytable.myid = :myid_1) " + "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 " + "FOR UPDATE OF foo, bar NOWAIT" + ) + + def test_limit_preserves_typing_information(self): + class MyType(TypeDecorator): + impl = Integer + + stmt = select([type_coerce(column('x'), MyType).label('foo')]).limit(1) + dialect = oracle.dialect() + compiled = stmt.compile(dialect=dialect) + assert isinstance(compiled._create_result_map()['foo'][-1], MyType) + + def test_use_binds_for_limits_disabled(self): + t = table('sometable', column('col1'), column('col2')) + dialect = oracle.OracleDialect(use_binds_for_limits=False) + + self.assert_compile( + select([t]).limit(10), + "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " + "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM <= 10", + dialect=dialect) + + self.assert_compile( + select([t]).offset(10), + "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " + "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " + "FROM sometable)) WHERE ora_rn > 10", + dialect=dialect) + + self.assert_compile( + select([t]).limit(10).offset(10), + "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " + "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " + "FROM sometable) WHERE ROWNUM <= 20) WHERE ora_rn > 10", + dialect=dialect) + + def test_use_binds_for_limits_enabled(self): + t = table('sometable', column('col1'), column('col2')) + dialect = oracle.OracleDialect(use_binds_for_limits=True) + + self.assert_compile( + select([t]).limit(10), + "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " + "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM " + "<= :param_1", + dialect=dialect) + + self.assert_compile( + select([t]).offset(10), + "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " + "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " + "FROM sometable)) WHERE ora_rn > :param_1", + dialect=dialect) + + self.assert_compile( + select([t]).limit(10).offset(10), + "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " + "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " + "FROM sometable) WHERE ROWNUM <= :param_1 + :param_2) " + "WHERE ora_rn > :param_2", + dialect=dialect, + checkparams={'param_1': 10, 'param_2': 10}) + + def test_long_labels(self): + dialect = default.DefaultDialect() + dialect.max_identifier_length = 30 + + ora_dialect = oracle.dialect() + + m = MetaData() + a_table = Table( + 'thirty_characters_table_xxxxxx', + m, + Column('id', Integer, primary_key=True) + ) + + other_table = Table( + 'other_thirty_characters_table_', + m, + Column('id', Integer, primary_key=True), + Column('thirty_characters_table_id', + Integer, + ForeignKey('thirty_characters_table_xxxxxx.id'), + primary_key=True)) + + anon = a_table.alias() + self.assert_compile(select([other_table, + anon]). + select_from( + other_table.outerjoin(anon)).apply_labels(), + 'SELECT other_thirty_characters_table_.id ' + 'AS other_thirty_characters__1, ' + 'other_thirty_characters_table_.thirty_char' + 'acters_table_id AS other_thirty_characters' + '__2, thirty_characters_table__1.id AS ' + 'thirty_characters_table__3 FROM ' + 'other_thirty_characters_table_ LEFT OUTER ' + 'JOIN thirty_characters_table_xxxxxx AS ' + 'thirty_characters_table__1 ON ' + 'thirty_characters_table__1.id = ' + 'other_thirty_characters_table_.thirty_char' + 'acters_table_id', dialect=dialect) + self.assert_compile(select([other_table, + anon]).select_from( + other_table.outerjoin(anon)).apply_labels(), + 'SELECT other_thirty_characters_table_.id ' + 'AS other_thirty_characters__1, ' + 'other_thirty_characters_table_.thirty_char' + 'acters_table_id AS other_thirty_characters' + '__2, thirty_characters_table__1.id AS ' + 'thirty_characters_table__3 FROM ' + 'other_thirty_characters_table_ LEFT OUTER ' + 'JOIN thirty_characters_table_xxxxxx ' + 'thirty_characters_table__1 ON ' + 'thirty_characters_table__1.id = ' + 'other_thirty_characters_table_.thirty_char' + 'acters_table_id', dialect=ora_dialect) + + def test_outer_join(self): + table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String)) + + table2 = table( + 'myothertable', + column('otherid', Integer), + column('othername', String), + ) + + table3 = table( + 'thirdtable', + column('userid', Integer), + column('otherstuff', String), + ) + + query = select([table1, table2], + or_(table1.c.name == 'fred', + table1.c.myid == 10, table2.c.othername != 'jack', + text('EXISTS (select yay from foo where boo = lar)') + ), + from_obj=[outerjoin(table1, + table2, + table1.c.myid == table2.c.otherid)]) + self.assert_compile(query, + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, myothertable.otherid,' + ' myothertable.othername FROM mytable, ' + 'myothertable WHERE (mytable.name = ' + ':name_1 OR mytable.myid = :myid_1 OR ' + 'myothertable.othername != :othername_1 OR ' + 'EXISTS (select yay from foo where boo = ' + 'lar)) AND mytable.myid = ' + 'myothertable.otherid(+)', + dialect=oracle.OracleDialect(use_ansi=False)) + query = table1.outerjoin(table2, + table1.c.myid == table2.c.otherid) \ + .outerjoin(table3, table3.c.userid == table2.c.otherid) + self.assert_compile(query.select(), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, myothertable.otherid,' + ' myothertable.othername, ' + 'thirdtable.userid, thirdtable.otherstuff ' + 'FROM mytable LEFT OUTER JOIN myothertable ' + 'ON mytable.myid = myothertable.otherid ' + 'LEFT OUTER JOIN thirdtable ON ' + 'thirdtable.userid = myothertable.otherid') + + self.assert_compile(query.select(), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, myothertable.otherid,' + ' myothertable.othername, ' + 'thirdtable.userid, thirdtable.otherstuff ' + 'FROM mytable, myothertable, thirdtable ' + 'WHERE thirdtable.userid(+) = ' + 'myothertable.otherid AND mytable.myid = ' + 'myothertable.otherid(+)', + dialect=oracle.dialect(use_ansi=False)) + query = table1.join(table2, + table1.c.myid == table2.c.otherid) \ + .join(table3, table3.c.userid == table2.c.otherid) + self.assert_compile(query.select(), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description, myothertable.otherid,' + ' myothertable.othername, ' + 'thirdtable.userid, thirdtable.otherstuff ' + 'FROM mytable, myothertable, thirdtable ' + 'WHERE thirdtable.userid = ' + 'myothertable.otherid AND mytable.myid = ' + 'myothertable.otherid', + dialect=oracle.dialect(use_ansi=False)) + query = table1.join(table2, + table1.c.myid == table2.c.otherid) \ + .outerjoin(table3, table3.c.userid == table2.c.otherid) + self.assert_compile(query.select().order_by(table1.c.name). + limit(10).offset(5), + 'SELECT myid, name, description, otherid, ' + 'othername, userid, otherstuff FROM ' + '(SELECT myid, name, description, otherid, ' + 'othername, userid, otherstuff, ROWNUM AS ' + 'ora_rn FROM (SELECT mytable.myid AS myid, ' + 'mytable.name AS name, mytable.description ' + 'AS description, myothertable.otherid AS ' + 'otherid, myothertable.othername AS ' + 'othername, thirdtable.userid AS userid, ' + 'thirdtable.otherstuff AS otherstuff FROM ' + 'mytable, myothertable, thirdtable WHERE ' + 'thirdtable.userid(+) = ' + 'myothertable.otherid AND mytable.myid = ' + 'myothertable.otherid ORDER BY mytable.name) ' + 'WHERE ROWNUM <= :param_1 + :param_2) ' + 'WHERE ora_rn > :param_2', + checkparams={'param_1': 10, 'param_2': 5}, + dialect=oracle.dialect(use_ansi=False)) + + subq = select([table1]).select_from( + table1.outerjoin(table2, table1.c.myid == table2.c.otherid)) \ + .alias() + q = select([table3]).select_from( + table3.outerjoin(subq, table3.c.userid == subq.c.myid)) + + self.assert_compile(q, + 'SELECT thirdtable.userid, ' + 'thirdtable.otherstuff FROM thirdtable ' + 'LEFT OUTER JOIN (SELECT mytable.myid AS ' + 'myid, mytable.name AS name, ' + 'mytable.description AS description FROM ' + 'mytable LEFT OUTER JOIN myothertable ON ' + 'mytable.myid = myothertable.otherid) ' + 'anon_1 ON thirdtable.userid = anon_1.myid', + dialect=oracle.dialect(use_ansi=True)) + + self.assert_compile(q, + 'SELECT thirdtable.userid, ' + 'thirdtable.otherstuff FROM thirdtable, ' + '(SELECT mytable.myid AS myid, ' + 'mytable.name AS name, mytable.description ' + 'AS description FROM mytable, myothertable ' + 'WHERE mytable.myid = myothertable.otherid(' + '+)) anon_1 WHERE thirdtable.userid = ' + 'anon_1.myid(+)', + dialect=oracle.dialect(use_ansi=False)) + + q = select([table1.c.name]).where(table1.c.name == 'foo') + self.assert_compile(q, + 'SELECT mytable.name FROM mytable WHERE ' + 'mytable.name = :name_1', + dialect=oracle.dialect(use_ansi=False)) + subq = select([table3.c.otherstuff]) \ + .where(table3.c.otherstuff == table1.c.name).label('bar') + q = select([table1.c.name, subq]) + self.assert_compile(q, + 'SELECT mytable.name, (SELECT ' + 'thirdtable.otherstuff FROM thirdtable ' + 'WHERE thirdtable.otherstuff = ' + 'mytable.name) AS bar FROM mytable', + dialect=oracle.dialect(use_ansi=False)) + + def test_nonansi_nested_right_join(self): + a = table('a', column('a')) + b = table('b', column('b')) + c = table('c', column('c')) + + j = a.join(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b) + + self.assert_compile( + select([j]), + "SELECT a.a, b.b, c.c FROM a, b, c " + "WHERE a.a = b.b AND b.b = c.c", + dialect=oracle.OracleDialect(use_ansi=False) + ) + + j = a.outerjoin(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b) + + self.assert_compile( + select([j]), + "SELECT a.a, b.b, c.c FROM a, b, c " + "WHERE a.a = b.b(+) AND b.b = c.c", + dialect=oracle.OracleDialect(use_ansi=False) + ) + + j = a.join(b.outerjoin(c, b.c.b == c.c.c), a.c.a == b.c.b) + + self.assert_compile( + select([j]), + "SELECT a.a, b.b, c.c FROM a, b, c " + "WHERE a.a = b.b AND b.b = c.c(+)", + dialect=oracle.OracleDialect(use_ansi=False) + ) + + def test_alias_outer_join(self): + address_types = table('address_types', column('id'), + column('name')) + addresses = table('addresses', column('id'), column('user_id'), + column('address_type_id'), + column('email_address')) + at_alias = address_types.alias() + s = select([at_alias, addresses]) \ + .select_from( + addresses.outerjoin( + at_alias, + addresses.c.address_type_id == at_alias.c.id)) \ + .where(addresses.c.user_id == 7) \ + .order_by(addresses.c.id, address_types.c.id) + self.assert_compile(s, + 'SELECT address_types_1.id, ' + 'address_types_1.name, addresses.id, ' + 'addresses.user_id, addresses.address_type_' + 'id, addresses.email_address FROM ' + 'addresses LEFT OUTER JOIN address_types ' + 'address_types_1 ON addresses.address_type_' + 'id = address_types_1.id WHERE ' + 'addresses.user_id = :user_id_1 ORDER BY ' + 'addresses.id, address_types.id') + + def test_returning_insert(self): + t1 = table('t1', column('c1'), column('c2'), column('c3')) + self.assert_compile( + t1.insert().values(c1=1).returning(t1.c.c2, t1.c.c3), + "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " + "t1.c2, t1.c3 INTO :ret_0, :ret_1") + + def test_returning_insert_functional(self): + t1 = table('t1', + column('c1'), + column('c2', String()), + column('c3', String())) + fn = func.lower(t1.c.c2, type_=String()) + stmt = t1.insert().values(c1=1).returning(fn, t1.c.c3) + compiled = stmt.compile(dialect=oracle.dialect()) + eq_(compiled._create_result_map(), + {'c3': ('c3', (t1.c.c3, 'c3', 'c3'), t1.c.c3.type), + 'lower': ('lower', (fn, 'lower', None), fn.type)}) + + self.assert_compile( + stmt, + "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " + "lower(t1.c2), t1.c3 INTO :ret_0, :ret_1") + + def test_returning_insert_labeled(self): + t1 = table('t1', column('c1'), column('c2'), column('c3')) + self.assert_compile( + t1.insert().values(c1=1).returning( + t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), + "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " + "t1.c2, t1.c3 INTO :ret_0, :ret_1") + + def test_compound(self): + t1 = table('t1', column('c1'), column('c2'), column('c3')) + t2 = table('t2', column('c1'), column('c2'), column('c3')) + self.assert_compile(union(t1.select(), t2.select()), + 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 UNION ' + 'SELECT t2.c1, t2.c2, t2.c3 FROM t2') + self.assert_compile(except_(t1.select(), t2.select()), + 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 MINUS ' + 'SELECT t2.c1, t2.c2, t2.c3 FROM t2') + + def test_no_paren_fns(self): + for fn, expected in [ + (func.uid(), "uid"), + (func.UID(), "UID"), + (func.sysdate(), "sysdate"), + (func.row_number(), "row_number()"), + (func.rank(), "rank()"), + (func.now(), "CURRENT_TIMESTAMP"), + (func.current_timestamp(), "CURRENT_TIMESTAMP"), + (func.user(), "USER"), + ]: + self.assert_compile(fn, expected) + + def test_create_index_alt_schema(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer), + schema="alt_schema") + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x)), + "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)" + ) + + def test_create_index_expr(self): + m = MetaData() + t1 = Table('foo', m, + Column('x', Integer)) + self.assert_compile( + schema.CreateIndex(Index("bar", t1.c.x > 5)), + "CREATE INDEX bar ON foo (x > 5)" + ) + + def test_table_options(self): + m = MetaData() + + t = Table( + 'foo', m, + Column('x', Integer), + prefixes=["GLOBAL TEMPORARY"], + oracle_on_commit="PRESERVE ROWS" + ) + + self.assert_compile( + schema.CreateTable(t), + "CREATE GLOBAL TEMPORARY TABLE " + "foo (x INTEGER) ON COMMIT PRESERVE ROWS" + ) + + def test_create_table_compress(self): + m = MetaData() + tbl1 = Table('testtbl1', m, Column('data', Integer), + oracle_compress=True) + tbl2 = Table('testtbl2', m, Column('data', Integer), + oracle_compress="OLTP") + + self.assert_compile(schema.CreateTable(tbl1), + "CREATE TABLE testtbl1 (data INTEGER) COMPRESS") + self.assert_compile(schema.CreateTable(tbl2), + "CREATE TABLE testtbl2 (data INTEGER) " + "COMPRESS FOR OLTP") + + def test_create_index_bitmap_compress(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', Integer)) + idx1 = Index('idx1', tbl.c.data, oracle_compress=True) + idx2 = Index('idx2', tbl.c.data, oracle_compress=1) + idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True) + + self.assert_compile(schema.CreateIndex(idx1), + "CREATE INDEX idx1 ON testtbl (data) COMPRESS") + self.assert_compile(schema.CreateIndex(idx2), + "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1") + self.assert_compile(schema.CreateIndex(idx3), + "CREATE BITMAP INDEX idx3 ON testtbl (data)") + + +class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): + + def test_basic(self): + seq = Sequence('my_seq_no_schema') + dialect = oracle.OracleDialect() + assert dialect.identifier_preparer.format_sequence(seq) \ + == 'my_seq_no_schema' + seq = Sequence('my_seq', schema='some_schema') + assert dialect.identifier_preparer.format_sequence(seq) \ + == 'some_schema.my_seq' + seq = Sequence('My_Seq', schema='Some_Schema') + assert dialect.identifier_preparer.format_sequence(seq) \ + == '"Some_Schema"."My_Seq"' + + diff --git a/test/dialect/oracle/test_dialect.py b/test/dialect/oracle/test_dialect.py new file mode 100644 index 000000000..83a875c2e --- /dev/null +++ b/test/dialect/oracle/test_dialect.py @@ -0,0 +1,316 @@ +# coding: utf-8 + + +from sqlalchemy.testing import eq_ +from sqlalchemy import types as sqltypes, exc, schema +from sqlalchemy.sql import table, column +from sqlalchemy.testing import (fixtures, + AssertsExecutionResults, + AssertsCompiledSQL) +from sqlalchemy import testing +from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ + Index, MetaData, select, inspect, ForeignKey, String, func, \ + TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ + literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ + ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ + union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ + PrimaryKeyConstraint, FLOAT +from sqlalchemy.util import u, b +from sqlalchemy import util +from sqlalchemy.testing import assert_raises, assert_raises_message +from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.dialects.oracle import cx_oracle, base as oracle +from sqlalchemy.engine import default +import decimal +from sqlalchemy.engine import url +from sqlalchemy.testing.schema import Table, Column +import datetime +import os +from sqlalchemy import sql +from sqlalchemy.testing.mock import Mock + + +class DialectTest(fixtures.TestBase): + def test_cx_oracle_version_parse(self): + dialect = cx_oracle.OracleDialect_cx_oracle() + + eq_( + dialect._parse_cx_oracle_ver("5.2"), + (5, 2) + ) + + eq_( + dialect._parse_cx_oracle_ver("5.0.1"), + (5, 0, 1) + ) + + eq_( + dialect._parse_cx_oracle_ver("6.0b1"), + (6, 0) + ) + + +class OutParamTest(fixtures.TestBase, AssertsExecutionResults): + __only_on__ = 'oracle+cx_oracle' + __backend__ = True + + @classmethod + def setup_class(cls): + testing.db.execute(""" + create or replace procedure foo(x_in IN number, x_out OUT number, + y_out OUT number, z_out OUT varchar) IS + retval number; + begin + retval := 6; + x_out := 10; + y_out := x_in * 15; + z_out := NULL; + end; + """) + + def test_out_params(self): + result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' + ':z_out); end;', + bindparams=[bindparam('x_in', Float), + outparam('x_out', Integer), + outparam('y_out', Float), + outparam('z_out', String)]), + x_in=5) + eq_(result.out_parameters, + {'x_out': 10, 'y_out': 75, 'z_out': None}) + assert isinstance(result.out_parameters['x_out'], int) + + @classmethod + def teardown_class(cls): + testing.db.execute("DROP PROCEDURE foo") + + +class QuotedBindRoundTripTest(fixtures.TestBase): + + __only_on__ = 'oracle' + __backend__ = True + + @testing.provide_metadata + def test_table_round_trip(self): + oracle.RESERVED_WORDS.remove('UNION') + + metadata = self.metadata + table = Table("t1", metadata, + Column("option", Integer), + Column("plain", Integer, quote=True), + # test that quote works for a reserved word + # that the dialect isn't aware of when quote + # is set + Column("union", Integer, quote=True)) + metadata.create_all() + + table.insert().execute( + {"option": 1, "plain": 1, "union": 1} + ) + eq_( + testing.db.execute(table.select()).first(), + (1, 1, 1) + ) + table.update().values(option=2, plain=2, union=2).execute() + eq_( + testing.db.execute(table.select()).first(), + (2, 2, 2) + ) + + def test_numeric_bind_round_trip(self): + eq_( + testing.db.scalar( + select([ + literal_column("2", type_=Integer()) + + bindparam("2_1", value=2)]) + ), + 4 + ) + + @testing.provide_metadata + def test_numeric_bind_in_crud(self): + t = Table( + "asfd", self.metadata, + Column("100K", Integer) + ) + t.create() + + testing.db.execute(t.insert(), {"100K": 10}) + eq_( + testing.db.scalar(t.select()), 10 + ) + + +class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): + + def _dialect(self, server_version, **kw): + def server_version_info(conn): + return server_version + + dialect = oracle.dialect( + dbapi=Mock(version="0.0.0", paramstyle="named"), + **kw) + dialect._get_server_version_info = server_version_info + dialect._check_unicode_returns = Mock() + dialect._check_unicode_description = Mock() + dialect._get_default_schema_name = Mock() + dialect._detect_decimal_char = Mock() + return dialect + + def test_ora8_flags(self): + dialect = self._dialect((8, 2, 5)) + + # before connect, assume modern DB + assert dialect._supports_char_length + assert dialect._supports_nchar + assert dialect.use_ansi + + dialect.initialize(Mock()) + assert not dialect.implicit_returning + assert not dialect._supports_char_length + assert not dialect._supports_nchar + assert not dialect.use_ansi + self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) + + dialect = self._dialect((8, 2, 5), implicit_returning=True) + dialect.initialize(testing.db.connect()) + assert dialect.implicit_returning + + def test_default_flags(self): + """test with no initialization or server version info""" + + dialect = self._dialect(None) + + assert dialect._supports_char_length + assert dialect._supports_nchar + assert dialect.use_ansi + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) + + def test_ora10_flags(self): + dialect = self._dialect((10, 2, 5)) + + dialect.initialize(Mock()) + assert dialect._supports_char_length + assert dialect._supports_nchar + assert dialect.use_ansi + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) + + +class ExecuteTest(fixtures.TestBase): + + __only_on__ = 'oracle' + __backend__ = True + + def test_basic(self): + eq_(testing.db.execute('/*+ this is a comment */ SELECT 1 FROM ' + 'DUAL').fetchall(), [(1, )]) + + def test_sequences_are_integers(self): + seq = Sequence('foo_seq') + seq.create(testing.db) + try: + val = testing.db.execute(seq) + eq_(val, 1) + assert type(val) is int + finally: + seq.drop(testing.db) + + @testing.provide_metadata + def test_limit_offset_for_update(self): + metadata = self.metadata + # oracle can't actually do the ROWNUM thing with FOR UPDATE + # very well. + + t = Table('t1', + metadata, + Column('id', Integer, primary_key=True), + Column('data', Integer)) + metadata.create_all() + + t.insert().execute( + {'id': 1, 'data': 1}, + {'id': 2, 'data': 7}, + {'id': 3, 'data': 12}, + {'id': 4, 'data': 15}, + {'id': 5, 'data': 32}, + ) + + # here, we can't use ORDER BY. + eq_( + t.select(for_update=True).limit(2).execute().fetchall(), + [(1, 1), + (2, 7)] + ) + + # here, its impossible. But we'd prefer it to raise ORA-02014 + # instead of issuing a syntax error. + assert_raises_message( + exc.DatabaseError, + "ORA-02014", + t.select(for_update=True).limit(2).offset(3).execute + ) + + +class UnicodeSchemaTest(fixtures.TestBase): + __only_on__ = 'oracle' + __backend__ = True + + @testing.provide_metadata + def test_quoted_column_non_unicode(self): + metadata = self.metadata + table = Table("atable", metadata, + Column("_underscorecolumn", + Unicode(255), + primary_key=True)) + metadata.create_all() + + table.insert().execute( + {'_underscorecolumn': u('’é')}, + ) + result = testing.db.execute( + table.select().where(table.c._underscorecolumn == u('’é')) + ).scalar() + eq_(result, u('’é')) + + @testing.provide_metadata + def test_quoted_column_unicode(self): + metadata = self.metadata + table = Table("atable", metadata, + Column(u("méil"), Unicode(255), primary_key=True)) + metadata.create_all() + + table.insert().execute( + {u('méil'): u('’é')}, + ) + result = testing.db.execute( + table.select().where(table.c[u('méil')] == u('’é')) + ).scalar() + eq_(result, u('’é')) + + +class ServiceNameTest(fixtures.TestBase): + __only_on__ = 'oracle+cx_oracle' + __backend__ = True + + def test_cx_oracle_service_name(self): + url_string = 'oracle+cx_oracle://scott:tiger@host/?service_name=hr' + eng = create_engine(url_string, _initialize=False) + cargs, cparams = eng.dialect.create_connect_args(eng.url) + + assert 'SERVICE_NAME=hr' in cparams['dsn'] + assert 'SID=hr' not in cparams['dsn'] + + def test_cx_oracle_service_name_bad(self): + url_string = 'oracle+cx_oracle://scott:tiger@host/hr1?service_name=hr2' + assert_raises( + exc.InvalidRequestError, + create_engine, url_string, + _initialize=False + ) + diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py new file mode 100644 index 000000000..d09f12e60 --- /dev/null +++ b/test/dialect/oracle/test_reflection.py @@ -0,0 +1,536 @@ +# coding: utf-8 + + +from sqlalchemy.testing import eq_ +from sqlalchemy import exc +from sqlalchemy.sql import table +from sqlalchemy.testing import fixtures, AssertsCompiledSQL +from sqlalchemy import testing +from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ + Index, MetaData, select, inspect, ForeignKey, String, func, \ + TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ + literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ + ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ + union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ + PrimaryKeyConstraint, FLOAT +from sqlalchemy.testing import assert_raises +from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.testing.schema import Table, Column + + +class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): + __only_on__ = 'oracle' + __backend__ = True + + @classmethod + def setup_class(cls): + # currently assuming full DBA privs for the user. + # don't really know how else to go here unless + # we connect as the other user. + + for stmt in (""" +create table %(test_schema)s.parent( + id integer primary key, + data varchar2(50) +); + +create table %(test_schema)s.child( + id integer primary key, + data varchar2(50), + parent_id integer references %(test_schema)s.parent(id) +); + +create table local_table( + id integer primary key, + data varchar2(50) +); + +create synonym %(test_schema)s.ptable for %(test_schema)s.parent; +create synonym %(test_schema)s.ctable for %(test_schema)s.child; + +create synonym %(test_schema)s_pt for %(test_schema)s.parent; + +create synonym %(test_schema)s.local_table for local_table; + +-- can't make a ref from local schema to the +-- remote schema's table without this, +-- *and* cant give yourself a grant ! +-- so we give it to public. ideas welcome. +grant references on %(test_schema)s.parent to public; +grant references on %(test_schema)s.child to public; +""" % {"test_schema": testing.config.test_schema}).split(";"): + if stmt.strip(): + testing.db.execute(stmt) + + @classmethod + def teardown_class(cls): + for stmt in (""" +drop table %(test_schema)s.child; +drop table %(test_schema)s.parent; +drop table local_table; +drop synonym %(test_schema)s.ctable; +drop synonym %(test_schema)s.ptable; +drop synonym %(test_schema)s_pt; +drop synonym %(test_schema)s.local_table; + +""" % {"test_schema": testing.config.test_schema}).split(";"): + if stmt.strip(): + testing.db.execute(stmt) + + @testing.provide_metadata + def test_create_same_names_explicit_schema(self): + schema = testing.db.dialect.default_schema_name + meta = self.metadata + parent = Table('parent', meta, + Column('pid', Integer, primary_key=True), + schema=schema) + child = Table('child', meta, + Column('cid', Integer, primary_key=True), + Column('pid', + Integer, + ForeignKey('%s.parent.pid' % schema)), + schema=schema) + meta.create_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) + + def test_reflect_alt_table_owner_local_synonym(self): + meta = MetaData(testing.db) + parent = Table('%s_pt' % testing.config.test_schema, + meta, + autoload=True, + oracle_resolve_synonyms=True) + self.assert_compile(parent.select(), + "SELECT %(test_schema)s_pt.id, " + "%(test_schema)s_pt.data FROM %(test_schema)s_pt" + % {"test_schema": testing.config.test_schema}) + select([parent]).execute().fetchall() + + def test_reflect_alt_synonym_owner_local_table(self): + meta = MetaData(testing.db) + parent = Table( + 'local_table', meta, autoload=True, + oracle_resolve_synonyms=True, schema=testing.config.test_schema) + self.assert_compile( + parent.select(), + "SELECT %(test_schema)s.local_table.id, " + "%(test_schema)s.local_table.data " + "FROM %(test_schema)s.local_table" % + {"test_schema": testing.config.test_schema} + ) + select([parent]).execute().fetchall() + + @testing.provide_metadata + def test_create_same_names_implicit_schema(self): + meta = self.metadata + parent = Table('parent', + meta, + Column('pid', Integer, primary_key=True)) + child = Table('child', meta, + Column('cid', Integer, primary_key=True), + Column('pid', Integer, ForeignKey('parent.pid'))) + meta.create_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) + + def test_reflect_alt_owner_explicit(self): + meta = MetaData(testing.db) + parent = Table( + 'parent', meta, autoload=True, + schema=testing.config.test_schema) + child = Table( + 'child', meta, autoload=True, + schema=testing.config.test_schema) + + self.assert_compile( + parent.join(child), + "%(test_schema)s.parent JOIN %(test_schema)s.child ON " + "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % { + "test_schema": testing.config.test_schema + }) + select([parent, child]).\ + select_from(parent.join(child)).\ + execute().fetchall() + + def test_reflect_local_to_remote(self): + testing.db.execute( + 'CREATE TABLE localtable (id INTEGER ' + 'PRIMARY KEY, parent_id INTEGER REFERENCES ' + '%(test_schema)s.parent(id))' % { + "test_schema": testing.config.test_schema}) + try: + meta = MetaData(testing.db) + lcl = Table('localtable', meta, autoload=True) + parent = meta.tables['%s.parent' % testing.config.test_schema] + self.assert_compile(parent.join(lcl), + '%(test_schema)s.parent JOIN localtable ON ' + '%(test_schema)s.parent.id = ' + 'localtable.parent_id' % { + "test_schema": testing.config.test_schema} + ) + select([parent, + lcl]).select_from(parent.join(lcl)).execute().fetchall() + finally: + testing.db.execute('DROP TABLE localtable') + + def test_reflect_alt_owner_implicit(self): + meta = MetaData(testing.db) + parent = Table( + 'parent', meta, autoload=True, + schema=testing.config.test_schema) + child = Table( + 'child', meta, autoload=True, + schema=testing.config.test_schema) + self.assert_compile( + parent.join(child), + '%(test_schema)s.parent JOIN %(test_schema)s.child ' + 'ON %(test_schema)s.parent.id = ' + '%(test_schema)s.child.parent_id' % { + "test_schema": testing.config.test_schema}) + select([parent, + child]).select_from(parent.join(child)).execute().fetchall() + + def test_reflect_alt_owner_synonyms(self): + testing.db.execute('CREATE TABLE localtable (id INTEGER ' + 'PRIMARY KEY, parent_id INTEGER REFERENCES ' + '%s.ptable(id))' % testing.config.test_schema) + try: + meta = MetaData(testing.db) + lcl = Table('localtable', meta, autoload=True, + oracle_resolve_synonyms=True) + parent = meta.tables['%s.ptable' % testing.config.test_schema] + self.assert_compile( + parent.join(lcl), + '%(test_schema)s.ptable JOIN localtable ON ' + '%(test_schema)s.ptable.id = ' + 'localtable.parent_id' % { + "test_schema": testing.config.test_schema}) + select([parent, + lcl]).select_from(parent.join(lcl)).execute().fetchall() + finally: + testing.db.execute('DROP TABLE localtable') + + def test_reflect_remote_synonyms(self): + meta = MetaData(testing.db) + parent = Table('ptable', meta, autoload=True, + schema=testing.config.test_schema, + oracle_resolve_synonyms=True) + child = Table('ctable', meta, autoload=True, + schema=testing.config.test_schema, + oracle_resolve_synonyms=True) + self.assert_compile( + parent.join(child), + '%(test_schema)s.ptable JOIN ' + '%(test_schema)s.ctable ' + 'ON %(test_schema)s.ptable.id = ' + '%(test_schema)s.ctable.parent_id' % { + "test_schema": testing.config.test_schema}) + select([parent, + child]).select_from(parent.join(child)).execute().fetchall() + + +class ConstraintTest(fixtures.TablesTest): + + __only_on__ = 'oracle' + __backend__ = True + run_deletes = None + + @classmethod + def define_tables(cls, metadata): + Table('foo', metadata, Column('id', Integer, primary_key=True)) + + def test_oracle_has_no_on_update_cascade(self): + bar = Table('bar', self.metadata, + Column('id', Integer, primary_key=True), + Column('foo_id', + Integer, + ForeignKey('foo.id', onupdate='CASCADE'))) + assert_raises(exc.SAWarning, bar.create) + + bat = Table('bat', self.metadata, + Column('id', Integer, primary_key=True), + Column('foo_id', Integer), + ForeignKeyConstraint(['foo_id'], ['foo.id'], + onupdate='CASCADE')) + assert_raises(exc.SAWarning, bat.create) + + def test_reflect_check_include_all(self): + insp = inspect(testing.db) + eq_(insp.get_check_constraints('foo'), []) + eq_( + [rec['sqltext'] + for rec in insp.get_check_constraints('foo', include_all=True)], + ['"ID" IS NOT NULL']) + + +class SystemTableTablenamesTest(fixtures.TestBase): + __only_on__ = 'oracle' + __backend__ = True + + def setup(self): + testing.db.execute("create table my_table (id integer)") + testing.db.execute( + "create global temporary table my_temp_table (id integer)" + ) + testing.db.execute( + "create table foo_table (id integer) tablespace SYSTEM" + ) + + def teardown(self): + testing.db.execute("drop table my_temp_table") + testing.db.execute("drop table my_table") + testing.db.execute("drop table foo_table") + + def test_table_names_no_system(self): + insp = inspect(testing.db) + eq_( + insp.get_table_names(), ["my_table"] + ) + + def test_temp_table_names_no_system(self): + insp = inspect(testing.db) + eq_( + insp.get_temp_table_names(), ["my_temp_table"] + ) + + def test_table_names_w_system(self): + engine = testing_engine(options={"exclude_tablespaces": ["FOO"]}) + insp = inspect(engine) + eq_( + set(insp.get_table_names()).intersection(["my_table", + "foo_table"]), + set(["my_table", "foo_table"]) + ) + + +class DontReflectIOTTest(fixtures.TestBase): + """test that index overflow tables aren't included in + table_names.""" + + __only_on__ = 'oracle' + __backend__ = True + + def setup(self): + testing.db.execute(""" + CREATE TABLE admin_docindex( + token char(20), + doc_id NUMBER, + token_frequency NUMBER, + token_offsets VARCHAR2(2000), + CONSTRAINT pk_admin_docindex PRIMARY KEY (token, doc_id)) + ORGANIZATION INDEX + TABLESPACE users + PCTTHRESHOLD 20 + OVERFLOW TABLESPACE users + """) + + def teardown(self): + testing.db.execute("drop table admin_docindex") + + def test_reflect_all(self): + m = MetaData(testing.db) + m.reflect() + eq_( + set(t.name for t in m.tables.values()), + set(['admin_docindex']) + ) + + +class UnsupportedIndexReflectTest(fixtures.TestBase): + __only_on__ = 'oracle' + __backend__ = True + + @testing.emits_warning("No column names") + @testing.provide_metadata + def test_reflect_functional_index(self): + metadata = self.metadata + Table('test_index_reflect', metadata, + Column('data', String(20), primary_key=True)) + metadata.create_all() + + testing.db.execute('CREATE INDEX DATA_IDX ON ' + 'TEST_INDEX_REFLECT (UPPER(DATA))') + m2 = MetaData(testing.db) + Table('test_index_reflect', m2, autoload=True) + + +def all_tables_compression_missing(): + try: + testing.db.execute('SELECT compression FROM all_tables') + if "Enterprise Edition" not in testing.db.scalar( + "select * from v$version"): + return True + return False + except Exception: + return True + + +def all_tables_compress_for_missing(): + try: + testing.db.execute('SELECT compress_for FROM all_tables') + if "Enterprise Edition" not in testing.db.scalar( + "select * from v$version"): + return True + return False + except Exception: + return True + + +class TableReflectionTest(fixtures.TestBase): + __only_on__ = 'oracle' + __backend__ = True + + @testing.provide_metadata + @testing.fails_if(all_tables_compression_missing) + def test_reflect_basic_compression(self): + metadata = self.metadata + + tbl = Table('test_compress', metadata, + Column('data', Integer, primary_key=True), + oracle_compress=True) + metadata.create_all() + + m2 = MetaData(testing.db) + + tbl = Table('test_compress', m2, autoload=True) + # Don't hardcode the exact value, but it must be non-empty + assert tbl.dialect_options['oracle']['compress'] + + @testing.provide_metadata + @testing.fails_if(all_tables_compress_for_missing) + def test_reflect_oltp_compression(self): + metadata = self.metadata + + tbl = Table('test_compress', metadata, + Column('data', Integer, primary_key=True), + oracle_compress="OLTP") + metadata.create_all() + + m2 = MetaData(testing.db) + + tbl = Table('test_compress', m2, autoload=True) + assert tbl.dialect_options['oracle']['compress'] == "OLTP" + + +class RoundTripIndexTest(fixtures.TestBase): + __only_on__ = 'oracle' + __backend__ = True + + @testing.provide_metadata + def test_basic(self): + metadata = self.metadata + + s_table = Table( + "sometable", metadata, + Column("id_a", Unicode(255), primary_key=True), + Column("id_b", + Unicode(255), + primary_key=True, + unique=True), + Column("group", Unicode(255), primary_key=True), + Column("col", Unicode(255)), + UniqueConstraint('col', 'group')) + + # "group" is a keyword, so lower case + normalind = Index('tableind', s_table.c.id_b, s_table.c.group) + Index('compress1', s_table.c.id_a, s_table.c.id_b, + oracle_compress=True) + Index('compress2', s_table.c.id_a, s_table.c.id_b, s_table.c.col, + oracle_compress=1) + + metadata.create_all() + mirror = MetaData(testing.db) + mirror.reflect() + metadata.drop_all() + mirror.create_all() + + inspect = MetaData(testing.db) + inspect.reflect() + + def obj_definition(obj): + return (obj.__class__, + tuple([c.name for c in obj.columns]), + getattr(obj, 'unique', None)) + + # find what the primary k constraint name should be + primaryconsname = testing.db.scalar( + text( + """SELECT constraint_name + FROM all_constraints + WHERE table_name = :table_name + AND owner = :owner + AND constraint_type = 'P' """), + table_name=s_table.name.upper(), + owner=testing.db.dialect.default_schema_name.upper()) + + reflectedtable = inspect.tables[s_table.name] + + # make a dictionary of the reflected objects: + + reflected = dict([(obj_definition(i), i) for i in + reflectedtable.indexes + | reflectedtable.constraints]) + + # assert we got primary key constraint and its name, Error + # if not in dict + + assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', + 'group'), None)].name.upper() \ + == primaryconsname.upper() + + # Error if not in dict + + eq_( + reflected[(Index, ('id_b', 'group'), False)].name, + normalind.name + ) + assert (Index, ('id_b', ), True) in reflected + assert (Index, ('col', 'group'), True) in reflected + + idx = reflected[(Index, ('id_a', 'id_b', ), False)] + assert idx.dialect_options['oracle']['compress'] == 2 + + idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)] + assert idx.dialect_options['oracle']['compress'] == 1 + + eq_(len(reflectedtable.constraints), 1) + eq_(len(reflectedtable.indexes), 5) + + +class DBLinkReflectionTest(fixtures.TestBase): + __requires__ = 'oracle_test_dblink', + __only_on__ = 'oracle' + __backend__ = True + + @classmethod + def setup_class(cls): + from sqlalchemy.testing import config + cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link') + + # note that the synonym here is still not totally functional + # when accessing via a different username as we do with the + # multiprocess test suite, so testing here is minimal + with testing.db.connect() as conn: + conn.execute("create table test_table " + "(id integer primary key, data varchar2(50))") + conn.execute("create synonym test_table_syn " + "for test_table@%s" % cls.dblink) + + @classmethod + def teardown_class(cls): + with testing.db.connect() as conn: + conn.execute("drop synonym test_table_syn") + conn.execute("drop table test_table") + + def test_reflection(self): + """test the resolution of the synonym/dblink. """ + m = MetaData() + + t = Table('test_table_syn', m, autoload=True, + autoload_with=testing.db, oracle_resolve_synonyms=True) + eq_(list(t.c.keys()), ['id', 'data']) + eq_(list(t.primary_key), [t.c.id]) + + diff --git a/test/dialect/oracle/test_types.py b/test/dialect/oracle/test_types.py new file mode 100644 index 000000000..3d08657d8 --- /dev/null +++ b/test/dialect/oracle/test_types.py @@ -0,0 +1,787 @@ +# coding: utf-8 + + +from sqlalchemy.testing import eq_ +from sqlalchemy import types as sqltypes, exc, schema +from sqlalchemy.sql import table, column +from sqlalchemy.testing import (fixtures, + AssertsExecutionResults, + AssertsCompiledSQL) +from sqlalchemy import testing +from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ + Index, MetaData, select, inspect, ForeignKey, String, func, \ + TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ + literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ + ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ + union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ + PrimaryKeyConstraint, FLOAT +from sqlalchemy.util import u, b +from sqlalchemy import util +from sqlalchemy.testing import assert_raises, assert_raises_message +from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.dialects.oracle import cx_oracle, base as oracle +from sqlalchemy.engine import default +import decimal +from sqlalchemy.engine import url +from sqlalchemy.testing.schema import Table, Column +import datetime +import os +from sqlalchemy import sql +from sqlalchemy.testing.mock import Mock + + +class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = oracle.OracleDialect() + + def test_no_clobs_for_string_params(self): + """test that simple string params get a DBAPI type of + VARCHAR, not CLOB. This is to prevent setinputsizes + from setting up cx_oracle.CLOBs on + string-based bind params [ticket:793].""" + + class FakeDBAPI(object): + def __getattr__(self, attr): + return attr + + dialect = oracle.OracleDialect() + dbapi = FakeDBAPI() + + b = bindparam("foo", "hello world!") + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) + + b = bindparam("foo", "hello world!") + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) + + def test_long(self): + self.assert_compile(oracle.LONG(), "LONG") + + def test_type_adapt(self): + dialect = cx_oracle.dialect() + + for start, test in [ + (Date(), cx_oracle._OracleDate), + (oracle.OracleRaw(), cx_oracle._OracleRaw), + (String(), String), + (VARCHAR(), cx_oracle._OracleString), + (DATE(), cx_oracle._OracleDate), + (oracle.DATE(), oracle.DATE), + (String(50), cx_oracle._OracleString), + (Unicode(), cx_oracle._OracleNVarChar), + (Text(), cx_oracle._OracleText), + (UnicodeText(), cx_oracle._OracleUnicodeText), + (NCHAR(), cx_oracle._OracleNVarChar), + (oracle.RAW(50), cx_oracle._OracleRaw), + ]: + assert isinstance(start.dialect_impl(dialect), test), \ + "wanted %r got %r" % (test, start.dialect_impl(dialect)) + + def test_raw_compile(self): + self.assert_compile(oracle.RAW(), "RAW") + self.assert_compile(oracle.RAW(35), "RAW(35)") + + def test_char_length(self): + self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)") + + oracle8dialect = oracle.dialect() + oracle8dialect.server_version_info = (8, 0) + self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect) + + self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)") + self.assert_compile(CHAR(50), "CHAR(50)") + + def test_varchar_types(self): + dialect = oracle.dialect() + for typ, exp in [ + (String(50), "VARCHAR2(50 CHAR)"), + (Unicode(50), "NVARCHAR2(50)"), + (NVARCHAR(50), "NVARCHAR2(50)"), + (VARCHAR(50), "VARCHAR(50 CHAR)"), + (oracle.NVARCHAR2(50), "NVARCHAR2(50)"), + (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"), + (String(), "VARCHAR2"), + (Unicode(), "NVARCHAR2"), + (NVARCHAR(), "NVARCHAR2"), + (VARCHAR(), "VARCHAR"), + (oracle.NVARCHAR2(), "NVARCHAR2"), + (oracle.VARCHAR2(), "VARCHAR2"), + ]: + self.assert_compile(typ, exp, dialect=dialect) + + def test_interval(self): + for type_, expected in [(oracle.INTERVAL(), + 'INTERVAL DAY TO SECOND'), + (oracle.INTERVAL(day_precision=3), + 'INTERVAL DAY(3) TO SECOND'), + (oracle.INTERVAL(second_precision=5), + 'INTERVAL DAY TO SECOND(5)'), + (oracle.INTERVAL(day_precision=2, + second_precision=5), + 'INTERVAL DAY(2) TO SECOND(5)')]: + self.assert_compile(type_, expected) + + +class TypesTest(fixtures.TestBase): + __only_on__ = 'oracle' + __dialect__ = oracle.OracleDialect() + __backend__ = True + + @testing.fails_on('+zxjdbc', 'zxjdbc lacks the FIXED_CHAR dbapi type') + def test_fixed_char(self): + m = MetaData(testing.db) + t = Table('t1', m, + Column('id', Integer, primary_key=True), + Column('data', CHAR(30), nullable=False)) + + t.create() + try: + t.insert().execute( + dict(id=1, data="value 1"), + dict(id=2, data="value 2"), + dict(id=3, data="value 3") + ) + + eq_( + t.select().where(t.c.data == 'value 2').execute().fetchall(), + [(2, 'value 2 ')] + ) + + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + assert type(t2.c.data.type) is CHAR + eq_( + t2.select().where(t2.c.data == 'value 2').execute().fetchall(), + [(2, 'value 2 ')] + ) + + finally: + t.drop() + + @testing.requires.returning + @testing.provide_metadata + def test_int_not_float(self): + m = self.metadata + t1 = Table('t1', m, Column('foo', Integer)) + t1.create() + r = t1.insert().values(foo=5).returning(t1.c.foo).execute() + x = r.scalar() + assert x == 5 + assert isinstance(x, int) + + x = t1.select().scalar() + assert x == 5 + assert isinstance(x, int) + + @testing.requires.returning + @testing.provide_metadata + def test_int_not_float_no_coerce_decimal(self): + engine = testing_engine(options=dict(coerce_to_decimal=False)) + + m = self.metadata + t1 = Table('t1', m, Column('foo', Integer)) + t1.create() + r = engine.execute(t1.insert().values(foo=5).returning(t1.c.foo)) + x = r.scalar() + assert x == 5 + assert isinstance(x, int) + + x = t1.select().scalar() + assert x == 5 + assert isinstance(x, int) + + @testing.provide_metadata + def test_rowid(self): + metadata = self.metadata + t = Table('t1', metadata, Column('x', Integer)) + t.create() + t.insert().execute(x=5) + s1 = select([t]) + s2 = select([column('rowid')]).select_from(s1) + rowid = s2.scalar() + + # the ROWID type is not really needed here, + # as cx_oracle just treats it as a string, + # but we want to make sure the ROWID works... + rowid_col = column('rowid', oracle.ROWID) + s3 = select([t.c.x, rowid_col]) \ + .where(rowid_col == cast(rowid, oracle.ROWID)) + eq_(s3.select().execute().fetchall(), [(5, rowid)]) + + @testing.fails_on('+zxjdbc', + 'Not yet known how to pass values of the ' + 'INTERVAL type') + @testing.provide_metadata + def test_interval(self): + metadata = self.metadata + interval_table = Table('intervaltable', metadata, Column('id', + Integer, primary_key=True, + test_needs_autoincrement=True), + Column('day_interval', + oracle.INTERVAL(day_precision=3))) + metadata.create_all() + interval_table.insert().\ + execute(day_interval=datetime.timedelta(days=35, seconds=5743)) + row = interval_table.select().execute().first() + eq_(row['day_interval'], datetime.timedelta(days=35, + seconds=5743)) + + @testing.provide_metadata + def test_numerics(self): + m = self.metadata + t1 = Table('t1', m, + Column('intcol', Integer), + Column('numericcol', Numeric(precision=9, scale=2)), + Column('floatcol1', Float()), + Column('floatcol2', FLOAT()), + Column('doubleprec', oracle.DOUBLE_PRECISION), + Column('numbercol1', oracle.NUMBER(9)), + Column('numbercol2', oracle.NUMBER(9, 3)), + Column('numbercol3', oracle.NUMBER)) + t1.create() + t1.insert().execute( + intcol=1, + numericcol=5.2, + floatcol1=6.5, + floatcol2=8.5, + doubleprec=9.5, + numbercol1=12, + numbercol2=14.85, + numbercol3=15.76 + ) + + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + + for row in ( + t1.select().execute().first(), + t2.select().execute().first() + ): + for i, (val, type_) in enumerate(( + (1, int), + (decimal.Decimal("5.2"), decimal.Decimal), + (6.5, float), + (8.5, float), + (9.5, float), + (12, int), + (decimal.Decimal("14.85"), decimal.Decimal), + (15.76, float), + )): + eq_(row[i], val) + assert isinstance(row[i], type_), '%r is not %r' \ + % (row[i], type_) + + @testing.provide_metadata + def test_numeric_infinity_float(self): + m = self.metadata + t1 = Table('t1', m, + Column("intcol", Integer), + Column("numericcol", oracle.BINARY_DOUBLE(asdecimal=False))) + t1.create() + t1.insert().execute( + intcol=1, + numericcol=float("inf"), + ) + + eq_( + select([t1.c.numericcol]).scalar(), + float("inf") + ) + + eq_( + testing.db.scalar("select numericcol from t1"), + float("inf")) + + @testing.provide_metadata + def test_numerics_broken_inspection(self): + """Numeric scenarios where Oracle type info is 'broken', + returning us precision, scale of the form (0, 0) or (0, -127). + We convert to Decimal and let int()/float() processors take over. + + """ + + metadata = self.metadata + + # this test requires cx_oracle 5 + + foo = Table('foo', metadata, + Column('idata', Integer), + Column('ndata', Numeric(20, 2)), + Column('ndata2', Numeric(20, 2)), + Column('nidata', Numeric(5, 0)), + Column('fdata', Float())) + foo.create() + + foo.insert().execute({ + 'idata': 5, + 'ndata': decimal.Decimal("45.6"), + 'ndata2': decimal.Decimal("45.0"), + 'nidata': decimal.Decimal('53'), + 'fdata': 45.68392 + }) + + stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo" + + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, int, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + 53, 45.683920000000001) + ) + + # with a nested subquery, + # both Numeric values that don't have decimal places, regardless + # of their originating type, come back as ints with no useful + # typing information beyond "numeric". So native handler + # must convert to int. + # this means our Decimal converters need to run no matter what. + # totally sucks. + + stmt = """ + SELECT + (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata, + (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2)) FROM DUAL) + AS ndata, + (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2)) FROM DUAL) + AS ndata2, + (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0)) FROM DUAL) + AS nidata, + (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata + FROM dual + """ + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) + ) + + row = testing.db.execute(text(stmt, + typemap={ + 'idata': Integer(), + 'ndata': Numeric(20, 2), + 'ndata2': Numeric(20, 2), + 'nidata': Numeric(5, 0), + 'fdata': Float()})).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) + ) + + stmt = """ + SELECT + anon_1.idata AS anon_1_idata, + anon_1.ndata AS anon_1_ndata, + anon_1.ndata2 AS anon_1_ndata2, + anon_1.nidata AS anon_1_nidata, + anon_1.fdata AS anon_1_fdata + FROM (SELECT idata, ndata, ndata2, nidata, fdata + FROM ( + SELECT + (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata, + (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2)) + FROM DUAL) AS ndata, + (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2)) + FROM DUAL) AS ndata2, + (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0)) + FROM DUAL) AS nidata, + (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) + AS fdata + FROM dual + ) + WHERE ROWNUM >= 0) anon_1 + """ + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) + ) + + row = testing.db.execute(text(stmt, + typemap={ + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2), + 'anon_1_ndata2': Numeric(20, 2), + 'anon_1_nidata': Numeric(5, 0), + 'anon_1_fdata': Float() + })).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) + ) + + row = testing.db.execute(text( + stmt, + typemap={ + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2, asdecimal=False), + 'anon_1_ndata2': Numeric(20, 2, asdecimal=False), + 'anon_1_nidata': Numeric(5, 0, asdecimal=False), + 'anon_1_fdata': Float(asdecimal=True) + })).fetchall()[0] + eq_( + [type(x) for x in row], + [int, float, float, float, decimal.Decimal] + ) + eq_( + row, + (5, 45.6, 45, 53, decimal.Decimal('45.68392')) + ) + + def test_numeric_no_coerce_decimal_mode(self): + engine = testing_engine(options=dict(coerce_to_decimal=False)) + + # raw SQL no longer coerces to decimal + value = engine.scalar("SELECT 5.66 FROM DUAL") + assert isinstance(value, float) + + # explicit typing still *does* coerce to decimal + # (change in 1.2) + value = engine.scalar( + text("SELECT 5.66 AS foo FROM DUAL"). + columns(foo=Numeric(4, 2, asdecimal=True))) + assert isinstance(value, decimal.Decimal) + + # default behavior is raw SQL coerces to decimal + value = testing.db.scalar("SELECT 5.66 FROM DUAL") + assert isinstance(value, decimal.Decimal) + + @testing.only_on("oracle+cx_oracle", "cx_oracle-specific feature") + @testing.fails_if( + testing.requires.python3, + "cx_oracle always returns unicode on py3k") + def test_coerce_to_unicode(self): + engine = testing_engine(options=dict(coerce_to_unicode=True)) + value = engine.scalar("SELECT 'hello' FROM DUAL") + assert isinstance(value, util.text_type) + + value = testing.db.scalar("SELECT 'hello' FROM DUAL") + assert isinstance(value, util.binary_type) + + @testing.provide_metadata + def test_reflect_dates(self): + metadata = self.metadata + Table( + "date_types", metadata, + Column('d1', sqltypes.DATE), + Column('d2', oracle.DATE), + Column('d3', TIMESTAMP), + Column('d4', TIMESTAMP(timezone=True)), + Column('d5', oracle.INTERVAL(second_precision=5)), + ) + metadata.create_all() + m = MetaData(testing.db) + t1 = Table( + "date_types", m, + autoload=True) + assert isinstance(t1.c.d1.type, oracle.DATE) + assert isinstance(t1.c.d1.type, DateTime) + assert isinstance(t1.c.d2.type, oracle.DATE) + assert isinstance(t1.c.d2.type, DateTime) + assert isinstance(t1.c.d3.type, TIMESTAMP) + assert not t1.c.d3.type.timezone + assert isinstance(t1.c.d4.type, TIMESTAMP) + assert t1.c.d4.type.timezone + assert isinstance(t1.c.d5.type, oracle.INTERVAL) + + def _dont_test_reflect_all_types_schema(self): + types_table = Table('all_types', MetaData(testing.db), + Column('owner', String(30), primary_key=True), + Column('type_name', String(30), primary_key=True), + autoload=True, oracle_resolve_synonyms=True) + for row in types_table.select().execute().fetchall(): + [row[k] for k in row.keys()] + + @testing.provide_metadata + def test_raw_roundtrip(self): + metadata = self.metadata + raw_table = Table('raw', metadata, + Column('id', Integer, primary_key=True), + Column('data', oracle.RAW(35))) + metadata.create_all() + testing.db.execute(raw_table.insert(), id=1, data=b("ABCDEF")) + eq_( + testing.db.execute(raw_table.select()).first(), + (1, b("ABCDEF")) + ) + + @testing.provide_metadata + def test_reflect_nvarchar(self): + metadata = self.metadata + Table('tnv', metadata, Column('data', sqltypes.NVARCHAR(255))) + metadata.create_all() + m2 = MetaData(testing.db) + t2 = Table('tnv', m2, autoload=True) + assert isinstance(t2.c.data.type, sqltypes.NVARCHAR) + + if testing.against('oracle+cx_oracle'): + # nvarchar returns unicode natively. cx_oracle + # _OracleNVarChar type should be at play here. + assert isinstance( + t2.c.data.type.dialect_impl(testing.db.dialect), + cx_oracle._OracleNVarChar) + + data = u('m’a réveillé.') + t2.insert().execute(data=data) + res = t2.select().execute().first()['data'] + eq_(res, data) + assert isinstance(res, util.text_type) + + @testing.provide_metadata + def test_char_length(self): + metadata = self.metadata + t1 = Table('t1', metadata, + Column("c1", VARCHAR(50)), + Column("c2", NVARCHAR(250)), + Column("c3", CHAR(200))) + t1.create() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.c1.type.length, 50) + eq_(t2.c.c2.type.length, 250) + eq_(t2.c.c3.type.length, 200) + + @testing.provide_metadata + def test_long_type(self): + metadata = self.metadata + + t = Table('t', metadata, Column('data', oracle.LONG)) + metadata.create_all(testing.db) + testing.db.execute(t.insert(), data='xyz') + eq_( + testing.db.scalar(select([t.c.data])), + "xyz" + ) + + def test_longstring(self): + metadata = MetaData(testing.db) + testing.db.execute(""" + CREATE TABLE Z_TEST + ( + ID NUMERIC(22) PRIMARY KEY, + ADD_USER VARCHAR2(20) NOT NULL + ) + """) + try: + t = Table("z_test", metadata, autoload=True) + t.insert().execute(id=1.0, add_user='foobar') + assert t.select().execute().fetchall() == [(1, 'foobar')] + finally: + testing.db.execute("DROP TABLE Z_TEST") + + +class LOBFetchTest(fixtures.TablesTest): + __only_on__ = 'oracle' + __backend__ = True + + run_inserts = 'once' + run_deletes = None + + @classmethod + def define_tables(cls, metadata): + Table( + "z_test", + metadata, + Column('id', Integer, primary_key=True), + Column('data', Text), + Column('bindata', LargeBinary)) + + Table( + 'binary_table', metadata, + Column('id', Integer, primary_key=True), + Column('data', LargeBinary) + ) + + @classmethod + def insert_data(cls): + cls.data = data = [ + dict( + id=i, data='this is text %d' % i, + bindata=b('this is binary %d' % i) + ) for i in range(1, 20) + ] + + testing.db.execute(cls.tables.z_test.insert(), data) + + binary_table = cls.tables.binary_table + fname = os.path.join( + os.path.dirname(__file__), "..", "..", + 'binary_data_one.dat') + with open(fname, "rb") as file_: + cls.stream = stream = file_.read(12000) + + for i in range(1, 11): + binary_table.insert().execute(id=i, data=stream) + + def test_lobs_without_convert(self): + engine = testing_engine(options=dict(auto_convert_lobs=False)) + t = self.tables.z_test + row = engine.execute(t.select().where(t.c.id == 1)).first() + eq_(row['data'].read(), 'this is text 1') + eq_(row['bindata'].read(), b('this is binary 1')) + + def test_lobs_with_convert(self): + t = self.tables.z_test + row = testing.db.execute(t.select().where(t.c.id == 1)).first() + eq_(row['data'], 'this is text 1') + eq_(row['bindata'], b('this is binary 1')) + + def test_lobs_with_convert_raw(self): + row = testing.db.execute("select data, bindata from z_test").first() + eq_(row['data'], 'this is text 1') + eq_(row['bindata'], b('this is binary 1')) + + def test_lobs_without_convert_many_rows(self): + engine = testing_engine( + options=dict(auto_convert_lobs=False, arraysize=1)) + result = engine.execute( + "select id, data, bindata from z_test order by id") + results = result.fetchall() + + def go(): + eq_( + [ + dict( + id=row["id"], + data=row["data"].read(), + bindata=row["bindata"].read() + ) for row in results + ], + self.data) + # this comes from cx_Oracle because these are raw + # cx_Oracle.Variable objects + if testing.requires.oracle5x.enabled: + assert_raises_message( + testing.db.dialect.dbapi.ProgrammingError, + "LOB variable no longer valid after subsequent fetch", + go + ) + else: + go() + + def test_lobs_with_convert_many_rows(self): + # even with low arraysize, lobs are fine in autoconvert + engine = testing_engine( + options=dict(auto_convert_lobs=True, arraysize=1)) + result = engine.execute( + "select id, data, bindata from z_test order by id") + results = result.fetchall() + + eq_( + [ + dict( + id=row["id"], + data=row["data"], + bindata=row["bindata"] + ) for row in results + ], + self.data) + + def test_large_stream(self): + binary_table = self.tables.binary_table + result = binary_table.select().order_by(binary_table.c.id).\ + execute().fetchall() + eq_(result, [(i, self.stream) for i in range(1, 11)]) + + def test_large_stream_single_arraysize(self): + binary_table = self.tables.binary_table + eng = testing_engine(options={'arraysize': 1}) + result = eng.execute(binary_table.select(). + order_by(binary_table.c.id)).fetchall() + eq_(result, [(i, self.stream) for i in range(1, 11)]) + + +class EuroNumericTest(fixtures.TestBase): + """ + test the numeric output_type_handler when using non-US locale for NLS_LANG. + """ + + __only_on__ = 'oracle+cx_oracle' + __backend__ = True + + def setup(self): + connect = testing.db.pool._creator + + def _creator(): + conn = connect() + cursor = conn.cursor() + cursor.execute("ALTER SESSION SET NLS_TERRITORY='GERMANY'") + cursor.close() + return conn + + self.engine = testing_engine(options={"creator": _creator}) + + def teardown(self): + self.engine.dispose() + + def test_were_getting_a_comma(self): + connection = self.engine.pool._creator() + cursor = connection.cursor() + try: + cx_Oracle = self.engine.dialect.dbapi + + def output_type_handler(cursor, name, defaultType, + size, precision, scale): + return cursor.var(cx_Oracle.STRING, 255, + arraysize=cursor.arraysize) + cursor.outputtypehandler = output_type_handler + cursor.execute("SELECT 1.1 FROM DUAL") + row = cursor.fetchone() + eq_(row[0], "1,1") + finally: + cursor.close() + connection.close() + + def test_output_type_handler(self): + with self.engine.connect() as conn: + for stmt, exp, kw in [ + ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}), + ("SELECT CAST(15 AS INTEGER) FROM DUAL", 15, {}), + ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", + decimal.Decimal("15"), {}), + ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", + decimal.Decimal("0.1"), {}), + ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), + {'num': decimal.Decimal("2.5")}), + + ( + text( + "SELECT CAST(28.532 AS NUMERIC(5, 3)) " + "AS val FROM DUAL").columns( + val=Numeric(5, 3, asdecimal=True)), + decimal.Decimal("28.532"), {} + ) + ]: + test_exp = conn.scalar(stmt, **kw) + eq_( + test_exp, + exp + ) + assert type(test_exp) is type(exp) + + diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py deleted file mode 100644 index 5ca95aea9..000000000 --- a/test/dialect/test_oracle.py +++ /dev/null @@ -1,2316 +0,0 @@ -# coding: utf-8 - - -from sqlalchemy.testing import eq_ -from sqlalchemy import * -from sqlalchemy import types as sqltypes, exc, schema -from sqlalchemy.sql import table, column -from sqlalchemy.sql.elements import quoted_name -from sqlalchemy.testing import (fixtures, - AssertsExecutionResults, - AssertsCompiledSQL) -from sqlalchemy import testing -from sqlalchemy.util import u, b -from sqlalchemy import util -from sqlalchemy.testing import assert_raises, assert_raises_message -from sqlalchemy.testing.engines import testing_engine -from sqlalchemy.dialects.oracle import cx_oracle, base as oracle -from sqlalchemy.engine import default -import decimal -from sqlalchemy.engine import url -from sqlalchemy.testing.schema import Table, Column -import datetime -import os -from sqlalchemy import sql -from sqlalchemy.testing.mock import Mock - - -class DialectTest(fixtures.TestBase): - def test_cx_oracle_version_parse(self): - dialect = cx_oracle.OracleDialect_cx_oracle() - - eq_( - dialect._parse_cx_oracle_ver("5.2"), - (5, 2) - ) - - eq_( - dialect._parse_cx_oracle_ver("5.0.1"), - (5, 0, 1) - ) - - eq_( - dialect._parse_cx_oracle_ver("6.0b1"), - (6, 0) - ) - - def test_twophase_arg(self): - - mock_dbapi = Mock(version="5.0.3") - dialect = cx_oracle.OracleDialect_cx_oracle(dbapi=mock_dbapi) - args = dialect.create_connect_args( - url.make_url("oracle+cx_oracle://a:b@host/db")) - - eq_(args[1]['twophase'], True) - - mock_dbapi = Mock(version="5.0.3") - dialect = cx_oracle.OracleDialect_cx_oracle( - dbapi=mock_dbapi, allow_twophase=False) - args = dialect.create_connect_args( - url.make_url("oracle+cx_oracle://a:b@host/db")) - - eq_(args[1]['twophase'], False) - - mock_dbapi = Mock(version="6.0b1") - dialect = cx_oracle.OracleDialect_cx_oracle(dbapi=mock_dbapi) - args = dialect.create_connect_args( - url.make_url("oracle+cx_oracle://a:b@host/db")) - - assert 'twophase' not in args[1] - - -class OutParamTest(fixtures.TestBase, AssertsExecutionResults): - __only_on__ = 'oracle+cx_oracle' - __backend__ = True - - @classmethod - def setup_class(cls): - testing.db.execute(""" - create or replace procedure foo(x_in IN number, x_out OUT number, - y_out OUT number, z_out OUT varchar) IS - retval number; - begin - retval := 6; - x_out := 10; - y_out := x_in * 15; - z_out := NULL; - end; - """) - - def test_out_params(self): - result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' - ':z_out); end;', - bindparams=[bindparam('x_in', Float), - outparam('x_out', Integer), - outparam('y_out', Float), - outparam('z_out', String)]), - x_in=5) - eq_(result.out_parameters, - {'x_out': 10, 'y_out': 75, 'z_out': None}) - assert isinstance(result.out_parameters['x_out'], int) - - @classmethod - def teardown_class(cls): - testing.db.execute("DROP PROCEDURE foo") - - -class CXOracleArgsTest(fixtures.TestBase): - __only_on__ = 'oracle+cx_oracle' - __backend__ = True - - def test_autosetinputsizes(self): - dialect = cx_oracle.dialect() - assert dialect.auto_setinputsizes - - dialect = cx_oracle.dialect(auto_setinputsizes=False) - assert not dialect.auto_setinputsizes - - def test_exclude_inputsizes_none(self): - dialect = cx_oracle.dialect(exclude_setinputsizes=None) - eq_(dialect.exclude_setinputsizes, set()) - - def test_exclude_inputsizes_custom(self): - import cx_Oracle - dialect = cx_oracle.dialect(dbapi=cx_Oracle, - exclude_setinputsizes=('NCLOB',)) - eq_(dialect.exclude_setinputsizes, set([cx_Oracle.NCLOB])) - - -class QuotedBindRoundTripTest(fixtures.TestBase): - - __only_on__ = 'oracle' - __backend__ = True - - @testing.provide_metadata - def test_table_round_trip(self): - oracle.RESERVED_WORDS.remove('UNION') - - metadata = self.metadata - table = Table("t1", metadata, - Column("option", Integer), - Column("plain", Integer, quote=True), - # test that quote works for a reserved word - # that the dialect isn't aware of when quote - # is set - Column("union", Integer, quote=True)) - metadata.create_all() - - table.insert().execute( - {"option": 1, "plain": 1, "union": 1} - ) - eq_( - testing.db.execute(table.select()).first(), - (1, 1, 1) - ) - table.update().values(option=2, plain=2, union=2).execute() - eq_( - testing.db.execute(table.select()).first(), - (2, 2, 2) - ) - - def test_numeric_bind_round_trip(self): - eq_( - testing.db.scalar( - select([ - literal_column("2", type_=Integer()) + - bindparam("2_1", value=2)]) - ), - 4 - ) - - @testing.provide_metadata - def test_numeric_bind_in_crud(self): - t = Table( - "asfd", self.metadata, - Column("100K", Integer) - ) - t.create() - - testing.db.execute(t.insert(), {"100K": 10}) - eq_( - testing.db.scalar(t.select()), 10 - ) - - -class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = "oracle" # oracle.dialect() - - def test_true_false(self): - self.assert_compile( - sql.false(), "0" - ) - self.assert_compile( - sql.true(), - "1" - ) - - def test_owner(self): - meta = MetaData() - parent = Table('parent', meta, Column('id', Integer, - primary_key=True), Column('name', String(50)), - schema='ed') - child = Table('child', meta, Column('id', Integer, - primary_key=True), Column('parent_id', Integer, - ForeignKey('ed.parent.id')), schema='ed') - self.assert_compile(parent.join(child), - 'ed.parent JOIN ed.child ON ed.parent.id = ' - 'ed.child.parent_id') - - def test_subquery(self): - t = table('sometable', column('col1'), column('col2')) - s = select([t]) - s = select([s.c.col1, s.c.col2]) - - self.assert_compile(s, "SELECT col1, col2 FROM (SELECT " - "sometable.col1 AS col1, sometable.col2 " - "AS col2 FROM sometable)") - - def test_bindparam_quote(self): - """test that bound parameters take on quoting for reserved words, - column names quote flag enabled.""" - # note: this is only in cx_oracle at the moment. not sure - # what other hypothetical oracle dialects might need - - self.assert_compile( - bindparam("option"), ':"option"' - ) - self.assert_compile( - bindparam("plain"), ':plain' - ) - t = Table("s", MetaData(), Column('plain', Integer, quote=True)) - self.assert_compile( - t.insert().values(plain=5), - 'INSERT INTO s ("plain") VALUES (:"plain")' - ) - self.assert_compile( - t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"' - ) - - def test_cte(self): - part = table( - 'part', - column('part'), - column('sub_part'), - column('quantity') - ) - - included_parts = select([ - part.c.sub_part, part.c.part, part.c.quantity - ]).where(part.c.part == "p1").\ - cte(name="included_parts", recursive=True).\ - suffix_with( - "search depth first by part set ord1", - "cycle part set y_cycle to 1 default 0", dialect='oracle') - - incl_alias = included_parts.alias("pr1") - parts_alias = part.alias("p") - included_parts = included_parts.union_all( - select([ - parts_alias.c.sub_part, - parts_alias.c.part, parts_alias.c.quantity - ]).where(parts_alias.c.part == incl_alias.c.sub_part) - ) - - q = select([ - included_parts.c.sub_part, - func.sum(included_parts.c.quantity).label('total_quantity')]).\ - group_by(included_parts.c.sub_part) - - self.assert_compile( - q, - "WITH included_parts(sub_part, part, quantity) AS " - "(SELECT part.sub_part AS sub_part, part.part AS part, " - "part.quantity AS quantity FROM part WHERE part.part = :part_1 " - "UNION ALL SELECT p.sub_part AS sub_part, p.part AS part, " - "p.quantity AS quantity FROM part p, included_parts pr1 " - "WHERE p.part = pr1.sub_part) " - "search depth first by part set ord1 cycle part set " - "y_cycle to 1 default 0 " - "SELECT included_parts.sub_part, sum(included_parts.quantity) " - "AS total_quantity FROM included_parts " - "GROUP BY included_parts.sub_part" - ) - - def test_limit(self): - t = table('sometable', column('col1'), column('col2')) - s = select([t]) - c = s.compile(dialect=oracle.OracleDialect()) - assert t.c.col1 in set(c._create_result_map()['col1'][1]) - s = select([t]).limit(10).offset(20) - self.assert_compile(s, - 'SELECT col1, col2 FROM (SELECT col1, ' - 'col2, ROWNUM AS ora_rn FROM (SELECT ' - 'sometable.col1 AS col1, sometable.col2 AS ' - 'col2 FROM sometable) WHERE ROWNUM <= ' - ':param_1 + :param_2) WHERE ora_rn > :param_2', - checkparams={'param_1': 10, 'param_2': 20}) - - c = s.compile(dialect=oracle.OracleDialect()) - eq_(len(c._result_columns), 2) - assert t.c.col1 in set(c._create_result_map()['col1'][1]) - - s2 = select([s.c.col1, s.c.col2]) - self.assert_compile(s2, - 'SELECT col1, col2 FROM (SELECT col1, col2 ' - 'FROM (SELECT col1, col2, ROWNUM AS ora_rn ' - 'FROM (SELECT sometable.col1 AS col1, ' - 'sometable.col2 AS col2 FROM sometable) ' - 'WHERE ROWNUM <= :param_1 + :param_2) ' - 'WHERE ora_rn > :param_2)', - checkparams={'param_1': 10, 'param_2': 20}) - - self.assert_compile(s2, - 'SELECT col1, col2 FROM (SELECT col1, col2 ' - 'FROM (SELECT col1, col2, ROWNUM AS ora_rn ' - 'FROM (SELECT sometable.col1 AS col1, ' - 'sometable.col2 AS col2 FROM sometable) ' - 'WHERE ROWNUM <= :param_1 + :param_2) ' - 'WHERE ora_rn > :param_2)') - c = s2.compile(dialect=oracle.OracleDialect()) - eq_(len(c._result_columns), 2) - assert s.c.col1 in set(c._create_result_map()['col1'][1]) - - s = select([t]).limit(10).offset(20).order_by(t.c.col2) - self.assert_compile(s, - 'SELECT col1, col2 FROM (SELECT col1, ' - 'col2, ROWNUM AS ora_rn FROM (SELECT ' - 'sometable.col1 AS col1, sometable.col2 AS ' - 'col2 FROM sometable ORDER BY ' - 'sometable.col2) WHERE ROWNUM <= ' - ':param_1 + :param_2) WHERE ora_rn > :param_2', - checkparams={'param_1': 10, 'param_2': 20} - ) - c = s.compile(dialect=oracle.OracleDialect()) - eq_(len(c._result_columns), 2) - assert t.c.col1 in set(c._create_result_map()['col1'][1]) - - s = select([t], for_update=True).limit(10).order_by(t.c.col2) - self.assert_compile(s, - 'SELECT col1, col2 FROM (SELECT ' - 'sometable.col1 AS col1, sometable.col2 AS ' - 'col2 FROM sometable ORDER BY ' - 'sometable.col2) WHERE ROWNUM <= :param_1 ' - 'FOR UPDATE') - - s = select([t], - for_update=True).limit(10).offset(20).order_by(t.c.col2) - self.assert_compile(s, - 'SELECT col1, col2 FROM (SELECT col1, ' - 'col2, ROWNUM AS ora_rn FROM (SELECT ' - 'sometable.col1 AS col1, sometable.col2 AS ' - 'col2 FROM sometable ORDER BY ' - 'sometable.col2) WHERE ROWNUM <= ' - ':param_1 + :param_2) WHERE ora_rn > :param_2 FOR ' - 'UPDATE') - - def test_for_update(self): - table1 = table('mytable', - column('myid'), column('name'), column('description')) - - self.assert_compile( - table1.select(table1.c.myid == 7).with_for_update(), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - - self.assert_compile( - table1 - .select(table1.c.myid == 7) - .with_for_update(of=table1.c.myid), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 " - "FOR UPDATE OF mytable.myid") - - self.assert_compile( - table1.select(table1.c.myid == 7).with_for_update(nowait=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT") - - self.assert_compile( - table1 - .select(table1.c.myid == 7) - .with_for_update(nowait=True, of=table1.c.myid), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 " - "FOR UPDATE OF mytable.myid NOWAIT") - - self.assert_compile( - table1 - .select(table1.c.myid == 7) - .with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " - "mytable.myid, mytable.name NOWAIT") - - self.assert_compile( - table1.select(table1.c.myid == 7) - .with_for_update(skip_locked=True, - of=[table1.c.myid, table1.c.name]), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF " - "mytable.myid, mytable.name SKIP LOCKED") - - # key_share has no effect - self.assert_compile( - table1.select(table1.c.myid == 7).with_for_update(key_share=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - - # read has no effect - self.assert_compile( - table1 - .select(table1.c.myid == 7) - .with_for_update(read=True, key_share=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - - ta = table1.alias() - self.assert_compile( - ta - .select(ta.c.myid == 7) - .with_for_update(of=[ta.c.myid, ta.c.name]), - "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " - "FROM mytable mytable_1 " - "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF " - "mytable_1.myid, mytable_1.name" - ) - - def test_for_update_of_w_limit_adaption_col_present(self): - table1 = table('mytable', column('myid'), column('name')) - - self.assert_compile( - select([table1.c.myid, table1.c.name]). - where(table1.c.myid == 7). - with_for_update(nowait=True, of=table1.c.name). - limit(10), - "SELECT myid, name FROM " - "(SELECT mytable.myid AS myid, mytable.name AS name " - "FROM mytable WHERE mytable.myid = :myid_1) " - "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT", - ) - - def test_for_update_of_w_limit_adaption_col_unpresent(self): - table1 = table('mytable', column('myid'), column('name')) - - self.assert_compile( - select([table1.c.myid]). - where(table1.c.myid == 7). - with_for_update(nowait=True, of=table1.c.name). - limit(10), - "SELECT myid FROM " - "(SELECT mytable.myid AS myid, mytable.name AS name " - "FROM mytable WHERE mytable.myid = :myid_1) " - "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT", - ) - - def test_for_update_of_w_limit_offset_adaption_col_present(self): - table1 = table('mytable', column('myid'), column('name')) - - self.assert_compile( - select([table1.c.myid, table1.c.name]). - where(table1.c.myid == 7). - with_for_update(nowait=True, of=table1.c.name). - limit(10).offset(50), - "SELECT myid, name FROM (SELECT myid, name, ROWNUM AS ora_rn " - "FROM (SELECT mytable.myid AS myid, mytable.name AS name " - "FROM mytable WHERE mytable.myid = :myid_1) " - "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 " - "FOR UPDATE OF name NOWAIT", - ) - - def test_for_update_of_w_limit_offset_adaption_col_unpresent(self): - table1 = table('mytable', column('myid'), column('name')) - - self.assert_compile( - select([table1.c.myid]). - where(table1.c.myid == 7). - with_for_update(nowait=True, of=table1.c.name). - limit(10).offset(50), - "SELECT myid FROM (SELECT myid, ROWNUM AS ora_rn, name " - "FROM (SELECT mytable.myid AS myid, mytable.name AS name " - "FROM mytable WHERE mytable.myid = :myid_1) " - "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 " - "FOR UPDATE OF name NOWAIT", - ) - - def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self): - table1 = table('mytable', column('myid'), column('foo'), column('bar')) - - self.assert_compile( - select([table1.c.myid, table1.c.bar]). - where(table1.c.myid == 7). - with_for_update(nowait=True, of=[table1.c.foo, table1.c.bar]). - limit(10).offset(50), - "SELECT myid, bar FROM (SELECT myid, bar, ROWNUM AS ora_rn, " - "foo FROM (SELECT mytable.myid AS myid, mytable.bar AS bar, " - "mytable.foo AS foo FROM mytable WHERE mytable.myid = :myid_1) " - "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 " - "FOR UPDATE OF foo, bar NOWAIT" - ) - - def test_limit_preserves_typing_information(self): - class MyType(TypeDecorator): - impl = Integer - - stmt = select([type_coerce(column('x'), MyType).label('foo')]).limit(1) - dialect = oracle.dialect() - compiled = stmt.compile(dialect=dialect) - assert isinstance(compiled._create_result_map()['foo'][-1], MyType) - - def test_use_binds_for_limits_disabled(self): - t = table('sometable', column('col1'), column('col2')) - dialect = oracle.OracleDialect(use_binds_for_limits=False) - - self.assert_compile( - select([t]).limit(10), - "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " - "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM <= 10", - dialect=dialect) - - self.assert_compile( - select([t]).offset(10), - "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " - "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " - "FROM sometable)) WHERE ora_rn > 10", - dialect=dialect) - - self.assert_compile( - select([t]).limit(10).offset(10), - "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " - "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " - "FROM sometable) WHERE ROWNUM <= 20) WHERE ora_rn > 10", - dialect=dialect) - - def test_use_binds_for_limits_enabled(self): - t = table('sometable', column('col1'), column('col2')) - dialect = oracle.OracleDialect(use_binds_for_limits=True) - - self.assert_compile( - select([t]).limit(10), - "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " - "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM " - "<= :param_1", - dialect=dialect) - - self.assert_compile( - select([t]).offset(10), - "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " - "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " - "FROM sometable)) WHERE ora_rn > :param_1", - dialect=dialect) - - self.assert_compile( - select([t]).limit(10).offset(10), - "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " - "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " - "FROM sometable) WHERE ROWNUM <= :param_1 + :param_2) " - "WHERE ora_rn > :param_2", - dialect=dialect, - checkparams={'param_1': 10, 'param_2': 10}) - - def test_long_labels(self): - dialect = default.DefaultDialect() - dialect.max_identifier_length = 30 - - ora_dialect = oracle.dialect() - - m = MetaData() - a_table = Table( - 'thirty_characters_table_xxxxxx', - m, - Column('id', Integer, primary_key=True) - ) - - other_table = Table( - 'other_thirty_characters_table_', - m, - Column('id', Integer, primary_key=True), - Column('thirty_characters_table_id', - Integer, - ForeignKey('thirty_characters_table_xxxxxx.id'), - primary_key=True)) - - anon = a_table.alias() - self.assert_compile(select([other_table, - anon]). - select_from( - other_table.outerjoin(anon)).apply_labels(), - 'SELECT other_thirty_characters_table_.id ' - 'AS other_thirty_characters__1, ' - 'other_thirty_characters_table_.thirty_char' - 'acters_table_id AS other_thirty_characters' - '__2, thirty_characters_table__1.id AS ' - 'thirty_characters_table__3 FROM ' - 'other_thirty_characters_table_ LEFT OUTER ' - 'JOIN thirty_characters_table_xxxxxx AS ' - 'thirty_characters_table__1 ON ' - 'thirty_characters_table__1.id = ' - 'other_thirty_characters_table_.thirty_char' - 'acters_table_id', dialect=dialect) - self.assert_compile(select([other_table, - anon]).select_from( - other_table.outerjoin(anon)).apply_labels(), - 'SELECT other_thirty_characters_table_.id ' - 'AS other_thirty_characters__1, ' - 'other_thirty_characters_table_.thirty_char' - 'acters_table_id AS other_thirty_characters' - '__2, thirty_characters_table__1.id AS ' - 'thirty_characters_table__3 FROM ' - 'other_thirty_characters_table_ LEFT OUTER ' - 'JOIN thirty_characters_table_xxxxxx ' - 'thirty_characters_table__1 ON ' - 'thirty_characters_table__1.id = ' - 'other_thirty_characters_table_.thirty_char' - 'acters_table_id', dialect=ora_dialect) - - def test_outer_join(self): - table1 = table('mytable', - column('myid', Integer), - column('name', String), - column('description', String)) - - table2 = table( - 'myothertable', - column('otherid', Integer), - column('othername', String), - ) - - table3 = table( - 'thirdtable', - column('userid', Integer), - column('otherstuff', String), - ) - - query = select([table1, table2], - or_(table1.c.name == 'fred', - table1.c.myid == 10, table2.c.othername != 'jack', - text('EXISTS (select yay from foo where boo = lar)') - ), - from_obj=[outerjoin(table1, - table2, - table1.c.myid == table2.c.otherid)]) - self.assert_compile(query, - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, myothertable.otherid,' - ' myothertable.othername FROM mytable, ' - 'myothertable WHERE (mytable.name = ' - ':name_1 OR mytable.myid = :myid_1 OR ' - 'myothertable.othername != :othername_1 OR ' - 'EXISTS (select yay from foo where boo = ' - 'lar)) AND mytable.myid = ' - 'myothertable.otherid(+)', - dialect=oracle.OracleDialect(use_ansi=False)) - query = table1.outerjoin(table2, - table1.c.myid == table2.c.otherid) \ - .outerjoin(table3, table3.c.userid == table2.c.otherid) - self.assert_compile(query.select(), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, myothertable.otherid,' - ' myothertable.othername, ' - 'thirdtable.userid, thirdtable.otherstuff ' - 'FROM mytable LEFT OUTER JOIN myothertable ' - 'ON mytable.myid = myothertable.otherid ' - 'LEFT OUTER JOIN thirdtable ON ' - 'thirdtable.userid = myothertable.otherid') - - self.assert_compile(query.select(), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, myothertable.otherid,' - ' myothertable.othername, ' - 'thirdtable.userid, thirdtable.otherstuff ' - 'FROM mytable, myothertable, thirdtable ' - 'WHERE thirdtable.userid(+) = ' - 'myothertable.otherid AND mytable.myid = ' - 'myothertable.otherid(+)', - dialect=oracle.dialect(use_ansi=False)) - query = table1.join(table2, - table1.c.myid == table2.c.otherid) \ - .join(table3, table3.c.userid == table2.c.otherid) - self.assert_compile(query.select(), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, myothertable.otherid,' - ' myothertable.othername, ' - 'thirdtable.userid, thirdtable.otherstuff ' - 'FROM mytable, myothertable, thirdtable ' - 'WHERE thirdtable.userid = ' - 'myothertable.otherid AND mytable.myid = ' - 'myothertable.otherid', - dialect=oracle.dialect(use_ansi=False)) - query = table1.join(table2, - table1.c.myid == table2.c.otherid) \ - .outerjoin(table3, table3.c.userid == table2.c.otherid) - self.assert_compile(query.select().order_by(table1.c.name). - limit(10).offset(5), - 'SELECT myid, name, description, otherid, ' - 'othername, userid, otherstuff FROM ' - '(SELECT myid, name, description, otherid, ' - 'othername, userid, otherstuff, ROWNUM AS ' - 'ora_rn FROM (SELECT mytable.myid AS myid, ' - 'mytable.name AS name, mytable.description ' - 'AS description, myothertable.otherid AS ' - 'otherid, myothertable.othername AS ' - 'othername, thirdtable.userid AS userid, ' - 'thirdtable.otherstuff AS otherstuff FROM ' - 'mytable, myothertable, thirdtable WHERE ' - 'thirdtable.userid(+) = ' - 'myothertable.otherid AND mytable.myid = ' - 'myothertable.otherid ORDER BY mytable.name) ' - 'WHERE ROWNUM <= :param_1 + :param_2) ' - 'WHERE ora_rn > :param_2', - checkparams={'param_1': 10, 'param_2': 5}, - dialect=oracle.dialect(use_ansi=False)) - - subq = select([table1]).select_from( - table1.outerjoin(table2, table1.c.myid == table2.c.otherid)) \ - .alias() - q = select([table3]).select_from( - table3.outerjoin(subq, table3.c.userid == subq.c.myid)) - - self.assert_compile(q, - 'SELECT thirdtable.userid, ' - 'thirdtable.otherstuff FROM thirdtable ' - 'LEFT OUTER JOIN (SELECT mytable.myid AS ' - 'myid, mytable.name AS name, ' - 'mytable.description AS description FROM ' - 'mytable LEFT OUTER JOIN myothertable ON ' - 'mytable.myid = myothertable.otherid) ' - 'anon_1 ON thirdtable.userid = anon_1.myid', - dialect=oracle.dialect(use_ansi=True)) - - self.assert_compile(q, - 'SELECT thirdtable.userid, ' - 'thirdtable.otherstuff FROM thirdtable, ' - '(SELECT mytable.myid AS myid, ' - 'mytable.name AS name, mytable.description ' - 'AS description FROM mytable, myothertable ' - 'WHERE mytable.myid = myothertable.otherid(' - '+)) anon_1 WHERE thirdtable.userid = ' - 'anon_1.myid(+)', - dialect=oracle.dialect(use_ansi=False)) - - q = select([table1.c.name]).where(table1.c.name == 'foo') - self.assert_compile(q, - 'SELECT mytable.name FROM mytable WHERE ' - 'mytable.name = :name_1', - dialect=oracle.dialect(use_ansi=False)) - subq = select([table3.c.otherstuff]) \ - .where(table3.c.otherstuff == table1.c.name).label('bar') - q = select([table1.c.name, subq]) - self.assert_compile(q, - 'SELECT mytable.name, (SELECT ' - 'thirdtable.otherstuff FROM thirdtable ' - 'WHERE thirdtable.otherstuff = ' - 'mytable.name) AS bar FROM mytable', - dialect=oracle.dialect(use_ansi=False)) - - def test_nonansi_nested_right_join(self): - a = table('a', column('a')) - b = table('b', column('b')) - c = table('c', column('c')) - - j = a.join(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b) - - self.assert_compile( - select([j]), - "SELECT a.a, b.b, c.c FROM a, b, c " - "WHERE a.a = b.b AND b.b = c.c", - dialect=oracle.OracleDialect(use_ansi=False) - ) - - j = a.outerjoin(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b) - - self.assert_compile( - select([j]), - "SELECT a.a, b.b, c.c FROM a, b, c " - "WHERE a.a = b.b(+) AND b.b = c.c", - dialect=oracle.OracleDialect(use_ansi=False) - ) - - j = a.join(b.outerjoin(c, b.c.b == c.c.c), a.c.a == b.c.b) - - self.assert_compile( - select([j]), - "SELECT a.a, b.b, c.c FROM a, b, c " - "WHERE a.a = b.b AND b.b = c.c(+)", - dialect=oracle.OracleDialect(use_ansi=False) - ) - - def test_alias_outer_join(self): - address_types = table('address_types', column('id'), - column('name')) - addresses = table('addresses', column('id'), column('user_id'), - column('address_type_id'), - column('email_address')) - at_alias = address_types.alias() - s = select([at_alias, addresses]) \ - .select_from( - addresses.outerjoin( - at_alias, - addresses.c.address_type_id == at_alias.c.id)) \ - .where(addresses.c.user_id == 7) \ - .order_by(addresses.c.id, address_types.c.id) - self.assert_compile(s, - 'SELECT address_types_1.id, ' - 'address_types_1.name, addresses.id, ' - 'addresses.user_id, addresses.address_type_' - 'id, addresses.email_address FROM ' - 'addresses LEFT OUTER JOIN address_types ' - 'address_types_1 ON addresses.address_type_' - 'id = address_types_1.id WHERE ' - 'addresses.user_id = :user_id_1 ORDER BY ' - 'addresses.id, address_types.id') - - def test_returning_insert(self): - t1 = table('t1', column('c1'), column('c2'), column('c3')) - self.assert_compile( - t1.insert().values(c1=1).returning(t1.c.c2, t1.c.c3), - "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " - "t1.c2, t1.c3 INTO :ret_0, :ret_1") - - def test_returning_insert_functional(self): - t1 = table('t1', - column('c1'), - column('c2', String()), - column('c3', String())) - fn = func.lower(t1.c.c2, type_=String()) - stmt = t1.insert().values(c1=1).returning(fn, t1.c.c3) - compiled = stmt.compile(dialect=oracle.dialect()) - eq_(compiled._create_result_map(), - {'ret_1': ('ret_1', (t1.c.c3, 'c3', 'c3'), t1.c.c3.type), - 'ret_0': ('ret_0', (fn, 'lower', None), fn.type)}) - self.assert_compile( - stmt, - "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " - "lower(t1.c2), t1.c3 INTO :ret_0, :ret_1") - - def test_returning_insert_labeled(self): - t1 = table('t1', column('c1'), column('c2'), column('c3')) - self.assert_compile( - t1.insert().values(c1=1).returning( - t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), - "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " - "t1.c2, t1.c3 INTO :ret_0, :ret_1") - - def test_compound(self): - t1 = table('t1', column('c1'), column('c2'), column('c3')) - t2 = table('t2', column('c1'), column('c2'), column('c3')) - self.assert_compile(union(t1.select(), t2.select()), - 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 UNION ' - 'SELECT t2.c1, t2.c2, t2.c3 FROM t2') - self.assert_compile(except_(t1.select(), t2.select()), - 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 MINUS ' - 'SELECT t2.c1, t2.c2, t2.c3 FROM t2') - - def test_no_paren_fns(self): - for fn, expected in [ - (func.uid(), "uid"), - (func.UID(), "UID"), - (func.sysdate(), "sysdate"), - (func.row_number(), "row_number()"), - (func.rank(), "rank()"), - (func.now(), "CURRENT_TIMESTAMP"), - (func.current_timestamp(), "CURRENT_TIMESTAMP"), - (func.user(), "USER"), - ]: - self.assert_compile(fn, expected) - - def test_create_index_alt_schema(self): - m = MetaData() - t1 = Table('foo', m, - Column('x', Integer), - schema="alt_schema") - self.assert_compile( - schema.CreateIndex(Index("bar", t1.c.x)), - "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)" - ) - - def test_create_index_expr(self): - m = MetaData() - t1 = Table('foo', m, - Column('x', Integer)) - self.assert_compile( - schema.CreateIndex(Index("bar", t1.c.x > 5)), - "CREATE INDEX bar ON foo (x > 5)" - ) - - def test_table_options(self): - m = MetaData() - - t = Table( - 'foo', m, - Column('x', Integer), - prefixes=["GLOBAL TEMPORARY"], - oracle_on_commit="PRESERVE ROWS" - ) - - self.assert_compile( - schema.CreateTable(t), - "CREATE GLOBAL TEMPORARY TABLE " - "foo (x INTEGER) ON COMMIT PRESERVE ROWS" - ) - - def test_create_table_compress(self): - m = MetaData() - tbl1 = Table('testtbl1', m, Column('data', Integer), - oracle_compress=True) - tbl2 = Table('testtbl2', m, Column('data', Integer), - oracle_compress="OLTP") - - self.assert_compile(schema.CreateTable(tbl1), - "CREATE TABLE testtbl1 (data INTEGER) COMPRESS") - self.assert_compile(schema.CreateTable(tbl2), - "CREATE TABLE testtbl2 (data INTEGER) " - "COMPRESS FOR OLTP") - - def test_create_index_bitmap_compress(self): - m = MetaData() - tbl = Table('testtbl', m, Column('data', Integer)) - idx1 = Index('idx1', tbl.c.data, oracle_compress=True) - idx2 = Index('idx2', tbl.c.data, oracle_compress=1) - idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True) - - self.assert_compile(schema.CreateIndex(idx1), - "CREATE INDEX idx1 ON testtbl (data) COMPRESS") - self.assert_compile(schema.CreateIndex(idx2), - "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1") - self.assert_compile(schema.CreateIndex(idx3), - "CREATE BITMAP INDEX idx3 ON testtbl (data)") - - -class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): - - def _dialect(self, server_version, **kw): - def server_version_info(conn): - return server_version - - dialect = oracle.dialect( - dbapi=Mock(version="0.0.0", paramstyle="named"), - **kw) - dialect._get_server_version_info = server_version_info - dialect._check_unicode_returns = Mock() - dialect._check_unicode_description = Mock() - dialect._get_default_schema_name = Mock() - return dialect - - def test_ora8_flags(self): - dialect = self._dialect((8, 2, 5)) - - # before connect, assume modern DB - assert dialect._supports_char_length - assert dialect._supports_nchar - assert dialect.use_ansi - - dialect.initialize(Mock()) - assert not dialect.implicit_returning - assert not dialect._supports_char_length - assert not dialect._supports_nchar - assert not dialect.use_ansi - self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) - self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) - self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) - - dialect = self._dialect((8, 2, 5), implicit_returning=True) - dialect.initialize(testing.db.connect()) - assert dialect.implicit_returning - - def test_default_flags(self): - """test with no initialization or server version info""" - - dialect = self._dialect(None) - - assert dialect._supports_char_length - assert dialect._supports_nchar - assert dialect.use_ansi - self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) - self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) - self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) - - def test_ora10_flags(self): - dialect = self._dialect((10, 2, 5)) - - dialect.initialize(Mock()) - assert dialect._supports_char_length - assert dialect._supports_nchar - assert dialect.use_ansi - self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) - self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) - self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) - - -class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' - __backend__ = True - - @classmethod - def setup_class(cls): - # currently assuming full DBA privs for the user. - # don't really know how else to go here unless - # we connect as the other user. - - for stmt in (""" -create table %(test_schema)s.parent( - id integer primary key, - data varchar2(50) -); - -create table %(test_schema)s.child( - id integer primary key, - data varchar2(50), - parent_id integer references %(test_schema)s.parent(id) -); - -create table local_table( - id integer primary key, - data varchar2(50) -); - -create synonym %(test_schema)s.ptable for %(test_schema)s.parent; -create synonym %(test_schema)s.ctable for %(test_schema)s.child; - -create synonym %(test_schema)s_pt for %(test_schema)s.parent; - -create synonym %(test_schema)s.local_table for local_table; - --- can't make a ref from local schema to the --- remote schema's table without this, --- *and* cant give yourself a grant ! --- so we give it to public. ideas welcome. -grant references on %(test_schema)s.parent to public; -grant references on %(test_schema)s.child to public; -""" % {"test_schema": testing.config.test_schema}).split(";"): - if stmt.strip(): - testing.db.execute(stmt) - - @classmethod - def teardown_class(cls): - for stmt in (""" -drop table %(test_schema)s.child; -drop table %(test_schema)s.parent; -drop table local_table; -drop synonym %(test_schema)s.ctable; -drop synonym %(test_schema)s.ptable; -drop synonym %(test_schema)s_pt; -drop synonym %(test_schema)s.local_table; - -""" % {"test_schema": testing.config.test_schema}).split(";"): - if stmt.strip(): - testing.db.execute(stmt) - - @testing.provide_metadata - def test_create_same_names_explicit_schema(self): - schema = testing.db.dialect.default_schema_name - meta = self.metadata - parent = Table('parent', meta, - Column('pid', Integer, primary_key=True), - schema=schema) - child = Table('child', meta, - Column('cid', Integer, primary_key=True), - Column('pid', - Integer, - ForeignKey('%s.parent.pid' % schema)), - schema=schema) - meta.create_all() - parent.insert().execute({'pid': 1}) - child.insert().execute({'cid': 1, 'pid': 1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - - def test_reflect_alt_table_owner_local_synonym(self): - meta = MetaData(testing.db) - parent = Table('%s_pt' % testing.config.test_schema, - meta, - autoload=True, - oracle_resolve_synonyms=True) - self.assert_compile(parent.select(), - "SELECT %(test_schema)s_pt.id, " - "%(test_schema)s_pt.data FROM %(test_schema)s_pt" - % {"test_schema": testing.config.test_schema}) - select([parent]).execute().fetchall() - - def test_reflect_alt_synonym_owner_local_table(self): - meta = MetaData(testing.db) - parent = Table( - 'local_table', meta, autoload=True, - oracle_resolve_synonyms=True, schema=testing.config.test_schema) - self.assert_compile( - parent.select(), - "SELECT %(test_schema)s.local_table.id, " - "%(test_schema)s.local_table.data " - "FROM %(test_schema)s.local_table" % - {"test_schema": testing.config.test_schema} - ) - select([parent]).execute().fetchall() - - @testing.provide_metadata - def test_create_same_names_implicit_schema(self): - meta = self.metadata - parent = Table('parent', - meta, - Column('pid', Integer, primary_key=True)) - child = Table('child', meta, - Column('cid', Integer, primary_key=True), - Column('pid', Integer, ForeignKey('parent.pid'))) - meta.create_all() - parent.insert().execute({'pid': 1}) - child.insert().execute({'cid': 1, 'pid': 1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - - def test_reflect_alt_owner_explicit(self): - meta = MetaData(testing.db) - parent = Table( - 'parent', meta, autoload=True, - schema=testing.config.test_schema) - child = Table( - 'child', meta, autoload=True, - schema=testing.config.test_schema) - - self.assert_compile( - parent.join(child), - "%(test_schema)s.parent JOIN %(test_schema)s.child ON " - "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % { - "test_schema": testing.config.test_schema - }) - select([parent, child]).\ - select_from(parent.join(child)).\ - execute().fetchall() - - def test_reflect_local_to_remote(self): - testing.db.execute( - 'CREATE TABLE localtable (id INTEGER ' - 'PRIMARY KEY, parent_id INTEGER REFERENCES ' - '%(test_schema)s.parent(id))' % { - "test_schema": testing.config.test_schema}) - try: - meta = MetaData(testing.db) - lcl = Table('localtable', meta, autoload=True) - parent = meta.tables['%s.parent' % testing.config.test_schema] - self.assert_compile(parent.join(lcl), - '%(test_schema)s.parent JOIN localtable ON ' - '%(test_schema)s.parent.id = ' - 'localtable.parent_id' % { - "test_schema": testing.config.test_schema} - ) - select([parent, - lcl]).select_from(parent.join(lcl)).execute().fetchall() - finally: - testing.db.execute('DROP TABLE localtable') - - def test_reflect_alt_owner_implicit(self): - meta = MetaData(testing.db) - parent = Table( - 'parent', meta, autoload=True, - schema=testing.config.test_schema) - child = Table( - 'child', meta, autoload=True, - schema=testing.config.test_schema) - self.assert_compile( - parent.join(child), - '%(test_schema)s.parent JOIN %(test_schema)s.child ' - 'ON %(test_schema)s.parent.id = ' - '%(test_schema)s.child.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - child]).select_from(parent.join(child)).execute().fetchall() - - def test_reflect_alt_owner_synonyms(self): - testing.db.execute('CREATE TABLE localtable (id INTEGER ' - 'PRIMARY KEY, parent_id INTEGER REFERENCES ' - '%s.ptable(id))' % testing.config.test_schema) - try: - meta = MetaData(testing.db) - lcl = Table('localtable', meta, autoload=True, - oracle_resolve_synonyms=True) - parent = meta.tables['%s.ptable' % testing.config.test_schema] - self.assert_compile( - parent.join(lcl), - '%(test_schema)s.ptable JOIN localtable ON ' - '%(test_schema)s.ptable.id = ' - 'localtable.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - lcl]).select_from(parent.join(lcl)).execute().fetchall() - finally: - testing.db.execute('DROP TABLE localtable') - - def test_reflect_remote_synonyms(self): - meta = MetaData(testing.db) - parent = Table('ptable', meta, autoload=True, - schema=testing.config.test_schema, - oracle_resolve_synonyms=True) - child = Table('ctable', meta, autoload=True, - schema=testing.config.test_schema, - oracle_resolve_synonyms=True) - self.assert_compile( - parent.join(child), - '%(test_schema)s.ptable JOIN ' - '%(test_schema)s.ctable ' - 'ON %(test_schema)s.ptable.id = ' - '%(test_schema)s.ctable.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - child]).select_from(parent.join(child)).execute().fetchall() - - -class ConstraintTest(fixtures.TablesTest): - - __only_on__ = 'oracle' - __backend__ = True - run_deletes = None - - @classmethod - def define_tables(cls, metadata): - Table('foo', metadata, Column('id', Integer, primary_key=True)) - - def test_oracle_has_no_on_update_cascade(self): - bar = Table('bar', self.metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', - Integer, - ForeignKey('foo.id', onupdate='CASCADE'))) - assert_raises(exc.SAWarning, bar.create) - - bat = Table('bat', self.metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', Integer), - ForeignKeyConstraint(['foo_id'], ['foo.id'], - onupdate='CASCADE')) - assert_raises(exc.SAWarning, bat.create) - - def test_reflect_check_include_all(self): - insp = inspect(testing.db) - eq_(insp.get_check_constraints('foo'), []) - eq_( - [rec['sqltext'] - for rec in insp.get_check_constraints('foo', include_all=True)], - ['"ID" IS NOT NULL']) - - -class TwoPhaseTest(fixtures.TablesTest): - """test cx_oracle two phase, which remains in a semi-broken state - so requires a carefully written test.""" - - __only_on__ = 'oracle+cx_oracle' - __backend__ = True - - @classmethod - def define_tables(cls, metadata): - Table('datatable', metadata, - Column('id', Integer, primary_key=True), - Column('data', String(50))) - - def _connection(self): - conn = testing.db.connect() - conn.detach() - return conn - - def _assert_data(self, rows): - eq_( - testing.db.scalar("select count(*) from datatable"), - rows - ) - - def test_twophase_prepare_false(self): - conn = self._connection() - for i in range(2): - trans = conn.begin_twophase() - conn.execute("select 1 from dual") - trans.prepare() - trans.commit() - conn.close() - self._assert_data(0) - - def test_twophase_prepare_true(self): - conn = self._connection() - for i in range(2): - trans = conn.begin_twophase() - conn.execute("insert into datatable (id, data) " - "values (%s, 'somedata')" % i) - trans.prepare() - trans.commit() - conn.close() - self._assert_data(2) - - def test_twophase_rollback(self): - conn = self._connection() - trans = conn.begin_twophase() - conn.execute("insert into datatable (id, data) " - "values (%s, 'somedata')" % 1) - trans.rollback() - - trans = conn.begin_twophase() - conn.execute("insert into datatable (id, data) " - "values (%s, 'somedata')" % 1) - trans.prepare() - trans.commit() - - conn.close() - self._assert_data(1) - - def test_not_prepared(self): - conn = self._connection() - trans = conn.begin_twophase() - conn.execute("insert into datatable (id, data) " - "values (%s, 'somedata')" % 1) - trans.commit() - conn.close() - self._assert_data(1) - - -class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = oracle.OracleDialect() - - def test_no_clobs_for_string_params(self): - """test that simple string params get a DBAPI type of - VARCHAR, not CLOB. This is to prevent setinputsizes - from setting up cx_oracle.CLOBs on - string-based bind params [ticket:793].""" - - class FakeDBAPI(object): - def __getattr__(self, attr): - return attr - - dialect = oracle.OracleDialect() - dbapi = FakeDBAPI() - - b = bindparam("foo", "hello world!") - eq_( - b.type.dialect_impl(dialect).get_dbapi_type(dbapi), - 'STRING' - ) - - b = bindparam("foo", "hello world!") - eq_( - b.type.dialect_impl(dialect).get_dbapi_type(dbapi), - 'STRING' - ) - - def test_long(self): - self.assert_compile(oracle.LONG(), "LONG") - - def test_type_adapt(self): - dialect = cx_oracle.dialect() - - for start, test in [ - (Date(), cx_oracle._OracleDate), - (oracle.OracleRaw(), cx_oracle._OracleRaw), - (String(), String), - (VARCHAR(), cx_oracle._OracleString), - (DATE(), cx_oracle._OracleDate), - (oracle.DATE(), oracle.DATE), - (String(50), cx_oracle._OracleString), - (Unicode(), cx_oracle._OracleNVarChar), - (Text(), cx_oracle._OracleText), - (UnicodeText(), cx_oracle._OracleUnicodeText), - (NCHAR(), cx_oracle._OracleNVarChar), - (oracle.RAW(50), cx_oracle._OracleRaw), - ]: - assert isinstance(start.dialect_impl(dialect), test), \ - "wanted %r got %r" % (test, start.dialect_impl(dialect)) - - def test_raw_compile(self): - self.assert_compile(oracle.RAW(), "RAW") - self.assert_compile(oracle.RAW(35), "RAW(35)") - - def test_char_length(self): - self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)") - - oracle8dialect = oracle.dialect() - oracle8dialect.server_version_info = (8, 0) - self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect) - - self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)") - self.assert_compile(CHAR(50), "CHAR(50)") - - def test_varchar_types(self): - dialect = oracle.dialect() - for typ, exp in [ - (String(50), "VARCHAR2(50 CHAR)"), - (Unicode(50), "NVARCHAR2(50)"), - (NVARCHAR(50), "NVARCHAR2(50)"), - (VARCHAR(50), "VARCHAR(50 CHAR)"), - (oracle.NVARCHAR2(50), "NVARCHAR2(50)"), - (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"), - (String(), "VARCHAR2"), - (Unicode(), "NVARCHAR2"), - (NVARCHAR(), "NVARCHAR2"), - (VARCHAR(), "VARCHAR"), - (oracle.NVARCHAR2(), "NVARCHAR2"), - (oracle.VARCHAR2(), "VARCHAR2"), - ]: - self.assert_compile(typ, exp, dialect=dialect) - - def test_interval(self): - for type_, expected in [(oracle.INTERVAL(), - 'INTERVAL DAY TO SECOND'), - (oracle.INTERVAL(day_precision=3), - 'INTERVAL DAY(3) TO SECOND'), - (oracle.INTERVAL(second_precision=5), - 'INTERVAL DAY TO SECOND(5)'), - (oracle.INTERVAL(day_precision=2, - second_precision=5), - 'INTERVAL DAY(2) TO SECOND(5)')]: - self.assert_compile(type_, expected) - - -class TypesTest(fixtures.TestBase): - __only_on__ = 'oracle' - __dialect__ = oracle.OracleDialect() - __backend__ = True - - @testing.fails_on('+zxjdbc', 'zxjdbc lacks the FIXED_CHAR dbapi type') - def test_fixed_char(self): - m = MetaData(testing.db) - t = Table('t1', m, - Column('id', Integer, primary_key=True), - Column('data', CHAR(30), nullable=False)) - - t.create() - try: - t.insert().execute( - dict(id=1, data="value 1"), - dict(id=2, data="value 2"), - dict(id=3, data="value 3") - ) - - eq_( - t.select().where(t.c.data == 'value 2').execute().fetchall(), - [(2, 'value 2 ')] - ) - - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - assert type(t2.c.data.type) is CHAR - eq_( - t2.select().where(t2.c.data == 'value 2').execute().fetchall(), - [(2, 'value 2 ')] - ) - - finally: - t.drop() - - @testing.requires.returning - @testing.provide_metadata - def test_int_not_float(self): - m = self.metadata - t1 = Table('t1', m, Column('foo', Integer)) - t1.create() - r = t1.insert().values(foo=5).returning(t1.c.foo).execute() - x = r.scalar() - assert x == 5 - assert isinstance(x, int) - - x = t1.select().scalar() - assert x == 5 - assert isinstance(x, int) - - @testing.provide_metadata - def test_rowid(self): - metadata = self.metadata - t = Table('t1', metadata, Column('x', Integer)) - t.create() - t.insert().execute(x=5) - s1 = select([t]) - s2 = select([column('rowid')]).select_from(s1) - rowid = s2.scalar() - - # the ROWID type is not really needed here, - # as cx_oracle just treats it as a string, - # but we want to make sure the ROWID works... - rowid_col = column('rowid', oracle.ROWID) - s3 = select([t.c.x, rowid_col]) \ - .where(rowid_col == cast(rowid, oracle.ROWID)) - eq_(s3.select().execute().fetchall(), [(5, rowid)]) - - @testing.fails_on('+zxjdbc', - 'Not yet known how to pass values of the ' - 'INTERVAL type') - @testing.provide_metadata - def test_interval(self): - metadata = self.metadata - interval_table = Table('intervaltable', metadata, Column('id', - Integer, primary_key=True, - test_needs_autoincrement=True), - Column('day_interval', - oracle.INTERVAL(day_precision=3))) - metadata.create_all() - interval_table.insert().\ - execute(day_interval=datetime.timedelta(days=35, seconds=5743)) - row = interval_table.select().execute().first() - eq_(row['day_interval'], datetime.timedelta(days=35, - seconds=5743)) - - @testing.provide_metadata - def test_numerics(self): - m = self.metadata - t1 = Table('t1', m, - Column('intcol', Integer), - Column('numericcol', Numeric(precision=9, scale=2)), - Column('floatcol1', Float()), - Column('floatcol2', FLOAT()), - Column('doubleprec', oracle.DOUBLE_PRECISION), - Column('numbercol1', oracle.NUMBER(9)), - Column('numbercol2', oracle.NUMBER(9, 3)), - Column('numbercol3', oracle.NUMBER)) - t1.create() - t1.insert().execute( - intcol=1, - numericcol=5.2, - floatcol1=6.5, - floatcol2=8.5, - doubleprec=9.5, - numbercol1=12, - numbercol2=14.85, - numbercol3=15.76 - ) - - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - - for row in ( - t1.select().execute().first(), - t2.select().execute().first() - ): - for i, (val, type_) in enumerate(( - (1, int), - (decimal.Decimal("5.2"), decimal.Decimal), - (6.5, float), - (8.5, float), - (9.5, float), - (12, int), - (decimal.Decimal("14.85"), decimal.Decimal), - (15.76, float), - )): - eq_(row[i], val) - assert isinstance(row[i], type_), '%r is not %r' \ - % (row[i], type_) - - def test_numeric_no_decimal_mode(self): - engine = testing_engine(options=dict(coerce_to_decimal=False)) - value = engine.scalar("SELECT 5.66 FROM DUAL") - assert isinstance(value, float) - - value = testing.db.scalar("SELECT 5.66 FROM DUAL") - assert isinstance(value, decimal.Decimal) - - @testing.only_on("oracle+cx_oracle", "cx_oracle-specific feature") - @testing.fails_if( - testing.requires.python3, - "cx_oracle always returns unicode on py3k") - def test_coerce_to_unicode(self): - engine = testing_engine(options=dict(coerce_to_unicode=True)) - value = engine.scalar("SELECT 'hello' FROM DUAL") - assert isinstance(value, util.text_type) - - value = testing.db.scalar("SELECT 'hello' FROM DUAL") - assert isinstance(value, util.binary_type) - - @testing.provide_metadata - def test_numerics_broken_inspection(self): - """Numeric scenarios where Oracle type info is 'broken', - returning us precision, scale of the form (0, 0) or (0, -127). - We convert to Decimal and let int()/float() processors take over. - - """ - - metadata = self.metadata - - # this test requires cx_oracle 5 - - foo = Table('foo', metadata, - Column('idata', Integer), - Column('ndata', Numeric(20, 2)), - Column('ndata2', Numeric(20, 2)), - Column('nidata', Numeric(5, 0)), - Column('fdata', Float())) - foo.create() - - foo.insert().execute({ - 'idata': 5, - 'ndata': decimal.Decimal("45.6"), - 'ndata2': decimal.Decimal("45.0"), - 'nidata': decimal.Decimal('53'), - 'fdata': 45.68392 - }) - - stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo" - - row = testing.db.execute(stmt).fetchall()[0] - eq_( - [type(x) for x in row], - [int, decimal.Decimal, decimal.Decimal, int, float] - ) - eq_( - row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), - 53, 45.683920000000001) - ) - - # with a nested subquery, - # both Numeric values that don't have decimal places, regardless - # of their originating type, come back as ints with no useful - # typing information beyond "numeric". So native handler - # must convert to int. - # this means our Decimal converters need to run no matter what. - # totally sucks. - - stmt = """ - SELECT - (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata, - (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2)) FROM DUAL) - AS ndata, - (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2)) FROM DUAL) - AS ndata2, - (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0)) FROM DUAL) - AS nidata, - (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata - FROM dual - """ - row = testing.db.execute(stmt).fetchall()[0] - eq_( - [type(x) for x in row], - [int, decimal.Decimal, int, int, decimal.Decimal] - ) - eq_( - row, - (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) - ) - - row = testing.db.execute(text(stmt, - typemap={ - 'idata': Integer(), - 'ndata': Numeric(20, 2), - 'ndata2': Numeric(20, 2), - 'nidata': Numeric(5, 0), - 'fdata': Float()})).fetchall()[0] - eq_( - [type(x) for x in row], - [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] - ) - eq_( - row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), - decimal.Decimal('53'), 45.683920000000001) - ) - - stmt = """ - SELECT - anon_1.idata AS anon_1_idata, - anon_1.ndata AS anon_1_ndata, - anon_1.ndata2 AS anon_1_ndata2, - anon_1.nidata AS anon_1_nidata, - anon_1.fdata AS anon_1_fdata - FROM (SELECT idata, ndata, ndata2, nidata, fdata - FROM ( - SELECT - (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata, - (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2)) - FROM DUAL) AS ndata, - (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2)) - FROM DUAL) AS ndata2, - (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0)) - FROM DUAL) AS nidata, - (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) - AS fdata - FROM dual - ) - WHERE ROWNUM >= 0) anon_1 - """ - row = testing.db.execute(stmt).fetchall()[0] - eq_( - [type(x) for x in row], - [int, decimal.Decimal, int, int, decimal.Decimal] - ) - eq_( - row, - (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) - ) - - row = testing.db.execute(text(stmt, - typemap={ - 'anon_1_idata': Integer(), - 'anon_1_ndata': Numeric(20, 2), - 'anon_1_ndata2': Numeric(20, 2), - 'anon_1_nidata': Numeric(5, 0), - 'anon_1_fdata': Float() - })).fetchall()[0] - eq_( - [type(x) for x in row], - [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] - ) - eq_( - row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), - decimal.Decimal('53'), 45.683920000000001) - ) - - row = testing.db.execute(text( - stmt, - typemap={ - 'anon_1_idata': Integer(), - 'anon_1_ndata': Numeric(20, 2, asdecimal=False), - 'anon_1_ndata2': Numeric(20, 2, asdecimal=False), - 'anon_1_nidata': Numeric(5, 0, asdecimal=False), - 'anon_1_fdata': Float(asdecimal=True) - })).fetchall()[0] - eq_( - [type(x) for x in row], - [int, float, float, float, decimal.Decimal] - ) - eq_( - row, - (5, 45.6, 45, 53, decimal.Decimal('45.68392')) - ) - - @testing.provide_metadata - def test_reflect_dates(self): - metadata = self.metadata - Table( - "date_types", metadata, - Column('d1', sqltypes.DATE), - Column('d2', oracle.DATE), - Column('d3', TIMESTAMP), - Column('d4', TIMESTAMP(timezone=True)), - Column('d5', oracle.INTERVAL(second_precision=5)), - ) - metadata.create_all() - m = MetaData(testing.db) - t1 = Table( - "date_types", m, - autoload=True) - assert isinstance(t1.c.d1.type, oracle.DATE) - assert isinstance(t1.c.d1.type, DateTime) - assert isinstance(t1.c.d2.type, oracle.DATE) - assert isinstance(t1.c.d2.type, DateTime) - assert isinstance(t1.c.d3.type, TIMESTAMP) - assert not t1.c.d3.type.timezone - assert isinstance(t1.c.d4.type, TIMESTAMP) - assert t1.c.d4.type.timezone - assert isinstance(t1.c.d5.type, oracle.INTERVAL) - - def test_reflect_all_types_schema(self): - types_table = Table('all_types', MetaData(testing.db), - Column('owner', String(30), primary_key=True), - Column('type_name', String(30), primary_key=True), - autoload=True, oracle_resolve_synonyms=True) - for row in types_table.select().execute().fetchall(): - [row[k] for k in row.keys()] - - @testing.provide_metadata - def test_raw_roundtrip(self): - metadata = self.metadata - raw_table = Table('raw', metadata, - Column('id', Integer, primary_key=True), - Column('data', oracle.RAW(35))) - metadata.create_all() - testing.db.execute(raw_table.insert(), id=1, data=b("ABCDEF")) - eq_( - testing.db.execute(raw_table.select()).first(), - (1, b("ABCDEF")) - ) - - @testing.provide_metadata - def test_reflect_nvarchar(self): - metadata = self.metadata - Table('tnv', metadata, Column('data', sqltypes.NVARCHAR(255))) - metadata.create_all() - m2 = MetaData(testing.db) - t2 = Table('tnv', m2, autoload=True) - assert isinstance(t2.c.data.type, sqltypes.NVARCHAR) - - if testing.against('oracle+cx_oracle'): - # nvarchar returns unicode natively. cx_oracle - # _OracleNVarChar type should be at play here. - assert isinstance( - t2.c.data.type.dialect_impl(testing.db.dialect), - cx_oracle._OracleNVarChar) - - data = u('m’a réveillé.') - t2.insert().execute(data=data) - res = t2.select().execute().first()['data'] - eq_(res, data) - assert isinstance(res, util.text_type) - - @testing.provide_metadata - def test_char_length(self): - metadata = self.metadata - t1 = Table('t1', metadata, - Column("c1", VARCHAR(50)), - Column("c2", NVARCHAR(250)), - Column("c3", CHAR(200))) - t1.create() - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - eq_(t2.c.c1.type.length, 50) - eq_(t2.c.c2.type.length, 250) - eq_(t2.c.c3.type.length, 200) - - @testing.provide_metadata - def test_long_type(self): - metadata = self.metadata - - t = Table('t', metadata, Column('data', oracle.LONG)) - metadata.create_all(testing.db) - testing.db.execute(t.insert(), data='xyz') - eq_( - testing.db.scalar(select([t.c.data])), - "xyz" - ) - - def test_longstring(self): - metadata = MetaData(testing.db) - testing.db.execute(""" - CREATE TABLE Z_TEST - ( - ID NUMERIC(22) PRIMARY KEY, - ADD_USER VARCHAR2(20) NOT NULL - ) - """) - try: - t = Table("z_test", metadata, autoload=True) - t.insert().execute(id=1.0, add_user='foobar') - assert t.select().execute().fetchall() == [(1, 'foobar')] - finally: - testing.db.execute("DROP TABLE Z_TEST") - - @testing.fails_on('+zxjdbc', 'auto_convert_lobs not applicable') - def test_lobs_without_convert(self): - engine = testing_engine(options=dict(auto_convert_lobs=False)) - metadata = MetaData() - t = Table("z_test", - metadata, - Column('id', Integer, primary_key=True), - Column('data', Text), - Column('bindata', LargeBinary)) - t.create(engine) - try: - engine.execute(t.insert(), - id=1, - data='this is text', - bindata=b('this is binary')) - row = engine.execute(t.select()).first() - eq_(row['data'].read(), 'this is text') - eq_(row['bindata'].read(), b('this is binary')) - finally: - t.drop(engine) - - -class EuroNumericTest(fixtures.TestBase): - """ - test the numeric output_type_handler when using non-US locale for NLS_LANG. - """ - - __only_on__ = 'oracle+cx_oracle' - __backend__ = True - - def setup(self): - self.old_nls_lang = os.environ.get('NLS_LANG', False) - os.environ['NLS_LANG'] = "GERMAN" - self.engine = testing_engine() - - def teardown(self): - if self.old_nls_lang is not False: - os.environ['NLS_LANG'] = self.old_nls_lang - else: - del os.environ['NLS_LANG'] - self.engine.dispose() - - def test_output_type_handler(self): - for stmt, exp, kw in [ - ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}), - ("SELECT 15 FROM DUAL", 15, {}), - ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", - decimal.Decimal("15"), {}), - ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", - decimal.Decimal("0.1"), {}), - ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), - {'num': decimal.Decimal("2.5")}) - ]: - test_exp = self.engine.scalar(stmt, **kw) - eq_( - test_exp, - exp - ) - assert type(test_exp) is type(exp) - - -class SystemTableTablenamesTest(fixtures.TestBase): - __only_on__ = 'oracle' - __backend__ = True - - def setup(self): - testing.db.execute("create table my_table (id integer)") - testing.db.execute( - "create global temporary table my_temp_table (id integer)" - ) - testing.db.execute( - "create table foo_table (id integer) tablespace SYSTEM" - ) - - def teardown(self): - testing.db.execute("drop table my_temp_table") - testing.db.execute("drop table my_table") - testing.db.execute("drop table foo_table") - - def test_table_names_no_system(self): - insp = inspect(testing.db) - eq_( - insp.get_table_names(), ["my_table"] - ) - - def test_temp_table_names_no_system(self): - insp = inspect(testing.db) - eq_( - insp.get_temp_table_names(), ["my_temp_table"] - ) - - def test_table_names_w_system(self): - engine = testing_engine(options={"exclude_tablespaces": ["FOO"]}) - insp = inspect(engine) - eq_( - set(insp.get_table_names()).intersection(["my_table", - "foo_table"]), - set(["my_table", "foo_table"]) - ) - - -class DontReflectIOTTest(fixtures.TestBase): - """test that index overflow tables aren't included in - table_names.""" - - __only_on__ = 'oracle' - __backend__ = True - - def setup(self): - testing.db.execute(""" - CREATE TABLE admin_docindex( - token char(20), - doc_id NUMBER, - token_frequency NUMBER, - token_offsets VARCHAR2(2000), - CONSTRAINT pk_admin_docindex PRIMARY KEY (token, doc_id)) - ORGANIZATION INDEX - TABLESPACE users - PCTTHRESHOLD 20 - OVERFLOW TABLESPACE users - """) - - def teardown(self): - testing.db.execute("drop table admin_docindex") - - def test_reflect_all(self): - m = MetaData(testing.db) - m.reflect() - eq_( - set(t.name for t in m.tables.values()), - set(['admin_docindex']) - ) - - -class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' - __backend__ = True - - @classmethod - def setup_class(cls): - global binary_table, stream, meta - meta = MetaData(testing.db) - binary_table = Table('binary_table', meta, - Column('id', Integer, primary_key=True), - Column('data', LargeBinary)) - meta.create_all() - stream = os.path.join( - os.path.dirname(__file__), "..", - 'binary_data_one.dat') - with open(stream, "rb") as file_: - stream = file_.read(12000) - - for i in range(1, 11): - binary_table.insert().execute(id=i, data=stream) - - @classmethod - def teardown_class(cls): - meta.drop_all() - - def test_fetch(self): - result = binary_table.select().order_by(binary_table.c.id).\ - execute().fetchall() - eq_(result, [(i, stream) for i in range(1, 11)]) - - @testing.fails_on('+zxjdbc', 'FIXME: zxjdbc should support this') - def test_fetch_single_arraysize(self): - eng = testing_engine(options={'arraysize': 1}) - result = eng.execute(binary_table.select(). - order_by(binary_table.c.id)).fetchall() - eq_(result, [(i, stream) for i in range(1, 11)]) - - -class UnsupportedIndexReflectTest(fixtures.TestBase): - __only_on__ = 'oracle' - __backend__ = True - - @testing.emits_warning("No column names") - @testing.provide_metadata - def test_reflect_functional_index(self): - metadata = self.metadata - Table('test_index_reflect', metadata, - Column('data', String(20), primary_key=True)) - metadata.create_all() - - testing.db.execute('CREATE INDEX DATA_IDX ON ' - 'TEST_INDEX_REFLECT (UPPER(DATA))') - m2 = MetaData(testing.db) - Table('test_index_reflect', m2, autoload=True) - - -def all_tables_compression_missing(): - try: - testing.db.execute('SELECT compression FROM all_tables') - if "Enterprise Edition" not in testing.db.scalar( - "select * from v$version"): - return True - return False - except Exception: - return True - - -def all_tables_compress_for_missing(): - try: - testing.db.execute('SELECT compress_for FROM all_tables') - if "Enterprise Edition" not in testing.db.scalar( - "select * from v$version"): - return True - return False - except Exception: - return True - - -class TableReflectionTest(fixtures.TestBase): - __only_on__ = 'oracle' - __backend__ = True - - @testing.provide_metadata - @testing.fails_if(all_tables_compression_missing) - def test_reflect_basic_compression(self): - metadata = self.metadata - - tbl = Table('test_compress', metadata, - Column('data', Integer, primary_key=True), - oracle_compress=True) - metadata.create_all() - - m2 = MetaData(testing.db) - - tbl = Table('test_compress', m2, autoload=True) - # Don't hardcode the exact value, but it must be non-empty - assert tbl.dialect_options['oracle']['compress'] - - @testing.provide_metadata - @testing.fails_if(all_tables_compress_for_missing) - def test_reflect_oltp_compression(self): - metadata = self.metadata - - tbl = Table('test_compress', metadata, - Column('data', Integer, primary_key=True), - oracle_compress="OLTP") - metadata.create_all() - - m2 = MetaData(testing.db) - - tbl = Table('test_compress', m2, autoload=True) - assert tbl.dialect_options['oracle']['compress'] == "OLTP" - - -class RoundTripIndexTest(fixtures.TestBase): - __only_on__ = 'oracle' - __backend__ = True - - @testing.provide_metadata - def test_basic(self): - metadata = self.metadata - - table = Table("sometable", metadata, - Column("id_a", Unicode(255), primary_key=True), - Column("id_b", - Unicode(255), - primary_key=True, - unique=True), - Column("group", Unicode(255), primary_key=True), - Column("col", Unicode(255)), - UniqueConstraint('col', 'group')) - - # "group" is a keyword, so lower case - normalind = Index('tableind', table.c.id_b, table.c.group) - compress1 = Index('compress1', table.c.id_a, table.c.id_b, - oracle_compress=True) - compress2 = Index('compress2', table.c.id_a, table.c.id_b, table.c.col, - oracle_compress=1) - - metadata.create_all() - mirror = MetaData(testing.db) - mirror.reflect() - metadata.drop_all() - mirror.create_all() - - inspect = MetaData(testing.db) - inspect.reflect() - - def obj_definition(obj): - return (obj.__class__, - tuple([c.name for c in obj.columns]), - getattr(obj, 'unique', None)) - - # find what the primary k constraint name should be - primaryconsname = testing.db.scalar( - text( - """SELECT constraint_name - FROM all_constraints - WHERE table_name = :table_name - AND owner = :owner - AND constraint_type = 'P' """), - table_name=table.name.upper(), - owner=testing.db.dialect.default_schema_name.upper()) - - reflectedtable = inspect.tables[table.name] - - # make a dictionary of the reflected objects: - - reflected = dict([(obj_definition(i), i) for i in - reflectedtable.indexes - | reflectedtable.constraints]) - - # assert we got primary key constraint and its name, Error - # if not in dict - - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ - == primaryconsname.upper() - - # Error if not in dict - - eq_( - reflected[(Index, ('id_b', 'group'), False)].name, - normalind.name - ) - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected - - idx = reflected[(Index, ('id_a', 'id_b', ), False)] - assert idx.dialect_options['oracle']['compress'] == 2 - - idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)] - assert idx.dialect_options['oracle']['compress'] == 1 - - eq_(len(reflectedtable.constraints), 1) - eq_(len(reflectedtable.indexes), 5) - - -class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): - - def test_basic(self): - seq = Sequence('my_seq_no_schema') - dialect = oracle.OracleDialect() - assert dialect.identifier_preparer.format_sequence(seq) \ - == 'my_seq_no_schema' - seq = Sequence('my_seq', schema='some_schema') - assert dialect.identifier_preparer.format_sequence(seq) \ - == 'some_schema.my_seq' - seq = Sequence('My_Seq', schema='Some_Schema') - assert dialect.identifier_preparer.format_sequence(seq) \ - == '"Some_Schema"."My_Seq"' - - -class ExecuteTest(fixtures.TestBase): - - __only_on__ = 'oracle' - __backend__ = True - - def test_basic(self): - eq_(testing.db.execute('/*+ this is a comment */ SELECT 1 FROM ' - 'DUAL').fetchall(), [(1, )]) - - def test_sequences_are_integers(self): - seq = Sequence('foo_seq') - seq.create(testing.db) - try: - val = testing.db.execute(seq) - eq_(val, 1) - assert type(val) is int - finally: - seq.drop(testing.db) - - @testing.provide_metadata - def test_limit_offset_for_update(self): - metadata = self.metadata - # oracle can't actually do the ROWNUM thing with FOR UPDATE - # very well. - - t = Table('t1', - metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer)) - metadata.create_all() - - t.insert().execute( - {'id': 1, 'data': 1}, - {'id': 2, 'data': 7}, - {'id': 3, 'data': 12}, - {'id': 4, 'data': 15}, - {'id': 5, 'data': 32}, - ) - - # here, we can't use ORDER BY. - eq_( - t.select(for_update=True).limit(2).execute().fetchall(), - [(1, 1), - (2, 7)] - ) - - # here, its impossible. But we'd prefer it to raise ORA-02014 - # instead of issuing a syntax error. - assert_raises_message( - exc.DatabaseError, - "ORA-02014", - t.select(for_update=True).limit(2).offset(3).execute - ) - - -class UnicodeSchemaTest(fixtures.TestBase): - __only_on__ = 'oracle' - __backend__ = True - - @testing.provide_metadata - def test_quoted_column_non_unicode(self): - metadata = self.metadata - table = Table("atable", metadata, - Column("_underscorecolumn", - Unicode(255), - primary_key=True)) - metadata.create_all() - - table.insert().execute( - {'_underscorecolumn': u('’é')}, - ) - result = testing.db.execute( - table.select().where(table.c._underscorecolumn == u('’é')) - ).scalar() - eq_(result, u('’é')) - - @testing.provide_metadata - def test_quoted_column_unicode(self): - metadata = self.metadata - table = Table("atable", metadata, - Column(u("méil"), Unicode(255), primary_key=True)) - metadata.create_all() - - table.insert().execute( - {u('méil'): u('’é')}, - ) - result = testing.db.execute( - table.select().where(table.c[u('méil')] == u('’é')) - ).scalar() - eq_(result, u('’é')) - - -class DBLinkReflectionTest(fixtures.TestBase): - __requires__ = 'oracle_test_dblink', - __only_on__ = 'oracle' - __backend__ = True - - @classmethod - def setup_class(cls): - from sqlalchemy.testing import config - cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link') - - # note that the synonym here is still not totally functional - # when accessing via a different username as we do with the - # multiprocess test suite, so testing here is minimal - with testing.db.connect() as conn: - conn.execute("create table test_table " - "(id integer primary key, data varchar2(50))") - conn.execute("create synonym test_table_syn " - "for test_table@%s" % cls.dblink) - - @classmethod - def teardown_class(cls): - with testing.db.connect() as conn: - conn.execute("drop synonym test_table_syn") - conn.execute("drop table test_table") - - def test_reflection(self): - """test the resolution of the synonym/dblink. """ - m = MetaData() - - t = Table('test_table_syn', m, autoload=True, - autoload_with=testing.db, oracle_resolve_synonyms=True) - eq_(list(t.c.keys()), ['id', 'data']) - eq_(list(t.primary_key), [t.c.id]) - - -class ServiceNameTest(fixtures.TestBase): - __only_on__ = 'oracle+cx_oracle' - __backend__ = True - - def test_cx_oracle_service_name(self): - url_string = 'oracle+cx_oracle://scott:tiger@host/?service_name=hr' - eng = create_engine(url_string, _initialize=False) - cargs, cparams = eng.dialect.create_connect_args(eng.url) - - assert 'SERVICE_NAME=hr' in cparams['dsn'] - assert 'SID=hr' not in cparams['dsn'] - - def test_cx_oracle_service_name_bad(self): - url_string = 'oracle+cx_oracle://scott:tiger@host/hr1?service_name=hr2' - assert_raises( - exc.InvalidRequestError, - create_engine, url_string, - _initialize=False - ) - diff --git a/test/profiles.txt b/test/profiles.txt index e76f14cdb..e1bcf0aef 100644 --- a/test/profiles.txt +++ b/test/profiles.txt @@ -1,15 +1,15 @@ # /home/classic/dev/sqlalchemy/test/profiles.txt # This file is written out on a per-environment basis. -# For each test in aaa_profiling, the corresponding function and +# For each test in aaa_profiling, the corresponding function and # environment is located within this file. If it doesn't exist, # the test is skipped. -# If a callcount does exist, it is compared to what we received. +# If a callcount does exist, it is compared to what we received. # assertions are raised if the counts do not match. -# -# To add a new callcount test, apply the function_call_count -# decorator and re-run the tests using the --write-profiles +# +# To add a new callcount test, apply the function_call_count +# decorator and re-run the tests using the --write-profiles # option - this file will be rewritten including the new count. -# +# # TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert @@ -367,6 +367,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_mysql_mysqldb_dbapiunicode_nocextensions 52 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_mysql_pymysql_dbapiunicode_cextensions 48 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_mysql_pymysql_dbapiunicode_nocextensions 52 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_oracle_cx_oracle_dbapiunicode_cextensions 47 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 51 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_postgresql_psycopg2_dbapiunicode_cextensions 48 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 52 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_sqlite_pysqlite_dbapiunicode_cextensions 48 @@ -379,6 +381,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 56 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.5_sqlite_pysqlite_dbapiunicode_cextensions 52 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 56 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.6_oracle_cx_oracle_dbapiunicode_cextensions 51 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 55 # TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute @@ -386,6 +390,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_dbapiunicode_nocextensions 91 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_pymysql_dbapiunicode_cextensions 87 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_pymysql_dbapiunicode_nocextensions 91 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_dbapiunicode_cextensions 86 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 90 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_dbapiunicode_cextensions 87 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 91 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_dbapiunicode_cextensions 87 @@ -398,6 +404,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 95 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_sqlite_pysqlite_dbapiunicode_cextensions 91 test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 95 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.6_oracle_cx_oracle_dbapiunicode_cextensions 90 +test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 94 # TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile @@ -405,6 +413,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_mysql_mysqldb_dbapiunicode_nocextensions 15 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_mysql_pymysql_dbapiunicode_cextensions 15 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_mysql_pymysql_dbapiunicode_nocextensions 15 +test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_oracle_cx_oracle_dbapiunicode_cextensions 15 +test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 15 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_postgresql_psycopg2_dbapiunicode_cextensions 15 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 15 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_sqlite_pysqlite_dbapiunicode_cextensions 15 @@ -417,6 +427,22 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 16 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5_sqlite_pysqlite_dbapiunicode_cextensions 16 test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 16 +test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.6_oracle_cx_oracle_dbapiunicode_cextensions 16 +test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 16 + +# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string + +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 2.7_oracle_cx_oracle_dbapiunicode_cextensions 357 +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 15379 +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 3.6_oracle_cx_oracle_dbapiunicode_cextensions 326 +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14330 + +# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode + +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 2.7_oracle_cx_oracle_dbapiunicode_cextensions 20361 +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 35383 +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 3.6_oracle_cx_oracle_dbapiunicode_cextensions 330 +test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14334 # TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_string @@ -424,6 +450,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_db test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_dbapiunicode_nocextensions 55515 test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_pymysql_dbapiunicode_cextensions 122475 test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_pymysql_dbapiunicode_nocextensions 137477 +test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_oracle_cx_oracle_dbapiunicode_cextensions 518 +test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 15520 test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_dbapiunicode_cextensions 498 test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 15500 test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_sqlite_pysqlite_dbapiunicode_cextensions 440 @@ -436,6 +464,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_postgresql_psyco test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 14527 test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_sqlite_pysqlite_dbapiunicode_cextensions 460 test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 14464 +test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.6_oracle_cx_oracle_dbapiunicode_cextensions 522 +test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14526 # TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_unicode @@ -443,6 +473,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_d test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_dbapiunicode_nocextensions 55515 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_pymysql_dbapiunicode_cextensions 122475 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_pymysql_dbapiunicode_nocextensions 137477 +test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_oracle_cx_oracle_dbapiunicode_cextensions 20518 +test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 55520 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_dbapiunicode_cextensions 498 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 15500 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_sqlite_pysqlite_dbapiunicode_cextensions 440 @@ -455,6 +487,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_postgresql_psyc test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 14527 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_sqlite_pysqlite_dbapiunicode_cextensions 460 test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 14464 +test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.6_oracle_cx_oracle_dbapiunicode_cextensions 522 +test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14526 # TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation diff --git a/test/requirements.py b/test/requirements.py index 9b01a22dd..0829607cd 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -750,10 +750,14 @@ class DefaultRequirements(SuiteRequirements): """target backend supports Decimal() objects using E notation to represent very large values.""" - return skip_if( - [("sybase+pyodbc", None, None, - "Don't know how do get these values through FreeTDS + Sybase"), - ("firebird", None, None, "Precision must be from 1 to 18")]) + return fails_if( + [ + ("sybase+pyodbc", None, None, + "Don't know how do get these values through FreeTDS + Sybase" + ), + ("firebird", None, None, "Precision must be from 1 to 18") + ] + ) @property def precision_numerics_many_significant_digits(self): @@ -761,15 +765,17 @@ class DefaultRequirements(SuiteRequirements): such as 319438950232418390.273596, 87673.594069654243 """ - def cx_oracle_6_config(config): - return config.db.driver == "cx_oracle" and \ - config.db.dialect.cx_oracle_ver >= (6, ) + + def broken_cx_oracle(config): + return against(config, 'oracle+cx_oracle') and \ + config.db.dialect.cx_oracle_ver <= (6, 0, 2) and \ + config.db.dialect.cx_oracle_ver > (6, ) return fails_if( - [cx_oracle_6_config, - ('sqlite', None, None, 'TODO'), - ("firebird", None, None, "Precision must be from 1 to 18"), - ("sybase+pysybase", None, None, "TODO"), + [ + ('sqlite', None, None, 'TODO'), + ("firebird", None, None, "Precision must be from 1 to 18"), + ("sybase+pysybase", None, None, "TODO"), ] ) @@ -781,9 +787,7 @@ class DefaultRequirements(SuiteRequirements): return fails_if( [ - ('oracle', None, None, - "this may be a bug due to the difficulty in handling " - "oracle precision numerics"), + ("oracle", None, None, "driver doesn't do this automatically"), ("firebird", None, None, "database and/or driver truncates decimal places.") ] @@ -999,3 +1003,19 @@ class DefaultRequirements(SuiteRequirements): lambda config: against(config, 'postgresql') and config.db.scalar("show server_encoding").lower() == "utf8" ) + + @property + def broken_cx_oracle6_numerics(config): + return exclusions.LambdaPredicate( + lambda config: against(config, 'oracle+cx_oracle') and + config.db.dialect.cx_oracle_ver <= (6, 0, 2) and + config.db.dialect.cx_oracle_ver > (6, ), + "cx_Oracle github issue #77" + ) + + @property + def oracle5x(self): + return only_if( + lambda config: against(config, "oracle+cx_oracle") and + config.db.dialect.cx_oracle_ver < (6, ) + )
\ No newline at end of file diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py index 947fe0dc5..f8d183b71 100644 --- a/test/sql/test_returning.py +++ b/test/sql/test_returning.py @@ -360,7 +360,6 @@ class ReturnDefaultsTest(fixtures.TablesTest): [None] ) - @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug") def test_insert_non_default_plus_default(self): t1 = self.tables.t1 result = testing.db.execute( @@ -372,7 +371,6 @@ class ReturnDefaultsTest(fixtures.TablesTest): {"id": 1, "data": None, "insdef": 0} ) - @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug") def test_update_non_default_plus_default(self): t1 = self.tables.t1 testing.db.execute( @@ -387,7 +385,6 @@ class ReturnDefaultsTest(fixtures.TablesTest): {"data": None, 'upddef': 1} ) - @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug") def test_insert_all(self): t1 = self.tables.t1 result = testing.db.execute( @@ -398,7 +395,6 @@ class ReturnDefaultsTest(fixtures.TablesTest): {"id": 1, "data": None, "insdef": 0} ) - @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug") def test_update_all(self): t1 = self.tables.t1 testing.db.execute( diff --git a/test/sql/test_rowcount.py b/test/sql/test_rowcount.py index 009911538..ea29bcf7e 100644 --- a/test/sql/test_rowcount.py +++ b/test/sql/test_rowcount.py @@ -66,7 +66,8 @@ class FoundRowsTest(fixtures.TestBase, AssertsExecutionResults): assert r.rowcount == 3 @testing.skip_if( - "oracle", "temporary skip until cx_oracle refactor is merged") + testing.requires.oracle5x, + "unknown DBAPI error fixed in later version") @testing.requires.sane_rowcount_w_returning def test_update_rowcount_return_defaults(self): department = employees_table.c.department diff --git a/test/sql/test_types.py b/test/sql/test_types.py index b6cc04322..bbd8a221b 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -1579,6 +1579,7 @@ binary_table = MyPickleType = metadata = None class BinaryTest(fixtures.TestBase, AssertsExecutionResults): + __backend__ = True @classmethod def setup_class(cls): |