diff options
-rw-r--r-- | CHANGES | 5 | ||||
-rw-r--r-- | lib/sqlalchemy/databases/mssql.py | 2 | ||||
-rw-r--r-- | lib/sqlalchemy/sql/compiler.py | 13 | ||||
-rw-r--r-- | lib/sqlalchemy/sql/expression.py | 5 | ||||
-rw-r--r-- | test/dialect/postgres.py | 26 | ||||
-rw-r--r-- | test/sql/select.py | 14 |
6 files changed, 45 insertions, 20 deletions
@@ -129,6 +129,11 @@ CHANGES mapper-config'ed order_by when using select_from() - sql + - schema-qualified tables now will place the schemaname + ahead of the tablename in all column expressions as well + as when generating column labels. This prevents cross- + schema name collisions in all cases [ticket:999] + - can now allow selects which correlate all FROM clauses and have no FROM themselves. These are typically used in a scalar context, i.e. SELECT x, (SELECT x WHERE y) diff --git a/lib/sqlalchemy/databases/mssql.py b/lib/sqlalchemy/databases/mssql.py index 0ebb84a16..da42ea105 100644 --- a/lib/sqlalchemy/databases/mssql.py +++ b/lib/sqlalchemy/databases/mssql.py @@ -987,8 +987,6 @@ class MSSQLCompiler(compiler.DefaultCompiler): t = self._schema_aliased_table(column.table) if t is not None: return self.process(expression._corresponding_column_or_error(t, column)) - else: - kwargs['use_schema'] = True return super(MSSQLCompiler, self).visit_column(column, **kwargs) def visit_binary(self, binary, **kwargs): diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index ee3ecc1f1..76e2ca260 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -249,14 +249,9 @@ class DefaultCompiler(engine.Compiled): return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)]) - def visit_column(self, column, result_map=None, use_schema=False, **kwargs): - # there is actually somewhat of a ruleset when you would *not* necessarily - # want to truncate a column identifier, if its mapped to the name of a - # physical column. but thats very hard to identify at this point, and - # the identifier length should be greater than the id lengths of any physical - # columns so should not matter. - - if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None): + def visit_column(self, column, result_map=None, **kwargs): + + if getattr(column, 'table', None) and getattr(column.table, 'schema', None): schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.' else: schema_prefix = '' @@ -278,7 +273,7 @@ class DefaultCompiler(engine.Compiled): return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n elif len(column.table.primary_key) != 0: pk = list(column.table.primary_key)[0] - return self.visit_column(pk, result_map=result_map, use_schema=use_schema, **kwargs) + return self.visit_column(pk, result_map=result_map, **kwargs) else: return None elif column.table is None or not column.table.named_with_column: diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 3d95948cb..2cd10720a 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -2632,7 +2632,10 @@ class _ColumnClause(ColumnElement): return None if self.__label is None: if self.table is not None and self.table.named_with_column: - self.__label = self.table.name + "_" + self.name + if getattr(self.table, 'schema', None): + self.__label = "_".join([self.table.schema, self.table.name, self.name]) + else: + self.__label = "_".join([self.table.name, self.name]) counter = 1 while self.__label in self.table.c: self.__label = self.__label + "_%d" % counter diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py index 21b11f114..e68fd4d74 100644 --- a/test/dialect/postgres.py +++ b/test/dialect/postgres.py @@ -568,7 +568,31 @@ class MiscTest(TestBase, AssertsExecutionResults): self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause)) finally: meta1.drop_all() - + + def test_schema_roundtrips(self): + meta = MetaData(testing.db) + users = Table('users', meta, + Column('id', Integer, primary_key=True), + Column('name', String(50)), schema='alt_schema') + users.create() + try: + users.insert().execute(id=1, name='name1') + users.insert().execute(id=2, name='name2') + users.insert().execute(id=3, name='name3') + users.insert().execute(id=4, name='name4') + + self.assertEquals(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')]) + self.assertEquals(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')]) + + users.delete().where(users.c.id==3).execute() + self.assertEquals(users.select().where(users.c.name=='name3').execute().fetchall(), []) + + users.update().where(users.c.name=='name4').execute(name='newname') + self.assertEquals(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')]) + + finally: + users.drop() + def test_preexecute_passivedefault(self): """test that when we get a primary key column back from reflecting a table which has a default value on it, we pre-execute diff --git a/test/sql/select.py b/test/sql/select.py index b8d1f4f68..06679e956 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -1360,16 +1360,16 @@ class SchemaTest(TestBase, AssertsCompiledSQL): @testing.fails_on('mssql') def test_select(self): # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables - self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") + self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable") self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), - "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\ - "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1") + "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "\ + "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1") s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')) s.use_labels = True - self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value "\ - "AS remotetable_value FROM remote_owner.remotetable WHERE "\ - "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1") + self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "\ + "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "\ + "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1") def test_alias(self): a = alias(table4, 'remtable') @@ -1378,7 +1378,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL): def test_update(self): self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\ - "WHERE remotetable.value = :remotetable_value_1") + "WHERE remote_owner.remotetable.value = :remote_owner_remotetable_value_1") def test_insert(self): self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\ |