1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
import re, inspect
from decorator import FunctionMaker
class _SymbolReplacer(object):
"""
A small internal class to parse SQL templates with names arguments.
Returns the names of the arguments and the template interpolated with
a placeholder. Used by get_args_templ.
"""
STRING_OR_COMMENT = re.compile(r"('[^']*'|--.*\n)")
SYMBOL = re.compile(r"(?<!:):(\w+)") # a name prefixed by colons
def __init__(self, placeholder):
self.placeholder = placeholder
self.replaced = []
self.found = set()
def get_args_templ(self, templ):
argnames = []
def repl(mo):
"Replace named args with placeholders"
argname = mo.group(1)
if argname in argnames:
raise NameError('Duplicate argument %r in SQL template'
% argname)
argnames.append(argname)
return self.placeholder or mo.group()
out = []
for i, chunk in enumerate(self.STRING_OR_COMMENT.split(templ)):
if i % 2 == 0: # real sql code
chunk = self.SYMBOL.sub(repl, chunk)
out.append(chunk)
return argnames, ''.join(out)
# Memo table filled by get_args_templ:
# (templ, placeholder) -> (argnames, new_templ)
templ_cache = dict()
# used in .execute and do
def get_args_templ(templ, placeholder=None):
    """
    Extract the named arguments (``:name``) from a SQL template.

    Return a pair ``(argnames, new_templ)``: the argument names found, and
    the template with every argument swapped for ``placeholder``.  With a
    false placeholder the template text is left as it was.  Arguments that
    appear inside strings and comments are ignored.  Results are memoized
    in ``templ_cache`` under the key ``(templ, placeholder)``.

    >>> args, templ = get_args_templ('INSERT INTO book (:title, :author)')
    >>> print(args)
    ['title', 'author']
    >>> print(templ)
    INSERT INTO book (:title, :author)
    >>> print(get_args_templ('INSERT INTO book (:title, :author)', '?')[1])
    INSERT INTO book (?, ?)
    """
    # This is a small hack rather than a full featured SQL parser.
    key = (templ, placeholder)
    try:
        return templ_cache[key]
    except KeyError:
        pass  # first time this (template, placeholder) pair is seen
    names, rendered = _SymbolReplacer(placeholder).get_args_templ(templ)
    entry = (names, rendered)
    templ_cache[key] = entry
    return entry
def do(templ, name='sqlquery', defaults=None, scalar=False, ntuple=None):
    """
    Compile a SQL query template down to a Python function.

    The generated function is called ``name`` and takes ``conn`` followed
    by one parameter per named argument of ``templ`` (as found by
    get_args_templ).  It returns
    ``conn.execute(templ, (<args>,), scalar=scalar, ntuple=ntuple)``.

    ``defaults`` is handed to FunctionMaker as the defaults of the
    generated function (a tuple).

    The returned function carries the attributes ``__source__`` (generated
    source preceded by a comment header showing ntuple, scalar and the
    template), ``templ``, ``argnames``, ``defaults``, ``scalar`` and
    ``ntuple``.  ``argnames`` is the comma-separated string used in the
    signature, with a trailing comma when it is not empty.
    """
    argnames = ', '.join(get_args_templ(templ)[0])
    if argnames:
        # The trailing comma makes "(%(argnames)s)" a tuple even with a
        # single argument; with no arguments it degenerates to "()".
        argnames += ','
    # NOTE(review): a template argument called conn, templ, scalar or ntuple
    # would clash with the names used inside the generated function.
    # The body line is indented explicitly so the generated source is a
    # valid function definition.
    src = (
        'def %(name)s(conn, %(argnames)s):\n'
        '    return conn.execute('
        'templ, (%(argnames)s), scalar=scalar, ntuple=ntuple)\n'
    ) % {'name': name, 'argnames': argnames}
    maker = FunctionMaker(
        name=name, signature=argnames, defaults=defaults, doc=templ)
    # templ, scalar and ntuple are looked up by the generated code in the
    # namespace passed here.
    fn = maker.make(
        src, dict(templ=templ, scalar=scalar, ntuple=ntuple), addsource=True)
    comment = '# ntuple = %s\n# scalar = %s\n# templ=\n%s\n' % (
        ntuple, scalar, '\n'.join('## ' + ln for ln in templ.splitlines()))
    fn.__source__ = '%s\n%s' % (comment, fn.__source__)
    fn.templ = templ
    fn.argnames = argnames
    fn.defaults = defaults
    fn.scalar = scalar
    fn.ntuple = ntuple
    return fn
def spec(fn, clause, argnames=None, defaults=None, ntuple=None):
    """
    Add a clause to an SQL Template function.

    ``fn`` is a function built by ``do``; a new function is compiled from
    ``fn.templ + clause``.  ``defaults`` and ``ntuple`` override the values
    stored on ``fn`` when they are true; ``scalar`` is always taken from
    ``fn``.

    ``argnames`` is accepted for backward compatibility but not used:
    ``do`` has no such parameter (passing it raised TypeError) and always
    derives the argument names from the template itself.
    """
    return do(fn.templ + clause,
              defaults=defaults or fn.defaults,
              scalar=fn.scalar,
              ntuple=ntuple or fn.ntuple)
if __name__ == '__main__':
    # Run the doctests embedded in this module's docstrings.
    import doctest
    doctest.testmod()
|