author     Mike Bayer <mike_mp@zzzcomputing.com>  2017-09-01 00:11:10 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>  2017-09-11 14:17:10 -0400
commit     31f80b9eaeb3c3435b7f6679b41e434478b1d11c (patch)
tree       9802d4470d78768ba2a8812b47fae0f91e689d5c
parent     4c97ea116c3686cb03f566f16b0a0e9a9fd33968 (diff)
download   sqlalchemy-oracle_numeric.tar.gz

Refactor for cx_Oracle version 6 (oracle_numeric)

Drops support for cx_Oracle prior to version 5.x, reworks numeric and
binary support.

Fixes: #4064
Change-Id: Ib9ae9aba430c15cd2a6eeb4e5e3fd8e97b5fe480
-rw-r--r--  doc/build/changelog/migration_12.rst                     66
-rw-r--r--  doc/build/changelog/unreleased_12/oracle_refactor.rst    61
-rw-r--r--  lib/sqlalchemy/dialects/oracle/__init__.py                3
-rw-r--r--  lib/sqlalchemy/dialects/oracle/base.py                   33
-rw-r--r--  lib/sqlalchemy/dialects/oracle/cx_oracle.py             900
-rw-r--r--  lib/sqlalchemy/engine/default.py                         10
-rw-r--r--  lib/sqlalchemy/engine/interfaces.py                      10
-rw-r--r--  lib/sqlalchemy/engine/result.py                          16
-rw-r--r--  lib/sqlalchemy/sql/type_api.py                            9
-rw-r--r--  lib/sqlalchemy/testing/assertions.py                      1
-rw-r--r--  lib/sqlalchemy/testing/provision.py                       2
-rw-r--r--  lib/sqlalchemy/testing/suite/test_types.py                3
-rw-r--r--  setup.cfg                                                 2
-rw-r--r--  test/aaa_profiling/test_resultset.py                     14
-rw-r--r--  test/dialect/oracle/__init__.py                           0
-rw-r--r--  test/dialect/oracle/test_compiler.py                    792
-rw-r--r--  test/dialect/oracle/test_dialect.py                     316
-rw-r--r--  test/dialect/oracle/test_reflection.py                  536
-rw-r--r--  test/dialect/oracle/test_types.py                       787
-rw-r--r--  test/dialect/test_oracle.py                            2316
-rw-r--r--  test/profiles.txt                                        46
-rw-r--r--  test/requirements.py                                     48
-rw-r--r--  test/sql/test_returning.py                                4
-rw-r--r--  test/sql/test_rowcount.py                                 3
-rw-r--r--  test/sql/test_types.py                                    1
25 files changed, 3073 insertions, 2906 deletions
diff --git a/doc/build/changelog/migration_12.rst b/doc/build/changelog/migration_12.rst
index 87c08dc21..a670bb394 100644
--- a/doc/build/changelog/migration_12.rst
+++ b/doc/build/changelog/migration_12.rst
@@ -1358,6 +1358,72 @@ The above will render::
Dialect Improvements and Changes - Oracle
=========================================
+.. _change_cxoracle_12:
+
+Major Refactor to cx_Oracle Dialect, Typing System
+--------------------------------------------------
+
+With the introduction of the 6.x series of the cx_Oracle DBAPI, SQLAlchemy's
+cx_Oracle dialect has been reworked and simplified to take advantage of recent
+improvements in cx_Oracle as well as dropping support for patterns that were
+more relevant before the 5.x series of cx_Oracle.
+
+* The minimum cx_Oracle version supported is now 5.1.3; 5.3 or the most recent
+ 6.x series are recommended.
+
+* The handling of datatypes has been refactored. The ``cursor.setinputsizes()``
+ method is no longer used for any datatype except LOB types, per advice from
+ cx_Oracle's developers. As a result, the parameters ``auto_setinputsizes``
+ and ``exclude_setinputsizes`` are deprecated and no longer have any effect.
+
+* The ``coerce_to_decimal`` flag, when set to False to indicate that coercion
+  of numeric types with precision and scale to ``Decimal`` should not occur,
+  only impacts untyped (e.g. plain string with no :class:`.TypeEngine` objects)
+  statements.  A Core expression that includes a :class:`.Numeric` type or
+  subtype will now follow the decimal coercion rules of that type; see the
+  example following this list.
+
+* The "two phase" transaction support in the dialect, already dropped for the
+ 6.x series of cx_Oracle, has now been removed entirely as this feature has
+ never worked correctly and is unlikely to have been in production use.
+ As a result, the ``allow_twophase`` dialect flag is deprecated and also has no
+ effect.
+
+* Fixed a bug involving the column keys present with RETURNING. Given
+ a statement as follows::
+
+ result = conn.execute(table.insert().values(x=5).returning(table.c.a, table.c.b))
+
+ Previously, the keys in each row of the result would be ``ret_0`` and ``ret_1``,
+ which are identifiers internal to the cx_Oracle RETURNING implementation.
+ The keys will now be ``a`` and ``b`` as is expected for other dialects.
+
+* cx_Oracle's LOB datatype represents return values as a ``cx_Oracle.LOB``
+  object, which is a cursor-associated proxy that returns the ultimate data
+  value via a ``.read()`` method.  Historically, if more rows were read before
+  these LOB objects were consumed (specifically, more rows than the value of
+  ``cursor.arraysize``, which causes a new batch of rows to be read), these
+  LOB objects would raise the error "LOB variable no longer valid after
+  subsequent fetch".  SQLAlchemy worked around this both by automatically
+  calling ``.read()`` upon these LOBs within its typing system, and by using a
+  special ``BufferedColumnResultProxy`` which would ensure this data was
+  buffered in case a call like ``cursor.fetchmany()`` or ``cursor.fetchall()``
+  was used.
+
+  The dialect now makes use of a cx_Oracle outputtypehandler to handle these
+  ``.read()`` calls, so that they are always called up front regardless of how
+  many rows are being fetched; as a result this error can no longer occur.
+  Consequently, the ``BufferedColumnResultProxy``, as well as some other
+  internals of the Core result system that were specific to this use case,
+  have been removed.  The type objects are also simplified as they no longer
+  need to process a binary column result.
+
+  Additionally, cx_Oracle 6.x has removed the conditions under which this
+  error occurs in any case, so it is no longer possible there.  With
+  SQLAlchemy, the error can still occur only when the seldom (if ever) used
+  ``auto_convert_lobs=False`` option is in use in conjunction with the
+  previous 5.x series of cx_Oracle, and more rows are read before the LOB
+  objects can be consumed.  Upgrading to cx_Oracle 6.x resolves that issue.
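+
+To illustrate the ``coerce_to_decimal`` note above, the following is a
+minimal sketch; the connection URL and values are illustrative only::
+
+    from sqlalchemy import create_engine, select, cast, literal_column, Numeric
+
+    engine = create_engine(
+        "oracle+cx_oracle://scott:tiger@dsn", coerce_to_decimal=False)
+
+    with engine.connect() as conn:
+        # untyped plain-string SQL: numeric values with precision and scale
+        # are no longer coerced to Decimal
+        conn.scalar("SELECT 5.66 FROM DUAL")
+
+        # a typed Core expression still follows the asdecimal rules of the
+        # Numeric type, returning a Decimal
+        conn.scalar(select([cast(literal_column("5.66"), Numeric(10, 2))]))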
+
.. _change_4003:
Oracle Unique, Check constraints now reflected
diff --git a/doc/build/changelog/unreleased_12/oracle_refactor.rst b/doc/build/changelog/unreleased_12/oracle_refactor.rst
new file mode 100644
index 000000000..2a30645fe
--- /dev/null
+++ b/doc/build/changelog/unreleased_12/oracle_refactor.rst
@@ -0,0 +1,61 @@
+.. change::
+ :tags: bug, oracle
+ :tickets: 4064
+
+ Partial support for persisting and retrieving the Oracle value
+ "infinity" is implemented with cx_Oracle, using Python float values
+ only, e.g. ``float("inf")``. Decimal support is not yet fulfilled by
+ the cx_Oracle DBAPI driver.
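+
+    A minimal sketch; the table, column and connection here are illustrative
+    only, using the new :class:`.oracle.BINARY_DOUBLE` type::
+
+        from sqlalchemy import Table, Column, MetaData, select
+        from sqlalchemy.dialects import oracle
+
+        t = Table("t", MetaData(), Column("value", oracle.BINARY_DOUBLE))
+
+        conn.execute(t.insert(), {"value": float("inf")})
+        conn.scalar(select([t.c.value]))  # float("inf")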
+
+.. change::
+ :tags: bug, oracle
+
+    The cx_Oracle dialect has been reworked and modernized to take advantage of
+    new patterns that weren't present in the old 4.x series of cx_Oracle.  The
+    minimum supported cx_Oracle version is now the 5.x series, and cx_Oracle
+    6.x is now fully tested.  The most significant change involves type
+    conversions, primarily regarding the numeric / floating point and LOB
+    datatypes, making more effective use of cx_Oracle type handling hooks to
+    simplify how bind parameter and result data is processed.
+
+ .. seealso::
+
+ :ref:`change_cxoracle_12`
+
+.. change::
+ :tags: bug, oracle
+ :tickets: 3997
+
+    Two-phase transaction support for cx_Oracle has been completely removed
+    for all versions of cx_Oracle, whereas in 1.2.0b1 this change only took
+    effect for the 6.x series of cx_Oracle.  This feature never worked
+    correctly in any version of cx_Oracle, and in cx_Oracle 6.x the API upon
+    which SQLAlchemy relied was removed.
+
+ .. seealso::
+
+ :ref:`change_cxoracle_12`
+
+.. change::
+ :tags: bug, oracle
+
+    The column keys present in a result set when using :meth:`.Insert.returning`
+    with the cx_Oracle backend now use the correct column / label names,
+    as is the case for all other dialects.  Previously, these came out as
+    ``ret_nnn``.
+
+ .. seealso::
+
+ :ref:`change_cxoracle_12`
+
+.. change::
+ :tags: bug, oracle
+
+ Several parameters to the cx_Oracle dialect are now deprecated and will
+ have no effect: ``auto_setinputsizes``, ``exclude_setinputsizes``,
+ ``allow_twophase``.
+
+ .. seealso::
+
+ :ref:`change_cxoracle_12`
+
diff --git a/lib/sqlalchemy/dialects/oracle/__init__.py b/lib/sqlalchemy/dialects/oracle/__init__.py
index 210fe501f..b081b134a 100644
--- a/lib/sqlalchemy/dialects/oracle/__init__.py
+++ b/lib/sqlalchemy/dialects/oracle/__init__.py
@@ -19,6 +19,7 @@ from sqlalchemy.dialects.oracle.base import \
__all__ = (
'VARCHAR', 'NVARCHAR', 'CHAR', 'DATE', 'NUMBER',
'BLOB', 'BFILE', 'CLOB', 'NCLOB', 'TIMESTAMP', 'RAW',
- 'FLOAT', 'DOUBLE_PRECISION', 'LONG', 'dialect', 'INTERVAL',
+    'FLOAT', 'DOUBLE_PRECISION', 'BINARY_DOUBLE', 'BINARY_FLOAT',
+ 'LONG', 'dialect', 'INTERVAL',
'VARCHAR2', 'NVARCHAR2', 'ROWID'
)
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index 9478f6531..b2eb44b5b 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -422,6 +422,28 @@ class DOUBLE_PRECISION(sqltypes.Numeric):
precision=precision, scale=scale, asdecimal=asdecimal)
+class BINARY_DOUBLE(sqltypes.Numeric):
+ __visit_name__ = 'BINARY_DOUBLE'
+
+ def __init__(self, precision=None, scale=None, asdecimal=None):
+ if asdecimal is None:
+ asdecimal = False
+
+ super(BINARY_DOUBLE, self).__init__(
+ precision=precision, scale=scale, asdecimal=asdecimal)
+
+
+class BINARY_FLOAT(sqltypes.Numeric):
+ __visit_name__ = 'BINARY_FLOAT'
+
+ def __init__(self, precision=None, scale=None, asdecimal=None):
+ if asdecimal is None:
+ asdecimal = False
+
+ super(BINARY_FLOAT, self).__init__(
+ precision=precision, scale=scale, asdecimal=asdecimal)
+
+
class BFILE(sqltypes.LargeBinary):
__visit_name__ = 'BFILE'
@@ -557,6 +579,12 @@ class OracleTypeCompiler(compiler.GenericTypeCompiler):
def visit_DOUBLE_PRECISION(self, type_, **kw):
return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
+ def visit_BINARY_DOUBLE(self, type_, **kw):
+ return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
+
+ def visit_BINARY_FLOAT(self, type_, **kw):
+ return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
+
def visit_NUMBER(self, type_, **kw):
return self._generate_numeric(type_, "NUMBER", **kw)
@@ -746,12 +774,14 @@ class OracleCompiler(compiler.SQLCompiler):
def returning_clause(self, stmt, returning_cols):
columns = []
binds = []
+
for i, column in enumerate(
expression._select_iterables(returning_cols)):
if column.type._has_column_expression:
col_expr = column.type.column_expression(column)
else:
col_expr = column
+
outparam = sql.outparam("ret_%d" % i, type_=column.type)
self.binds[outparam.key] = outparam
binds.append(
@@ -760,7 +790,8 @@ class OracleCompiler(compiler.SQLCompiler):
self.process(col_expr, within_columns_clause=False))
self._add_to_result_map(
- outparam.key, outparam.key,
+ getattr(col_expr, 'name', col_expr.anon_label),
+ getattr(col_expr, 'name', col_expr.anon_label),
(column, getattr(column, 'name', None),
getattr(column, 'key', None)),
column.type
diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
index 86562dfd0..56a0425c8 100644
--- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py
+++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
@@ -25,11 +25,6 @@ directly as a TNS name.
Additional arguments which may be specified either as query string arguments
on the URL, or as keyword arguments to :func:`.create_engine()` are:
-* ``allow_twophase`` - enable two-phase transactions. This argument is
- **deprecated** as of the cx_Oracle 5.x series, two phase transactions are
- not supported under cx_Oracle and as of cx_Oracle 6.0b1 this feature is
- removed entirely.
-
* ``arraysize`` - set the cx_oracle.arraysize value on cursors, defaulted
to 50. This setting is significant with cx_Oracle as the contents of LOB
objects are only readable within a "live" row (e.g. within a batch of
@@ -37,25 +32,10 @@ on the URL, or as keyword arguments to :func:`.create_engine()` are:
* ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`.
-* ``auto_setinputsizes`` - the cx_oracle.setinputsizes() call is issued for
- all bind parameters. This is required for LOB datatypes but can be
- disabled to reduce overhead. Defaults to ``True``. Specific types
- can be excluded from this process using the ``exclude_setinputsizes``
- parameter.
-
* ``coerce_to_unicode`` - see :ref:`cx_oracle_unicode` for detail.
* ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail.
-* ``exclude_setinputsizes`` - a tuple or list of string DBAPI type names to
- be excluded from the "auto setinputsizes" feature. The type names here
- must match DBAPI types that are found in the "cx_Oracle" module namespace,
- such as cx_Oracle.UNICODE, cx_Oracle.NCLOB, etc. Defaults to
- ``(STRING, UNICODE)``.
-
- .. versionadded:: 0.8 specific DBAPI types can be excluded from the
- auto_setinputsizes feature via the exclude_setinputsizes attribute.
-
* ``mode`` - This is given the string value of SYSDBA or SYSOPER, or
alternatively an integer value. This value is only available as a URL query
string argument.
@@ -86,89 +66,43 @@ unicode those column values that are of type ``NVARCHAR`` or ``NCLOB``. For
column values that are of type ``VARCHAR`` or other non-unicode string types,
it will return values as Python strings (e.g. bytestrings).
-The cx_Oracle SQLAlchemy dialect presents two different options for the use
-case of returning ``VARCHAR`` column values as Python unicode objects under
+The cx_Oracle SQLAlchemy dialect presents several different options for the use
+case of receiving ``VARCHAR`` column values as Python unicode objects under
Python 2:
-* the cx_Oracle DBAPI has the ability to coerce all string results to Python
- unicode objects unconditionally using output type handlers. This has
- the advantage that the unicode conversion is global to all statements
- at the cx_Oracle driver level, meaning it works with raw textual SQL
- statements that have no typing information associated. However, this system
- has been observed to incur signfiicant performance overhead, not only
- because it takes effect for all string values unconditionally, but also
- because cx_Oracle under Python 2 seems to use a pure-Python function call in
- order to do the decode operation, which under cPython can orders of
- magnitude slower than doing it using C functions alone.
-
-* SQLAlchemy has unicode-decoding services built in, and when using
- SQLAlchemy's C extensions, these functions do not use any Python function
- calls and are very fast. The disadvantage to this approach is that the
- unicode conversion only takes effect for statements where the
- :class:`.Unicode` type or :class:`.String` type with
- ``convert_unicode=True`` is explicitly associated with the result column.
- This is the case for any ORM or Core query or SQL expression as well as for
- a :func:`.text` construct that specifies output column types, so in the vast
- majority of cases this is not an issue. However, when sending a completely
- raw string to :meth:`.Connection.execute`, this typing information isn't
- present, unless the string is handled within a :func:`.text` construct that
- adds typing information.
-
-As of version 0.9.2 of SQLAlchemy, the default approach is to use SQLAlchemy's
-typing system. This keeps cx_Oracle's expensive Python 2 approach
-disabled unless the user explicitly wants it. Under Python 3, SQLAlchemy
-detects that cx_Oracle is returning unicode objects natively and cx_Oracle's
-system is used.
-
-To re-enable cx_Oracle's output type handler under Python 2, the
-``coerce_to_unicode=True`` flag (new in 0.9.4) can be passed to
-:func:`.create_engine`::
-
- engine = create_engine("oracle+cx_oracle://dsn", coerce_to_unicode=True)
+* When using Core expression objects as well as the ORM, SQLAlchemy's
+  unicode-decoding services are available, which are established by
+  using either the :class:`.Unicode` datatype or by using the
+  :class:`.String` datatype with :paramref:`.String.convert_unicode` set
+  to True (see the sketch following this list).
-Alternatively, to run a pure string SQL statement and get ``VARCHAR`` results
-as Python unicode under Python 2 without using cx_Oracle's native handlers,
-the :func:`.text` feature can be used::
+* When using raw SQL strings, typing behavior can be added for unicode
+ conversion using the :func:`.text` construct::
from sqlalchemy import text, Unicode
result = conn.execute(
text("select username from user").columns(username=Unicode))
-.. versionchanged:: 0.9.2 cx_Oracle's outputtypehandlers are no longer used
- for unicode results of non-unicode datatypes in Python 2, after they were
- identified as a major performance bottleneck. SQLAlchemy's own unicode
- facilities are used instead.
+* Otherwise, when using raw SQL strings sent directly to an ``.execute()``
+  method without any Core typing behavior added, the
+  ``coerce_to_unicode=True`` flag can be passed to :func:`.create_engine`,
+  which will add an unconditional unicode processor to cx_Oracle for all
+  string values::
+
+ engine = create_engine("oracle+cx_oracle://dsn", coerce_to_unicode=True)
+
+ The above approach will add significant latency to result-set fetches
+ of plain string values.
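+
+As a brief sketch of the first option above, assuming an illustrative table
+definition and an existing connection ``conn``, a result column associated
+with the :class:`.Unicode` type is decoded by SQLAlchemy itself::
+
+    from sqlalchemy import Table, Column, MetaData, Unicode, select
+
+    user = Table("user", MetaData(), Column("username", Unicode(30)))
+
+    # values are returned as Python unicode objects under Python 2
+    result = conn.execute(select([user.c.username]))
+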
-.. versionadded:: 0.9.4 Added the ``coerce_to_unicode`` flag, to re-enable
- cx_Oracle's outputtypehandler and revert to pre-0.9.2 behavior.
.. _cx_oracle_returning:
RETURNING Support
-----------------
-The cx_oracle DBAPI supports a limited subset of Oracle's already limited
-RETURNING support. Typically, results can only be guaranteed for at most one
-column being returned; this is the typical case when SQLAlchemy uses RETURNING
-to get just the value of a primary-key-associated sequence value.
-Additional column expressions will cause problems in a non-determinative way,
-due to cx_oracle's lack of support for the OCI_DATA_AT_EXEC API which is
-required for more complex RETURNING scenarios.
-
-For this reason, stability may be enhanced by disabling RETURNING support
-completely; SQLAlchemy otherwise will use RETURNING to fetch newly
-sequence-generated primary keys. As illustrated in :ref:`oracle_returning`::
-
- engine = create_engine("oracle://scott:tiger@dsn",
- implicit_returning=False)
-
-.. seealso::
-
- http://docs.oracle.com/cd/B10501_01/appdev.920/a96584/oci05bnd.htm#420693
- - OCI documentation for RETURNING
-
- http://sourceforge.net/mailarchive/message.php?msg_id=31338136
- - cx_oracle developer commentary
+The cx_Oracle dialect implements RETURNING using OUT parameters.
+The dialect supports RETURNING fully; however, cx_Oracle 6 is recommended
+for the most complete support.
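+
+For example, a minimal sketch against a hypothetical table ``my_table``,
+assuming an existing connection ``conn``::
+
+    from sqlalchemy import Table, Column, Integer, String, MetaData
+
+    my_table = Table(
+        "my_table", MetaData(),
+        Column("id", Integer, primary_key=True),
+        Column("data", String(50)),
+    )
+
+    result = conn.execute(
+        my_table.insert().values(id=1, data="some data").returning(
+            my_table.c.id, my_table.c.data)
+    )
+    row = result.first()
+    # the returned row is keyed by the actual column names
+    row["id"], row["data"]
+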
.. _cx_oracle_lob:
@@ -177,16 +111,26 @@ LOB Objects
cx_oracle returns oracle LOBs using the cx_oracle.LOB object. SQLAlchemy
converts these to strings so that the interface of the Binary type is
-consistent with that of other backends, and so that the linkage to a live
-cursor is not needed in scenarios like result.fetchmany() and
-result.fetchall(). This means that by default, LOB objects are fully fetched
-unconditionally by SQLAlchemy, and the linkage to a live cursor is broken.
+consistent with that of other backends; this conversion takes place within a
+cx_Oracle outputtypehandler.
+
+cx_Oracle prior to version 6 would require that LOB objects be read before
+a new batch of rows would be read, as determined by the ``cursor.arraysize``.
+As of the 6 series, this limitation has been lifted. Nevertheless, because
+SQLAlchemy pre-reads these LOBs up front, this issue is avoided in any case.
-To disable this processing, pass ``auto_convert_lobs=False`` to
-:func:`.create_engine()`.
+To disable the auto "read()" feature of the dialect, the flag
+``auto_convert_lobs=False`` may be passed to :func:`.create_engine`.  Under
+the cx_Oracle 5 series, turning this flag off means there is a chance of
+reading from a stale LOB object if the LOB is not consumed as each row is
+fetched.  With cx_Oracle 6, this issue is resolved.
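+
+A minimal sketch of disabling the conversion, assuming an illustrative table
+``my_table`` with a LOB-typed column ``document``; the returned value is then
+the raw ``cx_Oracle.LOB`` proxy, which must be read explicitly::
+
+    from sqlalchemy import create_engine, select
+
+    engine = create_engine(
+        "oracle+cx_oracle://scott:tiger@dsn", auto_convert_lobs=False)
+
+    with engine.connect() as conn:
+        value = conn.scalar(select([my_table.c.document]))
+        data = value.read()  # value is a raw cx_Oracle.LOB proxy
+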
-Two Phase Transaction Support
------------------------------
+.. versionchanged:: 1.2 the LOB handling system has been greatly simplified
+ internally to make use of outputtypehandlers, and no longer makes use
+ of alternate "buffered" result set objects.
+
+Two Phase Transactions Not Supported
+-------------------------------------
Two phase transactions are **not supported** under cx_Oracle due to poor
driver support. As of cx_Oracle 6.0b1, the interface for
@@ -194,72 +138,45 @@ two phase transactions has been changed to be more of a direct pass-through
to the underlying OCI layer with less automation. The additional logic
to support this system is not implemented in SQLAlchemy.
-
.. _cx_oracle_numeric:
Precision Numerics
------------------
-The SQLAlchemy dialect goes through a lot of steps to ensure
-that decimal numbers are sent and received with full accuracy.
-An "outputtypehandler" callable is associated with each
-cx_oracle connection object which detects numeric types and
-receives them as string values, instead of receiving a Python
-``float`` directly, which is then passed to the Python
-``Decimal`` constructor. The :class:`.Numeric` and
-:class:`.Float` types under the cx_oracle dialect are aware of
-this behavior, and will coerce the ``Decimal`` to ``float`` if
-the ``asdecimal`` flag is ``False`` (default on :class:`.Float`,
-optional on :class:`.Numeric`).
-
-Because the handler coerces to ``Decimal`` in all cases first,
-the feature can detract significantly from performance.
-If precision numerics aren't required, the decimal handling
-can be disabled by passing the flag ``coerce_to_decimal=False``
-to :func:`.create_engine`::
+SQLAlchemy's numeric types can handle receiving and returning values as Python
+``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a
+subclass such as :class:`.Float`, :class:`.oracle.DOUBLE_PRECISION` etc. is in
+use, the :paramref:`.Numeric.asdecimal` flag determines if values should be
+coerced to ``Decimal`` upon return, or returned as float objects. To make
+matters more complicated under Oracle, Oracle's ``NUMBER`` type can also
+represent integer values if the "scale" is zero, so the Oracle-specific
+:class:`.oracle.NUMBER` type takes this into account as well.
+
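+As a brief sketch of the ``asdecimal`` behavior above, assuming an
+illustrative table and an existing connection ``conn``::
+
+    from sqlalchemy import Table, Column, MetaData, Numeric, Float, select
+
+    t = Table(
+        "t", MetaData(),
+        Column("num", Numeric(10, 2)),  # asdecimal defaults to True
+        Column("flt", Float),           # asdecimal defaults to False
+    )
+
+    row = conn.execute(select([t.c.num, t.c.flt])).first()
+    # row[0] is a decimal.Decimal; row[1] is a Python float
+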
+The cx_Oracle dialect makes extensive use of connection- and cursor-level
+"outputtypehandler" callables in order to coerce numeric values as requested.
+These callables are tailored to the particular flavor of :class:`.Numeric` in
+use, as well as to the case where no SQLAlchemy typing objects are present at
+all.  There are observed scenarios where Oracle may send incomplete or
+ambiguous information about the numeric types being returned, such as a query
+where the numeric types are buried under multiple levels of subquery.  The
+type handlers do their best to make the right decision in all cases,
+deferring to the underlying cx_Oracle DBAPI for those cases where the driver
+can make the best decision.
+
+When no typing objects are present, as when executing plain SQL strings, a
+default "outputtypehandler" is present which will generally return numeric
+values which specify precision and scale as Python ``Decimal`` objects. To
+disable this coercion to decimal for performance reasons, pass the flag
+``coerce_to_decimal=False`` to :func:`.create_engine`::
engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False)
-.. versionadded:: 0.7.6
- Add the ``coerce_to_decimal`` flag.
-
-Another alternative to performance is to use the
-`cdecimal <http://pypi.python.org/pypi/cdecimal/>`_ library;
-see :class:`.Numeric` for additional notes.
-
-The handler attempts to use the "precision" and "scale"
-attributes of the result set column to best determine if
-subsequent incoming values should be received as ``Decimal`` as
-opposed to int (in which case no processing is added). There are
-several scenarios where OCI_ does not provide unambiguous data
-as to the numeric type, including some situations where
-individual rows may return a combination of floating point and
-integer values. Certain values for "precision" and "scale" have
-been observed to determine this scenario. When it occurs, the
-outputtypehandler receives as string and then passes off to a
-processing function which detects, for each returned value, if a
-decimal point is present, and if so converts to ``Decimal``,
-otherwise to int. The intention is that simple int-based
-statements like "SELECT my_seq.nextval() FROM DUAL" continue to
-return ints and not ``Decimal`` objects, and that any kind of
-floating point value is received as a string so that there is no
-floating point loss of precision.
-
-The "decimal point is present" logic itself is also sensitive to
-locale. Under OCI_, this is controlled by the NLS_LANG
-environment variable. Upon first connection, the dialect runs a
-test to determine the current "decimal" character, which can be
-a comma "," for European locales. From that point forward the
-outputtypehandler uses that character to represent a decimal
-point. Note that cx_oracle 5.0.3 or greater is required
-when dealing with numerics with locale settings that don't use
-a period "." as the decimal character.
-
-.. versionchanged:: 0.6.6
- The outputtypehandler supports the case where the locale uses a
- comma "," character to represent a decimal point.
-
-.. _OCI: http://www.oracle.com/technetwork/database/features/oci/index.html
+The ``coerce_to_decimal`` flag only impacts the results of plain string
+SQL statements that are not otherwise associated with a :class:`.Numeric`
+SQLAlchemy type (or a subclass of such).
+
+.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been
+ reworked to take advantage of newer cx_Oracle features as well
+ as better integration of outputtypehandlers.
"""
@@ -269,7 +186,6 @@ from .base import OracleCompiler, OracleDialect, OracleExecutionContext
from . import base as oracle
from ...engine import result as _result
from sqlalchemy import types as sqltypes, util, exc, processors
-from sqlalchemy import util
import random
import collections
import decimal
@@ -277,47 +193,99 @@ import re
import time
+class _OracleInteger(sqltypes.Integer):
+ def _cx_oracle_var(self, dialect, cursor):
+ cx_Oracle = dialect.dbapi
+ return cursor.var(
+ cx_Oracle.STRING,
+ 255,
+ arraysize=cursor.arraysize,
+ outconverter=int
+ )
+
+ def _cx_oracle_outputtypehandler(self, dialect):
+ def handler(cursor, name,
+ default_type, size, precision, scale):
+ return self._cx_oracle_var(dialect, cursor)
+ return handler
+
+
class _OracleNumeric(sqltypes.Numeric):
+ is_number = False
+
def bind_processor(self, dialect):
- # cx_oracle accepts Decimal objects and floats
- return None
+ if self.scale == 0:
+ return None
+ elif self.asdecimal:
+ processor = processors.to_decimal_processor_factory(
+ decimal.Decimal, self._effective_decimal_return_scale)
+
+ def process(value):
+ if isinstance(value, (int, float)):
+ return processor(value)
+ else:
+ return value
+ return process
+ else:
+ return processors.to_float
def result_processor(self, dialect, coltype):
- # we apply a cx_oracle type handler to all connections
- # that converts floating point strings to Decimal().
- # However, in some subquery situations, Oracle doesn't
- # give us enough information to determine int or Decimal.
- # It could even be int/Decimal differently on each row,
- # regardless of the scale given for the originating type.
- # So we still need an old school isinstance() handler
- # here for decimals.
-
- if dialect.supports_native_decimal:
- if self.asdecimal:
- fstring = "%%.%df" % self._effective_decimal_return_scale
-
- def to_decimal(value):
- if value is None:
- return None
- elif isinstance(value, decimal.Decimal):
- return value
- else:
- return decimal.Decimal(fstring % value)
+ return None
+
+ def _cx_oracle_outputtypehandler(self, dialect):
+ cx_Oracle = dialect.dbapi
+
+ is_cx_oracle_6 = dialect._is_cx_oracle_6
+ has_native_int = dialect._has_native_int
- return to_decimal
+ def handler(cursor, name, default_type, size, precision, scale):
+ outconverter = None
+ if precision:
+ if self.asdecimal:
+ if is_cx_oracle_6:
+ type_ = decimal.Decimal
+ else:
+ type_ = cx_Oracle.STRING
+ outconverter = dialect._to_decimal
+ else:
+ if self.is_number and scale == 0:
+ if has_native_int:
+ type_ = cx_Oracle.NATIVE_INT
+ else:
+ type_ = cx_Oracle.NUMBER
+ outconverter = int
+ else:
+ type_ = cx_Oracle.NATIVE_FLOAT
else:
- if self.precision is None and self.scale is None:
- return processors.to_float
- elif not getattr(self, '_is_oracle_number', False) \
- and self.scale is not None:
- return processors.to_float
+ if self.asdecimal:
+ if is_cx_oracle_6:
+ type_ = decimal.Decimal
+ else:
+ type_ = cx_Oracle.STRING
+ outconverter = dialect._to_decimal
else:
- return None
- else:
- # cx_oracle 4 behavior, will assume
- # floats
- return super(_OracleNumeric, self).\
- result_processor(dialect, coltype)
+ if self.is_number and scale == 0:
+ if has_native_int:
+ type_ = cx_Oracle.NATIVE_INT
+ else:
+ type_ = cx_Oracle.NUMBER
+ outconverter = int
+ else:
+ type_ = cx_Oracle.NATIVE_FLOAT
+
+ return cursor.var(
+ type_, 255,
+ arraysize=cursor.arraysize,
+ outconverter=outconverter
+ )
+
+ return handler
+
+
+class _OracleNUMBER(_OracleNumeric):
+ is_number = True
+
+
class _OracleDate(sqltypes.Date):
@@ -333,118 +301,59 @@ class _OracleDate(sqltypes.Date):
return process
-class _LOBMixin(object):
- def result_processor(self, dialect, coltype):
- if not dialect.auto_convert_lobs:
- # return the cx_oracle.LOB directly.
- return None
-
- def process(value):
- if value is not None:
- return value.read()
- else:
- return value
- return process
-
-
-class _NativeUnicodeMixin(object):
- if util.py2k:
- def bind_processor(self, dialect):
- if dialect._cx_oracle_with_unicode:
- def process(value):
- if value is None:
- return value
- else:
- return unicode(value)
- return process
- else:
- return super(
- _NativeUnicodeMixin, self).bind_processor(dialect)
-
- # we apply a connection output handler that returns
- # unicode in all cases, so the "native_unicode" flag
- # will be set for the default String.result_processor.
-
-
-class _OracleChar(_NativeUnicodeMixin, sqltypes.CHAR):
+class _OracleChar(sqltypes.CHAR):
def get_dbapi_type(self, dbapi):
return dbapi.FIXED_CHAR
-class _OracleNVarChar(_NativeUnicodeMixin, sqltypes.NVARCHAR):
+class _OracleNVarChar(sqltypes.NVARCHAR):
def get_dbapi_type(self, dbapi):
return getattr(dbapi, 'UNICODE', dbapi.STRING)
-class _OracleText(_LOBMixin, sqltypes.Text):
+class _OracleText(sqltypes.Text):
def get_dbapi_type(self, dbapi):
return dbapi.CLOB
class _OracleLong(oracle.LONG):
- # a raw LONG is a text type, but does *not*
- # get the LobMixin with cx_oracle.
-
def get_dbapi_type(self, dbapi):
return dbapi.LONG_STRING
-class _OracleString(_NativeUnicodeMixin, sqltypes.String):
+class _OracleString(sqltypes.String):
pass
-class _OracleEnum(_NativeUnicodeMixin, sqltypes.Enum):
+
+class _OracleEnum(sqltypes.Enum):
def bind_processor(self, dialect):
enum_proc = sqltypes.Enum.bind_processor(self, dialect)
- if util.py2k:
- unicode_proc = _NativeUnicodeMixin.bind_processor(self, dialect)
- else:
- unicode_proc = None
def process(value):
raw_str = enum_proc(value)
- if unicode_proc:
- raw_str = unicode_proc(raw_str)
return raw_str
return process
-class _OracleUnicodeText(
- _LOBMixin, _NativeUnicodeMixin, sqltypes.UnicodeText):
+class _OracleUnicodeText(sqltypes.UnicodeText):
def get_dbapi_type(self, dbapi):
return dbapi.NCLOB
- def result_processor(self, dialect, coltype):
- lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
- if lob_processor is None:
- return None
-
- string_processor = sqltypes.UnicodeText.result_processor(
- self, dialect, coltype)
-
- if string_processor is None:
- return lob_processor
- else:
- def process(value):
- return string_processor(lob_processor(value))
- return process
-
-
-class _OracleInteger(sqltypes.Integer):
- def result_processor(self, dialect, coltype):
- def to_int(val):
- if val is not None:
- val = int(val)
- return val
- return to_int
-
-class _OracleBinary(_LOBMixin, sqltypes.LargeBinary):
+class _OracleBinary(sqltypes.LargeBinary):
def get_dbapi_type(self, dbapi):
return dbapi.BLOB
def bind_processor(self, dialect):
return None
+ def result_processor(self, dialect, coltype):
+ if not dialect.auto_convert_lobs:
+ return None
+ else:
+ return super(_OracleBinary, self).result_processor(
+ dialect, coltype)
+
class _OracleInterval(oracle.INTERVAL):
def get_dbapi_type(self, dbapi):
@@ -461,6 +370,8 @@ class _OracleRowid(oracle.ROWID):
class OracleCompiler_cx_oracle(OracleCompiler):
+ _oracle_cx_sql_compiler = True
+
def bindparam_string(self, name, **kw):
quote = getattr(name, 'quote', None)
if quote is True or quote is not False and \
@@ -473,57 +384,92 @@ class OracleCompiler_cx_oracle(OracleCompiler):
class OracleExecutionContext_cx_oracle(OracleExecutionContext):
+ out_parameters = None
- def pre_exec(self):
- quoted_bind_names = \
- getattr(self.compiled, '_quoted_bind_names', None)
+ def _setup_quoted_bind_names(self):
+ quoted_bind_names = self.compiled._quoted_bind_names
if quoted_bind_names:
- if not self.dialect.supports_unicode_statements:
- # if DBAPI doesn't accept unicode statements,
- # keys in self.parameters would have been encoded
- # here. so convert names in quoted_bind_names
- # to encoded as well.
- quoted_bind_names = \
- dict(
- (fromname.encode(self.dialect.encoding),
- toname.encode(self.dialect.encoding))
- for fromname, toname in
- quoted_bind_names.items()
- )
for param in self.parameters:
for fromname, toname in quoted_bind_names.items():
param[toname] = param[fromname]
del param[fromname]
- if self.dialect.auto_setinputsizes:
- # cx_oracle really has issues when you setinputsizes
- # on String, including that outparams/RETURNING
- # breaks for varchars
- self.set_input_sizes(
- quoted_bind_names,
- exclude_types=self.dialect.exclude_setinputsizes
- )
-
+ def _handle_out_parameters(self):
# if a single execute, check for outparams
if len(self.compiled_parameters) == 1:
+ quoted_bind_names = self.compiled._quoted_bind_names
for bindparam in self.compiled.binds.values():
if bindparam.isoutparam:
- dbtype = bindparam.type.dialect_impl(self.dialect).\
- get_dbapi_type(self.dialect.dbapi)
- if not hasattr(self, 'out_parameters'):
- self.out_parameters = {}
- if dbtype is None:
- raise exc.InvalidRequestError(
- "Cannot create out parameter for parameter "
- "%r - its type %r is not supported by"
- " cx_oracle" %
- (bindparam.key, bindparam.type)
- )
name = self.compiled.bind_names[bindparam]
- self.out_parameters[name] = self.cursor.var(dbtype)
+ type_impl = bindparam.type.dialect_impl(self.dialect)
+ if hasattr(type_impl, '_cx_oracle_var'):
+ self.out_parameters[name] = type_impl._cx_oracle_var(
+ self.dialect, self.cursor)
+ else:
+ dbtype = type_impl.get_dbapi_type(self.dialect.dbapi)
+ if dbtype is None:
+ raise exc.InvalidRequestError(
+ "Cannot create out parameter for parameter "
+ "%r - its type %r is not supported by"
+ " cx_oracle" %
+ (bindparam.key, bindparam.type)
+ )
+ self.out_parameters[name] = self.cursor.var(dbtype)
self.parameters[0][quoted_bind_names.get(name, name)] = \
self.out_parameters[name]
+ def _generate_cursor_outputtype_handler(self):
+ output_handlers = {}
+
+ for (keyname, name, objects, type_) in self.compiled._result_columns:
+ handler = type_._cached_custom_processor(
+ self.dialect,
+ 'cx_oracle_outputtypehandler',
+ self._get_cx_oracle_type_handler)
+
+ if handler:
+ denormalized_name = self.dialect.denormalize_name(keyname)
+ output_handlers[denormalized_name] = handler
+
+ if output_handlers:
+ default_handler = self._dbapi_connection.outputtypehandler
+
+ def output_type_handler(cursor, name, default_type,
+ size, precision, scale):
+ if name in output_handlers:
+ return output_handlers[name](
+ cursor, name,
+ default_type, size, precision, scale)
+ else:
+ return default_handler(
+ cursor, name, default_type, size, precision, scale
+ )
+ self.cursor.outputtypehandler = output_type_handler
+
+ def _get_cx_oracle_type_handler(self, impl):
+ if hasattr(impl, "_cx_oracle_outputtypehandler"):
+ return impl._cx_oracle_outputtypehandler(self.dialect)
+ else:
+ return None
+
+ def pre_exec(self):
+ if not getattr(self.compiled, "_oracle_cx_sql_compiler", False):
+ return
+
+ self.out_parameters = {}
+
+ if self.compiled._quoted_bind_names:
+ self._setup_quoted_bind_names()
+
+ self.set_input_sizes(
+ self.compiled._quoted_bind_names,
+ include_types=self.dialect._include_setinputsizes
+ )
+
+ self._handle_out_parameters()
+
+ self._generate_cursor_outputtype_handler()
+
def create_cursor(self):
c = self._dbapi_connection.cursor()
if self.dialect.arraysize:
@@ -532,24 +478,16 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
return c
def get_result_proxy(self):
- if hasattr(self, 'out_parameters') and self.compiled.returning:
- returning_params = dict(
- (k, v.getvalue())
- for k, v in self.out_parameters.items()
- )
+ if self.out_parameters and self.compiled.returning:
+ returning_params = [
+ self.out_parameters["ret_%d" % i].getvalue()
+ for i in range(len(self.out_parameters))
+ ]
return ReturningResultProxy(self, returning_params)
- result = None
- if self.cursor.description is not None:
- for column in self.cursor.description:
- type_code = column[1]
- if type_code in self.dialect._cx_oracle_binary_types:
- result = _result.BufferedColumnResultProxy(self)
+ result = _result.ResultProxy(self)
- if result is None:
- result = _result.ResultProxy(self)
-
- if hasattr(self, 'out_parameters'):
+ if self.out_parameters:
if self.compiled_parameters is not None and \
len(self.compiled_parameters) == 1:
result.out_parameters = out_parameters = {}
@@ -579,30 +517,6 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
return result
-class OracleExecutionContext_cx_oracle_with_unicode(
- OracleExecutionContext_cx_oracle):
- """Support WITH_UNICODE in Python 2.xx.
-
- WITH_UNICODE allows cx_Oracle's Python 3 unicode handling
- behavior under Python 2.x. This mode in some cases disallows
- and in other cases silently passes corrupted data when
- non-Python-unicode strings (a.k.a. plain old Python strings)
- are passed as arguments to connect(), the statement sent to execute(),
- or any of the bind parameter keys or values sent to execute().
- This optional context therefore ensures that all statements are
- passed as Python unicode objects.
-
- """
-
- def __init__(self, *arg, **kw):
- OracleExecutionContext_cx_oracle.__init__(self, *arg, **kw)
- self.statement = util.text_type(self.statement)
-
- def _execute_scalar(self, stmt, type_):
- return super(OracleExecutionContext_cx_oracle_with_unicode, self).\
- _execute_scalar(util.text_type(stmt), type_)
-
-
class ReturningResultProxy(_result.FullyBufferedResultProxy):
"""Result proxy which stuffs the _returning clause + outparams
into the fetch."""
@@ -614,14 +528,13 @@ class ReturningResultProxy(_result.FullyBufferedResultProxy):
def _cursor_description(self):
returning = self.context.compiled.returning
return [
- ("ret_%d" % i, None)
- for i, col in enumerate(returning)
+ (getattr(col, 'name', col.anon_label), None)
+ for col in returning
]
def _buffer_rows(self):
return collections.deque(
- [tuple(self._returning_params["ret_%d" % i]
- for i, c in enumerate(self._returning_params))]
+ [tuple(self._returning_params)]
)
@@ -632,11 +545,17 @@ class OracleDialect_cx_oracle(OracleDialect):
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
+ supports_unicode_statements = True
+ supports_unicode_binds = True
+
driver = "cx_oracle"
colspecs = colspecs = {
sqltypes.Numeric: _OracleNumeric,
- # generic type, assume datetime.date is desired
+ sqltypes.Float: _OracleNumeric,
+ sqltypes.Integer: _OracleInteger,
+ oracle.NUMBER: _OracleNUMBER,
+
sqltypes.Date: _OracleDate,
sqltypes.LargeBinary: _OracleBinary,
sqltypes.Boolean: oracle._OracleBoolean,
@@ -648,14 +567,7 @@ class OracleDialect_cx_oracle(OracleDialect):
sqltypes.CHAR: _OracleChar,
sqltypes.Enum: _OracleEnum,
- # a raw LONG is a text type, but does *not*
- # get the LobMixin with cx_oracle.
oracle.LONG: _OracleLong,
-
- # this is only needed for OUT parameters.
- # it would be nice if we could not use it otherwise.
- sqltypes.Integer: _OracleInteger,
-
oracle.RAW: _OracleRaw,
sqltypes.Unicode: _OracleNVarChar,
sqltypes.NVARCHAR: _OracleNVarChar,
@@ -665,106 +577,58 @@ class OracleDialect_cx_oracle(OracleDialect):
execute_sequence_format = list
def __init__(self,
- auto_setinputsizes=True,
- exclude_setinputsizes=("STRING", "UNICODE"),
auto_convert_lobs=True,
threaded=True,
- allow_twophase=True,
- coerce_to_decimal=True,
coerce_to_unicode=False,
- arraysize=50, _retry_on_12516=False,
+ coerce_to_decimal=True,
+ arraysize=50,
**kwargs):
+
+ self._pop_deprecated_kwargs(kwargs)
+
OracleDialect.__init__(self, **kwargs)
self.threaded = threaded
self.arraysize = arraysize
- self.allow_twophase = allow_twophase
- self.supports_timestamp = self.dbapi is None or \
- hasattr(self.dbapi, 'TIMESTAMP')
- self.auto_setinputsizes = auto_setinputsizes
self.auto_convert_lobs = auto_convert_lobs
- self._retry_on_12516 = _retry_on_12516
+ self.coerce_to_unicode = coerce_to_unicode
+ self.coerce_to_decimal = coerce_to_decimal
- if hasattr(self.dbapi, 'version'):
- self.cx_oracle_ver = self._parse_cx_oracle_ver(self.dbapi.version)
+ cx_Oracle = self.dbapi
- else:
+ if cx_Oracle is None:
+ self._include_setinputsizes = {}
self.cx_oracle_ver = (0, 0, 0)
-
- def types(*names):
- return set(
- getattr(self.dbapi, name, None) for name in names
- ).difference([None])
-
- self.exclude_setinputsizes = types(*(exclude_setinputsizes or ()))
- self._cx_oracle_string_types = types("STRING", "UNICODE",
- "NCLOB", "CLOB")
- self._cx_oracle_unicode_types = types("UNICODE", "NCLOB")
- self._cx_oracle_binary_types = types("BFILE", "CLOB", "NCLOB", "BLOB")
- self.supports_unicode_binds = self.cx_oracle_ver >= (5, 0)
-
- self._enable_twophase = self.cx_oracle_ver < (6, 0)
-
- self.supports_sane_multi_rowcount = self.cx_oracle_ver >= (5, 0)
-
- self.coerce_to_unicode = (
- self.cx_oracle_ver >= (5, 0) and
- coerce_to_unicode
- )
-
- self.supports_native_decimal = (
- self.cx_oracle_ver >= (5, 0) and
- coerce_to_decimal
- )
-
- self._cx_oracle_native_nvarchar = self.cx_oracle_ver >= (5, 0)
-
- if self.cx_oracle_ver is None:
- # this occurs in tests with mock DBAPIs
- self._cx_oracle_string_types = set()
- self._cx_oracle_with_unicode = False
- elif util.py3k or (
- self.cx_oracle_ver >= (5,) and
- self.cx_oracle_ver < (5, 1) and not
- hasattr(self.dbapi, 'UNICODE')
- ):
- # cx_Oracle WITH_UNICODE mode. *only* python
- # unicode objects accepted for anything. This
- # mode of operation is implicit for Python 3,
- # however under Python 2 it existed as a never-used build-time
- # option for cx_Oracle 5.0 only and was removed in 5.1.
- self.supports_unicode_statements = True
- self.supports_unicode_binds = True
- self._cx_oracle_with_unicode = True
-
- if util.py2k:
- # There's really no reason to run with WITH_UNICODE under
- # Python 2.x. Give the user a hint.
- util.warn(
- "cx_Oracle is compiled under Python 2.xx using the "
- "WITH_UNICODE flag. Consider recompiling cx_Oracle "
- "without this flag, which is in no way necessary for "
- "full support of Unicode and causes significant "
- "performance issues.")
- self.execution_ctx_cls = \
- OracleExecutionContext_cx_oracle_with_unicode
else:
- self._cx_oracle_with_unicode = False
+ self.cx_oracle_ver = self._parse_cx_oracle_ver(cx_Oracle.version)
+ if self.cx_oracle_ver < (5, 0) and self.cx_oracle_ver > (0, 0, 0):
+ raise exc.InvalidRequestError(
+ "cx_Oracle version 5.0 and above are supported")
- if self.cx_oracle_ver is None or \
- not self.auto_convert_lobs or \
- not hasattr(self.dbapi, 'CLOB'):
- self.dbapi_type_map = {}
- else:
- # only use this for LOB objects. using it for strings, dates
- # etc. leads to a little too much magic, reflection doesn't know
- # if it should expect encoded strings or unicodes, etc.
- self.dbapi_type_map = {
- self.dbapi.CLOB: oracle.CLOB(),
- self.dbapi.NCLOB: oracle.NCLOB(),
- self.dbapi.BLOB: oracle.BLOB(),
- self.dbapi.BINARY: oracle.RAW(),
+ self._has_native_int = hasattr(cx_Oracle, "NATIVE_INT")
+
+ self._include_setinputsizes = {
+ cx_Oracle.NCLOB, cx_Oracle.CLOB, cx_Oracle.LOB,
+ cx_Oracle.BLOB, cx_Oracle.FIXED_CHAR,
}
+ self._is_cx_oracle_6 = self.cx_oracle_ver >= (6, )
+
+ def _pop_deprecated_kwargs(self, kwargs):
+ auto_setinputsizes = kwargs.pop('auto_setinputsizes', None)
+ exclude_setinputsizes = kwargs.pop('exclude_setinputsizes', None)
+ if auto_setinputsizes or exclude_setinputsizes:
+ util.warn_deprecated(
+ "auto_setinputsizes and exclude_setinputsizes are deprecated. "
+ "Modern cx_Oracle only requires that LOB types are part "
+ "of this behavior, and these parameters no longer have any "
+ "effect.")
+ allow_twophase = kwargs.pop('allow_twophase', None)
+ if allow_twophase is not None:
+            util.warn_deprecated(
+ "allow_twophase is deprecated. The cx_Oracle dialect no "
+ "longer supports two-phase transaction mode."
+ )
+
def _parse_cx_oracle_ver(self, version):
m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?', version)
if m:
@@ -780,115 +644,94 @@ class OracleDialect_cx_oracle(OracleDialect):
import cx_Oracle
return cx_Oracle
- def connect(self, *cargs, **cparams):
- if self._retry_on_12516:
- # emergency flag for the SQLAlchemy test suite, which has
- # decreased in stability since cx_oracle 5.3; generalized
- # "retry on connect" functionality is part of an upcoming
- # SQLAlchemy feature
- try:
- return self.dbapi.connect(*cargs, **cparams)
- except self.dbapi.DatabaseError as err:
- if "ORA-12516" in str(err):
- time.sleep(2)
- return self.dbapi.connect(*cargs, **cparams)
- else:
- raise
- else:
- return super(OracleDialect_cx_oracle, self).connect(
- *cargs, **cparams)
-
def initialize(self, connection):
super(OracleDialect_cx_oracle, self).initialize(connection)
if self._is_oracle_8:
self.supports_unicode_binds = False
+
self._detect_decimal_char(connection)
def _detect_decimal_char(self, connection):
- """detect if the decimal separator character is not '.', as
- is the case with European locale settings for NLS_LANG.
-
- cx_oracle itself uses similar logic when it formats Python
- Decimal objects to strings on the bind side (as of 5.0.3),
- as Oracle sends/receives string numerics only in the
- current locale.
-
- """
- if self.cx_oracle_ver < (5,):
- # no output type handlers before version 5
- return
-
- cx_Oracle = self.dbapi
- conn = connection.connection
-
- # override the output_type_handler that's
- # on the cx_oracle connection with a plain
- # one on the cursor
-
- def output_type_handler(cursor, name, defaultType,
- size, precision, scale):
- return cursor.var(
- cx_Oracle.STRING,
- 255, arraysize=cursor.arraysize)
-
- cursor = conn.cursor()
- cursor.outputtypehandler = output_type_handler
- cursor.execute("SELECT 0.1 FROM DUAL")
- val = cursor.fetchone()[0]
- cursor.close()
- char = re.match(r"([\.,])", val).group(1)
- if char != '.':
+ # we have the option to change this setting upon connect,
+ # or just look at what it is upon connect and convert.
+ # to minimize the chance of interference with changes to
+ # NLS_TERRITORY or formatting behavior of the DB, we opt
+ # to just look at it
+
+ self._decimal_char = connection.scalar(
+ "select value from nls_session_parameters "
+ "where parameter = 'NLS_NUMERIC_CHARACTERS'")[0]
+ if self._decimal_char != '.':
_detect_decimal = self._detect_decimal
- self._detect_decimal = \
- lambda value: _detect_decimal(value.replace(char, '.'))
- self._to_decimal = \
- lambda value: decimal.Decimal(value.replace(char, '.'))
+ _to_decimal = self._to_decimal
+
+ self._detect_decimal = lambda value: _detect_decimal(
+ value.replace(self._decimal_char, "."))
+ self._to_decimal = lambda value: _to_decimal(
+ value.replace(self._decimal_char, "."))
def _detect_decimal(self, value):
if "." in value:
- return decimal.Decimal(value)
+ return self._to_decimal(value)
else:
return int(value)
_to_decimal = decimal.Decimal
- def on_connect(self):
- if self.cx_oracle_ver < (5,):
- # no output type handlers before version 5
- return
+ def _generate_connection_outputtype_handler(self):
+ """establish the default outputtypehandler established at the
+ connection level.
- cx_Oracle = self.dbapi
+ """
- def output_type_handler(cursor, name, defaultType,
+ dialect = self
+ cx_Oracle = dialect.dbapi
+
+ number_handler = _OracleNUMBER(asdecimal=True).\
+ _cx_oracle_outputtypehandler(dialect)
+ float_handler = _OracleNUMBER(asdecimal=False).\
+ _cx_oracle_outputtypehandler(dialect)
+
+ def output_type_handler(cursor, name, default_type,
size, precision, scale):
- # convert all NUMBER with precision + positive scale to Decimal
- # this almost allows "native decimal" mode.
+ if default_type == cx_Oracle.NUMBER:
+ if not dialect.coerce_to_decimal:
+ return None
+ elif precision == 0 and scale in (0, -127):
+ # ambiguous type, this occurs when selecting
+ # numbers from deep subqueries
+ return cursor.var(
+ cx_Oracle.STRING,
+ 255,
+ outconverter=dialect._detect_decimal,
+ arraysize=cursor.arraysize)
+ elif precision and scale > 0:
+ return number_handler(
+ cursor, name, default_type, size, precision, scale
+ )
+ else:
+ return float_handler(
+ cursor, name, default_type, size, precision, scale
+ )
- if self.supports_native_decimal and \
- defaultType == cx_Oracle.NUMBER and \
- precision and scale > 0:
+ # allow all strings to come back natively as Unicode
+ elif dialect.coerce_to_unicode and \
+ default_type in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR):
return cursor.var(
- cx_Oracle.STRING,
- 255,
- outconverter=self._to_decimal,
- arraysize=cursor.arraysize)
- # if NUMBER with zero precision and 0 or neg scale, this appears
- # to indicate "ambiguous". Use a slower converter that will
- # make a decision based on each value received - the type
- # may change from row to row (!). This kills
- # off "native decimal" mode, handlers still needed.
- elif self.supports_native_decimal and \
- defaultType == cx_Oracle.NUMBER \
- and not precision and scale <= 0:
+ util.text_type, size, cursor.arraysize
+ )
+ elif dialect.auto_convert_lobs and default_type in (
+ cx_Oracle.CLOB, cx_Oracle.NCLOB, cx_Oracle.BLOB
+ ):
return cursor.var(
- cx_Oracle.STRING,
- 255,
- outconverter=self._detect_decimal,
- arraysize=cursor.arraysize)
- # allow all strings to come back natively as Unicode
- elif self.coerce_to_unicode and \
- defaultType in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR):
- return cursor.var(util.text_type, size, cursor.arraysize)
+ default_type, size, cursor.arraysize,
+ outconverter=lambda value: value.read()
+ )
+ return output_type_handler
+
+ def on_connect(self):
+
+ output_type_handler = self._generate_connection_outputtype_handler()
def on_connect(conn):
conn.outputtypehandler = output_type_handler
@@ -932,9 +775,6 @@ class OracleDialect_cx_oracle(OracleDialect):
threaded=self.threaded,
)
- if self._enable_twophase:
- opts['twophase'] = self.allow_twophase
-
if dsn is not None:
opts['dsn'] = dsn
if url.password is not None:
@@ -942,16 +782,6 @@ class OracleDialect_cx_oracle(OracleDialect):
if url.username is not None:
opts['user'] = url.username
- if util.py2k:
- if self._cx_oracle_with_unicode:
- for k, v in opts.items():
- if isinstance(v, str):
- opts[k] = unicode(v)
- else:
- for k, v in opts.items():
- if isinstance(v, unicode):
- opts[k] = str(v)
-
if 'mode' in url.query:
opts['mode'] = url.query['mode']
if isinstance(opts['mode'], util.string_types):
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 8b72c0001..227ff0845 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -108,7 +108,6 @@ class DefaultDialect(interfaces.Dialect):
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
- dbapi_type_map = {}
colspecs = {}
default_paramstyle = 'named'
supports_default_values = False
@@ -1112,7 +1111,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
return (self.isinsert or self.isupdate) and \
bool(self.compiled.postfetch)
- def set_input_sizes(self, translate=None, exclude_types=None):
+ def set_input_sizes(
+ self, translate=None, include_types=None, exclude_types=None):
"""Given a cursor and ClauseParameters, call the appropriate
style of ``setinputsizes()`` on the cursor, using DB-API types
from the bind parameter's ``TypeEngine`` objects.
@@ -1136,7 +1136,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
dbtype = typeengine.dialect_impl(self.dialect).\
get_dbapi_type(self.dialect.dbapi)
if dbtype is not None and \
- (not exclude_types or dbtype not in exclude_types):
+ (not exclude_types or dbtype not in exclude_types) and \
+ (not include_types or dbtype in include_types):
if key in self._expanded_parameters:
inputsizes.extend(
[dbtype] * len(self._expanded_parameters[key]))
@@ -1154,7 +1155,8 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
dbtype = typeengine.dialect_impl(self.dialect).\
get_dbapi_type(self.dialect.dbapi)
if dbtype is not None and \
- (not exclude_types or dbtype not in exclude_types):
+ (not exclude_types or dbtype not in exclude_types) and \
+ (not include_types or dbtype in include_types):
if translate:
# TODO: this part won't work w/ the
# expanded_parameters feature, e.g. for cx_oracle
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 3e09e4971..518038d29 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -110,16 +110,6 @@ class Dialect(object):
the "implicit" functionality is not used and inserted_primary_key
will not be available.
- dbapi_type_map
- A mapping of DB-API type objects present in this Dialect's
- DB-API implementation mapped to TypeEngine implementations used
- by the dialect.
-
- This is used to apply types to result sets based on the DB-API
- types present in cursor.description; it only takes effect for
- result sets against textual statements where no explicit
- typemap was present.
-
colspecs
A dictionary of TypeEngine classes from sqlalchemy.types mapped
to subclasses that are specific to the dialect class. This
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 907dc7bd2..3aae932f2 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -447,7 +447,6 @@ class ResultMetaData(object):
def _merge_textual_cols_by_position(
self, context, cursor_description, result_columns):
dialect = context.dialect
- typemap = dialect.dbapi_type_map
num_ctx_cols = len(result_columns) if result_columns else None
if num_ctx_cols > len(cursor_description):
@@ -470,14 +469,13 @@ class ResultMetaData(object):
"in textual SQL: %r" % obj[0])
seen.add(obj[0])
else:
- mapped_type = typemap.get(coltype, sqltypes.NULLTYPE)
+ mapped_type = sqltypes.NULLTYPE
obj = None
yield idx, colname, mapped_type, coltype, obj, untranslated
def _merge_cols_by_name(self, context, cursor_description, result_columns):
dialect = context.dialect
- typemap = dialect.dbapi_type_map
case_sensitive = dialect.case_sensitive
result_map = self._create_result_map(result_columns, case_sensitive)
@@ -487,7 +485,7 @@ class ResultMetaData(object):
try:
ctx_rec = result_map[colname]
except KeyError:
- mapped_type = typemap.get(coltype, sqltypes.NULLTYPE)
+ mapped_type = sqltypes.NULLTYPE
obj = None
else:
obj = ctx_rec[1]
@@ -496,11 +494,9 @@ class ResultMetaData(object):
def _merge_cols_by_none(self, context, cursor_description):
dialect = context.dialect
- typemap = dialect.dbapi_type_map
for idx, colname, untranslated, coltype in \
self._colnames_from_description(context, cursor_description):
- mapped_type = typemap.get(coltype, sqltypes.NULLTYPE)
- yield idx, colname, mapped_type, coltype, None, untranslated
+ yield idx, colname, sqltypes.NULLTYPE, coltype, None, untranslated
@classmethod
def _create_result_map(cls, result_columns, case_sensitive=True):
@@ -1385,8 +1381,10 @@ class BufferedColumnResultProxy(ResultProxy):
fetchone() is called. If fetchmany() or fetchall() are called,
the full grid of results is fetched. This is to operate with
databases where result rows contain "live" results that fall out
- of scope unless explicitly fetched. Currently this includes
- cx_Oracle LOB objects.
+ of scope unless explicitly fetched.
+
+ .. versionchanged:: 1.2 This :class:`.ResultProxy` is not used by
+ any SQLAlchemy-included dialects.
"""
diff --git a/lib/sqlalchemy/sql/type_api.py b/lib/sqlalchemy/sql/type_api.py
index 69dd80938..89c2c9de5 100644
--- a/lib/sqlalchemy/sql/type_api.py
+++ b/lib/sqlalchemy/sql/type_api.py
@@ -477,6 +477,15 @@ class TypeEngine(Visitable):
d[coltype] = rp = d['impl'].result_processor(dialect, coltype)
return rp
+ def _cached_custom_processor(self, dialect, key, fn):
+ try:
+ return dialect._type_memos[self][key]
+ except KeyError:
+ d = self._dialect_info(dialect)
+ impl = d['impl']
+ d[key] = result = fn(impl)
+ return result
+
def _dialect_info(self, dialect):
"""Return a dialect-specific registry which
caches a dialect-specific implementation, bind processing
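The new _cached_custom_processor hook memoizes an arbitrary per-dialect processor for a type under a string key, so the callable is built once per (dialect, type, key) combination and reused across result sets. A rough standalone sketch of the same memoization idea, independent of SQLAlchemy's internals (names here are illustrative):

    # conceptual sketch of keyed memoization per (type, dialect) pair
    _memos = {}

    def cached_custom_processor(type_, dialect, key, fn):
        cache = _memos.setdefault((type_, dialect), {})
        try:
            return cache[key]
        except KeyError:
            cache[key] = result = fn(type_)   # build once, reuse thereafter
            return result

    # the second call returns the cached object without invoking fn again
    proc = cached_custom_processor(int, "oracle", "to_float", lambda t: float)
    assert proc is cached_custom_processor(int, "oracle", "to_float", lambda t: float)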
diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py
index 08d0f0aac..a51705001 100644
--- a/lib/sqlalchemy/testing/assertions.py
+++ b/lib/sqlalchemy/testing/assertions.py
@@ -295,7 +295,6 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
msg, util.text_type(e), re.UNICODE), "%r !~ %s" % (msg, e)
print(util.text_type(e).encode('utf-8'))
-
class AssertsCompiledSQL(object):
def assert_compile(self, clause, result, params=None,
checkparams=None, dialect=None,
diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py
index 17ddbb567..687f84b18 100644
--- a/lib/sqlalchemy/testing/provision.py
+++ b/lib/sqlalchemy/testing/provision.py
@@ -284,7 +284,7 @@ def _oracle_drop_db(cfg, eng, ident):
@_update_db_opts.for_db("oracle")
def _oracle_update_db_opts(db_url, db_opts):
- db_opts['_retry_on_12516'] = True
+ pass
def reap_dbs(idents_file):
diff --git a/lib/sqlalchemy/testing/suite/test_types.py b/lib/sqlalchemy/testing/suite/test_types.py
index 022e7b92d..b9bb179ab 100644
--- a/lib/sqlalchemy/testing/suite/test_types.py
+++ b/lib/sqlalchemy/testing/suite/test_types.py
@@ -511,8 +511,7 @@ class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
numbers
)
- @testing.skip_if(
- "oracle", "temporary skip until cx_oracle refactor is merged")
+ @testing.fails_if(testing.requires.broken_cx_oracle6_numerics)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal_large(self):
"""test exceedingly large decimals.
diff --git a/setup.cfg b/setup.cfg
index f79060c54..7894325e1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -9,7 +9,7 @@ first-package-wins = true
where = test
[tool:pytest]
-addopts= --tb native -v -r fxX --maxfail=25 -p no:warnings
+addopts= --tb native -v -r sfxX --maxfail=25 -p no:warnings
python_files=test/*test_*.py
[upload]
diff --git a/test/aaa_profiling/test_resultset.py b/test/aaa_profiling/test_resultset.py
index 9ffa21cb6..2b0e8de9e 100644
--- a/test/aaa_profiling/test_resultset.py
+++ b/test/aaa_profiling/test_resultset.py
@@ -51,6 +51,20 @@ class ResultSetTest(fixtures.TestBase, AssertsExecutionResults):
def test_unicode(self):
[tuple(row) for row in t2.select().execute().fetchall()]
+ @profiling.function_call_count()
+ def test_raw_string(self):
+ stmt = 'SELECT %s FROM "table"' % (
+ ", ".join("field%d" % fnum for fnum in range(NUM_FIELDS))
+ )
+ [tuple(row) for row in testing.db.execute(stmt).fetchall()]
+
+ @profiling.function_call_count()
+ def test_raw_unicode(self):
+ stmt = "SELECT %s FROM table2" % (
+ ", ".join("field%d" % fnum for fnum in range(NUM_FIELDS))
+ )
+ [tuple(row) for row in testing.db.execute(stmt).fetchall()]
+
def test_contains_doesnt_compile(self):
row = t.select().execute().first()
c1 = Column('some column', Integer) + \
diff --git a/test/dialect/oracle/__init__.py b/test/dialect/oracle/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/test/dialect/oracle/__init__.py
diff --git a/test/dialect/oracle/test_compiler.py b/test/dialect/oracle/test_compiler.py
new file mode 100644
index 000000000..305359085
--- /dev/null
+++ b/test/dialect/oracle/test_compiler.py
@@ -0,0 +1,792 @@
+# coding: utf-8
+
+
+from sqlalchemy.testing import eq_
+from sqlalchemy import types as sqltypes, exc, schema
+from sqlalchemy.sql import table, column
+from sqlalchemy.testing import (fixtures,
+ AssertsExecutionResults,
+ AssertsCompiledSQL)
+from sqlalchemy import testing
+from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\
+ Index, MetaData, select, inspect, ForeignKey, String, func, \
+ TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \
+ literal_column, VARCHAR, create_engine, Date, NVARCHAR, \
+ ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
+ union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
+ PrimaryKeyConstraint, FLOAT
+from sqlalchemy.util import u, b
+from sqlalchemy import util
+from sqlalchemy.testing import assert_raises, assert_raises_message
+from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
+from sqlalchemy.engine import default
+import decimal
+from sqlalchemy.engine import url
+from sqlalchemy.testing.schema import Table, Column
+import datetime
+import os
+from sqlalchemy import sql
+from sqlalchemy.testing.mock import Mock
+
+
+class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "oracle"
+
+ def test_true_false(self):
+ self.assert_compile(
+ sql.false(), "0"
+ )
+ self.assert_compile(
+ sql.true(),
+ "1"
+ )
+
+ def test_owner(self):
+ meta = MetaData()
+ parent = Table('parent', meta, Column('id', Integer,
+ primary_key=True), Column('name', String(50)),
+ schema='ed')
+ child = Table('child', meta, Column('id', Integer,
+ primary_key=True), Column('parent_id', Integer,
+ ForeignKey('ed.parent.id')), schema='ed')
+ self.assert_compile(parent.join(child),
+ 'ed.parent JOIN ed.child ON ed.parent.id = '
+ 'ed.child.parent_id')
+
+ def test_subquery(self):
+ t = table('sometable', column('col1'), column('col2'))
+ s = select([t])
+ s = select([s.c.col1, s.c.col2])
+
+ self.assert_compile(s, "SELECT col1, col2 FROM (SELECT "
+ "sometable.col1 AS col1, sometable.col2 "
+ "AS col2 FROM sometable)")
+
+ def test_bindparam_quote(self):
+ """test that bound parameters take on quoting for reserved words,
+ column names quote flag enabled."""
+ # note: this is only in cx_oracle at the moment. not sure
+ # what other hypothetical oracle dialects might need
+
+ self.assert_compile(
+ bindparam("option"), ':"option"'
+ )
+ self.assert_compile(
+ bindparam("plain"), ':plain'
+ )
+ t = Table("s", MetaData(), Column('plain', Integer, quote=True))
+ self.assert_compile(
+ t.insert().values(plain=5),
+ 'INSERT INTO s ("plain") VALUES (:"plain")'
+ )
+ self.assert_compile(
+ t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"'
+ )
+
+ def test_cte(self):
+ part = table(
+ 'part',
+ column('part'),
+ column('sub_part'),
+ column('quantity')
+ )
+
+ included_parts = select([
+ part.c.sub_part, part.c.part, part.c.quantity
+ ]).where(part.c.part == "p1").\
+ cte(name="included_parts", recursive=True).\
+ suffix_with(
+ "search depth first by part set ord1",
+ "cycle part set y_cycle to 1 default 0", dialect='oracle')
+
+ incl_alias = included_parts.alias("pr1")
+ parts_alias = part.alias("p")
+ included_parts = included_parts.union_all(
+ select([
+ parts_alias.c.sub_part,
+ parts_alias.c.part, parts_alias.c.quantity
+ ]).where(parts_alias.c.part == incl_alias.c.sub_part)
+ )
+
+ q = select([
+ included_parts.c.sub_part,
+ func.sum(included_parts.c.quantity).label('total_quantity')]).\
+ group_by(included_parts.c.sub_part)
+
+ self.assert_compile(
+ q,
+ "WITH included_parts(sub_part, part, quantity) AS "
+ "(SELECT part.sub_part AS sub_part, part.part AS part, "
+ "part.quantity AS quantity FROM part WHERE part.part = :part_1 "
+ "UNION ALL SELECT p.sub_part AS sub_part, p.part AS part, "
+ "p.quantity AS quantity FROM part p, included_parts pr1 "
+ "WHERE p.part = pr1.sub_part) "
+ "search depth first by part set ord1 cycle part set "
+ "y_cycle to 1 default 0 "
+ "SELECT included_parts.sub_part, sum(included_parts.quantity) "
+ "AS total_quantity FROM included_parts "
+ "GROUP BY included_parts.sub_part"
+ )
+
+ def test_limit(self):
+ t = table('sometable', column('col1'), column('col2'))
+ s = select([t])
+ c = s.compile(dialect=oracle.OracleDialect())
+ assert t.c.col1 in set(c._create_result_map()['col1'][1])
+ s = select([t]).limit(10).offset(20)
+ self.assert_compile(s,
+ 'SELECT col1, col2 FROM (SELECT col1, '
+ 'col2, ROWNUM AS ora_rn FROM (SELECT '
+ 'sometable.col1 AS col1, sometable.col2 AS '
+ 'col2 FROM sometable) WHERE ROWNUM <= '
+ ':param_1 + :param_2) WHERE ora_rn > :param_2',
+ checkparams={'param_1': 10, 'param_2': 20})
+
+ c = s.compile(dialect=oracle.OracleDialect())
+ eq_(len(c._result_columns), 2)
+ assert t.c.col1 in set(c._create_result_map()['col1'][1])
+
+ s2 = select([s.c.col1, s.c.col2])
+ self.assert_compile(s2,
+ 'SELECT col1, col2 FROM (SELECT col1, col2 '
+ 'FROM (SELECT col1, col2, ROWNUM AS ora_rn '
+ 'FROM (SELECT sometable.col1 AS col1, '
+ 'sometable.col2 AS col2 FROM sometable) '
+ 'WHERE ROWNUM <= :param_1 + :param_2) '
+ 'WHERE ora_rn > :param_2)',
+ checkparams={'param_1': 10, 'param_2': 20})
+
+ self.assert_compile(s2,
+ 'SELECT col1, col2 FROM (SELECT col1, col2 '
+ 'FROM (SELECT col1, col2, ROWNUM AS ora_rn '
+ 'FROM (SELECT sometable.col1 AS col1, '
+ 'sometable.col2 AS col2 FROM sometable) '
+ 'WHERE ROWNUM <= :param_1 + :param_2) '
+ 'WHERE ora_rn > :param_2)')
+ c = s2.compile(dialect=oracle.OracleDialect())
+ eq_(len(c._result_columns), 2)
+ assert s.c.col1 in set(c._create_result_map()['col1'][1])
+
+ s = select([t]).limit(10).offset(20).order_by(t.c.col2)
+ self.assert_compile(s,
+ 'SELECT col1, col2 FROM (SELECT col1, '
+ 'col2, ROWNUM AS ora_rn FROM (SELECT '
+ 'sometable.col1 AS col1, sometable.col2 AS '
+ 'col2 FROM sometable ORDER BY '
+ 'sometable.col2) WHERE ROWNUM <= '
+ ':param_1 + :param_2) WHERE ora_rn > :param_2',
+ checkparams={'param_1': 10, 'param_2': 20}
+ )
+ c = s.compile(dialect=oracle.OracleDialect())
+ eq_(len(c._result_columns), 2)
+ assert t.c.col1 in set(c._create_result_map()['col1'][1])
+
+ s = select([t], for_update=True).limit(10).order_by(t.c.col2)
+ self.assert_compile(s,
+ 'SELECT col1, col2 FROM (SELECT '
+ 'sometable.col1 AS col1, sometable.col2 AS '
+ 'col2 FROM sometable ORDER BY '
+ 'sometable.col2) WHERE ROWNUM <= :param_1 '
+ 'FOR UPDATE')
+
+ s = select([t],
+ for_update=True).limit(10).offset(20).order_by(t.c.col2)
+ self.assert_compile(s,
+ 'SELECT col1, col2 FROM (SELECT col1, '
+ 'col2, ROWNUM AS ora_rn FROM (SELECT '
+ 'sometable.col1 AS col1, sometable.col2 AS '
+ 'col2 FROM sometable ORDER BY '
+ 'sometable.col2) WHERE ROWNUM <= '
+ ':param_1 + :param_2) WHERE ora_rn > :param_2 FOR '
+ 'UPDATE')
+
+ def test_for_update(self):
+ table1 = table('mytable',
+ column('myid'), column('name'), column('description'))
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+
+ self.assert_compile(
+ table1
+ .select(table1.c.myid == 7)
+ .with_for_update(of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 "
+ "FOR UPDATE OF mytable.myid")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(nowait=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT")
+
+ self.assert_compile(
+ table1
+ .select(table1.c.myid == 7)
+ .with_for_update(nowait=True, of=table1.c.myid),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 "
+ "FOR UPDATE OF mytable.myid NOWAIT")
+
+ self.assert_compile(
+ table1
+ .select(table1.c.myid == 7)
+ .with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
+ "mytable.myid, mytable.name NOWAIT")
+
+ self.assert_compile(
+ table1.select(table1.c.myid == 7)
+ .with_for_update(skip_locked=True,
+ of=[table1.c.myid, table1.c.name]),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
+ "mytable.myid, mytable.name SKIP LOCKED")
+
+ # key_share has no effect
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).with_for_update(key_share=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+
+ # read has no effect
+ self.assert_compile(
+ table1
+ .select(table1.c.myid == 7)
+ .with_for_update(read=True, key_share=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+
+ ta = table1.alias()
+ self.assert_compile(
+ ta
+ .select(ta.c.myid == 7)
+ .with_for_update(of=[ta.c.myid, ta.c.name]),
+ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM mytable mytable_1 "
+ "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF "
+ "mytable_1.myid, mytable_1.name"
+ )
+
+ def test_for_update_of_w_limit_adaption_col_present(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid, table1.c.name]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10),
+ "SELECT myid, name FROM "
+ "(SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_adaption_col_unpresent(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10),
+ "SELECT myid FROM "
+ "(SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_offset_adaption_col_present(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid, table1.c.name]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10).offset(50),
+ "SELECT myid, name FROM (SELECT myid, name, ROWNUM AS ora_rn "
+ "FROM (SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
+ "FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_offset_adaption_col_unpresent(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10).offset(50),
+ "SELECT myid FROM (SELECT myid, ROWNUM AS ora_rn, name "
+ "FROM (SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
+ "FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self):
+ table1 = table('mytable', column('myid'), column('foo'), column('bar'))
+
+ self.assert_compile(
+ select([table1.c.myid, table1.c.bar]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=[table1.c.foo, table1.c.bar]).
+ limit(10).offset(50),
+ "SELECT myid, bar FROM (SELECT myid, bar, ROWNUM AS ora_rn, "
+ "foo FROM (SELECT mytable.myid AS myid, mytable.bar AS bar, "
+ "mytable.foo AS foo FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
+ "FOR UPDATE OF foo, bar NOWAIT"
+ )
+
+ def test_limit_preserves_typing_information(self):
+ class MyType(TypeDecorator):
+ impl = Integer
+
+ stmt = select([type_coerce(column('x'), MyType).label('foo')]).limit(1)
+ dialect = oracle.dialect()
+ compiled = stmt.compile(dialect=dialect)
+ assert isinstance(compiled._create_result_map()['foo'][-1], MyType)
+
+ def test_use_binds_for_limits_disabled(self):
+ t = table('sometable', column('col1'), column('col2'))
+ dialect = oracle.OracleDialect(use_binds_for_limits=False)
+
+ self.assert_compile(
+ select([t]).limit(10),
+ "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
+ "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM <= 10",
+ dialect=dialect)
+
+ self.assert_compile(
+ select([t]).offset(10),
+ "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
+ "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
+ "FROM sometable)) WHERE ora_rn > 10",
+ dialect=dialect)
+
+ self.assert_compile(
+ select([t]).limit(10).offset(10),
+ "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
+ "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
+ "FROM sometable) WHERE ROWNUM <= 20) WHERE ora_rn > 10",
+ dialect=dialect)
+
+ def test_use_binds_for_limits_enabled(self):
+ t = table('sometable', column('col1'), column('col2'))
+ dialect = oracle.OracleDialect(use_binds_for_limits=True)
+
+ self.assert_compile(
+ select([t]).limit(10),
+ "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
+ "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM "
+ "<= :param_1",
+ dialect=dialect)
+
+ self.assert_compile(
+ select([t]).offset(10),
+ "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
+ "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
+ "FROM sometable)) WHERE ora_rn > :param_1",
+ dialect=dialect)
+
+ self.assert_compile(
+ select([t]).limit(10).offset(10),
+ "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
+ "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
+ "FROM sometable) WHERE ROWNUM <= :param_1 + :param_2) "
+ "WHERE ora_rn > :param_2",
+ dialect=dialect,
+ checkparams={'param_1': 10, 'param_2': 10})
+
+ def test_long_labels(self):
+ dialect = default.DefaultDialect()
+ dialect.max_identifier_length = 30
+
+ ora_dialect = oracle.dialect()
+
+ m = MetaData()
+ a_table = Table(
+ 'thirty_characters_table_xxxxxx',
+ m,
+ Column('id', Integer, primary_key=True)
+ )
+
+ other_table = Table(
+ 'other_thirty_characters_table_',
+ m,
+ Column('id', Integer, primary_key=True),
+ Column('thirty_characters_table_id',
+ Integer,
+ ForeignKey('thirty_characters_table_xxxxxx.id'),
+ primary_key=True))
+
+ anon = a_table.alias()
+ self.assert_compile(select([other_table,
+ anon]).
+ select_from(
+ other_table.outerjoin(anon)).apply_labels(),
+ 'SELECT other_thirty_characters_table_.id '
+ 'AS other_thirty_characters__1, '
+ 'other_thirty_characters_table_.thirty_char'
+ 'acters_table_id AS other_thirty_characters'
+ '__2, thirty_characters_table__1.id AS '
+ 'thirty_characters_table__3 FROM '
+ 'other_thirty_characters_table_ LEFT OUTER '
+ 'JOIN thirty_characters_table_xxxxxx AS '
+ 'thirty_characters_table__1 ON '
+ 'thirty_characters_table__1.id = '
+ 'other_thirty_characters_table_.thirty_char'
+ 'acters_table_id', dialect=dialect)
+ self.assert_compile(select([other_table,
+ anon]).select_from(
+ other_table.outerjoin(anon)).apply_labels(),
+ 'SELECT other_thirty_characters_table_.id '
+ 'AS other_thirty_characters__1, '
+ 'other_thirty_characters_table_.thirty_char'
+ 'acters_table_id AS other_thirty_characters'
+ '__2, thirty_characters_table__1.id AS '
+ 'thirty_characters_table__3 FROM '
+ 'other_thirty_characters_table_ LEFT OUTER '
+ 'JOIN thirty_characters_table_xxxxxx '
+ 'thirty_characters_table__1 ON '
+ 'thirty_characters_table__1.id = '
+ 'other_thirty_characters_table_.thirty_char'
+ 'acters_table_id', dialect=ora_dialect)
+
+ def test_outer_join(self):
+ table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String),
+ column('description', String))
+
+ table2 = table(
+ 'myothertable',
+ column('otherid', Integer),
+ column('othername', String),
+ )
+
+ table3 = table(
+ 'thirdtable',
+ column('userid', Integer),
+ column('otherstuff', String),
+ )
+
+ query = select([table1, table2],
+ or_(table1.c.name == 'fred',
+ table1.c.myid == 10, table2.c.othername != 'jack',
+ text('EXISTS (select yay from foo where boo = lar)')
+ ),
+ from_obj=[outerjoin(table1,
+ table2,
+ table1.c.myid == table2.c.otherid)])
+ self.assert_compile(query,
+ 'SELECT mytable.myid, mytable.name, '
+ 'mytable.description, myothertable.otherid,'
+ ' myothertable.othername FROM mytable, '
+ 'myothertable WHERE (mytable.name = '
+ ':name_1 OR mytable.myid = :myid_1 OR '
+ 'myothertable.othername != :othername_1 OR '
+ 'EXISTS (select yay from foo where boo = '
+ 'lar)) AND mytable.myid = '
+ 'myothertable.otherid(+)',
+ dialect=oracle.OracleDialect(use_ansi=False))
+ query = table1.outerjoin(table2,
+ table1.c.myid == table2.c.otherid) \
+ .outerjoin(table3, table3.c.userid == table2.c.otherid)
+ self.assert_compile(query.select(),
+ 'SELECT mytable.myid, mytable.name, '
+ 'mytable.description, myothertable.otherid,'
+ ' myothertable.othername, '
+ 'thirdtable.userid, thirdtable.otherstuff '
+ 'FROM mytable LEFT OUTER JOIN myothertable '
+ 'ON mytable.myid = myothertable.otherid '
+ 'LEFT OUTER JOIN thirdtable ON '
+ 'thirdtable.userid = myothertable.otherid')
+
+ self.assert_compile(query.select(),
+ 'SELECT mytable.myid, mytable.name, '
+ 'mytable.description, myothertable.otherid,'
+ ' myothertable.othername, '
+ 'thirdtable.userid, thirdtable.otherstuff '
+ 'FROM mytable, myothertable, thirdtable '
+ 'WHERE thirdtable.userid(+) = '
+ 'myothertable.otherid AND mytable.myid = '
+ 'myothertable.otherid(+)',
+ dialect=oracle.dialect(use_ansi=False))
+ query = table1.join(table2,
+ table1.c.myid == table2.c.otherid) \
+ .join(table3, table3.c.userid == table2.c.otherid)
+ self.assert_compile(query.select(),
+ 'SELECT mytable.myid, mytable.name, '
+ 'mytable.description, myothertable.otherid,'
+ ' myothertable.othername, '
+ 'thirdtable.userid, thirdtable.otherstuff '
+ 'FROM mytable, myothertable, thirdtable '
+ 'WHERE thirdtable.userid = '
+ 'myothertable.otherid AND mytable.myid = '
+ 'myothertable.otherid',
+ dialect=oracle.dialect(use_ansi=False))
+ query = table1.join(table2,
+ table1.c.myid == table2.c.otherid) \
+ .outerjoin(table3, table3.c.userid == table2.c.otherid)
+ self.assert_compile(query.select().order_by(table1.c.name).
+ limit(10).offset(5),
+ 'SELECT myid, name, description, otherid, '
+ 'othername, userid, otherstuff FROM '
+ '(SELECT myid, name, description, otherid, '
+ 'othername, userid, otherstuff, ROWNUM AS '
+ 'ora_rn FROM (SELECT mytable.myid AS myid, '
+ 'mytable.name AS name, mytable.description '
+ 'AS description, myothertable.otherid AS '
+ 'otherid, myothertable.othername AS '
+ 'othername, thirdtable.userid AS userid, '
+ 'thirdtable.otherstuff AS otherstuff FROM '
+ 'mytable, myothertable, thirdtable WHERE '
+ 'thirdtable.userid(+) = '
+ 'myothertable.otherid AND mytable.myid = '
+ 'myothertable.otherid ORDER BY mytable.name) '
+ 'WHERE ROWNUM <= :param_1 + :param_2) '
+ 'WHERE ora_rn > :param_2',
+ checkparams={'param_1': 10, 'param_2': 5},
+ dialect=oracle.dialect(use_ansi=False))
+
+ subq = select([table1]).select_from(
+ table1.outerjoin(table2, table1.c.myid == table2.c.otherid)) \
+ .alias()
+ q = select([table3]).select_from(
+ table3.outerjoin(subq, table3.c.userid == subq.c.myid))
+
+ self.assert_compile(q,
+ 'SELECT thirdtable.userid, '
+ 'thirdtable.otherstuff FROM thirdtable '
+ 'LEFT OUTER JOIN (SELECT mytable.myid AS '
+ 'myid, mytable.name AS name, '
+ 'mytable.description AS description FROM '
+ 'mytable LEFT OUTER JOIN myothertable ON '
+ 'mytable.myid = myothertable.otherid) '
+ 'anon_1 ON thirdtable.userid = anon_1.myid',
+ dialect=oracle.dialect(use_ansi=True))
+
+ self.assert_compile(q,
+ 'SELECT thirdtable.userid, '
+ 'thirdtable.otherstuff FROM thirdtable, '
+ '(SELECT mytable.myid AS myid, '
+ 'mytable.name AS name, mytable.description '
+ 'AS description FROM mytable, myothertable '
+ 'WHERE mytable.myid = myothertable.otherid('
+ '+)) anon_1 WHERE thirdtable.userid = '
+ 'anon_1.myid(+)',
+ dialect=oracle.dialect(use_ansi=False))
+
+ q = select([table1.c.name]).where(table1.c.name == 'foo')
+ self.assert_compile(q,
+ 'SELECT mytable.name FROM mytable WHERE '
+ 'mytable.name = :name_1',
+ dialect=oracle.dialect(use_ansi=False))
+ subq = select([table3.c.otherstuff]) \
+ .where(table3.c.otherstuff == table1.c.name).label('bar')
+ q = select([table1.c.name, subq])
+ self.assert_compile(q,
+ 'SELECT mytable.name, (SELECT '
+ 'thirdtable.otherstuff FROM thirdtable '
+ 'WHERE thirdtable.otherstuff = '
+ 'mytable.name) AS bar FROM mytable',
+ dialect=oracle.dialect(use_ansi=False))
+
+ def test_nonansi_nested_right_join(self):
+ a = table('a', column('a'))
+ b = table('b', column('b'))
+ c = table('c', column('c'))
+
+ j = a.join(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)
+
+ self.assert_compile(
+ select([j]),
+ "SELECT a.a, b.b, c.c FROM a, b, c "
+ "WHERE a.a = b.b AND b.b = c.c",
+ dialect=oracle.OracleDialect(use_ansi=False)
+ )
+
+ j = a.outerjoin(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)
+
+ self.assert_compile(
+ select([j]),
+ "SELECT a.a, b.b, c.c FROM a, b, c "
+ "WHERE a.a = b.b(+) AND b.b = c.c",
+ dialect=oracle.OracleDialect(use_ansi=False)
+ )
+
+ j = a.join(b.outerjoin(c, b.c.b == c.c.c), a.c.a == b.c.b)
+
+ self.assert_compile(
+ select([j]),
+ "SELECT a.a, b.b, c.c FROM a, b, c "
+ "WHERE a.a = b.b AND b.b = c.c(+)",
+ dialect=oracle.OracleDialect(use_ansi=False)
+ )
+
+ def test_alias_outer_join(self):
+ address_types = table('address_types', column('id'),
+ column('name'))
+ addresses = table('addresses', column('id'), column('user_id'),
+ column('address_type_id'),
+ column('email_address'))
+ at_alias = address_types.alias()
+ s = select([at_alias, addresses]) \
+ .select_from(
+ addresses.outerjoin(
+ at_alias,
+ addresses.c.address_type_id == at_alias.c.id)) \
+ .where(addresses.c.user_id == 7) \
+ .order_by(addresses.c.id, address_types.c.id)
+ self.assert_compile(s,
+ 'SELECT address_types_1.id, '
+ 'address_types_1.name, addresses.id, '
+ 'addresses.user_id, addresses.address_type_'
+ 'id, addresses.email_address FROM '
+ 'addresses LEFT OUTER JOIN address_types '
+ 'address_types_1 ON addresses.address_type_'
+ 'id = address_types_1.id WHERE '
+ 'addresses.user_id = :user_id_1 ORDER BY '
+ 'addresses.id, address_types.id')
+
+ def test_returning_insert(self):
+ t1 = table('t1', column('c1'), column('c2'), column('c3'))
+ self.assert_compile(
+ t1.insert().values(c1=1).returning(t1.c.c2, t1.c.c3),
+ "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
+ "t1.c2, t1.c3 INTO :ret_0, :ret_1")
+
+ def test_returning_insert_functional(self):
+ t1 = table('t1',
+ column('c1'),
+ column('c2', String()),
+ column('c3', String()))
+ fn = func.lower(t1.c.c2, type_=String())
+ stmt = t1.insert().values(c1=1).returning(fn, t1.c.c3)
+ compiled = stmt.compile(dialect=oracle.dialect())
+ eq_(compiled._create_result_map(),
+ {'c3': ('c3', (t1.c.c3, 'c3', 'c3'), t1.c.c3.type),
+ 'lower': ('lower', (fn, 'lower', None), fn.type)})
+
+ self.assert_compile(
+ stmt,
+ "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
+ "lower(t1.c2), t1.c3 INTO :ret_0, :ret_1")
+
+ def test_returning_insert_labeled(self):
+ t1 = table('t1', column('c1'), column('c2'), column('c3'))
+ self.assert_compile(
+ t1.insert().values(c1=1).returning(
+ t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
+ "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
+ "t1.c2, t1.c3 INTO :ret_0, :ret_1")
+
+ def test_compound(self):
+ t1 = table('t1', column('c1'), column('c2'), column('c3'))
+ t2 = table('t2', column('c1'), column('c2'), column('c3'))
+ self.assert_compile(union(t1.select(), t2.select()),
+ 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 UNION '
+ 'SELECT t2.c1, t2.c2, t2.c3 FROM t2')
+ self.assert_compile(except_(t1.select(), t2.select()),
+ 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 MINUS '
+ 'SELECT t2.c1, t2.c2, t2.c3 FROM t2')
+
+ def test_no_paren_fns(self):
+ for fn, expected in [
+ (func.uid(), "uid"),
+ (func.UID(), "UID"),
+ (func.sysdate(), "sysdate"),
+ (func.row_number(), "row_number()"),
+ (func.rank(), "rank()"),
+ (func.now(), "CURRENT_TIMESTAMP"),
+ (func.current_timestamp(), "CURRENT_TIMESTAMP"),
+ (func.user(), "USER"),
+ ]:
+ self.assert_compile(fn, expected)
+
+ def test_create_index_alt_schema(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer),
+ schema="alt_schema")
+ self.assert_compile(
+ schema.CreateIndex(Index("bar", t1.c.x)),
+ "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)"
+ )
+
+ def test_create_index_expr(self):
+ m = MetaData()
+ t1 = Table('foo', m,
+ Column('x', Integer))
+ self.assert_compile(
+ schema.CreateIndex(Index("bar", t1.c.x > 5)),
+ "CREATE INDEX bar ON foo (x > 5)"
+ )
+
+ def test_table_options(self):
+ m = MetaData()
+
+ t = Table(
+ 'foo', m,
+ Column('x', Integer),
+ prefixes=["GLOBAL TEMPORARY"],
+ oracle_on_commit="PRESERVE ROWS"
+ )
+
+ self.assert_compile(
+ schema.CreateTable(t),
+ "CREATE GLOBAL TEMPORARY TABLE "
+ "foo (x INTEGER) ON COMMIT PRESERVE ROWS"
+ )
+
+ def test_create_table_compress(self):
+ m = MetaData()
+ tbl1 = Table('testtbl1', m, Column('data', Integer),
+ oracle_compress=True)
+ tbl2 = Table('testtbl2', m, Column('data', Integer),
+ oracle_compress="OLTP")
+
+ self.assert_compile(schema.CreateTable(tbl1),
+ "CREATE TABLE testtbl1 (data INTEGER) COMPRESS")
+ self.assert_compile(schema.CreateTable(tbl2),
+ "CREATE TABLE testtbl2 (data INTEGER) "
+ "COMPRESS FOR OLTP")
+
+ def test_create_index_bitmap_compress(self):
+ m = MetaData()
+ tbl = Table('testtbl', m, Column('data', Integer))
+ idx1 = Index('idx1', tbl.c.data, oracle_compress=True)
+ idx2 = Index('idx2', tbl.c.data, oracle_compress=1)
+ idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True)
+
+ self.assert_compile(schema.CreateIndex(idx1),
+ "CREATE INDEX idx1 ON testtbl (data) COMPRESS")
+ self.assert_compile(schema.CreateIndex(idx2),
+ "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1")
+ self.assert_compile(schema.CreateIndex(idx3),
+ "CREATE BITMAP INDEX idx3 ON testtbl (data)")
+
+
+class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
+
+ def test_basic(self):
+ seq = Sequence('my_seq_no_schema')
+ dialect = oracle.OracleDialect()
+ assert dialect.identifier_preparer.format_sequence(seq) \
+ == 'my_seq_no_schema'
+ seq = Sequence('my_seq', schema='some_schema')
+ assert dialect.identifier_preparer.format_sequence(seq) \
+ == 'some_schema.my_seq'
+ seq = Sequence('My_Seq', schema='Some_Schema')
+ assert dialect.identifier_preparer.format_sequence(seq) \
+ == '"Some_Schema"."My_Seq"'
+
+
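The limit/offset tests above exercise Oracle's ROWNUM emulation; the same wrapping can be observed outside the test suite by compiling a Core statement against the Oracle dialect. A minimal sketch using the 1.x select([...]) calling style with a made-up table:

    from sqlalchemy.sql import table, column, select
    from sqlalchemy.dialects.oracle import base as oracle

    t = table('sometable', column('col1'), column('col2'))
    stmt = select([t]).limit(10).offset(20)
    # prints the nested ROWNUM form asserted in test_limit above
    print(stmt.compile(dialect=oracle.OracleDialect()))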
diff --git a/test/dialect/oracle/test_dialect.py b/test/dialect/oracle/test_dialect.py
new file mode 100644
index 000000000..83a875c2e
--- /dev/null
+++ b/test/dialect/oracle/test_dialect.py
@@ -0,0 +1,316 @@
+# coding: utf-8
+
+
+from sqlalchemy.testing import eq_
+from sqlalchemy import types as sqltypes, exc, schema
+from sqlalchemy.sql import table, column
+from sqlalchemy.testing import (fixtures,
+ AssertsExecutionResults,
+ AssertsCompiledSQL)
+from sqlalchemy import testing
+from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\
+ Index, MetaData, select, inspect, ForeignKey, String, func, \
+ TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \
+ literal_column, VARCHAR, create_engine, Date, NVARCHAR, \
+ ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
+ union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
+ PrimaryKeyConstraint, FLOAT
+from sqlalchemy.util import u, b
+from sqlalchemy import util
+from sqlalchemy.testing import assert_raises, assert_raises_message
+from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
+from sqlalchemy.engine import default
+import decimal
+from sqlalchemy.engine import url
+from sqlalchemy.testing.schema import Table, Column
+import datetime
+import os
+from sqlalchemy import sql
+from sqlalchemy.testing.mock import Mock
+
+
+class DialectTest(fixtures.TestBase):
+ def test_cx_oracle_version_parse(self):
+ dialect = cx_oracle.OracleDialect_cx_oracle()
+
+ eq_(
+ dialect._parse_cx_oracle_ver("5.2"),
+ (5, 2)
+ )
+
+ eq_(
+ dialect._parse_cx_oracle_ver("5.0.1"),
+ (5, 0, 1)
+ )
+
+ eq_(
+ dialect._parse_cx_oracle_ver("6.0b1"),
+ (6, 0)
+ )
+
+
+class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
+ __only_on__ = 'oracle+cx_oracle'
+ __backend__ = True
+
+ @classmethod
+ def setup_class(cls):
+ testing.db.execute("""
+ create or replace procedure foo(x_in IN number, x_out OUT number,
+ y_out OUT number, z_out OUT varchar) IS
+ retval number;
+ begin
+ retval := 6;
+ x_out := 10;
+ y_out := x_in * 15;
+ z_out := NULL;
+ end;
+ """)
+
+ def test_out_params(self):
+ result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
+ ':z_out); end;',
+ bindparams=[bindparam('x_in', Float),
+ outparam('x_out', Integer),
+ outparam('y_out', Float),
+ outparam('z_out', String)]),
+ x_in=5)
+ eq_(result.out_parameters,
+ {'x_out': 10, 'y_out': 75, 'z_out': None})
+ assert isinstance(result.out_parameters['x_out'], int)
+
+ @classmethod
+ def teardown_class(cls):
+ testing.db.execute("DROP PROCEDURE foo")
+
+
+class QuotedBindRoundTripTest(fixtures.TestBase):
+
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @testing.provide_metadata
+ def test_table_round_trip(self):
+ oracle.RESERVED_WORDS.remove('UNION')
+
+ metadata = self.metadata
+ table = Table("t1", metadata,
+ Column("option", Integer),
+ Column("plain", Integer, quote=True),
+ # test that quote works for a reserved word
+ # that the dialect isn't aware of when quote
+ # is set
+ Column("union", Integer, quote=True))
+ metadata.create_all()
+
+ table.insert().execute(
+ {"option": 1, "plain": 1, "union": 1}
+ )
+ eq_(
+ testing.db.execute(table.select()).first(),
+ (1, 1, 1)
+ )
+ table.update().values(option=2, plain=2, union=2).execute()
+ eq_(
+ testing.db.execute(table.select()).first(),
+ (2, 2, 2)
+ )
+
+ def test_numeric_bind_round_trip(self):
+ eq_(
+ testing.db.scalar(
+ select([
+ literal_column("2", type_=Integer()) +
+ bindparam("2_1", value=2)])
+ ),
+ 4
+ )
+
+ @testing.provide_metadata
+ def test_numeric_bind_in_crud(self):
+ t = Table(
+ "asfd", self.metadata,
+ Column("100K", Integer)
+ )
+ t.create()
+
+ testing.db.execute(t.insert(), {"100K": 10})
+ eq_(
+ testing.db.scalar(t.select()), 10
+ )
+
+
+class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
+
+ def _dialect(self, server_version, **kw):
+ def server_version_info(conn):
+ return server_version
+
+ dialect = oracle.dialect(
+ dbapi=Mock(version="0.0.0", paramstyle="named"),
+ **kw)
+ dialect._get_server_version_info = server_version_info
+ dialect._check_unicode_returns = Mock()
+ dialect._check_unicode_description = Mock()
+ dialect._get_default_schema_name = Mock()
+ dialect._detect_decimal_char = Mock()
+ return dialect
+
+ def test_ora8_flags(self):
+ dialect = self._dialect((8, 2, 5))
+
+ # before connect, assume modern DB
+ assert dialect._supports_char_length
+ assert dialect._supports_nchar
+ assert dialect.use_ansi
+
+ dialect.initialize(Mock())
+ assert not dialect.implicit_returning
+ assert not dialect._supports_char_length
+ assert not dialect._supports_nchar
+ assert not dialect.use_ansi
+ self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect)
+ self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "CLOB", dialect=dialect)
+
+ dialect = self._dialect((8, 2, 5), implicit_returning=True)
+ dialect.initialize(testing.db.connect())
+ assert dialect.implicit_returning
+
+ def test_default_flags(self):
+ """test with no initialization or server version info"""
+
+ dialect = self._dialect(None)
+
+ assert dialect._supports_char_length
+ assert dialect._supports_nchar
+ assert dialect.use_ansi
+ self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+ self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
+
+ def test_ora10_flags(self):
+ dialect = self._dialect((10, 2, 5))
+
+ dialect.initialize(Mock())
+ assert dialect._supports_char_length
+ assert dialect._supports_nchar
+ assert dialect.use_ansi
+ self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+ self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+ self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
+
+
+class ExecuteTest(fixtures.TestBase):
+
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ def test_basic(self):
+ eq_(testing.db.execute('/*+ this is a comment */ SELECT 1 FROM '
+ 'DUAL').fetchall(), [(1, )])
+
+ def test_sequences_are_integers(self):
+ seq = Sequence('foo_seq')
+ seq.create(testing.db)
+ try:
+ val = testing.db.execute(seq)
+ eq_(val, 1)
+ assert type(val) is int
+ finally:
+ seq.drop(testing.db)
+
+ @testing.provide_metadata
+ def test_limit_offset_for_update(self):
+ metadata = self.metadata
+        # Oracle can't handle ROWNUM-based limiting combined with
+        # FOR UPDATE very well.
+
+ t = Table('t1',
+ metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', Integer))
+ metadata.create_all()
+
+ t.insert().execute(
+ {'id': 1, 'data': 1},
+ {'id': 2, 'data': 7},
+ {'id': 3, 'data': 12},
+ {'id': 4, 'data': 15},
+ {'id': 5, 'data': 32},
+ )
+
+ # here, we can't use ORDER BY.
+ eq_(
+ t.select(for_update=True).limit(2).execute().fetchall(),
+ [(1, 1),
+ (2, 7)]
+ )
+
+        # here, it's impossible. But we'd prefer it to raise ORA-02014
+ # instead of issuing a syntax error.
+ assert_raises_message(
+ exc.DatabaseError,
+ "ORA-02014",
+ t.select(for_update=True).limit(2).offset(3).execute
+ )
+
+
+class UnicodeSchemaTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @testing.provide_metadata
+ def test_quoted_column_non_unicode(self):
+ metadata = self.metadata
+ table = Table("atable", metadata,
+ Column("_underscorecolumn",
+ Unicode(255),
+ primary_key=True))
+ metadata.create_all()
+
+ table.insert().execute(
+ {'_underscorecolumn': u('’é')},
+ )
+ result = testing.db.execute(
+ table.select().where(table.c._underscorecolumn == u('’é'))
+ ).scalar()
+ eq_(result, u('’é'))
+
+ @testing.provide_metadata
+ def test_quoted_column_unicode(self):
+ metadata = self.metadata
+ table = Table("atable", metadata,
+ Column(u("méil"), Unicode(255), primary_key=True))
+ metadata.create_all()
+
+ table.insert().execute(
+ {u('méil'): u('’é')},
+ )
+ result = testing.db.execute(
+ table.select().where(table.c[u('méil')] == u('’é'))
+ ).scalar()
+ eq_(result, u('’é'))
+
+
+class ServiceNameTest(fixtures.TestBase):
+ __only_on__ = 'oracle+cx_oracle'
+ __backend__ = True
+
+ def test_cx_oracle_service_name(self):
+ url_string = 'oracle+cx_oracle://scott:tiger@host/?service_name=hr'
+ eng = create_engine(url_string, _initialize=False)
+ cargs, cparams = eng.dialect.create_connect_args(eng.url)
+
+ assert 'SERVICE_NAME=hr' in cparams['dsn']
+ assert 'SID=hr' not in cparams['dsn']
+
+ def test_cx_oracle_service_name_bad(self):
+ url_string = 'oracle+cx_oracle://scott:tiger@host/hr1?service_name=hr2'
+ assert_raises(
+ exc.InvalidRequestError,
+ create_engine, url_string,
+ _initialize=False
+ )
+
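test_cx_oracle_version_parse above pins down the expected tuples for a few cx_Oracle version strings, including the "6.0b1" pre-release form. The dialect's actual parser is not shown in this diff, but a rough sketch of a parser satisfying those assertions could look like the following (illustrative only, not the dialect's implementation):

    import re

    def parse_cx_oracle_ver(version):
        # keep only the leading dotted integers: "6.0b1" -> (6, 0), "5.0.1" -> (5, 0, 1)
        m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?', version)
        return tuple(int(g) for g in m.groups() if g is not None)

    assert parse_cx_oracle_ver("5.2") == (5, 2)
    assert parse_cx_oracle_ver("6.0b1") == (6, 0)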
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py
new file mode 100644
index 000000000..d09f12e60
--- /dev/null
+++ b/test/dialect/oracle/test_reflection.py
@@ -0,0 +1,536 @@
+# coding: utf-8
+
+
+from sqlalchemy.testing import eq_
+from sqlalchemy import exc
+from sqlalchemy.sql import table
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL
+from sqlalchemy import testing
+from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\
+ Index, MetaData, select, inspect, ForeignKey, String, func, \
+ TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \
+ literal_column, VARCHAR, create_engine, Date, NVARCHAR, \
+ ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
+ union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
+ PrimaryKeyConstraint, FLOAT
+from sqlalchemy.testing import assert_raises
+from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.testing.schema import Table, Column
+
+
+class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @classmethod
+ def setup_class(cls):
+ # currently assuming full DBA privs for the user.
+ # don't really know how else to go here unless
+ # we connect as the other user.
+
+ for stmt in ("""
+create table %(test_schema)s.parent(
+ id integer primary key,
+ data varchar2(50)
+);
+
+create table %(test_schema)s.child(
+ id integer primary key,
+ data varchar2(50),
+ parent_id integer references %(test_schema)s.parent(id)
+);
+
+create table local_table(
+ id integer primary key,
+ data varchar2(50)
+);
+
+create synonym %(test_schema)s.ptable for %(test_schema)s.parent;
+create synonym %(test_schema)s.ctable for %(test_schema)s.child;
+
+create synonym %(test_schema)s_pt for %(test_schema)s.parent;
+
+create synonym %(test_schema)s.local_table for local_table;
+
+-- can't make a ref from local schema to the
+-- remote schema's table without this,
+-- *and* can't give yourself a grant!
+-- so we give it to public; ideas welcome.
+grant references on %(test_schema)s.parent to public;
+grant references on %(test_schema)s.child to public;
+""" % {"test_schema": testing.config.test_schema}).split(";"):
+ if stmt.strip():
+ testing.db.execute(stmt)
+
+ @classmethod
+ def teardown_class(cls):
+ for stmt in ("""
+drop table %(test_schema)s.child;
+drop table %(test_schema)s.parent;
+drop table local_table;
+drop synonym %(test_schema)s.ctable;
+drop synonym %(test_schema)s.ptable;
+drop synonym %(test_schema)s_pt;
+drop synonym %(test_schema)s.local_table;
+
+""" % {"test_schema": testing.config.test_schema}).split(";"):
+ if stmt.strip():
+ testing.db.execute(stmt)
+
+ @testing.provide_metadata
+ def test_create_same_names_explicit_schema(self):
+ schema = testing.db.dialect.default_schema_name
+ meta = self.metadata
+ parent = Table('parent', meta,
+ Column('pid', Integer, primary_key=True),
+ schema=schema)
+ child = Table('child', meta,
+ Column('cid', Integer, primary_key=True),
+ Column('pid',
+ Integer,
+ ForeignKey('%s.parent.pid' % schema)),
+ schema=schema)
+ meta.create_all()
+ parent.insert().execute({'pid': 1})
+ child.insert().execute({'cid': 1, 'pid': 1})
+ eq_(child.select().execute().fetchall(), [(1, 1)])
+
+ def test_reflect_alt_table_owner_local_synonym(self):
+ meta = MetaData(testing.db)
+ parent = Table('%s_pt' % testing.config.test_schema,
+ meta,
+ autoload=True,
+ oracle_resolve_synonyms=True)
+ self.assert_compile(parent.select(),
+ "SELECT %(test_schema)s_pt.id, "
+ "%(test_schema)s_pt.data FROM %(test_schema)s_pt"
+ % {"test_schema": testing.config.test_schema})
+ select([parent]).execute().fetchall()
+
+ def test_reflect_alt_synonym_owner_local_table(self):
+ meta = MetaData(testing.db)
+ parent = Table(
+ 'local_table', meta, autoload=True,
+ oracle_resolve_synonyms=True, schema=testing.config.test_schema)
+ self.assert_compile(
+ parent.select(),
+ "SELECT %(test_schema)s.local_table.id, "
+ "%(test_schema)s.local_table.data "
+ "FROM %(test_schema)s.local_table" %
+ {"test_schema": testing.config.test_schema}
+ )
+ select([parent]).execute().fetchall()
+
+ @testing.provide_metadata
+ def test_create_same_names_implicit_schema(self):
+ meta = self.metadata
+ parent = Table('parent',
+ meta,
+ Column('pid', Integer, primary_key=True))
+ child = Table('child', meta,
+ Column('cid', Integer, primary_key=True),
+ Column('pid', Integer, ForeignKey('parent.pid')))
+ meta.create_all()
+ parent.insert().execute({'pid': 1})
+ child.insert().execute({'cid': 1, 'pid': 1})
+ eq_(child.select().execute().fetchall(), [(1, 1)])
+
+ def test_reflect_alt_owner_explicit(self):
+ meta = MetaData(testing.db)
+ parent = Table(
+ 'parent', meta, autoload=True,
+ schema=testing.config.test_schema)
+ child = Table(
+ 'child', meta, autoload=True,
+ schema=testing.config.test_schema)
+
+ self.assert_compile(
+ parent.join(child),
+ "%(test_schema)s.parent JOIN %(test_schema)s.child ON "
+ "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % {
+ "test_schema": testing.config.test_schema
+ })
+ select([parent, child]).\
+ select_from(parent.join(child)).\
+ execute().fetchall()
+
+ def test_reflect_local_to_remote(self):
+ testing.db.execute(
+ 'CREATE TABLE localtable (id INTEGER '
+ 'PRIMARY KEY, parent_id INTEGER REFERENCES '
+ '%(test_schema)s.parent(id))' % {
+ "test_schema": testing.config.test_schema})
+ try:
+ meta = MetaData(testing.db)
+ lcl = Table('localtable', meta, autoload=True)
+ parent = meta.tables['%s.parent' % testing.config.test_schema]
+ self.assert_compile(parent.join(lcl),
+ '%(test_schema)s.parent JOIN localtable ON '
+ '%(test_schema)s.parent.id = '
+ 'localtable.parent_id' % {
+ "test_schema": testing.config.test_schema}
+ )
+ select([parent,
+ lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ finally:
+ testing.db.execute('DROP TABLE localtable')
+
+ def test_reflect_alt_owner_implicit(self):
+ meta = MetaData(testing.db)
+ parent = Table(
+ 'parent', meta, autoload=True,
+ schema=testing.config.test_schema)
+ child = Table(
+ 'child', meta, autoload=True,
+ schema=testing.config.test_schema)
+ self.assert_compile(
+ parent.join(child),
+ '%(test_schema)s.parent JOIN %(test_schema)s.child '
+ 'ON %(test_schema)s.parent.id = '
+ '%(test_schema)s.child.parent_id' % {
+ "test_schema": testing.config.test_schema})
+ select([parent,
+ child]).select_from(parent.join(child)).execute().fetchall()
+
+ def test_reflect_alt_owner_synonyms(self):
+ testing.db.execute('CREATE TABLE localtable (id INTEGER '
+ 'PRIMARY KEY, parent_id INTEGER REFERENCES '
+ '%s.ptable(id))' % testing.config.test_schema)
+ try:
+ meta = MetaData(testing.db)
+ lcl = Table('localtable', meta, autoload=True,
+ oracle_resolve_synonyms=True)
+ parent = meta.tables['%s.ptable' % testing.config.test_schema]
+ self.assert_compile(
+ parent.join(lcl),
+ '%(test_schema)s.ptable JOIN localtable ON '
+ '%(test_schema)s.ptable.id = '
+ 'localtable.parent_id' % {
+ "test_schema": testing.config.test_schema})
+ select([parent,
+ lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ finally:
+ testing.db.execute('DROP TABLE localtable')
+
+ def test_reflect_remote_synonyms(self):
+ meta = MetaData(testing.db)
+ parent = Table('ptable', meta, autoload=True,
+ schema=testing.config.test_schema,
+ oracle_resolve_synonyms=True)
+ child = Table('ctable', meta, autoload=True,
+ schema=testing.config.test_schema,
+ oracle_resolve_synonyms=True)
+ self.assert_compile(
+ parent.join(child),
+ '%(test_schema)s.ptable JOIN '
+ '%(test_schema)s.ctable '
+ 'ON %(test_schema)s.ptable.id = '
+ '%(test_schema)s.ctable.parent_id' % {
+ "test_schema": testing.config.test_schema})
+ select([parent,
+ child]).select_from(parent.join(child)).execute().fetchall()
+
+
+class ConstraintTest(fixtures.TablesTest):
+
+ __only_on__ = 'oracle'
+ __backend__ = True
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('foo', metadata, Column('id', Integer, primary_key=True))
+
+ def test_oracle_has_no_on_update_cascade(self):
+ bar = Table('bar', self.metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo_id',
+ Integer,
+ ForeignKey('foo.id', onupdate='CASCADE')))
+ assert_raises(exc.SAWarning, bar.create)
+
+ bat = Table('bat', self.metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo_id', Integer),
+ ForeignKeyConstraint(['foo_id'], ['foo.id'],
+ onupdate='CASCADE'))
+ assert_raises(exc.SAWarning, bat.create)
+
+ def test_reflect_check_include_all(self):
+ insp = inspect(testing.db)
+ eq_(insp.get_check_constraints('foo'), [])
+ eq_(
+ [rec['sqltext']
+ for rec in insp.get_check_constraints('foo', include_all=True)],
+ ['"ID" IS NOT NULL'])
+
+
+class SystemTableTablenamesTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ def setup(self):
+ testing.db.execute("create table my_table (id integer)")
+ testing.db.execute(
+ "create global temporary table my_temp_table (id integer)"
+ )
+ testing.db.execute(
+ "create table foo_table (id integer) tablespace SYSTEM"
+ )
+
+ def teardown(self):
+ testing.db.execute("drop table my_temp_table")
+ testing.db.execute("drop table my_table")
+ testing.db.execute("drop table foo_table")
+
+ def test_table_names_no_system(self):
+ insp = inspect(testing.db)
+ eq_(
+ insp.get_table_names(), ["my_table"]
+ )
+
+ def test_temp_table_names_no_system(self):
+ insp = inspect(testing.db)
+ eq_(
+ insp.get_temp_table_names(), ["my_temp_table"]
+ )
+
+ def test_table_names_w_system(self):
+ engine = testing_engine(options={"exclude_tablespaces": ["FOO"]})
+ insp = inspect(engine)
+ eq_(
+ set(insp.get_table_names()).intersection(["my_table",
+ "foo_table"]),
+ set(["my_table", "foo_table"])
+ )
+
+
+class DontReflectIOTTest(fixtures.TestBase):
+ """test that index overflow tables aren't included in
+ table_names."""
+
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ def setup(self):
+ testing.db.execute("""
+ CREATE TABLE admin_docindex(
+ token char(20),
+ doc_id NUMBER,
+ token_frequency NUMBER,
+ token_offsets VARCHAR2(2000),
+ CONSTRAINT pk_admin_docindex PRIMARY KEY (token, doc_id))
+ ORGANIZATION INDEX
+ TABLESPACE users
+ PCTTHRESHOLD 20
+ OVERFLOW TABLESPACE users
+ """)
+
+ def teardown(self):
+ testing.db.execute("drop table admin_docindex")
+
+ def test_reflect_all(self):
+ m = MetaData(testing.db)
+ m.reflect()
+ eq_(
+ set(t.name for t in m.tables.values()),
+ set(['admin_docindex'])
+ )
+
+
+class UnsupportedIndexReflectTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @testing.emits_warning("No column names")
+ @testing.provide_metadata
+ def test_reflect_functional_index(self):
+ metadata = self.metadata
+ Table('test_index_reflect', metadata,
+ Column('data', String(20), primary_key=True))
+ metadata.create_all()
+
+ testing.db.execute('CREATE INDEX DATA_IDX ON '
+ 'TEST_INDEX_REFLECT (UPPER(DATA))')
+ m2 = MetaData(testing.db)
+ Table('test_index_reflect', m2, autoload=True)
+
+
+def all_tables_compression_missing():
+ try:
+ testing.db.execute('SELECT compression FROM all_tables')
+ if "Enterprise Edition" not in testing.db.scalar(
+ "select * from v$version"):
+ return True
+ return False
+ except Exception:
+ return True
+
+
+def all_tables_compress_for_missing():
+ try:
+ testing.db.execute('SELECT compress_for FROM all_tables')
+ if "Enterprise Edition" not in testing.db.scalar(
+ "select * from v$version"):
+ return True
+ return False
+ except Exception:
+ return True
+
+
+class TableReflectionTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @testing.provide_metadata
+ @testing.fails_if(all_tables_compression_missing)
+ def test_reflect_basic_compression(self):
+ metadata = self.metadata
+
+ tbl = Table('test_compress', metadata,
+ Column('data', Integer, primary_key=True),
+ oracle_compress=True)
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+
+ tbl = Table('test_compress', m2, autoload=True)
+ # Don't hardcode the exact value, but it must be non-empty
+ assert tbl.dialect_options['oracle']['compress']
+
+ @testing.provide_metadata
+ @testing.fails_if(all_tables_compress_for_missing)
+ def test_reflect_oltp_compression(self):
+ metadata = self.metadata
+
+ tbl = Table('test_compress', metadata,
+ Column('data', Integer, primary_key=True),
+ oracle_compress="OLTP")
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+
+ tbl = Table('test_compress', m2, autoload=True)
+ assert tbl.dialect_options['oracle']['compress'] == "OLTP"
+
+
+class RoundTripIndexTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @testing.provide_metadata
+ def test_basic(self):
+ metadata = self.metadata
+
+ s_table = Table(
+ "sometable", metadata,
+ Column("id_a", Unicode(255), primary_key=True),
+ Column("id_b",
+ Unicode(255),
+ primary_key=True,
+ unique=True),
+ Column("group", Unicode(255), primary_key=True),
+ Column("col", Unicode(255)),
+ UniqueConstraint('col', 'group'))
+
+ # "group" is a keyword, so lower case
+ normalind = Index('tableind', s_table.c.id_b, s_table.c.group)
+ Index('compress1', s_table.c.id_a, s_table.c.id_b,
+ oracle_compress=True)
+ Index('compress2', s_table.c.id_a, s_table.c.id_b, s_table.c.col,
+ oracle_compress=1)
+
+ metadata.create_all()
+ mirror = MetaData(testing.db)
+ mirror.reflect()
+ metadata.drop_all()
+ mirror.create_all()
+
+ inspect = MetaData(testing.db)
+ inspect.reflect()
+
+ def obj_definition(obj):
+ return (obj.__class__,
+ tuple([c.name for c in obj.columns]),
+ getattr(obj, 'unique', None))
+
+        # find what the primary key constraint name should be
+ primaryconsname = testing.db.scalar(
+ text(
+ """SELECT constraint_name
+ FROM all_constraints
+ WHERE table_name = :table_name
+ AND owner = :owner
+ AND constraint_type = 'P' """),
+ table_name=s_table.name.upper(),
+ owner=testing.db.dialect.default_schema_name.upper())
+
+ reflectedtable = inspect.tables[s_table.name]
+
+ # make a dictionary of the reflected objects:
+
+ reflected = dict([(obj_definition(i), i) for i in
+ reflectedtable.indexes
+ | reflectedtable.constraints])
+
+        # assert we got the primary key constraint and its name;
+        # KeyError if not in the dict
+
+ assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
+ 'group'), None)].name.upper() \
+ == primaryconsname.upper()
+
+        # KeyError if not in the dict
+
+ eq_(
+ reflected[(Index, ('id_b', 'group'), False)].name,
+ normalind.name
+ )
+ assert (Index, ('id_b', ), True) in reflected
+ assert (Index, ('col', 'group'), True) in reflected
+
+ idx = reflected[(Index, ('id_a', 'id_b', ), False)]
+ assert idx.dialect_options['oracle']['compress'] == 2
+
+ idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)]
+ assert idx.dialect_options['oracle']['compress'] == 1
+
+ eq_(len(reflectedtable.constraints), 1)
+ eq_(len(reflectedtable.indexes), 5)
+
+
+class DBLinkReflectionTest(fixtures.TestBase):
+ __requires__ = 'oracle_test_dblink',
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ @classmethod
+ def setup_class(cls):
+ from sqlalchemy.testing import config
+ cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link')
+
+ # note that the synonym here is still not totally functional
+ # when accessing via a different username as we do with the
+ # multiprocess test suite, so testing here is minimal
+ with testing.db.connect() as conn:
+ conn.execute("create table test_table "
+ "(id integer primary key, data varchar2(50))")
+ conn.execute("create synonym test_table_syn "
+ "for test_table@%s" % cls.dblink)
+
+ @classmethod
+ def teardown_class(cls):
+ with testing.db.connect() as conn:
+ conn.execute("drop synonym test_table_syn")
+ conn.execute("drop table test_table")
+
+ def test_reflection(self):
+ """test the resolution of the synonym/dblink. """
+ m = MetaData()
+
+ t = Table('test_table_syn', m, autoload=True,
+ autoload_with=testing.db, oracle_resolve_synonyms=True)
+ eq_(list(t.c.keys()), ['id', 'data'])
+ eq_(list(t.primary_key), [t.c.id])
+
+
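The synonym and DB-link tests above rely on the oracle_resolve_synonyms reflection option; in application code the same option is passed straight to Table() during reflection. A hedged sketch with a placeholder connection URL (assumes cx_Oracle is installed and a reachable database; the synonym name mirrors the fixture above):

    from sqlalchemy import create_engine, MetaData, Table

    # placeholder credentials/DSN for illustration only
    engine = create_engine("oracle+cx_oracle://scott:tiger@host/?service_name=xe")

    meta = MetaData()
    # follows the synonym (including one pointing across a DB link) to the base table
    t = Table('test_table_syn', meta, autoload=True,
              autoload_with=engine, oracle_resolve_synonyms=True)
    print(t.columns.keys())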
diff --git a/test/dialect/oracle/test_types.py b/test/dialect/oracle/test_types.py
new file mode 100644
index 000000000..3d08657d8
--- /dev/null
+++ b/test/dialect/oracle/test_types.py
@@ -0,0 +1,787 @@
+# coding: utf-8
+
+
+from sqlalchemy.testing import eq_
+from sqlalchemy import types as sqltypes, exc, schema
+from sqlalchemy.sql import table, column
+from sqlalchemy.testing import (fixtures,
+ AssertsExecutionResults,
+ AssertsCompiledSQL)
+from sqlalchemy import testing
+from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\
+ Index, MetaData, select, inspect, ForeignKey, String, func, \
+ TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \
+ literal_column, VARCHAR, create_engine, Date, NVARCHAR, \
+ ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
+ union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
+ PrimaryKeyConstraint, FLOAT
+from sqlalchemy.util import u, b
+from sqlalchemy import util
+from sqlalchemy.testing import assert_raises, assert_raises_message
+from sqlalchemy.testing.engines import testing_engine
+from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
+from sqlalchemy.engine import default
+import decimal
+from sqlalchemy.engine import url
+from sqlalchemy.testing.schema import Table, Column
+import datetime
+import os
+from sqlalchemy import sql
+from sqlalchemy.testing.mock import Mock
+
+
+class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = oracle.OracleDialect()
+
+ def test_no_clobs_for_string_params(self):
+ """test that simple string params get a DBAPI type of
+ VARCHAR, not CLOB. This is to prevent setinputsizes
+ from setting up cx_oracle.CLOBs on
+ string-based bind params [ticket:793]."""
+
+ class FakeDBAPI(object):
+ def __getattr__(self, attr):
+ return attr
+
+ dialect = oracle.OracleDialect()
+ dbapi = FakeDBAPI()
+
+ b = bindparam("foo", "hello world!")
+ eq_(
+ b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+ 'STRING'
+ )
+
+ b = bindparam("foo", "hello world!")
+ eq_(
+ b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+ 'STRING'
+ )
+
+ def test_long(self):
+ self.assert_compile(oracle.LONG(), "LONG")
+
+ def test_type_adapt(self):
+ dialect = cx_oracle.dialect()
+
+ for start, test in [
+ (Date(), cx_oracle._OracleDate),
+ (oracle.OracleRaw(), cx_oracle._OracleRaw),
+ (String(), String),
+ (VARCHAR(), cx_oracle._OracleString),
+ (DATE(), cx_oracle._OracleDate),
+ (oracle.DATE(), oracle.DATE),
+ (String(50), cx_oracle._OracleString),
+ (Unicode(), cx_oracle._OracleNVarChar),
+ (Text(), cx_oracle._OracleText),
+ (UnicodeText(), cx_oracle._OracleUnicodeText),
+ (NCHAR(), cx_oracle._OracleNVarChar),
+ (oracle.RAW(50), cx_oracle._OracleRaw),
+ ]:
+ assert isinstance(start.dialect_impl(dialect), test), \
+ "wanted %r got %r" % (test, start.dialect_impl(dialect))
+
+ def test_raw_compile(self):
+ self.assert_compile(oracle.RAW(), "RAW")
+ self.assert_compile(oracle.RAW(35), "RAW(35)")
+
+ def test_char_length(self):
+ self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)")
+
+ oracle8dialect = oracle.dialect()
+ oracle8dialect.server_version_info = (8, 0)
+ self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect)
+
+ self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)")
+ self.assert_compile(CHAR(50), "CHAR(50)")
+
+ def test_varchar_types(self):
+ dialect = oracle.dialect()
+ for typ, exp in [
+ (String(50), "VARCHAR2(50 CHAR)"),
+ (Unicode(50), "NVARCHAR2(50)"),
+ (NVARCHAR(50), "NVARCHAR2(50)"),
+ (VARCHAR(50), "VARCHAR(50 CHAR)"),
+ (oracle.NVARCHAR2(50), "NVARCHAR2(50)"),
+ (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"),
+ (String(), "VARCHAR2"),
+ (Unicode(), "NVARCHAR2"),
+ (NVARCHAR(), "NVARCHAR2"),
+ (VARCHAR(), "VARCHAR"),
+ (oracle.NVARCHAR2(), "NVARCHAR2"),
+ (oracle.VARCHAR2(), "VARCHAR2"),
+ ]:
+ self.assert_compile(typ, exp, dialect=dialect)
+
+ def test_interval(self):
+ for type_, expected in [(oracle.INTERVAL(),
+ 'INTERVAL DAY TO SECOND'),
+ (oracle.INTERVAL(day_precision=3),
+ 'INTERVAL DAY(3) TO SECOND'),
+ (oracle.INTERVAL(second_precision=5),
+ 'INTERVAL DAY TO SECOND(5)'),
+ (oracle.INTERVAL(day_precision=2,
+ second_precision=5),
+ 'INTERVAL DAY(2) TO SECOND(5)')]:
+ self.assert_compile(type_, expected)
+
+
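Annotation (not part of the patch): DialectTypesTest.test_type_adapt below checks which cx_Oracle-specific implementation each generic type resolves to. A minimal sketch of observing that resolution directly, with no DBAPI or database connection (the printed class name follows the expectations listed in the test):

    from sqlalchemy import Unicode
    from sqlalchemy.dialects.oracle import cx_oracle

    dialect = cx_oracle.dialect()              # no cx_Oracle connection required
    impl = Unicode(50).dialect_impl(dialect)   # resolve the dialect-level type
    print(type(impl).__name__)                 # e.g. "_OracleNVarChar"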
+class TypesTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+ __dialect__ = oracle.OracleDialect()
+ __backend__ = True
+
+ @testing.fails_on('+zxjdbc', 'zxjdbc lacks the FIXED_CHAR dbapi type')
+ def test_fixed_char(self):
+ m = MetaData(testing.db)
+ t = Table('t1', m,
+ Column('id', Integer, primary_key=True),
+ Column('data', CHAR(30), nullable=False))
+
+ t.create()
+ try:
+ t.insert().execute(
+ dict(id=1, data="value 1"),
+ dict(id=2, data="value 2"),
+ dict(id=3, data="value 3")
+ )
+
+ eq_(
+ t.select().where(t.c.data == 'value 2').execute().fetchall(),
+ [(2, 'value 2 ')]
+ )
+
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+ assert type(t2.c.data.type) is CHAR
+ eq_(
+ t2.select().where(t2.c.data == 'value 2').execute().fetchall(),
+ [(2, 'value 2 ')]
+ )
+
+ finally:
+ t.drop()
+
+ @testing.requires.returning
+ @testing.provide_metadata
+ def test_int_not_float(self):
+ m = self.metadata
+ t1 = Table('t1', m, Column('foo', Integer))
+ t1.create()
+ r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
+ x = r.scalar()
+ assert x == 5
+ assert isinstance(x, int)
+
+ x = t1.select().scalar()
+ assert x == 5
+ assert isinstance(x, int)
+
+ @testing.requires.returning
+ @testing.provide_metadata
+ def test_int_not_float_no_coerce_decimal(self):
+ engine = testing_engine(options=dict(coerce_to_decimal=False))
+
+ m = self.metadata
+ t1 = Table('t1', m, Column('foo', Integer))
+ t1.create()
+ r = engine.execute(t1.insert().values(foo=5).returning(t1.c.foo))
+ x = r.scalar()
+ assert x == 5
+ assert isinstance(x, int)
+
+ x = t1.select().scalar()
+ assert x == 5
+ assert isinstance(x, int)
+
+ @testing.provide_metadata
+ def test_rowid(self):
+ metadata = self.metadata
+ t = Table('t1', metadata, Column('x', Integer))
+ t.create()
+ t.insert().execute(x=5)
+ s1 = select([t])
+ s2 = select([column('rowid')]).select_from(s1)
+ rowid = s2.scalar()
+
+ # the ROWID type is not really needed here,
+ # as cx_oracle just treats it as a string,
+ # but we want to make sure the ROWID works...
+ rowid_col = column('rowid', oracle.ROWID)
+ s3 = select([t.c.x, rowid_col]) \
+ .where(rowid_col == cast(rowid, oracle.ROWID))
+ eq_(s3.select().execute().fetchall(), [(5, rowid)])
+
+ @testing.fails_on('+zxjdbc',
+ 'Not yet known how to pass values of the '
+ 'INTERVAL type')
+ @testing.provide_metadata
+ def test_interval(self):
+ metadata = self.metadata
+ interval_table = Table('intervaltable', metadata, Column('id',
+ Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('day_interval',
+ oracle.INTERVAL(day_precision=3)))
+ metadata.create_all()
+ interval_table.insert().\
+ execute(day_interval=datetime.timedelta(days=35, seconds=5743))
+ row = interval_table.select().execute().first()
+ eq_(row['day_interval'], datetime.timedelta(days=35,
+ seconds=5743))
+
+ @testing.provide_metadata
+ def test_numerics(self):
+ m = self.metadata
+ t1 = Table('t1', m,
+ Column('intcol', Integer),
+ Column('numericcol', Numeric(precision=9, scale=2)),
+ Column('floatcol1', Float()),
+ Column('floatcol2', FLOAT()),
+ Column('doubleprec', oracle.DOUBLE_PRECISION),
+ Column('numbercol1', oracle.NUMBER(9)),
+ Column('numbercol2', oracle.NUMBER(9, 3)),
+ Column('numbercol3', oracle.NUMBER))
+ t1.create()
+ t1.insert().execute(
+ intcol=1,
+ numericcol=5.2,
+ floatcol1=6.5,
+ floatcol2=8.5,
+ doubleprec=9.5,
+ numbercol1=12,
+ numbercol2=14.85,
+ numbercol3=15.76
+ )
+
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+
+ for row in (
+ t1.select().execute().first(),
+ t2.select().execute().first()
+ ):
+ for i, (val, type_) in enumerate((
+ (1, int),
+ (decimal.Decimal("5.2"), decimal.Decimal),
+ (6.5, float),
+ (8.5, float),
+ (9.5, float),
+ (12, int),
+ (decimal.Decimal("14.85"), decimal.Decimal),
+ (15.76, float),
+ )):
+ eq_(row[i], val)
+ assert isinstance(row[i], type_), '%r is not %r' \
+ % (row[i], type_)
+
+ @testing.provide_metadata
+ def test_numeric_infinity_float(self):
+ m = self.metadata
+ t1 = Table('t1', m,
+ Column("intcol", Integer),
+ Column("numericcol", oracle.BINARY_DOUBLE(asdecimal=False)))
+ t1.create()
+ t1.insert().execute(
+ intcol=1,
+ numericcol=float("inf"),
+ )
+
+ eq_(
+ select([t1.c.numericcol]).scalar(),
+ float("inf")
+ )
+
+ eq_(
+ testing.db.scalar("select numericcol from t1"),
+ float("inf"))
+
+ @testing.provide_metadata
+ def test_numerics_broken_inspection(self):
+ """Numeric scenarios where Oracle type info is 'broken',
+ returning us precision, scale of the form (0, 0) or (0, -127).
+ We convert to Decimal and let int()/float() processors take over.
+
+ """
+
+ metadata = self.metadata
+
+ # this test requires cx_oracle 5
+
+ foo = Table('foo', metadata,
+ Column('idata', Integer),
+ Column('ndata', Numeric(20, 2)),
+ Column('ndata2', Numeric(20, 2)),
+ Column('nidata', Numeric(5, 0)),
+ Column('fdata', Float()))
+ foo.create()
+
+ foo.insert().execute({
+ 'idata': 5,
+ 'ndata': decimal.Decimal("45.6"),
+ 'ndata2': decimal.Decimal("45.0"),
+ 'nidata': decimal.Decimal('53'),
+ 'fdata': 45.68392
+ })
+
+ stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo"
+
+ row = testing.db.execute(stmt).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, int, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ 53, 45.683920000000001)
+ )
+
+ # with a nested subquery, both Numeric values that don't have
+ # decimal places, regardless of their originating type, come back
+ # as ints with no useful typing information beyond "numeric", so
+ # the native handler must convert to int.  This means our Decimal
+ # converters need to run no matter what.  Totally sucks.
+
+ stmt = """
+ SELECT
+ (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata,
+ (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2)) FROM DUAL)
+ AS ndata,
+ (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2)) FROM DUAL)
+ AS ndata2,
+ (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0)) FROM DUAL)
+ AS nidata,
+ (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata
+ FROM dual
+ """
+ row = testing.db.execute(stmt).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, int, int, decimal.Decimal]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
+ )
+
+ row = testing.db.execute(text(stmt,
+ typemap={
+ 'idata': Integer(),
+ 'ndata': Numeric(20, 2),
+ 'ndata2': Numeric(20, 2),
+ 'nidata': Numeric(5, 0),
+ 'fdata': Float()})).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ decimal.Decimal('53'), 45.683920000000001)
+ )
+
+ stmt = """
+ SELECT
+ anon_1.idata AS anon_1_idata,
+ anon_1.ndata AS anon_1_ndata,
+ anon_1.ndata2 AS anon_1_ndata2,
+ anon_1.nidata AS anon_1_nidata,
+ anon_1.fdata AS anon_1_fdata
+ FROM (SELECT idata, ndata, ndata2, nidata, fdata
+ FROM (
+ SELECT
+ (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata,
+ (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2))
+ FROM DUAL) AS ndata,
+ (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2))
+ FROM DUAL) AS ndata2,
+ (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0))
+ FROM DUAL) AS nidata,
+ (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL)
+ AS fdata
+ FROM dual
+ )
+ WHERE ROWNUM >= 0) anon_1
+ """
+ row = testing.db.execute(stmt).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, int, int, decimal.Decimal]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
+ )
+
+ row = testing.db.execute(text(stmt,
+ typemap={
+ 'anon_1_idata': Integer(),
+ 'anon_1_ndata': Numeric(20, 2),
+ 'anon_1_ndata2': Numeric(20, 2),
+ 'anon_1_nidata': Numeric(5, 0),
+ 'anon_1_fdata': Float()
+ })).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+ )
+ eq_(
+ row,
+ (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+ decimal.Decimal('53'), 45.683920000000001)
+ )
+
+ row = testing.db.execute(text(
+ stmt,
+ typemap={
+ 'anon_1_idata': Integer(),
+ 'anon_1_ndata': Numeric(20, 2, asdecimal=False),
+ 'anon_1_ndata2': Numeric(20, 2, asdecimal=False),
+ 'anon_1_nidata': Numeric(5, 0, asdecimal=False),
+ 'anon_1_fdata': Float(asdecimal=True)
+ })).fetchall()[0]
+ eq_(
+ [type(x) for x in row],
+ [int, float, float, float, decimal.Decimal]
+ )
+ eq_(
+ row,
+ (5, 45.6, 45, 53, decimal.Decimal('45.68392'))
+ )
+
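Annotation (not part of the patch): the comment in test_numerics_broken_inspection explains why the dialect has to run its Decimal conversion even when Oracle reports no useful precision/scale. A standalone illustration of that post-processing idea (a hypothetical helper, not the dialect's actual code path):

    import decimal

    def normalize_oracle_number(value):
        # when precision/scale come back as (0, 0) or (0, -127), the safe route
        # is to go through Decimal, then pick int vs. Decimal from the value itself
        d = decimal.Decimal(str(value))
        return int(d) if d == d.to_integral_value() else d

    assert normalize_oracle_number("45") == 45
    assert normalize_oracle_number("45.6") == decimal.Decimal("45.6")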
+ def test_numeric_no_coerce_decimal_mode(self):
+ engine = testing_engine(options=dict(coerce_to_decimal=False))
+
+ # raw SQL no longer coerces to decimal
+ value = engine.scalar("SELECT 5.66 FROM DUAL")
+ assert isinstance(value, float)
+
+ # explicit typing still *does* coerce to decimal
+ # (change in 1.2)
+ value = engine.scalar(
+ text("SELECT 5.66 AS foo FROM DUAL").
+ columns(foo=Numeric(4, 2, asdecimal=True)))
+ assert isinstance(value, decimal.Decimal)
+
+ # default behavior is raw SQL coerces to decimal
+ value = testing.db.scalar("SELECT 5.66 FROM DUAL")
+ assert isinstance(value, decimal.Decimal)
+
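Annotation (not part of the patch): test_numeric_no_coerce_decimal_mode above shows the 1.2 behavior, where coerce_to_decimal=False only affects untyped raw SQL while an explicit Numeric type still yields Decimal. A usage sketch of the same flag outside the test harness; the DSN is a placeholder and a live Oracle database is required to actually run it:

    from sqlalchemy import create_engine, text
    from sqlalchemy.types import Numeric

    engine = create_engine(
        "oracle+cx_oracle://scott:tiger@localhost/xe",   # placeholder DSN
        coerce_to_decimal=False,
    )
    with engine.connect() as conn:
        raw = conn.scalar("SELECT 5.66 FROM DUAL")       # float; no coercion
        typed = conn.scalar(
            text("SELECT 5.66 AS foo FROM DUAL").columns(foo=Numeric(4, 2)))
        # typed is a decimal.Decimal because the statement carries a Numeric type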
+ @testing.only_on("oracle+cx_oracle", "cx_oracle-specific feature")
+ @testing.fails_if(
+ testing.requires.python3,
+ "cx_oracle always returns unicode on py3k")
+ def test_coerce_to_unicode(self):
+ engine = testing_engine(options=dict(coerce_to_unicode=True))
+ value = engine.scalar("SELECT 'hello' FROM DUAL")
+ assert isinstance(value, util.text_type)
+
+ value = testing.db.scalar("SELECT 'hello' FROM DUAL")
+ assert isinstance(value, util.binary_type)
+
+ @testing.provide_metadata
+ def test_reflect_dates(self):
+ metadata = self.metadata
+ Table(
+ "date_types", metadata,
+ Column('d1', sqltypes.DATE),
+ Column('d2', oracle.DATE),
+ Column('d3', TIMESTAMP),
+ Column('d4', TIMESTAMP(timezone=True)),
+ Column('d5', oracle.INTERVAL(second_precision=5)),
+ )
+ metadata.create_all()
+ m = MetaData(testing.db)
+ t1 = Table(
+ "date_types", m,
+ autoload=True)
+ assert isinstance(t1.c.d1.type, oracle.DATE)
+ assert isinstance(t1.c.d1.type, DateTime)
+ assert isinstance(t1.c.d2.type, oracle.DATE)
+ assert isinstance(t1.c.d2.type, DateTime)
+ assert isinstance(t1.c.d3.type, TIMESTAMP)
+ assert not t1.c.d3.type.timezone
+ assert isinstance(t1.c.d4.type, TIMESTAMP)
+ assert t1.c.d4.type.timezone
+ assert isinstance(t1.c.d5.type, oracle.INTERVAL)
+
+ def _dont_test_reflect_all_types_schema(self):
+ types_table = Table('all_types', MetaData(testing.db),
+ Column('owner', String(30), primary_key=True),
+ Column('type_name', String(30), primary_key=True),
+ autoload=True, oracle_resolve_synonyms=True)
+ for row in types_table.select().execute().fetchall():
+ [row[k] for k in row.keys()]
+
+ @testing.provide_metadata
+ def test_raw_roundtrip(self):
+ metadata = self.metadata
+ raw_table = Table('raw', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', oracle.RAW(35)))
+ metadata.create_all()
+ testing.db.execute(raw_table.insert(), id=1, data=b("ABCDEF"))
+ eq_(
+ testing.db.execute(raw_table.select()).first(),
+ (1, b("ABCDEF"))
+ )
+
+ @testing.provide_metadata
+ def test_reflect_nvarchar(self):
+ metadata = self.metadata
+ Table('tnv', metadata, Column('data', sqltypes.NVARCHAR(255)))
+ metadata.create_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('tnv', m2, autoload=True)
+ assert isinstance(t2.c.data.type, sqltypes.NVARCHAR)
+
+ if testing.against('oracle+cx_oracle'):
+ # nvarchar returns unicode natively. cx_oracle
+ # _OracleNVarChar type should be at play here.
+ assert isinstance(
+ t2.c.data.type.dialect_impl(testing.db.dialect),
+ cx_oracle._OracleNVarChar)
+
+ data = u('m’a réveillé.')
+ t2.insert().execute(data=data)
+ res = t2.select().execute().first()['data']
+ eq_(res, data)
+ assert isinstance(res, util.text_type)
+
+ @testing.provide_metadata
+ def test_char_length(self):
+ metadata = self.metadata
+ t1 = Table('t1', metadata,
+ Column("c1", VARCHAR(50)),
+ Column("c2", NVARCHAR(250)),
+ Column("c3", CHAR(200)))
+ t1.create()
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+ eq_(t2.c.c1.type.length, 50)
+ eq_(t2.c.c2.type.length, 250)
+ eq_(t2.c.c3.type.length, 200)
+
+ @testing.provide_metadata
+ def test_long_type(self):
+ metadata = self.metadata
+
+ t = Table('t', metadata, Column('data', oracle.LONG))
+ metadata.create_all(testing.db)
+ testing.db.execute(t.insert(), data='xyz')
+ eq_(
+ testing.db.scalar(select([t.c.data])),
+ "xyz"
+ )
+
+ def test_longstring(self):
+ metadata = MetaData(testing.db)
+ testing.db.execute("""
+ CREATE TABLE Z_TEST
+ (
+ ID NUMERIC(22) PRIMARY KEY,
+ ADD_USER VARCHAR2(20) NOT NULL
+ )
+ """)
+ try:
+ t = Table("z_test", metadata, autoload=True)
+ t.insert().execute(id=1.0, add_user='foobar')
+ assert t.select().execute().fetchall() == [(1, 'foobar')]
+ finally:
+ testing.db.execute("DROP TABLE Z_TEST")
+
+
+class LOBFetchTest(fixtures.TablesTest):
+ __only_on__ = 'oracle'
+ __backend__ = True
+
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "z_test",
+ metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', Text),
+ Column('bindata', LargeBinary))
+
+ Table(
+ 'binary_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', LargeBinary)
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.data = data = [
+ dict(
+ id=i, data='this is text %d' % i,
+ bindata=b('this is binary %d' % i)
+ ) for i in range(1, 20)
+ ]
+
+ testing.db.execute(cls.tables.z_test.insert(), data)
+
+ binary_table = cls.tables.binary_table
+ fname = os.path.join(
+ os.path.dirname(__file__), "..", "..",
+ 'binary_data_one.dat')
+ with open(fname, "rb") as file_:
+ cls.stream = stream = file_.read(12000)
+
+ for i in range(1, 11):
+ binary_table.insert().execute(id=i, data=stream)
+
+ def test_lobs_without_convert(self):
+ engine = testing_engine(options=dict(auto_convert_lobs=False))
+ t = self.tables.z_test
+ row = engine.execute(t.select().where(t.c.id == 1)).first()
+ eq_(row['data'].read(), 'this is text 1')
+ eq_(row['bindata'].read(), b('this is binary 1'))
+
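Annotation (not part of the patch): test_lobs_without_convert above relies on auto_convert_lobs=False handing back raw cx_Oracle LOB objects that must be read explicitly. A usage sketch under the same assumption (placeholder DSN; the z_test table is the one created by the fixture above):

    from sqlalchemy import create_engine

    engine = create_engine(
        "oracle+cx_oracle://scott:tiger@localhost/xe",   # placeholder DSN
        auto_convert_lobs=False,
    )
    with engine.connect() as conn:
        row = conn.execute("select data, bindata from z_test where id = 1").first()
        text_value = row["data"].read()        # read the CLOB before fetching further rows
        binary_value = row["bindata"].read()   # read the BLOB likewise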
+ def test_lobs_with_convert(self):
+ t = self.tables.z_test
+ row = testing.db.execute(t.select().where(t.c.id == 1)).first()
+ eq_(row['data'], 'this is text 1')
+ eq_(row['bindata'], b('this is binary 1'))
+
+ def test_lobs_with_convert_raw(self):
+ row = testing.db.execute("select data, bindata from z_test").first()
+ eq_(row['data'], 'this is text 1')
+ eq_(row['bindata'], b('this is binary 1'))
+
+ def test_lobs_without_convert_many_rows(self):
+ engine = testing_engine(
+ options=dict(auto_convert_lobs=False, arraysize=1))
+ result = engine.execute(
+ "select id, data, bindata from z_test order by id")
+ results = result.fetchall()
+
+ def go():
+ eq_(
+ [
+ dict(
+ id=row["id"],
+ data=row["data"].read(),
+ bindata=row["bindata"].read()
+ ) for row in results
+ ],
+ self.data)
+ # this comes from cx_Oracle because these are raw
+ # cx_Oracle.Variable objects
+ if testing.requires.oracle5x.enabled:
+ assert_raises_message(
+ testing.db.dialect.dbapi.ProgrammingError,
+ "LOB variable no longer valid after subsequent fetch",
+ go
+ )
+ else:
+ go()
+
+ def test_lobs_with_convert_many_rows(self):
+ # even with low arraysize, lobs are fine in autoconvert
+ engine = testing_engine(
+ options=dict(auto_convert_lobs=True, arraysize=1))
+ result = engine.execute(
+ "select id, data, bindata from z_test order by id")
+ results = result.fetchall()
+
+ eq_(
+ [
+ dict(
+ id=row["id"],
+ data=row["data"],
+ bindata=row["bindata"]
+ ) for row in results
+ ],
+ self.data)
+
+ def test_large_stream(self):
+ binary_table = self.tables.binary_table
+ result = binary_table.select().order_by(binary_table.c.id).\
+ execute().fetchall()
+ eq_(result, [(i, self.stream) for i in range(1, 11)])
+
+ def test_large_stream_single_arraysize(self):
+ binary_table = self.tables.binary_table
+ eng = testing_engine(options={'arraysize': 1})
+ result = eng.execute(binary_table.select().
+ order_by(binary_table.c.id)).fetchall()
+ eq_(result, [(i, self.stream) for i in range(1, 11)])
+
+
+class EuroNumericTest(fixtures.TestBase):
+ """
+ Test the numeric output_type_handler when using a non-US locale for NLS_LANG.
+ """
+
+ __only_on__ = 'oracle+cx_oracle'
+ __backend__ = True
+
+ def setup(self):
+ connect = testing.db.pool._creator
+
+ def _creator():
+ conn = connect()
+ cursor = conn.cursor()
+ cursor.execute("ALTER SESSION SET NLS_TERRITORY='GERMANY'")
+ cursor.close()
+ return conn
+
+ self.engine = testing_engine(options={"creator": _creator})
+
+ def teardown(self):
+ self.engine.dispose()
+
+ def test_were_getting_a_comma(self):
+ connection = self.engine.pool._creator()
+ cursor = connection.cursor()
+ try:
+ cx_Oracle = self.engine.dialect.dbapi
+
+ def output_type_handler(cursor, name, defaultType,
+ size, precision, scale):
+ return cursor.var(cx_Oracle.STRING, 255,
+ arraysize=cursor.arraysize)
+ cursor.outputtypehandler = output_type_handler
+ cursor.execute("SELECT 1.1 FROM DUAL")
+ row = cursor.fetchone()
+ eq_(row[0], "1,1")
+ finally:
+ cursor.close()
+ connection.close()
+
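Annotation (not part of the patch): test_were_getting_a_comma above confirms that, under a German NLS_TERRITORY, fetching a NUMBER as STRING yields "1,1". A small self-contained illustration of why that string cannot be fed to Decimal directly (the replace() call is a naive normalization shown only for illustration):

    import decimal

    try:
        value = decimal.Decimal("1,1")               # raises InvalidOperation
    except decimal.InvalidOperation:
        value = decimal.Decimal("1,1".replace(",", "."))
    assert value == decimal.Decimal("1.1")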
+ def test_output_type_handler(self):
+ with self.engine.connect() as conn:
+ for stmt, exp, kw in [
+ ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}),
+ ("SELECT CAST(15 AS INTEGER) FROM DUAL", 15, {}),
+ ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL",
+ decimal.Decimal("15"), {}),
+ ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL",
+ decimal.Decimal("0.1"), {}),
+ ("SELECT :num FROM DUAL", decimal.Decimal("2.5"),
+ {'num': decimal.Decimal("2.5")}),
+
+ (
+ text(
+ "SELECT CAST(28.532 AS NUMERIC(5, 3)) "
+ "AS val FROM DUAL").columns(
+ val=Numeric(5, 3, asdecimal=True)),
+ decimal.Decimal("28.532"), {}
+ )
+ ]:
+ test_exp = conn.scalar(stmt, **kw)
+ eq_(
+ test_exp,
+ exp
+ )
+ assert type(test_exp) is type(exp)
+
+
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
deleted file mode 100644
index 5ca95aea9..000000000
--- a/test/dialect/test_oracle.py
+++ /dev/null
@@ -1,2316 +0,0 @@
-# coding: utf-8
-
-
-from sqlalchemy.testing import eq_
-from sqlalchemy import *
-from sqlalchemy import types as sqltypes, exc, schema
-from sqlalchemy.sql import table, column
-from sqlalchemy.sql.elements import quoted_name
-from sqlalchemy.testing import (fixtures,
- AssertsExecutionResults,
- AssertsCompiledSQL)
-from sqlalchemy import testing
-from sqlalchemy.util import u, b
-from sqlalchemy import util
-from sqlalchemy.testing import assert_raises, assert_raises_message
-from sqlalchemy.testing.engines import testing_engine
-from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
-from sqlalchemy.engine import default
-import decimal
-from sqlalchemy.engine import url
-from sqlalchemy.testing.schema import Table, Column
-import datetime
-import os
-from sqlalchemy import sql
-from sqlalchemy.testing.mock import Mock
-
-
-class DialectTest(fixtures.TestBase):
- def test_cx_oracle_version_parse(self):
- dialect = cx_oracle.OracleDialect_cx_oracle()
-
- eq_(
- dialect._parse_cx_oracle_ver("5.2"),
- (5, 2)
- )
-
- eq_(
- dialect._parse_cx_oracle_ver("5.0.1"),
- (5, 0, 1)
- )
-
- eq_(
- dialect._parse_cx_oracle_ver("6.0b1"),
- (6, 0)
- )
-
- def test_twophase_arg(self):
-
- mock_dbapi = Mock(version="5.0.3")
- dialect = cx_oracle.OracleDialect_cx_oracle(dbapi=mock_dbapi)
- args = dialect.create_connect_args(
- url.make_url("oracle+cx_oracle://a:b@host/db"))
-
- eq_(args[1]['twophase'], True)
-
- mock_dbapi = Mock(version="5.0.3")
- dialect = cx_oracle.OracleDialect_cx_oracle(
- dbapi=mock_dbapi, allow_twophase=False)
- args = dialect.create_connect_args(
- url.make_url("oracle+cx_oracle://a:b@host/db"))
-
- eq_(args[1]['twophase'], False)
-
- mock_dbapi = Mock(version="6.0b1")
- dialect = cx_oracle.OracleDialect_cx_oracle(dbapi=mock_dbapi)
- args = dialect.create_connect_args(
- url.make_url("oracle+cx_oracle://a:b@host/db"))
-
- assert 'twophase' not in args[1]
-
-
-class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
- __only_on__ = 'oracle+cx_oracle'
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- testing.db.execute("""
- create or replace procedure foo(x_in IN number, x_out OUT number,
- y_out OUT number, z_out OUT varchar) IS
- retval number;
- begin
- retval := 6;
- x_out := 10;
- y_out := x_in * 15;
- z_out := NULL;
- end;
- """)
-
- def test_out_params(self):
- result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
- ':z_out); end;',
- bindparams=[bindparam('x_in', Float),
- outparam('x_out', Integer),
- outparam('y_out', Float),
- outparam('z_out', String)]),
- x_in=5)
- eq_(result.out_parameters,
- {'x_out': 10, 'y_out': 75, 'z_out': None})
- assert isinstance(result.out_parameters['x_out'], int)
-
- @classmethod
- def teardown_class(cls):
- testing.db.execute("DROP PROCEDURE foo")
-
-
-class CXOracleArgsTest(fixtures.TestBase):
- __only_on__ = 'oracle+cx_oracle'
- __backend__ = True
-
- def test_autosetinputsizes(self):
- dialect = cx_oracle.dialect()
- assert dialect.auto_setinputsizes
-
- dialect = cx_oracle.dialect(auto_setinputsizes=False)
- assert not dialect.auto_setinputsizes
-
- def test_exclude_inputsizes_none(self):
- dialect = cx_oracle.dialect(exclude_setinputsizes=None)
- eq_(dialect.exclude_setinputsizes, set())
-
- def test_exclude_inputsizes_custom(self):
- import cx_Oracle
- dialect = cx_oracle.dialect(dbapi=cx_Oracle,
- exclude_setinputsizes=('NCLOB',))
- eq_(dialect.exclude_setinputsizes, set([cx_Oracle.NCLOB]))
-
-
-class QuotedBindRoundTripTest(fixtures.TestBase):
-
- __only_on__ = 'oracle'
- __backend__ = True
-
- @testing.provide_metadata
- def test_table_round_trip(self):
- oracle.RESERVED_WORDS.remove('UNION')
-
- metadata = self.metadata
- table = Table("t1", metadata,
- Column("option", Integer),
- Column("plain", Integer, quote=True),
- # test that quote works for a reserved word
- # that the dialect isn't aware of when quote
- # is set
- Column("union", Integer, quote=True))
- metadata.create_all()
-
- table.insert().execute(
- {"option": 1, "plain": 1, "union": 1}
- )
- eq_(
- testing.db.execute(table.select()).first(),
- (1, 1, 1)
- )
- table.update().values(option=2, plain=2, union=2).execute()
- eq_(
- testing.db.execute(table.select()).first(),
- (2, 2, 2)
- )
-
- def test_numeric_bind_round_trip(self):
- eq_(
- testing.db.scalar(
- select([
- literal_column("2", type_=Integer()) +
- bindparam("2_1", value=2)])
- ),
- 4
- )
-
- @testing.provide_metadata
- def test_numeric_bind_in_crud(self):
- t = Table(
- "asfd", self.metadata,
- Column("100K", Integer)
- )
- t.create()
-
- testing.db.execute(t.insert(), {"100K": 10})
- eq_(
- testing.db.scalar(t.select()), 10
- )
-
-
-class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = "oracle" # oracle.dialect()
-
- def test_true_false(self):
- self.assert_compile(
- sql.false(), "0"
- )
- self.assert_compile(
- sql.true(),
- "1"
- )
-
- def test_owner(self):
- meta = MetaData()
- parent = Table('parent', meta, Column('id', Integer,
- primary_key=True), Column('name', String(50)),
- schema='ed')
- child = Table('child', meta, Column('id', Integer,
- primary_key=True), Column('parent_id', Integer,
- ForeignKey('ed.parent.id')), schema='ed')
- self.assert_compile(parent.join(child),
- 'ed.parent JOIN ed.child ON ed.parent.id = '
- 'ed.child.parent_id')
-
- def test_subquery(self):
- t = table('sometable', column('col1'), column('col2'))
- s = select([t])
- s = select([s.c.col1, s.c.col2])
-
- self.assert_compile(s, "SELECT col1, col2 FROM (SELECT "
- "sometable.col1 AS col1, sometable.col2 "
- "AS col2 FROM sometable)")
-
- def test_bindparam_quote(self):
- """test that bound parameters take on quoting for reserved words,
- column names quote flag enabled."""
- # note: this is only in cx_oracle at the moment. not sure
- # what other hypothetical oracle dialects might need
-
- self.assert_compile(
- bindparam("option"), ':"option"'
- )
- self.assert_compile(
- bindparam("plain"), ':plain'
- )
- t = Table("s", MetaData(), Column('plain', Integer, quote=True))
- self.assert_compile(
- t.insert().values(plain=5),
- 'INSERT INTO s ("plain") VALUES (:"plain")'
- )
- self.assert_compile(
- t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"'
- )
-
- def test_cte(self):
- part = table(
- 'part',
- column('part'),
- column('sub_part'),
- column('quantity')
- )
-
- included_parts = select([
- part.c.sub_part, part.c.part, part.c.quantity
- ]).where(part.c.part == "p1").\
- cte(name="included_parts", recursive=True).\
- suffix_with(
- "search depth first by part set ord1",
- "cycle part set y_cycle to 1 default 0", dialect='oracle')
-
- incl_alias = included_parts.alias("pr1")
- parts_alias = part.alias("p")
- included_parts = included_parts.union_all(
- select([
- parts_alias.c.sub_part,
- parts_alias.c.part, parts_alias.c.quantity
- ]).where(parts_alias.c.part == incl_alias.c.sub_part)
- )
-
- q = select([
- included_parts.c.sub_part,
- func.sum(included_parts.c.quantity).label('total_quantity')]).\
- group_by(included_parts.c.sub_part)
-
- self.assert_compile(
- q,
- "WITH included_parts(sub_part, part, quantity) AS "
- "(SELECT part.sub_part AS sub_part, part.part AS part, "
- "part.quantity AS quantity FROM part WHERE part.part = :part_1 "
- "UNION ALL SELECT p.sub_part AS sub_part, p.part AS part, "
- "p.quantity AS quantity FROM part p, included_parts pr1 "
- "WHERE p.part = pr1.sub_part) "
- "search depth first by part set ord1 cycle part set "
- "y_cycle to 1 default 0 "
- "SELECT included_parts.sub_part, sum(included_parts.quantity) "
- "AS total_quantity FROM included_parts "
- "GROUP BY included_parts.sub_part"
- )
-
- def test_limit(self):
- t = table('sometable', column('col1'), column('col2'))
- s = select([t])
- c = s.compile(dialect=oracle.OracleDialect())
- assert t.c.col1 in set(c._create_result_map()['col1'][1])
- s = select([t]).limit(10).offset(20)
- self.assert_compile(s,
- 'SELECT col1, col2 FROM (SELECT col1, '
- 'col2, ROWNUM AS ora_rn FROM (SELECT '
- 'sometable.col1 AS col1, sometable.col2 AS '
- 'col2 FROM sometable) WHERE ROWNUM <= '
- ':param_1 + :param_2) WHERE ora_rn > :param_2',
- checkparams={'param_1': 10, 'param_2': 20})
-
- c = s.compile(dialect=oracle.OracleDialect())
- eq_(len(c._result_columns), 2)
- assert t.c.col1 in set(c._create_result_map()['col1'][1])
-
- s2 = select([s.c.col1, s.c.col2])
- self.assert_compile(s2,
- 'SELECT col1, col2 FROM (SELECT col1, col2 '
- 'FROM (SELECT col1, col2, ROWNUM AS ora_rn '
- 'FROM (SELECT sometable.col1 AS col1, '
- 'sometable.col2 AS col2 FROM sometable) '
- 'WHERE ROWNUM <= :param_1 + :param_2) '
- 'WHERE ora_rn > :param_2)',
- checkparams={'param_1': 10, 'param_2': 20})
-
- self.assert_compile(s2,
- 'SELECT col1, col2 FROM (SELECT col1, col2 '
- 'FROM (SELECT col1, col2, ROWNUM AS ora_rn '
- 'FROM (SELECT sometable.col1 AS col1, '
- 'sometable.col2 AS col2 FROM sometable) '
- 'WHERE ROWNUM <= :param_1 + :param_2) '
- 'WHERE ora_rn > :param_2)')
- c = s2.compile(dialect=oracle.OracleDialect())
- eq_(len(c._result_columns), 2)
- assert s.c.col1 in set(c._create_result_map()['col1'][1])
-
- s = select([t]).limit(10).offset(20).order_by(t.c.col2)
- self.assert_compile(s,
- 'SELECT col1, col2 FROM (SELECT col1, '
- 'col2, ROWNUM AS ora_rn FROM (SELECT '
- 'sometable.col1 AS col1, sometable.col2 AS '
- 'col2 FROM sometable ORDER BY '
- 'sometable.col2) WHERE ROWNUM <= '
- ':param_1 + :param_2) WHERE ora_rn > :param_2',
- checkparams={'param_1': 10, 'param_2': 20}
- )
- c = s.compile(dialect=oracle.OracleDialect())
- eq_(len(c._result_columns), 2)
- assert t.c.col1 in set(c._create_result_map()['col1'][1])
-
- s = select([t], for_update=True).limit(10).order_by(t.c.col2)
- self.assert_compile(s,
- 'SELECT col1, col2 FROM (SELECT '
- 'sometable.col1 AS col1, sometable.col2 AS '
- 'col2 FROM sometable ORDER BY '
- 'sometable.col2) WHERE ROWNUM <= :param_1 '
- 'FOR UPDATE')
-
- s = select([t],
- for_update=True).limit(10).offset(20).order_by(t.c.col2)
- self.assert_compile(s,
- 'SELECT col1, col2 FROM (SELECT col1, '
- 'col2, ROWNUM AS ora_rn FROM (SELECT '
- 'sometable.col1 AS col1, sometable.col2 AS '
- 'col2 FROM sometable ORDER BY '
- 'sometable.col2) WHERE ROWNUM <= '
- ':param_1 + :param_2) WHERE ora_rn > :param_2 FOR '
- 'UPDATE')
-
- def test_for_update(self):
- table1 = table('mytable',
- column('myid'), column('name'), column('description'))
-
- self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
-
- self.assert_compile(
- table1
- .select(table1.c.myid == 7)
- .with_for_update(of=table1.c.myid),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 "
- "FOR UPDATE OF mytable.myid")
-
- self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(nowait=True),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT")
-
- self.assert_compile(
- table1
- .select(table1.c.myid == 7)
- .with_for_update(nowait=True, of=table1.c.myid),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 "
- "FOR UPDATE OF mytable.myid NOWAIT")
-
- self.assert_compile(
- table1
- .select(table1.c.myid == 7)
- .with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
- "mytable.myid, mytable.name NOWAIT")
-
- self.assert_compile(
- table1.select(table1.c.myid == 7)
- .with_for_update(skip_locked=True,
- of=[table1.c.myid, table1.c.name]),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
- "mytable.myid, mytable.name SKIP LOCKED")
-
- # key_share has no effect
- self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(key_share=True),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
-
- # read has no effect
- self.assert_compile(
- table1
- .select(table1.c.myid == 7)
- .with_for_update(read=True, key_share=True),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
-
- ta = table1.alias()
- self.assert_compile(
- ta
- .select(ta.c.myid == 7)
- .with_for_update(of=[ta.c.myid, ta.c.name]),
- "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
- "FROM mytable mytable_1 "
- "WHERE mytable_1.myid = :myid_1 FOR UPDATE OF "
- "mytable_1.myid, mytable_1.name"
- )
-
- def test_for_update_of_w_limit_adaption_col_present(self):
- table1 = table('mytable', column('myid'), column('name'))
-
- self.assert_compile(
- select([table1.c.myid, table1.c.name]).
- where(table1.c.myid == 7).
- with_for_update(nowait=True, of=table1.c.name).
- limit(10),
- "SELECT myid, name FROM "
- "(SELECT mytable.myid AS myid, mytable.name AS name "
- "FROM mytable WHERE mytable.myid = :myid_1) "
- "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT",
- )
-
- def test_for_update_of_w_limit_adaption_col_unpresent(self):
- table1 = table('mytable', column('myid'), column('name'))
-
- self.assert_compile(
- select([table1.c.myid]).
- where(table1.c.myid == 7).
- with_for_update(nowait=True, of=table1.c.name).
- limit(10),
- "SELECT myid FROM "
- "(SELECT mytable.myid AS myid, mytable.name AS name "
- "FROM mytable WHERE mytable.myid = :myid_1) "
- "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT",
- )
-
- def test_for_update_of_w_limit_offset_adaption_col_present(self):
- table1 = table('mytable', column('myid'), column('name'))
-
- self.assert_compile(
- select([table1.c.myid, table1.c.name]).
- where(table1.c.myid == 7).
- with_for_update(nowait=True, of=table1.c.name).
- limit(10).offset(50),
- "SELECT myid, name FROM (SELECT myid, name, ROWNUM AS ora_rn "
- "FROM (SELECT mytable.myid AS myid, mytable.name AS name "
- "FROM mytable WHERE mytable.myid = :myid_1) "
- "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
- "FOR UPDATE OF name NOWAIT",
- )
-
- def test_for_update_of_w_limit_offset_adaption_col_unpresent(self):
- table1 = table('mytable', column('myid'), column('name'))
-
- self.assert_compile(
- select([table1.c.myid]).
- where(table1.c.myid == 7).
- with_for_update(nowait=True, of=table1.c.name).
- limit(10).offset(50),
- "SELECT myid FROM (SELECT myid, ROWNUM AS ora_rn, name "
- "FROM (SELECT mytable.myid AS myid, mytable.name AS name "
- "FROM mytable WHERE mytable.myid = :myid_1) "
- "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
- "FOR UPDATE OF name NOWAIT",
- )
-
- def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self):
- table1 = table('mytable', column('myid'), column('foo'), column('bar'))
-
- self.assert_compile(
- select([table1.c.myid, table1.c.bar]).
- where(table1.c.myid == 7).
- with_for_update(nowait=True, of=[table1.c.foo, table1.c.bar]).
- limit(10).offset(50),
- "SELECT myid, bar FROM (SELECT myid, bar, ROWNUM AS ora_rn, "
- "foo FROM (SELECT mytable.myid AS myid, mytable.bar AS bar, "
- "mytable.foo AS foo FROM mytable WHERE mytable.myid = :myid_1) "
- "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
- "FOR UPDATE OF foo, bar NOWAIT"
- )
-
- def test_limit_preserves_typing_information(self):
- class MyType(TypeDecorator):
- impl = Integer
-
- stmt = select([type_coerce(column('x'), MyType).label('foo')]).limit(1)
- dialect = oracle.dialect()
- compiled = stmt.compile(dialect=dialect)
- assert isinstance(compiled._create_result_map()['foo'][-1], MyType)
-
- def test_use_binds_for_limits_disabled(self):
- t = table('sometable', column('col1'), column('col2'))
- dialect = oracle.OracleDialect(use_binds_for_limits=False)
-
- self.assert_compile(
- select([t]).limit(10),
- "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
- "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM <= 10",
- dialect=dialect)
-
- self.assert_compile(
- select([t]).offset(10),
- "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
- "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable)) WHERE ora_rn > 10",
- dialect=dialect)
-
- self.assert_compile(
- select([t]).limit(10).offset(10),
- "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
- "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable) WHERE ROWNUM <= 20) WHERE ora_rn > 10",
- dialect=dialect)
-
- def test_use_binds_for_limits_enabled(self):
- t = table('sometable', column('col1'), column('col2'))
- dialect = oracle.OracleDialect(use_binds_for_limits=True)
-
- self.assert_compile(
- select([t]).limit(10),
- "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
- "sometable.col2 AS col2 FROM sometable) WHERE ROWNUM "
- "<= :param_1",
- dialect=dialect)
-
- self.assert_compile(
- select([t]).offset(10),
- "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
- "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable)) WHERE ora_rn > :param_1",
- dialect=dialect)
-
- self.assert_compile(
- select([t]).limit(10).offset(10),
- "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
- "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable) WHERE ROWNUM <= :param_1 + :param_2) "
- "WHERE ora_rn > :param_2",
- dialect=dialect,
- checkparams={'param_1': 10, 'param_2': 10})
-
- def test_long_labels(self):
- dialect = default.DefaultDialect()
- dialect.max_identifier_length = 30
-
- ora_dialect = oracle.dialect()
-
- m = MetaData()
- a_table = Table(
- 'thirty_characters_table_xxxxxx',
- m,
- Column('id', Integer, primary_key=True)
- )
-
- other_table = Table(
- 'other_thirty_characters_table_',
- m,
- Column('id', Integer, primary_key=True),
- Column('thirty_characters_table_id',
- Integer,
- ForeignKey('thirty_characters_table_xxxxxx.id'),
- primary_key=True))
-
- anon = a_table.alias()
- self.assert_compile(select([other_table,
- anon]).
- select_from(
- other_table.outerjoin(anon)).apply_labels(),
- 'SELECT other_thirty_characters_table_.id '
- 'AS other_thirty_characters__1, '
- 'other_thirty_characters_table_.thirty_char'
- 'acters_table_id AS other_thirty_characters'
- '__2, thirty_characters_table__1.id AS '
- 'thirty_characters_table__3 FROM '
- 'other_thirty_characters_table_ LEFT OUTER '
- 'JOIN thirty_characters_table_xxxxxx AS '
- 'thirty_characters_table__1 ON '
- 'thirty_characters_table__1.id = '
- 'other_thirty_characters_table_.thirty_char'
- 'acters_table_id', dialect=dialect)
- self.assert_compile(select([other_table,
- anon]).select_from(
- other_table.outerjoin(anon)).apply_labels(),
- 'SELECT other_thirty_characters_table_.id '
- 'AS other_thirty_characters__1, '
- 'other_thirty_characters_table_.thirty_char'
- 'acters_table_id AS other_thirty_characters'
- '__2, thirty_characters_table__1.id AS '
- 'thirty_characters_table__3 FROM '
- 'other_thirty_characters_table_ LEFT OUTER '
- 'JOIN thirty_characters_table_xxxxxx '
- 'thirty_characters_table__1 ON '
- 'thirty_characters_table__1.id = '
- 'other_thirty_characters_table_.thirty_char'
- 'acters_table_id', dialect=ora_dialect)
-
- def test_outer_join(self):
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String),
- column('description', String))
-
- table2 = table(
- 'myothertable',
- column('otherid', Integer),
- column('othername', String),
- )
-
- table3 = table(
- 'thirdtable',
- column('userid', Integer),
- column('otherstuff', String),
- )
-
- query = select([table1, table2],
- or_(table1.c.name == 'fred',
- table1.c.myid == 10, table2.c.othername != 'jack',
- text('EXISTS (select yay from foo where boo = lar)')
- ),
- from_obj=[outerjoin(table1,
- table2,
- table1.c.myid == table2.c.otherid)])
- self.assert_compile(query,
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, myothertable.otherid,'
- ' myothertable.othername FROM mytable, '
- 'myothertable WHERE (mytable.name = '
- ':name_1 OR mytable.myid = :myid_1 OR '
- 'myothertable.othername != :othername_1 OR '
- 'EXISTS (select yay from foo where boo = '
- 'lar)) AND mytable.myid = '
- 'myothertable.otherid(+)',
- dialect=oracle.OracleDialect(use_ansi=False))
- query = table1.outerjoin(table2,
- table1.c.myid == table2.c.otherid) \
- .outerjoin(table3, table3.c.userid == table2.c.otherid)
- self.assert_compile(query.select(),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, myothertable.otherid,'
- ' myothertable.othername, '
- 'thirdtable.userid, thirdtable.otherstuff '
- 'FROM mytable LEFT OUTER JOIN myothertable '
- 'ON mytable.myid = myothertable.otherid '
- 'LEFT OUTER JOIN thirdtable ON '
- 'thirdtable.userid = myothertable.otherid')
-
- self.assert_compile(query.select(),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, myothertable.otherid,'
- ' myothertable.othername, '
- 'thirdtable.userid, thirdtable.otherstuff '
- 'FROM mytable, myothertable, thirdtable '
- 'WHERE thirdtable.userid(+) = '
- 'myothertable.otherid AND mytable.myid = '
- 'myothertable.otherid(+)',
- dialect=oracle.dialect(use_ansi=False))
- query = table1.join(table2,
- table1.c.myid == table2.c.otherid) \
- .join(table3, table3.c.userid == table2.c.otherid)
- self.assert_compile(query.select(),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, myothertable.otherid,'
- ' myothertable.othername, '
- 'thirdtable.userid, thirdtable.otherstuff '
- 'FROM mytable, myothertable, thirdtable '
- 'WHERE thirdtable.userid = '
- 'myothertable.otherid AND mytable.myid = '
- 'myothertable.otherid',
- dialect=oracle.dialect(use_ansi=False))
- query = table1.join(table2,
- table1.c.myid == table2.c.otherid) \
- .outerjoin(table3, table3.c.userid == table2.c.otherid)
- self.assert_compile(query.select().order_by(table1.c.name).
- limit(10).offset(5),
- 'SELECT myid, name, description, otherid, '
- 'othername, userid, otherstuff FROM '
- '(SELECT myid, name, description, otherid, '
- 'othername, userid, otherstuff, ROWNUM AS '
- 'ora_rn FROM (SELECT mytable.myid AS myid, '
- 'mytable.name AS name, mytable.description '
- 'AS description, myothertable.otherid AS '
- 'otherid, myothertable.othername AS '
- 'othername, thirdtable.userid AS userid, '
- 'thirdtable.otherstuff AS otherstuff FROM '
- 'mytable, myothertable, thirdtable WHERE '
- 'thirdtable.userid(+) = '
- 'myothertable.otherid AND mytable.myid = '
- 'myothertable.otherid ORDER BY mytable.name) '
- 'WHERE ROWNUM <= :param_1 + :param_2) '
- 'WHERE ora_rn > :param_2',
- checkparams={'param_1': 10, 'param_2': 5},
- dialect=oracle.dialect(use_ansi=False))
-
- subq = select([table1]).select_from(
- table1.outerjoin(table2, table1.c.myid == table2.c.otherid)) \
- .alias()
- q = select([table3]).select_from(
- table3.outerjoin(subq, table3.c.userid == subq.c.myid))
-
- self.assert_compile(q,
- 'SELECT thirdtable.userid, '
- 'thirdtable.otherstuff FROM thirdtable '
- 'LEFT OUTER JOIN (SELECT mytable.myid AS '
- 'myid, mytable.name AS name, '
- 'mytable.description AS description FROM '
- 'mytable LEFT OUTER JOIN myothertable ON '
- 'mytable.myid = myothertable.otherid) '
- 'anon_1 ON thirdtable.userid = anon_1.myid',
- dialect=oracle.dialect(use_ansi=True))
-
- self.assert_compile(q,
- 'SELECT thirdtable.userid, '
- 'thirdtable.otherstuff FROM thirdtable, '
- '(SELECT mytable.myid AS myid, '
- 'mytable.name AS name, mytable.description '
- 'AS description FROM mytable, myothertable '
- 'WHERE mytable.myid = myothertable.otherid('
- '+)) anon_1 WHERE thirdtable.userid = '
- 'anon_1.myid(+)',
- dialect=oracle.dialect(use_ansi=False))
-
- q = select([table1.c.name]).where(table1.c.name == 'foo')
- self.assert_compile(q,
- 'SELECT mytable.name FROM mytable WHERE '
- 'mytable.name = :name_1',
- dialect=oracle.dialect(use_ansi=False))
- subq = select([table3.c.otherstuff]) \
- .where(table3.c.otherstuff == table1.c.name).label('bar')
- q = select([table1.c.name, subq])
- self.assert_compile(q,
- 'SELECT mytable.name, (SELECT '
- 'thirdtable.otherstuff FROM thirdtable '
- 'WHERE thirdtable.otherstuff = '
- 'mytable.name) AS bar FROM mytable',
- dialect=oracle.dialect(use_ansi=False))
-
- def test_nonansi_nested_right_join(self):
- a = table('a', column('a'))
- b = table('b', column('b'))
- c = table('c', column('c'))
-
- j = a.join(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)
-
- self.assert_compile(
- select([j]),
- "SELECT a.a, b.b, c.c FROM a, b, c "
- "WHERE a.a = b.b AND b.b = c.c",
- dialect=oracle.OracleDialect(use_ansi=False)
- )
-
- j = a.outerjoin(b.join(c, b.c.b == c.c.c), a.c.a == b.c.b)
-
- self.assert_compile(
- select([j]),
- "SELECT a.a, b.b, c.c FROM a, b, c "
- "WHERE a.a = b.b(+) AND b.b = c.c",
- dialect=oracle.OracleDialect(use_ansi=False)
- )
-
- j = a.join(b.outerjoin(c, b.c.b == c.c.c), a.c.a == b.c.b)
-
- self.assert_compile(
- select([j]),
- "SELECT a.a, b.b, c.c FROM a, b, c "
- "WHERE a.a = b.b AND b.b = c.c(+)",
- dialect=oracle.OracleDialect(use_ansi=False)
- )
-
- def test_alias_outer_join(self):
- address_types = table('address_types', column('id'),
- column('name'))
- addresses = table('addresses', column('id'), column('user_id'),
- column('address_type_id'),
- column('email_address'))
- at_alias = address_types.alias()
- s = select([at_alias, addresses]) \
- .select_from(
- addresses.outerjoin(
- at_alias,
- addresses.c.address_type_id == at_alias.c.id)) \
- .where(addresses.c.user_id == 7) \
- .order_by(addresses.c.id, address_types.c.id)
- self.assert_compile(s,
- 'SELECT address_types_1.id, '
- 'address_types_1.name, addresses.id, '
- 'addresses.user_id, addresses.address_type_'
- 'id, addresses.email_address FROM '
- 'addresses LEFT OUTER JOIN address_types '
- 'address_types_1 ON addresses.address_type_'
- 'id = address_types_1.id WHERE '
- 'addresses.user_id = :user_id_1 ORDER BY '
- 'addresses.id, address_types.id')
-
- def test_returning_insert(self):
- t1 = table('t1', column('c1'), column('c2'), column('c3'))
- self.assert_compile(
- t1.insert().values(c1=1).returning(t1.c.c2, t1.c.c3),
- "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
- "t1.c2, t1.c3 INTO :ret_0, :ret_1")
-
- def test_returning_insert_functional(self):
- t1 = table('t1',
- column('c1'),
- column('c2', String()),
- column('c3', String()))
- fn = func.lower(t1.c.c2, type_=String())
- stmt = t1.insert().values(c1=1).returning(fn, t1.c.c3)
- compiled = stmt.compile(dialect=oracle.dialect())
- eq_(compiled._create_result_map(),
- {'ret_1': ('ret_1', (t1.c.c3, 'c3', 'c3'), t1.c.c3.type),
- 'ret_0': ('ret_0', (fn, 'lower', None), fn.type)})
- self.assert_compile(
- stmt,
- "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
- "lower(t1.c2), t1.c3 INTO :ret_0, :ret_1")
-
- def test_returning_insert_labeled(self):
- t1 = table('t1', column('c1'), column('c2'), column('c3'))
- self.assert_compile(
- t1.insert().values(c1=1).returning(
- t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
- "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
- "t1.c2, t1.c3 INTO :ret_0, :ret_1")
-
- def test_compound(self):
- t1 = table('t1', column('c1'), column('c2'), column('c3'))
- t2 = table('t2', column('c1'), column('c2'), column('c3'))
- self.assert_compile(union(t1.select(), t2.select()),
- 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 UNION '
- 'SELECT t2.c1, t2.c2, t2.c3 FROM t2')
- self.assert_compile(except_(t1.select(), t2.select()),
- 'SELECT t1.c1, t1.c2, t1.c3 FROM t1 MINUS '
- 'SELECT t2.c1, t2.c2, t2.c3 FROM t2')
-
- def test_no_paren_fns(self):
- for fn, expected in [
- (func.uid(), "uid"),
- (func.UID(), "UID"),
- (func.sysdate(), "sysdate"),
- (func.row_number(), "row_number()"),
- (func.rank(), "rank()"),
- (func.now(), "CURRENT_TIMESTAMP"),
- (func.current_timestamp(), "CURRENT_TIMESTAMP"),
- (func.user(), "USER"),
- ]:
- self.assert_compile(fn, expected)
-
- def test_create_index_alt_schema(self):
- m = MetaData()
- t1 = Table('foo', m,
- Column('x', Integer),
- schema="alt_schema")
- self.assert_compile(
- schema.CreateIndex(Index("bar", t1.c.x)),
- "CREATE INDEX alt_schema.bar ON alt_schema.foo (x)"
- )
-
- def test_create_index_expr(self):
- m = MetaData()
- t1 = Table('foo', m,
- Column('x', Integer))
- self.assert_compile(
- schema.CreateIndex(Index("bar", t1.c.x > 5)),
- "CREATE INDEX bar ON foo (x > 5)"
- )
-
- def test_table_options(self):
- m = MetaData()
-
- t = Table(
- 'foo', m,
- Column('x', Integer),
- prefixes=["GLOBAL TEMPORARY"],
- oracle_on_commit="PRESERVE ROWS"
- )
-
- self.assert_compile(
- schema.CreateTable(t),
- "CREATE GLOBAL TEMPORARY TABLE "
- "foo (x INTEGER) ON COMMIT PRESERVE ROWS"
- )
-
- def test_create_table_compress(self):
- m = MetaData()
- tbl1 = Table('testtbl1', m, Column('data', Integer),
- oracle_compress=True)
- tbl2 = Table('testtbl2', m, Column('data', Integer),
- oracle_compress="OLTP")
-
- self.assert_compile(schema.CreateTable(tbl1),
- "CREATE TABLE testtbl1 (data INTEGER) COMPRESS")
- self.assert_compile(schema.CreateTable(tbl2),
- "CREATE TABLE testtbl2 (data INTEGER) "
- "COMPRESS FOR OLTP")
-
- def test_create_index_bitmap_compress(self):
- m = MetaData()
- tbl = Table('testtbl', m, Column('data', Integer))
- idx1 = Index('idx1', tbl.c.data, oracle_compress=True)
- idx2 = Index('idx2', tbl.c.data, oracle_compress=1)
- idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True)
-
- self.assert_compile(schema.CreateIndex(idx1),
- "CREATE INDEX idx1 ON testtbl (data) COMPRESS")
- self.assert_compile(schema.CreateIndex(idx2),
- "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1")
- self.assert_compile(schema.CreateIndex(idx3),
- "CREATE BITMAP INDEX idx3 ON testtbl (data)")
-
-
-class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
-
- def _dialect(self, server_version, **kw):
- def server_version_info(conn):
- return server_version
-
- dialect = oracle.dialect(
- dbapi=Mock(version="0.0.0", paramstyle="named"),
- **kw)
- dialect._get_server_version_info = server_version_info
- dialect._check_unicode_returns = Mock()
- dialect._check_unicode_description = Mock()
- dialect._get_default_schema_name = Mock()
- return dialect
-
- def test_ora8_flags(self):
- dialect = self._dialect((8, 2, 5))
-
- # before connect, assume modern DB
- assert dialect._supports_char_length
- assert dialect._supports_nchar
- assert dialect.use_ansi
-
- dialect.initialize(Mock())
- assert not dialect.implicit_returning
- assert not dialect._supports_char_length
- assert not dialect._supports_nchar
- assert not dialect.use_ansi
- self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect)
- self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect)
- self.assert_compile(UnicodeText(), "CLOB", dialect=dialect)
-
- dialect = self._dialect((8, 2, 5), implicit_returning=True)
- dialect.initialize(testing.db.connect())
- assert dialect.implicit_returning
-
- def test_default_flags(self):
- """test with no initialization or server version info"""
-
- dialect = self._dialect(None)
-
- assert dialect._supports_char_length
- assert dialect._supports_nchar
- assert dialect.use_ansi
- self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
- self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
- self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
-
- def test_ora10_flags(self):
- dialect = self._dialect((10, 2, 5))
-
- dialect.initialize(Mock())
- assert dialect._supports_char_length
- assert dialect._supports_nchar
- assert dialect.use_ansi
- self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
- self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
- self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
-
-
-class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
- __only_on__ = 'oracle'
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- # currently assuming full DBA privs for the user.
- # don't really know how else to go here unless
- # we connect as the other user.
-
- for stmt in ("""
-create table %(test_schema)s.parent(
- id integer primary key,
- data varchar2(50)
-);
-
-create table %(test_schema)s.child(
- id integer primary key,
- data varchar2(50),
- parent_id integer references %(test_schema)s.parent(id)
-);
-
-create table local_table(
- id integer primary key,
- data varchar2(50)
-);
-
-create synonym %(test_schema)s.ptable for %(test_schema)s.parent;
-create synonym %(test_schema)s.ctable for %(test_schema)s.child;
-
-create synonym %(test_schema)s_pt for %(test_schema)s.parent;
-
-create synonym %(test_schema)s.local_table for local_table;
-
--- can't make a ref from local schema to the
--- remote schema's table without this,
--- *and* cant give yourself a grant !
--- so we give it to public. ideas welcome.
-grant references on %(test_schema)s.parent to public;
-grant references on %(test_schema)s.child to public;
-""" % {"test_schema": testing.config.test_schema}).split(";"):
- if stmt.strip():
- testing.db.execute(stmt)
-
- @classmethod
- def teardown_class(cls):
- for stmt in ("""
-drop table %(test_schema)s.child;
-drop table %(test_schema)s.parent;
-drop table local_table;
-drop synonym %(test_schema)s.ctable;
-drop synonym %(test_schema)s.ptable;
-drop synonym %(test_schema)s_pt;
-drop synonym %(test_schema)s.local_table;
-
-""" % {"test_schema": testing.config.test_schema}).split(";"):
- if stmt.strip():
- testing.db.execute(stmt)
-
- @testing.provide_metadata
- def test_create_same_names_explicit_schema(self):
- schema = testing.db.dialect.default_schema_name
- meta = self.metadata
- parent = Table('parent', meta,
- Column('pid', Integer, primary_key=True),
- schema=schema)
- child = Table('child', meta,
- Column('cid', Integer, primary_key=True),
- Column('pid',
- Integer,
- ForeignKey('%s.parent.pid' % schema)),
- schema=schema)
- meta.create_all()
- parent.insert().execute({'pid': 1})
- child.insert().execute({'cid': 1, 'pid': 1})
- eq_(child.select().execute().fetchall(), [(1, 1)])
-
- def test_reflect_alt_table_owner_local_synonym(self):
- meta = MetaData(testing.db)
- parent = Table('%s_pt' % testing.config.test_schema,
- meta,
- autoload=True,
- oracle_resolve_synonyms=True)
- self.assert_compile(parent.select(),
- "SELECT %(test_schema)s_pt.id, "
- "%(test_schema)s_pt.data FROM %(test_schema)s_pt"
- % {"test_schema": testing.config.test_schema})
- select([parent]).execute().fetchall()
-
- def test_reflect_alt_synonym_owner_local_table(self):
- meta = MetaData(testing.db)
- parent = Table(
- 'local_table', meta, autoload=True,
- oracle_resolve_synonyms=True, schema=testing.config.test_schema)
- self.assert_compile(
- parent.select(),
- "SELECT %(test_schema)s.local_table.id, "
- "%(test_schema)s.local_table.data "
- "FROM %(test_schema)s.local_table" %
- {"test_schema": testing.config.test_schema}
- )
- select([parent]).execute().fetchall()
-
- @testing.provide_metadata
- def test_create_same_names_implicit_schema(self):
- meta = self.metadata
- parent = Table('parent',
- meta,
- Column('pid', Integer, primary_key=True))
- child = Table('child', meta,
- Column('cid', Integer, primary_key=True),
- Column('pid', Integer, ForeignKey('parent.pid')))
- meta.create_all()
- parent.insert().execute({'pid': 1})
- child.insert().execute({'cid': 1, 'pid': 1})
- eq_(child.select().execute().fetchall(), [(1, 1)])
-
- def test_reflect_alt_owner_explicit(self):
- meta = MetaData(testing.db)
- parent = Table(
- 'parent', meta, autoload=True,
- schema=testing.config.test_schema)
- child = Table(
- 'child', meta, autoload=True,
- schema=testing.config.test_schema)
-
- self.assert_compile(
- parent.join(child),
- "%(test_schema)s.parent JOIN %(test_schema)s.child ON "
- "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % {
- "test_schema": testing.config.test_schema
- })
- select([parent, child]).\
- select_from(parent.join(child)).\
- execute().fetchall()
-
- def test_reflect_local_to_remote(self):
- testing.db.execute(
- 'CREATE TABLE localtable (id INTEGER '
- 'PRIMARY KEY, parent_id INTEGER REFERENCES '
- '%(test_schema)s.parent(id))' % {
- "test_schema": testing.config.test_schema})
- try:
- meta = MetaData(testing.db)
- lcl = Table('localtable', meta, autoload=True)
- parent = meta.tables['%s.parent' % testing.config.test_schema]
- self.assert_compile(parent.join(lcl),
- '%(test_schema)s.parent JOIN localtable ON '
- '%(test_schema)s.parent.id = '
- 'localtable.parent_id' % {
- "test_schema": testing.config.test_schema}
- )
- select([parent,
- lcl]).select_from(parent.join(lcl)).execute().fetchall()
- finally:
- testing.db.execute('DROP TABLE localtable')
-
- def test_reflect_alt_owner_implicit(self):
- meta = MetaData(testing.db)
- parent = Table(
- 'parent', meta, autoload=True,
- schema=testing.config.test_schema)
- child = Table(
- 'child', meta, autoload=True,
- schema=testing.config.test_schema)
- self.assert_compile(
- parent.join(child),
- '%(test_schema)s.parent JOIN %(test_schema)s.child '
- 'ON %(test_schema)s.parent.id = '
- '%(test_schema)s.child.parent_id' % {
- "test_schema": testing.config.test_schema})
- select([parent,
- child]).select_from(parent.join(child)).execute().fetchall()
-
- def test_reflect_alt_owner_synonyms(self):
- testing.db.execute('CREATE TABLE localtable (id INTEGER '
- 'PRIMARY KEY, parent_id INTEGER REFERENCES '
- '%s.ptable(id))' % testing.config.test_schema)
- try:
- meta = MetaData(testing.db)
- lcl = Table('localtable', meta, autoload=True,
- oracle_resolve_synonyms=True)
- parent = meta.tables['%s.ptable' % testing.config.test_schema]
- self.assert_compile(
- parent.join(lcl),
- '%(test_schema)s.ptable JOIN localtable ON '
- '%(test_schema)s.ptable.id = '
- 'localtable.parent_id' % {
- "test_schema": testing.config.test_schema})
- select([parent,
- lcl]).select_from(parent.join(lcl)).execute().fetchall()
- finally:
- testing.db.execute('DROP TABLE localtable')
-
- def test_reflect_remote_synonyms(self):
- meta = MetaData(testing.db)
- parent = Table('ptable', meta, autoload=True,
- schema=testing.config.test_schema,
- oracle_resolve_synonyms=True)
- child = Table('ctable', meta, autoload=True,
- schema=testing.config.test_schema,
- oracle_resolve_synonyms=True)
- self.assert_compile(
- parent.join(child),
- '%(test_schema)s.ptable JOIN '
- '%(test_schema)s.ctable '
- 'ON %(test_schema)s.ptable.id = '
- '%(test_schema)s.ctable.parent_id' % {
- "test_schema": testing.config.test_schema})
- select([parent,
- child]).select_from(parent.join(child)).execute().fetchall()
-
-
-class ConstraintTest(fixtures.TablesTest):
-
- __only_on__ = 'oracle'
- __backend__ = True
- run_deletes = None
-
- @classmethod
- def define_tables(cls, metadata):
- Table('foo', metadata, Column('id', Integer, primary_key=True))
-
- def test_oracle_has_no_on_update_cascade(self):
- bar = Table('bar', self.metadata,
- Column('id', Integer, primary_key=True),
- Column('foo_id',
- Integer,
- ForeignKey('foo.id', onupdate='CASCADE')))
- assert_raises(exc.SAWarning, bar.create)
-
- bat = Table('bat', self.metadata,
- Column('id', Integer, primary_key=True),
- Column('foo_id', Integer),
- ForeignKeyConstraint(['foo_id'], ['foo.id'],
- onupdate='CASCADE'))
- assert_raises(exc.SAWarning, bat.create)
-
- def test_reflect_check_include_all(self):
- insp = inspect(testing.db)
- eq_(insp.get_check_constraints('foo'), [])
- eq_(
- [rec['sqltext']
- for rec in insp.get_check_constraints('foo', include_all=True)],
- ['"ID" IS NOT NULL'])
-
-
-class TwoPhaseTest(fixtures.TablesTest):
- """test cx_oracle two phase, which remains in a semi-broken state
- so requires a carefully written test."""
-
- __only_on__ = 'oracle+cx_oracle'
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table('datatable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)))
-
- def _connection(self):
- conn = testing.db.connect()
- conn.detach()
- return conn
-
- def _assert_data(self, rows):
- eq_(
- testing.db.scalar("select count(*) from datatable"),
- rows
- )
-
- def test_twophase_prepare_false(self):
- conn = self._connection()
- for i in range(2):
- trans = conn.begin_twophase()
- conn.execute("select 1 from dual")
- trans.prepare()
- trans.commit()
- conn.close()
- self._assert_data(0)
-
- def test_twophase_prepare_true(self):
- conn = self._connection()
- for i in range(2):
- trans = conn.begin_twophase()
- conn.execute("insert into datatable (id, data) "
- "values (%s, 'somedata')" % i)
- trans.prepare()
- trans.commit()
- conn.close()
- self._assert_data(2)
-
- def test_twophase_rollback(self):
- conn = self._connection()
- trans = conn.begin_twophase()
- conn.execute("insert into datatable (id, data) "
- "values (%s, 'somedata')" % 1)
- trans.rollback()
-
- trans = conn.begin_twophase()
- conn.execute("insert into datatable (id, data) "
- "values (%s, 'somedata')" % 1)
- trans.prepare()
- trans.commit()
-
- conn.close()
- self._assert_data(1)
-
- def test_not_prepared(self):
- conn = self._connection()
- trans = conn.begin_twophase()
- conn.execute("insert into datatable (id, data) "
- "values (%s, 'somedata')" % 1)
- trans.commit()
- conn.close()
- self._assert_data(1)
-
-
-class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = oracle.OracleDialect()
-
- def test_no_clobs_for_string_params(self):
- """test that simple string params get a DBAPI type of
- VARCHAR, not CLOB. This is to prevent setinputsizes
- from setting up cx_oracle.CLOBs on
- string-based bind params [ticket:793]."""
-
- class FakeDBAPI(object):
- def __getattr__(self, attr):
- return attr
-
- dialect = oracle.OracleDialect()
- dbapi = FakeDBAPI()
-
- b = bindparam("foo", "hello world!")
- eq_(
- b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
- 'STRING'
- )
-
- b = bindparam("foo", "hello world!")
- eq_(
- b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
- 'STRING'
- )
-
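As a side note to the docstring above, the following standalone sketch (not part of the patch) uses the same FakeDBAPI trick to show which DBAPI-level symbol a type's dialect implementation reports; the Text()/CLOB outcome is an assumption based on this test's rationale rather than something the commit itself asserts.

    # minimal sketch, assuming the cx_oracle dialect can be instantiated
    # without a live DBAPI, as the tests above do
    from sqlalchemy import Text, bindparam
    from sqlalchemy.dialects.oracle import cx_oracle


    class FakeDBAPI(object):
        def __getattr__(self, attr):
            return attr


    dialect = cx_oracle.dialect()
    dbapi = FakeDBAPI()

    # plain string bind params report the generic STRING symbol ...
    print(bindparam("foo", "hello world!")
          .type.dialect_impl(dialect).get_dbapi_type(dbapi))   # 'STRING'

    # ... while a LOB-oriented type such as Text is expected (assumption)
    # to report CLOB, which is what setinputsizes() remains reserved for.
    print(Text().dialect_impl(dialect).get_dbapi_type(dbapi))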
- def test_long(self):
- self.assert_compile(oracle.LONG(), "LONG")
-
- def test_type_adapt(self):
- dialect = cx_oracle.dialect()
-
- for start, test in [
- (Date(), cx_oracle._OracleDate),
- (oracle.OracleRaw(), cx_oracle._OracleRaw),
- (String(), String),
- (VARCHAR(), cx_oracle._OracleString),
- (DATE(), cx_oracle._OracleDate),
- (oracle.DATE(), oracle.DATE),
- (String(50), cx_oracle._OracleString),
- (Unicode(), cx_oracle._OracleNVarChar),
- (Text(), cx_oracle._OracleText),
- (UnicodeText(), cx_oracle._OracleUnicodeText),
- (NCHAR(), cx_oracle._OracleNVarChar),
- (oracle.RAW(50), cx_oracle._OracleRaw),
- ]:
- assert isinstance(start.dialect_impl(dialect), test), \
- "wanted %r got %r" % (test, start.dialect_impl(dialect))
-
- def test_raw_compile(self):
- self.assert_compile(oracle.RAW(), "RAW")
- self.assert_compile(oracle.RAW(35), "RAW(35)")
-
- def test_char_length(self):
- self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)")
-
- oracle8dialect = oracle.dialect()
- oracle8dialect.server_version_info = (8, 0)
- self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect)
-
- self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)")
- self.assert_compile(CHAR(50), "CHAR(50)")
-
- def test_varchar_types(self):
- dialect = oracle.dialect()
- for typ, exp in [
- (String(50), "VARCHAR2(50 CHAR)"),
- (Unicode(50), "NVARCHAR2(50)"),
- (NVARCHAR(50), "NVARCHAR2(50)"),
- (VARCHAR(50), "VARCHAR(50 CHAR)"),
- (oracle.NVARCHAR2(50), "NVARCHAR2(50)"),
- (oracle.VARCHAR2(50), "VARCHAR2(50 CHAR)"),
- (String(), "VARCHAR2"),
- (Unicode(), "NVARCHAR2"),
- (NVARCHAR(), "NVARCHAR2"),
- (VARCHAR(), "VARCHAR"),
- (oracle.NVARCHAR2(), "NVARCHAR2"),
- (oracle.VARCHAR2(), "VARCHAR2"),
- ]:
- self.assert_compile(typ, exp, dialect=dialect)
-
- def test_interval(self):
- for type_, expected in [(oracle.INTERVAL(),
- 'INTERVAL DAY TO SECOND'),
- (oracle.INTERVAL(day_precision=3),
- 'INTERVAL DAY(3) TO SECOND'),
- (oracle.INTERVAL(second_precision=5),
- 'INTERVAL DAY TO SECOND(5)'),
- (oracle.INTERVAL(day_precision=2,
- second_precision=5),
- 'INTERVAL DAY(2) TO SECOND(5)')]:
- self.assert_compile(type_, expected)
-
-
-class TypesTest(fixtures.TestBase):
- __only_on__ = 'oracle'
- __dialect__ = oracle.OracleDialect()
- __backend__ = True
-
- @testing.fails_on('+zxjdbc', 'zxjdbc lacks the FIXED_CHAR dbapi type')
- def test_fixed_char(self):
- m = MetaData(testing.db)
- t = Table('t1', m,
- Column('id', Integer, primary_key=True),
- Column('data', CHAR(30), nullable=False))
-
- t.create()
- try:
- t.insert().execute(
- dict(id=1, data="value 1"),
- dict(id=2, data="value 2"),
- dict(id=3, data="value 3")
- )
-
- eq_(
- t.select().where(t.c.data == 'value 2').execute().fetchall(),
- [(2, 'value 2 ')]
- )
-
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- assert type(t2.c.data.type) is CHAR
- eq_(
- t2.select().where(t2.c.data == 'value 2').execute().fetchall(),
- [(2, 'value 2 ')]
- )
-
- finally:
- t.drop()
-
- @testing.requires.returning
- @testing.provide_metadata
- def test_int_not_float(self):
- m = self.metadata
- t1 = Table('t1', m, Column('foo', Integer))
- t1.create()
- r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
- x = r.scalar()
- assert x == 5
- assert isinstance(x, int)
-
- x = t1.select().scalar()
- assert x == 5
- assert isinstance(x, int)
-
- @testing.provide_metadata
- def test_rowid(self):
- metadata = self.metadata
- t = Table('t1', metadata, Column('x', Integer))
- t.create()
- t.insert().execute(x=5)
- s1 = select([t])
- s2 = select([column('rowid')]).select_from(s1)
- rowid = s2.scalar()
-
- # the ROWID type is not really needed here,
- # as cx_oracle just treats it as a string,
- # but we want to make sure the ROWID works...
- rowid_col = column('rowid', oracle.ROWID)
- s3 = select([t.c.x, rowid_col]) \
- .where(rowid_col == cast(rowid, oracle.ROWID))
- eq_(s3.select().execute().fetchall(), [(5, rowid)])
-
- @testing.fails_on('+zxjdbc',
- 'Not yet known how to pass values of the '
- 'INTERVAL type')
- @testing.provide_metadata
- def test_interval(self):
- metadata = self.metadata
- interval_table = Table('intervaltable', metadata, Column('id',
- Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('day_interval',
- oracle.INTERVAL(day_precision=3)))
- metadata.create_all()
- interval_table.insert().\
- execute(day_interval=datetime.timedelta(days=35, seconds=5743))
- row = interval_table.select().execute().first()
- eq_(row['day_interval'], datetime.timedelta(days=35,
- seconds=5743))
-
- @testing.provide_metadata
- def test_numerics(self):
- m = self.metadata
- t1 = Table('t1', m,
- Column('intcol', Integer),
- Column('numericcol', Numeric(precision=9, scale=2)),
- Column('floatcol1', Float()),
- Column('floatcol2', FLOAT()),
- Column('doubleprec', oracle.DOUBLE_PRECISION),
- Column('numbercol1', oracle.NUMBER(9)),
- Column('numbercol2', oracle.NUMBER(9, 3)),
- Column('numbercol3', oracle.NUMBER))
- t1.create()
- t1.insert().execute(
- intcol=1,
- numericcol=5.2,
- floatcol1=6.5,
- floatcol2=8.5,
- doubleprec=9.5,
- numbercol1=12,
- numbercol2=14.85,
- numbercol3=15.76
- )
-
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
-
- for row in (
- t1.select().execute().first(),
- t2.select().execute().first()
- ):
- for i, (val, type_) in enumerate((
- (1, int),
- (decimal.Decimal("5.2"), decimal.Decimal),
- (6.5, float),
- (8.5, float),
- (9.5, float),
- (12, int),
- (decimal.Decimal("14.85"), decimal.Decimal),
- (15.76, float),
- )):
- eq_(row[i], val)
- assert isinstance(row[i], type_), '%r is not %r' \
- % (row[i], type_)
-
- def test_numeric_no_decimal_mode(self):
- engine = testing_engine(options=dict(coerce_to_decimal=False))
- value = engine.scalar("SELECT 5.66 FROM DUAL")
- assert isinstance(value, float)
-
- value = testing.db.scalar("SELECT 5.66 FROM DUAL")
- assert isinstance(value, decimal.Decimal)
-
- @testing.only_on("oracle+cx_oracle", "cx_oracle-specific feature")
- @testing.fails_if(
- testing.requires.python3,
- "cx_oracle always returns unicode on py3k")
- def test_coerce_to_unicode(self):
- engine = testing_engine(options=dict(coerce_to_unicode=True))
- value = engine.scalar("SELECT 'hello' FROM DUAL")
- assert isinstance(value, util.text_type)
-
- value = testing.db.scalar("SELECT 'hello' FROM DUAL")
- assert isinstance(value, util.binary_type)
-
- @testing.provide_metadata
- def test_numerics_broken_inspection(self):
- """Numeric scenarios where Oracle type info is 'broken',
- returning us precision, scale of the form (0, 0) or (0, -127).
- We convert to Decimal and let int()/float() processors take over.
-
- """
-
- metadata = self.metadata
-
- # this test requires cx_oracle 5
-
- foo = Table('foo', metadata,
- Column('idata', Integer),
- Column('ndata', Numeric(20, 2)),
- Column('ndata2', Numeric(20, 2)),
- Column('nidata', Numeric(5, 0)),
- Column('fdata', Float()))
- foo.create()
-
- foo.insert().execute({
- 'idata': 5,
- 'ndata': decimal.Decimal("45.6"),
- 'ndata2': decimal.Decimal("45.0"),
- 'nidata': decimal.Decimal('53'),
- 'fdata': 45.68392
- })
-
- stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo"
-
- row = testing.db.execute(stmt).fetchall()[0]
- eq_(
- [type(x) for x in row],
- [int, decimal.Decimal, decimal.Decimal, int, float]
- )
- eq_(
- row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
- 53, 45.683920000000001)
- )
-
-        # with a nested subquery, both Numeric values that don't have
-        # decimal places, regardless of their originating type, come back
-        # as ints with no useful typing information beyond "numeric", so
-        # the native handler must convert them to int.
-        # this means our Decimal converters need to run no matter what.
-        # totally sucks.
-
- stmt = """
- SELECT
- (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata,
- (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2)) FROM DUAL)
- AS ndata,
- (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2)) FROM DUAL)
- AS ndata2,
- (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0)) FROM DUAL)
- AS nidata,
- (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata
- FROM dual
- """
- row = testing.db.execute(stmt).fetchall()[0]
- eq_(
- [type(x) for x in row],
- [int, decimal.Decimal, int, int, decimal.Decimal]
- )
- eq_(
- row,
- (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
- )
-
- row = testing.db.execute(text(stmt,
- typemap={
- 'idata': Integer(),
- 'ndata': Numeric(20, 2),
- 'ndata2': Numeric(20, 2),
- 'nidata': Numeric(5, 0),
- 'fdata': Float()})).fetchall()[0]
- eq_(
- [type(x) for x in row],
- [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
- )
- eq_(
- row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
- decimal.Decimal('53'), 45.683920000000001)
- )
-
- stmt = """
- SELECT
- anon_1.idata AS anon_1_idata,
- anon_1.ndata AS anon_1_ndata,
- anon_1.ndata2 AS anon_1_ndata2,
- anon_1.nidata AS anon_1_nidata,
- anon_1.fdata AS anon_1_fdata
- FROM (SELECT idata, ndata, ndata2, nidata, fdata
- FROM (
- SELECT
- (SELECT (SELECT idata FROM foo) FROM DUAL) AS idata,
- (SELECT CAST((SELECT ndata FROM foo) AS NUMERIC(20, 2))
- FROM DUAL) AS ndata,
- (SELECT CAST((SELECT ndata2 FROM foo) AS NUMERIC(20, 2))
- FROM DUAL) AS ndata2,
- (SELECT CAST((SELECT nidata FROM foo) AS NUMERIC(5, 0))
- FROM DUAL) AS nidata,
- (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL)
- AS fdata
- FROM dual
- )
- WHERE ROWNUM >= 0) anon_1
- """
- row = testing.db.execute(stmt).fetchall()[0]
- eq_(
- [type(x) for x in row],
- [int, decimal.Decimal, int, int, decimal.Decimal]
- )
- eq_(
- row,
- (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
- )
-
- row = testing.db.execute(text(stmt,
- typemap={
- 'anon_1_idata': Integer(),
- 'anon_1_ndata': Numeric(20, 2),
- 'anon_1_ndata2': Numeric(20, 2),
- 'anon_1_nidata': Numeric(5, 0),
- 'anon_1_fdata': Float()
- })).fetchall()[0]
- eq_(
- [type(x) for x in row],
- [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
- )
- eq_(
- row,
- (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
- decimal.Decimal('53'), 45.683920000000001)
- )
-
- row = testing.db.execute(text(
- stmt,
- typemap={
- 'anon_1_idata': Integer(),
- 'anon_1_ndata': Numeric(20, 2, asdecimal=False),
- 'anon_1_ndata2': Numeric(20, 2, asdecimal=False),
- 'anon_1_nidata': Numeric(5, 0, asdecimal=False),
- 'anon_1_fdata': Float(asdecimal=True)
- })).fetchall()[0]
- eq_(
- [type(x) for x in row],
- [int, float, float, float, decimal.Decimal]
- )
- eq_(
- row,
- (5, 45.6, 45, 53, decimal.Decimal('45.68392'))
- )
-
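The comments in the test above lean on a general cx_Oracle technique; the sketch below is a minimal, hedged illustration of that technique (it is not the dialect's actual handler, and the connection string is a placeholder): fetch NUMBER columns as strings and convert them with an outconverter, so that columns whose reported precision/scale is (0, 0) or (0, -127) still come back as Decimal.

    import decimal

    import cx_Oracle


    def output_type_handler(cursor, name, default_type,
                            size, precision, scale):
        # route every NUMBER column through a string -> Decimal conversion;
        # returning None for other types keeps cx_Oracle's default behavior
        if default_type == cx_Oracle.NUMBER:
            return cursor.var(cx_Oracle.STRING, 255, cursor.arraysize,
                              outconverter=decimal.Decimal)


    conn = cx_Oracle.connect("scott", "tiger", "localhost/orcl")  # placeholder
    conn.outputtypehandler = output_type_handler
    cursor = conn.cursor()
    cursor.execute("SELECT 45.6 FROM dual")
    print(cursor.fetchone()[0])   # Decimal('45.6'), not the float 45.6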
- @testing.provide_metadata
- def test_reflect_dates(self):
- metadata = self.metadata
- Table(
- "date_types", metadata,
- Column('d1', sqltypes.DATE),
- Column('d2', oracle.DATE),
- Column('d3', TIMESTAMP),
- Column('d4', TIMESTAMP(timezone=True)),
- Column('d5', oracle.INTERVAL(second_precision=5)),
- )
- metadata.create_all()
- m = MetaData(testing.db)
- t1 = Table(
- "date_types", m,
- autoload=True)
- assert isinstance(t1.c.d1.type, oracle.DATE)
- assert isinstance(t1.c.d1.type, DateTime)
- assert isinstance(t1.c.d2.type, oracle.DATE)
- assert isinstance(t1.c.d2.type, DateTime)
- assert isinstance(t1.c.d3.type, TIMESTAMP)
- assert not t1.c.d3.type.timezone
- assert isinstance(t1.c.d4.type, TIMESTAMP)
- assert t1.c.d4.type.timezone
- assert isinstance(t1.c.d5.type, oracle.INTERVAL)
-
- def test_reflect_all_types_schema(self):
- types_table = Table('all_types', MetaData(testing.db),
- Column('owner', String(30), primary_key=True),
- Column('type_name', String(30), primary_key=True),
- autoload=True, oracle_resolve_synonyms=True)
- for row in types_table.select().execute().fetchall():
- [row[k] for k in row.keys()]
-
- @testing.provide_metadata
- def test_raw_roundtrip(self):
- metadata = self.metadata
- raw_table = Table('raw', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', oracle.RAW(35)))
- metadata.create_all()
- testing.db.execute(raw_table.insert(), id=1, data=b("ABCDEF"))
- eq_(
- testing.db.execute(raw_table.select()).first(),
- (1, b("ABCDEF"))
- )
-
- @testing.provide_metadata
- def test_reflect_nvarchar(self):
- metadata = self.metadata
- Table('tnv', metadata, Column('data', sqltypes.NVARCHAR(255)))
- metadata.create_all()
- m2 = MetaData(testing.db)
- t2 = Table('tnv', m2, autoload=True)
- assert isinstance(t2.c.data.type, sqltypes.NVARCHAR)
-
- if testing.against('oracle+cx_oracle'):
- # nvarchar returns unicode natively. cx_oracle
- # _OracleNVarChar type should be at play here.
- assert isinstance(
- t2.c.data.type.dialect_impl(testing.db.dialect),
- cx_oracle._OracleNVarChar)
-
- data = u('m’a réveillé.')
- t2.insert().execute(data=data)
- res = t2.select().execute().first()['data']
- eq_(res, data)
- assert isinstance(res, util.text_type)
-
- @testing.provide_metadata
- def test_char_length(self):
- metadata = self.metadata
- t1 = Table('t1', metadata,
- Column("c1", VARCHAR(50)),
- Column("c2", NVARCHAR(250)),
- Column("c3", CHAR(200)))
- t1.create()
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- eq_(t2.c.c1.type.length, 50)
- eq_(t2.c.c2.type.length, 250)
- eq_(t2.c.c3.type.length, 200)
-
- @testing.provide_metadata
- def test_long_type(self):
- metadata = self.metadata
-
- t = Table('t', metadata, Column('data', oracle.LONG))
- metadata.create_all(testing.db)
- testing.db.execute(t.insert(), data='xyz')
- eq_(
- testing.db.scalar(select([t.c.data])),
- "xyz"
- )
-
- def test_longstring(self):
- metadata = MetaData(testing.db)
- testing.db.execute("""
- CREATE TABLE Z_TEST
- (
- ID NUMERIC(22) PRIMARY KEY,
- ADD_USER VARCHAR2(20) NOT NULL
- )
- """)
- try:
- t = Table("z_test", metadata, autoload=True)
- t.insert().execute(id=1.0, add_user='foobar')
- assert t.select().execute().fetchall() == [(1, 'foobar')]
- finally:
- testing.db.execute("DROP TABLE Z_TEST")
-
- @testing.fails_on('+zxjdbc', 'auto_convert_lobs not applicable')
- def test_lobs_without_convert(self):
- engine = testing_engine(options=dict(auto_convert_lobs=False))
- metadata = MetaData()
- t = Table("z_test",
- metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Text),
- Column('bindata', LargeBinary))
- t.create(engine)
- try:
- engine.execute(t.insert(),
- id=1,
- data='this is text',
- bindata=b('this is binary'))
- row = engine.execute(t.select()).first()
- eq_(row['data'].read(), 'this is text')
- eq_(row['bindata'].read(), b('this is binary'))
- finally:
- t.drop(engine)
-
-
-class EuroNumericTest(fixtures.TestBase):
- """
-    test the numeric output_type_handler when using a non-US locale for NLS_LANG.
- """
-
- __only_on__ = 'oracle+cx_oracle'
- __backend__ = True
-
- def setup(self):
- self.old_nls_lang = os.environ.get('NLS_LANG', False)
- os.environ['NLS_LANG'] = "GERMAN"
- self.engine = testing_engine()
-
- def teardown(self):
- if self.old_nls_lang is not False:
- os.environ['NLS_LANG'] = self.old_nls_lang
- else:
- del os.environ['NLS_LANG']
- self.engine.dispose()
-
- def test_output_type_handler(self):
- for stmt, exp, kw in [
- ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}),
- ("SELECT 15 FROM DUAL", 15, {}),
- ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL",
- decimal.Decimal("15"), {}),
- ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL",
- decimal.Decimal("0.1"), {}),
- ("SELECT :num FROM DUAL", decimal.Decimal("2.5"),
- {'num': decimal.Decimal("2.5")})
- ]:
- test_exp = self.engine.scalar(stmt, **kw)
- eq_(
- test_exp,
- exp
- )
- assert type(test_exp) is type(exp)
-
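A minimal usage sketch of what this fixture sets up, with a placeholder URL; the assumption is that NLS_LANG must be in the environment before the first connection is made, which is why the test assigns it in setup() before building the engine. Under a German locale the decimal separator is ',', which is exactly the case the numeric output type handler has to survive.

    import decimal
    import os

    from sqlalchemy import create_engine

    os.environ['NLS_LANG'] = "GERMAN"   # as in the test's setup()
    engine = create_engine("oracle+cx_oracle://scott:tiger@hostname/orcl")

    value = engine.scalar("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL")
    assert value == decimal.Decimal("0.1")   # not a comma-formatted string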
-
-class SystemTableTablenamesTest(fixtures.TestBase):
- __only_on__ = 'oracle'
- __backend__ = True
-
- def setup(self):
- testing.db.execute("create table my_table (id integer)")
- testing.db.execute(
- "create global temporary table my_temp_table (id integer)"
- )
- testing.db.execute(
- "create table foo_table (id integer) tablespace SYSTEM"
- )
-
- def teardown(self):
- testing.db.execute("drop table my_temp_table")
- testing.db.execute("drop table my_table")
- testing.db.execute("drop table foo_table")
-
- def test_table_names_no_system(self):
- insp = inspect(testing.db)
- eq_(
- insp.get_table_names(), ["my_table"]
- )
-
- def test_temp_table_names_no_system(self):
- insp = inspect(testing.db)
- eq_(
- insp.get_temp_table_names(), ["my_temp_table"]
- )
-
- def test_table_names_w_system(self):
- engine = testing_engine(options={"exclude_tablespaces": ["FOO"]})
- insp = inspect(engine)
- eq_(
- set(insp.get_table_names()).intersection(["my_table",
- "foo_table"]),
- set(["my_table", "foo_table"])
- )
-
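For reference, a hedged sketch of the dialect option these tests exercise (the URL is a placeholder): by default the Oracle dialect is understood to skip tables living in the SYSTEM and SYSAUX tablespaces when listing table names, and exclude_tablespaces overrides that set.

    from sqlalchemy import create_engine, inspect

    # passing a different exclusion list makes SYSTEM-tablespace tables
    # visible again, which is what test_table_names_w_system relies on
    engine = create_engine(
        "oracle+cx_oracle://scott:tiger@hostname/orcl",
        exclude_tablespaces=["SYSAUX"])
    print(inspect(engine).get_table_names())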
-
-class DontReflectIOTTest(fixtures.TestBase):
- """test that index overflow tables aren't included in
- table_names."""
-
- __only_on__ = 'oracle'
- __backend__ = True
-
- def setup(self):
- testing.db.execute("""
- CREATE TABLE admin_docindex(
- token char(20),
- doc_id NUMBER,
- token_frequency NUMBER,
- token_offsets VARCHAR2(2000),
- CONSTRAINT pk_admin_docindex PRIMARY KEY (token, doc_id))
- ORGANIZATION INDEX
- TABLESPACE users
- PCTTHRESHOLD 20
- OVERFLOW TABLESPACE users
- """)
-
- def teardown(self):
- testing.db.execute("drop table admin_docindex")
-
- def test_reflect_all(self):
- m = MetaData(testing.db)
- m.reflect()
- eq_(
- set(t.name for t in m.tables.values()),
- set(['admin_docindex'])
- )
-
-
-class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL):
- __only_on__ = 'oracle'
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- global binary_table, stream, meta
- meta = MetaData(testing.db)
- binary_table = Table('binary_table', meta,
- Column('id', Integer, primary_key=True),
- Column('data', LargeBinary))
- meta.create_all()
- stream = os.path.join(
- os.path.dirname(__file__), "..",
- 'binary_data_one.dat')
- with open(stream, "rb") as file_:
- stream = file_.read(12000)
-
- for i in range(1, 11):
- binary_table.insert().execute(id=i, data=stream)
-
- @classmethod
- def teardown_class(cls):
- meta.drop_all()
-
- def test_fetch(self):
- result = binary_table.select().order_by(binary_table.c.id).\
- execute().fetchall()
- eq_(result, [(i, stream) for i in range(1, 11)])
-
- @testing.fails_on('+zxjdbc', 'FIXME: zxjdbc should support this')
- def test_fetch_single_arraysize(self):
- eng = testing_engine(options={'arraysize': 1})
- result = eng.execute(binary_table.select().
- order_by(binary_table.c.id)).fetchall()
- eq_(result, [(i, stream) for i in range(1, 11)])
-
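A brief aside on the option used in test_fetch_single_arraysize (the URL is a placeholder): arraysize is assumed to be forwarded to each cx_Oracle cursor's arraysize attribute, so it mainly controls how many rows are buffered per fetch round trip.

    from sqlalchemy import create_engine

    # larger arraysize = more rows buffered per round trip; the test above
    # forces it down to 1 to exercise LOB reads row by row
    engine = create_engine(
        "oracle+cx_oracle://scott:tiger@hostname/orcl",
        arraysize=500)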
-
-class UnsupportedIndexReflectTest(fixtures.TestBase):
- __only_on__ = 'oracle'
- __backend__ = True
-
- @testing.emits_warning("No column names")
- @testing.provide_metadata
- def test_reflect_functional_index(self):
- metadata = self.metadata
- Table('test_index_reflect', metadata,
- Column('data', String(20), primary_key=True))
- metadata.create_all()
-
- testing.db.execute('CREATE INDEX DATA_IDX ON '
- 'TEST_INDEX_REFLECT (UPPER(DATA))')
- m2 = MetaData(testing.db)
- Table('test_index_reflect', m2, autoload=True)
-
-
-def all_tables_compression_missing():
- try:
- testing.db.execute('SELECT compression FROM all_tables')
- if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"):
- return True
- return False
- except Exception:
- return True
-
-
-def all_tables_compress_for_missing():
- try:
- testing.db.execute('SELECT compress_for FROM all_tables')
- if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"):
- return True
- return False
- except Exception:
- return True
-
-
-class TableReflectionTest(fixtures.TestBase):
- __only_on__ = 'oracle'
- __backend__ = True
-
- @testing.provide_metadata
- @testing.fails_if(all_tables_compression_missing)
- def test_reflect_basic_compression(self):
- metadata = self.metadata
-
- tbl = Table('test_compress', metadata,
- Column('data', Integer, primary_key=True),
- oracle_compress=True)
- metadata.create_all()
-
- m2 = MetaData(testing.db)
-
- tbl = Table('test_compress', m2, autoload=True)
- # Don't hardcode the exact value, but it must be non-empty
- assert tbl.dialect_options['oracle']['compress']
-
- @testing.provide_metadata
- @testing.fails_if(all_tables_compress_for_missing)
- def test_reflect_oltp_compression(self):
- metadata = self.metadata
-
- tbl = Table('test_compress', metadata,
- Column('data', Integer, primary_key=True),
- oracle_compress="OLTP")
- metadata.create_all()
-
- m2 = MetaData(testing.db)
-
- tbl = Table('test_compress', m2, autoload=True)
- assert tbl.dialect_options['oracle']['compress'] == "OLTP"
-
-
-class RoundTripIndexTest(fixtures.TestBase):
- __only_on__ = 'oracle'
- __backend__ = True
-
- @testing.provide_metadata
- def test_basic(self):
- metadata = self.metadata
-
- table = Table("sometable", metadata,
- Column("id_a", Unicode(255), primary_key=True),
- Column("id_b",
- Unicode(255),
- primary_key=True,
- unique=True),
- Column("group", Unicode(255), primary_key=True),
- Column("col", Unicode(255)),
- UniqueConstraint('col', 'group'))
-
- # "group" is a keyword, so lower case
- normalind = Index('tableind', table.c.id_b, table.c.group)
- compress1 = Index('compress1', table.c.id_a, table.c.id_b,
- oracle_compress=True)
- compress2 = Index('compress2', table.c.id_a, table.c.id_b, table.c.col,
- oracle_compress=1)
-
- metadata.create_all()
- mirror = MetaData(testing.db)
- mirror.reflect()
- metadata.drop_all()
- mirror.create_all()
-
- inspect = MetaData(testing.db)
- inspect.reflect()
-
- def obj_definition(obj):
- return (obj.__class__,
- tuple([c.name for c in obj.columns]),
- getattr(obj, 'unique', None))
-
-        # find what the primary key constraint name should be
- primaryconsname = testing.db.scalar(
- text(
- """SELECT constraint_name
- FROM all_constraints
- WHERE table_name = :table_name
- AND owner = :owner
- AND constraint_type = 'P' """),
- table_name=table.name.upper(),
- owner=testing.db.dialect.default_schema_name.upper())
-
- reflectedtable = inspect.tables[table.name]
-
- # make a dictionary of the reflected objects:
-
- reflected = dict([(obj_definition(i), i) for i in
- reflectedtable.indexes
- | reflectedtable.constraints])
-
-        # assert we got the primary key constraint and its name; a
-        # KeyError is raised if it is not in the dict
-
- assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
- 'group'), None)].name.upper() \
- == primaryconsname.upper()
-
-        # a KeyError is raised if it is not in the dict
-
- eq_(
- reflected[(Index, ('id_b', 'group'), False)].name,
- normalind.name
- )
- assert (Index, ('id_b', ), True) in reflected
- assert (Index, ('col', 'group'), True) in reflected
-
- idx = reflected[(Index, ('id_a', 'id_b', ), False)]
- assert idx.dialect_options['oracle']['compress'] == 2
-
- idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)]
- assert idx.dialect_options['oracle']['compress'] == 1
-
- eq_(len(reflectedtable.constraints), 1)
- eq_(len(reflectedtable.indexes), 5)
-
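A compile-only sketch of the oracle_compress flag round-tripped by the test above; the table and index names are illustrative and the exact DDL string is an assumption, shown only to connect the flag with what reflection later reports as dialect_options['oracle']['compress'].

    from sqlalchemy import Column, Index, Integer, MetaData, Table
    from sqlalchemy.dialects import oracle
    from sqlalchemy.schema import CreateIndex

    metadata = MetaData()
    t = Table('some_table', metadata,
              Column('a', Integer), Column('b', Integer))
    idx = Index('ix_ab', t.c.a, t.c.b, oracle_compress=1)

    # expected (assumption) to render roughly as:
    #   CREATE INDEX ix_ab ON some_table (a, b) COMPRESS 1
    print(CreateIndex(idx).compile(dialect=oracle.dialect()))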
-
-class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
-
- def test_basic(self):
- seq = Sequence('my_seq_no_schema')
- dialect = oracle.OracleDialect()
- assert dialect.identifier_preparer.format_sequence(seq) \
- == 'my_seq_no_schema'
- seq = Sequence('my_seq', schema='some_schema')
- assert dialect.identifier_preparer.format_sequence(seq) \
- == 'some_schema.my_seq'
- seq = Sequence('My_Seq', schema='Some_Schema')
- assert dialect.identifier_preparer.format_sequence(seq) \
- == '"Some_Schema"."My_Seq"'
-
-
-class ExecuteTest(fixtures.TestBase):
-
- __only_on__ = 'oracle'
- __backend__ = True
-
- def test_basic(self):
- eq_(testing.db.execute('/*+ this is a comment */ SELECT 1 FROM '
- 'DUAL').fetchall(), [(1, )])
-
- def test_sequences_are_integers(self):
- seq = Sequence('foo_seq')
- seq.create(testing.db)
- try:
- val = testing.db.execute(seq)
- eq_(val, 1)
- assert type(val) is int
- finally:
- seq.drop(testing.db)
-
- @testing.provide_metadata
- def test_limit_offset_for_update(self):
- metadata = self.metadata
-        # oracle can't really combine its ROWNUM-based LIMIT emulation
-        # with FOR UPDATE very well.
-
- t = Table('t1',
- metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer))
- metadata.create_all()
-
- t.insert().execute(
- {'id': 1, 'data': 1},
- {'id': 2, 'data': 7},
- {'id': 3, 'data': 12},
- {'id': 4, 'data': 15},
- {'id': 5, 'data': 32},
- )
-
- # here, we can't use ORDER BY.
- eq_(
- t.select(for_update=True).limit(2).execute().fetchall(),
- [(1, 1),
- (2, 7)]
- )
-
-        # here, it's impossible. But we'd prefer it to raise ORA-02014
- # instead of issuing a syntax error.
- assert_raises_message(
- exc.DatabaseError,
- "ORA-02014",
- t.select(for_update=True).limit(2).offset(3).execute
- )
-
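One way to see why the combination above is awkward is to compile a limited and offset SELECT ... FOR UPDATE for the Oracle dialect and look at the ROWNUM wrapping it produces; it is that nesting which the server rejects with ORA-02014 once an offset is involved. The sketch below only prints the generated SQL and uses illustrative table names.

    from sqlalchemy import Column, Integer, MetaData, Table, select
    from sqlalchemy.dialects import oracle

    t = Table('t1', MetaData(),
              Column('id', Integer, primary_key=True),
              Column('data', Integer))

    stmt = select([t]).with_for_update().limit(2).offset(3)
    print(stmt.compile(dialect=oracle.dialect()))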
-
-class UnicodeSchemaTest(fixtures.TestBase):
- __only_on__ = 'oracle'
- __backend__ = True
-
- @testing.provide_metadata
- def test_quoted_column_non_unicode(self):
- metadata = self.metadata
- table = Table("atable", metadata,
- Column("_underscorecolumn",
- Unicode(255),
- primary_key=True))
- metadata.create_all()
-
- table.insert().execute(
- {'_underscorecolumn': u('’é')},
- )
- result = testing.db.execute(
- table.select().where(table.c._underscorecolumn == u('’é'))
- ).scalar()
- eq_(result, u('’é'))
-
- @testing.provide_metadata
- def test_quoted_column_unicode(self):
- metadata = self.metadata
- table = Table("atable", metadata,
- Column(u("méil"), Unicode(255), primary_key=True))
- metadata.create_all()
-
- table.insert().execute(
- {u('méil'): u('’é')},
- )
- result = testing.db.execute(
- table.select().where(table.c[u('méil')] == u('’é'))
- ).scalar()
- eq_(result, u('’é'))
-
-
-class DBLinkReflectionTest(fixtures.TestBase):
- __requires__ = 'oracle_test_dblink',
- __only_on__ = 'oracle'
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- from sqlalchemy.testing import config
- cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link')
-
- # note that the synonym here is still not totally functional
- # when accessing via a different username as we do with the
- # multiprocess test suite, so testing here is minimal
- with testing.db.connect() as conn:
- conn.execute("create table test_table "
- "(id integer primary key, data varchar2(50))")
- conn.execute("create synonym test_table_syn "
- "for test_table@%s" % cls.dblink)
-
- @classmethod
- def teardown_class(cls):
- with testing.db.connect() as conn:
- conn.execute("drop synonym test_table_syn")
- conn.execute("drop table test_table")
-
- def test_reflection(self):
- """test the resolution of the synonym/dblink. """
- m = MetaData()
-
- t = Table('test_table_syn', m, autoload=True,
- autoload_with=testing.db, oracle_resolve_synonyms=True)
- eq_(list(t.c.keys()), ['id', 'data'])
- eq_(list(t.primary_key), [t.c.id])
-
-
-class ServiceNameTest(fixtures.TestBase):
- __only_on__ = 'oracle+cx_oracle'
- __backend__ = True
-
- def test_cx_oracle_service_name(self):
- url_string = 'oracle+cx_oracle://scott:tiger@host/?service_name=hr'
- eng = create_engine(url_string, _initialize=False)
- cargs, cparams = eng.dialect.create_connect_args(eng.url)
-
- assert 'SERVICE_NAME=hr' in cparams['dsn']
- assert 'SID=hr' not in cparams['dsn']
-
- def test_cx_oracle_service_name_bad(self):
- url_string = 'oracle+cx_oracle://scott:tiger@host/hr1?service_name=hr2'
- assert_raises(
- exc.InvalidRequestError,
- create_engine, url_string,
- _initialize=False
- )
-
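A hedged usage note on the URL form asserted above (host and credentials are placeholders): the service_name query argument is turned into a SERVICE_NAME entry in the generated DSN instead of a SID, and combining it with a SID in the URL path raises InvalidRequestError.

    from sqlalchemy import create_engine

    engine = create_engine(
        "oracle+cx_oracle://scott:tiger@hostname/?service_name=myservice")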
diff --git a/test/profiles.txt b/test/profiles.txt
index e76f14cdb..e1bcf0aef 100644
--- a/test/profiles.txt
+++ b/test/profiles.txt
@@ -1,15 +1,15 @@
# /home/classic/dev/sqlalchemy/test/profiles.txt
# This file is written out on a per-environment basis.
-# For each test in aaa_profiling, the corresponding function and
+# For each test in aaa_profiling, the corresponding function and
# environment is located within this file. If it doesn't exist,
# the test is skipped.
-# If a callcount does exist, it is compared to what we received.
+# If a callcount does exist, it is compared to what we received.
# assertions are raised if the counts do not match.
-#
-# To add a new callcount test, apply the function_call_count
-# decorator and re-run the tests using the --write-profiles
+#
+# To add a new callcount test, apply the function_call_count
+# decorator and re-run the tests using the --write-profiles
# option - this file will be rewritten including the new count.
-#
+#
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
@@ -367,6 +367,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_mysql_mysqldb_dbapiunicode_nocextensions 52
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_mysql_pymysql_dbapiunicode_cextensions 48
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_mysql_pymysql_dbapiunicode_nocextensions 52
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_oracle_cx_oracle_dbapiunicode_cextensions 47
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 51
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_postgresql_psycopg2_dbapiunicode_cextensions 48
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 52
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 2.7_sqlite_pysqlite_dbapiunicode_cextensions 48
@@ -379,6 +381,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 56
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.5_sqlite_pysqlite_dbapiunicode_cextensions 52
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 56
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.6_oracle_cx_oracle_dbapiunicode_cextensions 51
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 55
# TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute
@@ -386,6 +390,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_dbapiunicode_nocextensions 91
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_pymysql_dbapiunicode_cextensions 87
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_pymysql_dbapiunicode_nocextensions 91
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_dbapiunicode_cextensions 86
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 90
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_dbapiunicode_cextensions 87
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 91
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_dbapiunicode_cextensions 87
@@ -398,6 +404,8 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 95
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_sqlite_pysqlite_dbapiunicode_cextensions 91
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 95
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.6_oracle_cx_oracle_dbapiunicode_cextensions 90
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 94
# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile
@@ -405,6 +413,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_mysql_mysqldb_dbapiunicode_nocextensions 15
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_mysql_pymysql_dbapiunicode_cextensions 15
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_mysql_pymysql_dbapiunicode_nocextensions 15
+test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_oracle_cx_oracle_dbapiunicode_cextensions 15
+test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 15
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_postgresql_psycopg2_dbapiunicode_cextensions 15
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 15
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_sqlite_pysqlite_dbapiunicode_cextensions 15
@@ -417,6 +427,22 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 16
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5_sqlite_pysqlite_dbapiunicode_cextensions 16
test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 16
+test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.6_oracle_cx_oracle_dbapiunicode_cextensions 16
+test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 16
+
+# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string
+
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 2.7_oracle_cx_oracle_dbapiunicode_cextensions 357
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 15379
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 3.6_oracle_cx_oracle_dbapiunicode_cextensions 326
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_string 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14330
+
+# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode
+
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 2.7_oracle_cx_oracle_dbapiunicode_cextensions 20361
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 35383
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 3.6_oracle_cx_oracle_dbapiunicode_cextensions 330
+test.aaa_profiling.test_resultset.ResultSetTest.test_raw_unicode 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14334
# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_string
@@ -424,6 +450,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_db
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_dbapiunicode_nocextensions 55515
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_pymysql_dbapiunicode_cextensions 122475
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_pymysql_dbapiunicode_nocextensions 137477
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_oracle_cx_oracle_dbapiunicode_cextensions 518
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 15520
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_dbapiunicode_cextensions 498
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 15500
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_sqlite_pysqlite_dbapiunicode_cextensions 440
@@ -436,6 +464,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_postgresql_psyco
test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 14527
test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_sqlite_pysqlite_dbapiunicode_cextensions 460
test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 14464
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.6_oracle_cx_oracle_dbapiunicode_cextensions 522
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14526
# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_unicode
@@ -443,6 +473,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_d
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_dbapiunicode_nocextensions 55515
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_pymysql_dbapiunicode_cextensions 122475
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_pymysql_dbapiunicode_nocextensions 137477
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_oracle_cx_oracle_dbapiunicode_cextensions 20518
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_oracle_cx_oracle_dbapiunicode_nocextensions 55520
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_dbapiunicode_cextensions 498
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 15500
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_sqlite_pysqlite_dbapiunicode_cextensions 440
@@ -455,6 +487,8 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_postgresql_psyc
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_postgresql_psycopg2_dbapiunicode_nocextensions 14527
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_sqlite_pysqlite_dbapiunicode_cextensions 460
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.5_sqlite_pysqlite_dbapiunicode_nocextensions 14464
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.6_oracle_cx_oracle_dbapiunicode_cextensions 522
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.6_oracle_cx_oracle_dbapiunicode_nocextensions 14526
# TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation
diff --git a/test/requirements.py b/test/requirements.py
index 9b01a22dd..0829607cd 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -750,10 +750,14 @@ class DefaultRequirements(SuiteRequirements):
"""target backend supports Decimal() objects using E notation
to represent very large values."""
- return skip_if(
- [("sybase+pyodbc", None, None,
- "Don't know how do get these values through FreeTDS + Sybase"),
- ("firebird", None, None, "Precision must be from 1 to 18")])
+ return fails_if(
+ [
+ ("sybase+pyodbc", None, None,
+ "Don't know how do get these values through FreeTDS + Sybase"
+ ),
+ ("firebird", None, None, "Precision must be from 1 to 18")
+ ]
+ )
@property
def precision_numerics_many_significant_digits(self):
@@ -761,15 +765,17 @@ class DefaultRequirements(SuiteRequirements):
such as 319438950232418390.273596, 87673.594069654243
"""
- def cx_oracle_6_config(config):
- return config.db.driver == "cx_oracle" and \
- config.db.dialect.cx_oracle_ver >= (6, )
+
+ def broken_cx_oracle(config):
+ return against(config, 'oracle+cx_oracle') and \
+ config.db.dialect.cx_oracle_ver <= (6, 0, 2) and \
+ config.db.dialect.cx_oracle_ver > (6, )
return fails_if(
- [cx_oracle_6_config,
- ('sqlite', None, None, 'TODO'),
- ("firebird", None, None, "Precision must be from 1 to 18"),
- ("sybase+pysybase", None, None, "TODO"),
+ [
+ ('sqlite', None, None, 'TODO'),
+ ("firebird", None, None, "Precision must be from 1 to 18"),
+ ("sybase+pysybase", None, None, "TODO"),
]
)
@@ -781,9 +787,7 @@ class DefaultRequirements(SuiteRequirements):
return fails_if(
[
- ('oracle', None, None,
- "this may be a bug due to the difficulty in handling "
- "oracle precision numerics"),
+ ("oracle", None, None, "driver doesn't do this automatically"),
("firebird", None, None,
"database and/or driver truncates decimal places.")
]
@@ -999,3 +1003,19 @@ class DefaultRequirements(SuiteRequirements):
lambda config: against(config, 'postgresql') and
config.db.scalar("show server_encoding").lower() == "utf8"
)
+
+ @property
+    def broken_cx_oracle6_numerics(self):
+ return exclusions.LambdaPredicate(
+ lambda config: against(config, 'oracle+cx_oracle') and
+ config.db.dialect.cx_oracle_ver <= (6, 0, 2) and
+ config.db.dialect.cx_oracle_ver > (6, ),
+ "cx_Oracle github issue #77"
+ )
+
+ @property
+ def oracle5x(self):
+ return only_if(
+ lambda config: against(config, "oracle+cx_oracle") and
+ config.db.dialect.cx_oracle_ver < (6, )
+        )
\ No newline at end of file
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 947fe0dc5..f8d183b71 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -360,7 +360,6 @@ class ReturnDefaultsTest(fixtures.TablesTest):
[None]
)
- @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
def test_insert_non_default_plus_default(self):
t1 = self.tables.t1
result = testing.db.execute(
@@ -372,7 +371,6 @@ class ReturnDefaultsTest(fixtures.TablesTest):
{"id": 1, "data": None, "insdef": 0}
)
- @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
def test_update_non_default_plus_default(self):
t1 = self.tables.t1
testing.db.execute(
@@ -387,7 +385,6 @@ class ReturnDefaultsTest(fixtures.TablesTest):
{"data": None, 'upddef': 1}
)
- @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
def test_insert_all(self):
t1 = self.tables.t1
result = testing.db.execute(
@@ -398,7 +395,6 @@ class ReturnDefaultsTest(fixtures.TablesTest):
{"id": 1, "data": None, "insdef": 0}
)
- @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
def test_update_all(self):
t1 = self.tables.t1
testing.db.execute(
diff --git a/test/sql/test_rowcount.py b/test/sql/test_rowcount.py
index 009911538..ea29bcf7e 100644
--- a/test/sql/test_rowcount.py
+++ b/test/sql/test_rowcount.py
@@ -66,7 +66,8 @@ class FoundRowsTest(fixtures.TestBase, AssertsExecutionResults):
assert r.rowcount == 3
@testing.skip_if(
- "oracle", "temporary skip until cx_oracle refactor is merged")
+ testing.requires.oracle5x,
+ "unknown DBAPI error fixed in later version")
@testing.requires.sane_rowcount_w_returning
def test_update_rowcount_return_defaults(self):
department = employees_table.c.department
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index b6cc04322..bbd8a221b 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -1579,6 +1579,7 @@ binary_table = MyPickleType = metadata = None
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
+ __backend__ = True
@classmethod
def setup_class(cls):