author    | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-01-06 01:14:26 -0500
committer | mike bayer <mike_mp@zzzcomputing.com> | 2019-01-06 17:34:50 +0000
commit    | 1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch)
tree      | 28e725c5c8188bd0cfd133d1e268dbca9b524978 /test/dialect/oracle/test_reflection.py
parent    | 404e69426b05a82d905cbb3ad33adafccddb00dd (diff)
download  | sqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz
Run black -l 79 against all source files
This is a straight reformat run using black as-is, with no manual
edits applied.
The black run formats code consistently; however, in some cases that
are common in SQLAlchemy code it produces too-long lines. Those lines
will be resolved in the following commit, which will also resolve all
remaining flake8 issues, including shadowed builtins, long lines,
import order, unused imports, duplicate imports, and docstring issues.
Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
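A minimal sketch of how such a run can be reproduced, assuming black is installed and the command is issued from the root of a SQLAlchemy checkout (the "." target is an assumption; the commit only states that all source files were formatted):

```python
# Hedged sketch: invoke the black CLI with the 79-character line length
# named in the commit title, formatting every file under the current
# directory.
import subprocess

subprocess.run(["black", "-l", "79", "."], check=True)
```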
Diffstat (limited to 'test/dialect/oracle/test_reflection.py')
-rw-r--r-- | test/dialect/oracle/test_reflection.py | 553
1 file changed, 333 insertions, 220 deletions
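The diff that follows shows the resulting pattern throughout this test module: backslash continuations and wrapped calls become parenthesized, arguments are exploded one per line with a trailing comma, and single-quoted strings become double-quoted. A short illustration, simplified from the `parent` table used in these tests:

```python
from sqlalchemy import Column, Integer, MetaData, Table

meta = MetaData()

# Before the reformat the suite used single quotes and continuation-style
# indentation, e.g.:
#
#     parent = Table('parent', meta,
#                    Column('pid', Integer, primary_key=True))
#
# After "black -l 79" the call is exploded one argument per line with a
# trailing comma, and string literals use double quotes:
parent = Table(
    "parent",
    meta,
    Column("pid", Integer, primary_key=True),
)
```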
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py index f749e513a..a88703ab0 100644 --- a/test/dialect/oracle/test_reflection.py +++ b/test/dialect/oracle/test_reflection.py @@ -6,22 +6,61 @@ from sqlalchemy import exc from sqlalchemy.sql import table from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import testing -from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ - Index, MetaData, select, inspect, ForeignKey, String, func, \ - TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ - literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ - ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ - union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ - PrimaryKeyConstraint, FLOAT, INTEGER -from sqlalchemy.dialects.oracle.base import NUMBER, BINARY_DOUBLE, \ - BINARY_FLOAT, DOUBLE_PRECISION +from sqlalchemy import ( + Integer, + Text, + LargeBinary, + Unicode, + UniqueConstraint, + Index, + MetaData, + select, + inspect, + ForeignKey, + String, + func, + TypeDecorator, + bindparam, + Numeric, + TIMESTAMP, + CHAR, + text, + literal_column, + VARCHAR, + create_engine, + Date, + NVARCHAR, + ForeignKeyConstraint, + Sequence, + Float, + DateTime, + cast, + UnicodeText, + union, + except_, + type_coerce, + or_, + outerjoin, + DATE, + NCHAR, + outparam, + PrimaryKeyConstraint, + FLOAT, + INTEGER, +) +from sqlalchemy.dialects.oracle.base import ( + NUMBER, + BINARY_DOUBLE, + BINARY_FLOAT, + DOUBLE_PRECISION, +) from sqlalchemy.testing import assert_raises from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Table, Column class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @classmethod @@ -30,7 +69,8 @@ class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): # don't really know how else to go here unless # we connect as the other user. - for stmt in (""" + for stmt in ( + """ create table %(test_schema)s.parent( id integer primary key, data varchar2(50) @@ -60,13 +100,16 @@ create synonym %(test_schema)s.local_table for local_table; -- so we give it to public. ideas welcome. 
grant references on %(test_schema)s.parent to public; grant references on %(test_schema)s.child to public; -""" % {"test_schema": testing.config.test_schema}).split(";"): +""" + % {"test_schema": testing.config.test_schema} + ).split(";"): if stmt.strip(): testing.db.execute(stmt) @classmethod def teardown_class(cls): - for stmt in (""" + for stmt in ( + """ drop table %(test_schema)s.child; drop table %(test_schema)s.parent; drop table local_table; @@ -75,7 +118,9 @@ drop synonym %(test_schema)s.ptable; drop synonym %(test_schema)s_pt; drop synonym %(test_schema)s.local_table; -""" % {"test_schema": testing.config.test_schema}).split(";"): +""" + % {"test_schema": testing.config.test_schema} + ).split(";"): if stmt.strip(): testing.db.execute(stmt) @@ -83,192 +128,235 @@ drop synonym %(test_schema)s.local_table; def test_create_same_names_explicit_schema(self): schema = testing.db.dialect.default_schema_name meta = self.metadata - parent = Table('parent', meta, - Column('pid', Integer, primary_key=True), - schema=schema) - child = Table('child', meta, - Column('cid', Integer, primary_key=True), - Column('pid', - Integer, - ForeignKey('%s.parent.pid' % schema)), - schema=schema) + parent = Table( + "parent", + meta, + Column("pid", Integer, primary_key=True), + schema=schema, + ) + child = Table( + "child", + meta, + Column("cid", Integer, primary_key=True), + Column("pid", Integer, ForeignKey("%s.parent.pid" % schema)), + schema=schema, + ) meta.create_all() - parent.insert().execute({'pid': 1}) - child.insert().execute({'cid': 1, 'pid': 1}) + parent.insert().execute({"pid": 1}) + child.insert().execute({"cid": 1, "pid": 1}) eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_table_owner_local_synonym(self): meta = MetaData(testing.db) - parent = Table('%s_pt' % testing.config.test_schema, - meta, - autoload=True, - oracle_resolve_synonyms=True) - self.assert_compile(parent.select(), - "SELECT %(test_schema)s_pt.id, " - "%(test_schema)s_pt.data FROM %(test_schema)s_pt" - % {"test_schema": testing.config.test_schema}) + parent = Table( + "%s_pt" % testing.config.test_schema, + meta, + autoload=True, + oracle_resolve_synonyms=True, + ) + self.assert_compile( + parent.select(), + "SELECT %(test_schema)s_pt.id, " + "%(test_schema)s_pt.data FROM %(test_schema)s_pt" + % {"test_schema": testing.config.test_schema}, + ) select([parent]).execute().fetchall() def test_reflect_alt_synonym_owner_local_table(self): meta = MetaData(testing.db) parent = Table( - 'local_table', meta, autoload=True, - oracle_resolve_synonyms=True, schema=testing.config.test_schema) + "local_table", + meta, + autoload=True, + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ) self.assert_compile( parent.select(), "SELECT %(test_schema)s.local_table.id, " "%(test_schema)s.local_table.data " - "FROM %(test_schema)s.local_table" % - {"test_schema": testing.config.test_schema} + "FROM %(test_schema)s.local_table" + % {"test_schema": testing.config.test_schema}, ) select([parent]).execute().fetchall() @testing.provide_metadata def test_create_same_names_implicit_schema(self): meta = self.metadata - parent = Table('parent', - meta, - Column('pid', Integer, primary_key=True)) - child = Table('child', meta, - Column('cid', Integer, primary_key=True), - Column('pid', Integer, ForeignKey('parent.pid'))) + parent = Table( + "parent", meta, Column("pid", Integer, primary_key=True) + ) + child = Table( + "child", + meta, + Column("cid", Integer, primary_key=True), + Column("pid", Integer, 
ForeignKey("parent.pid")), + ) meta.create_all() - parent.insert().execute({'pid': 1}) - child.insert().execute({'cid': 1, 'pid': 1}) + parent.insert().execute({"pid": 1}) + child.insert().execute({"cid": 1, "pid": 1}) eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): meta = MetaData(testing.db) parent = Table( - 'parent', meta, autoload=True, - schema=testing.config.test_schema) + "parent", meta, autoload=True, schema=testing.config.test_schema + ) child = Table( - 'child', meta, autoload=True, - schema=testing.config.test_schema) + "child", meta, autoload=True, schema=testing.config.test_schema + ) self.assert_compile( parent.join(child), "%(test_schema)s.parent JOIN %(test_schema)s.child ON " - "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % { - "test_schema": testing.config.test_schema - }) - select([parent, child]).\ - select_from(parent.join(child)).\ - execute().fetchall() + "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, child]).select_from( + parent.join(child) + ).execute().fetchall() def test_reflect_local_to_remote(self): testing.db.execute( - 'CREATE TABLE localtable (id INTEGER ' - 'PRIMARY KEY, parent_id INTEGER REFERENCES ' - '%(test_schema)s.parent(id))' % { - "test_schema": testing.config.test_schema}) + "CREATE TABLE localtable (id INTEGER " + "PRIMARY KEY, parent_id INTEGER REFERENCES " + "%(test_schema)s.parent(id))" + % {"test_schema": testing.config.test_schema} + ) try: meta = MetaData(testing.db) - lcl = Table('localtable', meta, autoload=True) - parent = meta.tables['%s.parent' % testing.config.test_schema] - self.assert_compile(parent.join(lcl), - '%(test_schema)s.parent JOIN localtable ON ' - '%(test_schema)s.parent.id = ' - 'localtable.parent_id' % { - "test_schema": testing.config.test_schema} - ) - select([parent, - lcl]).select_from(parent.join(lcl)).execute().fetchall() + lcl = Table("localtable", meta, autoload=True) + parent = meta.tables["%s.parent" % testing.config.test_schema] + self.assert_compile( + parent.join(lcl), + "%(test_schema)s.parent JOIN localtable ON " + "%(test_schema)s.parent.id = " + "localtable.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, lcl]).select_from( + parent.join(lcl) + ).execute().fetchall() finally: - testing.db.execute('DROP TABLE localtable') + testing.db.execute("DROP TABLE localtable") def test_reflect_alt_owner_implicit(self): meta = MetaData(testing.db) parent = Table( - 'parent', meta, autoload=True, - schema=testing.config.test_schema) + "parent", meta, autoload=True, schema=testing.config.test_schema + ) child = Table( - 'child', meta, autoload=True, - schema=testing.config.test_schema) + "child", meta, autoload=True, schema=testing.config.test_schema + ) self.assert_compile( parent.join(child), - '%(test_schema)s.parent JOIN %(test_schema)s.child ' - 'ON %(test_schema)s.parent.id = ' - '%(test_schema)s.child.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - child]).select_from(parent.join(child)).execute().fetchall() + "%(test_schema)s.parent JOIN %(test_schema)s.child " + "ON %(test_schema)s.parent.id = " + "%(test_schema)s.child.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, child]).select_from( + parent.join(child) + ).execute().fetchall() def test_reflect_alt_owner_synonyms(self): - testing.db.execute('CREATE TABLE localtable (id INTEGER ' - 'PRIMARY KEY, parent_id 
INTEGER REFERENCES ' - '%s.ptable(id))' % testing.config.test_schema) + testing.db.execute( + "CREATE TABLE localtable (id INTEGER " + "PRIMARY KEY, parent_id INTEGER REFERENCES " + "%s.ptable(id))" % testing.config.test_schema + ) try: meta = MetaData(testing.db) - lcl = Table('localtable', meta, autoload=True, - oracle_resolve_synonyms=True) - parent = meta.tables['%s.ptable' % testing.config.test_schema] + lcl = Table( + "localtable", meta, autoload=True, oracle_resolve_synonyms=True + ) + parent = meta.tables["%s.ptable" % testing.config.test_schema] self.assert_compile( parent.join(lcl), - '%(test_schema)s.ptable JOIN localtable ON ' - '%(test_schema)s.ptable.id = ' - 'localtable.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - lcl]).select_from(parent.join(lcl)).execute().fetchall() + "%(test_schema)s.ptable JOIN localtable ON " + "%(test_schema)s.ptable.id = " + "localtable.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, lcl]).select_from( + parent.join(lcl) + ).execute().fetchall() finally: - testing.db.execute('DROP TABLE localtable') + testing.db.execute("DROP TABLE localtable") def test_reflect_remote_synonyms(self): meta = MetaData(testing.db) - parent = Table('ptable', meta, autoload=True, - schema=testing.config.test_schema, - oracle_resolve_synonyms=True) - child = Table('ctable', meta, autoload=True, - schema=testing.config.test_schema, - oracle_resolve_synonyms=True) + parent = Table( + "ptable", + meta, + autoload=True, + schema=testing.config.test_schema, + oracle_resolve_synonyms=True, + ) + child = Table( + "ctable", + meta, + autoload=True, + schema=testing.config.test_schema, + oracle_resolve_synonyms=True, + ) self.assert_compile( parent.join(child), - '%(test_schema)s.ptable JOIN ' - '%(test_schema)s.ctable ' - 'ON %(test_schema)s.ptable.id = ' - '%(test_schema)s.ctable.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - child]).select_from(parent.join(child)).execute().fetchall() + "%(test_schema)s.ptable JOIN " + "%(test_schema)s.ctable " + "ON %(test_schema)s.ptable.id = " + "%(test_schema)s.ctable.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, child]).select_from( + parent.join(child) + ).execute().fetchall() class ConstraintTest(fixtures.TablesTest): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True run_deletes = None @classmethod def define_tables(cls, metadata): - Table('foo', metadata, Column('id', Integer, primary_key=True)) + Table("foo", metadata, Column("id", Integer, primary_key=True)) def test_oracle_has_no_on_update_cascade(self): - bar = Table('bar', self.metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', - Integer, - ForeignKey('foo.id', onupdate='CASCADE'))) + bar = Table( + "bar", + self.metadata, + Column("id", Integer, primary_key=True), + Column( + "foo_id", Integer, ForeignKey("foo.id", onupdate="CASCADE") + ), + ) assert_raises(exc.SAWarning, bar.create) - bat = Table('bat', self.metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', Integer), - ForeignKeyConstraint(['foo_id'], ['foo.id'], - onupdate='CASCADE')) + bat = Table( + "bat", + self.metadata, + Column("id", Integer, primary_key=True), + Column("foo_id", Integer), + ForeignKeyConstraint(["foo_id"], ["foo.id"], onupdate="CASCADE"), + ) assert_raises(exc.SAWarning, bat.create) def test_reflect_check_include_all(self): insp = inspect(testing.db) - eq_(insp.get_check_constraints('foo'), []) + 
eq_(insp.get_check_constraints("foo"), []) eq_( - [rec['sqltext'] - for rec in insp.get_check_constraints('foo', include_all=True)], - ['"ID" IS NOT NULL']) + [ + rec["sqltext"] + for rec in insp.get_check_constraints("foo", include_all=True) + ], + ['"ID" IS NOT NULL'], + ) class SystemTableTablenamesTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True def setup(self): @@ -287,23 +375,20 @@ class SystemTableTablenamesTest(fixtures.TestBase): def test_table_names_no_system(self): insp = inspect(testing.db) - eq_( - insp.get_table_names(), ["my_table"] - ) + eq_(insp.get_table_names(), ["my_table"]) def test_temp_table_names_no_system(self): insp = inspect(testing.db) - eq_( - insp.get_temp_table_names(), ["my_temp_table"] - ) + eq_(insp.get_temp_table_names(), ["my_temp_table"]) def test_table_names_w_system(self): engine = testing_engine(options={"exclude_tablespaces": ["FOO"]}) insp = inspect(engine) eq_( - set(insp.get_table_names()).intersection(["my_table", - "foo_table"]), - set(["my_table", "foo_table"]) + set(insp.get_table_names()).intersection( + ["my_table", "foo_table"] + ), + set(["my_table", "foo_table"]), ) @@ -311,11 +396,12 @@ class DontReflectIOTTest(fixtures.TestBase): """test that index overflow tables aren't included in table_names.""" - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True def setup(self): - testing.db.execute(""" + testing.db.execute( + """ CREATE TABLE admin_docindex( token char(20), doc_id NUMBER, @@ -326,7 +412,8 @@ class DontReflectIOTTest(fixtures.TestBase): TABLESPACE users PCTTHRESHOLD 20 OVERFLOW TABLESPACE users - """) + """ + ) def teardown(self): testing.db.execute("drop table admin_docindex") @@ -334,35 +421,37 @@ class DontReflectIOTTest(fixtures.TestBase): def test_reflect_all(self): m = MetaData(testing.db) m.reflect() - eq_( - set(t.name for t in m.tables.values()), - set(['admin_docindex']) - ) + eq_(set(t.name for t in m.tables.values()), set(["admin_docindex"])) class UnsupportedIndexReflectTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.emits_warning("No column names") @testing.provide_metadata def test_reflect_functional_index(self): metadata = self.metadata - Table('test_index_reflect', metadata, - Column('data', String(20), primary_key=True)) + Table( + "test_index_reflect", + metadata, + Column("data", String(20), primary_key=True), + ) metadata.create_all() - testing.db.execute('CREATE INDEX DATA_IDX ON ' - 'TEST_INDEX_REFLECT (UPPER(DATA))') + testing.db.execute( + "CREATE INDEX DATA_IDX ON " "TEST_INDEX_REFLECT (UPPER(DATA))" + ) m2 = MetaData(testing.db) - Table('test_index_reflect', m2, autoload=True) + Table("test_index_reflect", m2, autoload=True) def all_tables_compression_missing(): try: - testing.db.execute('SELECT compression FROM all_tables') + testing.db.execute("SELECT compression FROM all_tables") if "Enterprise Edition" not in testing.db.scalar( - "select * from v$version"): + "select * from v$version" + ): return True return False except Exception: @@ -371,9 +460,10 @@ def all_tables_compression_missing(): def all_tables_compress_for_missing(): try: - testing.db.execute('SELECT compress_for FROM all_tables') + testing.db.execute("SELECT compress_for FROM all_tables") if "Enterprise Edition" not in testing.db.scalar( - "select * from v$version"): + "select * from v$version" + ): return True return False except Exception: @@ -381,7 +471,7 @@ def all_tables_compress_for_missing(): class 
TableReflectionTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.provide_metadata @@ -389,35 +479,41 @@ class TableReflectionTest(fixtures.TestBase): def test_reflect_basic_compression(self): metadata = self.metadata - tbl = Table('test_compress', metadata, - Column('data', Integer, primary_key=True), - oracle_compress=True) + tbl = Table( + "test_compress", + metadata, + Column("data", Integer, primary_key=True), + oracle_compress=True, + ) metadata.create_all() m2 = MetaData(testing.db) - tbl = Table('test_compress', m2, autoload=True) + tbl = Table("test_compress", m2, autoload=True) # Don't hardcode the exact value, but it must be non-empty - assert tbl.dialect_options['oracle']['compress'] + assert tbl.dialect_options["oracle"]["compress"] @testing.provide_metadata @testing.fails_if(all_tables_compress_for_missing) def test_reflect_oltp_compression(self): metadata = self.metadata - tbl = Table('test_compress', metadata, - Column('data', Integer, primary_key=True), - oracle_compress="OLTP") + tbl = Table( + "test_compress", + metadata, + Column("data", Integer, primary_key=True), + oracle_compress="OLTP", + ) metadata.create_all() m2 = MetaData(testing.db) - tbl = Table('test_compress', m2, autoload=True) - assert tbl.dialect_options['oracle']['compress'] == "OLTP" + tbl = Table("test_compress", m2, autoload=True) + assert tbl.dialect_options["oracle"]["compress"] == "OLTP" class RoundTripIndexTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.provide_metadata @@ -425,22 +521,27 @@ class RoundTripIndexTest(fixtures.TestBase): metadata = self.metadata s_table = Table( - "sometable", metadata, + "sometable", + metadata, Column("id_a", Unicode(255), primary_key=True), - Column("id_b", - Unicode(255), - primary_key=True, - unique=True), + Column("id_b", Unicode(255), primary_key=True, unique=True), Column("group", Unicode(255), primary_key=True), Column("col", Unicode(255)), - UniqueConstraint('col', 'group')) + UniqueConstraint("col", "group"), + ) # "group" is a keyword, so lower case - normalind = Index('tableind', s_table.c.id_b, s_table.c.group) - Index('compress1', s_table.c.id_a, s_table.c.id_b, - oracle_compress=True) - Index('compress2', s_table.c.id_a, s_table.c.id_b, s_table.c.col, - oracle_compress=1) + normalind = Index("tableind", s_table.c.id_b, s_table.c.group) + Index( + "compress1", s_table.c.id_a, s_table.c.id_b, oracle_compress=True + ) + Index( + "compress2", + s_table.c.id_a, + s_table.c.id_b, + s_table.c.col, + oracle_compress=1, + ) metadata.create_all() mirror = MetaData(testing.db) @@ -452,9 +553,11 @@ class RoundTripIndexTest(fixtures.TestBase): inspect.reflect() def obj_definition(obj): - return (obj.__class__, - tuple([c.name for c in obj.columns]), - getattr(obj, 'unique', None)) + return ( + obj.__class__, + tuple([c.name for c in obj.columns]), + getattr(obj, "unique", None), + ) # find what the primary k constraint name should be primaryconsname = testing.db.scalar( @@ -463,62 +566,72 @@ class RoundTripIndexTest(fixtures.TestBase): FROM all_constraints WHERE table_name = :table_name AND owner = :owner - AND constraint_type = 'P' """), + AND constraint_type = 'P' """ + ), table_name=s_table.name.upper(), - owner=testing.db.dialect.default_schema_name.upper()) + owner=testing.db.dialect.default_schema_name.upper(), + ) reflectedtable = inspect.tables[s_table.name] # make a dictionary of the reflected objects: - reflected = dict([(obj_definition(i), i) for 
i in - reflectedtable.indexes - | reflectedtable.constraints]) + reflected = dict( + [ + (obj_definition(i), i) + for i in reflectedtable.indexes | reflectedtable.constraints + ] + ) # assert we got primary key constraint and its name, Error # if not in dict - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ + assert ( + reflected[ + (PrimaryKeyConstraint, ("id_a", "id_b", "group"), None) + ].name.upper() == primaryconsname.upper() + ) # Error if not in dict - eq_( - reflected[(Index, ('id_b', 'group'), False)].name, - normalind.name - ) - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected + eq_(reflected[(Index, ("id_b", "group"), False)].name, normalind.name) + assert (Index, ("id_b",), True) in reflected + assert (Index, ("col", "group"), True) in reflected - idx = reflected[(Index, ('id_a', 'id_b', ), False)] - assert idx.dialect_options['oracle']['compress'] == 2 + idx = reflected[(Index, ("id_a", "id_b"), False)] + assert idx.dialect_options["oracle"]["compress"] == 2 - idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)] - assert idx.dialect_options['oracle']['compress'] == 1 + idx = reflected[(Index, ("id_a", "id_b", "col"), False)] + assert idx.dialect_options["oracle"]["compress"] == 1 eq_(len(reflectedtable.constraints), 1) eq_(len(reflectedtable.indexes), 5) class DBLinkReflectionTest(fixtures.TestBase): - __requires__ = 'oracle_test_dblink', - __only_on__ = 'oracle' + __requires__ = ("oracle_test_dblink",) + __only_on__ = "oracle" __backend__ = True @classmethod def setup_class(cls): from sqlalchemy.testing import config - cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link') + + cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link") # note that the synonym here is still not totally functional # when accessing via a different username as we do with the # multiprocess test suite, so testing here is minimal with testing.db.connect() as conn: - conn.execute("create table test_table " - "(id integer primary key, data varchar2(50))") - conn.execute("create synonym test_table_syn " - "for test_table@%s" % cls.dblink) + conn.execute( + "create table test_table " + "(id integer primary key, data varchar2(50))" + ) + conn.execute( + "create synonym test_table_syn " + "for test_table@%s" % cls.dblink + ) @classmethod def teardown_class(cls): @@ -530,24 +643,29 @@ class DBLinkReflectionTest(fixtures.TestBase): """test the resolution of the synonym/dblink. 
""" m = MetaData() - t = Table('test_table_syn', m, autoload=True, - autoload_with=testing.db, oracle_resolve_synonyms=True) - eq_(list(t.c.keys()), ['id', 'data']) + t = Table( + "test_table_syn", + m, + autoload=True, + autoload_with=testing.db, + oracle_resolve_synonyms=True, + ) + eq_(list(t.c.keys()), ["id", "data"]) eq_(list(t.primary_key), [t.c.id]) class TypeReflectionTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.provide_metadata def _run_test(self, specs, attributes): - columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] + columns = [Column("c%i" % (i + 1), t[0]) for i, t in enumerate(specs)] m = self.metadata - Table('oracle_types', m, *columns) + Table("oracle_types", m, *columns) m.create_all() m2 = MetaData(testing.db) - table = Table('oracle_types', m2, autoload=True) + table = Table("oracle_types", m2, autoload=True) for i, (reflected_col, spec) in enumerate(zip(table.c, specs)): expected_spec = spec[1] reflected_type = reflected_col.type @@ -557,28 +675,23 @@ class TypeReflectionTest(fixtures.TestBase): getattr(reflected_type, attr), getattr(expected_spec, attr), "Column %s: Attribute %s value of %s does not " - "match %s for type %s" % ( + "match %s for type %s" + % ( "c%i" % (i + 1), attr, getattr(reflected_type, attr), getattr(expected_spec, attr), - spec[0] - ) + spec[0], + ), ) def test_integer_types(self): - specs = [ - (Integer, INTEGER(),), - (Numeric, INTEGER(),), - ] + specs = [(Integer, INTEGER()), (Numeric, INTEGER())] self._run_test(specs, []) def test_number_types(self): - specs = [ - (Numeric(5, 2), NUMBER(5, 2),), - (NUMBER, NUMBER(),), - ] - self._run_test(specs, ['precision', 'scale']) + specs = [(Numeric(5, 2), NUMBER(5, 2)), (NUMBER, NUMBER())] + self._run_test(specs, ["precision", "scale"]) def test_float_types(self): specs = [ @@ -587,11 +700,11 @@ class TypeReflectionTest(fixtures.TestBase): # (DOUBLE_PRECISION(), oracle.FLOAT(binary_precision=126)), (BINARY_DOUBLE(), BINARY_DOUBLE()), (BINARY_FLOAT(), BINARY_FLOAT()), - (FLOAT(5), FLOAT(),), + (FLOAT(5), FLOAT()), # when binary_precision is supported # (FLOAT(5), oracle.FLOAT(binary_precision=5),), (FLOAT(), FLOAT()), # when binary_precision is supported # (FLOAT(5), oracle.FLOAT(binary_precision=126),), ] - self._run_test(specs, ['precision']) + self._run_test(specs, ["precision"]) |