1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
import testenv; testenv.configure_for_tests()
import re
from sqlalchemy.interfaces import ConnectionProxy
from testlib.sa import MetaData, Table, Column, Integer, String, INT, \
VARCHAR, func, bindparam
import testlib.sa as tsa
from testlib import TestBase, testing, engines
users, metadata = None, None
class ExecuteTest(TestBase):
def setUpAll(self):
global users, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
)
metadata.create_all()
def tearDown(self):
testing.db.connect().execute(users.delete())
def tearDownAll(self):
metadata.drop_all()
@testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite')
def test_raw_qmark(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack"))
conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"fred"])
conn.execute("insert into users (user_id, user_name) values (?, ?)", [3,"ed"], [4,"horse"])
conn.execute("insert into users (user_id, user_name) values (?, ?)", (5,"barney"), (6,"donkey"))
conn.execute("insert into users (user_id, user_name) values (?, ?)", 7, 'sally')
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')]
conn.execute("delete from users")
@testing.fails_on_everything_except('mysql', 'postgres')
# some psycopg2 versions bomb this.
def test_raw_sprintf(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
conn.execute("insert into users (user_id, user_name) values (%s, %s)", [2,"ed"], [3,"horse"])
conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally')
conn.execute("insert into users (user_id) values (%s)", 5)
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally'), (5, None)]
conn.execute("delete from users")
# pyformat is supported for mysql, but skipping because a few driver
# versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2)
@testing.skip_if(lambda: testing.against('mysql'), 'db-api flaky')
@testing.fails_on_everything_except('postgres')
def test_raw_python(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", id=4, name='sally')
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
@testing.fails_on_everything_except('sqlite', 'oracle')
def test_raw_named(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'})
conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
conn.execute("insert into users (user_id, user_name) values (:id, :name)", id=4, name='sally')
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
def test_exception_wrapping(self):
for conn in (testing.db, testing.db.connect()):
try:
conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef")
assert False
except tsa.exc.DBAPIError:
assert True
@testing.fails_on('mssql', 'rowcount returns -1')
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
result = testing.db.execute(users.insert().values(user_name=bindparam('name')), [])
self.assertEquals(result.rowcount, 1)
class ProxyConnectionTest(TestBase):
@testing.fails_on('firebird', 'Data type unknown')
def test_proxy(self):
stmts = []
cursor_stmts = []
class MyProxy(ConnectionProxy):
def execute(self, conn, execute, clauseelement, *multiparams, **params):
stmts.append(
(str(clauseelement), params,multiparams)
)
return execute(clauseelement, *multiparams, **params)
def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
cursor_stmts.append(
(statement, parameters, None)
)
return execute(cursor, statement, parameters, context)
def assert_stmts(expected, received):
for stmt, params, posn in expected:
if not received:
assert False
while received:
teststmt, testparams, testmultiparams = received.pop(0)
teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip()
if teststmt.startswith(stmt) and (testparams==params or testparams==posn):
break
for engine in (
engines.testing_engine(options=dict(proxy=MyProxy())),
engines.testing_engine(options=dict(proxy=MyProxy(), strategy='threadlocal'))
):
m = MetaData(engine)
t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True))
m.create_all()
try:
t1.insert().execute(c1=5, c2='some data')
t1.insert().execute(c1=6)
assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')]
finally:
m.drop_all()
engine.dispose()
compiled = [
("CREATE TABLE t1", {}, None),
("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None),
("INSERT INTO t1 (c1, c2)", {'c1': 6}, None),
("select * from t1", {}, None),
("DROP TABLE t1", {}, None)
]
if engine.dialect.preexecute_pk_sequences:
cursor = [
("CREATE TABLE t1", {}, None),
("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
("SELECT lower", {'lower_2':'Foo'}, ['Foo']),
("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, [6, 'foo']),
("select * from t1", {}, None),
("DROP TABLE t1", {}, None)
]
else:
cursor = [
("CREATE TABLE t1", {}, ()),
("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, [6, "Foo"]), # bind param name 'lower_2' might be incorrect
("select * from t1", {}, ()),
("DROP TABLE t1", {}, ())
]
assert_stmts(compiled, stmts)
assert_stmts(cursor, cursor_stmts)
if __name__ == "__main__":
testenv.main()
|