diff options
Diffstat (limited to 'test/sql/test_metadata.py')
-rw-r--r-- | test/sql/test_metadata.py | 476 |
1 files changed, 308 insertions, 168 deletions
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 36c777c9a..dd3f4fc3d 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -306,8 +306,168 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): ) - @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') - def test_to_metadata(self): + + def test_pickle_metadata_sequence_restated(self): + m1 = MetaData() + Table('a', m1, + Column('id', Integer, primary_key=True), + Column('x', Integer, Sequence("x_seq"))) + + m2 = pickle.loads(pickle.dumps(m1)) + + s2 = Sequence("x_seq") + t2 = Table('a', m2, + Column('id', Integer, primary_key=True), + Column('x', Integer, s2), + extend_existing=True) + + assert m2._sequences['x_seq'] is t2.c.x.default + assert m2._sequences['x_seq'] is s2 + + + def test_sequence_restated_replaced(self): + """Test restatement of Sequence replaces.""" + + m1 = MetaData() + s1 = Sequence("x_seq") + t = Table('a', m1, + Column('x', Integer, s1) + ) + assert m1._sequences['x_seq'] is s1 + + s2 = Sequence('x_seq') + Table('a', m1, + Column('x', Integer, s2), + extend_existing=True + ) + assert t.c.x.default is s2 + assert m1._sequences['x_seq'] is s2 + + + def test_pickle_metadata_sequence_implicit(self): + m1 = MetaData() + Table('a', m1, + Column('id', Integer, primary_key=True), + Column('x', Integer, Sequence("x_seq"))) + + m2 = pickle.loads(pickle.dumps(m1)) + + t2 = Table('a', m2, extend_existing=True) + + eq_(m2._sequences, {'x_seq': t2.c.x.default}) + + def test_pickle_metadata_schema(self): + m1 = MetaData() + Table('a', m1, + Column('id', Integer, primary_key=True), + Column('x', Integer, Sequence("x_seq")), + schema='y') + + m2 = pickle.loads(pickle.dumps(m1)) + + Table('a', m2, schema='y', + extend_existing=True) + + eq_(m2._schemas, m1._schemas) + + def test_metadata_schema_arg(self): + m1 = MetaData(schema='sch1') + m2 = MetaData(schema='sch1', quote_schema=True) + m3 = MetaData(schema='sch1', quote_schema=False) + m4 = MetaData() + + for i, (name, metadata, schema, quote_schema, + exp_schema, exp_quote_schema) in enumerate([ + ('t1', m1, None, None, 'sch1', None), + ('t2', m1, 'sch2', None, 'sch2', None), + ('t3', m1, 'sch2', True, 'sch2', True), + ('t4', m1, 'sch1', None, 'sch1', None), + ('t1', m2, None, None, 'sch1', True), + ('t2', m2, 'sch2', None, 'sch2', None), + ('t3', m2, 'sch2', True, 'sch2', True), + ('t4', m2, 'sch1', None, 'sch1', None), + ('t1', m3, None, None, 'sch1', False), + ('t2', m3, 'sch2', None, 'sch2', None), + ('t3', m3, 'sch2', True, 'sch2', True), + ('t4', m3, 'sch1', None, 'sch1', None), + ('t1', m4, None, None, None, None), + ('t2', m4, 'sch2', None, 'sch2', None), + ('t3', m4, 'sch2', True, 'sch2', True), + ('t4', m4, 'sch1', None, 'sch1', None), + ]): + kw = {} + if schema is not None: + kw['schema'] = schema + if quote_schema is not None: + kw['quote_schema'] = quote_schema + t = Table(name, metadata, **kw) + eq_(t.schema, exp_schema, "test %d, table schema" % i) + eq_(t.schema.quote if t.schema is not None else None, + exp_quote_schema, + "test %d, table quote_schema" % i) + seq = Sequence(name, metadata=metadata, **kw) + eq_(seq.schema, exp_schema, "test %d, seq schema" % i) + eq_(seq.schema.quote if seq.schema is not None else None, + exp_quote_schema, + "test %d, seq quote_schema" % i) + + def test_manual_dependencies(self): + meta = MetaData() + a = Table('a', meta, Column('foo', Integer)) + b = Table('b', meta, Column('foo', Integer)) + c = Table('c', meta, Column('foo', Integer)) + d = Table('d', meta, Column('foo', Integer)) + e = Table('e', meta, Column('foo', Integer)) + + e.add_is_dependent_on(c) + a.add_is_dependent_on(b) + b.add_is_dependent_on(d) + e.add_is_dependent_on(b) + c.add_is_dependent_on(a) + eq_( + meta.sorted_tables, + [d, b, a, c, e] + ) + + def test_nonexistent(self): + assert_raises(tsa.exc.NoSuchTableError, Table, + 'fake_table', + MetaData(testing.db), autoload=True) + + def test_assorted_repr(self): + t1 = Table("foo", MetaData(), Column("x", Integer)) + i1 = Index("bar", t1.c.x) + ck = schema.CheckConstraint("x > y", name="someconstraint") + + for const, exp in ( + (Sequence("my_seq"), + "Sequence('my_seq')"), + (Sequence("my_seq", start=5), + "Sequence('my_seq', start=5)"), + (Column("foo", Integer), + "Column('foo', Integer(), table=None)"), + (Table("bar", MetaData(), Column("x", String)), + "Table('bar', MetaData(bind=None), " + "Column('x', String(), table=<bar>), schema=None)"), + (schema.DefaultGenerator(for_update=True), + "DefaultGenerator(for_update=True)"), + (schema.Index("bar", "c"), "Index('bar')"), + (i1, "Index('bar', Column('x', Integer(), table=<foo>))"), + (schema.FetchedValue(), "FetchedValue()"), + (ck, + "CheckConstraint(" + "%s" + ", name='someconstraint')" % repr(ck.sqltext)), + ): + eq_( + repr(const), + exp + ) + + +class ToMetaDataTest(fixtures.TestBase, ComparesTables): + + def test_copy(self): from sqlalchemy.testing.schema import Table meta = MetaData() @@ -403,7 +563,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): finally: meta.drop_all(testing.db) - def test_col_key_fk_parent_tometadata(self): + def test_col_key_fk_parent(self): # test #2643 m1 = MetaData() a = Table('a', m1, Column('x', Integer)) @@ -416,70 +576,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): assert b2.c.y.references(a2.c.x) - def test_pickle_metadata_sequence_restated(self): - m1 = MetaData() - Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq"))) - - m2 = pickle.loads(pickle.dumps(m1)) - - s2 = Sequence("x_seq") - t2 = Table('a', m2, - Column('id', Integer, primary_key=True), - Column('x', Integer, s2), - extend_existing=True) - - assert m2._sequences['x_seq'] is t2.c.x.default - assert m2._sequences['x_seq'] is s2 - - - def test_sequence_restated_replaced(self): - """Test restatement of Sequence replaces.""" - - m1 = MetaData() - s1 = Sequence("x_seq") - t = Table('a', m1, - Column('x', Integer, s1) - ) - assert m1._sequences['x_seq'] is s1 - - s2 = Sequence('x_seq') - Table('a', m1, - Column('x', Integer, s2), - extend_existing=True - ) - assert t.c.x.default is s2 - assert m1._sequences['x_seq'] is s2 - - - def test_pickle_metadata_sequence_implicit(self): - m1 = MetaData() - Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq"))) - - m2 = pickle.loads(pickle.dumps(m1)) - - t2 = Table('a', m2, extend_existing=True) - - eq_(m2._sequences, {'x_seq': t2.c.x.default}) - - def test_pickle_metadata_schema(self): - m1 = MetaData() - Table('a', m1, - Column('id', Integer, primary_key=True), - Column('x', Integer, Sequence("x_seq")), - schema='y') - - m2 = pickle.loads(pickle.dumps(m1)) - - Table('a', m2, schema='y', - extend_existing=True) - - eq_(m2._schemas, m1._schemas) - - def test_tometadata_with_schema(self): + def test_change_schema(self): meta = MetaData() table = Table('mytable', meta, @@ -504,7 +601,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(str(table_c.join(table2_c).onclause), 'someschema.mytable.myid = someschema.othertable.myid') - def test_tometadata_with_default_schema(self): + def test_retain_table_schema(self): meta = MetaData() table = Table('mytable', meta, @@ -531,7 +628,144 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(str(table_c.join(table2_c).onclause), 'myschema.mytable.myid = myschema.othertable.myid') - def test_tometadata_copy_info(self): + def _assert_fk(self, t2, schema, expected, referred_schema_fn=None): + m2 = MetaData() + existing_schema = t2.schema + if schema: + t2c = t2.tometadata(m2, schema=schema, + referred_schema_fn=referred_schema_fn) + eq_(t2c.schema, schema) + else: + t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn) + eq_(t2c.schema, existing_schema) + eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected) + + def test_fk_has_schema_string_retain_schema(self): + m = MetaData() + + t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x'))) + self._assert_fk(t2, None, "q.t1.x") + + Table('t1', m, Column('x', Integer), schema='q') + self._assert_fk(t2, None, "q.t1.x") + + def test_fk_has_schema_string_new_schema(self): + m = MetaData() + + t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x'))) + self._assert_fk(t2, "z", "q.t1.x") + + Table('t1', m, Column('x', Integer), schema='q') + self._assert_fk(t2, "z", "q.t1.x") + + def test_fk_has_schema_col_retain_schema(self): + m = MetaData() + + t1 = Table('t1', m, Column('x', Integer), schema='q') + t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x))) + + self._assert_fk(t2, "z", "q.t1.x") + + def test_fk_has_schema_col_new_schema(self): + m = MetaData() + + t1 = Table('t1', m, Column('x', Integer), schema='q') + t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x))) + + self._assert_fk(t2, "z", "q.t1.x") + + def test_fk_and_referent_has_same_schema_string_retain_schema(self): + m = MetaData() + + t2 = Table('t2', m, Column('y', Integer, + ForeignKey('q.t1.x')), schema="q") + + self._assert_fk(t2, None, "q.t1.x") + + Table('t1', m, Column('x', Integer), schema='q') + self._assert_fk(t2, None, "q.t1.x") + + def test_fk_and_referent_has_same_schema_string_new_schema(self): + m = MetaData() + + t2 = Table('t2', m, Column('y', Integer, + ForeignKey('q.t1.x')), schema="q") + + self._assert_fk(t2, "z", "z.t1.x") + + Table('t1', m, Column('x', Integer), schema='q') + self._assert_fk(t2, "z", "z.t1.x") + + def test_fk_and_referent_has_same_schema_col_retain_schema(self): + m = MetaData() + + t1 = Table('t1', m, Column('x', Integer), schema='q') + t2 = Table('t2', m, Column('y', Integer, + ForeignKey(t1.c.x)), schema='q') + self._assert_fk(t2, None, "q.t1.x") + + + def test_fk_and_referent_has_same_schema_col_new_schema(self): + m = MetaData() + + t1 = Table('t1', m, Column('x', Integer), schema='q') + t2 = Table('t2', m, Column('y', Integer, + ForeignKey(t1.c.x)), schema='q') + self._assert_fk(t2, 'z', "z.t1.x") + + def test_fk_and_referent_has_diff_schema_string_retain_schema(self): + m = MetaData() + + t2 = Table('t2', m, Column('y', Integer, + ForeignKey('p.t1.x')), schema="q") + + self._assert_fk(t2, None, "p.t1.x") + + Table('t1', m, Column('x', Integer), schema='p') + self._assert_fk(t2, None, "p.t1.x") + + def test_fk_and_referent_has_diff_schema_string_new_schema(self): + m = MetaData() + + t2 = Table('t2', m, Column('y', Integer, + ForeignKey('p.t1.x')), schema="q") + + self._assert_fk(t2, "z", "p.t1.x") + + Table('t1', m, Column('x', Integer), schema='p') + self._assert_fk(t2, "z", "p.t1.x") + + def test_fk_and_referent_has_diff_schema_col_retain_schema(self): + m = MetaData() + + t1 = Table('t1', m, Column('x', Integer), schema='p') + t2 = Table('t2', m, Column('y', Integer, + ForeignKey(t1.c.x)), schema='q') + self._assert_fk(t2, None, "p.t1.x") + + + def test_fk_and_referent_has_diff_schema_col_new_schema(self): + m = MetaData() + + t1 = Table('t1', m, Column('x', Integer), schema='p') + t2 = Table('t2', m, Column('y', Integer, + ForeignKey(t1.c.x)), schema='q') + self._assert_fk(t2, 'z', "p.t1.x") + + def test_fk_custom_system(self): + m = MetaData() + t2 = Table('t2', m, Column('y', Integer, + ForeignKey('p.t1.x')), schema='q') + + def ref_fn(table, to_schema, constraint, referred_schema): + assert table is t2 + eq_(to_schema, "z") + eq_(referred_schema, "p") + return "h" + self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn) + + + def test_copy_info(self): m = MetaData() fk = ForeignKey('t2.id') c = Column('c', Integer, fk) @@ -573,7 +807,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(ck2.info, {"ckinfo": True}) - def test_tometadata_kwargs(self): + def test_dialect_kwargs(self): meta = MetaData() table = Table('mytable', meta, @@ -588,7 +822,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(table.kwargs, table_c.kwargs) - def test_tometadata_indexes(self): + def test_indexes(self): meta = MetaData() table = Table('mytable', meta, @@ -612,7 +846,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): ) @emits_warning("Table '.+' already exists within the given MetaData") - def test_tometadata_already_there(self): + def test_already_exists(self): meta1 = MetaData() table1 = Table('mytable', meta1, @@ -629,66 +863,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): # d'oh! assert table_c is table_d - def test_metadata_schema_arg(self): - m1 = MetaData(schema='sch1') - m2 = MetaData(schema='sch1', quote_schema=True) - m3 = MetaData(schema='sch1', quote_schema=False) - m4 = MetaData() - - for i, (name, metadata, schema, quote_schema, - exp_schema, exp_quote_schema) in enumerate([ - ('t1', m1, None, None, 'sch1', None), - ('t2', m1, 'sch2', None, 'sch2', None), - ('t3', m1, 'sch2', True, 'sch2', True), - ('t4', m1, 'sch1', None, 'sch1', None), - ('t1', m2, None, None, 'sch1', True), - ('t2', m2, 'sch2', None, 'sch2', None), - ('t3', m2, 'sch2', True, 'sch2', True), - ('t4', m2, 'sch1', None, 'sch1', None), - ('t1', m3, None, None, 'sch1', False), - ('t2', m3, 'sch2', None, 'sch2', None), - ('t3', m3, 'sch2', True, 'sch2', True), - ('t4', m3, 'sch1', None, 'sch1', None), - ('t1', m4, None, None, None, None), - ('t2', m4, 'sch2', None, 'sch2', None), - ('t3', m4, 'sch2', True, 'sch2', True), - ('t4', m4, 'sch1', None, 'sch1', None), - ]): - kw = {} - if schema is not None: - kw['schema'] = schema - if quote_schema is not None: - kw['quote_schema'] = quote_schema - t = Table(name, metadata, **kw) - eq_(t.schema, exp_schema, "test %d, table schema" % i) - eq_(t.schema.quote if t.schema is not None else None, - exp_quote_schema, - "test %d, table quote_schema" % i) - seq = Sequence(name, metadata=metadata, **kw) - eq_(seq.schema, exp_schema, "test %d, seq schema" % i) - eq_(seq.schema.quote if seq.schema is not None else None, - exp_quote_schema, - "test %d, seq quote_schema" % i) - - def test_manual_dependencies(self): - meta = MetaData() - a = Table('a', meta, Column('foo', Integer)) - b = Table('b', meta, Column('foo', Integer)) - c = Table('c', meta, Column('foo', Integer)) - d = Table('d', meta, Column('foo', Integer)) - e = Table('e', meta, Column('foo', Integer)) - - e.add_is_dependent_on(c) - a.add_is_dependent_on(b) - b.add_is_dependent_on(d) - e.add_is_dependent_on(b) - c.add_is_dependent_on(a) - eq_( - meta.sorted_tables, - [d, b, a, c, e] - ) - - def test_tometadata_default_schema_metadata(self): + def test_default_schema_metadata(self): meta = MetaData(schema='myschema') table = Table('mytable', meta, @@ -712,7 +887,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(str(table_c.join(table2_c).onclause), "someschema.mytable.myid = someschema.othertable.myid") - def test_tometadata_strip_schema(self): + def test_strip_schema(self): meta = MetaData() table = Table('mytable', meta, @@ -737,41 +912,6 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(str(table_c.join(table2_c).onclause), 'mytable.myid = othertable.myid') - def test_nonexistent(self): - assert_raises(tsa.exc.NoSuchTableError, Table, - 'fake_table', - MetaData(testing.db), autoload=True) - - def test_assorted_repr(self): - t1 = Table("foo", MetaData(), Column("x", Integer)) - i1 = Index("bar", t1.c.x) - ck = schema.CheckConstraint("x > y", name="someconstraint") - - for const, exp in ( - (Sequence("my_seq"), - "Sequence('my_seq')"), - (Sequence("my_seq", start=5), - "Sequence('my_seq', start=5)"), - (Column("foo", Integer), - "Column('foo', Integer(), table=None)"), - (Table("bar", MetaData(), Column("x", String)), - "Table('bar', MetaData(bind=None), " - "Column('x', String(), table=<bar>), schema=None)"), - (schema.DefaultGenerator(for_update=True), - "DefaultGenerator(for_update=True)"), - (schema.Index("bar", "c"), "Index('bar')"), - (i1, "Index('bar', Column('x', Integer(), table=<foo>))"), - (schema.FetchedValue(), "FetchedValue()"), - (ck, - "CheckConstraint(" - "%s" - ", name='someconstraint')" % repr(ck.sqltext)), - ): - eq_( - repr(const), - exp - ) - class TableTest(fixtures.TestBase, AssertsCompiledSQL): @testing.skip_if('mssql', 'different col format') def test_prefixes(self): |