diff options
-rw-r--r-- | src/btree/bt_sync.c | 2 | ||||
-rw-r--r-- | src/include/extern.h | 2 | ||||
-rw-r--r-- | src/include/txn.h | 3 | ||||
-rw-r--r-- | src/include/txn.i | 20 | ||||
-rw-r--r-- | src/txn/txn.c | 207 |
5 files changed, 136 insertions, 98 deletions
diff --git a/src/btree/bt_sync.c b/src/btree/bt_sync.c index 71b0d0abdb3..cc52f63f1f5 100644 --- a/src/btree/bt_sync.c +++ b/src/btree/bt_sync.c @@ -71,7 +71,7 @@ __sync_file(WT_SESSION_IMPL *session, int syncop) __wt_txn_visible_all( session, page->modify->update_txn)) { if (txn->isolation == TXN_ISO_READ_COMMITTED) - __wt_txn_refresh(session, 1); + __wt_txn_get_snapshot(session); leaf_bytes += page->memory_footprint; ++leaf_pages; WT_ERR(__wt_reconcile(session, walk, NULL, 0)); diff --git a/src/include/extern.h b/src/include/extern.h index 6069bb0dcf2..be002681d9c 100644 --- a/src/include/extern.h +++ b/src/include/extern.h @@ -660,8 +660,8 @@ extern void __wt_stat_init_connection_stats(WT_CONNECTION_STATS *stats); extern void __wt_stat_refresh_connection_stats(void *stats_arg); extern int WT_CDECL __wt_txnid_cmp(const void *v1, const void *v2); extern void __wt_txn_release_snapshot(WT_SESSION_IMPL *session); +extern void __wt_txn_get_snapshot(WT_SESSION_IMPL *session); extern void __wt_txn_update_oldest(WT_SESSION_IMPL *session); -extern void __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot); extern int __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[]); extern void __wt_txn_release(WT_SESSION_IMPL *session); extern int __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[]); diff --git a/src/include/txn.h b/src/include/txn.h index 927ab09d5f9..62f565c0535 100644 --- a/src/include/txn.h +++ b/src/include/txn.h @@ -42,9 +42,6 @@ struct __wt_txn_global { */ volatile uint64_t oldest_id; - /* The oldest session found in the last scan. */ - uint32_t oldest_session; - /* Count of scanning threads, or -1 for exclusive access. */ volatile int32_t scan_count; diff --git a/src/include/txn.i b/src/include/txn.i index 4ae80231c65..9e8d9cee748 100644 --- a/src/include/txn.i +++ b/src/include/txn.i @@ -154,20 +154,20 @@ __wt_txn_visible(WT_SESSION_IMPL *session, uint64_t id) txn = &session->txn; - /* - * Eviction only sees globally visible updates, or if there is a - * checkpoint transaction running, use its transaction. - */ - if (txn->isolation == TXN_ISO_EVICTION) - return (__wt_txn_visible_all(session, id)); + /* Changes with no associated transaction are always visible. */ + if (id == WT_TXN_NONE) + return (1); /* Nobody sees the results of aborted transactions. */ if (id == WT_TXN_ABORTED) return (0); - /* Changes with no associated transaction are always visible. */ - if (id == WT_TXN_NONE) - return (1); + /* + * Eviction only sees globally visible updates, or if there is a + * checkpoint transaction running, use its transaction. + */ + if (txn->isolation == TXN_ISO_EVICTION) + return (__wt_txn_visible_all(session, id)); /* * Read-uncommitted transactions see all other changes. @@ -418,7 +418,7 @@ __wt_txn_cursor_op(WT_SESSION_IMPL *session) if (txn->isolation != TXN_ISO_READ_UNCOMMITTED && !F_ISSET(txn, TXN_HAS_SNAPSHOT)) - __wt_txn_refresh(session, 1); + __wt_txn_get_snapshot(session); } /* diff --git a/src/txn/txn.c b/src/txn/txn.c index c838785a9c3..bd8c56cf127 100644 --- a/src/txn/txn.c +++ b/src/txn/txn.c @@ -57,50 +57,29 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session) txn = &session->txn; txn_state = &S2C(session)->txn_global.states[session->id]; - if (txn_state->snap_min != WT_TXN_NONE) { - WT_ASSERT(session, - session->txn.isolation == TXN_ISO_READ_UNCOMMITTED || - !__wt_txn_visible_all(session, txn_state->snap_min)); - txn_state->snap_min = WT_TXN_NONE; - } - F_CLR(txn, TXN_HAS_SNAPSHOT); -} + WT_ASSERT(session, + txn_state->snap_min == WT_TXN_NONE || + session->txn.isolation == TXN_ISO_READ_UNCOMMITTED || + !__wt_txn_visible_all(session, txn_state->snap_min)); -/* - * __wt_txn_update_oldest -- - * Sweep the running transactions to update the oldest ID required. - */ -void -__wt_txn_update_oldest(WT_SESSION_IMPL *session) -{ - /* - * !!! - * If a data-source is calling the WT_EXTENSION_API.transaction_oldest - * method (for the oldest transaction ID not yet visible to a running - * transaction), and then comparing that oldest ID against committed - * transactions to see if updates for a committed transaction are still - * visible to running transactions, the oldest transaction ID may be - * the same as the last committed transaction ID, if the transaction - * state wasn't refreshed after the last transaction committed. Push - * past the last committed transaction. - */ - __wt_txn_refresh(session, 0); + txn_state->snap_min = WT_TXN_NONE; + F_CLR(txn, TXN_HAS_SNAPSHOT); } /* - * __wt_txn_refresh -- - * Allocate a transaction ID and/or a snapshot. + * __wt_txn_get_snapshot -- + * Allocate a snapshot. */ void -__wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) +__wt_txn_get_snapshot(WT_SESSION_IMPL *session) { WT_CONNECTION_IMPL *conn; WT_TXN *txn; WT_TXN_GLOBAL *txn_global; WT_TXN_STATE *s, *txn_state; - uint64_t current_id, id, oldest_id; + uint64_t ckpt_id, current_id, id; uint64_t prev_oldest_id, snap_min; - uint32_t i, n, oldest_session, session_cnt; + uint32_t i, n, session_cnt; int32_t count; conn = S2C(session); @@ -113,10 +92,9 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) /* For pure read-only workloads, avoid scanning. */ if (prev_oldest_id == current_id) { - if (get_snapshot) { - txn_state->snap_min = current_id; - __txn_sort_snapshot(session, 0, current_id); - } + txn_state->snap_min = current_id; + __txn_sort_snapshot(session, 0, current_id); + /* Check that the oldest ID has not moved in the meantime. */ if (prev_oldest_id == txn_global->oldest_id && txn_global->scan_count == 0) @@ -136,15 +114,14 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) /* The oldest ID cannot change until the scan count goes to zero. */ prev_oldest_id = txn_global->oldest_id; - current_id = oldest_id = snap_min = txn_global->current; - oldest_session = 0; + current_id = snap_min = txn_global->current; /* Walk the array of concurrent transactions. */ WT_ORDERED_READ(session_cnt, conn->session_cnt); + ckpt_id = txn_global->checkpoint_id; for (i = n = 0, s = txn_global->states; i < session_cnt; i++, s++) { /* Skip the checkpoint transaction; it is never read from. */ - if (txn_global->checkpoint_id != WT_TXN_NONE && - s->id == txn_global->checkpoint_id) + if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id) continue; /* @@ -160,18 +137,104 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) if (s != txn_state && (id = s->id) != WT_TXN_NONE && TXNID_LE(prev_oldest_id, id)) { - if (get_snapshot) - txn->snapshot[n++] = id; + txn->snapshot[n++] = id; if (TXNID_LT(id, snap_min)) snap_min = id; } + } + + /* + * If we got a new snapshot, update the published snap_min for this + * session. + */ + WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min)); + WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id); + txn_state->snap_min = snap_min; + + /* Update the last running ID if we have a much newer value. */ + if (snap_min > txn_global->last_running + 100) + txn_global->last_running = snap_min; + + WT_ASSERT(session, txn_global->scan_count > 0); + (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1); + + __txn_sort_snapshot(session, n, current_id); +} + +/* + * __wt_txn_update_oldest -- + * Sweep the running transactions to update the oldest ID required. + * !!! + * If a data-source is calling the WT_EXTENSION_API.transaction_oldest + * method (for the oldest transaction ID not yet visible to a running + * transaction), and then comparing that oldest ID against committed + * transactions to see if updates for a committed transaction are still + * visible to running transactions, the oldest transaction ID may be + * the same as the last committed transaction ID, if the transaction + * state wasn't refreshed after the last transaction committed. Push + * past the last committed transaction. +*/ +void +__wt_txn_update_oldest(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_SESSION_IMPL *oldest_session; + WT_TXN_GLOBAL *txn_global; + WT_TXN_STATE *s; + uint64_t ckpt_id, current_id, id, oldest_id, prev_oldest_id, snap_min; + uint32_t i, session_cnt; + int32_t count; + int last_running_moved; + + conn = S2C(session); + txn_global = &conn->txn_global; + + current_id = snap_min = txn_global->current; + oldest_session = NULL; + prev_oldest_id = txn_global->oldest_id; + + /* For pure read-only workloads, avoid scanning. */ + if (prev_oldest_id == current_id) { + /* Check that the oldest ID has not moved in the meantime. */ + if (prev_oldest_id == txn_global->oldest_id && + txn_global->scan_count == 0) + return; + } + + /* + * We're going to scan. Increment the count of scanners to prevent the + * oldest ID from moving forwards. Spin if the count is negative, + * which indicates that some thread is moving the oldest ID forwards. + */ + do { + if ((count = txn_global->scan_count) < 0) + WT_PAUSE(); + } while (count < 0 || + !WT_ATOMIC_CAS4(txn_global->scan_count, count, count + 1)); + + /* The oldest ID cannot change until the scan count goes to zero. */ + prev_oldest_id = txn_global->oldest_id; + current_id = oldest_id = snap_min = txn_global->current; + + /* Walk the array of concurrent transactions. */ + WT_ORDERED_READ(session_cnt, conn->session_cnt); + ckpt_id = txn_global->checkpoint_id; + for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) { + /* Skip the checkpoint transaction; it is never read from. */ + if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id) + continue; /* - * Ignore the session's own snap_min: we are about to update - * it. + * Update the oldest ID. + * + * Ignore: IDs older than the oldest ID we saw. This can happen + * if we race with a thread that is allocating an ID -- the ID + * will not be used because the thread will keep spinning until + * it gets a valid one. */ - if (get_snapshot && s == txn_state) - continue; + if ((id = s->id) != WT_TXN_NONE && + TXNID_LE(prev_oldest_id, id) && TXNID_LT(id, snap_min)) + snap_min = id; /* * !!! @@ -184,49 +247,31 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) if ((id = s->snap_min) != WT_TXN_NONE && TXNID_LT(id, oldest_id)) { oldest_id = id; - oldest_session = i; + oldest_session = &conn->sessions[i]; } } if (TXNID_LT(snap_min, oldest_id)) oldest_id = snap_min; - if (txn->id != WT_TXN_NONE && TXNID_LT(txn->id, oldest_id)) - oldest_id = txn->id; - /* - * If we got a new snapshot, update the published snap_min for this - * session. - */ - if (get_snapshot) { - WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min)); - WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id); - txn_state->snap_min = snap_min; - } - - /* - * Update the last running ID if we have a much newer value or we are - * forcing an update. - */ - if (!get_snapshot || snap_min > txn_global->last_running + 100) + /* Update the last running ID. */ + if (TXNID_LT(txn_global->last_running, snap_min)) { txn_global->last_running = snap_min; + last_running_moved = 1; + } else + last_running_moved = 0; - /* - * Update the oldest ID if we have a newer ID and we can get exclusive - * access. During normal snapshot refresh, only do this if we have a - * much newer value. Once we get exclusive access, do another pass to - * make sure nobody else is using an earlier ID. - */ + /* Update the oldest ID. */ if (TXNID_LT(prev_oldest_id, oldest_id) && - (!get_snapshot || oldest_id - prev_oldest_id > 100) && WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) { WT_ORDERED_READ(session_cnt, conn->session_cnt); + ckpt_id = txn_global->checkpoint_id; for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) { /* * Skip the checkpoint transaction; it is never read * from. */ - if (txn_global->checkpoint_id != WT_TXN_NONE && - s->id == txn_global->checkpoint_id) + if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id) continue; if ((id = s->id) != WT_TXN_NONE && @@ -241,23 +286,19 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot) txn_global->scan_count = 0; } else { if (WT_VERBOSE_ISSET(session, WT_VERB_TRANSACTION) && - current_id - oldest_id > 10000 && - txn_global->oldest_session != oldest_session) { + current_id - oldest_id > 10000 && last_running_moved && + oldest_session != NULL) { (void)__wt_verbose(session, WT_VERB_TRANSACTION, "old snapshot %" PRIu64 " pinned in session %d [%s]" " with snap_min %" PRIu64 "\n", - oldest_id, oldest_session, - conn->sessions[oldest_session].lastop, - conn->sessions[oldest_session].txn.snap_min); - txn_global->oldest_session = oldest_session; + oldest_id, oldest_session->id, + oldest_session->lastop, + oldest_session->txn.snap_min); } WT_ASSERT(session, txn_global->scan_count > 0); (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1); } - - if (get_snapshot) - __txn_sort_snapshot(session, n, current_id); } /* @@ -304,7 +345,7 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[]) if (txn->isolation == TXN_ISO_SNAPSHOT) { if (session->ncursors > 0) WT_RET(__wt_session_copy_values(session)); - __wt_txn_refresh(session, 1); + __wt_txn_get_snapshot(session); } return (0); } |