diff options
author | mike bayer <mike_mp@zzzcomputing.com> | 2020-07-13 19:07:35 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2020-07-13 19:07:35 +0000 |
commit | 5cae798ba9ead35dabf91da24b216a56415333c3 (patch) | |
tree | e1ac7ad3e4ff1a9193bb1d31151280769a145f91 | |
parent | 99d3f173dcb0eea88871102e89cf05ce3be9b4cb (diff) | |
parent | 9d0fb152069caa8de887aba28cef87f7acb32e37 (diff) | |
download | sqlalchemy-2020_tutorial.tar.gz |
Merge "test single and double quote inspection scenarios"2020_tutorial
-rw-r--r-- | doc/build/changelog/unreleased_13/5456.rst | 9 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mssql/base.py | 9 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/sqlite/base.py | 31 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/requirements.py | 7 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/suite/test_reflection.py | 139 | ||||
-rw-r--r-- | test/requirements.py | 10 |
6 files changed, 185 insertions, 20 deletions
diff --git a/doc/build/changelog/unreleased_13/5456.rst b/doc/build/changelog/unreleased_13/5456.rst new file mode 100644 index 000000000..823f3731b --- /dev/null +++ b/doc/build/changelog/unreleased_13/5456.rst @@ -0,0 +1,9 @@ +.. change:: + :tags: bug, reflection, sqlite, mssql + :tickets: 5456 + + Applied a sweep through all included dialects to ensure names that contain + single or double quotes are properly escaped when querying system tables, + for all :class:`.Inspector` methods that accept object names as an argument + (e.g. table names, view names, etc). SQLite and MSSQL contained two + quoting issues that were repaired. diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index 427dd0e1f..c3cc4e425 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -2888,9 +2888,12 @@ class MSDialect(default.DefaultDialect): for col in cols: colmap[col["name"]] = col # We also run an sp_columns to check for identity columns: - cursor = connection.exec_driver_sql( - "sp_columns @table_name = '%s', " - "@table_owner = '%s'" % (tablename, owner) + cursor = connection.execute( + sql.text( + "sp_columns @table_name = :table_name, " + "@table_owner = :table_owner", + ), + {"table_name": tablename, "table_owner": owner}, ) ic = None while True: diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 0475a397e..3a8ffa23d 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -1668,27 +1668,26 @@ class SQLiteDialect(default.DefaultDialect): if schema is not None: qschema = self.identifier_preparer.quote_identifier(schema) master = "%s.sqlite_master" % qschema - s = ("SELECT sql FROM %s WHERE name = '%s'" "AND type='view'") % ( + s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( master, - view_name, ) - rs = connection.exec_driver_sql(s) + rs = connection.exec_driver_sql(s, (view_name,)) else: try: s = ( "SELECT sql FROM " " (SELECT * FROM sqlite_master UNION ALL " " SELECT * FROM sqlite_temp_master) " - "WHERE name = '%s' " + "WHERE name = ? " "AND type='view'" - ) % view_name - rs = connection.exec_driver_sql(s) + ) + rs = connection.exec_driver_sql(s, (view_name,)) except exc.DBAPIError: s = ( - "SELECT sql FROM sqlite_master WHERE name = '%s' " + "SELECT sql FROM sqlite_master WHERE name = ? " "AND type='view'" - ) % view_name - rs = connection.exec_driver_sql(s) + ) + rs = connection.exec_driver_sql(s, (view_name,)) result = rs.fetchall() if result: @@ -2136,19 +2135,17 @@ class SQLiteDialect(default.DefaultDialect): "SELECT sql FROM " " (SELECT * FROM %(schema)ssqlite_master UNION ALL " " SELECT * FROM %(schema)ssqlite_temp_master) " - "WHERE name = '%(table)s' " - "AND type = 'table'" - % {"schema": schema_expr, "table": table_name} + "WHERE name = ? " + "AND type = 'table'" % {"schema": schema_expr} ) - rs = connection.exec_driver_sql(s) + rs = connection.exec_driver_sql(s, (table_name,)) except exc.DBAPIError: s = ( "SELECT sql FROM %(schema)ssqlite_master " - "WHERE name = '%(table)s' " - "AND type = 'table'" - % {"schema": schema_expr, "table": table_name} + "WHERE name = ? " + "AND type = 'table'" % {"schema": schema_expr} ) - rs = connection.exec_driver_sql(s) + rs = connection.exec_driver_sql(s, (table_name,)) return rs.scalar() def _get_table_pragma(self, connection, pragma, table_name, schema=None): diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index 1c350da3b..72f2612aa 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -635,6 +635,13 @@ class SuiteRequirements(Requirements): return exclusions.closed() @property + def symbol_names_w_double_quote(self): + """Target driver can create tables with a name like 'some " table' + + """ + return exclusions.open() + + @property def datetime_literals(self): """target dialect supports rendering of a date, time, or datetime as a literal string, e.g. via the TypeEngine.literal_processor() method. diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index 84b3aba5b..776d559d4 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -134,6 +134,144 @@ class HasIndexTest(fixtures.TablesTest): ) +class QuotedNameArgumentTest(fixtures.TablesTest): + run_create_tables = "once" + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "quote ' one", + metadata, + Column("id", Integer), + Column("name", String(50)), + Column("data", String(50)), + Column("related_id", Integer), + sa.PrimaryKeyConstraint("id", name="pk quote ' one"), + sa.Index("ix quote ' one", "name"), + sa.UniqueConstraint("data", name="uq quote' one",), + sa.ForeignKeyConstraint( + ["id"], ["related.id"], name="fk quote ' one" + ), + sa.CheckConstraint("name != 'foo'", name="ck quote ' one"), + comment=r"""quote ' one comment""", + test_needs_fk=True, + ) + + if testing.requires.symbol_names_w_double_quote.enabled: + Table( + 'quote " two', + metadata, + Column("id", Integer), + Column("name", String(50)), + Column("data", String(50)), + Column("related_id", Integer), + sa.PrimaryKeyConstraint("id", name='pk quote " two'), + sa.Index('ix quote " two', "name"), + sa.UniqueConstraint("data", name='uq quote" two',), + sa.ForeignKeyConstraint( + ["id"], ["related.id"], name='fk quote " two' + ), + sa.CheckConstraint("name != 'foo'", name='ck quote " two '), + comment=r"""quote " two comment""", + test_needs_fk=True, + ) + + Table( + "related", + metadata, + Column("id", Integer, primary_key=True), + Column("related", Integer), + test_needs_fk=True, + ) + + if testing.requires.view_column_reflection.enabled: + + if testing.requires.symbol_names_w_double_quote.enabled: + names = [ + "quote ' one", + 'quote " two', + ] + else: + names = [ + "quote ' one", + ] + for name in names: + query = "CREATE VIEW %s AS SELECT * FROM %s" % ( + testing.db.dialect.identifier_preparer.quote( + "view %s" % name + ), + testing.db.dialect.identifier_preparer.quote(name), + ) + + event.listen(metadata, "after_create", DDL(query)) + event.listen( + metadata, + "before_drop", + DDL( + "DROP VIEW %s" + % testing.db.dialect.identifier_preparer.quote( + "view %s" % name + ) + ), + ) + + def quote_fixtures(fn): + return testing.combinations( + ("quote ' one",), + ('quote " two', testing.requires.symbol_names_w_double_quote), + )(fn) + + @quote_fixtures + def test_get_table_options(self, name): + insp = inspect(testing.db) + + insp.get_table_options(name) + + @quote_fixtures + def test_get_view_definition(self, name): + insp = inspect(testing.db) + assert insp.get_view_definition("view %s" % name) + + @quote_fixtures + def test_get_columns(self, name): + insp = inspect(testing.db) + assert insp.get_columns(name) + + @quote_fixtures + def test_get_pk_constraint(self, name): + insp = inspect(testing.db) + assert insp.get_pk_constraint(name) + + @quote_fixtures + def test_get_foreign_keys(self, name): + insp = inspect(testing.db) + assert insp.get_foreign_keys(name) + + @quote_fixtures + def test_get_indexes(self, name): + insp = inspect(testing.db) + assert insp.get_indexes(name) + + @quote_fixtures + @testing.requires.unique_constraint_reflection + def test_get_unique_constraints(self, name): + insp = inspect(testing.db) + assert insp.get_unique_constraints(name) + + @quote_fixtures + @testing.requires.comment_reflection + def test_get_table_comment(self, name): + insp = inspect(testing.db) + assert insp.get_table_comment(name) + + @quote_fixtures + @testing.requires.check_constraint_reflection + def test_get_check_constraints(self, name): + insp = inspect(testing.db) + assert insp.get_check_constraints(name) + + class ComponentReflectionTest(fixtures.TablesTest): run_inserts = run_deletes = None @@ -1254,6 +1392,7 @@ class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest): __all__ = ( "ComponentReflectionTest", + "QuotedNameArgumentTest", "HasTableTest", "HasIndexTest", "NormalizedNameTest", diff --git a/test/requirements.py b/test/requirements.py index 1ab0993c6..28f955fa5 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -846,6 +846,16 @@ class DefaultRequirements(SuiteRequirements): ) @property + def symbol_names_w_double_quote(self): + """Target driver can create tables with a name like 'some " table' + + """ + + return skip_if( + [no_support("oracle", "ORA-03001: unimplemented feature")] + ) + + @property def emulated_lastrowid(self): """"target dialect retrieves cursor.lastrowid or an equivalent after an insert() construct executes. |