1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
from sqlalchemy.testing.assertions import eq_, assert_raises
from sqlalchemy.testing import fixtures
from sqlalchemy import exc, testing
from sqlalchemy.dialects.mysql import insert
from sqlalchemy import Table, Column, Boolean, Integer, String, func
class OnDuplicateTest(fixtures.TablesTest):
__only_on__ = 'mysql',
__backend__ = True
run_define_tables = 'each'
@classmethod
def define_tables(cls, metadata):
Table(
'foos', metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('bar', String(10)),
Column('baz', String(10)),
Column('updated_once', Boolean, default=False),
)
def test_bad_args(self):
assert_raises(
ValueError,
insert(self.tables.foos, values={}).on_duplicate_key_update
)
assert_raises(
exc.ArgumentError,
insert(self.tables.foos, values={}).on_duplicate_key_update,
{'id': 1, 'bar': 'b'},
id=1,
bar='b',
)
assert_raises(
exc.ArgumentError,
insert(self.tables.foos, values={}).on_duplicate_key_update,
{'id': 1, 'bar': 'b'},
{'id': 2, 'bar': 'baz'},
)
def test_on_duplicate_key_update(self):
foos = self.tables.foos
with testing.db.connect() as conn:
conn.execute(insert(foos, dict(id=1, bar='b', baz='bz')))
stmt = insert(foos).values(
[dict(id=1, bar='ab'), dict(id=2, bar='b')])
stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
result = conn.execute(stmt)
eq_(result.inserted_primary_key, [2])
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
[(1, 'ab', 'bz', False)]
)
def test_on_duplicate_key_update_preserve_order(self):
foos = self.tables.foos
with testing.db.connect() as conn:
conn.execute(insert(foos,
[dict(id=1, bar='b', baz='bz'), dict(id=2, bar='b', baz='bz2')]))
stmt = insert(foos)
update_condition = (foos.c.updated_once == False)
# The following statements show importance of the columns update ordering
# as old values being referenced in UPDATE clause are getting replaced one
# by one from left to right with their new values.
stmt1 = stmt.on_duplicate_key_update([
('bar', func.if_(update_condition, func.values(foos.c.bar), foos.c.bar)),
('updated_once', func.if_(update_condition, True, foos.c.updated_once)),
])
stmt2 = stmt.on_duplicate_key_update([
('updated_once', func.if_(update_condition, True, foos.c.updated_once)),
('bar', func.if_(update_condition, func.values(foos.c.bar), foos.c.bar)),
])
# First statement should succeed updating column bar
conn.execute(stmt1, dict(id=1, bar='ab'))
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
[(1, 'ab', 'bz', True)],
)
# Second statement will do noop update of column bar
conn.execute(stmt2, dict(id=2, bar='ab'))
eq_(
conn.execute(foos.select().where(foos.c.id == 2)).fetchall(),
[(2, 'b', 'bz2', True)]
)
def test_last_inserted_id(self):
foos = self.tables.foos
with testing.db.connect() as conn:
stmt = insert(foos).values({"bar": "b", "baz": "bz"})
result = conn.execute(
stmt.on_duplicate_key_update(
bar=stmt.inserted.bar, baz="newbz")
)
eq_(result.inserted_primary_key, [1])
stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
result = conn.execute(
stmt.on_duplicate_key_update(
bar=stmt.inserted.bar, baz="newbz")
)
eq_(result.inserted_primary_key, [1])
|