summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-02-05 14:22:55 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2012-02-05 14:22:55 -0500
commita4e3bc61bcb1f1aeaa334f6da4f3b9fcb3059d00 (patch)
treea1c1e25d24e6a65c7a368a85125818975f28c59a
parente0ec05366f7363edd1873c4d095e11151cdd4dff (diff)
downloadsqlalchemy-a4e3bc61bcb1f1aeaa334f6da4f3b9fcb3059d00.tar.gz
- [bug] A significant change to how labeling
is applied to columns in SELECT statements allows "truncated" labels, that is label names that are generated in Python which exceed the maximum identifier length (note this is configurable via label_length on create_engine()), to be properly referenced when rendered inside of a subquery, as well as to be present in a result set row using their original in-Python names. [ticket:2396] - apply pep8 to test_labels
-rw-r--r--CHANGES12
-rw-r--r--lib/sqlalchemy/dialects/firebird/base.py2
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py2
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py25
-rw-r--r--lib/sqlalchemy/engine/base.py37
-rw-r--r--lib/sqlalchemy/engine/default.py5
-rw-r--r--lib/sqlalchemy/schema.py4
-rw-r--r--lib/sqlalchemy/sql/compiler.py20
-rw-r--r--lib/sqlalchemy/sql/expression.py74
-rw-r--r--test/aaa_profiling/test_zoomark_orm.py2
-rw-r--r--test/sql/test_labels.py383
-rw-r--r--test/sql/test_query.py60
12 files changed, 452 insertions, 174 deletions
diff --git a/CHANGES b/CHANGES
index 5034b2475..336627a27 100644
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,18 @@ CHANGES
happen if there's really an UPDATE to occur.
[ticket:2390]
+- sql
+ - [bug] A significant change to how labeling
+ is applied to columns in SELECT statements
+ allows "truncated" labels, that is label names
+ that are generated in Python which exceed
+ the maximum identifier length (note this is
+ configurable via label_length on create_engine()),
+ to be properly referenced when rendered inside
+ of a subquery, as well as to be present
+ in a result set row using their original
+ in-Python names. [ticket:2396]
+
- engine
- [feature] Added pool_reset_on_return argument
to create_engine, allows control over
diff --git a/lib/sqlalchemy/dialects/firebird/base.py b/lib/sqlalchemy/dialects/firebird/base.py
index 8cf2ded2f..031c68919 100644
--- a/lib/sqlalchemy/dialects/firebird/base.py
+++ b/lib/sqlalchemy/dialects/firebird/base.py
@@ -215,7 +215,7 @@ class FBCompiler(sql.compiler.SQLCompiler):
# Override to not use the AS keyword which FB 1.5 does not like
if asfrom:
alias_name = isinstance(alias.name,
- expression._generated_label) and \
+ expression._truncated_label) and \
self._truncated_identifier("alias",
alias.name) or alias.name
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index 88e506287..56838f803 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -481,7 +481,7 @@ class OracleCompiler(compiler.SQLCompiler):
"""Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??"""
if asfrom or ashint:
- alias_name = isinstance(alias.name, expression._generated_label) and \
+ alias_name = isinstance(alias.name, expression._truncated_label) and \
self._truncated_identifier("alias", alias.name) or alias.name
if ashint:
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index f9520affa..10a0d882b 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -441,20 +441,6 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
result = self.quote_schema(index.table.schema, index.table.quote_schema) + "." + result
return result
-class SQLiteExecutionContext(default.DefaultExecutionContext):
- def get_result_proxy(self):
- rp = base.ResultProxy(self)
- if rp._metadata:
- # adjust for dotted column names. SQLite
- # in the case of UNION may store col names as
- # "tablename.colname"
- # in cursor.description
- for colname in rp._metadata.keys:
- if "." in colname:
- trunc_col = colname.split(".")[1]
- rp._metadata._set_keymap_synonym(trunc_col, colname)
- return rp
-
class SQLiteDialect(default.DefaultDialect):
name = 'sqlite'
supports_alter = False
@@ -472,7 +458,6 @@ class SQLiteDialect(default.DefaultDialect):
ischema_names = ischema_names
colspecs = colspecs
isolation_level = None
- execution_ctx_cls = SQLiteExecutionContext
supports_cast = True
supports_default_values = True
@@ -540,6 +525,16 @@ class SQLiteDialect(default.DefaultDialect):
else:
return None
+ def _translate_colname(self, colname):
+ # adjust for dotted column names. SQLite
+ # in the case of UNION may store col names as
+ # "tablename.colname"
+ # in cursor.description
+ if "." in colname:
+ return colname.split(".")[1], colname
+ else:
+ return colname, None
+
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
if schema is not None:
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index db19fe7de..3175ebabd 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2575,6 +2575,10 @@ class ResultMetaData(object):
context = parent.context
dialect = context.dialect
typemap = dialect.dbapi_type_map
+ translate_colname = dialect._translate_colname
+
+ # high precedence key values.
+ primary_keymap = {}
for i, rec in enumerate(metadata):
colname = rec[0]
@@ -2583,6 +2587,9 @@ class ResultMetaData(object):
if dialect.description_encoding:
colname = dialect._description_decoder(colname)
+ if translate_colname:
+ colname, untranslated = translate_colname(colname)
+
if context.result_map:
try:
name, obj, type_ = context.result_map[colname.lower()]
@@ -2600,15 +2607,17 @@ class ResultMetaData(object):
# indexes as keys. This is only needed for the Python version of
# RowProxy (the C version uses a faster path for integer indexes).
- keymap[i] = rec
-
- # Column names as keys
- if keymap.setdefault(name.lower(), rec) is not rec:
- # We do not raise an exception directly because several
- # columns colliding by name is not a problem as long as the
- # user does not try to access them (ie use an index directly,
- # or the more precise ColumnElement)
- keymap[name.lower()] = (processor, obj, None)
+ primary_keymap[i] = rec
+
+ # populate primary keymap, looking for conflicts.
+ if primary_keymap.setdefault(name.lower(), rec) is not rec:
+ # place a record that doesn't have the "index" - this
+ # is interpreted later as an AmbiguousColumnError,
+ # but only when actually accessed. Columns
+ # colliding by name is not a problem if those names
+ # aren't used; integer and ColumnElement access is always
+ # unambiguous.
+ primary_keymap[name.lower()] = (processor, obj, None)
if dialect.requires_name_normalize:
colname = dialect.normalize_name(colname)
@@ -2618,10 +2627,20 @@ class ResultMetaData(object):
for o in obj:
keymap[o] = rec
+ if translate_colname and \
+ untranslated:
+ keymap[untranslated] = rec
+
+ # overwrite keymap values with those of the
+ # high precedence keymap.
+ keymap.update(primary_keymap)
+
if parent._echo:
context.engine.logger.debug(
"Col %r", tuple(x[0] for x in metadata))
+ @util.pending_deprecation("0.8", "sqlite dialect uses "
+ "_translate_colname() now")
def _set_keymap_synonym(self, name, origname):
"""Set a synonym for the given name.
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 73bd7fd71..a1e5a5799 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -44,6 +44,7 @@ class DefaultDialect(base.Dialect):
postfetch_lastrowid = True
implicit_returning = False
+
supports_native_enum = False
supports_native_boolean = False
@@ -95,6 +96,10 @@ class DefaultDialect(base.Dialect):
# and denormalize_name() must be provided.
requires_name_normalize = False
+ # a hook for SQLite's translation of
+ # result column names
+ _translate_colname = None
+
reflection_options = ()
def __init__(self, convert_unicode=False, assert_unicode=False,
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index f0a929776..c183e4385 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -1028,7 +1028,7 @@ class Column(SchemaItem, expression.ColumnClause):
"The 'index' keyword argument on Column is boolean only. "
"To create indexes with a specific name, create an "
"explicit Index object external to the Table.")
- Index(expression._generated_label('ix_%s' % self._label), self, unique=self.unique)
+ Index(expression._truncated_label('ix_%s' % self._label), self, unique=self.unique)
elif self.unique:
if isinstance(self.unique, basestring):
raise exc.ArgumentError(
@@ -1093,7 +1093,7 @@ class Column(SchemaItem, expression.ColumnClause):
"been assigned.")
try:
c = self._constructor(
- name or self.name,
+ expression._as_truncated(name or self.name),
self.type,
key = name or self.key,
primary_key = self.primary_key,
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index b0a55b886..598e87932 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -354,7 +354,7 @@ class SQLCompiler(engine.Compiled):
# or ORDER BY clause of a select. dialect-specific compilers
# can modify this behavior.
if within_columns_clause and not within_label_clause:
- if isinstance(label.name, sql._generated_label):
+ if isinstance(label.name, sql._truncated_label):
labelname = self._truncated_identifier("colident", label.name)
else:
labelname = label.name
@@ -376,17 +376,17 @@ class SQLCompiler(engine.Compiled):
**kw)
def visit_column(self, column, result_map=None, **kwargs):
- name = column.name
+ name = orig_name = column.name
if name is None:
raise exc.CompileError("Cannot compile Column object until "
"it's 'name' is assigned.")
is_literal = column.is_literal
- if not is_literal and isinstance(name, sql._generated_label):
+ if not is_literal and isinstance(name, sql._truncated_label):
name = self._truncated_identifier("colident", name)
if result_map is not None:
- result_map[name.lower()] = (name, (column, ), column.type)
+ result_map[name.lower()] = (orig_name, (column, name), column.type)
if is_literal:
name = self.escape_literal_column(name)
@@ -404,7 +404,7 @@ class SQLCompiler(engine.Compiled):
else:
schema_prefix = ''
tablename = table.name
- if isinstance(tablename, sql._generated_label):
+ if isinstance(tablename, sql._truncated_label):
tablename = self._truncated_identifier("alias", tablename)
return schema_prefix + \
@@ -703,7 +703,7 @@ class SQLCompiler(engine.Compiled):
return self.bind_names[bindparam]
bind_name = bindparam.key
- if isinstance(bind_name, sql._generated_label):
+ if isinstance(bind_name, sql._truncated_label):
bind_name = self._truncated_identifier("bindparam", bind_name)
# add to bind_names for translation
@@ -715,7 +715,7 @@ class SQLCompiler(engine.Compiled):
if (ident_class, name) in self.truncated_names:
return self.truncated_names[(ident_class, name)]
- anonname = name % self.anon_map
+ anonname = name.apply_map(self.anon_map)
if len(anonname) > self.label_length:
counter = self.truncated_names.get(ident_class, 1)
@@ -747,7 +747,7 @@ class SQLCompiler(engine.Compiled):
def visit_alias(self, alias, asfrom=False, ashint=False,
fromhints=None, **kwargs):
if asfrom or ashint:
- if isinstance(alias.name, sql._generated_label):
+ if isinstance(alias.name, sql._truncated_label):
alias_name = self._truncated_identifier("alias", alias.name)
else:
alias_name = alias.name
@@ -784,7 +784,7 @@ class SQLCompiler(engine.Compiled):
not column.is_literal and \
column.table is not None and \
not isinstance(column.table, sql.Select):
- return _CompileLabel(column, sql._generated_label(column.name))
+ return _CompileLabel(column, sql._as_truncated(column.name))
elif not isinstance(column,
(sql._UnaryExpression, sql._TextClause)) \
and (not hasattr(column, 'name') or \
@@ -1445,7 +1445,7 @@ class DDLCompiler(engine.Compiled):
return "\nDROP TABLE " + self.preparer.format_table(drop.element)
def _index_identifier(self, ident):
- if isinstance(ident, sql._generated_label):
+ if isinstance(ident, sql._truncated_label):
max = self.dialect.max_index_name_length or \
self.dialect.max_identifier_length
if len(ident) > max:
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index 859ee0437..939456b9a 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -1283,14 +1283,48 @@ func = _FunctionGenerator()
# TODO: use UnaryExpression for this instead ?
modifier = _FunctionGenerator(group=False)
-class _generated_label(unicode):
- """A unicode subclass used to identify dynamically generated names."""
+class _truncated_label(unicode):
+ """A unicode subclass used to identify symbolic "
+ "names that may require truncation."""
-def _escape_for_generated(x):
- if isinstance(x, _generated_label):
- return x
+ def apply_map(self, map_):
+ return self
+
+# for backwards compatibility in case
+# someone is re-implementing the
+# _truncated_identifier() sequence in a custom
+# compiler
+_generated_label = _truncated_label
+
+class _anonymous_label(_truncated_label):
+ """A unicode subclass used to identify anonymously
+ generated names."""
+
+ def __add__(self, other):
+ return _anonymous_label(
+ unicode(self) +
+ unicode(other))
+
+ def __radd__(self, other):
+ return _anonymous_label(
+ unicode(other) +
+ unicode(self))
+
+ def apply_map(self, map_):
+ return self % map_
+
+def _as_truncated(value):
+ """coerce the given value to :class:`._truncated_label`.
+
+ Existing :class:`._truncated_label` and
+ :class:`._anonymous_label` objects are passed
+ unchanged.
+ """
+
+ if isinstance(value, _truncated_label):
+ return value
else:
- return x.replace('%', '%%')
+ return _truncated_label(value)
def _string_or_unprintable(element):
if isinstance(element, basestring):
@@ -2117,7 +2151,9 @@ class ColumnElement(ClauseElement, _CompareMixin):
else:
key = name
- co = ColumnClause(name, selectable, type_=getattr(self,
+ co = ColumnClause(_as_truncated(name),
+ selectable,
+ type_=getattr(self,
'type', None))
co.proxies = [self]
selectable._columns[key] = co
@@ -2165,7 +2201,7 @@ class ColumnElement(ClauseElement, _CompareMixin):
expressions and function calls.
"""
- return _generated_label('%%(%d %s)s' % (id(self), getattr(self,
+ return _anonymous_label('%%(%d %s)s' % (id(self), getattr(self,
'name', 'anon')))
class ColumnCollection(util.OrderedProperties):
@@ -2588,10 +2624,10 @@ class _BindParamClause(ColumnElement):
"""
if unique:
- self.key = _generated_label('%%(%d %s)s' % (id(self), key
+ self.key = _anonymous_label('%%(%d %s)s' % (id(self), key
or 'param'))
else:
- self.key = key or _generated_label('%%(%d param)s'
+ self.key = key or _anonymous_label('%%(%d param)s'
% id(self))
# identifiying key that won't change across
@@ -2639,14 +2675,14 @@ class _BindParamClause(ColumnElement):
def _clone(self):
c = ClauseElement._clone(self)
if self.unique:
- c.key = _generated_label('%%(%d %s)s' % (id(c), c._orig_key
+ c.key = _anonymous_label('%%(%d %s)s' % (id(c), c._orig_key
or 'param'))
return c
def _convert_to_unique(self):
if not self.unique:
self.unique = True
- self.key = _generated_label('%%(%d %s)s' % (id(self),
+ self.key = _anonymous_label('%%(%d %s)s' % (id(self),
self._orig_key or 'param'))
def compare(self, other, **kw):
@@ -3615,7 +3651,7 @@ class Alias(FromClause):
if name is None:
if self.original.named_with_column:
name = getattr(self.original, 'name', None)
- name = _generated_label('%%(%d %s)s' % (id(self), name
+ name = _anonymous_label('%%(%d %s)s' % (id(self), name
or 'anon'))
self.name = name
@@ -3816,7 +3852,7 @@ class _Label(ColumnElement):
while isinstance(element, _Label):
element = element.element
self.name = self.key = self._label = name \
- or _generated_label('%%(%d %s)s' % (id(self),
+ or _anonymous_label('%%(%d %s)s' % (id(self),
getattr(element, 'name', 'anon')))
self._element = element
self._type = type_
@@ -3973,11 +4009,9 @@ class ColumnClause(_Immutable, ColumnElement):
elif t is not None and t.named_with_column:
if getattr(t, 'schema', None):
label = t.schema.replace('.', '_') + "_" + \
- _escape_for_generated(t.name) + "_" + \
- _escape_for_generated(self.name)
+ t.name + "_" + self.name
else:
- label = _escape_for_generated(t.name) + "_" + \
- _escape_for_generated(self.name)
+ label = t.name + "_" + self.name
# ensure the label name doesn't conflict with that
# of an existing column
@@ -3989,7 +4023,7 @@ class ColumnClause(_Immutable, ColumnElement):
counter += 1
label = _label
- return _generated_label(label)
+ return _as_truncated(label)
else:
return self.name
@@ -4018,7 +4052,7 @@ class ColumnClause(_Immutable, ColumnElement):
# otherwise its considered to be a label
is_literal = self.is_literal and (name is None or name == self.name)
c = self._constructor(
- name or self.name,
+ _as_truncated(name or self.name),
selectable=selectable,
type_=self.type,
is_literal=is_literal
diff --git a/test/aaa_profiling/test_zoomark_orm.py b/test/aaa_profiling/test_zoomark_orm.py
index 483ca335a..59391b499 100644
--- a/test/aaa_profiling/test_zoomark_orm.py
+++ b/test/aaa_profiling/test_zoomark_orm.py
@@ -340,7 +340,7 @@ class ZooMarkTest(fixtures.TestBase):
def test_profile_1a_populate(self):
self.test_baseline_1a_populate()
- @profiling.function_call_count(393, {'3.2':360})
+ @profiling.function_call_count(413, {'3.2':360})
def test_profile_2_insert(self):
self.test_baseline_2_insert()
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index 9f26d899f..ad4727b9b 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -1,8 +1,10 @@
-from test.lib.testing import assert_raises, assert_raises_message
+from test.lib.testing import assert_raises, assert_raises_message, eq_
+from test.lib.engines import testing_engine
+from test.lib import fixtures, AssertsCompiledSQL, testing
from sqlalchemy import *
from sqlalchemy import exc as exceptions
-from test.lib import *
from sqlalchemy.engine import default
+from sqlalchemy.sql import table, column
IDENT_LENGTH = 29
@@ -13,132 +15,187 @@ class LabelTypeTest(fixtures.TestBase):
Column('col1', Integer),
Column('col2', Float))
assert isinstance(t.c.col1.label('hi').type, Integer)
- assert isinstance(select([t.c.col2]).as_scalar().label('lala').type, Float)
+ assert isinstance(select([t.c.col2]).as_scalar().label('lala').type,
+ Float)
+
+class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
+ run_inserts = 'once'
+ run_deletes = None
-class LongLabelsTest(fixtures.TestBase, AssertsCompiledSQL):
@classmethod
- def setup_class(cls):
- global metadata, table1, table2, maxlen
- metadata = MetaData(testing.db)
+ def define_tables(cls, metadata):
table1 = Table("some_large_named_table", metadata,
- Column("this_is_the_primarykey_column", Integer, Sequence("this_is_some_large_seq"), primary_key=True),
+ Column("this_is_the_primarykey_column", Integer,
+ primary_key=True),
Column("this_is_the_data_column", String(30))
)
table2 = Table("table_with_exactly_29_characs", metadata,
- Column("this_is_the_primarykey_column", Integer, Sequence("some_seq"), primary_key=True),
+ Column("this_is_the_primarykey_column", Integer,
+ primary_key=True),
Column("this_is_the_data_column", String(30))
)
+ cls.tables.table1 = table1
+ cls.tables.table2 = table2
- metadata.create_all()
+ @classmethod
+ def insert_data(cls):
+ table1 = cls.tables.table1
+ table2 = cls.tables.table2
+ for data in [
+ {"this_is_the_primarykey_column":1,
+ "this_is_the_data_column":"data1"},
+ {"this_is_the_primarykey_column":2,
+ "this_is_the_data_column":"data2"},
+ {"this_is_the_primarykey_column":3,
+ "this_is_the_data_column":"data3"},
+ {"this_is_the_primarykey_column":4,
+ "this_is_the_data_column":"data4"}
+ ]:
+ testing.db.execute(
+ table1.insert(),
+ **data
+ )
+ testing.db.execute(
+ table2.insert(),
+ {"this_is_the_primary_key_column":1,
+ "this_is_the_data_column":"data"}
+ )
- maxlen = testing.db.dialect.max_identifier_length
+ @classmethod
+ def setup_class(cls):
+ super(LongLabelsTest, cls).setup_class()
+ cls.maxlen = testing.db.dialect.max_identifier_length
testing.db.dialect.max_identifier_length = IDENT_LENGTH
- @engines.close_first
- def teardown(self):
- table1.delete().execute()
-
@classmethod
def teardown_class(cls):
- metadata.drop_all()
- testing.db.dialect.max_identifier_length = maxlen
+ testing.db.dialect.max_identifier_length = cls.maxlen
+ super(LongLabelsTest, cls).teardown_class()
def test_too_long_name_disallowed(self):
m = MetaData(testing.db)
- t1 = Table("this_name_is_too_long_for_what_were_doing_in_this_test", m, Column('foo', Integer))
+ t1 = Table("this_name_is_too_long_for_what_were_doing_in_this_test",
+ m, Column('foo', Integer))
assert_raises(exceptions.IdentifierError, m.create_all)
assert_raises(exceptions.IdentifierError, m.drop_all)
assert_raises(exceptions.IdentifierError, t1.create)
assert_raises(exceptions.IdentifierError, t1.drop)
- def test_result(self):
- table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"})
- table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"})
- table1.insert().execute(**{"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"})
- table1.insert().execute(**{"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"})
-
- s = table1.select(use_labels=True, order_by=[table1.c.this_is_the_primarykey_column])
- r = s.execute()
- result = []
- for row in r:
- result.append((row[table1.c.this_is_the_primarykey_column], row[table1.c.this_is_the_data_column]))
- assert result == [
+ def test_basic_result(self):
+ table1 = self.tables.table1
+ s = table1.select(use_labels=True,
+ order_by=[table1.c.this_is_the_primarykey_column])
+
+ result = [
+ (row[table1.c.this_is_the_primarykey_column],
+ row[table1.c.this_is_the_data_column])
+ for row in testing.db.execute(s)
+ ]
+ eq_(result, [
(1, "data1"),
(2, "data2"),
(3, "data3"),
(4, "data4"),
- ], repr(result)
+ ])
- # some dialects such as oracle (and possibly ms-sql in a future version)
+ def test_result_limit(self):
+ table1 = self.tables.table1
+ # some dialects such as oracle (and possibly ms-sql
+ # in a future version)
# generate a subquery for limits/offsets.
- # ensure that the generated result map corresponds to the selected table, not
+ # ensure that the generated result map corresponds
+ # to the selected table, not
# the select query
- r = s.limit(2).execute()
- result = []
- for row in r:
- result.append((row[table1.c.this_is_the_primarykey_column], row[table1.c.this_is_the_data_column]))
- assert result == [
+ s = table1.select(use_labels=True,
+ order_by=[table1.c.this_is_the_primarykey_column]).\
+ limit(2)
+
+ result = [
+ (row[table1.c.this_is_the_primarykey_column],
+ row[table1.c.this_is_the_data_column])
+ for row in testing.db.execute(s)
+ ]
+ eq_(result, [
(1, "data1"),
(2, "data2"),
- ], repr(result)
-
- @testing.requires.offset
- def go():
- r = s.limit(2).offset(1).execute()
- result = []
- for row in r:
- result.append((row[table1.c.this_is_the_primarykey_column], row[table1.c.this_is_the_data_column]))
- assert result == [
- (2, "data2"),
- (3, "data3"),
- ], repr(result)
- go()
-
- def test_table_alias_names(self):
+ ])
+
+ @testing.requires.offset
+ def test_result_limit_offset(self):
+ table1 = self.tables.table1
+ s = table1.select(use_labels=True,
+ order_by=[table1.c.this_is_the_primarykey_column]).\
+ limit(2).offset(1)
+
+ result = [
+ (row[table1.c.this_is_the_primarykey_column],
+ row[table1.c.this_is_the_data_column])
+ for row in testing.db.execute(s)
+ ]
+ eq_(result, [
+ (2, "data2"),
+ (3, "data3"),
+ ])
+
+ def test_table_alias_1(self):
+ table2 = self.tables.table2
if testing.against('oracle'):
self.assert_compile(
table2.alias().select(),
- "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs table_with_exactly_29_c_1"
+ "SELECT table_with_exactly_29_c_1."
+ "this_is_the_primarykey_column, "
+ "table_with_exactly_29_c_1.this_is_the_data_column "
+ "FROM table_with_exactly_29_characs "
+ "table_with_exactly_29_c_1"
)
else:
self.assert_compile(
table2.alias().select(),
- "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1"
+ "SELECT table_with_exactly_29_c_1."
+ "this_is_the_primarykey_column, "
+ "table_with_exactly_29_c_1.this_is_the_data_column "
+ "FROM table_with_exactly_29_characs AS "
+ "table_with_exactly_29_c_1"
)
+ def test_table_alias_2(self):
+ table1 = self.tables.table1
+ table2 = self.tables.table2
ta = table2.alias()
dialect = default.DefaultDialect()
dialect.max_identifier_length = IDENT_LENGTH
self.assert_compile(
- select([table1, ta]).select_from(table1.join(ta, table1.c.this_is_the_data_column==ta.c.this_is_the_data_column)).\
+ select([table1, ta]).select_from(
+ table1.join(ta,
+ table1.c.this_is_the_data_column==
+ ta.c.this_is_the_data_column)).\
where(ta.c.this_is_the_data_column=='data3'),
- "SELECT some_large_named_table.this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column, "
- "table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM "
- "some_large_named_table JOIN table_with_exactly_29_characs AS table_with_exactly_29_c_1 ON "
- "some_large_named_table.this_is_the_data_column = table_with_exactly_29_c_1.this_is_the_data_column "
- "WHERE table_with_exactly_29_c_1.this_is_the_data_column = :this_is_the_data_column_1",
+ "SELECT some_large_named_table.this_is_the_primarykey_column, "
+ "some_large_named_table.this_is_the_data_column, "
+ "table_with_exactly_29_c_1.this_is_the_primarykey_column, "
+ "table_with_exactly_29_c_1.this_is_the_data_column FROM "
+ "some_large_named_table JOIN table_with_exactly_29_characs "
+ "AS table_with_exactly_29_c_1 ON "
+ "some_large_named_table.this_is_the_data_column = "
+ "table_with_exactly_29_c_1.this_is_the_data_column "
+ "WHERE table_with_exactly_29_c_1.this_is_the_data_column = "
+ ":this_is_the_data_column_1",
dialect=dialect
)
- table2.insert().execute(
- {"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"},
- {"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"},
- {"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"},
- {"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"},
+ def test_table_alias_3(self):
+ table2 = self.tables.table2
+ eq_(
+ testing.db.execute(table2.alias().select()).first(),
+ (1, "data")
)
- r = table2.alias().select().execute()
- assert r.fetchall() == [(x, "data%d" % x) for x in range(1, 5)]
-
def test_colbinds(self):
- table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"})
- table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"})
- table1.insert().execute(**{"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"})
- table1.insert().execute(**{"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"})
-
- r = table1.select(table1.c.this_is_the_primarykey_column == 4).execute()
+ table1 = self.tables.table1
+ r = table1.select(table1.c.this_is_the_primarykey_column == 4).\
+ execute()
assert r.fetchall() == [(4, "data4")]
r = table1.select(or_(
@@ -147,59 +204,189 @@ class LongLabelsTest(fixtures.TestBase, AssertsCompiledSQL):
)).execute()
assert r.fetchall() == [(2, "data2"), (4, "data4")]
+ @testing.provide_metadata
def test_insert_no_pk(self):
- table1.insert().execute(**{"this_is_the_data_column":"data1"})
- table1.insert().execute(**{"this_is_the_data_column":"data2"})
- table1.insert().execute(**{"this_is_the_data_column":"data3"})
- table1.insert().execute(**{"this_is_the_data_column":"data4"})
+ t = Table("some_other_large_named_table", self.metadata,
+ Column("this_is_the_primarykey_column", Integer,
+ Sequence("this_is_some_large_seq"),
+ primary_key=True),
+ Column("this_is_the_data_column", String(30))
+ )
+ t.create(testing.db, checkfirst=True)
+ testing.db.execute(t.insert(),
+ **{"this_is_the_data_column":"data1"})
@testing.requires.subqueries
def test_subquery(self):
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
- x = select([q])
- print x.execute().fetchall()
+ table1 = self.tables.table1
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).\
+ alias('foo')
+ eq_(
+ list(testing.db.execute(select([q]))),
+ [(4, u'data4')]
+ )
@testing.requires.subqueries
def test_anon_alias(self):
+ table1 = self.tables.table1
compile_dialect = default.DefaultDialect()
compile_dialect.max_identifier_length = IDENT_LENGTH
q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
x = select([q], use_labels=True)
- self.assert_compile(x, "SELECT anon_1.this_is_the_primarykey_column AS anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column AS anon_1_this_is_the_data_2 "
- "FROM (SELECT some_large_named_table.this_is_the_primarykey_column AS this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column AS this_is_the_data_column "
+ self.assert_compile(x,
+ "SELECT anon_1.this_is_the_primarykey_column AS "
+ "anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column "
+ "AS anon_1_this_is_the_data_2 "
+ "FROM (SELECT some_large_named_table."
+ "this_is_the_primarykey_column AS "
+ "this_is_the_primarykey_column, "
+ "some_large_named_table.this_is_the_data_column "
+ "AS this_is_the_data_column "
"FROM some_large_named_table "
- "WHERE some_large_named_table.this_is_the_primarykey_column = :this_is_the_primarykey__1) AS anon_1", dialect=compile_dialect)
+ "WHERE some_large_named_table.this_is_the_primarykey_column "
+ "= :this_is_the_primarykey__1) AS anon_1",
+ dialect=compile_dialect)
- print x.execute().fetchall()
+ eq_(
+ list(testing.db.execute(x)),
+ [(4, u'data4')]
+ )
def test_adjustable(self):
+ table1 = self.tables.table1
q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
x = select([q])
compile_dialect = default.DefaultDialect(label_length=10)
- self.assert_compile(x, "SELECT foo.this_is_the_primarykey_column, foo.this_is_the_data_column FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS this_1, some_large_named_table.this_is_the_data_column "
- "AS this_2 FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :this_1) AS foo", dialect=compile_dialect)
+ self.assert_compile(x,
+ "SELECT foo.this_1, foo.this_2 FROM "
+ "(SELECT some_large_named_table."
+ "this_is_the_primarykey_column AS this_1, "
+ "some_large_named_table.this_is_the_data_column AS this_2 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :this_1) AS foo",
+ dialect=compile_dialect)
compile_dialect = default.DefaultDialect(label_length=4)
- self.assert_compile(x, "SELECT foo.this_is_the_primarykey_column, foo.this_is_the_data_column FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS _1, some_large_named_table.this_is_the_data_column AS _2 "
- "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :_1) AS foo", dialect=compile_dialect)
+ self.assert_compile(x, "SELECT foo._1, foo._2 FROM "
+ "(SELECT some_large_named_table.this_is_the_primarykey_column "
+ "AS _1, some_large_named_table.this_is_the_data_column AS _2 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :_1) AS foo",
+ dialect=compile_dialect)
q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
x = select([q], use_labels=True)
compile_dialect = default.DefaultDialect(label_length=10)
- self.assert_compile(x, "SELECT anon_1.this_is_the_primarykey_column AS anon_1, anon_1.this_is_the_data_column AS anon_2 FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS this_3, some_large_named_table.this_is_the_data_column AS this_4 "
- "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :this_1) AS anon_1", dialect=compile_dialect)
+ self.assert_compile(x,
+ "SELECT anon_1.this_2 AS anon_1, anon_1.this_4 AS anon_3 FROM "
+ "(SELECT some_large_named_table.this_is_the_primarykey_column "
+ "AS this_2, some_large_named_table.this_is_the_data_column AS this_4 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :this_1) AS anon_1",
+ dialect=compile_dialect)
compile_dialect = default.DefaultDialect(label_length=4)
- self.assert_compile(x, "SELECT _1.this_is_the_primarykey_column AS _1, _1.this_is_the_data_column AS _2 FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS _3, some_large_named_table.this_is_the_data_column AS _4 "
- "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :_1) AS _1", dialect=compile_dialect)
+ self.assert_compile(x, "SELECT _1._2 AS _1, _1._4 AS _3 FROM "
+ "(SELECT some_large_named_table.this_is_the_primarykey_column "
+ "AS _2, some_large_named_table.this_is_the_data_column AS _4 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :_1) AS _1",
+ dialect=compile_dialect)
+
+ def test_adjustable_result_schema_column(self):
+ table1 = self.tables.table1
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
+ x = select([q])
+ e = testing_engine(options={"label_length":10})
+ e.pool = testing.db.pool
+ row = e.execute(x).first()
+ eq_(row.this_is_the_primarykey_column, 4)
+ eq_(row.this_1, 4)
+ eq_(row['this_1'], 4)
+
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
+ row = e.execute(x).first()
+ eq_(row.this_is_the_primarykey_column, 4)
+ eq_(row.this_1, 4)
+
+ def test_adjustable_result_lightweight_column(self):
+
+ table1 = table("some_large_named_table",
+ column("this_is_the_primarykey_column"),
+ column("this_is_the_data_column")
+ )
+
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
+ x = select([q])
+
+ e = testing_engine(options={"label_length":10})
+ e.pool = testing.db.pool
+ row = e.execute(x).first()
+ eq_(row.this_is_the_primarykey_column, 4)
+ eq_(row.this_1, 4)
+
+ def test_table_plus_column_exceeds_length(self):
+ """test that the truncation occurs if tablename / colname are only
+ greater than the max when concatenated."""
+
+ compile_dialect = default.DefaultDialect(label_length=30)
+ m = MetaData()
+ a_table = Table(
+ 'thirty_characters_table_xxxxxx',
+ m,
+ Column('id', Integer, primary_key=True)
+ )
+
+ other_table = Table(
+ 'other_thirty_characters_table_',
+ m,
+ Column('id', Integer, primary_key=True),
+ Column('thirty_characters_table_id',
+ Integer,
+ ForeignKey('thirty_characters_table_xxxxxx.id'),
+ primary_key=True
+ )
+ )
+
+ anon = a_table.alias()
+ self.assert_compile(
+ select([other_table,anon]).
+ select_from(
+ other_table.outerjoin(anon)
+ ).apply_labels(),
+ "SELECT other_thirty_characters_table_.id AS "
+ "other_thirty_characters__1, "
+ "other_thirty_characters_table_.thirty_characters_table_id "
+ "AS other_thirty_characters__2, thirty_characters_table__1.id "
+ "AS thirty_characters_table__3 "
+ "FROM other_thirty_characters_table_ "
+ "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
+ "AS thirty_characters_table__1 ON "
+ "thirty_characters_table__1.id = "
+ "other_thirty_characters_table_.thirty_characters_table_id",
+ dialect=compile_dialect)
+
+ self.assert_compile(
+ select([other_table, anon]).
+ select_from(
+ other_table.outerjoin(anon)
+ ).apply_labels(),
+ "SELECT other_thirty_characters_table_.id AS "
+ "other_thirty_characters__1, "
+ "other_thirty_characters_table_.thirty_characters_table_id "
+ "AS other_thirty_characters__2, "
+ "thirty_characters_table__1.id AS thirty_characters_table__3 "
+ "FROM other_thirty_characters_table_ "
+ "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
+ "AS thirty_characters_table__1 ON "
+ "thirty_characters_table__1.id = "
+ "other_thirty_characters_table_.thirty_characters_table_id",
+ dialect=compile_dialect
+ )
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 9725d3d3a..f9ec82a6a 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -1094,12 +1094,16 @@ class PercentSchemaNamesTest(fixtures.TestBase):
@classmethod
def setup_class(cls):
- global percent_table, metadata
+ global percent_table, metadata, lightweight_percent_table
metadata = MetaData(testing.db)
percent_table = Table('percent%table', metadata,
Column("percent%", Integer),
Column("spaces % more spaces", Integer),
)
+ lightweight_percent_table = sql.table('percent%table',
+ sql.column("percent%"),
+ sql.column("spaces % more spaces"),
+ )
metadata.create_all()
def teardown(self):
@@ -1109,7 +1113,8 @@ class PercentSchemaNamesTest(fixtures.TestBase):
def teardown_class(cls):
metadata.drop_all()
- @testing.skip_if(lambda: testing.against('postgresql'), "psycopg2 2.4 no longer accepts % in bind placeholders")
+ @testing.skip_if(lambda: testing.against('postgresql'),
+ "psycopg2 2.4 no longer accepts % in bind placeholders")
def test_single_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, 'spaces % more spaces':12},
@@ -1125,8 +1130,10 @@ class PercentSchemaNamesTest(fixtures.TestBase):
)
self._assert_table()
- @testing.skip_if(lambda: testing.against('postgresql'), "psycopg2 2.4 no longer accepts % in bind placeholders")
- @testing.crashes('mysql+mysqldb', 'MySQLdb handles executemany() inconsistently vs. execute()')
+ @testing.skip_if(lambda: testing.against('postgresql'),
+ "psycopg2 2.4 no longer accepts % in bind placeholders")
+ @testing.crashes('mysql+mysqldb', "MySQLdb handles executemany() "
+ "inconsistently vs. execute()")
def test_executemany_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, 'spaces % more spaces':12},
@@ -1139,9 +1146,17 @@ class PercentSchemaNamesTest(fixtures.TestBase):
self._assert_table()
def _assert_table(self):
- for table in (percent_table, percent_table.alias()):
+ for table in (
+ percent_table,
+ percent_table.alias(),
+ lightweight_percent_table,
+ lightweight_percent_table.alias()):
eq_(
- table.select().order_by(table.c['percent%']).execute().fetchall(),
+ list(
+ testing.db.execute(
+ table.select().order_by(table.c['percent%'])
+ )
+ ),
[
(5, 12),
(7, 11),
@@ -1151,28 +1166,39 @@ class PercentSchemaNamesTest(fixtures.TestBase):
)
eq_(
- table.select().
- where(table.c['spaces % more spaces'].in_([9, 10])).
- order_by(table.c['percent%']).execute().fetchall(),
+ list(
+ testing.db.execute(
+ table.select().
+ where(table.c['spaces % more spaces'].in_([9, 10])).
+ order_by(table.c['percent%']),
+ )
+ ),
[
(9, 10),
(11, 9)
]
)
- result = table.select().order_by(table.c['percent%']).execute()
- row = result.fetchone()
+ row = testing.db.execute(table.select().\
+ order_by(table.c['percent%'])).first()
+ eq_(row['percent%'], 5)
+ eq_(row['spaces % more spaces'], 12)
+
eq_(row[table.c['percent%']], 5)
eq_(row[table.c['spaces % more spaces']], 12)
- row = result.fetchone()
- eq_(row['percent%'], 7)
- eq_(row['spaces % more spaces'], 11)
- result.close()
- percent_table.update().values({percent_table.c['spaces % more spaces']:15}).execute()
+ percent_table.update().values(
+ {percent_table.c['spaces % more spaces']:15}
+ ).execute()
eq_(
- percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(),
+ list(
+ testing.db.execute(
+ percent_table.\
+ select().\
+ order_by(percent_table.c['percent%'])
+ )
+ ),
[
(5, 15),
(7, 15),