diff options
Diffstat (limited to 'test/dialect/oracle/test_dialect.py')
-rw-r--r-- | test/dialect/oracle/test_dialect.py | 316 |
1 files changed, 316 insertions, 0 deletions
diff --git a/test/dialect/oracle/test_dialect.py b/test/dialect/oracle/test_dialect.py new file mode 100644 index 000000000..83a875c2e --- /dev/null +++ b/test/dialect/oracle/test_dialect.py @@ -0,0 +1,316 @@ +# coding: utf-8 + + +from sqlalchemy.testing import eq_ +from sqlalchemy import types as sqltypes, exc, schema +from sqlalchemy.sql import table, column +from sqlalchemy.testing import (fixtures, + AssertsExecutionResults, + AssertsCompiledSQL) +from sqlalchemy import testing +from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ + Index, MetaData, select, inspect, ForeignKey, String, func, \ + TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ + literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ + ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ + union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ + PrimaryKeyConstraint, FLOAT +from sqlalchemy.util import u, b +from sqlalchemy import util +from sqlalchemy.testing import assert_raises, assert_raises_message +from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.dialects.oracle import cx_oracle, base as oracle +from sqlalchemy.engine import default +import decimal +from sqlalchemy.engine import url +from sqlalchemy.testing.schema import Table, Column +import datetime +import os +from sqlalchemy import sql +from sqlalchemy.testing.mock import Mock + + +class DialectTest(fixtures.TestBase): + def test_cx_oracle_version_parse(self): + dialect = cx_oracle.OracleDialect_cx_oracle() + + eq_( + dialect._parse_cx_oracle_ver("5.2"), + (5, 2) + ) + + eq_( + dialect._parse_cx_oracle_ver("5.0.1"), + (5, 0, 1) + ) + + eq_( + dialect._parse_cx_oracle_ver("6.0b1"), + (6, 0) + ) + + +class OutParamTest(fixtures.TestBase, AssertsExecutionResults): + __only_on__ = 'oracle+cx_oracle' + __backend__ = True + + @classmethod + def setup_class(cls): + testing.db.execute(""" + create or replace procedure foo(x_in IN number, x_out OUT number, + y_out OUT number, z_out OUT varchar) IS + retval number; + begin + retval := 6; + x_out := 10; + y_out := x_in * 15; + z_out := NULL; + end; + """) + + def test_out_params(self): + result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' + ':z_out); end;', + bindparams=[bindparam('x_in', Float), + outparam('x_out', Integer), + outparam('y_out', Float), + outparam('z_out', String)]), + x_in=5) + eq_(result.out_parameters, + {'x_out': 10, 'y_out': 75, 'z_out': None}) + assert isinstance(result.out_parameters['x_out'], int) + + @classmethod + def teardown_class(cls): + testing.db.execute("DROP PROCEDURE foo") + + +class QuotedBindRoundTripTest(fixtures.TestBase): + + __only_on__ = 'oracle' + __backend__ = True + + @testing.provide_metadata + def test_table_round_trip(self): + oracle.RESERVED_WORDS.remove('UNION') + + metadata = self.metadata + table = Table("t1", metadata, + Column("option", Integer), + Column("plain", Integer, quote=True), + # test that quote works for a reserved word + # that the dialect isn't aware of when quote + # is set + Column("union", Integer, quote=True)) + metadata.create_all() + + table.insert().execute( + {"option": 1, "plain": 1, "union": 1} + ) + eq_( + testing.db.execute(table.select()).first(), + (1, 1, 1) + ) + table.update().values(option=2, plain=2, union=2).execute() + eq_( + testing.db.execute(table.select()).first(), + (2, 2, 2) + ) + + def test_numeric_bind_round_trip(self): + eq_( + testing.db.scalar( + select([ + literal_column("2", type_=Integer()) + + bindparam("2_1", value=2)]) + ), + 4 + ) + + @testing.provide_metadata + def test_numeric_bind_in_crud(self): + t = Table( + "asfd", self.metadata, + Column("100K", Integer) + ) + t.create() + + testing.db.execute(t.insert(), {"100K": 10}) + eq_( + testing.db.scalar(t.select()), 10 + ) + + +class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): + + def _dialect(self, server_version, **kw): + def server_version_info(conn): + return server_version + + dialect = oracle.dialect( + dbapi=Mock(version="0.0.0", paramstyle="named"), + **kw) + dialect._get_server_version_info = server_version_info + dialect._check_unicode_returns = Mock() + dialect._check_unicode_description = Mock() + dialect._get_default_schema_name = Mock() + dialect._detect_decimal_char = Mock() + return dialect + + def test_ora8_flags(self): + dialect = self._dialect((8, 2, 5)) + + # before connect, assume modern DB + assert dialect._supports_char_length + assert dialect._supports_nchar + assert dialect.use_ansi + + dialect.initialize(Mock()) + assert not dialect.implicit_returning + assert not dialect._supports_char_length + assert not dialect._supports_nchar + assert not dialect.use_ansi + self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) + + dialect = self._dialect((8, 2, 5), implicit_returning=True) + dialect.initialize(testing.db.connect()) + assert dialect.implicit_returning + + def test_default_flags(self): + """test with no initialization or server version info""" + + dialect = self._dialect(None) + + assert dialect._supports_char_length + assert dialect._supports_nchar + assert dialect.use_ansi + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) + + def test_ora10_flags(self): + dialect = self._dialect((10, 2, 5)) + + dialect.initialize(Mock()) + assert dialect._supports_char_length + assert dialect._supports_nchar + assert dialect.use_ansi + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) + + +class ExecuteTest(fixtures.TestBase): + + __only_on__ = 'oracle' + __backend__ = True + + def test_basic(self): + eq_(testing.db.execute('/*+ this is a comment */ SELECT 1 FROM ' + 'DUAL').fetchall(), [(1, )]) + + def test_sequences_are_integers(self): + seq = Sequence('foo_seq') + seq.create(testing.db) + try: + val = testing.db.execute(seq) + eq_(val, 1) + assert type(val) is int + finally: + seq.drop(testing.db) + + @testing.provide_metadata + def test_limit_offset_for_update(self): + metadata = self.metadata + # oracle can't actually do the ROWNUM thing with FOR UPDATE + # very well. + + t = Table('t1', + metadata, + Column('id', Integer, primary_key=True), + Column('data', Integer)) + metadata.create_all() + + t.insert().execute( + {'id': 1, 'data': 1}, + {'id': 2, 'data': 7}, + {'id': 3, 'data': 12}, + {'id': 4, 'data': 15}, + {'id': 5, 'data': 32}, + ) + + # here, we can't use ORDER BY. + eq_( + t.select(for_update=True).limit(2).execute().fetchall(), + [(1, 1), + (2, 7)] + ) + + # here, its impossible. But we'd prefer it to raise ORA-02014 + # instead of issuing a syntax error. + assert_raises_message( + exc.DatabaseError, + "ORA-02014", + t.select(for_update=True).limit(2).offset(3).execute + ) + + +class UnicodeSchemaTest(fixtures.TestBase): + __only_on__ = 'oracle' + __backend__ = True + + @testing.provide_metadata + def test_quoted_column_non_unicode(self): + metadata = self.metadata + table = Table("atable", metadata, + Column("_underscorecolumn", + Unicode(255), + primary_key=True)) + metadata.create_all() + + table.insert().execute( + {'_underscorecolumn': u('’é')}, + ) + result = testing.db.execute( + table.select().where(table.c._underscorecolumn == u('’é')) + ).scalar() + eq_(result, u('’é')) + + @testing.provide_metadata + def test_quoted_column_unicode(self): + metadata = self.metadata + table = Table("atable", metadata, + Column(u("méil"), Unicode(255), primary_key=True)) + metadata.create_all() + + table.insert().execute( + {u('méil'): u('’é')}, + ) + result = testing.db.execute( + table.select().where(table.c[u('méil')] == u('’é')) + ).scalar() + eq_(result, u('’é')) + + +class ServiceNameTest(fixtures.TestBase): + __only_on__ = 'oracle+cx_oracle' + __backend__ = True + + def test_cx_oracle_service_name(self): + url_string = 'oracle+cx_oracle://scott:tiger@host/?service_name=hr' + eng = create_engine(url_string, _initialize=False) + cargs, cparams = eng.dialect.create_connect_args(eng.url) + + assert 'SERVICE_NAME=hr' in cparams['dsn'] + assert 'SID=hr' not in cparams['dsn'] + + def test_cx_oracle_service_name_bad(self): + url_string = 'oracle+cx_oracle://scott:tiger@host/hr1?service_name=hr2' + assert_raises( + exc.InvalidRequestError, + create_engine, url_string, + _initialize=False + ) + |