summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/sql/functions.py
blob: 7fce3b95b1797cddd552c642f9e8e333aeba016a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from sqlalchemy import types as sqltypes
from sqlalchemy.sql.expression import (
    ClauseList, _FigureVisitName, _Function, _literal_as_binds, text
    )
from sqlalchemy.sql import operators


class _GenericMeta(_FigureVisitName):
    """Metaclass applied to ``GenericFunction`` and its subclasses.

    It stamps every class it creates with the visit name ``'function'``
    and routes positional constructor arguments through
    ``_literal_as_binds`` before the instance is built.
    """

    def __init__(cls, clsname, bases, dict):
        # All generic functions share one visit name, whatever the class
        # is called.  ``type.__init__`` is invoked directly rather than
        # through the ``_FigureVisitName`` base.
        # NOTE(review): presumably to skip the base's own visit-name
        # logic -- confirm against _FigureVisitName.
        cls.__visit_name__ = 'function'
        type.__init__(cls, clsname, bases, dict)

    def __call__(self, *args, **kwargs):
        """Instantiate the class after converting positional arguments.

        Each positional argument is passed through ``_literal_as_binds``;
        keyword arguments are forwarded unchanged.
        """
        converted = [_literal_as_binds(value) for value in args]
        return type.__call__(self, *converted, **kwargs)

class GenericFunction(_Function):
    """Base class for functions that take their name from the class.

    The function name is the name of the concrete class.  A subclass may
    define ``__return_type__`` to supply the result type used when the
    caller does not pass ``type_``.
    """

    __metaclass__ = _GenericMeta

    def __init__(self, type_=None, group=True, args=(), **kwargs):
        """Set up the function's name, bind, argument clause and type.

        :param type_: explicit result type; when false-y, the class's
          ``__return_type__`` attribute (if any) is used instead.
        :param group: when true, the argument list is wrapped with
          ``self_group()``; otherwise the bare ``ClauseList`` is kept.
        :param args: sequence of argument expressions.
        :param kwargs: only ``bind`` is consulted; any other keyword is
          ignored here.
        """
        self.packagenames = []
        self.name = self.__class__.__name__
        self._bind = kwargs.get('bind')

        # Build the comma-separated argument list once, then group it
        # only if requested.
        arguments = ClauseList(
            operator=operators.comma_op,
            group_contents=True, *args)
        if group:
            arguments = arguments.self_group()
        self.clause_expr = arguments

        # An explicit type wins; otherwise fall back to the class-level
        # default, which may be absent (None).
        return_type = type_
        if not return_type:
            return_type = getattr(self, '__return_type__', None)
        self.type = sqltypes.to_instance(return_type)

class AnsiFunction(GenericFunction):
    """A ``GenericFunction`` whose constructor takes keyword arguments only.

    No ``args`` are supplied by this class, so unless the caller passes
    ``args`` by keyword the function is built with an empty argument list.
    """
    def __init__(self, **kwargs):
        # Forward everything unchanged; positional arguments are rejected
        # by this signature.
        GenericFunction.__init__(self, **kwargs)


class coalesce(GenericFunction):
    """The ``coalesce`` function; its type defaults to that of its arguments."""

    def __init__(self, *args, **kwargs):
        """Build the function over ``args``.

        Unless the caller passes ``type_``, the result type is whatever
        ``_type_from_args(args)`` returns.
        """
        # The inferred type is computed unconditionally, even when the
        # caller supplies ``type_`` and it ends up unused.
        inferred_type = _type_from_args(args)
        kwargs.setdefault('type_', inferred_type)
        GenericFunction.__init__(self, args=args, **kwargs)

class now(GenericFunction):
    """The ``now`` function; result type defaults to ``sqltypes.DateTime``."""
    __return_type__ = sqltypes.DateTime

class concat(GenericFunction):
    """The ``concat`` function; result type defaults to ``sqltypes.String``."""
    __return_type__ = sqltypes.String
    def __init__(self, *args, **kwargs):
        """Build the function over any number of positional arguments."""
        # Positional arguments become the function's argument list.
        GenericFunction.__init__(self, args=args, **kwargs)

class char_length(GenericFunction):
    """The ``char_length`` function of one argument.

    The result type defaults to ``sqltypes.Integer``.
    """

    __return_type__ = sqltypes.Integer

    def __init__(self, arg, **kwargs):
        """Build the function over the single expression ``arg``."""
        # The argument sequence is only ever star-unpacked by the base
        # class, so a one-element tuple serves as well as a list.
        GenericFunction.__init__(self, args=(arg,), **kwargs)

class random(GenericFunction):
    """The ``random`` function; no default result type is declared."""

    def __init__(self, *args, **kwargs):
        """Build the function over any number of positional arguments.

        ``type_`` is explicitly defaulted to ``None`` when the caller does
        not supply it.
        """
        if 'type_' not in kwargs:
            kwargs['type_'] = None
        GenericFunction.__init__(self, args=args, **kwargs)

class count(GenericFunction):
    """The ANSI COUNT aggregate function.  With no arguments, emits COUNT *."""

    __return_type__ = sqltypes.Integer

    def __init__(self, expression=None, **kwargs):
        """Build the aggregate over ``expression``.

        :param expression: the expression to count; when ``None``, the
          textual clause ``text('*')`` is used in its place.
        """
        if expression is not None:
            counted = expression
        else:
            counted = text('*')
        GenericFunction.__init__(self, args=(counted,), **kwargs)

class current_date(AnsiFunction):
    """The ``current_date`` function; type defaults to ``sqltypes.Date``."""
    __return_type__ = sqltypes.Date

class current_time(AnsiFunction):
    """The ``current_time`` function; type defaults to ``sqltypes.Time``."""
    __return_type__ = sqltypes.Time

class current_timestamp(AnsiFunction):
    """The ``current_timestamp`` function.

    The result type defaults to ``sqltypes.DateTime``.
    """
    __return_type__ = sqltypes.DateTime

class current_user(AnsiFunction):
    """The ``current_user`` function; type defaults to ``sqltypes.String``."""
    __return_type__ = sqltypes.String

class localtime(AnsiFunction):
    """The ``localtime`` function; type defaults to ``sqltypes.DateTime``."""
    __return_type__ = sqltypes.DateTime

class localtimestamp(AnsiFunction):
    """The ``localtimestamp`` function.

    The result type defaults to ``sqltypes.DateTime``.
    """
    __return_type__ = sqltypes.DateTime

class session_user(AnsiFunction):
    """The ``session_user`` function; type defaults to ``sqltypes.String``."""
    __return_type__ = sqltypes.String

class sysdate(AnsiFunction):
    """The ``sysdate`` function; type defaults to ``sqltypes.DateTime``."""
    __return_type__ = sqltypes.DateTime

class user(AnsiFunction):
    """The ``user`` function; type defaults to ``sqltypes.String``."""
    __return_type__ = sqltypes.String

def _type_from_args(args):
    """Return the type of the first argument that has a non-null type.

    Each element of ``args`` must expose a ``type`` attribute.  The first
    one whose type is not a ``sqltypes.NullType`` instance wins.  When
    every argument is null-typed, or ``args`` is empty, the
    ``sqltypes.NullType`` class itself (not an instance) is returned.
    """
    for candidate in args:
        candidate_type = candidate.type
        if isinstance(candidate_type, sqltypes.NullType):
            continue
        return candidate_type
    return sqltypes.NullType