summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES12
-rw-r--r--lib/sqlalchemy/dialects/firebird/base.py2
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py2
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py25
-rw-r--r--lib/sqlalchemy/engine/base.py37
-rw-r--r--lib/sqlalchemy/engine/default.py5
-rw-r--r--lib/sqlalchemy/schema.py4
-rw-r--r--lib/sqlalchemy/sql/compiler.py20
-rw-r--r--lib/sqlalchemy/sql/expression.py74
-rw-r--r--test/aaa_profiling/test_zoomark_orm.py2
-rw-r--r--test/sql/test_labels.py383
-rw-r--r--test/sql/test_query.py60
12 files changed, 452 insertions, 174 deletions
diff --git a/CHANGES b/CHANGES
index 5034b2475..336627a27 100644
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,18 @@ CHANGES
happen if there's really an UPDATE to occur.
[ticket:2390]
+- sql
+ - [bug] A significant change to how labeling
+ is applied to columns in SELECT statements
+ allows "truncated" labels, that is label names
+ that are generated in Python which exceed
+ the maximum identifier length (note this is
+ configurable via label_length on create_engine()),
+ to be properly referenced when rendered inside
+ of a subquery, as well as to be present
+ in a result set row using their original
+ in-Python names. [ticket:2396]
+
- engine
- [feature] Added pool_reset_on_return argument
to create_engine, allows control over
diff --git a/lib/sqlalchemy/dialects/firebird/base.py b/lib/sqlalchemy/dialects/firebird/base.py
index 8cf2ded2f..031c68919 100644
--- a/lib/sqlalchemy/dialects/firebird/base.py
+++ b/lib/sqlalchemy/dialects/firebird/base.py
@@ -215,7 +215,7 @@ class FBCompiler(sql.compiler.SQLCompiler):
# Override to not use the AS keyword which FB 1.5 does not like
if asfrom:
alias_name = isinstance(alias.name,
- expression._generated_label) and \
+ expression._truncated_label) and \
self._truncated_identifier("alias",
alias.name) or alias.name
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index 88e506287..56838f803 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -481,7 +481,7 @@ class OracleCompiler(compiler.SQLCompiler):
"""Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??"""
if asfrom or ashint:
- alias_name = isinstance(alias.name, expression._generated_label) and \
+ alias_name = isinstance(alias.name, expression._truncated_label) and \
self._truncated_identifier("alias", alias.name) or alias.name
if ashint:
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index f9520affa..10a0d882b 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -441,20 +441,6 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
result = self.quote_schema(index.table.schema, index.table.quote_schema) + "." + result
return result
-class SQLiteExecutionContext(default.DefaultExecutionContext):
- def get_result_proxy(self):
- rp = base.ResultProxy(self)
- if rp._metadata:
- # adjust for dotted column names. SQLite
- # in the case of UNION may store col names as
- # "tablename.colname"
- # in cursor.description
- for colname in rp._metadata.keys:
- if "." in colname:
- trunc_col = colname.split(".")[1]
- rp._metadata._set_keymap_synonym(trunc_col, colname)
- return rp
-
class SQLiteDialect(default.DefaultDialect):
name = 'sqlite'
supports_alter = False
@@ -472,7 +458,6 @@ class SQLiteDialect(default.DefaultDialect):
ischema_names = ischema_names
colspecs = colspecs
isolation_level = None
- execution_ctx_cls = SQLiteExecutionContext
supports_cast = True
supports_default_values = True
@@ -540,6 +525,16 @@ class SQLiteDialect(default.DefaultDialect):
else:
return None
+ def _translate_colname(self, colname):
+ # adjust for dotted column names. SQLite
+ # in the case of UNION may store col names as
+ # "tablename.colname"
+ # in cursor.description
+ if "." in colname:
+ return colname.split(".")[1], colname
+ else:
+ return colname, None
+
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
if schema is not None:
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index db19fe7de..3175ebabd 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2575,6 +2575,10 @@ class ResultMetaData(object):
context = parent.context
dialect = context.dialect
typemap = dialect.dbapi_type_map
+ translate_colname = dialect._translate_colname
+
+ # high precedence key values.
+ primary_keymap = {}
for i, rec in enumerate(metadata):
colname = rec[0]
@@ -2583,6 +2587,9 @@ class ResultMetaData(object):
if dialect.description_encoding:
colname = dialect._description_decoder(colname)
+ if translate_colname:
+ colname, untranslated = translate_colname(colname)
+
if context.result_map:
try:
name, obj, type_ = context.result_map[colname.lower()]
@@ -2600,15 +2607,17 @@ class ResultMetaData(object):
# indexes as keys. This is only needed for the Python version of
# RowProxy (the C version uses a faster path for integer indexes).
- keymap[i] = rec
-
- # Column names as keys
- if keymap.setdefault(name.lower(), rec) is not rec:
- # We do not raise an exception directly because several
- # columns colliding by name is not a problem as long as the
- # user does not try to access them (ie use an index directly,
- # or the more precise ColumnElement)
- keymap[name.lower()] = (processor, obj, None)
+ primary_keymap[i] = rec
+
+ # populate primary keymap, looking for conflicts.
+ if primary_keymap.setdefault(name.lower(), rec) is not rec:
+ # place a record that doesn't have the "index" - this
+ # is interpreted later as an AmbiguousColumnError,
+ # but only when actually accessed. Columns
+ # colliding by name is not a problem if those names
+ # aren't used; integer and ColumnElement access is always
+ # unambiguous.
+ primary_keymap[name.lower()] = (processor, obj, None)
if dialect.requires_name_normalize:
colname = dialect.normalize_name(colname)
@@ -2618,10 +2627,20 @@ class ResultMetaData(object):
for o in obj:
keymap[o] = rec
+ if translate_colname and \
+ untranslated:
+ keymap[untranslated] = rec
+
+ # overwrite keymap values with those of the
+ # high precedence keymap.
+ keymap.update(primary_keymap)
+
if parent._echo:
context.engine.logger.debug(
"Col %r", tuple(x[0] for x in metadata))
+ @util.pending_deprecation("0.8", "sqlite dialect uses "
+ "_translate_colname() now")
def _set_keymap_synonym(self, name, origname):
"""Set a synonym for the given name.
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 73bd7fd71..a1e5a5799 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -44,6 +44,7 @@ class DefaultDialect(base.Dialect):
postfetch_lastrowid = True
implicit_returning = False
+
supports_native_enum = False
supports_native_boolean = False
@@ -95,6 +96,10 @@ class DefaultDialect(base.Dialect):
# and denormalize_name() must be provided.
requires_name_normalize = False
+ # a hook for SQLite's translation of
+ # result column names
+ _translate_colname = None
+
reflection_options = ()
def __init__(self, convert_unicode=False, assert_unicode=False,
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index f0a929776..c183e4385 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -1028,7 +1028,7 @@ class Column(SchemaItem, expression.ColumnClause):
"The 'index' keyword argument on Column is boolean only. "
"To create indexes with a specific name, create an "
"explicit Index object external to the Table.")
- Index(expression._generated_label('ix_%s' % self._label), self, unique=self.unique)
+ Index(expression._truncated_label('ix_%s' % self._label), self, unique=self.unique)
elif self.unique:
if isinstance(self.unique, basestring):
raise exc.ArgumentError(
@@ -1093,7 +1093,7 @@ class Column(SchemaItem, expression.ColumnClause):
"been assigned.")
try:
c = self._constructor(
- name or self.name,
+ expression._as_truncated(name or self.name),
self.type,
key = name or self.key,
primary_key = self.primary_key,
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index b0a55b886..598e87932 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -354,7 +354,7 @@ class SQLCompiler(engine.Compiled):
# or ORDER BY clause of a select. dialect-specific compilers
# can modify this behavior.
if within_columns_clause and not within_label_clause:
- if isinstance(label.name, sql._generated_label):
+ if isinstance(label.name, sql._truncated_label):
labelname = self._truncated_identifier("colident", label.name)
else:
labelname = label.name
@@ -376,17 +376,17 @@ class SQLCompiler(engine.Compiled):
**kw)
def visit_column(self, column, result_map=None, **kwargs):
- name = column.name
+ name = orig_name = column.name
if name is None:
raise exc.CompileError("Cannot compile Column object until "
"it's 'name' is assigned.")
is_literal = column.is_literal
- if not is_literal and isinstance(name, sql._generated_label):
+ if not is_literal and isinstance(name, sql._truncated_label):
name = self._truncated_identifier("colident", name)
if result_map is not None:
- result_map[name.lower()] = (name, (column, ), column.type)
+ result_map[name.lower()] = (orig_name, (column, name), column.type)
if is_literal:
name = self.escape_literal_column(name)
@@ -404,7 +404,7 @@ class SQLCompiler(engine.Compiled):
else:
schema_prefix = ''
tablename = table.name
- if isinstance(tablename, sql._generated_label):
+ if isinstance(tablename, sql._truncated_label):
tablename = self._truncated_identifier("alias", tablename)
return schema_prefix + \
@@ -703,7 +703,7 @@ class SQLCompiler(engine.Compiled):
return self.bind_names[bindparam]
bind_name = bindparam.key
- if isinstance(bind_name, sql._generated_label):
+ if isinstance(bind_name, sql._truncated_label):
bind_name = self._truncated_identifier("bindparam", bind_name)
# add to bind_names for translation
@@ -715,7 +715,7 @@ class SQLCompiler(engine.Compiled):
if (ident_class, name) in self.truncated_names:
return self.truncated_names[(ident_class, name)]
- anonname = name % self.anon_map
+ anonname = name.apply_map(self.anon_map)
if len(anonname) > self.label_length:
counter = self.truncated_names.get(ident_class, 1)
@@ -747,7 +747,7 @@ class SQLCompiler(engine.Compiled):
def visit_alias(self, alias, asfrom=False, ashint=False,
fromhints=None, **kwargs):
if asfrom or ashint:
- if isinstance(alias.name, sql._generated_label):
+ if isinstance(alias.name, sql._truncated_label):
alias_name = self._truncated_identifier("alias", alias.name)
else:
alias_name = alias.name
@@ -784,7 +784,7 @@ class SQLCompiler(engine.Compiled):
not column.is_literal and \
column.table is not None and \
not isinstance(column.table, sql.Select):
- return _CompileLabel(column, sql._generated_label(column.name))
+ return _CompileLabel(column, sql._as_truncated(column.name))
elif not isinstance(column,
(sql._UnaryExpression, sql._TextClause)) \
and (not hasattr(column, 'name') or \
@@ -1445,7 +1445,7 @@ class DDLCompiler(engine.Compiled):
return "\nDROP TABLE " + self.preparer.format_table(drop.element)
def _index_identifier(self, ident):
- if isinstance(ident, sql._generated_label):
+ if isinstance(ident, sql._truncated_label):
max = self.dialect.max_index_name_length or \
self.dialect.max_identifier_length
if len(ident) > max:
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index 859ee0437..939456b9a 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -1283,14 +1283,48 @@ func = _FunctionGenerator()
# TODO: use UnaryExpression for this instead ?
modifier = _FunctionGenerator(group=False)
-class _generated_label(unicode):
- """A unicode subclass used to identify dynamically generated names."""
+class _truncated_label(unicode):
+ """A unicode subclass used to identify symbolic "
+ "names that may require truncation."""
-def _escape_for_generated(x):
- if isinstance(x, _generated_label):
- return x
+ def apply_map(self, map_):
+ return self
+
+# for backwards compatibility in case
+# someone is re-implementing the
+# _truncated_identifier() sequence in a custom
+# compiler
+_generated_label = _truncated_label
+
+class _anonymous_label(_truncated_label):
+ """A unicode subclass used to identify anonymously
+ generated names."""
+
+ def __add__(self, other):
+ return _anonymous_label(
+ unicode(self) +
+ unicode(other))
+
+ def __radd__(self, other):
+ return _anonymous_label(
+ unicode(other) +
+ unicode(self))
+
+ def apply_map(self, map_):
+ return self % map_
+
+def _as_truncated(value):
+ """coerce the given value to :class:`._truncated_label`.
+
+ Existing :class:`._truncated_label` and
+ :class:`._anonymous_label` objects are passed
+ unchanged.
+ """
+
+ if isinstance(value, _truncated_label):
+ return value
else:
- return x.replace('%', '%%')
+ return _truncated_label(value)
def _string_or_unprintable(element):
if isinstance(element, basestring):
@@ -2117,7 +2151,9 @@ class ColumnElement(ClauseElement, _CompareMixin):
else:
key = name
- co = ColumnClause(name, selectable, type_=getattr(self,
+ co = ColumnClause(_as_truncated(name),
+ selectable,
+ type_=getattr(self,
'type', None))
co.proxies = [self]
selectable._columns[key] = co
@@ -2165,7 +2201,7 @@ class ColumnElement(ClauseElement, _CompareMixin):
expressions and function calls.
"""
- return _generated_label('%%(%d %s)s' % (id(self), getattr(self,
+ return _anonymous_label('%%(%d %s)s' % (id(self), getattr(self,
'name', 'anon')))
class ColumnCollection(util.OrderedProperties):
@@ -2588,10 +2624,10 @@ class _BindParamClause(ColumnElement):
"""
if unique:
- self.key = _generated_label('%%(%d %s)s' % (id(self), key
+ self.key = _anonymous_label('%%(%d %s)s' % (id(self), key
or 'param'))
else:
- self.key = key or _generated_label('%%(%d param)s'
+ self.key = key or _anonymous_label('%%(%d param)s'
% id(self))
# identifiying key that won't change across
@@ -2639,14 +2675,14 @@ class _BindParamClause(ColumnElement):
def _clone(self):
c = ClauseElement._clone(self)
if self.unique:
- c.key = _generated_label('%%(%d %s)s' % (id(c), c._orig_key
+ c.key = _anonymous_label('%%(%d %s)s' % (id(c), c._orig_key
or 'param'))
return c
def _convert_to_unique(self):
if not self.unique:
self.unique = True
- self.key = _generated_label('%%(%d %s)s' % (id(self),
+ self.key = _anonymous_label('%%(%d %s)s' % (id(self),
self._orig_key or 'param'))
def compare(self, other, **kw):
@@ -3615,7 +3651,7 @@ class Alias(FromClause):
if name is None:
if self.original.named_with_column:
name = getattr(self.original, 'name', None)
- name = _generated_label('%%(%d %s)s' % (id(self), name
+ name = _anonymous_label('%%(%d %s)s' % (id(self), name
or 'anon'))
self.name = name
@@ -3816,7 +3852,7 @@ class _Label(ColumnElement):
while isinstance(element, _Label):
element = element.element
self.name = self.key = self._label = name \
- or _generated_label('%%(%d %s)s' % (id(self),
+ or _anonymous_label('%%(%d %s)s' % (id(self),
getattr(element, 'name', 'anon')))
self._element = element
self._type = type_
@@ -3973,11 +4009,9 @@ class ColumnClause(_Immutable, ColumnElement):
elif t is not None and t.named_with_column:
if getattr(t, 'schema', None):
label = t.schema.replace('.', '_') + "_" + \
- _escape_for_generated(t.name) + "_" + \
- _escape_for_generated(self.name)
+ t.name + "_" + self.name
else:
- label = _escape_for_generated(t.name) + "_" + \
- _escape_for_generated(self.name)
+ label = t.name + "_" + self.name
# ensure the label name doesn't conflict with that
# of an existing column
@@ -3989,7 +4023,7 @@ class ColumnClause(_Immutable, ColumnElement):
counter += 1
label = _label
- return _generated_label(label)
+ return _as_truncated(label)
else:
return self.name
@@ -4018,7 +4052,7 @@ class ColumnClause(_Immutable, ColumnElement):
# otherwise its considered to be a label
is_literal = self.is_literal and (name is None or name == self.name)
c = self._constructor(
- name or self.name,
+ _as_truncated(name or self.name),
selectable=selectable,
type_=self.type,
is_literal=is_literal
diff --git a/test/aaa_profiling/test_zoomark_orm.py b/test/aaa_profiling/test_zoomark_orm.py
index 483ca335a..59391b499 100644
--- a/test/aaa_profiling/test_zoomark_orm.py
+++ b/test/aaa_profiling/test_zoomark_orm.py
@@ -340,7 +340,7 @@ class ZooMarkTest(fixtures.TestBase):
def test_profile_1a_populate(self):
self.test_baseline_1a_populate()
- @profiling.function_call_count(393, {'3.2':360})
+ @profiling.function_call_count(413, {'3.2':360})
def test_profile_2_insert(self):
self.test_baseline_2_insert()
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index 9f26d899f..ad4727b9b 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -1,8 +1,10 @@
-from test.lib.testing import assert_raises, assert_raises_message
+from test.lib.testing import assert_raises, assert_raises_message, eq_
+from test.lib.engines import testing_engine
+from test.lib import fixtures, AssertsCompiledSQL, testing
from sqlalchemy import *
from sqlalchemy import exc as exceptions
-from test.lib import *
from sqlalchemy.engine import default
+from sqlalchemy.sql import table, column
IDENT_LENGTH = 29
@@ -13,132 +15,187 @@ class LabelTypeTest(fixtures.TestBase):
Column('col1', Integer),
Column('col2', Float))
assert isinstance(t.c.col1.label('hi').type, Integer)
- assert isinstance(select([t.c.col2]).as_scalar().label('lala').type, Float)
+ assert isinstance(select([t.c.col2]).as_scalar().label('lala').type,
+ Float)
+
+class LongLabelsTest(fixtures.TablesTest, AssertsCompiledSQL):
+ run_inserts = 'once'
+ run_deletes = None
-class LongLabelsTest(fixtures.TestBase, AssertsCompiledSQL):
@classmethod
- def setup_class(cls):
- global metadata, table1, table2, maxlen
- metadata = MetaData(testing.db)
+ def define_tables(cls, metadata):
table1 = Table("some_large_named_table", metadata,
- Column("this_is_the_primarykey_column", Integer, Sequence("this_is_some_large_seq"), primary_key=True),
+ Column("this_is_the_primarykey_column", Integer,
+ primary_key=True),
Column("this_is_the_data_column", String(30))
)
table2 = Table("table_with_exactly_29_characs", metadata,
- Column("this_is_the_primarykey_column", Integer, Sequence("some_seq"), primary_key=True),
+ Column("this_is_the_primarykey_column", Integer,
+ primary_key=True),
Column("this_is_the_data_column", String(30))
)
+ cls.tables.table1 = table1
+ cls.tables.table2 = table2
- metadata.create_all()
+ @classmethod
+ def insert_data(cls):
+ table1 = cls.tables.table1
+ table2 = cls.tables.table2
+ for data in [
+ {"this_is_the_primarykey_column":1,
+ "this_is_the_data_column":"data1"},
+ {"this_is_the_primarykey_column":2,
+ "this_is_the_data_column":"data2"},
+ {"this_is_the_primarykey_column":3,
+ "this_is_the_data_column":"data3"},
+ {"this_is_the_primarykey_column":4,
+ "this_is_the_data_column":"data4"}
+ ]:
+ testing.db.execute(
+ table1.insert(),
+ **data
+ )
+ testing.db.execute(
+ table2.insert(),
+ {"this_is_the_primary_key_column":1,
+ "this_is_the_data_column":"data"}
+ )
- maxlen = testing.db.dialect.max_identifier_length
+ @classmethod
+ def setup_class(cls):
+ super(LongLabelsTest, cls).setup_class()
+ cls.maxlen = testing.db.dialect.max_identifier_length
testing.db.dialect.max_identifier_length = IDENT_LENGTH
- @engines.close_first
- def teardown(self):
- table1.delete().execute()
-
@classmethod
def teardown_class(cls):
- metadata.drop_all()
- testing.db.dialect.max_identifier_length = maxlen
+ testing.db.dialect.max_identifier_length = cls.maxlen
+ super(LongLabelsTest, cls).teardown_class()
def test_too_long_name_disallowed(self):
m = MetaData(testing.db)
- t1 = Table("this_name_is_too_long_for_what_were_doing_in_this_test", m, Column('foo', Integer))
+ t1 = Table("this_name_is_too_long_for_what_were_doing_in_this_test",
+ m, Column('foo', Integer))
assert_raises(exceptions.IdentifierError, m.create_all)
assert_raises(exceptions.IdentifierError, m.drop_all)
assert_raises(exceptions.IdentifierError, t1.create)
assert_raises(exceptions.IdentifierError, t1.drop)
- def test_result(self):
- table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"})
- table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"})
- table1.insert().execute(**{"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"})
- table1.insert().execute(**{"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"})
-
- s = table1.select(use_labels=True, order_by=[table1.c.this_is_the_primarykey_column])
- r = s.execute()
- result = []
- for row in r:
- result.append((row[table1.c.this_is_the_primarykey_column], row[table1.c.this_is_the_data_column]))
- assert result == [
+ def test_basic_result(self):
+ table1 = self.tables.table1
+ s = table1.select(use_labels=True,
+ order_by=[table1.c.this_is_the_primarykey_column])
+
+ result = [
+ (row[table1.c.this_is_the_primarykey_column],
+ row[table1.c.this_is_the_data_column])
+ for row in testing.db.execute(s)
+ ]
+ eq_(result, [
(1, "data1"),
(2, "data2"),
(3, "data3"),
(4, "data4"),
- ], repr(result)
+ ])
- # some dialects such as oracle (and possibly ms-sql in a future version)
+ def test_result_limit(self):
+ table1 = self.tables.table1
+ # some dialects such as oracle (and possibly ms-sql
+ # in a future version)
# generate a subquery for limits/offsets.
- # ensure that the generated result map corresponds to the selected table, not
+ # ensure that the generated result map corresponds
+ # to the selected table, not
# the select query
- r = s.limit(2).execute()
- result = []
- for row in r:
- result.append((row[table1.c.this_is_the_primarykey_column], row[table1.c.this_is_the_data_column]))
- assert result == [
+ s = table1.select(use_labels=True,
+ order_by=[table1.c.this_is_the_primarykey_column]).\
+ limit(2)
+
+ result = [
+ (row[table1.c.this_is_the_primarykey_column],
+ row[table1.c.this_is_the_data_column])
+ for row in testing.db.execute(s)
+ ]
+ eq_(result, [
(1, "data1"),
(2, "data2"),
- ], repr(result)
-
- @testing.requires.offset
- def go():
- r = s.limit(2).offset(1).execute()
- result = []
- for row in r:
- result.append((row[table1.c.this_is_the_primarykey_column], row[table1.c.this_is_the_data_column]))
- assert result == [
- (2, "data2"),
- (3, "data3"),
- ], repr(result)
- go()
-
- def test_table_alias_names(self):
+ ])
+
+ @testing.requires.offset
+ def test_result_limit_offset(self):
+ table1 = self.tables.table1
+ s = table1.select(use_labels=True,
+ order_by=[table1.c.this_is_the_primarykey_column]).\
+ limit(2).offset(1)
+
+ result = [
+ (row[table1.c.this_is_the_primarykey_column],
+ row[table1.c.this_is_the_data_column])
+ for row in testing.db.execute(s)
+ ]
+ eq_(result, [
+ (2, "data2"),
+ (3, "data3"),
+ ])
+
+ def test_table_alias_1(self):
+ table2 = self.tables.table2
if testing.against('oracle'):
self.assert_compile(
table2.alias().select(),
- "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs table_with_exactly_29_c_1"
+ "SELECT table_with_exactly_29_c_1."
+ "this_is_the_primarykey_column, "
+ "table_with_exactly_29_c_1.this_is_the_data_column "
+ "FROM table_with_exactly_29_characs "
+ "table_with_exactly_29_c_1"
)
else:
self.assert_compile(
table2.alias().select(),
- "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1"
+ "SELECT table_with_exactly_29_c_1."
+ "this_is_the_primarykey_column, "
+ "table_with_exactly_29_c_1.this_is_the_data_column "
+ "FROM table_with_exactly_29_characs AS "
+ "table_with_exactly_29_c_1"
)
+ def test_table_alias_2(self):
+ table1 = self.tables.table1
+ table2 = self.tables.table2
ta = table2.alias()
dialect = default.DefaultDialect()
dialect.max_identifier_length = IDENT_LENGTH
self.assert_compile(
- select([table1, ta]).select_from(table1.join(ta, table1.c.this_is_the_data_column==ta.c.this_is_the_data_column)).\
+ select([table1, ta]).select_from(
+ table1.join(ta,
+ table1.c.this_is_the_data_column==
+ ta.c.this_is_the_data_column)).\
where(ta.c.this_is_the_data_column=='data3'),
- "SELECT some_large_named_table.this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column, "
- "table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM "
- "some_large_named_table JOIN table_with_exactly_29_characs AS table_with_exactly_29_c_1 ON "
- "some_large_named_table.this_is_the_data_column = table_with_exactly_29_c_1.this_is_the_data_column "
- "WHERE table_with_exactly_29_c_1.this_is_the_data_column = :this_is_the_data_column_1",
+ "SELECT some_large_named_table.this_is_the_primarykey_column, "
+ "some_large_named_table.this_is_the_data_column, "
+ "table_with_exactly_29_c_1.this_is_the_primarykey_column, "
+ "table_with_exactly_29_c_1.this_is_the_data_column FROM "
+ "some_large_named_table JOIN table_with_exactly_29_characs "
+ "AS table_with_exactly_29_c_1 ON "
+ "some_large_named_table.this_is_the_data_column = "
+ "table_with_exactly_29_c_1.this_is_the_data_column "
+ "WHERE table_with_exactly_29_c_1.this_is_the_data_column = "
+ ":this_is_the_data_column_1",
dialect=dialect
)
- table2.insert().execute(
- {"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"},
- {"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"},
- {"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"},
- {"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"},
+ def test_table_alias_3(self):
+ table2 = self.tables.table2
+ eq_(
+ testing.db.execute(table2.alias().select()).first(),
+ (1, "data")
)
- r = table2.alias().select().execute()
- assert r.fetchall() == [(x, "data%d" % x) for x in range(1, 5)]
-
def test_colbinds(self):
- table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"})
- table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"})
- table1.insert().execute(**{"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"})
- table1.insert().execute(**{"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"})
-
- r = table1.select(table1.c.this_is_the_primarykey_column == 4).execute()
+ table1 = self.tables.table1
+ r = table1.select(table1.c.this_is_the_primarykey_column == 4).\
+ execute()
assert r.fetchall() == [(4, "data4")]
r = table1.select(or_(
@@ -147,59 +204,189 @@ class LongLabelsTest(fixtures.TestBase, AssertsCompiledSQL):
)).execute()
assert r.fetchall() == [(2, "data2"), (4, "data4")]
+ @testing.provide_metadata
def test_insert_no_pk(self):
- table1.insert().execute(**{"this_is_the_data_column":"data1"})
- table1.insert().execute(**{"this_is_the_data_column":"data2"})
- table1.insert().execute(**{"this_is_the_data_column":"data3"})
- table1.insert().execute(**{"this_is_the_data_column":"data4"})
+ t = Table("some_other_large_named_table", self.metadata,
+ Column("this_is_the_primarykey_column", Integer,
+ Sequence("this_is_some_large_seq"),
+ primary_key=True),
+ Column("this_is_the_data_column", String(30))
+ )
+ t.create(testing.db, checkfirst=True)
+ testing.db.execute(t.insert(),
+ **{"this_is_the_data_column":"data1"})
@testing.requires.subqueries
def test_subquery(self):
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
- x = select([q])
- print x.execute().fetchall()
+ table1 = self.tables.table1
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).\
+ alias('foo')
+ eq_(
+ list(testing.db.execute(select([q]))),
+ [(4, u'data4')]
+ )
@testing.requires.subqueries
def test_anon_alias(self):
+ table1 = self.tables.table1
compile_dialect = default.DefaultDialect()
compile_dialect.max_identifier_length = IDENT_LENGTH
q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
x = select([q], use_labels=True)
- self.assert_compile(x, "SELECT anon_1.this_is_the_primarykey_column AS anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column AS anon_1_this_is_the_data_2 "
- "FROM (SELECT some_large_named_table.this_is_the_primarykey_column AS this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column AS this_is_the_data_column "
+ self.assert_compile(x,
+ "SELECT anon_1.this_is_the_primarykey_column AS "
+ "anon_1_this_is_the_prim_1, anon_1.this_is_the_data_column "
+ "AS anon_1_this_is_the_data_2 "
+ "FROM (SELECT some_large_named_table."
+ "this_is_the_primarykey_column AS "
+ "this_is_the_primarykey_column, "
+ "some_large_named_table.this_is_the_data_column "
+ "AS this_is_the_data_column "
"FROM some_large_named_table "
- "WHERE some_large_named_table.this_is_the_primarykey_column = :this_is_the_primarykey__1) AS anon_1", dialect=compile_dialect)
+ "WHERE some_large_named_table.this_is_the_primarykey_column "
+ "= :this_is_the_primarykey__1) AS anon_1",
+ dialect=compile_dialect)
- print x.execute().fetchall()
+ eq_(
+ list(testing.db.execute(x)),
+ [(4, u'data4')]
+ )
def test_adjustable(self):
+ table1 = self.tables.table1
q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
x = select([q])
compile_dialect = default.DefaultDialect(label_length=10)
- self.assert_compile(x, "SELECT foo.this_is_the_primarykey_column, foo.this_is_the_data_column FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS this_1, some_large_named_table.this_is_the_data_column "
- "AS this_2 FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :this_1) AS foo", dialect=compile_dialect)
+ self.assert_compile(x,
+ "SELECT foo.this_1, foo.this_2 FROM "
+ "(SELECT some_large_named_table."
+ "this_is_the_primarykey_column AS this_1, "
+ "some_large_named_table.this_is_the_data_column AS this_2 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :this_1) AS foo",
+ dialect=compile_dialect)
compile_dialect = default.DefaultDialect(label_length=4)
- self.assert_compile(x, "SELECT foo.this_is_the_primarykey_column, foo.this_is_the_data_column FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS _1, some_large_named_table.this_is_the_data_column AS _2 "
- "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :_1) AS foo", dialect=compile_dialect)
+ self.assert_compile(x, "SELECT foo._1, foo._2 FROM "
+ "(SELECT some_large_named_table.this_is_the_primarykey_column "
+ "AS _1, some_large_named_table.this_is_the_data_column AS _2 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :_1) AS foo",
+ dialect=compile_dialect)
q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
x = select([q], use_labels=True)
compile_dialect = default.DefaultDialect(label_length=10)
- self.assert_compile(x, "SELECT anon_1.this_is_the_primarykey_column AS anon_1, anon_1.this_is_the_data_column AS anon_2 FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS this_3, some_large_named_table.this_is_the_data_column AS this_4 "
- "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :this_1) AS anon_1", dialect=compile_dialect)
+ self.assert_compile(x,
+ "SELECT anon_1.this_2 AS anon_1, anon_1.this_4 AS anon_3 FROM "
+ "(SELECT some_large_named_table.this_is_the_primarykey_column "
+ "AS this_2, some_large_named_table.this_is_the_data_column AS this_4 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :this_1) AS anon_1",
+ dialect=compile_dialect)
compile_dialect = default.DefaultDialect(label_length=4)
- self.assert_compile(x, "SELECT _1.this_is_the_primarykey_column AS _1, _1.this_is_the_data_column AS _2 FROM "
- "(SELECT some_large_named_table.this_is_the_primarykey_column AS _3, some_large_named_table.this_is_the_data_column AS _4 "
- "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :_1) AS _1", dialect=compile_dialect)
+ self.assert_compile(x, "SELECT _1._2 AS _1, _1._4 AS _3 FROM "
+ "(SELECT some_large_named_table.this_is_the_primarykey_column "
+ "AS _2, some_large_named_table.this_is_the_data_column AS _4 "
+ "FROM some_large_named_table WHERE "
+ "some_large_named_table.this_is_the_primarykey_column = :_1) AS _1",
+ dialect=compile_dialect)
+
+ def test_adjustable_result_schema_column(self):
+ table1 = self.tables.table1
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
+ x = select([q])
+ e = testing_engine(options={"label_length":10})
+ e.pool = testing.db.pool
+ row = e.execute(x).first()
+ eq_(row.this_is_the_primarykey_column, 4)
+ eq_(row.this_1, 4)
+ eq_(row['this_1'], 4)
+
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
+ row = e.execute(x).first()
+ eq_(row.this_is_the_primarykey_column, 4)
+ eq_(row.this_1, 4)
+
+ def test_adjustable_result_lightweight_column(self):
+
+ table1 = table("some_large_named_table",
+ column("this_is_the_primarykey_column"),
+ column("this_is_the_data_column")
+ )
+
+ q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias('foo')
+ x = select([q])
+
+ e = testing_engine(options={"label_length":10})
+ e.pool = testing.db.pool
+ row = e.execute(x).first()
+ eq_(row.this_is_the_primarykey_column, 4)
+ eq_(row.this_1, 4)
+
+ def test_table_plus_column_exceeds_length(self):
+ """test that the truncation occurs if tablename / colname are only
+ greater than the max when concatenated."""
+
+ compile_dialect = default.DefaultDialect(label_length=30)
+ m = MetaData()
+ a_table = Table(
+ 'thirty_characters_table_xxxxxx',
+ m,
+ Column('id', Integer, primary_key=True)
+ )
+
+ other_table = Table(
+ 'other_thirty_characters_table_',
+ m,
+ Column('id', Integer, primary_key=True),
+ Column('thirty_characters_table_id',
+ Integer,
+ ForeignKey('thirty_characters_table_xxxxxx.id'),
+ primary_key=True
+ )
+ )
+
+ anon = a_table.alias()
+ self.assert_compile(
+ select([other_table,anon]).
+ select_from(
+ other_table.outerjoin(anon)
+ ).apply_labels(),
+ "SELECT other_thirty_characters_table_.id AS "
+ "other_thirty_characters__1, "
+ "other_thirty_characters_table_.thirty_characters_table_id "
+ "AS other_thirty_characters__2, thirty_characters_table__1.id "
+ "AS thirty_characters_table__3 "
+ "FROM other_thirty_characters_table_ "
+ "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
+ "AS thirty_characters_table__1 ON "
+ "thirty_characters_table__1.id = "
+ "other_thirty_characters_table_.thirty_characters_table_id",
+ dialect=compile_dialect)
+
+ self.assert_compile(
+ select([other_table, anon]).
+ select_from(
+ other_table.outerjoin(anon)
+ ).apply_labels(),
+ "SELECT other_thirty_characters_table_.id AS "
+ "other_thirty_characters__1, "
+ "other_thirty_characters_table_.thirty_characters_table_id "
+ "AS other_thirty_characters__2, "
+ "thirty_characters_table__1.id AS thirty_characters_table__3 "
+ "FROM other_thirty_characters_table_ "
+ "LEFT OUTER JOIN thirty_characters_table_xxxxxx "
+ "AS thirty_characters_table__1 ON "
+ "thirty_characters_table__1.id = "
+ "other_thirty_characters_table_.thirty_characters_table_id",
+ dialect=compile_dialect
+ )
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 9725d3d3a..f9ec82a6a 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -1094,12 +1094,16 @@ class PercentSchemaNamesTest(fixtures.TestBase):
@classmethod
def setup_class(cls):
- global percent_table, metadata
+ global percent_table, metadata, lightweight_percent_table
metadata = MetaData(testing.db)
percent_table = Table('percent%table', metadata,
Column("percent%", Integer),
Column("spaces % more spaces", Integer),
)
+ lightweight_percent_table = sql.table('percent%table',
+ sql.column("percent%"),
+ sql.column("spaces % more spaces"),
+ )
metadata.create_all()
def teardown(self):
@@ -1109,7 +1113,8 @@ class PercentSchemaNamesTest(fixtures.TestBase):
def teardown_class(cls):
metadata.drop_all()
- @testing.skip_if(lambda: testing.against('postgresql'), "psycopg2 2.4 no longer accepts % in bind placeholders")
+ @testing.skip_if(lambda: testing.against('postgresql'),
+ "psycopg2 2.4 no longer accepts % in bind placeholders")
def test_single_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, 'spaces % more spaces':12},
@@ -1125,8 +1130,10 @@ class PercentSchemaNamesTest(fixtures.TestBase):
)
self._assert_table()
- @testing.skip_if(lambda: testing.against('postgresql'), "psycopg2 2.4 no longer accepts % in bind placeholders")
- @testing.crashes('mysql+mysqldb', 'MySQLdb handles executemany() inconsistently vs. execute()')
+ @testing.skip_if(lambda: testing.against('postgresql'),
+ "psycopg2 2.4 no longer accepts % in bind placeholders")
+ @testing.crashes('mysql+mysqldb', "MySQLdb handles executemany() "
+ "inconsistently vs. execute()")
def test_executemany_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, 'spaces % more spaces':12},
@@ -1139,9 +1146,17 @@ class PercentSchemaNamesTest(fixtures.TestBase):
self._assert_table()
def _assert_table(self):
- for table in (percent_table, percent_table.alias()):
+ for table in (
+ percent_table,
+ percent_table.alias(),
+ lightweight_percent_table,
+ lightweight_percent_table.alias()):
eq_(
- table.select().order_by(table.c['percent%']).execute().fetchall(),
+ list(
+ testing.db.execute(
+ table.select().order_by(table.c['percent%'])
+ )
+ ),
[
(5, 12),
(7, 11),
@@ -1151,28 +1166,39 @@ class PercentSchemaNamesTest(fixtures.TestBase):
)
eq_(
- table.select().
- where(table.c['spaces % more spaces'].in_([9, 10])).
- order_by(table.c['percent%']).execute().fetchall(),
+ list(
+ testing.db.execute(
+ table.select().
+ where(table.c['spaces % more spaces'].in_([9, 10])).
+ order_by(table.c['percent%']),
+ )
+ ),
[
(9, 10),
(11, 9)
]
)
- result = table.select().order_by(table.c['percent%']).execute()
- row = result.fetchone()
+ row = testing.db.execute(table.select().\
+ order_by(table.c['percent%'])).first()
+ eq_(row['percent%'], 5)
+ eq_(row['spaces % more spaces'], 12)
+
eq_(row[table.c['percent%']], 5)
eq_(row[table.c['spaces % more spaces']], 12)
- row = result.fetchone()
- eq_(row['percent%'], 7)
- eq_(row['spaces % more spaces'], 11)
- result.close()
- percent_table.update().values({percent_table.c['spaces % more spaces']:15}).execute()
+ percent_table.update().values(
+ {percent_table.c['spaces % more spaces']:15}
+ ).execute()
eq_(
- percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(),
+ list(
+ testing.db.execute(
+ percent_table.\
+ select().\
+ order_by(percent_table.c['percent%'])
+ )
+ ),
[
(5, 15),
(7, 15),