summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py22
-rw-r--r--lib/sqlalchemy/dialects/oracle/cx_oracle.py95
-rw-r--r--test/dialect/test_oracle.py21
3 files changed, 68 insertions, 70 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index a45793367..4863b7ba1 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -437,7 +437,7 @@ class OracleCompiler(compiler.SQLCompiler):
def get_select_hint_text(self, byfroms):
return " ".join(
- "/*+ %s */" % text for table, text in list(byfroms.items())
+ "/*+ %s */" % text for table, text in byfroms.items()
)
def function_argspec(self, fn, **kw):
@@ -661,7 +661,7 @@ class OracleIdentifierPreparer(compiler.IdentifierPreparer):
lc_value = value.lower()
return (lc_value in self.reserved_words
or value[0] in self.illegal_initial_characters
- or not self.legal_characters.match(str(value))
+ or not self.legal_characters.match(util.text_type(value))
)
def format_savepoint(self, savepoint):
@@ -765,10 +765,9 @@ class OracleDialect(default.DefaultDialect):
def normalize_name(self, name):
if name is None:
return None
-# start Py2K
-# if isinstance(name, str):
-# name = name.decode(self.encoding)
-# end Py2K
+ if util.py2k:
+ if isinstance(name, str):
+ name = name.decode(self.encoding)
if name.upper() == name and \
not self.identifier_preparer._requires_quotes(name.lower()):
return name.lower()
@@ -780,12 +779,11 @@ class OracleDialect(default.DefaultDialect):
return None
elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()):
name = name.upper()
-# start Py2K
-# if not self.supports_unicode_binds:
-# name = name.encode(self.encoding)
-# else:
-# name = unicode(name)
-# end Py2K
+ if util.py2k:
+ if not self.supports_unicode_binds:
+ name = name.encode(self.encoding)
+ else:
+ name = unicode(name)
return name
def _get_default_schema_name(self, connection):
diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
index 10ec20dee..99a514776 100644
--- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py
+++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
@@ -268,21 +268,17 @@ class _LOBMixin(object):
class _NativeUnicodeMixin(object):
-# start Py3K
- pass
-# end Py3K
-# start Py2K
-# def bind_processor(self, dialect):
-# if dialect._cx_oracle_with_unicode:
-# def process(value):
-# if value is None:
-# return value
-# else:
-# return unicode(value)
-# return process
-# else:
-# return super(_NativeUnicodeMixin, self).bind_processor(dialect)
-# end Py2K
+ if util.py2k:
+ def bind_processor(self, dialect):
+ if dialect._cx_oracle_with_unicode:
+ def process(value):
+ if value is None:
+ return value
+ else:
+ return unicode(value)
+ return process
+ else:
+ return super(_NativeUnicodeMixin, self).bind_processor(dialect)
# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
@@ -392,10 +388,10 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
(fromname.encode(self.dialect.encoding),
toname.encode(self.dialect.encoding))
for fromname, toname in
- list(quoted_bind_names.items())
+ quoted_bind_names.items()
)
for param in self.parameters:
- for fromname, toname in list(quoted_bind_names.items()):
+ for fromname, toname in quoted_bind_names.items():
param[toname] = param[fromname]
del param[fromname]
@@ -409,7 +405,7 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
# if a single execute, check for outparams
if len(self.compiled_parameters) == 1:
- for bindparam in list(self.compiled.binds.values()):
+ for bindparam in self.compiled.binds.values():
if bindparam.isoutparam:
dbtype = bindparam.type.dialect_impl(self.dialect).\
get_dbapi_type(self.dialect.dbapi)
@@ -438,7 +434,7 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
if hasattr(self, 'out_parameters') and self.compiled.returning:
returning_params = dict(
(k, v.getvalue())
- for k, v in list(self.out_parameters.items())
+ for k, v in self.out_parameters.items()
)
return ReturningResultProxy(self, returning_params)
@@ -457,7 +453,7 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
len(self.compiled_parameters) == 1:
result.out_parameters = out_parameters = {}
- for bind, name in list(self.compiled.bind_names.items()):
+ for bind, name in self.compiled.bind_names.items():
if name in self.out_parameters:
type = bind.type
impl_type = type.dialect_impl(self.dialect)
@@ -473,7 +469,7 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext):
else:
result.out_parameters = dict(
(k, v.getvalue())
- for k, v in list(self.out_parameters.items())
+ for k, v in self.out_parameters.items()
)
return result
@@ -494,11 +490,11 @@ class OracleExecutionContext_cx_oracle_with_unicode(OracleExecutionContext_cx_or
"""
def __init__(self, *arg, **kw):
OracleExecutionContext_cx_oracle.__init__(self, *arg, **kw)
- self.statement = str(self.statement)
+ self.statement = util.text_type(self.statement)
def _execute_scalar(self, stmt):
return super(OracleExecutionContext_cx_oracle_with_unicode, self).\
- _execute_scalar(str(stmt))
+ _execute_scalar(util.text_type(stmt))
class ReturningResultProxy(_result.FullyBufferedResultProxy):
@@ -608,19 +604,23 @@ class OracleDialect_cx_oracle(OracleDialect):
self.supports_unicode_statements = True
self.supports_unicode_binds = True
self._cx_oracle_with_unicode = True
-# start Py2K
-# # There's really no reason to run with WITH_UNICODE under Python 2.x.
-# # Give the user a hint.
-# util.warn("cx_Oracle is compiled under Python 2.xx using the "
-# "WITH_UNICODE flag. Consider recompiling cx_Oracle without "
-# "this flag, which is in no way necessary for full support of Unicode. "
-# "Otherwise, all string-holding bind parameters must "
-# "be explicitly typed using SQLAlchemy's String type or one of its subtypes,"
-# "or otherwise be passed as Python unicode. Plain Python strings "
-# "passed as bind parameters will be silently corrupted by cx_Oracle."
-# )
-# self.execution_ctx_cls = OracleExecutionContext_cx_oracle_with_unicode
-# end Py2K
+
+ if util.py2k:
+ # There's really no reason to run with WITH_UNICODE under Python 2.x.
+ # Give the user a hint.
+ util.warn(
+ "cx_Oracle is compiled under Python 2.xx using the "
+ "WITH_UNICODE flag. Consider recompiling cx_Oracle "
+ "without this flag, which is in no way necessary for full "
+ "support of Unicode. Otherwise, all string-holding bind "
+ "parameters must be explicitly typed using SQLAlchemy's "
+ "String type or one of its subtypes,"
+ "or otherwise be passed as Python unicode. "
+ "Plain Python strings passed as bind parameters will be "
+ "silently corrupted by cx_Oracle."
+ )
+ self.execution_ctx_cls = \
+ OracleExecutionContext_cx_oracle_with_unicode
else:
self._cx_oracle_with_unicode = False
@@ -732,7 +732,7 @@ class OracleDialect_cx_oracle(OracleDialect):
arraysize=cursor.arraysize)
# allow all strings to come back natively as Unicode
elif defaultType in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR):
- return cursor.var(str, size, cursor.arraysize)
+ return cursor.var(util.text_type, size, cursor.arraysize)
def on_connect(conn):
conn.outputtypehandler = output_type_handler
@@ -767,20 +767,19 @@ class OracleDialect_cx_oracle(OracleDialect):
twophase=self.allow_twophase,
)
-# start Py2K
-# if self._cx_oracle_with_unicode:
-# for k, v in opts.items():
-# if isinstance(v, str):
-# opts[k] = unicode(v)
-# else:
-# for k, v in opts.items():
-# if isinstance(v, unicode):
-# opts[k] = str(v)
-# end Py2K
+ if util.py2k:
+ if self._cx_oracle_with_unicode:
+ for k, v in opts.items():
+ if isinstance(v, str):
+ opts[k] = unicode(v)
+ else:
+ for k, v in opts.items():
+ if isinstance(v, unicode):
+ opts[k] = str(v)
if 'mode' in url.query:
opts['mode'] = url.query['mode']
- if isinstance(opts['mode'], str):
+ if isinstance(opts['mode'], util.string_types):
mode = opts['mode'].upper()
if mode == 'SYSDBA':
opts['mode'] = self.dbapi.SYSDBA
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index ea5113c29..00bbbcd7c 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -7,6 +7,7 @@ from sqlalchemy import types as sqltypes, exc, schema
from sqlalchemy.sql import table, column
from sqlalchemy.testing import fixtures, AssertsExecutionResults, AssertsCompiledSQL
from sqlalchemy import testing
+from sqlalchemy.util import u
from sqlalchemy.testing import assert_raises, assert_raises_message
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
@@ -1267,7 +1268,7 @@ class TypesTest(fixtures.TestBase):
autoload=True, oracle_resolve_synonyms=True
)
for row in types_table.select().execute().fetchall():
- [row[k] for k in list(row.keys())]
+ [row[k] for k in row.keys()]
@testing.provide_metadata
def test_raw_roundtrip(self):
@@ -1301,11 +1302,11 @@ class TypesTest(fixtures.TestBase):
t2.c.data.type.dialect_impl(testing.db.dialect),
cx_oracle._OracleNVarChar)
- data = 'm’a réveillé.'
+ data = u('m’a réveillé.')
t2.insert().execute(data=data)
res = t2.select().execute().first()['data']
eq_(res, data)
- assert isinstance(res, str)
+ assert isinstance(res, util.text_type)
def test_char_length(self):
@@ -1436,7 +1437,7 @@ class DontReflectIOTTest(fixtures.TestBase):
m = MetaData(testing.db)
m.reflect()
eq_(
- set(t.name for t in list(m.tables.values())),
+ set(t.name for t in m.tables.values()),
set(['admin_docindex'])
)
@@ -1654,25 +1655,25 @@ class UnicodeSchemaTest(fixtures.TestBase):
{'_underscorecolumn': '’é'},
)
result = testing.db.execute(
- table.select().where(table.c._underscorecolumn=='’é')
+ table.select().where(table.c._underscorecolumn==u('’é'))
).scalar()
- eq_(result, '’é')
+ eq_(result, u('’é'))
@testing.provide_metadata
def test_quoted_column_unicode(self):
metadata = self.metadata
table=Table("atable", metadata,
- Column("méil", Unicode(255), primary_key=True),
+ Column(u("méil"), Unicode(255), primary_key=True),
)
metadata.create_all()
table.insert().execute(
- {'méil': '’é'},
+ {u('méil'): u('’é')},
)
result = testing.db.execute(
- table.select().where(table.c['méil']=='’é')
+ table.select().where(table.c[u('méil')] == u('’é'))
).scalar()
- eq_(result, '’é')
+ eq_(result, u('’é'))
class DBLinkReflectionTest(fixtures.TestBase):