summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES5
-rw-r--r--lib/sqlalchemy/databases/mssql.py2
-rw-r--r--lib/sqlalchemy/sql/compiler.py13
-rw-r--r--lib/sqlalchemy/sql/expression.py5
-rw-r--r--test/dialect/postgres.py26
-rw-r--r--test/sql/select.py14
6 files changed, 45 insertions, 20 deletions
diff --git a/CHANGES b/CHANGES
index 27280f225..41ddd52b7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -129,6 +129,11 @@ CHANGES
mapper-config'ed order_by when using select_from()
- sql
+ - schema-qualified tables now will place the schemaname
+ ahead of the tablename in all column expressions as well
+ as when generating column labels. This prevents cross-
+ schema name collisions in all cases [ticket:999]
+
- can now allow selects which correlate all FROM clauses
and have no FROM themselves. These are typically
used in a scalar context, i.e. SELECT x, (SELECT x WHERE y)
diff --git a/lib/sqlalchemy/databases/mssql.py b/lib/sqlalchemy/databases/mssql.py
index 0ebb84a16..da42ea105 100644
--- a/lib/sqlalchemy/databases/mssql.py
+++ b/lib/sqlalchemy/databases/mssql.py
@@ -987,8 +987,6 @@ class MSSQLCompiler(compiler.DefaultCompiler):
t = self._schema_aliased_table(column.table)
if t is not None:
return self.process(expression._corresponding_column_or_error(t, column))
- else:
- kwargs['use_schema'] = True
return super(MSSQLCompiler, self).visit_column(column, **kwargs)
def visit_binary(self, binary, **kwargs):
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index ee3ecc1f1..76e2ca260 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -249,14 +249,9 @@ class DefaultCompiler(engine.Compiled):
return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)])
- def visit_column(self, column, result_map=None, use_schema=False, **kwargs):
- # there is actually somewhat of a ruleset when you would *not* necessarily
- # want to truncate a column identifier, if its mapped to the name of a
- # physical column. but thats very hard to identify at this point, and
- # the identifier length should be greater than the id lengths of any physical
- # columns so should not matter.
-
- if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None):
+ def visit_column(self, column, result_map=None, **kwargs):
+
+ if getattr(column, 'table', None) and getattr(column.table, 'schema', None):
schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.'
else:
schema_prefix = ''
@@ -278,7 +273,7 @@ class DefaultCompiler(engine.Compiled):
return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n
elif len(column.table.primary_key) != 0:
pk = list(column.table.primary_key)[0]
- return self.visit_column(pk, result_map=result_map, use_schema=use_schema, **kwargs)
+ return self.visit_column(pk, result_map=result_map, **kwargs)
else:
return None
elif column.table is None or not column.table.named_with_column:
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index 3d95948cb..2cd10720a 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -2632,7 +2632,10 @@ class _ColumnClause(ColumnElement):
return None
if self.__label is None:
if self.table is not None and self.table.named_with_column:
- self.__label = self.table.name + "_" + self.name
+ if getattr(self.table, 'schema', None):
+ self.__label = "_".join([self.table.schema, self.table.name, self.name])
+ else:
+ self.__label = "_".join([self.table.name, self.name])
counter = 1
while self.__label in self.table.c:
self.__label = self.__label + "_%d" % counter
diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py
index 21b11f114..e68fd4d74 100644
--- a/test/dialect/postgres.py
+++ b/test/dialect/postgres.py
@@ -568,7 +568,31 @@ class MiscTest(TestBase, AssertsExecutionResults):
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
-
+
+ def test_schema_roundtrips(self):
+ meta = MetaData(testing.db)
+ users = Table('users', meta,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(50)), schema='alt_schema')
+ users.create()
+ try:
+ users.insert().execute(id=1, name='name1')
+ users.insert().execute(id=2, name='name2')
+ users.insert().execute(id=3, name='name3')
+ users.insert().execute(id=4, name='name4')
+
+ self.assertEquals(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
+ self.assertEquals(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
+
+ users.delete().where(users.c.id==3).execute()
+ self.assertEquals(users.select().where(users.c.name=='name3').execute().fetchall(), [])
+
+ users.update().where(users.c.name=='name4').execute(name='newname')
+ self.assertEquals(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])
+
+ finally:
+ users.drop()
+
def test_preexecute_passivedefault(self):
"""test that when we get a primary key column back
from reflecting a table which has a default value on it, we pre-execute
diff --git a/test/sql/select.py b/test/sql/select.py
index b8d1f4f68..06679e956 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -1360,16 +1360,16 @@ class SchemaTest(TestBase, AssertsCompiledSQL):
@testing.fails_on('mssql')
def test_select(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
- self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
+ self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable")
self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
- "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\
- "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1")
+ "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "\
+ "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'))
s.use_labels = True
- self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value "\
- "AS remotetable_value FROM remote_owner.remotetable WHERE "\
- "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1")
+ self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "\
+ "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "\
+ "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
def test_alias(self):
a = alias(table4, 'remtable')
@@ -1378,7 +1378,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL):
def test_update(self):
self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\
- "WHERE remotetable.value = :remotetable_value_1")
+ "WHERE remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
def test_insert(self):
self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\