summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2020-07-13 19:07:35 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2020-07-13 19:07:35 +0000
commit5cae798ba9ead35dabf91da24b216a56415333c3 (patch)
treee1ac7ad3e4ff1a9193bb1d31151280769a145f91
parent99d3f173dcb0eea88871102e89cf05ce3be9b4cb (diff)
parent9d0fb152069caa8de887aba28cef87f7acb32e37 (diff)
downloadsqlalchemy-2020_tutorial.tar.gz
Merge "test single and double quote inspection scenarios"2020_tutorial
-rw-r--r--doc/build/changelog/unreleased_13/5456.rst9
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py9
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py31
-rw-r--r--lib/sqlalchemy/testing/requirements.py7
-rw-r--r--lib/sqlalchemy/testing/suite/test_reflection.py139
-rw-r--r--test/requirements.py10
6 files changed, 185 insertions, 20 deletions
diff --git a/doc/build/changelog/unreleased_13/5456.rst b/doc/build/changelog/unreleased_13/5456.rst
new file mode 100644
index 000000000..823f3731b
--- /dev/null
+++ b/doc/build/changelog/unreleased_13/5456.rst
@@ -0,0 +1,9 @@
+.. change::
+ :tags: bug, reflection, sqlite, mssql
+ :tickets: 5456
+
+ Applied a sweep through all included dialects to ensure names that contain
+ single or double quotes are properly escaped when querying system tables,
+ for all :class:`.Inspector` methods that accept object names as an argument
+ (e.g. table names, view names, etc). SQLite and MSSQL contained two
+ quoting issues that were repaired.
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index 427dd0e1f..c3cc4e425 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -2888,9 +2888,12 @@ class MSDialect(default.DefaultDialect):
for col in cols:
colmap[col["name"]] = col
# We also run an sp_columns to check for identity columns:
- cursor = connection.exec_driver_sql(
- "sp_columns @table_name = '%s', "
- "@table_owner = '%s'" % (tablename, owner)
+ cursor = connection.execute(
+ sql.text(
+ "sp_columns @table_name = :table_name, "
+ "@table_owner = :table_owner",
+ ),
+ {"table_name": tablename, "table_owner": owner},
)
ic = None
while True:
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index 0475a397e..3a8ffa23d 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -1668,27 +1668,26 @@ class SQLiteDialect(default.DefaultDialect):
if schema is not None:
qschema = self.identifier_preparer.quote_identifier(schema)
master = "%s.sqlite_master" % qschema
- s = ("SELECT sql FROM %s WHERE name = '%s'" "AND type='view'") % (
+ s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
master,
- view_name,
)
- rs = connection.exec_driver_sql(s)
+ rs = connection.exec_driver_sql(s, (view_name,))
else:
try:
s = (
"SELECT sql FROM "
" (SELECT * FROM sqlite_master UNION ALL "
" SELECT * FROM sqlite_temp_master) "
- "WHERE name = '%s' "
+ "WHERE name = ? "
"AND type='view'"
- ) % view_name
- rs = connection.exec_driver_sql(s)
+ )
+ rs = connection.exec_driver_sql(s, (view_name,))
except exc.DBAPIError:
s = (
- "SELECT sql FROM sqlite_master WHERE name = '%s' "
+ "SELECT sql FROM sqlite_master WHERE name = ? "
"AND type='view'"
- ) % view_name
- rs = connection.exec_driver_sql(s)
+ )
+ rs = connection.exec_driver_sql(s, (view_name,))
result = rs.fetchall()
if result:
@@ -2136,19 +2135,17 @@ class SQLiteDialect(default.DefaultDialect):
"SELECT sql FROM "
" (SELECT * FROM %(schema)ssqlite_master UNION ALL "
" SELECT * FROM %(schema)ssqlite_temp_master) "
- "WHERE name = '%(table)s' "
- "AND type = 'table'"
- % {"schema": schema_expr, "table": table_name}
+ "WHERE name = ? "
+ "AND type = 'table'" % {"schema": schema_expr}
)
- rs = connection.exec_driver_sql(s)
+ rs = connection.exec_driver_sql(s, (table_name,))
except exc.DBAPIError:
s = (
"SELECT sql FROM %(schema)ssqlite_master "
- "WHERE name = '%(table)s' "
- "AND type = 'table'"
- % {"schema": schema_expr, "table": table_name}
+ "WHERE name = ? "
+ "AND type = 'table'" % {"schema": schema_expr}
)
- rs = connection.exec_driver_sql(s)
+ rs = connection.exec_driver_sql(s, (table_name,))
return rs.scalar()
def _get_table_pragma(self, connection, pragma, table_name, schema=None):
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index 1c350da3b..72f2612aa 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -635,6 +635,13 @@ class SuiteRequirements(Requirements):
return exclusions.closed()
@property
+ def symbol_names_w_double_quote(self):
+ """Target driver can create tables with a name like 'some " table'
+
+ """
+ return exclusions.open()
+
+ @property
def datetime_literals(self):
"""target dialect supports rendering of a date, time, or datetime as a
literal string, e.g. via the TypeEngine.literal_processor() method.
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index 84b3aba5b..776d559d4 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -134,6 +134,144 @@ class HasIndexTest(fixtures.TablesTest):
)
+class QuotedNameArgumentTest(fixtures.TablesTest):
+ run_create_tables = "once"
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "quote ' one",
+ metadata,
+ Column("id", Integer),
+ Column("name", String(50)),
+ Column("data", String(50)),
+ Column("related_id", Integer),
+ sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
+ sa.Index("ix quote ' one", "name"),
+ sa.UniqueConstraint("data", name="uq quote' one",),
+ sa.ForeignKeyConstraint(
+ ["id"], ["related.id"], name="fk quote ' one"
+ ),
+ sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
+ comment=r"""quote ' one comment""",
+ test_needs_fk=True,
+ )
+
+ if testing.requires.symbol_names_w_double_quote.enabled:
+ Table(
+ 'quote " two',
+ metadata,
+ Column("id", Integer),
+ Column("name", String(50)),
+ Column("data", String(50)),
+ Column("related_id", Integer),
+ sa.PrimaryKeyConstraint("id", name='pk quote " two'),
+ sa.Index('ix quote " two', "name"),
+ sa.UniqueConstraint("data", name='uq quote" two',),
+ sa.ForeignKeyConstraint(
+ ["id"], ["related.id"], name='fk quote " two'
+ ),
+ sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
+ comment=r"""quote " two comment""",
+ test_needs_fk=True,
+ )
+
+ Table(
+ "related",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("related", Integer),
+ test_needs_fk=True,
+ )
+
+ if testing.requires.view_column_reflection.enabled:
+
+ if testing.requires.symbol_names_w_double_quote.enabled:
+ names = [
+ "quote ' one",
+ 'quote " two',
+ ]
+ else:
+ names = [
+ "quote ' one",
+ ]
+ for name in names:
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (
+ testing.db.dialect.identifier_preparer.quote(
+ "view %s" % name
+ ),
+ testing.db.dialect.identifier_preparer.quote(name),
+ )
+
+ event.listen(metadata, "after_create", DDL(query))
+ event.listen(
+ metadata,
+ "before_drop",
+ DDL(
+ "DROP VIEW %s"
+ % testing.db.dialect.identifier_preparer.quote(
+ "view %s" % name
+ )
+ ),
+ )
+
+ def quote_fixtures(fn):
+ return testing.combinations(
+ ("quote ' one",),
+ ('quote " two', testing.requires.symbol_names_w_double_quote),
+ )(fn)
+
+ @quote_fixtures
+ def test_get_table_options(self, name):
+ insp = inspect(testing.db)
+
+ insp.get_table_options(name)
+
+ @quote_fixtures
+ def test_get_view_definition(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_view_definition("view %s" % name)
+
+ @quote_fixtures
+ def test_get_columns(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_columns(name)
+
+ @quote_fixtures
+ def test_get_pk_constraint(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_pk_constraint(name)
+
+ @quote_fixtures
+ def test_get_foreign_keys(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_foreign_keys(name)
+
+ @quote_fixtures
+ def test_get_indexes(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_indexes(name)
+
+ @quote_fixtures
+ @testing.requires.unique_constraint_reflection
+ def test_get_unique_constraints(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_unique_constraints(name)
+
+ @quote_fixtures
+ @testing.requires.comment_reflection
+ def test_get_table_comment(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_table_comment(name)
+
+ @quote_fixtures
+ @testing.requires.check_constraint_reflection
+ def test_get_check_constraints(self, name):
+ insp = inspect(testing.db)
+ assert insp.get_check_constraints(name)
+
+
class ComponentReflectionTest(fixtures.TablesTest):
run_inserts = run_deletes = None
@@ -1254,6 +1392,7 @@ class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
__all__ = (
"ComponentReflectionTest",
+ "QuotedNameArgumentTest",
"HasTableTest",
"HasIndexTest",
"NormalizedNameTest",
diff --git a/test/requirements.py b/test/requirements.py
index 1ab0993c6..28f955fa5 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -846,6 +846,16 @@ class DefaultRequirements(SuiteRequirements):
)
@property
+ def symbol_names_w_double_quote(self):
+ """Target driver can create tables with a name like 'some " table'
+
+ """
+
+ return skip_if(
+ [no_support("oracle", "ORA-03001: unimplemented feature")]
+ )
+
+ @property
def emulated_lastrowid(self):
""""target dialect retrieves cursor.lastrowid or an equivalent
after an insert() construct executes.