summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-03-17 15:15:44 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2010-03-17 15:15:44 -0400
commit318f47dc80c58dee8c798afcc8c19a5dbb21eef7 (patch)
treeefd951b139017acb14302b33b2cad9ecab88fb3b
parentb81e9741ba26f2740725c9d403d116284af7d7a4 (diff)
parent55367ac4a26dbc3c0e57783c3964cd5c42647a35 (diff)
downloadsqlalchemy-318f47dc80c58dee8c798afcc8c19a5dbb21eef7.tar.gz
- added pyodbc for sybase driver.
- generalized the "freetds" / "unicode statements" behavior of MS-SQL/pyodbc into the base Pyodbc connector, as this seems to apply to Sybase as well. - generalized the python-sybase "use autocommit for DDL" into the pyodbc connector. With pyodbc, the "autocommit" flag on connection is used, as Pyodbc seems to have more database conversation than python-sybase that can't otherwise be suppressed. - Some platforms will now interpret certain literal values as non-bind parameters, rendered literally into the SQL statement. This to support strict SQL-92 rules that are enforced by some platforms including MS-SQL and Sybase. In this model, bind parameters aren't allowed in the columns clause of a SELECT, nor are certain ambiguous expressions like "?=?". When this mode is enabled, the base compiler will render the binds as inline literals, but only across strings and numeric values. Other types such as dates will raise an error, unless the dialect subclass defines a literal rendering function for those. The bind parameter must have an embedded literal value already or an error is raised (i.e. won't work with straight bindparam('x')). Dialects can also expand upon the areas where binds are not accepted, such as within argument lists of functions (which don't work on MS-SQL when native SQL binding is used).
-rw-r--r--CHANGES22
-rw-r--r--doc/build/dbengine.rst2
-rw-r--r--lib/sqlalchemy/connectors/mxodbc.py5
-rw-r--r--lib/sqlalchemy/dialects/access/base.py4
-rw-r--r--lib/sqlalchemy/dialects/maxdb/base.py4
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py44
-rw-r--r--lib/sqlalchemy/dialects/mssql/mxodbc.py11
-rw-r--r--lib/sqlalchemy/dialects/mssql/pyodbc.py13
-rw-r--r--lib/sqlalchemy/dialects/sqlite/base.py4
-rw-r--r--lib/sqlalchemy/dialects/sybase/base.py32
-rw-r--r--lib/sqlalchemy/dialects/sybase/pyodbc.py7
-rw-r--r--lib/sqlalchemy/engine/default.py1
-rw-r--r--lib/sqlalchemy/sql/compiler.py141
-rw-r--r--test/dialect/test_mssql.py55
-rw-r--r--test/sql/test_query.py50
-rw-r--r--test/sql/test_select.py211
16 files changed, 452 insertions, 154 deletions
diff --git a/CHANGES b/CHANGES
index 71db59899..a8f35cf22 100644
--- a/CHANGES
+++ b/CHANGES
@@ -146,6 +146,23 @@ CHANGES
including their ddl listener and other event callables.
[ticket:1694] [ticket:1698]
+ - Some platforms will now interpret certain literal values
+ as non-bind parameters, rendered literally into the SQL
+ statement. This to support strict SQL-92 rules that are
+ enforced by some platforms including MS-SQL and Sybase.
+ In this model, bind parameters aren't allowed in the
+ columns clause of a SELECT, nor are certain ambiguous
+ expressions like "?=?". When this mode is enabled, the base
+ compiler will render the binds as inline literals, but only across
+ strings and numeric values. Other types such as dates
+ will raise an error, unless the dialect subclass defines
+ a literal rendering function for those. The bind parameter
+ must have an embedded literal value already or an error
+ is raised (i.e. won't work with straight bindparam('x')).
+ Dialects can also expand upon the areas where binds are not
+ accepted, such as within argument lists of functions
+ (which don't work on MS-SQL when native SQL binding is used).
+
- Added "unicode_errors" parameter to String, Unicode, etc.
Behaves like the 'errors' keyword argument to
the standard library's string.decode() functions. This flag
@@ -372,8 +389,9 @@ CHANGES
will be returned as a string. [ticket:1685]
- sybase
- - Implemented a preliminary working dialect for Sybase
- based on the Python-Sybase driver. Handles table
+ - Implemented a preliminary working dialect for Sybase,
+ with sub-implementations for Python-Sybase as well
+ as Pyodbc. Handles table
creates/drops and basic round trip functionality.
Does not yet include reflection or comprehensive
support of unicode/special expressions/etc.
diff --git a/doc/build/dbengine.rst b/doc/build/dbengine.rst
index 5c61a67d8..261c54c10 100644
--- a/doc/build/dbengine.rst
+++ b/doc/build/dbengine.rst
@@ -118,7 +118,7 @@ sqlite3_ ``sqlite+pysqlite``\* yes yes
**Sybase ASE**
-------------------------------------------------------------------------------------------------------------------------------
mxodbc_ ``sybase+mxodbc`` development development no yes yes
-pyodbc_ ``sybase+pyodbc`` development development no unknown unknown
+pyodbc_ ``sybase+pyodbc`` partial development no unknown unknown
python-sybase_ ``sybase+pysybase``\* partial development no yes yes
========================= =========================== =========== =========== =========== ================= ============
diff --git a/lib/sqlalchemy/connectors/mxodbc.py b/lib/sqlalchemy/connectors/mxodbc.py
index 29b047d23..68b88019c 100644
--- a/lib/sqlalchemy/connectors/mxodbc.py
+++ b/lib/sqlalchemy/connectors/mxodbc.py
@@ -96,9 +96,4 @@ class MxODBCConnector(Connector):
version.append(n)
return tuple(version)
- def do_execute(self, cursor, statement, parameters, context=None):
- # TODO: dont need tuple() here
- # TODO: use cursor.execute()
- cursor.executedirect(statement, tuple(parameters))
-
diff --git a/lib/sqlalchemy/dialects/access/base.py b/lib/sqlalchemy/dialects/access/base.py
index c10e77011..7dfb3153e 100644
--- a/lib/sqlalchemy/dialects/access/base.py
+++ b/lib/sqlalchemy/dialects/access/base.py
@@ -371,9 +371,9 @@ class AccessCompiler(compiler.SQLCompiler):
return (self.process(join.left, asfrom=True) + (join.isouter and " LEFT OUTER JOIN " or " INNER JOIN ") + \
self.process(join.right, asfrom=True) + " ON " + self.process(join.onclause))
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
class AccessDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
diff --git a/lib/sqlalchemy/dialects/maxdb/base.py b/lib/sqlalchemy/dialects/maxdb/base.py
index 504c31209..758cfaf05 100644
--- a/lib/sqlalchemy/dialects/maxdb/base.py
+++ b/lib/sqlalchemy/dialects/maxdb/base.py
@@ -558,8 +558,8 @@ class MaxDBCompiler(compiler.SQLCompiler):
return labels
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ order_by = self.process(select._order_by_clause, **kw)
# ORDER BY clauses in DISTINCT queries must reference aliased
# inner columns by alias name, not true column name.
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index 254aa54fd..4d697854f 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -843,8 +843,9 @@ class MSExecutionContext(default.DefaultExecutionContext):
class MSSQLCompiler(compiler.SQLCompiler):
returning_precedes_values = True
- extract_map = compiler.SQLCompiler.extract_map.copy()
- extract_map.update ({
+ extract_map = util.update_copy(
+ compiler.SQLCompiler.extract_map,
+ {
'doy': 'dayofyear',
'dow': 'weekday',
'milliseconds': 'millisecond',
@@ -937,9 +938,9 @@ class MSSQLCompiler(compiler.SQLCompiler):
kwargs['mssql_aliased'] = True
return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
def visit_rollback_to_savepoint(self, savepoint_stmt):
return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(savepoint_stmt)
@@ -1011,8 +1012,8 @@ class MSSQLCompiler(compiler.SQLCompiler):
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ order_by = self.process(select._order_by_clause, **kw)
# MSSQL only allows ORDER BY in subqueries if there is a LIMIT
if order_by and (not self.is_subquery() or select._limit):
@@ -1020,6 +1021,37 @@ class MSSQLCompiler(compiler.SQLCompiler):
else:
return ""
+class MSSQLStrictCompiler(MSSQLCompiler):
+ """A subclass of MSSQLCompiler which disables the usage of bind
+ parameters where not allowed natively by MS-SQL.
+
+ A dialect may use this compiler on a platform where native
+ binds are used.
+
+ """
+ ansi_bind_rules = True
+
+ def visit_in_op(self, binary, **kw):
+ kw['literal_binds'] = True
+ return "%s IN %s" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)
+ )
+
+ def visit_notin_op(self, binary, **kw):
+ kw['literal_binds'] = True
+ return "%s NOT IN %s" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)
+ )
+
+ def visit_function(self, func, **kw):
+ kw['literal_binds'] = True
+ return super(MSSQLStrictCompiler, self).visit_function(func, **kw)
+
+ #def render_literal_value(self, value):
+ # TODO! use mxODBC's literal quoting services here
+
class MSDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
diff --git a/lib/sqlalchemy/dialects/mssql/mxodbc.py b/lib/sqlalchemy/dialects/mssql/mxodbc.py
index bf14601b8..59cf65d63 100644
--- a/lib/sqlalchemy/dialects/mssql/mxodbc.py
+++ b/lib/sqlalchemy/dialects/mssql/mxodbc.py
@@ -4,9 +4,10 @@ import sys
from sqlalchemy import types as sqltypes
from sqlalchemy.connectors.mxodbc import MxODBCConnector
from sqlalchemy.dialects.mssql.pyodbc import MSExecutionContext_pyodbc
-from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
-
+from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect, \
+ MSSQLCompiler, MSSQLStrictCompiler
+
class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
"""
The pyodbc execution context is useful for enabling
@@ -20,7 +21,11 @@ class MSExecutionContext_mxodbc(MSExecutionContext_pyodbc):
class MSDialect_mxodbc(MxODBCConnector, MSDialect):
execution_ctx_cls = MSExecutionContext_mxodbc
-
+
+ # TODO: may want to use this only if FreeTDS is not in use,
+ # since FreeTDS doesn't seem to use native binds.
+ statement_compiler = MSSQLStrictCompiler
+
def __init__(self, description_encoding='latin-1', **params):
super(MSDialect_mxodbc, self).__init__(**params)
self.description_encoding = description_encoding
diff --git a/lib/sqlalchemy/dialects/mssql/pyodbc.py b/lib/sqlalchemy/dialects/mssql/pyodbc.py
index 54b43320a..34050271f 100644
--- a/lib/sqlalchemy/dialects/mssql/pyodbc.py
+++ b/lib/sqlalchemy/dialects/mssql/pyodbc.py
@@ -1,3 +1,16 @@
+"""
+Support for MS-SQL via pyodbc.
+
+http://pypi.python.org/pypi/pyodbc/
+
+Connect strings are of the form::
+
+ mssql+pyodbc://<username>:<password>@<dsn>/
+ mssql+pyodbc://<username>:<password>@<host>/<database>
+
+
+"""
+
from sqlalchemy.dialects.mssql.base import MSExecutionContext, MSDialect
from sqlalchemy.connectors.pyodbc import PyODBCConnector
from sqlalchemy import types as sqltypes
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py
index 37e63fbc1..98df8d0cb 100644
--- a/lib/sqlalchemy/dialects/sqlite/base.py
+++ b/lib/sqlalchemy/dialects/sqlite/base.py
@@ -218,10 +218,10 @@ class SQLiteCompiler(compiler.SQLCompiler):
else:
return self.process(cast.clause)
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
try:
return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
- self.extract_map[extract.field], self.process(extract.expr))
+ self.extract_map[extract.field], self.process(extract.expr, **kw))
except KeyError:
raise exc.ArgumentError(
"%s is not a valid extract argument." % extract.field)
diff --git a/lib/sqlalchemy/dialects/sybase/base.py b/lib/sqlalchemy/dialects/sybase/base.py
index 5d20faaf9..c440015d0 100644
--- a/lib/sqlalchemy/dialects/sybase/base.py
+++ b/lib/sqlalchemy/dialects/sybase/base.py
@@ -236,9 +236,11 @@ class SybaseExecutionContext(default.DefaultExecutionContext):
return lastrowid
class SybaseSQLCompiler(compiler.SQLCompiler):
+ ansi_bind_rules = True
- extract_map = compiler.SQLCompiler.extract_map.copy()
- extract_map.update ({
+ extract_map = util.update_copy(
+ compiler.SQLCompiler.extract_map,
+ {
'doy': 'dayofyear',
'dow': 'weekday',
'milliseconds': 'millisecond'
@@ -267,33 +269,17 @@ class SybaseSQLCompiler(compiler.SQLCompiler):
# Limit in sybase is after the select keyword
return ""
- def dont_visit_binary(self, binary):
- """Move bind parameters to the right-hand side of an operator, where possible."""
- if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq:
- return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator))
- else:
- return super(SybaseSQLCompiler, self).visit_binary(binary)
-
- def dont_label_select_column(self, select, column, asfrom):
- if isinstance(column, expression.Function):
- return column.label(None)
- else:
- return super(SybaseSQLCompiler, self).label_select_column(select, column, asfrom)
-
-# def visit_getdate_func(self, fn, **kw):
- # TODO: need to cast? something ?
-# pass
-
- def visit_extract(self, extract):
+ def visit_extract(self, extract, **kw):
field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % (field, self.process(extract.expr))
+ return 'DATEPART("%s", %s)' % (field, self.process(extract.expr, **kw))
def for_update_clause(self, select):
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ kw['literal_binds'] = True
+ order_by = self.process(select._order_by_clause, **kw)
# SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
if order_by and (not self.is_subquery() or select._limit):
diff --git a/lib/sqlalchemy/dialects/sybase/pyodbc.py b/lib/sqlalchemy/dialects/sybase/pyodbc.py
index 4f89fe334..1bfdb6151 100644
--- a/lib/sqlalchemy/dialects/sybase/pyodbc.py
+++ b/lib/sqlalchemy/dialects/sybase/pyodbc.py
@@ -1,7 +1,12 @@
"""
Support for Sybase via pyodbc.
-This dialect is a stub only and is likely non functional at this time.
+http://pypi.python.org/pypi/pyodbc/
+
+Connect strings are of the form::
+
+ sybase+pyodbc://<username>:<password>@<dsn>/
+ sybase+pyodbc://<username>:<password>@<host>/<database>
"""
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index ce24a9ae4..2ef8fd104 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -62,6 +62,7 @@ class DefaultDialect(base.Dialect):
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
dbapi_type_map = {}
+ colspecs = {}
default_paramstyle = 'named'
supports_default_values = False
supports_empty_insert = True
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index be3375def..4e9175ae8 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -26,6 +26,7 @@ import re
from sqlalchemy import schema, engine, util, exc
from sqlalchemy.sql import operators, functions, util as sql_util, visitors
from sqlalchemy.sql import expression as sql
+import decimal
RESERVED_WORDS = set([
'all', 'analyse', 'analyze', 'and', 'any', 'array',
@@ -183,6 +184,12 @@ class SQLCompiler(engine.Compiled):
# clauses before the VALUES or WHERE clause (i.e. MSSQL)
returning_precedes_values = False
+ # SQL 92 doesn't allow bind parameters to be used
+ # in the columns clause of a SELECT, nor does it allow
+ # ambiguous expressions like "? = ?". A compiler
+ # subclass can set this flag to False if the target
+ # driver/DB enforces this
+ ansi_bind_rules = False
def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs):
"""Construct a new ``DefaultCompiler`` object.
@@ -260,9 +267,14 @@ class SQLCompiler(engine.Compiled):
else:
if bindparam.required:
if _group_number:
- raise exc.InvalidRequestError("A value is required for bind parameter %r, in parameter group %d" % (bindparam.key, _group_number))
+ raise exc.InvalidRequestError(
+ "A value is required for bind parameter %r, "
+ "in parameter group %d" %
+ (bindparam.key, _group_number))
else:
- raise exc.InvalidRequestError("A value is required for bind parameter %r" % bindparam.key)
+ raise exc.InvalidRequestError(
+ "A value is required for bind parameter %r"
+ % bindparam.key)
elif util.callable(bindparam.value):
pd[name] = bindparam.value()
else:
@@ -290,10 +302,10 @@ class SQLCompiler(engine.Compiled):
"""
return ""
- def visit_grouping(self, grouping, **kwargs):
- return "(" + self.process(grouping.element) + ")"
+ def visit_grouping(self, grouping, asfrom=False, **kwargs):
+ return "(" + self.process(grouping.element, **kwargs) + ")"
- def visit_label(self, label, result_map=None, within_columns_clause=False):
+ def visit_label(self, label, result_map=None, within_columns_clause=False, **kw):
# only render labels within the columns clause
# or ORDER BY clause of a select. dialect-specific compilers
# can modify this behavior.
@@ -305,11 +317,15 @@ class SQLCompiler(engine.Compiled):
result_map[labelname.lower()] = \
(label.name, (label, label.element, labelname), label.element.type)
- return self.process(label.element) + \
+ return self.process(label.element,
+ within_columns_clause=within_columns_clause,
+ **kw) + \
OPERATORS[operators.as_] + \
self.preparer.format_label(label, labelname)
else:
- return self.process(label.element)
+ return self.process(label.element,
+ within_columns_clause=within_columns_clause,
+ **kw)
def visit_column(self, column, result_map=None, **kwargs):
name = column.name
@@ -384,27 +400,28 @@ class SQLCompiler(engine.Compiled):
sep = " "
else:
sep = OPERATORS[clauselist.operator]
- return sep.join(s for s in (self.process(c) for c in clauselist.clauses)
+ return sep.join(s for s in (self.process(c, **kwargs) for c in clauselist.clauses)
if s is not None)
def visit_case(self, clause, **kwargs):
x = "CASE "
if clause.value is not None:
- x += self.process(clause.value) + " "
+ x += self.process(clause.value, **kwargs) + " "
for cond, result in clause.whens:
- x += "WHEN " + self.process(cond) + " THEN " + self.process(result) + " "
+ x += "WHEN " + self.process(cond, **kwargs) + \
+ " THEN " + self.process(result, **kwargs) + " "
if clause.else_ is not None:
- x += "ELSE " + self.process(clause.else_) + " "
+ x += "ELSE " + self.process(clause.else_, **kwargs) + " "
x += "END"
return x
def visit_cast(self, cast, **kwargs):
return "CAST(%s AS %s)" % \
- (self.process(cast.clause), self.process(cast.typeclause))
+ (self.process(cast.clause, **kwargs), self.process(cast.typeclause, **kwargs))
def visit_extract(self, extract, **kwargs):
field = self.extract_map.get(extract.field, extract.field)
- return "EXTRACT(%s FROM %s)" % (field, self.process(extract.expr))
+ return "EXTRACT(%s FROM %s)" % (field, self.process(extract.expr, **kwargs))
def visit_function(self, func, result_map=None, **kwargs):
if result_map is not None:
@@ -421,22 +438,23 @@ class SQLCompiler(engine.Compiled):
def function_argspec(self, func, **kwargs):
return self.process(func.clause_expr, **kwargs)
- def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs):
+ def visit_compound_select(self, cs, asfrom=False, parens=True, compound_index=1, **kwargs):
entry = self.stack and self.stack[-1] or {}
self.stack.append({'from':entry.get('from', None), 'iswrapper':True})
keyword = self.compound_keywords.get(cs.keyword)
text = (" " + keyword + " ").join(
- (self.process(c, asfrom=asfrom, parens=False, compound_index=i)
+ (self.process(c, asfrom=asfrom, parens=False,
+ compound_index=i, **kwargs)
for i, c in enumerate(cs.selects))
)
- group_by = self.process(cs._group_by_clause, asfrom=asfrom)
+ group_by = self.process(cs._group_by_clause, asfrom=asfrom, **kwargs)
if group_by:
text += " GROUP BY " + group_by
- text += self.order_by_clause(cs)
+ text += self.order_by_clause(cs, **kwargs)
text += (cs._limit is not None or cs._offset is not None) and self.limit_clause(cs) or ""
self.stack.pop(-1)
@@ -453,32 +471,47 @@ class SQLCompiler(engine.Compiled):
s = s + OPERATORS[unary.modifier]
return s
- def visit_binary(self, binary, **kwargs):
-
+ def visit_binary(self, binary, **kw):
+ # don't allow "? = ?" to render
+ if self.ansi_bind_rules and \
+ isinstance(binary.left, sql._BindParamClause) and \
+ isinstance(binary.right, sql._BindParamClause):
+ kw['literal_binds'] = True
+
return self._operator_dispatch(binary.operator,
binary,
- lambda opstr: self.process(binary.left) + opstr + self.process(binary.right),
- **kwargs
+ lambda opstr: self.process(binary.left, **kw) +
+ opstr +
+ self.process(binary.right, **kw),
+ **kw
)
def visit_like_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return '%s LIKE %s' % (self.process(binary.left), self.process(binary.right)) \
+ return '%s LIKE %s' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def visit_notlike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return '%s NOT LIKE %s' % (self.process(binary.left), self.process(binary.right)) \
+ return '%s NOT LIKE %s' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def visit_ilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return 'lower(%s) LIKE lower(%s)' % (self.process(binary.left), self.process(binary.right)) \
+ return 'lower(%s) LIKE lower(%s)' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def visit_notilike_op(self, binary, **kw):
escape = binary.modifiers.get("escape", None)
- return 'lower(%s) NOT LIKE lower(%s)' % (self.process(binary.left), self.process(binary.right)) \
+ return 'lower(%s) NOT LIKE lower(%s)' % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)) \
+ (escape and ' ESCAPE \'%s\'' % escape or '')
def _operator_dispatch(self, operator, element, fn, **kw):
@@ -491,7 +524,16 @@ class SQLCompiler(engine.Compiled):
else:
return fn(" " + operator + " ")
- def visit_bindparam(self, bindparam, **kwargs):
+ def visit_bindparam(self, bindparam, within_columns_clause=False,
+ literal_binds=False, **kwargs):
+ if literal_binds or \
+ (within_columns_clause and \
+ self.ansi_bind_rules):
+ if bindparam.value is None:
+ raise exc.CompileError("Bind parameter without a "
+ "renderable value not allowed here.")
+ return self.render_literal_bindparam(bindparam, within_columns_clause=True, **kwargs)
+
name = self._truncate_bindparam(bindparam)
if name in self.binds:
existing = self.binds[name]
@@ -510,7 +552,36 @@ class SQLCompiler(engine.Compiled):
self.binds[bindparam.key] = self.binds[name] = bindparam
return self.bindparam_string(name)
-
+
+ def render_literal_bindparam(self, bindparam, **kw):
+ value = bindparam.value
+ processor = bindparam.bind_processor(self.dialect)
+ if processor:
+ value = processor(value)
+ return self.render_literal_value(value, bindparam.type)
+
+ def render_literal_value(self, value, type_):
+ """Render the value of a bind parameter as a quoted literal.
+
+ This is used for statement sections that do not accept bind paramters
+ on the target driver/database.
+
+ This should be implemented by subclasses using the quoting services
+ of the DBAPI.
+
+ """
+ if isinstance(value, basestring):
+ value = value.replace("'", "''")
+ return "'%s'" % value
+ elif value is None:
+ return "NULL"
+ elif isinstance(value, (float, int, long)):
+ return repr(value)
+ elif isinstance(value, decimal.Decimal):
+ return str(value)
+ else:
+ raise NotImplementedError("Don't know how to literal-quote value %r" % value)
+
def _truncate_bindparam(self, bindparam):
if bindparam in self.bind_names:
return self.bind_names[bindparam]
@@ -624,33 +695,33 @@ class SQLCompiler(engine.Compiled):
text = "SELECT " # we're off to a good start !
if select._prefixes:
- text += " ".join(self.process(x) for x in select._prefixes) + " "
+ text += " ".join(self.process(x, **kwargs) for x in select._prefixes) + " "
text += self.get_select_precolumns(select)
text += ', '.join(inner_columns)
if froms:
text += " \nFROM "
- text += ', '.join(self.process(f, asfrom=True) for f in froms)
+ text += ', '.join(self.process(f, asfrom=True, **kwargs) for f in froms)
else:
text += self.default_from()
if select._whereclause is not None:
- t = self.process(select._whereclause)
+ t = self.process(select._whereclause, **kwargs)
if t:
text += " \nWHERE " + t
if select._group_by_clause.clauses:
- group_by = self.process(select._group_by_clause)
+ group_by = self.process(select._group_by_clause, **kwargs)
if group_by:
text += " GROUP BY " + group_by
if select._having is not None:
- t = self.process(select._having)
+ t = self.process(select._having, **kwargs)
if t:
text += " \nHAVING " + t
if select._order_by_clause.clauses:
- text += self.order_by_clause(select)
+ text += self.order_by_clause(select, **kwargs)
if select._limit is not None or select._offset is not None:
text += self.limit_clause(select)
if select.for_update:
@@ -670,8 +741,8 @@ class SQLCompiler(engine.Compiled):
"""
return select._distinct and "DISTINCT " or ""
- def order_by_clause(self, select):
- order_by = self.process(select._order_by_clause)
+ def order_by_clause(self, select, **kw):
+ order_by = self.process(select._order_by_clause, **kw)
if order_by:
return " ORDER BY " + order_by
else:
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py
index 89a3af5fb..8092d8cdc 100644
--- a/test/dialect/test_mssql.py
+++ b/test/dialect/test_mssql.py
@@ -6,7 +6,7 @@ from sqlalchemy import types, exc, schema
from sqlalchemy.orm import *
from sqlalchemy.sql import table, column
from sqlalchemy.databases import mssql
-from sqlalchemy.dialects.mssql import pyodbc
+from sqlalchemy.dialects.mssql import pyodbc, mxodbc
from sqlalchemy.engine import url
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_, emits_warning_on
@@ -22,7 +22,35 @@ class CompileTest(TestBase, AssertsCompiledSQL):
def test_update(self):
t = table('sometable', column('somecolumn'))
self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :somecolumn_1", dict(somecolumn=10))
-
+
+ # TODO: should this be for *all* MS-SQL dialects ?
+ def test_mxodbc_binds(self):
+ """mxodbc uses MS-SQL native binds, which aren't allowed in various places."""
+
+ mxodbc_dialect = mxodbc.dialect()
+ t = table('sometable', column('foo'))
+
+ for expr, compile in [
+ (
+ select([literal("x"), literal("y")]),
+ "SELECT 'x', 'y'",
+ ),
+ (
+ select([t]).where(t.c.foo.in_(['x', 'y', 'z'])),
+ "SELECT sometable.foo FROM sometable WHERE sometable.foo IN ('x', 'y', 'z')",
+ ),
+ (
+ func.foobar("x", "y", 4, 5),
+ "foobar('x', 'y', 4, 5)",
+ ),
+ (
+ select([t]).where(func.len('xyz') > func.len(t.c.foo)),
+ "SELECT sometable.foo FROM sometable WHERE len('xyz') > len(sometable.foo)",
+ )
+ ]:
+ self.assert_compile(expr, compile, dialect=mxodbc_dialect)
+
+
def test_in_with_subqueries(self):
"""Test that when using subqueries in a binary expression
the == and != are changed to IN and NOT IN respectively.
@@ -127,15 +155,24 @@ class CompileTest(TestBase, AssertsCompiledSQL):
column('col4'))
(s1, s2) = (
- select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
- select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
+ select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
+ t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
+ select([t2.c.col3.label('col3'), t2.c.col4.label('col4')],
+ t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
)
u = union(s1, s2, order_by=['col3', 'col4'])
- self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:col2_1, :col2_2) "\
- "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4) ORDER BY col3, col4")
-
- self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE "\
- "t1.col2 IN (:col2_1, :col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4)) AS bar")
+ self.assert_compile(u,
+ "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN "
+ "(:col2_1, :col2_2) "\
+ "UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 "
+ "IN (:col2_3, :col2_4) ORDER BY col3, col4")
+
+ self.assert_compile(u.alias('bar').select(),
+ "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, "
+ "t1.col4 AS col4 FROM t1 WHERE "\
+ "t1.col2 IN (:col2_1, :col2_2) UNION SELECT t2.col3 AS col3, "
+ "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:col2_3, :col2_4)) "
+ "AS bar")
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index a189594b7..8664ba6dc 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -234,8 +234,9 @@ class QueryTest(TestBase):
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
- simple labels in ORDER BYs now render as the actual labelname
- which not every database supports.
+ This test should be modified to support [ticket:1068] when that ticket
+ is implemented. For now, you need to put the actual string in the
+ ORDER BY.
"""
users.insert().execute(
@@ -246,26 +247,30 @@ class QueryTest(TestBase):
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
- select([concat]).order_by(concat).execute().fetchall(),
+ select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
eq_(
- select([concat]).order_by(concat).execute().fetchall(),
+ select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
- select([concat]).order_by(desc(concat)).execute().fetchall(),
+ select([concat]).order_by(desc('thedata')).execute().fetchall(),
[("test: jack",), ("test: fred",), ("test: ed",)]
)
- concat = ("test: " + users.c.user_name).label('thedata')
- eq_(
- select([concat]).order_by(concat + "x").execute().fetchall(),
- [("test: ed",), ("test: fred",), ("test: jack",)]
- )
+ @testing.fails_on('postgresql', 'only simple labels allowed')
+ @testing.fails_on('sybase', 'only simple labels allowed')
+ def go():
+ concat = ("test: " + users.c.user_name).label('thedata')
+ eq_(
+ select([concat]).order_by(literal_column('thedata') + "x").execute().fetchall(),
+ [("test: ed",), ("test: fred",), ("test: jack",)]
+ )
+ go()
def test_row_comparison(self):
@@ -768,11 +773,18 @@ class QueryTest(TestBase):
assert len(r) == 0
@testing.emits_warning('.*empty sequence.*')
- @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
+ @testing.fails_on('firebird', "uses sql-92 rules")
+ @testing.fails_on('sybase', "uses sql-92 rules")
@testing.fails_if(lambda:
testing.against('mssql+pyodbc') and not testing.db.dialect.freetds,
- "not supported by Windows ODBC driver")
+ "uses sql-92 rules")
def test_bind_in(self):
+ """test calling IN against a bind parameter.
+
+ this isn't allowed on several platforms since we
+ generate ? = ?.
+
+ """
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')
users.insert().execute(user_id = 9, user_name = None)
@@ -784,7 +796,21 @@ class QueryTest(TestBase):
assert len(r) == 3
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
+
+ @testing.emits_warning('.*empty sequence.*')
+ def test_literal_in(self):
+ """similar to test_bind_in but use a bind with a value."""
+
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 9, user_name = None)
+ s = users.select(not_(literal("john").in_([])))
+ r = s.execute().fetchall()
+ assert len(r) == 3
+
+
+
@testing.emits_warning('.*empty sequence.*')
@testing.fails_on('firebird', 'FIXME: unknown')
@testing.fails_on('maxdb', 'FIXME: unknown')
diff --git a/test/sql/test_select.py b/test/sql/test_select.py
index 33bbe5ff4..d27819c18 100644
--- a/test/sql/test_select.py
+++ b/test/sql/test_select.py
@@ -684,58 +684,94 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def test_orderby_groupby(self):
self.assert_compile(
table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
+ "SELECT myothertable.otherid, myothertable.othername FROM "
+ "myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
)
self.assert_compile(
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
+ "SELECT myothertable.otherid, myothertable.othername FROM "
+ "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
# generative order_by
self.assert_compile(
table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
- "SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
+ "SELECT myothertable.otherid, myothertable.othername FROM "
+ "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
+ table2.select().order_by(table2.c.otherid).
+ order_by(table2.c.othername.desc()).order_by(None),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername]),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername"
+ select(
+ [table2.c.othername, func.count(table2.c.otherid)],
+ group_by = [table2.c.othername]),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable GROUP BY myothertable.othername"
)
# generative group by
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername"
+ select([table2.c.othername, func.count(table2.c.otherid)]).
+ group_by(table2.c.othername),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable GROUP BY myothertable.othername"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable"
+ select([table2.c.othername, func.count(table2.c.otherid)]).
+ group_by(table2.c.othername).group_by(None),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable"
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)], group_by = [table2.c.othername], order_by = [table2.c.othername]),
- "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
+ select([table2.c.othername, func.count(table2.c.otherid)],
+ group_by = [table2.c.othername],
+ order_by = [table2.c.othername]),
+ "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
+ "FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
)
def test_for_update(self):
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update="nowait"),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update="nowait"),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
+ dialect=oracle.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update="read"),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
+ dialect=mysql.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
+ dialect=mysql.dialect())
- self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", dialect=oracle.dialect())
+ self.assert_compile(
+ table1.select(table1.c.myid==7, for_update=True),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
+ dialect=oracle.dialect())
def test_alias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
@@ -750,32 +786,42 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
,dialect=dialect)
self.assert_compile(
- select([table1.alias()])
- ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1")
+ select([table1.alias()]),
+ "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM mytable AS mytable_1")
- # create a select for a join of two tables. use_labels means the column names will have
- # labels tablename_columnname, which become the column keys accessible off the Selectable object.
- # also, only use one column from the second table and all columns from the first table1.
- q = select([table1, table2.c.otherid], table1.c.myid == table2.c.otherid, use_labels = True)
+ # create a select for a join of two tables. use_labels
+ # means the column names will have labels tablename_columnname,
+ # which become the column keys accessible off the Selectable object.
+ # also, only use one column from the second table and all columns
+ # from the first table1.
+ q = select(
+ [table1, table2.c.otherid],
+ table1.c.myid == table2.c.otherid, use_labels = True
+ )
- # make an alias of the "selectable". column names stay the same (i.e. the labels), table name "changes" to "t2view".
+ # make an alias of the "selectable". column names
+ # stay the same (i.e. the labels), table name "changes" to "t2view".
a = alias(q, 't2view')
# select from that alias, also using labels. two levels of labels should produce two underscores.
# also, reference the column "mytable_myid" off of the t2view alias.
self.assert_compile(
a.select(a.c.mytable_myid == 9, use_labels = True),
- "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name AS t2view_mytable_name, \
-t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \
-(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \
-myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \
-WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
+ "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name "
+ "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, "
+ "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM "
+ "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, "
+ "mytable.description AS mytable_description, myothertable.otherid AS "
+ "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = "
+ "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
)
def test_prefixes(self):
self.assert_compile(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
- "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable"
+ "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING "
+ "mytable.myid, mytable.name, mytable.description FROM mytable"
)
def test_text(self):
@@ -789,16 +835,20 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
["foobar(a)", "pk_foo_bar(syslaal)"],
"a = 12",
from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
- ),
- "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+ ),
+ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
+ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
+ )
# test unicode
self.assert_compile(select(
[u"foobar(a)", u"pk_foo_bar(syslaal)"],
u"a = 12",
from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
- ),
- u"SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar left outer join lala on foobar.foo = lala.foo WHERE a = 12")
+ ),
+ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
+ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
+ )
# test building a select query programmatically with text
s = select()
@@ -808,11 +858,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
s.append_whereclause("column2=19")
s = s.order_by("column1")
s.append_from("table1")
- self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE column1=12 AND column2=19 ORDER BY column1")
+ self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE "
+ "column1=12 AND column2=19 ORDER BY column1")
self.assert_compile(
select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
- "SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias"
+ "SELECT somealias.column1, somealias.column2 FROM "
+ "(SELECT column1, column2 FROM mytable) AS somealias"
)
# test that use_labels doesnt interfere with literal columns
@@ -827,14 +879,13 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
"SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
)
- print "---------------------------------------------"
s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
- print "---------------------------------------------"
# test that "auto-labeling of subquery columns" doesnt interfere with literal columns,
# exported columns dont get quoted
self.assert_compile(
select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(),
- "SELECT column1 AS foobar, column2 AS hoho, myid FROM (SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
+ "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
+ "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
)
self.assert_compile(
@@ -844,7 +895,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
def test_binds_in_text(self):
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
+ text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
"select * from foo where lala=:bar and hoho=:whee",
checkparams={'bar':4, 'whee': 7},
)
@@ -858,7 +910,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
dialect = postgresql.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=%(bar)s and hoho=%(whee)s",
checkparams={'bar':4, 'whee': 7},
dialect=dialect
@@ -875,7 +928,8 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
dialect = sqlite.dialect()
self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
+ text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=? and hoho=?",
checkparams={'bar':4, 'whee':7},
dialect=dialect
@@ -889,25 +943,80 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
table1.c.myid == table2.c.otherid,
)
),
- "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, sysdate(), foo, bar, lala \
-FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today AND mytable.myid = myothertable.otherid")
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, sysdate(), foo, bar, lala "
+ "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
+ "datetime(foo) = Today AND mytable.myid = myothertable.otherid")
self.assert_compile(select(
[alias(table1, 't'), "foo.f"],
"foo.f = t.id",
from_obj = ["(select f from bar where lala=heyhey) foo"]
),
- "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, (select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
+ "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
+ "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
# test Text embedded within select_from(), using binds
- generate_series = text("generate_series(:x, :y, :z) as s(a)", bindparams=[bindparam('x'), bindparam('y'), bindparam('z')])
-
- s =select([(func.current_date() + literal_column("s.a")).label("dates")]).select_from(generate_series)
- self.assert_compile(s, "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': None, 'x': None, 'z': None})
+ generate_series = text(
+ "generate_series(:x, :y, :z) as s(a)",
+ bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]
+ )
+
+ s =select([
+ (func.current_date() + literal_column("s.a")).label("dates")
+ ]).select_from(generate_series)
+ self.assert_compile(
+ s,
+ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
+ checkparams={'y': None, 'x': None, 'z': None}
+ )
+
+ self.assert_compile(
+ s.params(x=5, y=6, z=7),
+ "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
+ checkparams={'y': 6, 'x': 5, 'z': 7}
+ )
+
+ @testing.emits_warning('.*empty sequence.*')
+ def test_render_binds_as_literal(self):
+ """test a compiler that renders binds inline into
+ SQL in the columns clause."""
- self.assert_compile(s.params(x=5, y=6, z=7), "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': 6, 'x': 5, 'z': 7})
+ dialect = default.DefaultDialect()
+ class Compiler(dialect.statement_compiler):
+ ansi_bind_rules = True
+ dialect.statement_compiler = Compiler
+ self.assert_compile(
+ select([literal("someliteral")]),
+ "SELECT 'someliteral'",
+ dialect=dialect
+ )
+
+ self.assert_compile(
+ select([table1.c.myid + 3]),
+ "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
+ dialect=dialect
+ )
+ self.assert_compile(
+ select([table1.c.myid.in_([4, 5, 6])]),
+ "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
+ dialect=dialect
+ )
+
+ self.assert_compile(
+ select([literal("foo").in_([])]),
+ "SELECT 'foo' != 'foo' AS anon_1",
+ dialect=dialect
+ )
+
+ assert_raises(
+ exc.CompileError,
+ bindparam("foo").in_([]).compile, dialect=dialect
+ )
+
+
def test_literal(self):
self.assert_compile(select([literal('foo')]), "SELECT :param_1")