summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/databases/oracle.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/databases/oracle.py')
-rw-r--r--lib/sqlalchemy/databases/oracle.py904
1 files changed, 0 insertions, 904 deletions
diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py
deleted file mode 100644
index 852cab448..000000000
--- a/lib/sqlalchemy/databases/oracle.py
+++ /dev/null
@@ -1,904 +0,0 @@
-# oracle.py
-# Copyright (C) 2005, 2006, 2007, 2008, 2009 Michael Bayer mike_mp@zzzcomputing.com
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Support for the Oracle database.
-
-Oracle version 8 through current (11g at the time of this writing) are supported.
-
-Driver
-------
-
-The Oracle dialect uses the cx_oracle driver, available at
-http://cx-oracle.sourceforge.net/ . The dialect has several behaviors
-which are specifically tailored towards compatibility with this module.
-
-Connecting
-----------
-
-Connecting with create_engine() uses the standard URL approach of
-``oracle://user:pass@host:port/dbname[?key=value&key=value...]``. If dbname is present, the
-host, port, and dbname tokens are converted to a TNS name using the cx_oracle
-:func:`makedsn()` function. Otherwise, the host token is taken directly as a TNS name.
-
-Additional arguments which may be specified either as query string arguments on the
-URL, or as keyword arguments to :func:`~sqlalchemy.create_engine()` are:
-
-* *allow_twophase* - enable two-phase transactions. Defaults to ``True``.
-
-* *auto_convert_lobs* - defaults to True, see the section on LOB objects.
-
-* *auto_setinputsizes* - the cx_oracle.setinputsizes() call is issued for all bind parameters.
- This is required for LOB datatypes but can be disabled to reduce overhead. Defaults
- to ``True``.
-
-* *mode* - This is given the string value of SYSDBA or SYSOPER, or alternatively an
- integer value. This value is only available as a URL query string argument.
-
-* *threaded* - enable multithreaded access to cx_oracle connections. Defaults
- to ``True``. Note that this is the opposite default of cx_oracle itself.
-
-* *use_ansi* - Use ANSI JOIN constructs (see the section on Oracle 8). Defaults
- to ``True``. If ``False``, Oracle-8 compatible constructs are used for joins.
-
-* *optimize_limits* - defaults to ``False``. see the section on LIMIT/OFFSET.
-
-Auto Increment Behavior
------------------------
-
-SQLAlchemy Table objects which include integer primary keys are usually assumed to have
-"autoincrementing" behavior, meaning they can generate their own primary key values upon
-INSERT. Since Oracle has no "autoincrement" feature, SQLAlchemy relies upon sequences
-to produce these values. With the Oracle dialect, *a sequence must always be explicitly
-specified to enable autoincrement*. This is divergent with the majority of documentation
-examples which assume the usage of an autoincrement-capable database. To specify sequences,
-use the sqlalchemy.schema.Sequence object which is passed to a Column construct::
-
- t = Table('mytable', metadata,
- Column('id', Integer, Sequence('id_seq'), primary_key=True),
- Column(...), ...
- )
-
-This step is also required when using table reflection, i.e. autoload=True::
-
- t = Table('mytable', metadata,
- Column('id', Integer, Sequence('id_seq'), primary_key=True),
- autoload=True
- )
-
-LOB Objects
------------
-
-cx_oracle presents some challenges when fetching LOB objects. A LOB object in a result set
-is presented by cx_oracle as a cx_oracle.LOB object which has a read() method. By default,
-SQLAlchemy converts these LOB objects into Python strings. This is for two reasons. First,
-the LOB object requires an active cursor association, meaning if you were to fetch many rows
-at once such that cx_oracle had to go back to the database and fetch a new batch of rows,
-the LOB objects in the already-fetched rows are now unreadable and will raise an error.
-SQLA "pre-reads" all LOBs so that their data is fetched before further rows are read.
-The size of a "batch of rows" is controlled by the cursor.arraysize value, which SQLAlchemy
-defaults to 50 (cx_oracle normally defaults this to one).
-
-Secondly, the LOB object is not a standard DBAPI return value so SQLAlchemy seeks to
-"normalize" the results to look more like other DBAPIs.
-
-The conversion of LOB objects by this dialect is unique in SQLAlchemy in that it takes place
-for all statement executions, even plain string-based statements for which SQLA has no awareness
-of result typing. This is so that calls like fetchmany() and fetchall() can work in all cases
-without raising cursor errors. The conversion of LOB in all cases, as well as the "prefetch"
-of LOB objects, can be disabled using auto_convert_lobs=False.
-
-LIMIT/OFFSET Support
---------------------
-
-Oracle has no support for the LIMIT or OFFSET keywords. Whereas previous versions of SQLAlchemy
-used the "ROW NUMBER OVER..." construct to simulate LIMIT/OFFSET, SQLAlchemy 0.5 now uses
-a wrapped subquery approach in conjunction with ROWNUM. The exact methodology is taken from
-http://www.oracle.com/technology/oramag/oracle/06-sep/o56asktom.html . Note that the
-"FIRST ROWS()" optimization keyword mentioned is not used by default, as the user community felt
-this was stepping into the bounds of optimization that is better left on the DBA side, but this
-prefix can be added by enabling the optimize_limits=True flag on create_engine().
-
-Two Phase Transaction Support
------------------------------
-
-Two Phase transactions are implemented using XA transactions. Success has been reported of them
-working successfully but this should be regarded as an experimental feature.
-
-Oracle 8 Compatibility
-----------------------
-
-When using Oracle 8, a "use_ansi=False" flag is available which converts all
-JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
-makes use of Oracle's (+) operator.
-
-Synonym/DBLINK Reflection
--------------------------
-
-When using reflection with Table objects, the dialect can optionally search for tables
-indicated by synonyms that reference DBLINK-ed tables by passing the flag
-oracle_resolve_synonyms=True as a keyword argument to the Table construct. If DBLINK
-is not in use this flag should be left off.
-
-"""
-
-import datetime, random, re
-
-from sqlalchemy import util, sql, schema, log
-from sqlalchemy.engine import default, base
-from sqlalchemy.sql import compiler, visitors, expression
-from sqlalchemy.sql import operators as sql_operators, functions as sql_functions
-from sqlalchemy import types as sqltypes
-
-
-class OracleNumeric(sqltypes.Numeric):
- def get_col_spec(self):
- if self.precision is None:
- return "NUMERIC"
- else:
- return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': self.precision, 'scale' : self.scale}
-
-class OracleInteger(sqltypes.Integer):
- def get_col_spec(self):
- return "INTEGER"
-
-class OracleSmallInteger(sqltypes.Smallinteger):
- def get_col_spec(self):
- return "SMALLINT"
-
-class OracleDate(sqltypes.Date):
- def get_col_spec(self):
- return "DATE"
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect):
- def process(value):
- if not isinstance(value, datetime.datetime):
- return value
- else:
- return value.date()
- return process
-
-class OracleDateTime(sqltypes.DateTime):
- def get_col_spec(self):
- return "DATE"
-
- def result_processor(self, dialect):
- def process(value):
- if value is None or isinstance(value, datetime.datetime):
- return value
- else:
- # convert cx_oracle datetime object returned pre-python 2.4
- return datetime.datetime(value.year, value.month,
- value.day,value.hour, value.minute, value.second)
- return process
-
-# Note:
-# Oracle DATE == DATETIME
-# Oracle does not allow milliseconds in DATE
-# Oracle does not support TIME columns
-
-# only if cx_oracle contains TIMESTAMP
-class OracleTimestamp(sqltypes.TIMESTAMP):
- def get_col_spec(self):
- return "TIMESTAMP"
-
- def get_dbapi_type(self, dialect):
- return dialect.TIMESTAMP
-
- def result_processor(self, dialect):
- def process(value):
- if value is None or isinstance(value, datetime.datetime):
- return value
- else:
- # convert cx_oracle datetime object returned pre-python 2.4
- return datetime.datetime(value.year, value.month,
- value.day,value.hour, value.minute, value.second)
- return process
-
-class OracleString(sqltypes.String):
- def get_col_spec(self):
- return "VARCHAR(%(length)s)" % {'length' : self.length}
-
-class OracleNVarchar(sqltypes.Unicode, OracleString):
- def get_col_spec(self):
- return "NVARCHAR2(%(length)s)" % {'length' : self.length}
-
-class OracleText(sqltypes.Text):
- def get_dbapi_type(self, dbapi):
- return dbapi.CLOB
-
- def get_col_spec(self):
- return "CLOB"
-
- def result_processor(self, dialect):
- super_process = super(OracleText, self).result_processor(dialect)
- if not dialect.auto_convert_lobs:
- return super_process
- lob = dialect.dbapi.LOB
- def process(value):
- if isinstance(value, lob):
- if super_process:
- return super_process(value.read())
- else:
- return value.read()
- else:
- if super_process:
- return super_process(value)
- else:
- return value
- return process
-
-
-class OracleChar(sqltypes.CHAR):
- def get_col_spec(self):
- return "CHAR(%(length)s)" % {'length' : self.length}
-
-class OracleBinary(sqltypes.Binary):
- def get_dbapi_type(self, dbapi):
- return dbapi.BLOB
-
- def get_col_spec(self):
- return "BLOB"
-
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect):
- if not dialect.auto_convert_lobs:
- return None
- lob = dialect.dbapi.LOB
- def process(value):
- if isinstance(value, lob):
- return value.read()
- else:
- return value
- return process
-
-class OracleRaw(OracleBinary):
- def get_col_spec(self):
- return "RAW(%(length)s)" % {'length' : self.length}
-
-class OracleBoolean(sqltypes.Boolean):
- def get_col_spec(self):
- return "SMALLINT"
-
- def result_processor(self, dialect):
- def process(value):
- if value is None:
- return None
- return value and True or False
- return process
-
- def bind_processor(self, dialect):
- def process(value):
- if value is True:
- return 1
- elif value is False:
- return 0
- elif value is None:
- return None
- else:
- return value and True or False
- return process
-
-colspecs = {
- sqltypes.Integer : OracleInteger,
- sqltypes.Smallinteger : OracleSmallInteger,
- sqltypes.Numeric : OracleNumeric,
- sqltypes.Float : OracleNumeric,
- sqltypes.DateTime : OracleDateTime,
- sqltypes.Date : OracleDate,
- sqltypes.String : OracleString,
- sqltypes.Binary : OracleBinary,
- sqltypes.Boolean : OracleBoolean,
- sqltypes.Text : OracleText,
- sqltypes.TIMESTAMP : OracleTimestamp,
- sqltypes.CHAR: OracleChar,
-}
-
-ischema_names = {
- 'VARCHAR2' : OracleString,
- 'NVARCHAR2' : OracleNVarchar,
- 'CHAR' : OracleString,
- 'DATE' : OracleDateTime,
- 'DATETIME' : OracleDateTime,
- 'NUMBER' : OracleNumeric,
- 'BLOB' : OracleBinary,
- 'BFILE' : OracleBinary,
- 'CLOB' : OracleText,
- 'TIMESTAMP' : OracleTimestamp,
- 'RAW' : OracleRaw,
- 'FLOAT' : OracleNumeric,
- 'DOUBLE PRECISION' : OracleNumeric,
- 'LONG' : OracleText,
-}
-
-class OracleExecutionContext(default.DefaultExecutionContext):
- def pre_exec(self):
- super(OracleExecutionContext, self).pre_exec()
- if self.dialect.auto_setinputsizes:
- self.set_input_sizes()
- if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
- for key in self.compiled.binds:
- bindparam = self.compiled.binds[key]
- name = self.compiled.bind_names[bindparam]
- value = self.compiled_parameters[0][name]
- if bindparam.isoutparam:
- dbtype = bindparam.type.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
- if not hasattr(self, 'out_parameters'):
- self.out_parameters = {}
- self.out_parameters[name] = self.cursor.var(dbtype)
- self.parameters[0][name] = self.out_parameters[name]
-
- def create_cursor(self):
- c = self._connection.connection.cursor()
- if self.dialect.arraysize:
- c.cursor.arraysize = self.dialect.arraysize
- return c
-
- def get_result_proxy(self):
- if hasattr(self, 'out_parameters'):
- if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
- for bind, name in self.compiled.bind_names.iteritems():
- if name in self.out_parameters:
- type = bind.type
- result_processor = type.dialect_impl(self.dialect).result_processor(self.dialect)
- if result_processor is not None:
- self.out_parameters[name] = result_processor(self.out_parameters[name].getvalue())
- else:
- self.out_parameters[name] = self.out_parameters[name].getvalue()
- else:
- for k in self.out_parameters:
- self.out_parameters[k] = self.out_parameters[k].getvalue()
-
- if self.cursor.description is not None:
- for column in self.cursor.description:
- type_code = column[1]
- if type_code in self.dialect.ORACLE_BINARY_TYPES:
- return base.BufferedColumnResultProxy(self)
-
- return base.ResultProxy(self)
-
-class OracleDialect(default.DefaultDialect):
- name = 'oracle'
- supports_alter = True
- supports_unicode_statements = False
- max_identifier_length = 30
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = False
- preexecute_pk_sequences = True
- supports_pk_autoincrement = False
- default_paramstyle = 'named'
-
- def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, allow_twophase=True, optimize_limits=False, arraysize=50, **kwargs):
- default.DefaultDialect.__init__(self, **kwargs)
- self.use_ansi = use_ansi
- self.threaded = threaded
- self.arraysize = arraysize
- self.allow_twophase = allow_twophase
- self.optimize_limits = optimize_limits
- self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
- self.auto_setinputsizes = auto_setinputsizes
- self.auto_convert_lobs = auto_convert_lobs
- if self.dbapi is None or not self.auto_convert_lobs or not 'CLOB' in self.dbapi.__dict__:
- self.dbapi_type_map = {}
- self.ORACLE_BINARY_TYPES = []
- else:
- # only use this for LOB objects. using it for strings, dates
- # etc. leads to a little too much magic, reflection doesn't know if it should
- # expect encoded strings or unicodes, etc.
- self.dbapi_type_map = {
- self.dbapi.CLOB: OracleText(),
- self.dbapi.BLOB: OracleBinary(),
- self.dbapi.BINARY: OracleRaw(),
- }
- self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB"] if hasattr(self.dbapi, k)]
-
- def dbapi(cls):
- import cx_Oracle
- return cx_Oracle
- dbapi = classmethod(dbapi)
-
- def create_connect_args(self, url):
- dialect_opts = dict(url.query)
- for opt in ('use_ansi', 'auto_setinputsizes', 'auto_convert_lobs',
- 'threaded', 'allow_twophase'):
- if opt in dialect_opts:
- util.coerce_kw_type(dialect_opts, opt, bool)
- setattr(self, opt, dialect_opts[opt])
-
- if url.database:
- # if we have a database, then we have a remote host
- port = url.port
- if port:
- port = int(port)
- else:
- port = 1521
- dsn = self.dbapi.makedsn(url.host, port, url.database)
- else:
- # we have a local tnsname
- dsn = url.host
-
- opts = dict(
- user=url.username,
- password=url.password,
- dsn=dsn,
- threaded=self.threaded,
- twophase=self.allow_twophase,
- )
- if 'mode' in url.query:
- opts['mode'] = url.query['mode']
- if isinstance(opts['mode'], basestring):
- mode = opts['mode'].upper()
- if mode == 'SYSDBA':
- opts['mode'] = self.dbapi.SYSDBA
- elif mode == 'SYSOPER':
- opts['mode'] = self.dbapi.SYSOPER
- else:
- util.coerce_kw_type(opts, 'mode', int)
- # Can't set 'handle' or 'pool' via URL query args, use connect_args
-
- return ([], opts)
-
- def is_disconnect(self, e):
- if isinstance(e, self.dbapi.InterfaceError):
- return "not connected" in str(e)
- else:
- return "ORA-03114" in str(e) or "ORA-03113" in str(e)
-
- def type_descriptor(self, typeobj):
- return sqltypes.adapt_type(typeobj, colspecs)
-
- def create_xid(self):
- """create a two-phase transaction ID.
-
- this id will be passed to do_begin_twophase(), do_rollback_twophase(),
- do_commit_twophase(). its format is unspecified."""
-
- id = random.randint(0, 2 ** 128)
- return (0x1234, "%032x" % id, "%032x" % 9)
-
- def do_release_savepoint(self, connection, name):
- # Oracle does not support RELEASE SAVEPOINT
- pass
-
- def do_begin_twophase(self, connection, xid):
- connection.connection.begin(*xid)
-
- def do_prepare_twophase(self, connection, xid):
- connection.connection.prepare()
-
- def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
- self.do_rollback(connection.connection)
-
- def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):
- self.do_commit(connection.connection)
-
- def do_recover_twophase(self, connection):
- pass
-
- def has_table(self, connection, table_name, schema=None):
- if not schema:
- schema = self.get_default_schema_name(connection)
- cursor = connection.execute("""select table_name from all_tables where table_name=:name and owner=:schema_name""", {'name':self._denormalize_name(table_name), 'schema_name':self._denormalize_name(schema)})
- return cursor.fetchone() is not None
-
- def has_sequence(self, connection, sequence_name, schema=None):
- if not schema:
- schema = self.get_default_schema_name(connection)
- cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name and sequence_owner=:schema_name""", {'name':self._denormalize_name(sequence_name), 'schema_name':self._denormalize_name(schema)})
- return cursor.fetchone() is not None
-
- def _normalize_name(self, name):
- if name is None:
- return None
- elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower().decode(self.encoding)):
- return name.lower().decode(self.encoding)
- else:
- return name.decode(self.encoding)
-
- def _denormalize_name(self, name):
- if name is None:
- return None
- elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()):
- return name.upper().encode(self.encoding)
- else:
- return name.encode(self.encoding)
-
- def get_default_schema_name(self, connection):
- return self._normalize_name(connection.execute('SELECT USER FROM DUAL').scalar())
- get_default_schema_name = base.connection_memoize(
- ('dialect', 'default_schema_name'))(get_default_schema_name)
-
- def table_names(self, connection, schema):
- # note that table_names() isnt loading DBLINKed or synonym'ed tables
- if schema is None:
- s = "select table_name from all_tables where nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX')"
- cursor = connection.execute(s)
- else:
- s = "select table_name from all_tables where nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM','SYSAUX') AND OWNER = :owner"
- cursor = connection.execute(s, {'owner': self._denormalize_name(schema)})
- return [self._normalize_name(row[0]) for row in cursor]
-
- def _resolve_synonym(self, connection, desired_owner=None, desired_synonym=None, desired_table=None):
- """search for a local synonym matching the given desired owner/name.
-
- if desired_owner is None, attempts to locate a distinct owner.
-
- returns the actual name, owner, dblink name, and synonym name if found.
- """
-
- sql = """select OWNER, TABLE_OWNER, TABLE_NAME, DB_LINK, SYNONYM_NAME
- from ALL_SYNONYMS WHERE """
-
- clauses = []
- params = {}
- if desired_synonym:
- clauses.append("SYNONYM_NAME=:synonym_name")
- params['synonym_name'] = desired_synonym
- if desired_owner:
- clauses.append("TABLE_OWNER=:desired_owner")
- params['desired_owner'] = desired_owner
- if desired_table:
- clauses.append("TABLE_NAME=:tname")
- params['tname'] = desired_table
-
- sql += " AND ".join(clauses)
-
- result = connection.execute(sql, **params)
- if desired_owner:
- row = result.fetchone()
- if row:
- return row['TABLE_NAME'], row['TABLE_OWNER'], row['DB_LINK'], row['SYNONYM_NAME']
- else:
- return None, None, None, None
- else:
- rows = result.fetchall()
- if len(rows) > 1:
- raise AssertionError("There are multiple tables visible to the schema, you must specify owner")
- elif len(rows) == 1:
- row = rows[0]
- return row['TABLE_NAME'], row['TABLE_OWNER'], row['DB_LINK'], row['SYNONYM_NAME']
- else:
- return None, None, None, None
-
- def reflecttable(self, connection, table, include_columns):
- preparer = self.identifier_preparer
-
- resolve_synonyms = table.kwargs.get('oracle_resolve_synonyms', False)
-
- if resolve_synonyms:
- actual_name, owner, dblink, synonym = self._resolve_synonym(connection, desired_owner=self._denormalize_name(table.schema), desired_synonym=self._denormalize_name(table.name))
- else:
- actual_name, owner, dblink, synonym = None, None, None, None
-
- if not actual_name:
- actual_name = self._denormalize_name(table.name)
- if not dblink:
- dblink = ''
- if not owner:
- owner = self._denormalize_name(table.schema or self.get_default_schema_name(connection))
-
- c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner})
-
- while True:
- row = c.fetchone()
- if row is None:
- break
-
- (colname, coltype, length, precision, scale, nullable, default) = (self._normalize_name(row[0]), row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
-
- if include_columns and colname not in include_columns:
- continue
-
- # INTEGER if the scale is 0 and precision is null
- # NUMBER if the scale and precision are both null
- # NUMBER(9,2) if the precision is 9 and the scale is 2
- # NUMBER(3) if the precision is 3 and scale is 0
- #length is ignored except for CHAR and VARCHAR2
- if coltype == 'NUMBER' :
- if precision is None and scale is None:
- coltype = OracleNumeric
- elif precision is None and scale == 0 :
- coltype = OracleInteger
- else :
- coltype = OracleNumeric(precision, scale)
- elif coltype=='CHAR' or coltype=='VARCHAR2':
- coltype = ischema_names.get(coltype, OracleString)(length)
- else:
- coltype = re.sub(r'\(\d+\)', '', coltype)
- try:
- coltype = ischema_names[coltype]
- except KeyError:
- util.warn("Did not recognize type '%s' of column '%s'" %
- (coltype, colname))
- coltype = sqltypes.NULLTYPE
-
- colargs = []
- if default is not None:
- colargs.append(schema.DefaultClause(sql.text(default)))
-
- table.append_column(schema.Column(colname, coltype, nullable=nullable, *colargs))
-
- if not table.columns:
- raise AssertionError("Couldn't find any column information for table %s" % actual_name)
-
- c = connection.execute("""SELECT
- ac.constraint_name,
- ac.constraint_type,
- loc.column_name AS local_column,
- rem.table_name AS remote_table,
- rem.column_name AS remote_column,
- rem.owner AS remote_owner
- FROM all_constraints%(dblink)s ac,
- all_cons_columns%(dblink)s loc,
- all_cons_columns%(dblink)s rem
- WHERE ac.table_name = :table_name
- AND ac.constraint_type IN ('R','P')
- AND ac.owner = :owner
- AND ac.owner = loc.owner
- AND ac.constraint_name = loc.constraint_name
- AND ac.r_owner = rem.owner(+)
- AND ac.r_constraint_name = rem.constraint_name(+)
- -- order multiple primary keys correctly
- ORDER BY ac.constraint_name, loc.position, rem.position"""
- % {'dblink':dblink}, {'table_name' : actual_name, 'owner' : owner})
-
- fks = {}
- while True:
- row = c.fetchone()
- if row is None:
- break
- #print "ROW:" , row
- (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row[0:2] + tuple([self._normalize_name(x) for x in row[2:]])
- if cons_type == 'P':
- table.primary_key.add(table.c[local_column])
- elif cons_type == 'R':
- try:
- fk = fks[cons_name]
- except KeyError:
- fk = ([], [])
- fks[cons_name] = fk
- if remote_table is None:
- # ticket 363
- util.warn(
- ("Got 'None' querying 'table_name' from "
- "all_cons_columns%(dblink)s - does the user have "
- "proper rights to the table?") % {'dblink':dblink})
- continue
-
- if resolve_synonyms:
- ref_remote_name, ref_remote_owner, ref_dblink, ref_synonym = self._resolve_synonym(connection, desired_owner=self._denormalize_name(remote_owner), desired_table=self._denormalize_name(remote_table))
- if ref_synonym:
- remote_table = self._normalize_name(ref_synonym)
- remote_owner = self._normalize_name(ref_remote_owner)
-
- if not table.schema and self._denormalize_name(remote_owner) == owner:
- refspec = ".".join([remote_table, remote_column])
- t = schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, oracle_resolve_synonyms=resolve_synonyms, useexisting=True)
- else:
- refspec = ".".join([x for x in [remote_owner, remote_table, remote_column] if x])
- t = schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, schema=remote_owner, oracle_resolve_synonyms=resolve_synonyms, useexisting=True)
-
- if local_column not in fk[0]:
- fk[0].append(local_column)
- if refspec not in fk[1]:
- fk[1].append(refspec)
-
- for name, value in fks.iteritems():
- table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], name=name, link_to_name=True))
-
-
-class _OuterJoinColumn(sql.ClauseElement):
- __visit_name__ = 'outer_join_column'
-
- def __init__(self, column):
- self.column = column
-
-class OracleCompiler(compiler.DefaultCompiler):
- """Oracle compiler modifies the lexical structure of Select
- statements to work under non-ANSI configured Oracle databases, if
- the use_ansi flag is False.
- """
-
- operators = compiler.DefaultCompiler.operators.copy()
- operators.update(
- {
- sql_operators.mod : lambda x, y:"mod(%s, %s)" % (x, y),
- sql_operators.match_op: lambda x, y: "CONTAINS (%s, %s)" % (x, y)
- }
- )
-
- functions = compiler.DefaultCompiler.functions.copy()
- functions.update (
- {
- sql_functions.now : 'CURRENT_TIMESTAMP'
- }
- )
-
- def __init__(self, *args, **kwargs):
- super(OracleCompiler, self).__init__(*args, **kwargs)
- self.__wheres = {}
-
- def default_from(self):
- """Called when a ``SELECT`` statement has no froms, and no ``FROM`` clause is to be appended.
-
- The Oracle compiler tacks a "FROM DUAL" to the statement.
- """
-
- return " FROM DUAL"
-
- def apply_function_parens(self, func):
- return len(func.clauses) > 0
-
- def visit_join(self, join, **kwargs):
- if self.dialect.use_ansi:
- return compiler.DefaultCompiler.visit_join(self, join, **kwargs)
- else:
- return self.process(join.left, asfrom=True) + ", " + self.process(join.right, asfrom=True)
-
- def _get_nonansi_join_whereclause(self, froms):
- clauses = []
-
- def visit_join(join):
- if join.isouter:
- def visit_binary(binary):
- if binary.operator == sql_operators.eq:
- if binary.left.table is join.right:
- binary.left = _OuterJoinColumn(binary.left)
- elif binary.right.table is join.right:
- binary.right = _OuterJoinColumn(binary.right)
- clauses.append(visitors.cloned_traverse(join.onclause, {}, {'binary':visit_binary}))
- else:
- clauses.append(join.onclause)
-
- for f in froms:
- visitors.traverse(f, {}, {'join':visit_join})
- return sql.and_(*clauses)
-
- def visit_outer_join_column(self, vc):
- return self.process(vc.column) + "(+)"
-
- def visit_sequence(self, seq):
- return self.dialect.identifier_preparer.format_sequence(seq) + ".nextval"
-
- def visit_alias(self, alias, asfrom=False, **kwargs):
- """Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??"""
-
- if asfrom:
- alias_name = isinstance(alias.name, expression._generated_label) and \
- self._truncated_identifier("alias", alias.name) or alias.name
-
- return self.process(alias.original, asfrom=True, **kwargs) + " " +\
- self.preparer.format_alias(alias, alias_name)
- else:
- return self.process(alias.original, **kwargs)
-
- def _TODO_visit_compound_select(self, select):
- """Need to determine how to get ``LIMIT``/``OFFSET`` into a ``UNION`` for Oracle."""
- pass
-
- def visit_select(self, select, **kwargs):
- """Look for ``LIMIT`` and OFFSET in a select statement, and if
- so tries to wrap it in a subquery with ``rownum`` criterion.
- """
-
- if not getattr(select, '_oracle_visit', None):
- if not self.dialect.use_ansi:
- if self.stack and 'from' in self.stack[-1]:
- existingfroms = self.stack[-1]['from']
- else:
- existingfroms = None
-
- froms = select._get_display_froms(existingfroms)
- whereclause = self._get_nonansi_join_whereclause(froms)
- if whereclause:
- select = select.where(whereclause)
- select._oracle_visit = True
-
- if select._limit is not None or select._offset is not None:
- # See http://www.oracle.com/technology/oramag/oracle/06-sep/o56asktom.html
- #
- # Generalized form of an Oracle pagination query:
- # select ... from (
- # select /*+ FIRST_ROWS(N) */ ...., rownum as ora_rn from (
- # select distinct ... where ... order by ...
- # ) where ROWNUM <= :limit+:offset
- # ) where ora_rn > :offset
- # Outer select and "ROWNUM as ora_rn" can be dropped if limit=0
-
- # TODO: use annotations instead of clone + attr set ?
- select = select._generate()
- select._oracle_visit = True
-
- # Wrap the middle select and add the hint
- limitselect = sql.select([c for c in select.c])
- if select._limit and self.dialect.optimize_limits:
- limitselect = limitselect.prefix_with("/*+ FIRST_ROWS(%d) */" % select._limit)
-
- limitselect._oracle_visit = True
- limitselect._is_wrapper = True
-
- # If needed, add the limiting clause
- if select._limit is not None:
- max_row = select._limit
- if select._offset is not None:
- max_row += select._offset
- limitselect.append_whereclause(
- sql.literal_column("ROWNUM")<=max_row)
-
- # If needed, add the ora_rn, and wrap again with offset.
- if select._offset is None:
- select = limitselect
- else:
- limitselect = limitselect.column(
- sql.literal_column("ROWNUM").label("ora_rn"))
- limitselect._oracle_visit = True
- limitselect._is_wrapper = True
-
- offsetselect = sql.select(
- [c for c in limitselect.c if c.key!='ora_rn'])
- offsetselect._oracle_visit = True
- offsetselect._is_wrapper = True
-
- offsetselect.append_whereclause(
- sql.literal_column("ora_rn")>select._offset)
-
- select = offsetselect
-
- kwargs['iswrapper'] = getattr(select, '_is_wrapper', False)
- return compiler.DefaultCompiler.visit_select(self, select, **kwargs)
-
- def limit_clause(self, select):
- return ""
-
- def for_update_clause(self, select):
- if select.for_update == "nowait":
- return " FOR UPDATE NOWAIT"
- else:
- return super(OracleCompiler, self).for_update_clause(select)
-
-
-class OracleSchemaGenerator(compiler.SchemaGenerator):
- def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column)
- colspec += " " + column.type.dialect_impl(self.dialect).get_col_spec()
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
-
- if not column.nullable:
- colspec += " NOT NULL"
- return colspec
-
- def visit_sequence(self, sequence):
- if not self.checkfirst or not self.dialect.has_sequence(self.connection, sequence.name, sequence.schema):
- self.append("CREATE SEQUENCE %s" % self.preparer.format_sequence(sequence))
- self.execute()
-
-class OracleSchemaDropper(compiler.SchemaDropper):
- def visit_sequence(self, sequence):
- if not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name, sequence.schema):
- self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence))
- self.execute()
-
-class OracleDefaultRunner(base.DefaultRunner):
- def visit_sequence(self, seq):
- return self.execute_string("SELECT " + self.dialect.identifier_preparer.format_sequence(seq) + ".nextval FROM DUAL", {})
-
-class OracleIdentifierPreparer(compiler.IdentifierPreparer):
- def format_savepoint(self, savepoint):
- name = re.sub(r'^_+', '', savepoint.ident)
- return super(OracleIdentifierPreparer, self).format_savepoint(savepoint, name)
-
-
-dialect = OracleDialect
-dialect.statement_compiler = OracleCompiler
-dialect.schemagenerator = OracleSchemaGenerator
-dialect.schemadropper = OracleSchemaDropper
-dialect.preparer = OracleIdentifierPreparer
-dialect.defaultrunner = OracleDefaultRunner
-dialect.execution_ctx_cls = OracleExecutionContext