diff options
author | Michael Cahill <michael.cahill@mongodb.com> | 2015-06-29 14:31:44 +1000 |
---|---|---|
committer | Michael Cahill <michael.cahill@mongodb.com> | 2015-06-29 14:31:44 +1000 |
commit | 5499cc46b946671cbe4b11d4ba193bc9633a326e (patch) | |
tree | bf07e2388285b76412a840b88e0c03a6e7cf08ee /src/txn | |
parent | 6a667b9b0f00fcbf4e03360f2d9fd6c7a08abdb6 (diff) | |
download | mongo-5499cc46b946671cbe4b11d4ba193bc9633a326e.tar.gz |
WT-1907 Improve performance of transaction refresh
(cherry picked from commit 5f8d5b4a84e36ae366fbee9a2026995af58e07a9)
Diffstat (limited to 'src/txn')
-rw-r--r-- | src/txn/txn.c | 207 |
1 files changed, 124 insertions, 83 deletions
diff --git a/src/txn/txn.c b/src/txn/txn.c index c838785a9c3..bd8c56cf127 100644 --- a/src/txn/txn.c +++ b/src/txn/txn.c @@ -57,50 +57,29 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session) txn = &session->txn; txn_state = &S2C(session)->txn_global.states[session->id]; - if (txn_state->snap_min != WT_TXN_NONE) { - WT_ASSERT(session, - session->txn.isolation == TXN_ISO_READ_UNCOMMITTED || - !__wt_txn_visible_all(session, txn_state->snap_min)); - txn_state->snap_min = WT_TXN_NONE; - } - F_CLR(txn, TXN_HAS_SNAPSHOT); -} + WT_ASSERT(session, + txn_state->snap_min == WT_TXN_NONE || + session->txn.isolation == TXN_ISO_READ_UNCOMMITTED || + !__wt_txn_visible_all(session, txn_state->snap_min)); -/* - * __wt_txn_update_oldest -- - * Sweep the running transactions to update the oldest ID required. - */ -void -__wt_txn_update_oldest(WT_SESSION_IMPL *session) -{ - /* - * !!! - * If a data-source is calling the WT_EXTENSION_API.transaction_oldest - * method (for the oldest transaction ID not yet visible to a running - * transaction), and then comparing that oldest ID against committed - * transactions to see if updates for a committed transaction are still - * visible to running transactions, the oldest transaction ID may be - * the same as the last committed transaction ID, if the transaction - * state wasn't refreshed after the last transaction committed. Push - * past the last committed transaction. - */ - __wt_txn_refresh(session, 0); + txn_state->snap_min = WT_TXN_NONE; + F_CLR(txn, TXN_HAS_SNAPSHOT); } /* - * __wt_txn_refresh -- - * Allocate a transaction ID and/or a snapshot. + * __wt_txn_get_snapshot -- + * Allocate a snapshot. */ void -__wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) +__wt_txn_get_snapshot(WT_SESSION_IMPL *session) { WT_CONNECTION_IMPL *conn; WT_TXN *txn; WT_TXN_GLOBAL *txn_global; WT_TXN_STATE *s, *txn_state; - uint64_t current_id, id, oldest_id; + uint64_t ckpt_id, current_id, id; uint64_t prev_oldest_id, snap_min; - uint32_t i, n, oldest_session, session_cnt; + uint32_t i, n, session_cnt; int32_t count; conn = S2C(session); @@ -113,10 +92,9 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) /* For pure read-only workloads, avoid scanning. */ if (prev_oldest_id == current_id) { - if (get_snapshot) { - txn_state->snap_min = current_id; - __txn_sort_snapshot(session, 0, current_id); - } + txn_state->snap_min = current_id; + __txn_sort_snapshot(session, 0, current_id); + /* Check that the oldest ID has not moved in the meantime. */ if (prev_oldest_id == txn_global->oldest_id && txn_global->scan_count == 0) @@ -136,15 +114,14 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) /* The oldest ID cannot change until the scan count goes to zero. */ prev_oldest_id = txn_global->oldest_id; - current_id = oldest_id = snap_min = txn_global->current; - oldest_session = 0; + current_id = snap_min = txn_global->current; /* Walk the array of concurrent transactions. */ WT_ORDERED_READ(session_cnt, conn->session_cnt); + ckpt_id = txn_global->checkpoint_id; for (i = n = 0, s = txn_global->states; i < session_cnt; i++, s++) { /* Skip the checkpoint transaction; it is never read from. */ - if (txn_global->checkpoint_id != WT_TXN_NONE && - s->id == txn_global->checkpoint_id) + if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id) continue; /* @@ -160,18 +137,104 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) if (s != txn_state && (id = s->id) != WT_TXN_NONE && TXNID_LE(prev_oldest_id, id)) { - if (get_snapshot) - txn->snapshot[n++] = id; + txn->snapshot[n++] = id; if (TXNID_LT(id, snap_min)) snap_min = id; } + } + + /* + * If we got a new snapshot, update the published snap_min for this + * session. + */ + WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min)); + WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id); + txn_state->snap_min = snap_min; + + /* Update the last running ID if we have a much newer value. */ + if (snap_min > txn_global->last_running + 100) + txn_global->last_running = snap_min; + + WT_ASSERT(session, txn_global->scan_count > 0); + (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1); + + __txn_sort_snapshot(session, n, current_id); +} + +/* + * __wt_txn_update_oldest -- + * Sweep the running transactions to update the oldest ID required. + * !!! + * If a data-source is calling the WT_EXTENSION_API.transaction_oldest + * method (for the oldest transaction ID not yet visible to a running + * transaction), and then comparing that oldest ID against committed + * transactions to see if updates for a committed transaction are still + * visible to running transactions, the oldest transaction ID may be + * the same as the last committed transaction ID, if the transaction + * state wasn't refreshed after the last transaction committed. Push + * past the last committed transaction. +*/ +void +__wt_txn_update_oldest(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_SESSION_IMPL *oldest_session; + WT_TXN_GLOBAL *txn_global; + WT_TXN_STATE *s; + uint64_t ckpt_id, current_id, id, oldest_id, prev_oldest_id, snap_min; + uint32_t i, session_cnt; + int32_t count; + int last_running_moved; + + conn = S2C(session); + txn_global = &conn->txn_global; + + current_id = snap_min = txn_global->current; + oldest_session = NULL; + prev_oldest_id = txn_global->oldest_id; + + /* For pure read-only workloads, avoid scanning. */ + if (prev_oldest_id == current_id) { + /* Check that the oldest ID has not moved in the meantime. */ + if (prev_oldest_id == txn_global->oldest_id && + txn_global->scan_count == 0) + return; + } + + /* + * We're going to scan. Increment the count of scanners to prevent the + * oldest ID from moving forwards. Spin if the count is negative, + * which indicates that some thread is moving the oldest ID forwards. + */ + do { + if ((count = txn_global->scan_count) < 0) + WT_PAUSE(); + } while (count < 0 || + !WT_ATOMIC_CAS4(txn_global->scan_count, count, count + 1)); + + /* The oldest ID cannot change until the scan count goes to zero. */ + prev_oldest_id = txn_global->oldest_id; + current_id = oldest_id = snap_min = txn_global->current; + + /* Walk the array of concurrent transactions. */ + WT_ORDERED_READ(session_cnt, conn->session_cnt); + ckpt_id = txn_global->checkpoint_id; + for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) { + /* Skip the checkpoint transaction; it is never read from. */ + if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id) + continue; /* - * Ignore the session's own snap_min: we are about to update - * it. + * Update the oldest ID. + * + * Ignore: IDs older than the oldest ID we saw. This can happen + * if we race with a thread that is allocating an ID -- the ID + * will not be used because the thread will keep spinning until + * it gets a valid one. */ - if (get_snapshot && s == txn_state) - continue; + if ((id = s->id) != WT_TXN_NONE && + TXNID_LE(prev_oldest_id, id) && TXNID_LT(id, snap_min)) + snap_min = id; /* * !!! @@ -184,49 +247,31 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) if ((id = s->snap_min) != WT_TXN_NONE && TXNID_LT(id, oldest_id)) { oldest_id = id; - oldest_session = i; + oldest_session = &conn->sessions[i]; } } if (TXNID_LT(snap_min, oldest_id)) oldest_id = snap_min; - if (txn->id != WT_TXN_NONE && TXNID_LT(txn->id, oldest_id)) - oldest_id = txn->id; - /* - * If we got a new snapshot, update the published snap_min for this - * session. - */ - if (get_snapshot) { - WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min)); - WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id); - txn_state->snap_min = snap_min; - } - - /* - * Update the last running ID if we have a much newer value or we are - * forcing an update. - */ - if (!get_snapshot || snap_min > txn_global->last_running + 100) + /* Update the last running ID. */ + if (TXNID_LT(txn_global->last_running, snap_min)) { txn_global->last_running = snap_min; + last_running_moved = 1; + } else + last_running_moved = 0; - /* - * Update the oldest ID if we have a newer ID and we can get exclusive - * access. During normal snapshot refresh, only do this if we have a - * much newer value. Once we get exclusive access, do another pass to - * make sure nobody else is using an earlier ID. - */ + /* Update the oldest ID. */ if (TXNID_LT(prev_oldest_id, oldest_id) && - (!get_snapshot || oldest_id - prev_oldest_id > 100) && WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) { WT_ORDERED_READ(session_cnt, conn->session_cnt); + ckpt_id = txn_global->checkpoint_id; for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) { /* * Skip the checkpoint transaction; it is never read * from. */ - if (txn_global->checkpoint_id != WT_TXN_NONE && - s->id == txn_global->checkpoint_id) + if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id) continue; if ((id = s->id) != WT_TXN_NONE && @@ -241,23 +286,19 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) txn_global->scan_count = 0; } else { if (WT_VERBOSE_ISSET(session, WT_VERB_TRANSACTION) && - current_id - oldest_id > 10000 && - txn_global->oldest_session != oldest_session) { + current_id - oldest_id > 10000 && last_running_moved && + oldest_session != NULL) { (void)__wt_verbose(session, WT_VERB_TRANSACTION, "old snapshot %" PRIu64 " pinned in session %d [%s]" " with snap_min %" PRIu64 "\n", - oldest_id, oldest_session, - conn->sessions[oldest_session].lastop, - conn->sessions[oldest_session].txn.snap_min); - txn_global->oldest_session = oldest_session; + oldest_id, oldest_session->id, + oldest_session->lastop, + oldest_session->txn.snap_min); } WT_ASSERT(session, txn_global->scan_count > 0); (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1); } - - if (get_snapshot) - __txn_sort_snapshot(session, n, current_id); } /* @@ -304,7 +345,7 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[]) if (txn->isolation == TXN_ISO_SNAPSHOT) { if (session->ncursors > 0) WT_RET(__wt_session_copy_values(session)); - __wt_txn_refresh(session, 1); + __wt_txn_get_snapshot(session); } return (0); } |