summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-12-05 03:07:21 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-12-05 03:07:21 +0000
commit238c2c8dbe3ca5b92d298b39e96f81eb416d1413 (patch)
treeb123393efbbb06a1e0ebc84385f5964efa98f0b1
parentc6bda7dcc89ae5f7842f0e900d3917024a74eb29 (diff)
downloadsqlalchemy-238c2c8dbe3ca5b92d298b39e96f81eb416d1413.tar.gz
- basic framework for generic functions, [ticket:615]
- changed the various "literal" generation functions to use an anonymous bind parameter. not much changes here except their labels now look like ":param_1", ":param_2" instead of ":literal" - from_obj keyword argument to select() can be a scalar or a list.
-rw-r--r--CHANGES15
-rw-r--r--lib/sqlalchemy/databases/firebird.py6
-rw-r--r--lib/sqlalchemy/engine/base.py2
-rw-r--r--lib/sqlalchemy/sql/compiler.py52
-rw-r--r--lib/sqlalchemy/sql/expression.py37
-rw-r--r--lib/sqlalchemy/sql/functions.py78
-rw-r--r--test/dialect/firebird.py2
-rwxr-xr-xtest/dialect/mssql.py2
-rw-r--r--test/orm/query.py22
-rw-r--r--test/sql/alltests.py1
-rw-r--r--test/sql/functions.py153
-rw-r--r--test/sql/query.py89
-rw-r--r--test/sql/select.py108
13 files changed, 375 insertions, 192 deletions
diff --git a/CHANGES b/CHANGES
index dea30fd21..935bfb6ce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,15 @@ CHANGES
0.4.2
-----
- sql
+ - generic functions ! we introduce a database of known SQL functions, such
+ as current_timestamp, coalesce, and create explicit function objects
+ representing them. These objects have constrained argument lists, are
+ type aware, and can compile in a dialect-specific fashion. So saying
+ func.char_length("foo", "bar") raises an error (too many args),
+ func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15))
+ knows that its return type is a Date. We only have a few functions
+ represented so far but will continue to add to the system [ticket:615]
+
- added new flag to String and create_engine(),
assert_unicode=(True|False|None). Defaults to `False` or `None` on
create_engine() and String, `True` on the Unicode type. When `True`,
@@ -12,9 +21,15 @@ CHANGES
advised that all unicode-aware applications make proper use of Python
unicode objects (i.e. u'hello' and not 'hello').
+ - changed the various "literal" generation functions to use an anonymous
+ bind parameter. not much changes here except their labels now look
+ like ":param_1", ":param_2" instead of ":literal"
+
- column labels in the form "tablename.columname", i.e. with a dot, are now
supported.
+ - from_obj keyword argument to select() can be a scalar or a list.
+
- orm
- new synonym() behavior: an attribute will be placed on the mapped
diff --git a/lib/sqlalchemy/databases/firebird.py b/lib/sqlalchemy/databases/firebird.py
index 7580a750a..3a13c4b69 100644
--- a/lib/sqlalchemy/databases/firebird.py
+++ b/lib/sqlalchemy/databases/firebird.py
@@ -354,11 +354,11 @@ class FBCompiler(compiler.DefaultCompiler):
else:
return self.process(alias.original, **kwargs)
- def apply_function_parens(self, func):
+ def function_argspec(self, func):
if func.clauses:
- return super(FBCompiler, self).apply_function_parens(func)
+ return self.process(func.clause_expr)
else:
- return False
+ return ""
def default_from(self):
return " FROM rdb$database"
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 4e6247810..45e4c036f 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -1353,7 +1353,7 @@ class ResultProxy(object):
if self.context.result_map:
try:
- (name, obj, type_) = self.context.result_map[colname]
+ (name, obj, type_) = self.context.result_map[colname.lower()]
except KeyError:
(name, obj, type_) = (colname, None, typemap.get(item[1], types.NULLTYPE))
else:
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index d94d3fef6..ea7c1b734 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -8,15 +8,10 @@
import string, re
from sqlalchemy import schema, engine, util, exceptions
-from sqlalchemy.sql import operators, visitors
+from sqlalchemy.sql import operators, visitors, functions
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import expression as sql
-ANSI_FUNCS = util.Set([
- 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP',
- 'CURRENT_USER', 'LOCALTIME', 'LOCALTIMESTAMP',
- 'SESSION_USER', 'USER'])
-
RESERVED_WORDS = util.Set([
'all', 'analyse', 'analyze', 'and', 'any', 'array',
'as', 'asc', 'asymmetric', 'authorization', 'between',
@@ -87,6 +82,18 @@ OPERATORS = {
operators.isnot : 'IS NOT'
}
+FUNCTIONS = {
+ functions.coalesce : 'coalesce%(expr)s',
+ functions.current_date: 'CURRENT_DATE',
+ functions.current_time: 'CURRENT_TIME',
+ functions.current_timestamp: 'CURRENT_TIMESTAMP',
+ functions.current_user: 'CURRENT_USER',
+ functions.localtime: 'LOCALTIME',
+ functions.localtimestamp: 'LOCALTIMESTAMP',
+ functions.session_user :'SESSION_USER',
+ functions.user: 'USER'
+}
+
class DefaultCompiler(engine.Compiled):
"""Default implementation of Compiled.
@@ -97,6 +104,7 @@ class DefaultCompiler(engine.Compiled):
__traverse_options__ = {'column_collections':False, 'entry':True}
operators = OPERATORS
+ functions = FUNCTIONS
def __init__(self, dialect, statement, column_keys=None, inline=False, **kwargs):
"""Construct a new ``DefaultCompiler`` object.
@@ -215,7 +223,7 @@ class DefaultCompiler(engine.Compiled):
labelname = self._truncated_identifier("colident", label.name)
if result_map is not None:
- result_map[labelname] = (label.name, (label, label.obj), label.obj.type)
+ result_map[labelname.lower()] = (label.name, (label, label.obj), label.obj.type)
return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)])
@@ -231,7 +239,7 @@ class DefaultCompiler(engine.Compiled):
name = column.name
if result_map is not None:
- result_map[name] = (name, (column, ), column.type)
+ result_map[name.lower()] = (name, (column, ), column.type)
if column._is_oid:
n = self.dialect.oid_column_name(column)
@@ -270,7 +278,7 @@ class DefaultCompiler(engine.Compiled):
def visit_textclause(self, textclause, **kwargs):
if textclause.typemap is not None:
for colname, type_ in textclause.typemap.iteritems():
- self.result_map[colname] = (colname, None, type_)
+ self.result_map[colname.lower()] = (colname, None, type_)
def do_bindparam(m):
name = m.group(1)
@@ -297,9 +305,6 @@ class DefaultCompiler(engine.Compiled):
sep = " " + self.operator_string(clauselist.operator) + " "
return sep.join([s for s in [self.process(c) for c in clauselist.clauses] if s is not None])
- def apply_function_parens(self, func):
- return func.name.upper() not in ANSI_FUNCS or len(func.clauses) > 0
-
def visit_calculatedclause(self, clause, **kwargs):
return self.process(clause.clause_expr)
@@ -308,12 +313,20 @@ class DefaultCompiler(engine.Compiled):
def visit_function(self, func, result_map=None, **kwargs):
if result_map is not None:
- result_map[func.name] = (func.name, None, func.type)
+ result_map[func.name.lower()] = (func.name, None, func.type)
+
+ name = self.function_string(func)
- if not self.apply_function_parens(func):
- return ".".join(func.packagenames + [func.name])
+ if callable(name):
+ return name(*[self.process(x) for x in func.clause_expr])
else:
- return ".".join(func.packagenames + [func.name]) + (not func.group and " " or "") + self.process(func.clause_expr)
+ return ".".join(func.packagenames + [name]) % {'expr':self.function_argspec(func)}
+
+ def function_argspec(self, func):
+ return self.process(func.clause_expr)
+
+ def function_string(self, func):
+ return self.functions.get(func.__class__, func.name + "%(expr)s")
def visit_compound_select(self, cs, asfrom=False, parens=True, **kwargs):
stack_entry = {'select':cs}
@@ -358,10 +371,11 @@ class DefaultCompiler(engine.Compiled):
def operator_string(self, operator):
return self.operators.get(operator, str(operator))
-
+
def visit_bindparam(self, bindparam, **kwargs):
- # apply truncation to the ultimate generated name
-
+ # TODO: remove this whole "unique" thing, just use regular
+ # anonymous params to implement. params used for inserts/updates
+ # etc. should no longer be "unique".
if bindparam.unique:
count = 1
key = bindparam.key
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index c21b102c7..6ad29218f 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -26,10 +26,11 @@ to stay the same in future releases.
"""
import re
+import datetime
from sqlalchemy import util, exceptions
from sqlalchemy.sql import operators, visitors
from sqlalchemy import types as sqltypes
-
+functions = None
__all__ = [
'Alias', 'ClauseElement',
@@ -596,7 +597,7 @@ def literal(value, type_=None):
bind-parameter translation for this literal.
"""
- return _BindParamClause('literal', value, type_=type_, unique=True)
+ return _BindParamClause(None, value, type_=type_, unique=True)
def label(name, obj):
"""Return a [sqlalchemy.sql.expression#_Label] object for the given [sqlalchemy.sql.expression#ColumnElement].
@@ -766,6 +767,14 @@ class _FunctionGenerator(object):
def __call__(self, *c, **kwargs):
o = self.opts.copy()
o.update(kwargs)
+ if len(self.__names) == 1:
+ global functions
+ if functions is None:
+ from sqlalchemy.sql import functions
+ func = getattr(functions, self.__names[-1].lower(), None)
+ if func is not None:
+ return func(*c, **o)
+
return _Function(self.__names[-1], packagenames=self.__names[0:-1], *c, **o)
func = _FunctionGenerator()
@@ -798,7 +807,7 @@ def _literal_as_column(element):
else:
return element
-def _literal_as_binds(element, name='literal', type_=None):
+def _literal_as_binds(element, name=None, type_=None):
if isinstance(element, Operators):
return element.expression_element()
elif _is_literal(element):
@@ -1344,7 +1353,7 @@ class _CompareMixin(ColumnOperators):
return lambda other: self.__operate(operator, other)
def _bind_param(self, obj):
- return _BindParamClause('literal', obj, type_=self.type, unique=True)
+ return _BindParamClause(None, obj, type_=self.type, unique=True)
def _check_literal(self, other):
if isinstance(other, _BindParamClause) and isinstance(other.type, sqltypes.NullType):
@@ -1762,6 +1771,10 @@ class _BindParamClause(ClauseElement, _CompareMixin):
unicode : sqltypes.NCHAR,
int : sqltypes.Integer,
float : sqltypes.Numeric,
+ datetime.date : sqltypes.Date,
+ datetime.datetime : sqltypes.DateTime,
+ datetime.time : sqltypes.Time,
+ datetime.timedelta : sqltypes.Interval,
type(None):sqltypes.NullType
}
@@ -1997,11 +2010,12 @@ class _Function(_CalculatedClause, FromClause):
def __init__(self, name, *clauses, **kwargs):
self.packagenames = kwargs.get('packagenames', None) or []
self.oid_column = None
- kwargs['operator'] = operators.comma_op
- _CalculatedClause.__init__(self, name, **kwargs)
- for c in clauses:
- self.append(c)
-
+ self.name = name
+ self._bind = kwargs.get('bind', None)
+ args = [_literal_as_binds(c, self.name) for c in clauses]
+ self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group()
+ self.type = sqltypes.to_instance(kwargs.get('type_', None))
+
key = property(lambda self:self.name)
columns = property(lambda self:[self])
@@ -2012,9 +2026,6 @@ class _Function(_CalculatedClause, FromClause):
def get_children(self, **kwargs):
return _CalculatedClause.get_children(self, **kwargs)
- def append(self, clause):
- self.clauses.append(_literal_as_binds(clause, self.name))
-
class _Cast(ColumnElement):
@@ -2984,7 +2995,7 @@ class Select(_SelectBaseMixin, FromClause):
if from_obj:
self._froms = util.Set([
_is_literal(f) and _TextFromClause(f) or f
- for f in from_obj
+ for f in util.to_list(from_obj)
])
else:
self._froms = util.Set()
diff --git a/lib/sqlalchemy/sql/functions.py b/lib/sqlalchemy/sql/functions.py
new file mode 100644
index 000000000..d39032b91
--- /dev/null
+++ b/lib/sqlalchemy/sql/functions.py
@@ -0,0 +1,78 @@
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.expression import _Function, _literal_as_binds, ClauseList, _FigureVisitName
+from sqlalchemy.sql import operators
+
+class _GenericMeta(_FigureVisitName):
+ def __init__(cls, clsname, bases, dict):
+ cls.__visit_name__ = 'function'
+ type.__init__(cls, clsname, bases, dict)
+
+ def __call__(self, *args, **kwargs):
+ args = [_literal_as_binds(c) for c in args]
+ return type.__call__(self, *args, **kwargs)
+
+class GenericFunction(_Function):
+ __metaclass__ = _GenericMeta
+
+ def __init__(self, type_=None, group=True, args=(), **kwargs):
+ self.packagenames = []
+ self.oid_column = None
+ self.name = self.__class__.__name__
+ self._bind = kwargs.get('bind', None)
+ if group:
+ self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args).self_group()
+ else:
+ self.clause_expr = ClauseList(operator=operators.comma_op, group_contents=True, *args)
+ self.type = sqltypes.to_instance(type_ or getattr(self, '__return_type__', None))
+
+class AnsiFunction(GenericFunction):
+ def __init__(self, **kwargs):
+ GenericFunction.__init__(self, **kwargs)
+
+
+class coalesce(GenericFunction):
+ def __init__(self, *args, **kwargs):
+ kwargs.setdefault('type_', _type_from_args(args))
+ GenericFunction.__init__(self, args=args, **kwargs)
+
+class concat(GenericFunction):
+ __return_type__ = sqltypes.String
+ def __init__(self, *args, **kwargs):
+ GenericFunction.__init__(self, args=args, **kwargs)
+
+class char_length(GenericFunction):
+ __return_type__ = sqltypes.Integer
+
+ def __init__(self, arg, **kwargs):
+ GenericFunction.__init__(self, args=[arg], **kwargs)
+
+class current_date(AnsiFunction):
+ __return_type__ = sqltypes.Date
+
+class current_time(AnsiFunction):
+ __return_type__ = sqltypes.Time
+
+class current_timestamp(AnsiFunction):
+ __return_type__ = sqltypes.DateTime
+
+class current_user(AnsiFunction):
+ __return_type__ = sqltypes.String
+
+class localtime(AnsiFunction):
+ __return_type__ = sqltypes.DateTime
+
+class localtimestamp(AnsiFunction):
+ __return_type__ = sqltypes.DateTime
+
+class session_user(AnsiFunction):
+ __return_type__ = sqltypes.String
+
+class user(AnsiFunction):
+ __return_type__ = sqltypes.String
+
+def _type_from_args(args):
+ for a in args:
+ if not isinstance(a.type, sqltypes.NullType):
+ return a.type
+ else:
+ return sqltypes.NullType \ No newline at end of file
diff --git a/test/dialect/firebird.py b/test/dialect/firebird.py
index 9d043153c..0abbdf029 100644
--- a/test/dialect/firebird.py
+++ b/test/dialect/firebird.py
@@ -22,7 +22,7 @@ class CompileTest(SQLCompileTest):
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)")
- self.assert_compile(func.current_time(), "current_time")
+ self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo")
m = MetaData()
diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py
index 05d9efd78..3b40ed354 100755
--- a/test/dialect/mssql.py
+++ b/test/dialect/mssql.py
@@ -46,7 +46,7 @@ class CompileTest(SQLCompileTest):
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)")
- self.assert_compile(func.current_time(), "current_time")
+ self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo()")
m = MetaData()
diff --git a/test/orm/query.py b/test/orm/query.py
index ef9e3540d..f1b40e0b1 100644
--- a/test/orm/query.py
+++ b/test/orm/query.py
@@ -170,13 +170,13 @@ class OperatorTest(QueryTest):
):
for (lhs, rhs, res) in (
(5, User.id, ':users_id %s users.id'),
- (5, literal(6), ':literal %s :literal_1'),
+ (5, literal(6), ':param_1 %s :param_2'),
(User.id, 5, 'users.id %s :users_id'),
- (User.id, literal('b'), 'users.id %s :literal'),
+ (User.id, literal('b'), 'users.id %s :param_1'),
(User.id, User.id, 'users.id %s users.id'),
- (literal(5), 'b', ':literal %s :literal_1'),
- (literal(5), User.id, ':literal %s users.id'),
- (literal(5), literal(6), ':literal %s :literal_1'),
+ (literal(5), 'b', ':param_1 %s :param_2'),
+ (literal(5), User.id, ':param_1 %s users.id'),
+ (literal(5), literal(6), ':param_1 %s :param_2'),
):
self._test(py_op(lhs, rhs), res % sql_op)
@@ -190,13 +190,13 @@ class OperatorTest(QueryTest):
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
('a', User.id, ':users_id', 'users.id'),
- ('a', literal('b'), ':literal_1', ':literal'), # note swap!
+ ('a', literal('b'), ':param_2', ':param_1'), # note swap!
(User.id, 'b', 'users.id', ':users_id'),
- (User.id, literal('b'), 'users.id', ':literal'),
+ (User.id, literal('b'), 'users.id', ':param_1'),
(User.id, User.id, 'users.id', 'users.id'),
- (literal('a'), 'b', ':literal', ':literal_1'),
- (literal('a'), User.id, ':literal', 'users.id'),
- (literal('a'), literal('b'), ':literal', ':literal_1'),
+ (literal('a'), 'b', ':param_1', ':param_2'),
+ (literal('a'), User.id, ':param_1', 'users.id'),
+ (literal('a'), literal('b'), ':param_1', ':param_2'),
):
# the compiled clause should match either (e.g.):
@@ -224,7 +224,7 @@ class OperatorTest(QueryTest):
for (expr, compare) in (
(func.max(User.id), "max(users.id)"),
(User.id.desc(), "users.id DESC"),
- (between(5, User.id, Address.id), ":literal BETWEEN users.id AND addresses.id"),
+ (between(5, User.id, Address.id), ":param_1 BETWEEN users.id AND addresses.id"),
# this one would require adding compile() to InstrumentedScalarAttribute. do we want this ?
#(User.id, "users.id")
):
diff --git a/test/sql/alltests.py b/test/sql/alltests.py
index a669a25f2..5f5c68904 100644
--- a/test/sql/alltests.py
+++ b/test/sql/alltests.py
@@ -17,6 +17,7 @@ def suite():
'sql.unicode',
# assorted round-trip tests
+ 'sql.functions',
'sql.query',
'sql.quote',
'sql.rowcount',
diff --git a/test/sql/functions.py b/test/sql/functions.py
new file mode 100644
index 000000000..177a308b4
--- /dev/null
+++ b/test/sql/functions.py
@@ -0,0 +1,153 @@
+import testbase
+import datetime
+from sqlalchemy import *
+from sqlalchemy import exceptions, sql
+from sqlalchemy.sql.compiler import BIND_TEMPLATES
+from sqlalchemy.engine import default
+from sqlalchemy import types as sqltypes
+from testlib import *
+
+# TODO: add a helper function to testlib for this
+from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
+dialects = [x.dialect() for x in [sqlite, postgres, mysql, oracle, firebird, mssql]]
+
+class CompileTest(SQLCompileTest):
+ def test_compile(self):
+ for dialect in dialects:
+ bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
+ self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect)
+ self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect)
+ if isinstance(dialect, firebird.dialect):
+ self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect)
+ else:
+ self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect)
+ self.assert_compile(func.char_length('foo'), "char_length(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect)
+
+ def test_constructor(self):
+ try:
+ func.current_timestamp('somearg')
+ assert False
+ except TypeError:
+ assert True
+
+ try:
+ func.char_length('a', 'b')
+ assert False
+ except TypeError:
+ assert True
+
+ try:
+ func.char_length()
+ assert False
+ except TypeError:
+ assert True
+
+ def test_typing(self):
+ assert isinstance(func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)).type, sqltypes.Date)
+
+ assert isinstance(func.coalesce(None, datetime.date(2005, 10, 15)).type, sqltypes.Date)
+
+ assert isinstance(func.concat("foo", "bar").type, sqltypes.String)
+
+class ExecuteTest(PersistTest):
+
+ def test_standalone_execute(self):
+ x = testbase.db.func.current_date().execute().scalar()
+ y = testbase.db.func.current_date().select().execute().scalar()
+ z = testbase.db.func.current_date().scalar()
+ assert (x == y == z) is True
+
+ # ansi func
+ x = testbase.db.func.current_date()
+ assert isinstance(x.type, Date)
+ assert isinstance(x.execute().scalar(), datetime.date)
+
+ def test_conn_execute(self):
+ conn = testbase.db.connect()
+ try:
+ x = conn.execute(func.current_date()).scalar()
+ y = conn.execute(func.current_date().select()).scalar()
+ z = conn.scalar(func.current_date())
+ finally:
+ conn.close()
+ assert (x == y == z) is True
+
+ def test_update(self):
+ """
+ Tests sending functions and SQL expressions to the VALUES and SET
+ clauses of INSERT/UPDATE instances, and that column-level defaults
+ get overridden.
+ """
+
+ meta = MetaData(testbase.db)
+ t = Table('t1', meta,
+ Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True),
+ Column('value', Integer)
+ )
+ t2 = Table('t2', meta,
+ Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True),
+ Column('value', Integer, default=7),
+ Column('stuff', String(20), onupdate="thisisstuff")
+ )
+ meta.create_all()
+ try:
+ t.insert(values=dict(value=func.length("one"))).execute()
+ assert t.select().execute().fetchone()['value'] == 3
+ t.update(values=dict(value=func.length("asfda"))).execute()
+ assert t.select().execute().fetchone()['value'] == 5
+
+ r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
+ id = r.last_inserted_ids()[0]
+ assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
+ t.update(values={t.c.value:func.length("asdf")}).execute()
+ assert t.select().execute().fetchone()['value'] == 4
+ print "--------------------------"
+ t2.insert().execute()
+ t2.insert(values=dict(value=func.length("one"))).execute()
+ t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi")
+
+ res = exec_sorted(select([t2.c.value, t2.c.stuff]))
+ self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)])
+
+ t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff")
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
+
+ t2.delete().execute()
+
+ t2.insert(values=dict(value=func.length("one") + 8)).execute()
+ assert t2.select().execute().fetchone()['value'] == 11
+
+ t2.update(values=dict(value=func.length("asfda"))).execute()
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
+
+ t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
+ print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
+ assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
+
+ finally:
+ meta.drop_all()
+
+ @testing.supported('postgres')
+ def test_as_from(self):
+ # TODO: shouldnt this work on oracle too ?
+ x = testbase.db.func.current_date().execute().scalar()
+ y = testbase.db.func.current_date().select().execute().scalar()
+ z = testbase.db.func.current_date().scalar()
+ w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
+
+ # construct a column-based FROM object out of a function, like in [ticket:172]
+ s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()])
+ q = s.execute().fetchone()[s.c.date]
+ r = s.alias('datequery').select().scalar()
+
+ assert x == y == z == w == q == r
+
+def exec_sorted(statement, *args, **kw):
+ """Executes a statement and returns a sorted list plain tuple rows."""
+
+ return sorted([tuple(row)
+ for row in statement.execute(*args, **kw).fetchall()])
+
+if __name__ == '__main__':
+ testbase.main()
+ \ No newline at end of file
diff --git a/test/sql/query.py b/test/sql/query.py
index fa88c5fe6..9b35cff1c 100644
--- a/test/sql/query.py
+++ b/test/sql/query.py
@@ -417,95 +417,6 @@ class QueryTest(PersistTest):
except exceptions.ArgumentError, e:
assert str(e).startswith('Not an executable clause: ')
- def test_functions(self):
- x = testbase.db.func.current_date().execute().scalar()
- y = testbase.db.func.current_date().select().execute().scalar()
- z = testbase.db.func.current_date().scalar()
- assert (x == y == z) is True
-
- x = testbase.db.func.current_date(type_=Date)
- assert isinstance(x.type, Date)
- assert isinstance(x.execute().scalar(), datetime.date)
-
- def test_conn_functions(self):
- conn = testbase.db.connect()
- try:
- x = conn.execute(func.current_date()).scalar()
- y = conn.execute(func.current_date().select()).scalar()
- z = conn.scalar(func.current_date())
- finally:
- conn.close()
- assert (x == y == z) is True
-
- def test_update_functions(self):
- """
- Tests sending functions and SQL expressions to the VALUES and SET
- clauses of INSERT/UPDATE instances, and that column-level defaults
- get overridden.
- """
-
- meta = MetaData(testbase.db)
- t = Table('t1', meta,
- Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True),
- Column('value', Integer)
- )
- t2 = Table('t2', meta,
- Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True),
- Column('value', Integer, default=7),
- Column('stuff', String(20), onupdate="thisisstuff")
- )
- meta.create_all()
- try:
- t.insert(values=dict(value=func.length("one"))).execute()
- assert t.select().execute().fetchone()['value'] == 3
- t.update(values=dict(value=func.length("asfda"))).execute()
- assert t.select().execute().fetchone()['value'] == 5
-
- r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
- id = r.last_inserted_ids()[0]
- assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
- t.update(values={t.c.value:func.length("asdf")}).execute()
- assert t.select().execute().fetchone()['value'] == 4
- print "--------------------------"
- t2.insert().execute()
- t2.insert(values=dict(value=func.length("one"))).execute()
- t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi")
-
- res = exec_sorted(select([t2.c.value, t2.c.stuff]))
- self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)])
-
- t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff")
- assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
-
- t2.delete().execute()
-
- t2.insert(values=dict(value=func.length("one") + 8)).execute()
- assert t2.select().execute().fetchone()['value'] == 11
-
- t2.update(values=dict(value=func.length("asfda"))).execute()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
-
- t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
- print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
-
- finally:
- meta.drop_all()
-
- @testing.supported('postgres')
- def test_functions_with_cols(self):
- # TODO: shouldnt this work on oracle too ?
- x = testbase.db.func.current_date().execute().scalar()
- y = testbase.db.func.current_date().select().execute().scalar()
- z = testbase.db.func.current_date().scalar()
- w = select(['*'], from_obj=[testbase.db.func.current_date()]).scalar()
-
- # construct a column-based FROM object out of a function, like in [ticket:172]
- s = select([sql.column('date', type_=DateTime)], from_obj=[testbase.db.func.current_date()])
- q = s.execute().fetchone()[s.c.date]
- r = s.alias('datequery').select().scalar()
-
- assert x == y == z == w == q == r
def test_column_order_with_simple_query(self):
diff --git a/test/sql/select.py b/test/sql/select.py
index f9aa21f1e..58c4ea3dd 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -150,9 +150,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def testexistsascolumnclause(self):
self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5})
- self.assert_compile(select([table1, exists([1], from_obj=[table2])]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
+ self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={})
- self.assert_compile(select([table1, exists([1], from_obj=[table2]).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
+ self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
def test_generative_exists(self):
self.assert_compile(
@@ -178,7 +178,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def testwheresubquery(self):
s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
self.assert_compile(
- select([users, s.c.street], from_obj=[s]),
+ select([users, s.c.street], from_obj=s),
"""SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
self.assert_compile(
@@ -205,11 +205,11 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
self.assert_compile(
- select([users, s.c.street], from_obj=[s]),
+ select([users, s.c.street], from_obj=s),
"""SELECT users.user_id, users.user_name, users.password, s.street FROM users, (SELECT addresses.street AS street FROM addresses WHERE addresses.user_id = users.user_id) AS s""")
# test constructing the outer query via append_column(), which occurs in the ORM's Query object
- s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=[table1])
+ s = select([], exists([1], table2.c.otherid==table1.c.myid), from_obj=table1)
s.append_column(table1)
self.assert_compile(
s,
@@ -242,9 +242,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
# test expressions against scalar selects
- self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal AS anon_1")
- self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal AS anon_1")
- self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal AS anon_1")
+ self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :param_2 AS anon_1")
+ self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :param_2 AS anon_1")
+ self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :param_2 AS anon_1")
self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo")
@@ -293,12 +293,12 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
a1 = table2.alias('t2alias')
s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True)
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
- s2 = select([table1, s1], from_obj=[j1])
+ s2 = select([table1, s1], from_obj=j1)
self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
def testlabelcomparison(self):
x = func.lala(table1.c.myid).label('foo')
- self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :literal")
+ self.assert_compile(select([x], x==5), "SELECT lala(mytable.myid) AS foo FROM mytable WHERE lala(mytable.myid) = :param_1")
def testand(self):
self.assert_compile(
@@ -345,13 +345,13 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
):
for (lhs, rhs, res) in (
(5, table1.c.myid, ':mytable_myid %s mytable.myid'),
- (5, literal(5), ':literal %s :literal_1'),
+ (5, literal(5), ':param_1 %s :param_2'),
(table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'),
- (table1.c.myid, literal(2.7), 'mytable.myid %s :literal'),
+ (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
- (literal(5), 8, ':literal %s :literal_1'),
- (literal(6), table1.c.myid, ':literal %s mytable.myid'),
- (literal(7), literal(5.5), ':literal %s :literal_1'),
+ (literal(5), 8, ':param_1 %s :param_2'),
+ (literal(6), table1.c.myid, ':param_1 %s mytable.myid'),
+ (literal(7), literal(5.5), ':param_1 %s :param_2'),
):
self.assert_compile(py_op(lhs, rhs), res % sql_op)
@@ -364,13 +364,13 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
(operator.ge, '>=', '<=')):
for (lhs, rhs, l_sql, r_sql) in (
('a', table1.c.myid, ':mytable_myid', 'mytable.myid'),
- ('a', literal('b'), ':literal_1', ':literal'), # note swap!
+ ('a', literal('b'), ':param_2', ':param_1'), # note swap!
(table1.c.myid, 'b', 'mytable.myid', ':mytable_myid'),
- (table1.c.myid, literal('b'), 'mytable.myid', ':literal'),
+ (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
(table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
- (literal('a'), 'b', ':literal', ':literal_1'),
- (literal('a'), table1.c.myid, ':literal', 'mytable.myid'),
- (literal('a'), literal('b'), ':literal', ':literal_1'),
+ (literal('a'), 'b', ':param_1', ':param_2'),
+ (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'),
+ (literal('a'), literal('b'), ':param_1', ':param_2'),
):
# the compiled clause should match either (e.g.):
@@ -404,13 +404,13 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
)
self.assert_compile(
- literal("a") + literal("b") * literal("c"), ":literal || :literal_1 * :literal_2"
+ literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
)
# test the op() function, also that its results are further usable in expressions
self.assert_compile(
table1.select(table1.c.myid.op('hoho')(12)==14),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :literal"
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :param_1"
)
# test that clauses can be pickled (operators need to be module-level, etc.)
@@ -555,19 +555,19 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
def testtextcolumns(self):
self.assert_compile(
- select(["column1", "column2"], from_obj=[table1]).alias('somealias').select(),
+ select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
"SELECT somealias.column1, somealias.column2 FROM (SELECT column1, column2 FROM mytable) AS somealias"
)
# test that use_labels doesnt interfere with literal columns
self.assert_compile(
- select(["column1", "column2", table1.c.myid], from_obj=[table1], use_labels=True),
+ select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True),
"SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable"
)
# test that use_labels doesnt interfere with literal columns that have textual labels
self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1], use_labels=True),
+ select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True),
"SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
)
@@ -640,7 +640,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
def testliteral(self):
self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
- "SELECT :literal || :literal_1 AS anon_1 FROM mytable")
+ "SELECT :param_2 || :param_3 AS anon_1 FROM mytable")
def testcalculatedcolumns(self):
value_tbl = table('values',
@@ -658,19 +658,19 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
self.assert_compile(
select([value_tbl.c.id], (value_tbl.c.val2 -
value_tbl.c.val1)/value_tbl.c.val1 > 2.0),
- "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :literal"
+ "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1"
)
self.assert_compile(
select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0),
- "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :literal"
+ "SELECT values.id FROM values WHERE values.val1 / (values.val2 - values.val1) / values.val1 > :param_1"
)
def testfunction(self):
"""tests the generation of functions using the func keyword"""
# test an expression with a function
self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
- "lala(:lala, :lala_1, :literal, mytable.myid) * myothertable.otherid")
+ "lala(:lala, :lala_1, :param_1, mytable.myid) * myothertable.otherid")
# test it in a SELECT
self.assert_compile(select([func.count(table1.c.myid)]),
@@ -985,7 +985,7 @@ EXISTS (select yay from foo where boo = lar)",
t = table('foo', column('id'))
s = select([t, literal('lala').label('hoho')])
- self.assert_compile(s, "SELECT foo.id, :literal AS hoho FROM foo")
+ self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
assert [str(c) for c in s.c] == ["id", "hoho"]
def testin(self):
@@ -1002,31 +1002,31 @@ EXISTS (select yay from foo where boo = lar)",
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal + :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal || :literal_1, :literal_2)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)")
self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :literal + :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1 + :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal < :literal_1)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)")
self.assert_compile(select([table1], table1.c.myid.in_([table1.c.myid])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)")
@@ -1035,13 +1035,13 @@ EXISTS (select yay from foo where boo = lar)",
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, mytable.myid + :mytable_myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid)")
self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])),
- "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:literal, :mytable_myid + mytable.myid)")
+ "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid + mytable.myid)")
self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1, :mytable_myid_2)")
@@ -1115,10 +1115,10 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest")
# first test with Postgres engine
- check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s')
+ check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
# then the Oracle engine
- check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':literal')
+ check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1')
# then the sqlite engine
check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
@@ -1132,7 +1132,7 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
Column('date', Date))
self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)})
- self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :literal AND :literal_1", checkparams={'literal':datetime.date(2006,6,1), 'literal_1':datetime.date(2006,6,5)})
+ self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
def test_operator_precedence(self):
table = Table('op', metadata,
@@ -1142,13 +1142,13 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
self.assert_compile(table.select((table.c.field + 5) == table.c.field),
"SELECT op.field FROM op WHERE op.field + :op_field = op.field")
self.assert_compile(table.select((table.c.field + 5) * 6),
- "SELECT op.field FROM op WHERE (op.field + :op_field) * :literal")
+ "SELECT op.field FROM op WHERE (op.field + :op_field) * :param_1")
self.assert_compile(table.select((table.c.field * 5) + 6),
- "SELECT op.field FROM op WHERE op.field * :op_field + :literal")
+ "SELECT op.field FROM op WHERE op.field * :op_field + :param_1")
self.assert_compile(table.select(5 + table.c.field.in_([5,6])),
- "SELECT op.field FROM op WHERE :literal + (op.field IN (:op_field, :op_field_1))")
+ "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field, :op_field_1))")
self.assert_compile(table.select((5 + table.c.field).in_([5,6])),
- "SELECT op.field FROM op WHERE :op_field + op.field IN (:literal, :literal_1)")
+ "SELECT op.field FROM op WHERE :op_field + op.field IN (:param_1, :param_2)")
self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
"SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)")
self.assert_compile(table.select(not_(table.c.field == 5)),
@@ -1156,11 +1156,11 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
"SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field AND :op_field_1)")
self.assert_compile(table.select(not_(table.c.field) == 5),
- "SELECT op.field FROM op WHERE (NOT op.field) = :literal")
+ "SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)),
- "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1")
+ "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)),
- "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :literal AND :literal_1")
+ "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
class CRUDTest(SQLCompileTest):
def testinsert(self):
@@ -1223,8 +1223,8 @@ class CRUDTest(SQLCompileTest):
values = {
table1.c.name : table1.c.name + "lala",
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
- }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :literal), name=(mytable.name || :mytable_name) "
- "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :literal_1 || mytable.name || :literal_2")
+ }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name) "
+ "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :param_2 || mytable.name || :param_3")
def testcorrelatedupdate(self):
# test against a straight text subquery