summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/sqlalchemy/databases/oracle.py55
-rw-r--r--test/engine/reflection.py5
-rw-r--r--test/sql/defaults.py8
-rw-r--r--test/sql/quote.py4
-rw-r--r--test/sql/rowcount.py2
-rw-r--r--test/sql/unicode.py6
6 files changed, 51 insertions, 29 deletions
diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py
index 10b7cc962..9b8bb2f9e 100644
--- a/lib/sqlalchemy/databases/oracle.py
+++ b/lib/sqlalchemy/databases/oracle.py
@@ -296,11 +296,11 @@ class OracleDialect(ansisql.ANSIDialect):
return OracleDefaultRunner(connection, **kwargs)
def has_table(self, connection, table_name, schema=None):
- cursor = connection.execute("""select table_name from all_tables where table_name=:name""", {'name':table_name.upper()})
+ cursor = connection.execute("""select table_name from all_tables where table_name=:name""", {'name':self._denormalize_name(table_name)})
return bool( cursor.fetchone() is not None )
def has_sequence(self, connection, sequence_name):
- cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':sequence_name.upper()})
+ cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':self._denormalize_name(sequence_name)})
return bool( cursor.fetchone() is not None )
def _locate_owner_row(self, owner, name, rows, raiseerr=False):
@@ -364,25 +364,40 @@ class OracleDialect(ansisql.ANSIDialect):
dblink = ''
return name, owner, dblink
raise
-
+
+ def _normalize_name(self, name):
+ if name is None:
+ return None
+ elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower(), True):
+ return name.lower()
+ else:
+ return name
+
+ def _denormalize_name(self, name):
+ if name is None:
+ return None
+ elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower(), True):
+ return name.upper()
+ else:
+ return name
+
def table_names(self, connection, schema):
- # sorry, I have no idea what that dblink stuff is about :)
+ # note that table_names() isnt loading DBLINKed or synonym'ed tables
s = "select table_name from all_tables where tablespace_name NOT IN ('SYSTEM', 'SYSAUX')"
- return [row[0] for row in connection.execute(s)]
+ return [self._normalize_name(row[0]) for row in connection.execute(s)]
def reflecttable(self, connection, table, include_columns):
preparer = self.identifier_preparer
- if not preparer.should_quote(table):
- name = table.name.upper()
- else:
- name = table.name
# search for table, including across synonyms and dblinks.
# locate the actual name of the table, the real owner, and any dblink clause needed.
- actual_name, owner, dblink = self._resolve_table_owner(connection, name, table)
+ actual_name, owner, dblink = self._resolve_table_owner(connection, self._denormalize_name(table.name), table)
+
+ print "ACTUALNAME:", actual_name
c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner})
+
while True:
row = c.fetchone()
if row is None:
@@ -390,11 +405,7 @@ class OracleDialect(ansisql.ANSIDialect):
found_table = True
#print "ROW:" , row
- (colname, coltype, length, precision, scale, nullable, default) = (row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
-
- # if name comes back as all upper, assume its case folded
- if (colname.upper() == colname):
- colname = colname.lower()
+ (colname, coltype, length, precision, scale, nullable, default) = (self._normalize_name(row[0]), row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
if include_columns and colname not in include_columns:
continue
@@ -433,10 +444,10 @@ class OracleDialect(ansisql.ANSIDialect):
c = connection.execute("""SELECT
ac.constraint_name,
ac.constraint_type,
- LOWER(loc.column_name) AS local_column,
- LOWER(rem.table_name) AS remote_table,
- LOWER(rem.column_name) AS remote_column,
- LOWER(rem.owner) AS remote_owner
+ loc.column_name AS local_column,
+ rem.table_name AS remote_table,
+ rem.column_name AS remote_column,
+ rem.owner AS remote_owner
FROM all_constraints%(dblink)s ac,
all_cons_columns%(dblink)s loc,
all_cons_columns%(dblink)s rem
@@ -457,7 +468,7 @@ class OracleDialect(ansisql.ANSIDialect):
if row is None:
break
#print "ROW:" , row
- (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row
+ (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row[0:2] + tuple([self._normalize_name(x) for x in row[2:]])
if cons_type == 'P':
table.primary_key.add(table.c[local_column])
elif cons_type == 'R':
@@ -640,7 +651,7 @@ class OracleSchemaGenerator(ansisql.ANSISchemaGenerator):
class OracleSchemaDropper(ansisql.ANSISchemaDropper):
def visit_sequence(self, sequence):
if not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name):
- self.append("DROP SEQUENCE %s" % sequence.name)
+ self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence))
self.execute()
class OracleDefaultRunner(ansisql.ANSIDefaultRunner):
@@ -649,6 +660,6 @@ class OracleDefaultRunner(ansisql.ANSIDefaultRunner):
return self.connection.execute(c).scalar()
def visit_sequence(self, seq):
- return self.connection.execute("SELECT " + seq.name + ".nextval FROM DUAL").scalar()
+ return self.connection.execute("SELECT " + self.dialect.identifier_preparer.format_sequence(seq) + ".nextval FROM DUAL").scalar()
dialect = OracleDialect
diff --git a/test/engine/reflection.py b/test/engine/reflection.py
index a704b6df1..5a9364713 100644
--- a/test/engine/reflection.py
+++ b/test/engine/reflection.py
@@ -29,7 +29,10 @@ class ReflectionTest(PersistTest):
# the colon thing isnt working out for PG reflection just yet
#defval3 = '1999-09-09 00:00:00'
deftype3 = Date
- defval3 = '1999-09-09'
+ if testbase.db.engine.name == 'oracle':
+ defval3 = text("to_date('09-09-1999', 'MM-DD-YYYY')")
+ else:
+ defval3 = '1999-09-09'
else:
deftype2, deftype3 = Integer, Integer
defval2, defval3 = "15", "16"
diff --git a/test/sql/defaults.py b/test/sql/defaults.py
index a3fe8c07a..0df49ea39 100644
--- a/test/sql/defaults.py
+++ b/test/sql/defaults.py
@@ -208,7 +208,7 @@ class AutoIncrementTest(PersistTest):
def testwithautoincrement(self):
meta = MetaData(testbase.db)
table = Table("aitest", meta,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True),
Column('data', String(20)))
table.create(checkfirst=True)
try:
@@ -223,7 +223,7 @@ class AutoIncrementTest(PersistTest):
meta = MetaData(testbase.db)
table = Table("aitest", meta,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True),
Column('data', String(20)))
table.create(checkfirst=True)
@@ -231,7 +231,7 @@ class AutoIncrementTest(PersistTest):
# simulate working on a table that doesn't already exist
meta2 = MetaData(testbase.db)
table2 = Table("aitest", meta2,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True),
Column('data', String(20)))
class AiTest(object):
pass
@@ -260,7 +260,7 @@ class SequenceTest(PersistTest):
sometable = Table( 'Manager', metadata,
Column( 'obj_id', Integer, Sequence('obj_id_seq'), ),
Column( 'name', String, ),
- Column( 'id', Integer, primary_key= True, ),
+ Column( 'id', Integer, Sequence('Manager_id_seq', optional=True), primary_key=True),
)
metadata.create_all()
diff --git a/test/sql/quote.py b/test/sql/quote.py
index eb0239124..9791bf946 100644
--- a/test/sql/quote.py
+++ b/test/sql/quote.py
@@ -107,7 +107,9 @@ class QuoteTest(PersistTest):
x = select([sql.literal_column("'FooCol'").label("SomeLabel")], from_obj=[table])
x = x.select()
assert str(x) == '''SELECT "SomeLabel" \nFROM (SELECT 'FooCol' AS "SomeLabel" \nFROM "ImATable")'''
-
+
+ # oracle doesn't support non-case-sensitive until ticket #726 is fixed
+ @testing.unsupported('oracle')
def testlabelsnocase(self):
metadata = MetaData()
table1 = Table('SomeCase1', metadata,
diff --git a/test/sql/rowcount.py b/test/sql/rowcount.py
index e0da96a81..cf9ba30d9 100644
--- a/test/sql/rowcount.py
+++ b/test/sql/rowcount.py
@@ -11,7 +11,7 @@ class FoundRowsTest(AssertMixin):
global employees_table
employees_table = Table('employees', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, Sequence('employee_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
Column('department', String(1)),
)
diff --git a/test/sql/unicode.py b/test/sql/unicode.py
index cb96121b0..abeac370e 100644
--- a/test/sql/unicode.py
+++ b/test/sql/unicode.py
@@ -9,6 +9,7 @@ from testlib.engines import utf8_engine
class UnicodeSchemaTest(PersistTest):
+ @testing.unsupported('oracle')
def setUpAll(self):
global unicode_bind, metadata, t1, t2
@@ -25,16 +26,19 @@ class UnicodeSchemaTest(PersistTest):
)
metadata.create_all()
+ @testing.unsupported('oracle')
def tearDown(self):
if metadata.tables:
t2.delete().execute()
t1.delete().execute()
+ @testing.unsupported('oracle')
def tearDownAll(self):
global unicode_bind
metadata.drop_all()
del unicode_bind
+ @testing.unsupported('oracle')
def test_insert(self):
t1.insert().execute({u'méil':1, u'\u6e2c\u8a66':5})
t2.insert().execute({'a':1, 'b':1})
@@ -42,6 +46,7 @@ class UnicodeSchemaTest(PersistTest):
assert t1.select().execute().fetchall() == [(1, 5)]
assert t2.select().execute().fetchall() == [(1, 1)]
+ @testing.unsupported('oracle')
def test_reflect(self):
t1.insert().execute({u'méil':2, u'\u6e2c\u8a66':7})
t2.insert().execute({'a':2, 'b':2})
@@ -60,6 +65,7 @@ class UnicodeSchemaTest(PersistTest):
meta.drop_all()
metadata.create_all()
+ @testing.unsupported('oracle')
def test_mapping(self):
# TODO: this test should be moved to the ORM tests, tests should be
# added to this module testing SQL syntax and joins, etc.