diff options
-rw-r--r-- | lib/sqlalchemy/databases/oracle.py | 55 | ||||
-rw-r--r-- | test/engine/reflection.py | 5 | ||||
-rw-r--r-- | test/sql/defaults.py | 8 | ||||
-rw-r--r-- | test/sql/quote.py | 4 | ||||
-rw-r--r-- | test/sql/rowcount.py | 2 | ||||
-rw-r--r-- | test/sql/unicode.py | 6 |
6 files changed, 51 insertions, 29 deletions
diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py index 10b7cc962..9b8bb2f9e 100644 --- a/lib/sqlalchemy/databases/oracle.py +++ b/lib/sqlalchemy/databases/oracle.py @@ -296,11 +296,11 @@ class OracleDialect(ansisql.ANSIDialect): return OracleDefaultRunner(connection, **kwargs) def has_table(self, connection, table_name, schema=None): - cursor = connection.execute("""select table_name from all_tables where table_name=:name""", {'name':table_name.upper()}) + cursor = connection.execute("""select table_name from all_tables where table_name=:name""", {'name':self._denormalize_name(table_name)}) return bool( cursor.fetchone() is not None ) def has_sequence(self, connection, sequence_name): - cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':sequence_name.upper()}) + cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':self._denormalize_name(sequence_name)}) return bool( cursor.fetchone() is not None ) def _locate_owner_row(self, owner, name, rows, raiseerr=False): @@ -364,25 +364,40 @@ class OracleDialect(ansisql.ANSIDialect): dblink = '' return name, owner, dblink raise - + + def _normalize_name(self, name): + if name is None: + return None + elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower(), True): + return name.lower() + else: + return name + + def _denormalize_name(self, name): + if name is None: + return None + elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower(), True): + return name.upper() + else: + return name + def table_names(self, connection, schema): - # sorry, I have no idea what that dblink stuff is about :) + # note that table_names() isnt loading DBLINKed or synonym'ed tables s = "select table_name from all_tables where tablespace_name NOT IN ('SYSTEM', 'SYSAUX')" - return [row[0] for row in connection.execute(s)] + return [self._normalize_name(row[0]) for row in connection.execute(s)] def reflecttable(self, connection, table, include_columns): preparer = self.identifier_preparer - if not preparer.should_quote(table): - name = table.name.upper() - else: - name = table.name # search for table, including across synonyms and dblinks. # locate the actual name of the table, the real owner, and any dblink clause needed. - actual_name, owner, dblink = self._resolve_table_owner(connection, name, table) + actual_name, owner, dblink = self._resolve_table_owner(connection, self._denormalize_name(table.name), table) + + print "ACTUALNAME:", actual_name c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner}) + while True: row = c.fetchone() if row is None: @@ -390,11 +405,7 @@ class OracleDialect(ansisql.ANSIDialect): found_table = True #print "ROW:" , row - (colname, coltype, length, precision, scale, nullable, default) = (row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6]) - - # if name comes back as all upper, assume its case folded - if (colname.upper() == colname): - colname = colname.lower() + (colname, coltype, length, precision, scale, nullable, default) = (self._normalize_name(row[0]), row[1], row[2], row[3], row[4], row[5]=='Y', row[6]) if include_columns and colname not in include_columns: continue @@ -433,10 +444,10 @@ class OracleDialect(ansisql.ANSIDialect): c = connection.execute("""SELECT ac.constraint_name, ac.constraint_type, - LOWER(loc.column_name) AS local_column, - LOWER(rem.table_name) AS remote_table, - LOWER(rem.column_name) AS remote_column, - LOWER(rem.owner) AS remote_owner + loc.column_name AS local_column, + rem.table_name AS remote_table, + rem.column_name AS remote_column, + rem.owner AS remote_owner FROM all_constraints%(dblink)s ac, all_cons_columns%(dblink)s loc, all_cons_columns%(dblink)s rem @@ -457,7 +468,7 @@ class OracleDialect(ansisql.ANSIDialect): if row is None: break #print "ROW:" , row - (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row + (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = row[0:2] + tuple([self._normalize_name(x) for x in row[2:]]) if cons_type == 'P': table.primary_key.add(table.c[local_column]) elif cons_type == 'R': @@ -640,7 +651,7 @@ class OracleSchemaGenerator(ansisql.ANSISchemaGenerator): class OracleSchemaDropper(ansisql.ANSISchemaDropper): def visit_sequence(self, sequence): if not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name): - self.append("DROP SEQUENCE %s" % sequence.name) + self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence)) self.execute() class OracleDefaultRunner(ansisql.ANSIDefaultRunner): @@ -649,6 +660,6 @@ class OracleDefaultRunner(ansisql.ANSIDefaultRunner): return self.connection.execute(c).scalar() def visit_sequence(self, seq): - return self.connection.execute("SELECT " + seq.name + ".nextval FROM DUAL").scalar() + return self.connection.execute("SELECT " + self.dialect.identifier_preparer.format_sequence(seq) + ".nextval FROM DUAL").scalar() dialect = OracleDialect diff --git a/test/engine/reflection.py b/test/engine/reflection.py index a704b6df1..5a9364713 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -29,7 +29,10 @@ class ReflectionTest(PersistTest): # the colon thing isnt working out for PG reflection just yet #defval3 = '1999-09-09 00:00:00' deftype3 = Date - defval3 = '1999-09-09' + if testbase.db.engine.name == 'oracle': + defval3 = text("to_date('09-09-1999', 'MM-DD-YYYY')") + else: + defval3 = '1999-09-09' else: deftype2, deftype3 = Integer, Integer defval2, defval3 = "15", "16" diff --git a/test/sql/defaults.py b/test/sql/defaults.py index a3fe8c07a..0df49ea39 100644 --- a/test/sql/defaults.py +++ b/test/sql/defaults.py @@ -208,7 +208,7 @@ class AutoIncrementTest(PersistTest): def testwithautoincrement(self): meta = MetaData(testbase.db) table = Table("aitest", meta, - Column('id', Integer, primary_key=True), + Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True), Column('data', String(20))) table.create(checkfirst=True) try: @@ -223,7 +223,7 @@ class AutoIncrementTest(PersistTest): meta = MetaData(testbase.db) table = Table("aitest", meta, - Column('id', Integer, primary_key=True), + Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True), Column('data', String(20))) table.create(checkfirst=True) @@ -231,7 +231,7 @@ class AutoIncrementTest(PersistTest): # simulate working on a table that doesn't already exist meta2 = MetaData(testbase.db) table2 = Table("aitest", meta2, - Column('id', Integer, primary_key=True), + Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True), Column('data', String(20))) class AiTest(object): pass @@ -260,7 +260,7 @@ class SequenceTest(PersistTest): sometable = Table( 'Manager', metadata, Column( 'obj_id', Integer, Sequence('obj_id_seq'), ), Column( 'name', String, ), - Column( 'id', Integer, primary_key= True, ), + Column( 'id', Integer, Sequence('Manager_id_seq', optional=True), primary_key=True), ) metadata.create_all() diff --git a/test/sql/quote.py b/test/sql/quote.py index eb0239124..9791bf946 100644 --- a/test/sql/quote.py +++ b/test/sql/quote.py @@ -107,7 +107,9 @@ class QuoteTest(PersistTest): x = select([sql.literal_column("'FooCol'").label("SomeLabel")], from_obj=[table]) x = x.select() assert str(x) == '''SELECT "SomeLabel" \nFROM (SELECT 'FooCol' AS "SomeLabel" \nFROM "ImATable")''' - + + # oracle doesn't support non-case-sensitive until ticket #726 is fixed + @testing.unsupported('oracle') def testlabelsnocase(self): metadata = MetaData() table1 = Table('SomeCase1', metadata, diff --git a/test/sql/rowcount.py b/test/sql/rowcount.py index e0da96a81..cf9ba30d9 100644 --- a/test/sql/rowcount.py +++ b/test/sql/rowcount.py @@ -11,7 +11,7 @@ class FoundRowsTest(AssertMixin): global employees_table employees_table = Table('employees', metadata, - Column('employee_id', Integer, primary_key=True), + Column('employee_id', Integer, Sequence('employee_id_seq', optional=True), primary_key=True), Column('name', String(50)), Column('department', String(1)), ) diff --git a/test/sql/unicode.py b/test/sql/unicode.py index cb96121b0..abeac370e 100644 --- a/test/sql/unicode.py +++ b/test/sql/unicode.py @@ -9,6 +9,7 @@ from testlib.engines import utf8_engine class UnicodeSchemaTest(PersistTest): + @testing.unsupported('oracle') def setUpAll(self): global unicode_bind, metadata, t1, t2 @@ -25,16 +26,19 @@ class UnicodeSchemaTest(PersistTest): ) metadata.create_all() + @testing.unsupported('oracle') def tearDown(self): if metadata.tables: t2.delete().execute() t1.delete().execute() + @testing.unsupported('oracle') def tearDownAll(self): global unicode_bind metadata.drop_all() del unicode_bind + @testing.unsupported('oracle') def test_insert(self): t1.insert().execute({u'méil':1, u'\u6e2c\u8a66':5}) t2.insert().execute({'a':1, 'b':1}) @@ -42,6 +46,7 @@ class UnicodeSchemaTest(PersistTest): assert t1.select().execute().fetchall() == [(1, 5)] assert t2.select().execute().fetchall() == [(1, 1)] + @testing.unsupported('oracle') def test_reflect(self): t1.insert().execute({u'méil':2, u'\u6e2c\u8a66':7}) t2.insert().execute({'a':2, 'b':2}) @@ -60,6 +65,7 @@ class UnicodeSchemaTest(PersistTest): meta.drop_all() metadata.create_all() + @testing.unsupported('oracle') def test_mapping(self): # TODO: this test should be moved to the ORM tests, tests should be # added to this module testing SQL syntax and joins, etc. |