summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES22
-rw-r--r--doc/build/dbengine.rst2
-rw-r--r--lib/sqlalchemy/connectors/mxodbc.py5
-rw-r--r--lib/sqlalchemy/dialects/access/base.py4
-rw-r--r--lib/sqlalchemy/dialects/maxdb/base.py4
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py44
-rw-r--r--lib/sqlalchemy/dialects/mssql/mxodbc.py11
-rw-r--r--lib/sqlalchemy/dialects/mssql/pyodbc.py13
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py4
-rw-r--r--lib/sqlalchemy/dialects/sybase/base.py32
-rw-r--r--lib/sqlalchemy/dialects/sybase/pyodbc.py7
-rw-r--r--lib/sqlalchemy/engine/default.py1
-rw-r--r--lib/sqlalchemy/sql/compiler.py141
-rw-r--r--test/dialect/test_mssql.py55
-rw-r--r--test/sql/test_query.py50
-rw-r--r--test/sql/test_select.py211
16 files changed, 452 insertions, 154 deletions
diff --git a/CHANGES b/CHANGES
index 71db59899..a8f35cf22 100644
--- a/CHANGES
+++ b/CHANGES
@@ -146,6 +146,23 @@ CHANGES
including their ddl listener and other event callables.
[ticket:1694] [ticket:1698]
+ - Some platforms will now interpret certain literal values
+ as non-bind parameters, rendered literally into the SQL
+ statement. This to support strict SQL-92 rules that are
+ enforced by some platforms including MS-SQL and Sybase.
+ In this model, bind parameters aren't allowed in the
+ columns clause of a SELECT, nor are certain ambiguous
+ expressions like "?=?". When this mode is enabled, the base
+ compiler will render the binds as inline literals, but only across
+ strings and numeric values. Other types such as dates
+ will raise an error, unless the dialect subclass defines
+ a literal rendering function for those. The bind parameter
+ must have an embedded literal value already or an error
+ is raised (i.e. won't work with straight bindparam('x')).
+ Dialects can also expand upon the areas where binds are not
+ accepted, such as within argument lists of functions
+ (which don't work on MS-SQL when native SQL binding is used).
+
- Added "unicode_errors" parameter to String, Unicode, etc.
Behaves like the 'errors' keyword argument to
the standard library's string.decode() functions. This flag
@@ -372,8 +389,9 @@ CHANGES
will be returned as a string. [ticket:1685]
- sybase
- - Implemented a preliminary working dialect for Sybase
- based on the Python-Sybase driver. Handles table
+ - Implemented a preliminary working dialect for Sybase,
+ with sub-implementations for Python-Sybase as well
+ as Pyodbc. Handles table
creates/drops and basic round trip functionality.
Does not yet include reflection or comprehensive
support of unicode/special expressions/etc.
diff --git a/doc/build/dbengine.rst b/doc/build/dbengine.rst
index 5c61a67d8..261c54c10 100644
--- a/doc/build/dbengine.rst
+++ b/doc/build/dbengine.rst
@@ -118,7 +118,7 @@ sqlite3_ ``sqlite+pysqlite``\* yes yes
**Sybase ASE**
-------------------------------------------------------------------------------------------------------------------------------
mxodbc_ ``sybase+mxodbc`` development development no yes yes
-pyodbc_ ``sybase+pyodbc`` development development no unknown unknown
+pyodbc_ ``sybase+pyodbc`` partial development no unknown unknown
python-sybase_ ``sybase+pysybase``\* partial development no yes yes
========================= =========================== =========== =========== =========== ================= ============
diff --git a/lib/sqlalchemy/connectors/mxodbc.py b/lib/sqlalchemy/connectors/mxodbc.py
index 29b047d23..68b88019c 100644
--- a/lib/sqlalchemy/connectors/mxodbc.py
+++ b/lib/sqlalchemy/connectors/mxodbc.py
@@ -96,9 +96,4 @@ class MxODBCConnector(Connector):
version.append(n)
return tuple(version)
- def do_execute(self, cursor, statement, parameters, context=None):
- # TODO: dont need tuple() here
- # TODO: use cursor.execute()
- cursor.executedirect(statement, tuple(parameters))
-
diff --git a/lib/sqlalchemy/dialects/access/base.py b/lib/sqlalchemy/dialects/access/base.py
index c10e77011..7dfb3153e 100644
--- a/lib/sqlalchemy/dialects/access/base.py
+++ b/lib/sqlalchemy/dialects/access/base.py
@@ -371,9 +371,9 @@ class AccessCompiler(compiler.SQLCompiler):
return (self.process(join.left, asfrom=True) + (join.isouter and " LEFT OUTER JOIN " or " INNER JOIN ") + \
self.process(join.right, asfrom=True) + " ON " + self.process(join.onclause))
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
class AccessDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
diff --git a/lib/sqlalchemy/dialects/maxdb/base.py b/lib/sqlalchemy/dialects/maxdb/base.py
index 504c31209..758cfaf05 100644
--- a/lib/sqlalchemy/dialects/maxdb/base.py
+++ b/lib/sqlalchemy/dialects/maxdb/base.py
@@ -558,8 +558,8 @@ class MaxDBCompiler(compiler.SQLCompiler):
return labels
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ order_by = self.process(select._order_by_clause, **kw)
# ORDER BY clauses in DISTINCT queries must reference aliased
# inner columns by alias name, not true column name.
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index 254aa54fd..4d697854f 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -843,8 +843,9 @@ class MSExecutionContext(default.DefaultExecutionContext):
class MSSQLCompiler(compiler.SQLCompiler):
returning_precedes_values = True
- extract_map = compiler.SQLCompiler.extract_map.copy()
- extract_map.update ({
+ extract_map = util.update_copy(
+ compiler.SQLCompiler.extract_map,
+ {
'doy': 'dayofyear',
'dow': 'weekday',
'milliseconds': 'millisecond',
@@ -937,9 +938,9 @@ class MSSQLCompiler(compiler.SQLCompiler):
kwargs['mssql_aliased'] = True
return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
def visit_rollback_to_savepoint(self, savepoint_stmt):
return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(savepoint_stmt)
@@ -1011,8 +1012,8 @@ class MSSQLCompiler(compiler.SQLCompiler):
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ order_by = self.process(select._order_by_clause, **kw)
# MSSQL only allows ORDER BY in subqueries if there is a LIMIT
if order_by and (not self.is_subquery() or select._limit):
@@ -1020,6 +1021,37 @@ class MSSQLCompiler(compiler.SQLCompiler):
else:
return ""
+class MSSQLStrictCompiler(MSSQLCompiler):
+ """A subclass of MSSQLCompiler which disables the usage of bind
+ parameters where not allowed natively by MS-SQL.
+
+ A dialect may use this compiler on a platform where native
+ binds are used.
+
+ """
+ ansi_bind_rules = True
+
+ def visit_in_op(self, binary, **kw):
+ kw['literal_binds'] = True
+ return "%s IN %s" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)
+ )
+
+ def visit_notin_op(self, binary, **kw):
+ kw['literal_binds'] = True
+ return "%s NOT IN %s" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)
+ )
+
+ def visit_function(self, func, **kw):
+ kw['literal_binds'] = True
+ return super(MSSQLStrictCompiler, self).visit_function(func, **kw)
+
+ #def render_literal_value(self, value):
+ # TODO! use mxODBC's literal quoting services here
+
class MSDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
diff --git a/lib/sqlalchemy/dialects/mssql/mxodbc.py b/lib/sqlalchemy/dialects/mssql/mxodbc.py
index bf14601b8..59cf65d63 100644
--- a/lib/sqlalchemy/dialects/mssql/mxodbc.py
+++ b/lib/sqlalchemy/dialects/mssql/mxodbc.py
@@ -4,9 +4,10 @@ import sys
from sqlalchemy import types as sqltypes
from sqlalchemy.connectors.mxodbc import MxODBCConnector
from sqlalchemy.dialects.mssql.pyodbc import MSExecutionContext_pyodbc
-from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
-
+from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect, \
+ MSSQLCompiler, MSSQLStrictCompiler
+
class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
"""
The pyodbc execution context is useful for enabling
@@ -20,7 +21,11 @@ class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
class MSDialect_mxodbc(MxODBCConnector, MSDialect):
execution_ctx_cls = MSExecutionContext_mxodbc
-
+
+ # TODO: may want to use this only if FreeTDS is not in use,
+ # since FreeTDS doesn't seem to use native binds.
+ statement_compiler = MSSQLStrictCompiler
+
def __init__(self, description_encoding='latin-1', **params):
super(MSDialect_mxodbc, self).__init__(**params)
self.description_encoding = description_encoding
diff --git a/lib/sqlalchemy/dialects/mssql/pyodbc.py b/lib/sqlalchemy/dialects/mssql/pyodbc.py
index 54b43320a..34050271f 100644
--- a/lib/sqlalchemy/dialects/mssql/pyodbc.py
+++ b/lib/sqlalchemy/dialects/mssql/pyodbc.py
@@ -1,3 +1,16 @@
+"""
+Support for MS-SQL via pyodbc.
+
+http://pypi.python.org/pypi/pyodbc/
+
+Connect strings are of the form::
+
+ mssql+pyodbc://<username>:<password>@<dsn>/
+ mssql+pyodbc://<username>:<password>@<host>/<database>
+
+
+"""
+
from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
from sqlalchemy.connectors.pyodbc import PyODBCConnector
from sqlalchemy import types as sqltypes
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index 37e63fbc1..98df8d0cb 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -218,10 +218,10 @@ class SQLiteCompiler(compiler.SQLCompiler):
else:
return self.process(cast.clause)
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
try:
return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
- self.extract_map[extract.field], self.process(extract.expr))
+ self.extract_map[extract.field], self.process(extract.expr, **kw))
except KeyError:
raise exc.ArgumentError(
"%s is not a valid extract argument." % extract.field)
diff --git a/lib/sqlalchemy/dialects/sybase/base.py b/lib/sqlalchemy/dialects/sybase/base.py
index 5d20faaf9..c440015d0 100644
--- a/lib/sqlalchemy/dialects/sybase/base.py
+++ b/lib/sqlalchemy/dialects/sybase/base.py
@@ -236,9 +236,11 @@ class SybaseExecutionContext(default.DefaultExecutionContext):
return lastrowid
class SybaseSQLCompiler(compiler.SQLCompiler):
+ ansi_bind_rules = True
- extract_map = compiler.SQLCompiler.extract_map.copy()
- extract_map.update ({
+ extract_map = util.update_copy(
+ compiler.SQLCompiler.extract_map,
+ {
'doy': 'dayofyear',
'dow': 'weekday',
'milliseconds': 'millisecond'
@@ -267,33 +269,17 @@ class SybaseSQLCompiler(compiler.SQLCompiler):
# Limit in sybase is after the select keyword
return ""
- def dont_visit_binary(self, binary):
- """Move bind parameters to the right-hand side of an operator, where possible."""
- if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq:
- return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator))
- else:
- return super(SybaseSQLCompiler, self).visit_binary(binary)
-
- def dont_label_select_column(self, select, column, asfrom):
- if isinstance(column, expression.Function):
- return column.label(None)
- else:
- return super(SybaseSQLCompiler, self).label_select_column(select, column, asfrom)
-
-# def visit_getdate_func(self, fn, **kw):
- # TODO: need to cast? something ?
-# pass
-
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
def for_update_clause(self, select):
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ kw['literal_binds'] = True
+ order_by = self.process(select._order_by_clause, **kw)
# SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
if order_by and (not self.is_subquery() or select._limit):
diff --git a/lib/sqlalchemy/dialects/sybase/pyodbc.py b/lib/sqlalchemy/dialects/sybase/pyodbc.py
index 4f89fe334..1bfdb6151 100644
--- a/lib/sqlalchemy/dialects/sybase/pyodbc.py
+++ b/lib/sqlalchemy/dialects/sybase/pyodbc.py
@@ -1,7 +1,12 @@
"""
Support for Sybase via pyodbc.
-This dialect is a stub only and is likely non functional at this time.
+http://pypi.python.org/pypi/pyodbc/
+
+Connect strings are of the form::
+
+ sybase+pyodbc://<username>:<password>@<dsn>/
+ sybase+pyodbc://<username>:<password>@<host>/<database>
"""
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index ce24a9ae4..2ef8fd104 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -62,6 +62,7 @@ class DefaultDialect(base.Dialect):
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
dbapi_type_map = {}
+ colspecs = {}
default_paramstyle = 'named'
supports_default_values = False
supports_empty_insert = True
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index be3375def..4e9175ae8 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -26,6 +26,7 @@ import re
from sqlalchemy import schema, engine, util, exc
from sqlalchemy.sql import operators, functions, util as sql_util, visitors
from sqlalchemy.sql import expression as sql
+import decimal
RESERVED_WORDS = set([
'all', 'analyse', 'analyze', 'and', 'any', 'array',
@@ -183,6 +184,12 @@ class SQLCompiler(engine.Compiled):
# clauses before the VALUES or WHERE clause (i.e. MSSQL)
returning_precedes_values = False
+ # SQL 92 doesn't allow bind parameters to be used
+ # in the columns clause of a SELECT, nor does it allow
+ # ambiguous expressions like "? = ?". A compiler
+ # subclass can set this flag to False if the target
+ # driver/DB enforces this
+ ansi_bind_rules = False
def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs):
"""Construct a new ``DefaultCompiler`` object.
@@ -260,9 +267,14 @@ class SQLCompiler(engine.Compiled):
else:
if bindparam.required:
if _group_number:
- raise exc.InvalidRequestError("A value is required for bind parameter %r, in parameter group %d" % (bindparam.key, _group_number))
+ raise exc.InvalidRequestError(
+ "A value is required for bind parameter %r, "
+ "in parameter group %d" %
+ (bindparam.key, _group_number))
else:
- raise exc.InvalidRequestError("A value is required for bind parameter %r" % bindparam.key)
+ raise exc.InvalidRequestError(
+ "A value is required for bind parameter %r"
+ % bindparam.key)
elif util.callable(bindparam.value):
pd[name] = bindparam.value()
else:
@@ -290,10 +302,10 @@ class SQLCompiler(engine.Compiled):
"""
return ""
- def visit_grouping(self, grouping, **kwargs):
- return "(" + self.process(grouping.element) + ")"
+ def visit_grouping(self, grouping, asfrom=False, **kwargs):
+ return "(" + self.process(grouping.element, **kwargs) + ")"
- def visit_label(self, label, result_map=None, within_columns_clause=False):
+ def visit_label(self, label, result_map=None, within_columns_clause=False, **kw):
# only render labels within the columns clause
# or ORDER BY clause of a select. dialect-specific compilers
# can modify this behavior.
@@ -305,11 +317,15 @@ class SQLCompiler(engine.Compiled):
result_map[labelname.lower()] = \
(label.name, (label, label.element, labelname), label.element.type)
- return self.process(label.element) + \
+ return self.process(label.element,
+ within_columns_clause=within_columns_clause,
+ **kw) + \
OPERATORS[operators.as_] + \
self.preparer.format_label(label, labelname)
else:
- return self.process(label.element)
+ return self.process(label.element,
+ within_columns_clause=within_columns_clause,
+ **kw)
def visit_column(self, column, result_map=None, **kwargs):
name = column.name
@@ -384,27 +400,28 @@ class SQLCompiler(engine.Compiled):
sep = " "
else:
sep = OPERATORS[clauselist.operator]
- return sep.join(s for s in (self.process(c) for c in clauselist.clauses)
+ return sep.join(s for s in (self.process(c, **kwargs) for c in clauselist.clauses)
if s is not None)
def visit_case(self, clause, **kwargs):
x = "CASE "
if clause.value is not None:
- x += self.process(clause.value) + " "
+ x += self.process(clause.value, **kwargs) + " "
for cond, result in clause.whens:
- x += "WHEN " + self.process(cond) + " THEN " + self.process(result) + " "
+ x += "WHEN " + self.process(cond, **kwargs) + \
+ " THEN " + self.process(result, **kwargs) + " "
if clause.else_ is not None:
- x += "ELSE " + self.process(clause.else_) + " "
+ x += "ELSE " + self.process(clause.else_, **kwargs) + " "
x += "END"
return x
def visit_cast(self, cast, **kwargs):
return "CAST(%s AS %s)" % \
- (self.process(cast.clause), self.process(cast.typeclause))
+ (self.process(cast.clause, **kwargs), self.process(cast.typeclause, **kwargs))
def visit_extract(self, extract, **kwargs):
field = self.extract_map.get(extract.field, extract.field)
- return "EXTRACT(%s FROM %s)" % (field, self.process(extract.expr))
+ return "EXTRACT(%s FROM %s)" % (field, self.process(extract.expr, **kwargs))
def visit_function(self, func, result_map=None, **kwargs):
if result_map is not None:
@@ -421,22 +438,23 @@ class SQLCompiler(engine.Compiled):
def function_argspec(self, func, **kwargs):
return self.process(func.clause_expr, **kwargs)
- def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs):
+ def visit_compound_select(self, cs, asfrom=False, parens=True, compound_index=1, **kwargs):
entry = self.stack and self.stack[-1] or {}
self.stack.append({'from':entry.get('from', None), 'iswrapper':True})
keyword = self.compound_keywords.get(cs.keyword)
text = (" " + keyword + " ").join(
- (self.process(c, asfrom=asfrom, parens=False, compound_index=i)
+ (self.process(c, asfrom=asfrom, parens=False,
+ compound_index=i, **kwargs)
for i, c in enumerate(cs.selects))
)
- group_by = self.process(cs._group_by_clause, asfrom=asfrom)
+ group_by = self.process(cs._group_by_clause, asfrom=asfrom, **kwargs)
if group_by:
text += " GROUP BY " + group_by
- text += self.order_by_clause(cs)
+ text += self.order_by_clause(cs, **kwargs)
text += (cs._limit is not None or cs._offset is not None) and self.limit_clause(cs) or ""
self.stack.pop(-1)
@@ -453,32 +471,47 @@ class SQLCompiler(engine.Compiled):
s = s + OPERATORS[unary.modifier]
return s
- def visit_binary(self, binary, **kwargs):
-
+ def visit_binary(self, binary, **kw):
+ # don't allow "? = ?" to render
+ if self.ansi_bind_rules and \
+ isinstance(binary.left, sql._BindParamClause) and \
+ isinstance(binary.right, sql._BindParamClause):
+ kw['literal_binds'] = True
+
return self._operator_dispatch(binary.operator,
binary,
- lambda opstr: self.process(binary.left) + opstr + self.process(binary.right),
- **kwargs
+ lambda opstr: self.process(binary.left, **kw) +
+ opstr +
+ self.process(binary.right, **kw),
+ **kw
)
def visit_like_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return '%s LIKE %s' % (self.process(binary.left), self.process(binary.right)) \
+ return '%s LIKE %s' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def visit_notlike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return '%s NOT LIKE %s' % (self.process(binary.left), self.process(binary.right)) \
+ return '%s NOT LIKE %s' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def visit_ilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return 'lower(%s) LIKE lower(%s)' % (self.process(binary.left), self.process(binary.right)) \
+ return 'lower(%s) LIKE lower(%s)' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def visit_notilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return 'lower(%s) NOT LIKE lower(%s)' % (self.process(binary.left), self.process(binary.right)) \
+ return 'lower(%s) NOT LIKE lower(%s)' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def _operator_dispatch(self, operator, element, fn, **kw):
@@ -491,7 +524,16 @@ class SQLCompiler(engine.Compiled):
else:
return fn(" " + operator + " ")
- def visit_bindparam(self, bindparam, **kwargs):
+ def visit_bindparam(self, bindparam, within_columns_clause=False,
+ literal_binds=False, **kwargs):
+ if literal_binds or \
+ (within_columns_clause and \
+ self.ansi_bind_rules):
+ if bindparam.value is None:
+ raise exc.CompileError("Bind parameter without a "
+ "renderable value not allowed here.")
+ return self.render_literal_bindparam(bindparam, within_columns_clause=True, **kwargs)
+
name = self._truncate_bindparam(bindparam)
if name in self.binds:
existing = self.binds[name]
@@ -510,7 +552,36 @@ class SQLCompiler(engine.Compiled):
self.binds[bindparam.key] = self.binds[name] = bindparam
return self.bindparam_string(name)
-
+
+ def render_literal_bindparam(self, bindparam, **kw):
+ value = bindparam.value
+ processor = bindparam.bind_processor(self.dialect)
+ if processor:
+ value = processor(value)
+ return self.render_literal_value(value, bindparam.type)
+
+ def render_literal_value(self, value, type_):
+ """Render the value of a bind parameter as a quoted literal.
+
+ This is used for statement sections that do not accept bind paramters
+ on the target driver/database.
+
+ This should be implemented by subclasses using the quoting services
+ of the DBAPI.
+
+ """
+ if isinstance(value, basestring):
+ value = value.replace("'", "''")
+ return "'%s'" % value
+ elif value is None:
+ return "NULL"
+ elif isinstance(value, (float, int, long)):
+ return repr(value)
+ elif isinstance(value, decimal.Decimal):
+ return str(value)
+ else:
+ raise NotImplementedError("Don't know how to literal-quote value %r" % value)
+
def _truncate_bindparam(self, bindparam):
if bindparam in self.bind_names:
return self.bind_names[bindparam]
@@ -624,33 +695,33 @@ class SQLCompiler(engine.Compiled):
text = "SELECT " # we're off to a good start !
if select._prefixes:
- text += " ".join(self.process(x) for x in select._prefixes) + " "
+ text += " ".join(self.process(x, **kwargs) for x in select._prefixes) + " "
text += self.get_select_precolumns(select)
text += ', '.join(inner_columns)
if froms:
text += " \nFROM "
- text += ', '.join(self.process(f, asfrom=True) for f in froms)
+ text += ', '.join(self.process(f, asfrom=True, **kwargs) for f in froms)
else:
text += self.default_from()
if select._whereclause is not None:
- t = self.process(select._whereclause)
+ t = self.process(select._whereclause, **kwargs)
if t:
text += " \nWHERE " + t
if select._group_by_clause.clauses:
- group_by = self.process(select._group_by_clause)
+ group_by = self.process(select._group_by_clause, **kwargs)
if group_by:
text += " GROUP BY " + group_by
if select._having is not None:
- t = self.process(select._having)
+ t = self.process(select._having, **kwargs)
if t:
text += " \nHAVING " + t
if select._order_by_clause.clauses:
- text += self.order_by_clause(select)
+ text += self.order_by_clause(select, **kwargs)
if select._limit is not None or select._offset is not None:
text += self.limit_clause(select)
if select.for_update:
@@ -670,8 +741,8 @@ class SQLCompiler(engine.Compiled):
"""
return select._distinct and "DISTINCT " or ""
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ order_by = self.process(select._order_by_clause, **kw)
if order_by:
return " ORDER BY " + order_by
else:
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py
index 89a3af5fb..8092d8cdc 100644
--- a/test/dialect/test_mssql.py
+++ b/test/dialect/test_mssql.py
@@ -6,7 +6,7 @@ from sqlalchemy import types, exc, schema
from sqlalchemy.orm import *
from sqlalchemy.sql import table, column
from sqlalchemy.databases import mssql
-from sqlalchemy.dialects.mssql import pyodbc
+from sqlalchemy.dialects.mssql import pyodbc, mxodbc
from sqlalchemy.engine import url
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_, emits_warning_on
@@ -22,7 +22,35 @@ class CompileTest(TestBase, AssertsCompiledSQL):
def test_update(self):
t = table('sometable', column('somecolumn'))
self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :somecolumn_1", dict(somecolumn=10))
-
+
+ # TODO: should this be for *all* MS-SQL dialects ?
+ def test_mxodbc_binds(self):
+ """mxodbc uses MS-SQL native binds, which aren't allowed in various places."""
+
+ mxodbc_dialect = mxodbc.dialect()
+ t = table('sometable', column('foo'))
+
+ for expr, compile in [
+ (
+ select([literal("x"), literal("y")]),
+ "SELECT 'x', 'y'",
+ ),
+ (
+ select([t]).where(t.c.foo.in_(['x', 'y', 'z'])),
+ "SELECT sometable.foo FROM sometable WHERE sometable.foo IN ('x', 'y', 'z')",
+ ),
+ (
+ func.foobar("x", "y", 4, 5),
+ "foobar('x', 'y', 4, 5)",
+ ),
+ (
+ select([t]).where(func.len('xyz') > func.len(t.c.foo)),
+ "SELECT sometable.foo FROM sometable WHERE len('xyz') > len(sometable.foo)",
+ )
+ ]:
+ self.assert_compile(expr, compile, dialect=mxodbc_dialect)
+
+
def test_in_with_subqueries(self):
"""Test that when using subqueries in a binary expression
the == and != are changed to IN and NOT IN respectively.
@@ -127,15 +155,24 @@ class CompileTest(TestBase, AssertsCompiledSQL):
column('col4'))
(s1, s2) = (
- select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
- select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
+ select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
+ t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
+ select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
+ t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
)
u = union(s1, s2, order_by=['col3', 'col4'])
- self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:col2_1, :col2_2) "\
- "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4) ORDER BY col3, col4")
-
- self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE "\
- "t1.col2 IN (:col2_1, :col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4)) AS bar")
+ self.assert_compile(u,
+ "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN "
+ "(:col2_1, :col2_2) "\
+ "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 "
+ "IN (:col2_3, :col2_4) ORDER BY col3, col4")
+
+ self.assert_compile(u.alias('bar').select(),
+ "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, "
+ "t1.col4 AS col4 FROM t1 WHERE "\
+ "t1.col2 IN (:col2_1, :col2_2) UNION SELECT t2.col3 AS col3, "
+ "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4)) "
+ "AS bar")
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index a189594b7..8664ba6dc 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -234,8 +234,9 @@ class QueryTest(TestBase):
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
- simple labels in ORDER BYs now render as the actual labelname
- which not every database supports.
+ This test should be modified to support [ticket:1068] when that ticket
+ is implemented. For now, you need to put the actual string in the
+ ORDER BY.
"""
users.insert().execute(
@@ -246,26 +247,30 @@ class QueryTest(TestBase):
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
- select([concat]).order_by(concat).execute().fetchall(),
+ select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
eq_(
- select([concat]).order_by(concat).execute().fetchall(),
+ select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
- select([concat]).order_by(desc(concat)).execute().fetchall(),
+ select([concat]).order_by(desc('thedata')).execute().fetchall(),
[("test: jack",), ("test: fred",), ("test: ed",)]
)
- concat = ("test: " + users.c.user_name).label('thedata')
- eq_(
- select([concat]).order_by(concat + "x").execute().fetchall(),
- [("test: ed",), ("test: fred",), ("test: jack",)]
- )
+ @testing.fails_on('postgresql', 'only simple labels allowed')
+ @testing.fails_on('sybase', 'only simple labels allowed')
+ def go():
+ concat = ("test: " + users.c.user_name).label('thedata')
+ eq_(
+ select([concat]).order_by(literal_column('thedata') + "x").execute().fetchall(),
+ [("test: ed",), ("test: fred",), ("test: jack",)]
+ )
+ go()
def test_row_comparison(self):
@@ -768,11 +773,18 @@ class QueryTest(TestBase):
assert len(r) == 0
@testing.emits_warning('.*empty sequence.*')
- @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
+ @testing.fails_on('firebird', "uses sql-92 rules")
+ @testing.fails_on('sybase', "uses sql-92 rules")
@testing.fails_if(lambda:
testing.against('mssql+pyodbc') and not testing.db.dialect.freetds,
- "not supported by Windows ODBC driver")
+ "uses sql-92 rules")
def test_bind_in(self):
+ """test calling IN against a bind parameter.
+
+ this isn't allowed on several platforms since we
+ generate ? = ?.
+
+ """
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')
users.insert().execute(user_id = 9, user_name = None)
@@ -784,7 +796,21 @@ class QueryTest(TestBase):
assert len(r) == 3
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
+
+ @testing.emits_warning('.*empty sequence.*')
+ def test_literal_in(self):
+ """similar to test_bind_in but use a bind with a value."""
+
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 9, user_name = None)
+ s = users.select(not_(literal("john").in_([])))
+ r = s.execute().fetchall()
+ assert len(r) == 3
+
+
+
@testing.emits_warning('.*empty sequence.*')
@testing.fails_on('firebird', 'FIXME: unknown')
@testing.fails_on('maxdb', 'FIXME: unknown')
diff --git a/test/sql/test_select.py b/test/sql/test_select.py
index 33bbe5ff4..d27819c18 100644
--- a/test/sql/test_select.py
+++ b/test/sql/test_select.py
@@ -684,58 +684,94 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def test_orderby_groupby(self):
self.assert_compile(
table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
+ "SELECT myothertable.otherid, myothertable.othername FROM "
+ "myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
)
self.assert_compile(
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
+ "SELECT myothertable.otherid, myothertable.othername FROM "
+ "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
# generative order_by
self.assert_compile(
table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
+ "SELECT myothertable.otherid, myothertable.othername FROM "
+ "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
+ table2.select().order_by(table2.c.otherid).
+ order_by(table2.c.othername.desc()).order_by(None),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername"
+ select(
+ [table2.c.othername, func.count(table2.c.otherid)],
+ group_by = [table2.c.othername]),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable GROUP BY myothertable.othername"
)
# generative group by
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername"
+ select([table2.c.othername, func.count(table2.c.otherid)]).
+ group_by(table2.c.othername),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable GROUP BY myothertable.othername"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable"
+ select([table2.c.othername, func.count(table2.c.otherid)]).
+ group_by(table2.c.othername).group_by(None),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
+ select([table2.c.othername, func.count(table2.c.otherid)],
+ group_by = [table2.c.othername],
+ order_by = [table2.c.othername]),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
)
def test_for_update(self):
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update="nowait"),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update="nowait"),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
+ dialect=oracle.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update="read"),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
+ dialect=mysql.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
+ dialect=mysql.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", dialect=oracle.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
+ dialect=oracle.dialect())
def test_alias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
@@ -750,32 +786,42 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
,dialect=dialect)
self.assert_compile(
- select([table1.alias()])
- ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1")
+ select([table1.alias()]),
+ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM mytable AS mytable_1")
- # create a select for a join of two tables. use_labels means the column names will have
- # labels tablename_columnname, which become the column keys accessible off the Selectable object.
- # also, only use one column from the second table and all columns from the first table1.
- q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True)
+ # create a select for a join of two tables. use_labels
+ # means the column names will have labels tablename_columnname,
+ # which become the column keys accessible off the Selectable object.
+ # also, only use one column from the second table and all columns
+ # from the first table1.
+ q = select(
+ [table1, table2.c.otherid],
+ table1.c.myid == table2.c.otherid, use_labels = True
+ )
- # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view".
+ # make an alias of the "selectable". column names
+ # stay the same (i.e. the labels), table name "changes" to "t2view".
a = alias(q, 't2view')
# select from that alias, also using labels. two levels of labels should produce two underscores.
# also, reference the column "mytable_myid" off of the t2view alias.
self.assert_compile(
a.select(a.c.mytable_myid == 9, use_labels = True),
- "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \
-t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
-(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
-myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
-WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
+ "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name "
+ "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, "
+ "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM "
+ "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, "
+ "mytable.description AS mytable_description, myothertable.otherid AS "
+ "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = "
+ "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
)
def test_prefixes(self):
self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
- "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable"
+ "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING "
+ "mytable.myid, mytable.name, mytable.description FROM mytable"
)
def test_text(self):
@@ -789,16 +835,20 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
["foobar(a)", "pk_foo_bar(syslaal)"],
"a = 12",
from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
- ),
- "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+ ),
+ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
+ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
+ )
# test unicode
self.assert_compile(select(
[u"foobar(a)", u"pk_foo_bar(syslaal)"],
u"a = 12",
from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
- ),
- u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+ ),
+ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
+ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
+ )
# test building a select query programmatically with text
s = select()
@@ -808,11 +858,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
s.append_whereclause("column2=19")
s = s.order_by("column1")
s.append_from("table1")
- self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1")
+ self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE "
+ "column1=12 AND column2=19 ORDER BY column1")
self.assert_compile(
select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
- "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias"
+ "SELECT somealias.column1, somealias.column2 FROM "
+ "(SELECT column1, column2 FROM mytable) AS somealias"
)
# test that use_labels doesnt interfere with literal columns
@@ -827,14 +879,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
"SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
)
- print "---------------------------------------------"
s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
- print "---------------------------------------------"
# test that "auto-labeling of subquery columns" doesnt interfere with literal columns,
# exported columns dont get quoted
self.assert_compile(
select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(),
- "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
+ "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
+ "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
)
self.assert_compile(
@@ -844,7 +895,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
def test_binds_in_text(self):
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
+ text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=:bar and hoho=:whee",
checkparams={'bar':4, 'whee': 7},
)
@@ -858,7 +910,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
dialect = postgresql.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=%(bar)s and hoho=%(whee)s",
checkparams={'bar':4, 'whee': 7},
dialect=dialect
@@ -875,7 +928,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
dialect = sqlite.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=? and hoho=?",
checkparams={'bar':4, 'whee':7},
dialect=dialect
@@ -889,25 +943,80 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
table1.c.myid == table2.c.otherid,
)
),
- "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
-FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, sysdate(), foo, bar, lala "
+ "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
+ "datetime(foo) = Today AND mytable.myid = myothertable.otherid")
self.assert_compile(select(
[alias(table1, 't'), "foo.f"],
"foo.f = t.id",
from_obj = ["(select f from bar where lala=heyhey) foo"]
),
- "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
+ "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
+ "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
# test Text embedded within select_from(), using binds
- generate_series = text("generate_series(:x, :y, :z) as s(a)", bindparams=[bindparam('x'), bindparam('y'), bindparam('z')])
-
- s =select([(func.current_date() + literal_column("s.a")).label("dates")]).select_from(generate_series)
- self.assert_compile(s, "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': None, 'x': None, 'z': None})
+ generate_series = text(
+ "generate_series(:x, :y, :z) as s(a)",
+ bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]
+ )
+
+ s =select([
+ (func.current_date() + literal_column("s.a")).label("dates")
+ ]).select_from(generate_series)
+ self.assert_compile(
+ s,
+ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
+ checkparams={'y': None, 'x': None, 'z': None}
+ )
+
+ self.assert_compile(
+ s.params(x=5, y=6, z=7),
+ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
+ checkparams={'y': 6, 'x': 5, 'z': 7}
+ )
+
+ @testing.emits_warning('.*empty sequence.*')
+ def test_render_binds_as_literal(self):
+ """test a compiler that renders binds inline into
+ SQL in the columns clause."""
- self.assert_compile(s.params(x=5, y=6, z=7), "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': 6, 'x': 5, 'z': 7})
+ dialect = default.DefaultDialect()
+ class Compiler(dialect.statement_compiler):
+ ansi_bind_rules = True
+ dialect.statement_compiler = Compiler
+ self.assert_compile(
+ select([literal("someliteral")]),
+ "SELECT 'someliteral'",
+ dialect=dialect
+ )
+
+ self.assert_compile(
+ select([table1.c.myid + 3]),
+ "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
+ dialect=dialect
+ )
+ self.assert_compile(
+ select([table1.c.myid.in_([4, 5, 6])]),
+ "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
+ dialect=dialect
+ )
+
+ self.assert_compile(
+ select([literal("foo").in_([])]),
+ "SELECT 'foo' != 'foo' AS anon_1",
+ dialect=dialect
+ )
+
+ assert_raises(
+ exc.CompileError,
+ bindparam("foo").in_([]).compile, dialect=dialect
+ )
+
+
def test_literal(self):
self.assert_compile(select([literal('foo')]), "SELECT :param_1")