diff options
Diffstat (limited to 'src/txn/txn.c')
-rw-r--r-- | src/txn/txn.c | 260 |
1 files changed, 98 insertions, 162 deletions
diff --git a/src/txn/txn.c b/src/txn/txn.c index 3c7373446c5..ca454f8266a 100644 --- a/src/txn/txn.c +++ b/src/txn/txn.c @@ -28,11 +28,14 @@ __wt_txnid_cmp(const void *v1, const void *v2) */ static void __txn_sort_snapshot(WT_SESSION_IMPL *session, - uint32_t n, wt_txnid_t id, wt_txnid_t oldest_snap_min) + uint32_t n, wt_txnid_t id, wt_txnid_t oldest_id) { WT_TXN *txn; + WT_TXN_GLOBAL *txn_global; + wt_txnid_t id_copy; txn = &session->txn; + txn_global = &S2C(session)->txn_global; if (n > 1) qsort(txn->snapshot, n, sizeof(wt_txnid_t), __wt_txnid_cmp); @@ -40,11 +43,18 @@ __txn_sort_snapshot(WT_SESSION_IMPL *session, txn->snap_min = (n == 0) ? id : txn->snapshot[0]; txn->snap_max = id; WT_ASSERT(session, n == 0 || txn->snap_min != WT_TXN_NONE); - if (TXNID_LT(txn->snap_min, oldest_snap_min)) - oldest_snap_min = txn->snap_min; + if (TXNID_LT(txn->snap_min, oldest_id)) + oldest_id = txn->snap_min; - if (TXNID_LT(txn->oldest_snap_min, oldest_snap_min)) - txn->oldest_snap_min = oldest_snap_min; + /* + * Update the oldest snapshot if we have a newer value. We're not + * holding a lock, so copy then swap into place if our oldest version + * is newer. + */ + do { + id_copy = txn_global->oldest_id; + } while (TXNID_LT(id_copy, oldest_id) && + !WT_ATOMIC_CAS(txn_global->oldest_id, id_copy, oldest_id)); } /* @@ -61,131 +71,117 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session) } /* - * __wt_txn_get_oldest -- - * Update the current transaction's cached copy of the oldest possible - * snap_min value. + * __wt_txn_refresh -- + * Allocate a transaction ID and/or a snapshot. */ void -__wt_txn_get_oldest(WT_SESSION_IMPL *session) +__wt_txn_refresh( + WT_SESSION_IMPL *session, wt_txnid_t max_id, int alloc_id, int get_snapshot) { WT_CONNECTION_IMPL *conn; WT_TXN *txn; WT_TXN_GLOBAL *txn_global; - WT_TXN_STATE *s; - wt_txnid_t current_id, id, oldest_snap_min; - uint32_t i, session_cnt; + WT_TXN_STATE *s, *txn_state; + wt_txnid_t current_id, id, oldest_id; + uint32_t i, n, session_cnt; conn = S2C(session); txn = &session->txn; txn_global = &conn->txn_global; + txn_state = &txn_global->states[session->id]; + oldest_id = txn_global->oldest_id; + + /* If nothing has changed since last time, we're done. */ + if (!alloc_id && + txn->id == max_id && + txn->last_id == txn_global->current && + txn->last_gen == txn_global->gen && + TXNID_LE(oldest_id, txn->snap_min)) + goto done; do { - current_id = txn_global->current; - oldest_snap_min = - (txn->id != WT_TXN_NONE) ? txn->id : current_id + 1; + /* Take a copy of the current transaction generation. */ + txn->last_gen = txn_global->gen; - WT_ORDERED_READ(session_cnt, conn->session_cnt); - for (i = 0, s = txn_global->states; - i < session_cnt; - i++, s++) { - if ((id = s->snap_min) != WT_TXN_NONE && - TXNID_LT(id, oldest_snap_min)) - oldest_snap_min = id; + if (alloc_id) { /* - * It is possible that there is no snapshot active, - * even though there are transactions running (at - * isolation levels lower than snapshot isolation). If - * a new snapshot is taken, it will have a snap_min - * value of the lowest running transaction. + * Allocate a transaction ID. + * + * We use an atomic compare and swap to ensure that we + * get a unique ID that is published before the global + * counter is updated. * - * We need to make sure that the oldest snap_min we - * calculate won't be made invalid in that case, so - * make sure it is at least as old as the oldest - * running transaction. + * If two threads race to allocate an ID, only the + * latest ID will proceed. The winning thread can be + * sure its snapshot contains all of the earlier active + * IDs. Threads that race and get an earlier ID may + * not appear in the snapshot, but they will loop and + * allocate a new ID before proceeding to make any + * updates. + * + * This potentially wastes transaction IDs when threads + * race to begin transactions: that is the price we pay + * to keep this path latch free. */ - if ((id = s->id) != WT_TXN_NONE && - TXNID_LT(id, oldest_snap_min)) - oldest_snap_min = id; + do { + current_id = txn_global->current; + txn_state->id = txn->id = current_id + 1; + } while (!WT_ATOMIC_CAS(txn_global->current, + current_id, txn->id) || + txn->id == WT_TXN_NONE || + txn->id == WT_TXN_ABORTED); + txn->last_id = oldest_id = max_id = txn->id; + } else { + txn->last_id = current_id = txn_global->current; + oldest_id = + (max_id != WT_TXN_NONE) ? max_id : current_id + 1; } - } while (current_id != txn_global->current); - - if (TXNID_LT(txn->oldest_snap_min, oldest_snap_min)) - txn->oldest_snap_min = oldest_snap_min; -} - -/* - * __wt_txn_get_snapshot -- - * Set up a snapshot in the current transaction, without allocating an ID. - */ -void -__wt_txn_get_snapshot(WT_SESSION_IMPL *session, - wt_txnid_t my_id, wt_txnid_t max_id, int committing) -{ - WT_CONNECTION_IMPL *conn; - WT_TXN *txn; - WT_TXN_GLOBAL *txn_global; - WT_TXN_STATE *s, *txn_state; - wt_txnid_t current_id, id, oldest_snap_min; - uint32_t i, n, session_cnt; - - conn = S2C(session); - txn = &session->txn; - txn_global = &conn->txn_global; - txn_state = &txn_global->states[session->id]; - /* If nothing has changed since last time, we're done. */ - if (!committing && txn->last_id == txn_global->current && - txn->last_gen == txn_global->gen) { - WT_ASSERT(session, - TXNID_LE(txn->oldest_snap_min, txn->snap_min)); - txn_state->snap_min = txn->snap_min; - return; - } + if (!get_snapshot) + return; - do { - /* Take a copy of the current session ID. */ - txn->last_gen = txn_global->gen; - txn->last_id = current_id = txn_global->current; - oldest_snap_min = current_id + 1; + /* + * Publish a new snap_min value that we refine below. This + * prevents the global oldest value from moving forward + * underneath us. + */ + txn_state->snap_min = txn_global->oldest_id; /* Copy the array of concurrent transactions. */ WT_ORDERED_READ(session_cnt, conn->session_cnt); for (i = n = 0, s = txn_global->states; i < session_cnt; i++, s++) { - if (!committing && (id = s->id) != WT_TXN_NONE && - TXNID_LT(id, oldest_snap_min)) - oldest_snap_min = id; /* - * Ignore everything else about the session's own + * Ignore everything about the session's own * transaction: we are in the process of updating it. */ - if (i == session->id) + if (s == txn_state) continue; - if (id != WT_TXN_NONE && - (max_id == WT_TXN_NONE || TXNID_LT(id, max_id))) + if ((id = s->id) != WT_TXN_NONE) { txn->snapshot[n++] = id; - /* Ignore the session's own transaction. */ + if (TXNID_LT(id, oldest_id)) + oldest_id = id; + } if ((id = s->snap_min) != WT_TXN_NONE && - TXNID_LT(id, oldest_snap_min)) - oldest_snap_min = id; + TXNID_LT(id, oldest_id)) + oldest_id = id; } + __txn_sort_snapshot(session, n, current_id + 1, oldest_id); + /* * Ensure the snapshot reads are scheduled before re-checking * the global current ID. */ WT_READ_BARRIER(); - } while (current_id != txn_global->current || - txn->last_gen != txn_global->gen); - - __txn_sort_snapshot(session, n, - (max_id != WT_TXN_NONE) ? max_id : current_id + 1, - oldest_snap_min); - id = (my_id == WT_TXN_NONE || TXNID_LT(txn->snap_min, my_id)) ? - txn->snap_min : my_id; - WT_ASSERT(session, committing || TXNID_LE(oldest_snap_min, id)); - txn_state->snap_min = id; + } while (txn->last_id != txn_global->current || + txn->last_gen != txn_global->gen || + TXNID_LT(txn->snap_min, txn_global->oldest_id)); + +done: WT_ASSERT(session, + TXNID_LE(txn_global->oldest_id, txn->snap_min)); + txn_state->snap_min = txn->snap_min; } /* @@ -196,17 +192,17 @@ __wt_txn_get_snapshot(WT_SESSION_IMPL *session, void __wt_txn_get_evict_snapshot(WT_SESSION_IMPL *session) { - WT_TXN *txn; + WT_TXN_GLOBAL *txn_global; + wt_txnid_t oldest_id; - txn = &session->txn; + txn_global = &S2C(session)->txn_global; /* * The oldest active snapshot ID in the system should *not* be visible * to eviction. Create a snapshot containing that ID. */ - __wt_txn_get_oldest(session); - __txn_sort_snapshot( - session, 0, txn->oldest_snap_min, txn->oldest_snap_min); + oldest_id = txn_global->oldest_id; + __txn_sort_snapshot(session, 0, oldest_id, oldest_id); /* * Note that we carefully don't update the global table with this @@ -226,9 +222,7 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[]) WT_CONNECTION_IMPL *conn; WT_TXN *txn; WT_TXN_GLOBAL *txn_global; - WT_TXN_STATE *s, *txn_state; - wt_txnid_t id, oldest_snap_min; - uint32_t i, n, session_cnt; + WT_TXN_STATE *txn_state; conn = S2C(session); txn = &session->txn; @@ -248,66 +242,8 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[]) TXN_ISO_READ_COMMITTED : TXN_ISO_READ_UNCOMMITTED; F_SET(txn, TXN_RUNNING); - - do { - /* - * Allocate a transaction ID. - * - * We use an atomic increment to ensure that we get a unique - * ID, then publish that to the global state table. - * - * If two threads race to allocate an ID, only the latest ID - * will proceed. The winning thread can be sure its snapshot - * contains all of the earlier active IDs. Threads that race - * and get an earlier ID may not appear in the snapshot, - * but they will loop and allocate a new ID before proceeding - * to make any updates. - * - * This potentially wastes transaction IDs when threads race to - * begin transactions, but that is the price we pay to keep - * this path latch free. - */ - do { - txn->id = WT_ATOMIC_ADD(txn_global->current, 1); - } while (txn->id == WT_TXN_NONE || txn->id == WT_TXN_ABORTED); - WT_PUBLISH(txn_state->id, txn->id); - - /* - * If we are starting a snapshot isolation transaction, get - * a snapshot of the running transactions. - * - * If we already have a snapshot (e.g., for an auto-commit - * operation), update it so that the newly-allocated ID is - * visible. - */ - if (txn->isolation == TXN_ISO_SNAPSHOT) { - txn->last_gen = txn_global->gen; - oldest_snap_min = txn->id; - - /* Copy the array of concurrent transactions. */ - WT_ORDERED_READ(session_cnt, conn->session_cnt); - for (i = n = 0, s = txn_global->states; - i < session_cnt; - i++, s++) { - if ((id = s->snap_min) != WT_TXN_NONE && - TXNID_LT(id, oldest_snap_min)) - oldest_snap_min = id; - if ((id = s->id) != WT_TXN_NONE) - txn->snapshot[n++] = id; - } - - __txn_sort_snapshot( - session, n, txn->id, oldest_snap_min); - txn_state->snap_min = txn->snap_min; - } - - /* - * Ensure the snapshot reads are complete before re-checking - * the global current ID. - */ - WT_READ_BARRIER(); - } while (txn->id != txn_global->current); - + __wt_txn_refresh( + session, WT_TXN_NONE, 1, txn->isolation == TXN_ISO_SNAPSHOT); return (0); } @@ -373,8 +309,8 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[]) * the cursor. Get the new snapshot before releasing the ID for the * commit. */ - if (session->ncursors > 0) - __wt_txn_get_snapshot(session, txn->id + 1, WT_TXN_NONE, 1); + if (session->ncursors > 0 && txn->isolation != TXN_ISO_READ_UNCOMMITTED) + __wt_txn_refresh(session, txn->id + 1, 0, 1); __wt_txn_release(session); return (0); } |