summaryrefslogtreecommitdiff
path: root/src/txn
diff options
context:
space:
mode:
Diffstat (limited to 'src/txn')
-rw-r--r--src/txn/txn.c207
1 files changed, 124 insertions, 83 deletions
diff --git a/src/txn/txn.c b/src/txn/txn.c
index c838785a9c3..bd8c56cf127 100644
--- a/src/txn/txn.c
+++ b/src/txn/txn.c
@@ -57,50 +57,29 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session)
txn = &session->txn;
txn_state = &S2C(session)->txn_global.states[session->id];
- if (txn_state->snap_min != WT_TXN_NONE) {
- WT_ASSERT(session,
- session->txn.isolation == TXN_ISO_READ_UNCOMMITTED ||
- !__wt_txn_visible_all(session, txn_state->snap_min));
- txn_state->snap_min = WT_TXN_NONE;
- }
- F_CLR(txn, TXN_HAS_SNAPSHOT);
-}
+ WT_ASSERT(session,
+ txn_state->snap_min == WT_TXN_NONE ||
+ session->txn.isolation == TXN_ISO_READ_UNCOMMITTED ||
+ !__wt_txn_visible_all(session, txn_state->snap_min));
-/*
- * __wt_txn_update_oldest --
- * Sweep the running transactions to update the oldest ID required.
- */
-void
-__wt_txn_update_oldest(WT_SESSION_IMPL *session)
-{
- /*
- * !!!
- * If a data-source is calling the WT_EXTENSION_API.transaction_oldest
- * method (for the oldest transaction ID not yet visible to a running
- * transaction), and then comparing that oldest ID against committed
- * transactions to see if updates for a committed transaction are still
- * visible to running transactions, the oldest transaction ID may be
- * the same as the last committed transaction ID, if the transaction
- * state wasn't refreshed after the last transaction committed. Push
- * past the last committed transaction.
- */
- __wt_txn_refresh(session, 0);
+ txn_state->snap_min = WT_TXN_NONE;
+ F_CLR(txn, TXN_HAS_SNAPSHOT);
}
/*
- * __wt_txn_refresh --
- * Allocate a transaction ID and/or a snapshot.
+ * __wt_txn_get_snapshot --
+ * Allocate a snapshot.
*/
void
-__wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
+__wt_txn_get_snapshot(WT_SESSION_IMPL *session)
{
WT_CONNECTION_IMPL *conn;
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
WT_TXN_STATE *s, *txn_state;
- uint64_t current_id, id, oldest_id;
+ uint64_t ckpt_id, current_id, id;
uint64_t prev_oldest_id, snap_min;
- uint32_t i, n, oldest_session, session_cnt;
+ uint32_t i, n, session_cnt;
int32_t count;
conn = S2C(session);
@@ -113,10 +92,9 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
/* For pure read-only workloads, avoid scanning. */
if (prev_oldest_id == current_id) {
- if (get_snapshot) {
- txn_state->snap_min = current_id;
- __txn_sort_snapshot(session, 0, current_id);
- }
+ txn_state->snap_min = current_id;
+ __txn_sort_snapshot(session, 0, current_id);
+
/* Check that the oldest ID has not moved in the meantime. */
if (prev_oldest_id == txn_global->oldest_id &&
txn_global->scan_count == 0)
@@ -136,15 +114,14 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
/* The oldest ID cannot change until the scan count goes to zero. */
prev_oldest_id = txn_global->oldest_id;
- current_id = oldest_id = snap_min = txn_global->current;
- oldest_session = 0;
+ current_id = snap_min = txn_global->current;
/* Walk the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
+ ckpt_id = txn_global->checkpoint_id;
for (i = n = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/* Skip the checkpoint transaction; it is never read from. */
- if (txn_global->checkpoint_id != WT_TXN_NONE &&
- s->id == txn_global->checkpoint_id)
+ if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id)
continue;
/*
@@ -160,18 +137,104 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
if (s != txn_state &&
(id = s->id) != WT_TXN_NONE &&
TXNID_LE(prev_oldest_id, id)) {
- if (get_snapshot)
- txn->snapshot[n++] = id;
+ txn->snapshot[n++] = id;
if (TXNID_LT(id, snap_min))
snap_min = id;
}
+ }
+
+ /*
+ * If we got a new snapshot, update the published snap_min for this
+ * session.
+ */
+ WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min));
+ WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id);
+ txn_state->snap_min = snap_min;
+
+ /* Update the last running ID if we have a much newer value. */
+ if (snap_min > txn_global->last_running + 100)
+ txn_global->last_running = snap_min;
+
+ WT_ASSERT(session, txn_global->scan_count > 0);
+ (void)WT_ATOMIC_SUB4(txn_global->scan_count, 1);
+
+ __txn_sort_snapshot(session, n, current_id);
+}
+
+/*
+ * __wt_txn_update_oldest --
+ * Sweep the running transactions to update the oldest ID required.
+ * !!!
+ * If a data-source is calling the WT_EXTENSION_API.transaction_oldest
+ * method (for the oldest transaction ID not yet visible to a running
+ * transaction), and then comparing that oldest ID against committed
+ * transactions to see if updates for a committed transaction are still
+ * visible to running transactions, the oldest transaction ID may be
+ * the same as the last committed transaction ID, if the transaction
+ * state wasn't refreshed after the last transaction committed. Push
+ * past the last committed transaction.
+*/
+void
+__wt_txn_update_oldest(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_SESSION_IMPL *oldest_session;
+ WT_TXN_GLOBAL *txn_global;
+ WT_TXN_STATE *s;
+ uint64_t ckpt_id, current_id, id, oldest_id, prev_oldest_id, snap_min;
+ uint32_t i, session_cnt;
+ int32_t count;
+ int last_running_moved;
+
+ conn = S2C(session);
+ txn_global = &conn->txn_global;
+
+ current_id = snap_min = txn_global->current;
+ oldest_session = NULL;
+ prev_oldest_id = txn_global->oldest_id;
+
+ /* For pure read-only workloads, avoid scanning. */
+ if (prev_oldest_id == current_id) {
+ /* Check that the oldest ID has not moved in the meantime. */
+ if (prev_oldest_id == txn_global->oldest_id &&
+ txn_global->scan_count == 0)
+ return;
+ }
+
+ /*
+ * We're going to scan. Increment the count of scanners to prevent the
+ * oldest ID from moving forwards. Spin if the count is negative,
+ * which indicates that some thread is moving the oldest ID forwards.
+ */
+ do {
+ if ((count = txn_global->scan_count) < 0)
+ WT_PAUSE();
+ } while (count < 0 ||
+ !WT_ATOMIC_CAS4(txn_global->scan_count, count, count + 1));
+
+ /* The oldest ID cannot change until the scan count goes to zero. */
+ prev_oldest_id = txn_global->oldest_id;
+ current_id = oldest_id = snap_min = txn_global->current;
+
+ /* Walk the array of concurrent transactions. */
+ WT_ORDERED_READ(session_cnt, conn->session_cnt);
+ ckpt_id = txn_global->checkpoint_id;
+ for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
+ /* Skip the checkpoint transaction; it is never read from. */
+ if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id)
+ continue;
/*
- * Ignore the session's own snap_min: we are about to update
- * it.
+ * Update the oldest ID.
+ *
+ * Ignore: IDs older than the oldest ID we saw. This can happen
+ * if we race with a thread that is allocating an ID -- the ID
+ * will not be used because the thread will keep spinning until
+ * it gets a valid one.
*/
- if (get_snapshot && s == txn_state)
- continue;
+ if ((id = s->id) != WT_TXN_NONE &&
+ TXNID_LE(prev_oldest_id, id) && TXNID_LT(id, snap_min))
+ snap_min = id;
/*
* !!!
@@ -184,49 +247,31 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
if ((id = s->snap_min) != WT_TXN_NONE &&
TXNID_LT(id, oldest_id)) {
oldest_id = id;
- oldest_session = i;
+ oldest_session = &conn->sessions[i];
}
}
if (TXNID_LT(snap_min, oldest_id))
oldest_id = snap_min;
- if (txn->id != WT_TXN_NONE && TXNID_LT(txn->id, oldest_id))
- oldest_id = txn->id;
- /*
- * If we got a new snapshot, update the published snap_min for this
- * session.
- */
- if (get_snapshot) {
- WT_ASSERT(session, TXNID_LE(prev_oldest_id, snap_min));
- WT_ASSERT(session, prev_oldest_id == txn_global->oldest_id);
- txn_state->snap_min = snap_min;
- }
-
- /*
- * Update the last running ID if we have a much newer value or we are
- * forcing an update.
- */
- if (!get_snapshot || snap_min > txn_global->last_running + 100)
+ /* Update the last running ID. */
+ if (TXNID_LT(txn_global->last_running, snap_min)) {
txn_global->last_running = snap_min;
+ last_running_moved = 1;
+ } else
+ last_running_moved = 0;
- /*
- * Update the oldest ID if we have a newer ID and we can get exclusive
- * access. During normal snapshot refresh, only do this if we have a
- * much newer value. Once we get exclusive access, do another pass to
- * make sure nobody else is using an earlier ID.
- */
+ /* Update the oldest ID. */
if (TXNID_LT(prev_oldest_id, oldest_id) &&
- (!get_snapshot || oldest_id - prev_oldest_id > 100) &&
WT_ATOMIC_CAS4(txn_global->scan_count, 1, -1)) {
WT_ORDERED_READ(session_cnt, conn->session_cnt);
+ ckpt_id = txn_global->checkpoint_id;
for (i = 0, s = txn_global->states; i < session_cnt; i++, s++) {
/*
* Skip the checkpoint transaction; it is never read
* from.
*/
- if (txn_global->checkpoint_id != WT_TXN_NONE &&
- s->id == txn_global->checkpoint_id)
+ if (ckpt_id != WT_TXN_NONE && ckpt_id == s->id)
continue;
if ((id = s->id) != WT_TXN_NONE &&
@@ -241,23 +286,19 @@ __wt_txn_refresh(WT_SESSION_IMPL *session, int get_snapshot)
txn_global->scan_count = 0;
} else {
if (WT_VERBOSE_ISSET(session, WT_VERB_TRANSACTION) &&
- current_id - oldest_id > 10000 &&
- txn_global->oldest_session != oldest_session) {
+ current_id - oldest_id > 10000 && last_running_moved &&
+ oldest_session != NULL) {
(void)__wt_verbose(session, WT_VERB_TRANSACTION,
"old snapshot %" PRIu64
" pinned in session %d [%s]"
" with snap_min %" PRIu64 "\n",
- oldest_id, oldest_session,
- conn->sessions[oldest_session].lastop,
- conn->sessions[oldest_session].txn.snap_min);
- txn_global->oldest_session = oldest_session;
+ oldest_id, oldest_session->id,
+ oldest_session->lastop,
+ oldest_session->txn.snap_min);
}
WT_ASSERT(session, txn_global->scan_count > 0);
(void)WT_ATOMIC_SUB4(txn_global->scan_count, 1);
}
-
- if (get_snapshot)
- __txn_sort_snapshot(session, n, current_id);
}
/*
@@ -304,7 +345,7 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[])
if (txn->isolation == TXN_ISO_SNAPSHOT) {
if (session->ncursors > 0)
WT_RET(__wt_session_copy_values(session));
- __wt_txn_refresh(session, 1);
+ __wt_txn_get_snapshot(session);
}
return (0);
}