1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
from sqlalchemy import *
from sqlalchemy.testing import fixtures, AssertsExecutionResults
from sqlalchemy import testing
from sqlalchemy.testing import eq_
class FoundRowsTest(fixtures.TestBase, AssertsExecutionResults):
"""tests rowcount functionality"""
__requires__ = ('sane_rowcount', )
__backend__ = True
@classmethod
def setup_class(cls):
global employees_table, metadata
metadata = MetaData(testing.db)
employees_table = Table(
'employees', metadata,
Column(
'employee_id', Integer,
Sequence('employee_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
Column('department', String(1)))
metadata.create_all()
def setup(self):
global data
data = [('Angela', 'A'),
('Andrew', 'A'),
('Anand', 'A'),
('Bob', 'B'),
('Bobette', 'B'),
('Buffy', 'B'),
('Charlie', 'C'),
('Cynthia', 'C'),
('Chris', 'C')]
i = employees_table.insert()
i.execute(*[{'name': n, 'department': d} for n, d in data])
def teardown(self):
employees_table.delete().execute()
@classmethod
def teardown_class(cls):
metadata.drop_all()
def test_basic(self):
s = employees_table.select()
r = s.execute().fetchall()
assert len(r) == len(data)
def test_update_rowcount1(self):
# WHERE matches 3, 3 rows changed
department = employees_table.c.department
r = employees_table.update(department == 'C').execute(department='Z')
assert r.rowcount == 3
def test_update_rowcount2(self):
# WHERE matches 3, 0 rows changed
department = employees_table.c.department
r = employees_table.update(department == 'C').execute(department='C')
assert r.rowcount == 3
def test_raw_sql_rowcount(self):
# test issue #3622, make sure eager rowcount is called for text
with testing.db.connect() as conn:
result = conn.execute(
"update employees set department='Z' where department='C'")
eq_(result.rowcount, 3)
def test_text_rowcount(self):
# test issue #3622, make sure eager rowcount is called for text
with testing.db.connect() as conn:
result = conn.execute(
text(
"update employees set department='Z' "
"where department='C'"))
eq_(result.rowcount, 3)
def test_delete_rowcount(self):
# WHERE matches 3, 3 rows deleted
department = employees_table.c.department
r = employees_table.delete(department == 'C').execute()
assert r.rowcount == 3
@testing.requires.sane_multi_rowcount
def test_multi_update_rowcount(self):
stmt = employees_table.update().\
where(employees_table.c.name == bindparam('emp_name')).\
values(department="C")
r = testing.db.execute(
stmt,
[{"emp_name": "Bob"}, {"emp_name": "Cynthia"},
{"emp_name": "nonexistent"}]
)
eq_(
r.rowcount, 2
)
@testing.requires.sane_multi_rowcount
def test_multi_delete_rowcount(self):
stmt = employees_table.delete().\
where(employees_table.c.name == bindparam('emp_name'))
r = testing.db.execute(
stmt,
[{"emp_name": "Bob"}, {"emp_name": "Cynthia"},
{"emp_name": "nonexistent"}]
)
eq_(
r.rowcount, 2
)
|