summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@mongodb.com>2015-06-29 14:31:44 +1000
committerMichael Cahill <michael.cahill@mongodb.com>2015-06-29 14:31:44 +1000
commit5499cc46b946671cbe4b11d4ba193bc9633a326e (patch)
treebf07e2388285b76412a840b88e0c03a6e7cf08ee
parent6a667b9b0f00fcbf4e03360f2d9fd6c7a08abdb6 (diff)
downloadmongo-5499cc46b946671cbe4b11d4ba193bc9633a326e.tar.gz
WT-1907 Improve performance of transaction refresh
(cherry picked from commit 5f8d5b4a84e36ae366fbee9a2026995af58e07a9)
-rw-r--r--src/btree/bt_sync.c2
-rw-r--r--src/include/extern.h2
-rw-r--r--src/include/txn.h3
-rw-r--r--src/include/txn.i20
-rw-r--r--src/txn/txn.c207
5 files changed, 136 insertions, 98 deletions
diff --git a/src/btree/bt_sync.c b/src/btree/bt_sync.c
index 71b0d0abdb3..cc52f63f1f5 100644
--- a/src/btree/bt_sync.c
+++ b/src/btree/bt_sync.c
@@ -71,7 +71,7 @@ __sync_file(WT_SESSION_IMPL *session, int syncop)
__wt_txn_visible_all(
session, page->modify->update_txn)) {
if (txn->isolation == TXN_ISO_READ_COMMITTED)
- __wt_txn_refresh(session, 1);
+ __wt_txn_get_snapshot(session);
leaf_bytes += page->memory_footprint;
++leaf_pages;
WT_ERR(__wt_reconcile(session, walk, NULL, 0));
diff --git a/src/include/extern.h b/src/include/extern.h
index 6069bb0dcf2..be002681d9c 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -660,8 +660,8 @@ extern void __wt_stat_init_connection_stats(WT_CONNECTION_STATS *stats);
extern void __wt_stat_refresh_connection_stats(void *stats_arg);
extern int WT_CDECL __wt_txnid_cmp(const void *v1, const void *v2);
extern void __wt_txn_release_snapshot(WT_SESSION_IMPL *session);
+extern void __wt_txn_get_snapshot(WT_SESSION_IMPL *session);
extern void __wt_txn_update_oldest(WT_SESSION_IMPL *session);
-extern void __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot);
extern int __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[]);
extern void __wt_txn_release(WT_SESSION_IMPL *session);
extern int __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[]);
diff --git a/src/include/txn.h b/src/include/txn.h
index 927ab09d5f9..62f565c0535 100644
--- a/src/include/txn.h
+++ b/src/include/txn.h
@@ -42,9 +42,6 @@ struct __wt_txn_global {
*/
volatile uint64_t oldest_id;
- /* The oldest session found in the last scan. */
- uint32_t oldest_session;
-
/* Count of scanning threads, or -1 for exclusive access. */
volatile int32_t scan_count;
diff --git a/src/include/txn.i b/src/include/txn.i
index 4ae80231c65..9e8d9cee748 100644
--- a/src/include/txn.i
+++ b/src/include/txn.i
@@ -154,20 +154,20 @@ __wt_txn_visible(WT_SESSION_IMPL *session, uint64_t id)
txn = &session->txn;
- /*
- * Eviction only sees globally visible updates, or if there is a
- * checkpoint transaction running, use its transaction.
- */
- if (txn->isolation == TXN_ISO_EVICTION)
- return (__wt_txn_visible_all(session, id));
+ /* Changes with no associated transaction are always visible. */
+ if (id == WT_TXN_NONE)
+ return (1);
/* Nobody sees the results of aborted transactions. */
if (id == WT_TXN_ABORTED)
return (0);
- /* Changes with no associated transaction are always visible. */
- if (id == WT_TXN_NONE)
- return (1);
+ /*
+ * Eviction only sees globally visible updates, or if there is a
+ * checkpoint transaction running, use its transaction.
+ */
+ if (txn->isolation == TXN_ISO_EVICTION)
+ return (__wt_txn_visible_all(session, id));
/*
* Read-uncommitted transactions see all other changes.
@@ -418,7 +418,7 @@ __wt_txn_cursor_op(WT_SESSION_IMPL *session)
if (txn->isolation != TXN_ISO_READ_UNCOMMITTED &&
!F_ISSET(txn, TXN_HAS_SNAPSHOT))
- __wt_txn_refresh(session, 1);
+ __wt_txn_get_snapshot(session);
}
/*
diff --git a/src/txn/txn.c b/src/txn/txn.c
index c838785a9c3..bd8c56cf127 100644
--- a/src/txn/txn.c
+++ b/src/txn/txn.c
@@ -57,50 +57,29 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session)
txn = &session->txn;
txn_state = &S2C(session)->txn_global.states[session->id];
- if (txn_state->snap_min != WT_TXN_NONE) {
- WT_ASSERT(session,
- session->txn.isolation == TXN_ISO_READ_UNCOMMITTED ||
- !__wt_txn_visible_all(session, txn_state->snap_min));
- txn_state->snap_min = WT_TXN_NONE;
- }
- F_CLR(txn, TXN_HAS_SNAPSHOT);
-}
+ WT_ASSERT(session,
+ txn_state->snap_min == WT_TXN_NONE ||
+ session->txn.isolation == TXN_ISO_READ_UNCOMMITTED ||
+ !__wt_txn_visible_all(session, txn_state->snap_min));
-/*
- * __wt_txn_update_oldest --
- * Sweep the running transactions to update the oldest ID required.
- */
-void
-__wt_txn_update_oldest(WT_SESSION_IMPL *session)
-{
- /*
- * !!!
- * If a data-source is calling the WT_EXTENSION_API.transaction_oldest
- * method (for the oldest transaction ID not yet visible to a running
- * transaction), and then comparing that oldest ID against committed
- * transactions to see if updates for a committed transaction are still
- * visible to running transactions, the oldest transaction ID may be
- * the same as the last committed transaction ID, if the transaction
- * state wasn't refreshed after the last transaction committed. Push
- * past the last committed transaction.
- */
- __wt_txn_refresh(session, 0);
+ txn_state->snap_min = WT_TXN_NONE;
+ F_CLR(txn, TXN_HAS_SNAPSHOT);
}
/*
- * __wt_txn_refresh --
- * Allocate a transaction ID and/or a snapshot.
+ * __wt_txn_get_snapshot --
+ * Allocate a snapshot.
*/
void
-__wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
+__wt_txn_get_snapshot(WT_SESSION_IMPL *session)
{
WT_CONNECTION_IMPL *conn;
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
WT_TXN_STATE *s, *txn_state;
- uint64_t current_id, id, oldest_id;
+ uint64_t ckpt_id, current_id, id;
uint64_t prev_oldest_id, snap_min;
- uint32_t i, n, oldest_session, session_cnt;
+ uint32_t i, n, session_cnt;
int32_t count;
conn = S2C(session);
@@ -113,10 +92,9 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
/* For pure read-only workloads, avoid scanning. */
if (prev_oldest_id == current_id) {
- if (get_snapshot) {
- txn_state->snap_min = current_id;
- __txn_sort_snapshot(session, 0, current_id);
- }
+ txn_state->snap_min = current_id;
+ __txn_sort_snapshot(session, 0, current_id);
+
/* Check that the oldest ID has not moved in the meantime. */
if (prev_oldest_id == txn_global->oldest_id &&
txn_global->scan_count == 0)
@@ -136,15 +114,14 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
/* The oldest ID cannot change until the scan count goes to zero. */
prev_oldest_id = txn_global->oldest_id;
- current_id = oldest_id = snap_min = txn_global->current;
- oldest_session = 0;
+ current_id = snap_min = txn_global->current;
/* Walk the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
+ ckpt_id = txn_global->checkpoint_id;
for (i = n = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/* Skip the checkpoint transaction; it is never read from. */
- if (txn_global->checkpoint_id != WT_TXN_NONE &&
- s->id == txn_global->checkpoint_id)
+ if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id)
continue;
/*
@@ -160,18 +137,104 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
if (s != txn_state &&
(id = s->id) != WT_TXN_NONE &&
TXNID_LE(prev_oldest_id, id)) {
- if (get_snapshot)
- txn->snapshot[n++] = id;
+ txn->snapshot[n++] = id;
if (TXNID_LT(id, snap_min))
snap_min = id;
}
+ }
+
+ /*
+ * If we got a new snapshot, update the published snap_min for this
+ * session.
+ */
+ WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min));
+ WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id);
+ txn_state->snap_min = snap_min;
+
+ /* Update the last running ID if we have a much newer value. */
+ if (snap_min > txn_global->last_running + 100)
+ txn_global->last_running = snap_min;
+
+ WT_ASSERT(session, txn_global->scan_count > 0);
+ (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1);
+
+ __txn_sort_snapshot(session, n, current_id);
+}
+
+/*
+ * __wt_txn_update_oldest --
+ * Sweep the running transactions to update the oldest ID required.
+ * !!!
+ * If a data-source is calling the WT_EXTENSION_API.transaction_oldest
+ * method (for the oldest transaction ID not yet visible to a running
+ * transaction), and then comparing that oldest ID against committed
+ * transactions to see if updates for a committed transaction are still
+ * visible to running transactions, the oldest transaction ID may be
+ * the same as the last committed transaction ID, if the transaction
+ * state wasn't refreshed after the last transaction committed. Push
+ * past the last committed transaction.
+*/
+void
+__wt_txn_update_oldest(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_SESSION_IMPL *oldest_session;
+ WT_TXN_GLOBAL *txn_global;
+ WT_TXN_STATE *s;
+ uint64_t ckpt_id, current_id, id, oldest_id, prev_oldest_id, snap_min;
+ uint32_t i, session_cnt;
+ int32_t count;
+ int last_running_moved;
+
+ conn = S2C(session);
+ txn_global = &conn->txn_global;
+
+ current_id = snap_min = txn_global->current;
+ oldest_session = NULL;
+ prev_oldest_id = txn_global->oldest_id;
+
+ /* For pure read-only workloads, avoid scanning. */
+ if (prev_oldest_id == current_id) {
+ /* Check that the oldest ID has not moved in the meantime. */
+ if (prev_oldest_id == txn_global->oldest_id &&
+ txn_global->scan_count == 0)
+ return;
+ }
+
+ /*
+ * We're going to scan. Increment the count of scanners to prevent the
+ * oldest ID from moving forwards. Spin if the count is negative,
+ * which indicates that some thread is moving the oldest ID forwards.
+ */
+ do {
+ if ((count = txn_global->scan_count) < 0)
+ WT_PAUSE();
+ } while (count < 0 ||
+ !WT_ATOMIC_CAS4(txn_global->scan_count, count, count + 1));
+
+ /* The oldest ID cannot change until the scan count goes to zero. */
+ prev_oldest_id = txn_global->oldest_id;
+ current_id = oldest_id = snap_min = txn_global->current;
+
+ /* Walk the array of concurrent transactions. */
+ WT_ORDERED_READ(session_cnt, conn->session_cnt);
+ ckpt_id = txn_global->checkpoint_id;
+ for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
+ /* Skip the checkpoint transaction; it is never read from. */
+ if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id)
+ continue;
/*
- * Ignore the session's own snap_min: we are about to update
- * it.
+ * Update the oldest ID.
+ *
+ * Ignore: IDs older than the oldest ID we saw. This can happen
+ * if we race with a thread that is allocating an ID -- the ID
+ * will not be used because the thread will keep spinning until
+ * it gets a valid one.
*/
- if (get_snapshot && s == txn_state)
- continue;
+ if ((id = s->id) != WT_TXN_NONE &&
+ TXNID_LE(prev_oldest_id, id) && TXNID_LT(id, snap_min))
+ snap_min = id;
/*
* !!!
@@ -184,49 +247,31 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
if ((id = s->snap_min) != WT_TXN_NONE &&
TXNID_LT(id, oldest_id)) {
oldest_id = id;
- oldest_session = i;
+ oldest_session = &conn->sessions[i];
}
}
if (TXNID_LT(snap_min, oldest_id))
oldest_id = snap_min;
- if (txn->id != WT_TXN_NONE && TXNID_LT(txn->id, oldest_id))
- oldest_id = txn->id;
- /*
- * If we got a new snapshot, update the published snap_min for this
- * session.
- */
- if (get_snapshot) {
- WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min));
- WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id);
- txn_state->snap_min = snap_min;
- }
-
- /*
- * Update the last running ID if we have a much newer value or we are
- * forcing an update.
- */
- if (!get_snapshot || snap_min > txn_global->last_running + 100)
+ /* Update the last running ID. */
+ if (TXNID_LT(txn_global->last_running, snap_min)) {
txn_global->last_running = snap_min;
+ last_running_moved = 1;
+ } else
+ last_running_moved = 0;
- /*
- * Update the oldest ID if we have a newer ID and we can get exclusive
- * access. During normal snapshot refresh, only do this if we have a
- * much newer value. Once we get exclusive access, do another pass to
- * make sure nobody else is using an earlier ID.
- */
+ /* Update the oldest ID. */
if (TXNID_LT(prev_oldest_id, oldest_id) &&
- (!get_snapshot || oldest_id - prev_oldest_id > 100) &&
WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) {
WT_ORDERED_READ(session_cnt, conn->session_cnt);
+ ckpt_id = txn_global->checkpoint_id;
for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/*
* Skip the checkpoint transaction; it is never read
* from.
*/
- if (txn_global->checkpoint_id != WT_TXN_NONE &&
- s->id == txn_global->checkpoint_id)
+ if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id)
continue;
if ((id = s->id) != WT_TXN_NONE &&
@@ -241,23 +286,19 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
txn_global->scan_count = 0;
} else {
if (WT_VERBOSE_ISSET(session, WT_VERB_TRANSACTION) &&
- current_id - oldest_id > 10000 &&
- txn_global->oldest_session != oldest_session) {
+ current_id - oldest_id > 10000 && last_running_moved &&
+ oldest_session != NULL) {
(void)__wt_verbose(session, WT_VERB_TRANSACTION,
"old snapshot %" PRIu64
" pinned in session %d [%s]"
" with snap_min %" PRIu64 "\n",
- oldest_id, oldest_session,
- conn->sessions[oldest_session].lastop,
- conn->sessions[oldest_session].txn.snap_min);
- txn_global->oldest_session = oldest_session;
+ oldest_id, oldest_session->id,
+ oldest_session->lastop,
+ oldest_session->txn.snap_min);
}
WT_ASSERT(session, txn_global->scan_count > 0);
(void)WT_ATOMIC_SUB4(txn_global->scan_count, 1);
}
-
- if (get_snapshot)
- __txn_sort_snapshot(session, n, current_id);
}
/*
@@ -304,7 +345,7 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[])
if (txn->isolation == TXN_ISO_SNAPSHOT) {
if (session->ncursors > 0)
WT_RET(__wt_session_copy_values(session));
- __wt_txn_refresh(session, 1);
+ __wt_txn_get_snapshot(session);
}
return (0);
}