1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
/*-
* Copyright (c) 2008-2012 WiredTiger, Inc.
* All rights reserved.
*
* See the file LICENSE for redistribution information.
*/
#include "wt_internal.h"
/*
* __wt_txnid_cmp --
* Compare transaction IDs for sorting / searching.
*/
int
__wt_txnid_cmp(const void *v1, const void *v2)
{
wt_txnid_t id1, id2;
id1 = *(wt_txnid_t *)v1;
id2 = *(wt_txnid_t *)v2;
return ((id1 == id2) ? 0 : TXNID_LT(id1, id2) ? -1 : 1);
}
/*
* __txn_sort_snapshot --
* Sort a snapshot for faster searching and set the min/max bounds.
*/
static void
__txn_sort_snapshot(WT_SESSION_IMPL *session, uint32_t n, wt_txnid_t id)
{
WT_TXN *txn;
txn = &session->txn;
qsort(txn->snapshot, n, sizeof(wt_txnid_t), __wt_txnid_cmp);
txn->snapshot_count = n;
txn->snap_min = (n == 0) ? id : txn->snapshot[0];
txn->snap_max = (n == 0) ? id : txn->snapshot[n - 1];
WT_ASSERT(session, txn->snap_min != WT_TXN_NONE);
}
/*
* __wt_txn_get_snapshot --
* Set up a snapshot in the current transaction, without allocating an ID.
*/
int
__wt_txn_get_snapshot(WT_SESSION_IMPL *session, wt_txnid_t max_id)
{
WT_CONNECTION_IMPL *conn;
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
wt_txnid_t current_id, id;
uint32_t i, n, session_cnt;
conn = S2C(session);
n = 0;
txn = &session->txn;
txn_global = &conn->txn_global;
do {
/* Take a copy of the current session ID. */
current_id = txn_global->current;
/* Copy the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
for (i = 0; i < session_cnt; i++)
if ((id = txn_global->ids[i]) != WT_TXN_NONE &&
(max_id == WT_TXN_NONE || TXNID_LT(id, max_id)))
txn->snapshot[n++] = id;
} while (current_id != txn_global->current);
__txn_sort_snapshot(
session, n, (max_id != WT_TXN_NONE) ? max_id : current_id);
return (0);
}
/*
* __wt_txn_begin --
* Begin a transaction.
*/
int
__wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[])
{
WT_CONFIG_ITEM cval;
WT_CONNECTION_IMPL *conn;
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
wt_txnid_t id;
uint32_t i, n, session_cnt;
conn = S2C(session);
n = 0;
txn = &session->txn;
txn_global = &conn->txn_global;
if (F_ISSET(txn, TXN_RUNNING))
WT_RET_MSG(session, EINVAL, "Transaction already running");
WT_RET(__wt_config_gets(session, cfg, "isolation", &cval));
txn->isolation = (__wt_config_strcmp(&cval, "snapshot") == 0) ?
TXN_ISO_SNAPSHOT : TXN_ISO_READ_UNCOMMITTED;
WT_ASSERT(session, txn->id == WT_TXN_NONE);
WT_ASSERT(session, txn_global->ids[session->id] == WT_TXN_NONE);
do {
/* Take a copy of the current session ID. */
txn->id = txn_global->current;
WT_PUBLISH(txn_global->ids[session->id], txn->id);
if (txn->isolation == TXN_ISO_SNAPSHOT) {
/* Copy the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
for (i = 0; i < session_cnt; i++)
if ((id = txn_global->ids[i]) != WT_TXN_NONE &&
TXNID_LT(id, txn->id))
txn->snapshot[n++] = id;
}
} while (!WT_ATOMIC_CAS(txn_global->current, txn->id, txn->id + 1) ||
txn->id == WT_TXN_NONE || txn->id == WT_TXN_ABORTED);
if (txn->isolation == TXN_ISO_SNAPSHOT)
__txn_sort_snapshot(session, n, txn->id);
F_SET(txn, TXN_RUNNING);
return (0);
}
/*
* __txn_release --
* Release the resources associated with the current transaction.
*/
static int
__txn_release(WT_SESSION_IMPL *session)
{
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
txn = &session->txn;
txn->mod_count = 0;
if (!F_ISSET(txn, TXN_RUNNING))
WT_RET_MSG(session, EINVAL, "No transaction is active");
/* Clear the transaction's ID from the global table. */
txn_global = &S2C(session)->txn_global;
WT_ASSERT(session, txn_global->ids[session->id] != WT_TXN_NONE &&
txn->id != WT_TXN_NONE);
WT_PUBLISH(txn_global->ids[session->id], txn->id = WT_TXN_NONE);
/* Reset the transaction state to not running. */
txn->isolation = TXN_ISO_READ_UNCOMMITTED;
F_CLR(txn, TXN_ERROR | TXN_RUNNING);
return (0);
}
/*
* __wt_txn_commit --
* Commit the current transaction.
*/
int
__wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
{
WT_UNUSED(cfg);
return (__txn_release(session));
}
/*
* __wt_txn_rollback --
* Roll back the current transaction.
*/
int
__wt_txn_rollback(WT_SESSION_IMPL *session, const char *cfg[])
{
WT_TXN *txn;
wt_txnid_t **m;
u_int i;
WT_UNUSED(cfg);
txn = &session->txn;
for (i = 0, m = txn->mod; i < txn->mod_count; i++, m++)
**m = WT_TXN_ABORTED;
return (__txn_release(session));
}
/*
* __wt_txn_checkpoint --
* Write a checkpoint.
*/
int
__wt_txn_checkpoint(WT_SESSION_IMPL *session, const char *cfg[])
{
WT_CONFIG_ITEM cval;
WT_DECL_RET;
WT_TXN_GLOBAL *txn_global;
const char *snapshot;
const char *txn_cfg[] = { "isolation=snapshot", NULL };
txn_global = &S2C(session)->txn_global;
if ((ret = __wt_config_gets(
session, cfg, "snapshot", &cval)) != 0 && ret != WT_NOTFOUND)
WT_RET(ret);
if (cval.len != 0)
WT_RET(__wt_strndup(session, cval.str, cval.len, &snapshot));
else
snapshot = NULL;
/* Only one checkpoint can be active at a time. */
__wt_writelock(session, S2C(session)->ckpt_rwlock);
WT_ERR(__wt_txn_begin(session, txn_cfg));
/* Prevent eviction from evicting anything newer than this. */
txn_global->ckpt_txnid = session->txn.snap_min;
WT_ERR(__wt_meta_track_on(session));
/*
* If we're doing an ordinary unnamed checkpoint, we only need to flush
* open files. If we're creating a named snapshot, we need to walk the
* entire list of files in the metadata.
*/
WT_TRET((snapshot == NULL) ?
__wt_conn_btree_apply(session, __wt_snapshot, cfg) :
__wt_meta_btree_apply(session,
__wt_snapshot, cfg, WT_BTREE_SNAPSHOT_OP));
/*
* XXX Rolling back the changes here is problematic.
*
* If we unroll here, we need a way to roll back changes to the avail
* list for each tree that was successfully synced before the error
* occurred. Otherwise, the next time we try this operation, we will
* try to free an old snapshot again.
*
* OTOH, if we commit the changes after a failure, we have partially
* overwritten the checkpoint, so what ends up on disk is not
* consistent.
*/
WT_TRET(__wt_meta_track_off(session, ret != 0));
err: txn_global->ckpt_txnid = WT_TXN_NONE;
if (F_ISSET(&session->txn, TXN_RUNNING))
WT_TRET(__txn_release(session));
__wt_rwunlock(session, S2C(session)->ckpt_rwlock);
__wt_free(session, snapshot);
return (ret);
}
/*
* __wt_txn_init --
* Initialize a session's transaction data.
*/
int
__wt_txn_init(WT_SESSION_IMPL *session)
{
WT_TXN *txn;
txn = &session->txn;
txn->id = WT_TXN_NONE;
WT_RET(__wt_calloc_def(session,
S2C(session)->session_size, &txn->snapshot));
return (0);
}
/*
* __wt_txn_destroy --
* Destroy a session's transaction data.
*/
void
__wt_txn_destroy(WT_SESSION_IMPL *session)
{
WT_TXN *txn;
txn = &session->txn;
__wt_free(session, txn->snapshot);
}
/*
* __wt_txn_global_init --
* Initialize the global transaction state.
*/
int
__wt_txn_global_init(WT_CONNECTION_IMPL *conn, const char *cfg[])
{
WT_SESSION_IMPL *session;
WT_TXN_GLOBAL *txn_global;
u_int i;
WT_UNUSED(cfg);
session = conn->default_session;
txn_global = &conn->txn_global;
txn_global->current = 1;
txn_global->ckpt_txnid = WT_TXN_NONE;
WT_RET(__wt_calloc_def(session, conn->session_size, &txn_global->ids));
for (i = 0; i < conn->session_size; i++)
txn_global->ids[i] = WT_TXN_NONE;
return (0);
}
/*
* __wt_txn_global_destroy --
* Destroy the global transaction state.
*/
void
__wt_txn_global_destroy(WT_CONNECTION_IMPL *conn)
{
WT_SESSION_IMPL *session;
WT_TXN_GLOBAL *txn_global;
session = conn->default_session;
txn_global = &conn->txn_global;
__wt_free(session, txn_global->ids);
}
|