summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-03-12 23:45:53 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2010-03-12 23:45:53 +0000
commit0744264be82d13b48538a5efd210251ec340205b (patch)
tree9fe71902fe3e56030e35412db94bd8f28433dc3c
parentd816c09f66f7d1735a764d27a187fbd0cd1f576e (diff)
parentb9a1e94a731b8d41b0dbac0ef565eb7d8b6ccb34 (diff)
downloadsqlalchemy-0744264be82d13b48538a5efd210251ec340205b.tar.gz
further refinement
-rw-r--r--lib/sqlalchemy/dialects/oracle/cx_oracle.py49
-rw-r--r--lib/sqlalchemy/engine/default.py28
2 files changed, 39 insertions, 38 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
index b9f567d34..ab92cc466 100644
--- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py
+++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py
@@ -329,35 +329,37 @@ class Oracle_cx_oracle(OracleDialect):
self.auto_setinputsizes = auto_setinputsizes
self.auto_convert_lobs = auto_convert_lobs
- def vers(num):
- return tuple([int(x) for x in num.split('.')])
-
if hasattr(self.dbapi, 'version'):
- cx_oracle_ver = vers(self.dbapi.version)
+ cx_oracle_ver = tuple([int(x) for x in self.dbapi.version.split('.')])
self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0)
-
- if self.dbapi is not None and not hasattr(self.dbapi, 'UNICODE'):
- # cx_Oracle WITH_UNICODE mode. *only* python
- # unicode objects accepted for anything
- self._cx_oracle_string_types = set([self.dbapi.STRING, self.dbapi.NCLOB])
- self.supports_unicode_statements = True
- self.supports_unicode_binds = True
- self._cx_oracle_with_unicode = True
+ else:
+ cx_oracle_ver = None
+
+ def types(*names):
+ return set([getattr(self.dbapi, name, None) for name in names]).difference([None])
+
+ self._cx_oracle_string_types = types("STRING", "UNICODE", "NCLOB", "CLOB")
+ self._cx_oracle_unicode_types = types("UNICODE", "NCLOB")
+ self._cx_oracle_binary_types = types("BFILE", "CLOB", "NCLOB", "BLOB")
+
+ if cx_oracle_ver is None:
+ # this occurs in tests with mock DBAPIs
+ self._cx_oracle_string_types = set()
+ self._cx_oracle_with_unicode = False
+ elif not hasattr(self.dbapi, 'UNICODE'):
+ # cx_Oracle WITH_UNICODE mode. *only* python
+ # unicode objects accepted for anything
+ self.supports_unicode_statements = True
+ self.supports_unicode_binds = True
+ self._cx_oracle_with_unicode = True
else:
- self._cx_oracle_with_unicode = False
- if self.dbapi is not None:
- self._cx_oracle_string_types = set([self.dbapi.UNICODE, self.dbapi.NCLOB, self.dbapi.STRING])
- self._cx_oracle_unicode_types = set([self.dbapi.UNICODE, self.dbapi.NCLOB])
- else:
- self._cx_oracle_string_types = set()
-
-
- if self.dbapi is None or \
+ self._cx_oracle_with_unicode = False
+
+ if cx_oracle_ver is None or \
not self.auto_convert_lobs or \
not hasattr(self.dbapi, 'CLOB'):
self.dbapi_type_map = {}
- self._cx_oracle_binary_types = set()
else:
# only use this for LOB objects. using it for strings, dates
# etc. leads to a little too much magic, reflection doesn't know if it should
@@ -368,9 +370,6 @@ class Oracle_cx_oracle(OracleDialect):
self.dbapi.BLOB: oracle.BLOB(),
self.dbapi.BINARY: oracle.RAW(),
}
- self._cx_oracle_binary_types = set([getattr(self.dbapi, k) for k in
- ["BFILE", "CLOB", "NCLOB", "BLOB"]
- if hasattr(self.dbapi, k)])
@classmethod
def dbapi(cls):
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index cfab01dc4..8222c93cd 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -137,7 +137,6 @@ class DefaultDialect(base.Dialect):
self.returns_unicode_strings = self._check_unicode_returns(connection)
def _check_unicode_returns(self, connection):
- cursor = connection.connection.cursor()
# Py2K
if self.supports_unicode_statements:
cast_to = unicode
@@ -147,24 +146,27 @@ class DefaultDialect(base.Dialect):
# Py3K
#cast_to = str
def check_unicode(type_):
- cursor.execute(
- cast_to(
- expression.select(
- [expression.cast(
- expression.literal_column("'test unicode returns'"), type_)
- ]).compile(dialect=self)
+ cursor = connection.connection.cursor()
+ try:
+ cursor.execute(
+ cast_to(
+ expression.select(
+ [expression.cast(
+ expression.literal_column("'test unicode returns'"), type_)
+ ]).compile(dialect=self)
+ )
)
- )
-
- row = cursor.fetchone()
- return isinstance(row[0], unicode)
-
+ row = cursor.fetchone()
+
+ return isinstance(row[0], unicode)
+ finally:
+ cursor.close()
+
# detect plain VARCHAR
unicode_for_varchar = check_unicode(sqltypes.VARCHAR(60))
# detect if there's an NVARCHAR type with different behavior available
unicode_for_unicode = check_unicode(sqltypes.Unicode(60))
- cursor.close()
if unicode_for_unicode and not unicode_for_varchar:
return "conditional"