summaryrefslogtreecommitdiff
path: root/src/txn/txn.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/txn/txn.c')
-rw-r--r--src/txn/txn.c260
1 files changed, 98 insertions, 162 deletions
diff --git a/src/txn/txn.c b/src/txn/txn.c
index 3c7373446c5..ca454f8266a 100644
--- a/src/txn/txn.c
+++ b/src/txn/txn.c
@@ -28,11 +28,14 @@ __wt_txnid_cmp(const void *v1, const void *v2)
*/
static void
__txn_sort_snapshot(WT_SESSION_IMPL *session,
- uint32_t n, wt_txnid_t id, wt_txnid_t oldest_snap_min)
+ uint32_t n, wt_txnid_t id, wt_txnid_t oldest_id)
{
WT_TXN *txn;
+ WT_TXN_GLOBAL *txn_global;
+ wt_txnid_t id_copy;
txn = &session->txn;
+ txn_global = &S2C(session)->txn_global;
if (n > 1)
qsort(txn->snapshot, n, sizeof(wt_txnid_t), __wt_txnid_cmp);
@@ -40,11 +43,18 @@ __txn_sort_snapshot(WT_SESSION_IMPL *session,
txn->snap_min = (n == 0) ? id : txn->snapshot[0];
txn->snap_max = id;
WT_ASSERT(session, n == 0 || txn->snap_min != WT_TXN_NONE);
- if (TXNID_LT(txn->snap_min, oldest_snap_min))
- oldest_snap_min = txn->snap_min;
+ if (TXNID_LT(txn->snap_min, oldest_id))
+ oldest_id = txn->snap_min;
- if (TXNID_LT(txn->oldest_snap_min, oldest_snap_min))
- txn->oldest_snap_min = oldest_snap_min;
+ /*
+ * Update the oldest snapshot if we have a newer value. We're not
+ * holding a lock, so copy then swap into place if our oldest version
+ * is newer.
+ */
+ do {
+ id_copy = txn_global->oldest_id;
+ } while (TXNID_LT(id_copy, oldest_id) &&
+ !WT_ATOMIC_CAS(txn_global->oldest_id, id_copy, oldest_id));
}
/*
@@ -61,131 +71,117 @@ __wt_txn_release_snapshot(WT_SESSION_IMPL *session)
}
/*
- * __wt_txn_get_oldest --
- * Update the current transaction's cached copy of the oldest possible
- * snap_min value.
+ * __wt_txn_refresh --
+ * Allocate a transaction ID and/or a snapshot.
*/
void
-__wt_txn_get_oldest(WT_SESSION_IMPL *session)
+__wt_txn_refresh(
+ WT_SESSION_IMPL *session, wt_txnid_t max_id, int alloc_id, int get_snapshot)
{
WT_CONNECTION_IMPL *conn;
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
- WT_TXN_STATE *s;
- wt_txnid_t current_id, id, oldest_snap_min;
- uint32_t i, session_cnt;
+ WT_TXN_STATE *s, *txn_state;
+ wt_txnid_t current_id, id, oldest_id;
+ uint32_t i, n, session_cnt;
conn = S2C(session);
txn = &session->txn;
txn_global = &conn->txn_global;
+ txn_state = &txn_global->states[session->id];
+ oldest_id = txn_global->oldest_id;
+
+ /* If nothing has changed since last time, we're done. */
+ if (!alloc_id &&
+ txn->id == max_id &&
+ txn->last_id == txn_global->current &&
+ txn->last_gen == txn_global->gen &&
+ TXNID_LE(oldest_id, txn->snap_min))
+ goto done;
do {
- current_id = txn_global->current;
- oldest_snap_min =
- (txn->id != WT_TXN_NONE) ? txn->id : current_id + 1;
+ /* Take a copy of the current transaction generation. */
+ txn->last_gen = txn_global->gen;
- WT_ORDERED_READ(session_cnt, conn->session_cnt);
- for (i = 0, s = txn_global->states;
- i < session_cnt;
- i++, s++) {
- if ((id = s->snap_min) != WT_TXN_NONE &&
- TXNID_LT(id, oldest_snap_min))
- oldest_snap_min = id;
+ if (alloc_id) {
/*
- * It is possible that there is no snapshot active,
- * even though there are transactions running (at
- * isolation levels lower than snapshot isolation). If
- * a new snapshot is taken, it will have a snap_min
- * value of the lowest running transaction.
+ * Allocate a transaction ID.
+ *
+ * We use an atomic compare and swap to ensure that we
+ * get a unique ID that is published before the global
+ * counter is updated.
*
- * We need to make sure that the oldest snap_min we
- * calculate won't be made invalid in that case, so
- * make sure it is at least as old as the oldest
- * running transaction.
+ * If two threads race to allocate an ID, only the
+ * latest ID will proceed. The winning thread can be
+ * sure its snapshot contains all of the earlier active
+ * IDs. Threads that race and get an earlier ID may
+ * not appear in the snapshot, but they will loop and
+ * allocate a new ID before proceeding to make any
+ * updates.
+ *
+ * This potentially wastes transaction IDs when threads
+ * race to begin transactions: that is the price we pay
+ * to keep this path latch free.
*/
- if ((id = s->id) != WT_TXN_NONE &&
- TXNID_LT(id, oldest_snap_min))
- oldest_snap_min = id;
+ do {
+ current_id = txn_global->current;
+ txn_state->id = txn->id = current_id + 1;
+ } while (!WT_ATOMIC_CAS(txn_global->current,
+ current_id, txn->id) ||
+ txn->id == WT_TXN_NONE ||
+ txn->id == WT_TXN_ABORTED);
+ txn->last_id = oldest_id = max_id = txn->id;
+ } else {
+ txn->last_id = current_id = txn_global->current;
+ oldest_id =
+ (max_id != WT_TXN_NONE) ? max_id : current_id + 1;
}
- } while (current_id != txn_global->current);
-
- if (TXNID_LT(txn->oldest_snap_min, oldest_snap_min))
- txn->oldest_snap_min = oldest_snap_min;
-}
-
-/*
- * __wt_txn_get_snapshot --
- * Set up a snapshot in the current transaction, without allocating an ID.
- */
-void
-__wt_txn_get_snapshot(WT_SESSION_IMPL *session,
- wt_txnid_t my_id, wt_txnid_t max_id, int committing)
-{
- WT_CONNECTION_IMPL *conn;
- WT_TXN *txn;
- WT_TXN_GLOBAL *txn_global;
- WT_TXN_STATE *s, *txn_state;
- wt_txnid_t current_id, id, oldest_snap_min;
- uint32_t i, n, session_cnt;
-
- conn = S2C(session);
- txn = &session->txn;
- txn_global = &conn->txn_global;
- txn_state = &txn_global->states[session->id];
- /* If nothing has changed since last time, we're done. */
- if (!committing && txn->last_id == txn_global->current &&
- txn->last_gen == txn_global->gen) {
- WT_ASSERT(session,
- TXNID_LE(txn->oldest_snap_min, txn->snap_min));
- txn_state->snap_min = txn->snap_min;
- return;
- }
+ if (!get_snapshot)
+ return;
- do {
- /* Take a copy of the current session ID. */
- txn->last_gen = txn_global->gen;
- txn->last_id = current_id = txn_global->current;
- oldest_snap_min = current_id + 1;
+ /*
+ * Publish a new snap_min value that we refine below. This
+ * prevents the global oldest value from moving forward
+ * underneath us.
+ */
+ txn_state->snap_min = txn_global->oldest_id;
/* Copy the array of concurrent transactions. */
WT_ORDERED_READ(session_cnt, conn->session_cnt);
for (i = n = 0, s = txn_global->states;
i < session_cnt;
i++, s++) {
- if (!committing && (id = s->id) != WT_TXN_NONE &&
- TXNID_LT(id, oldest_snap_min))
- oldest_snap_min = id;
/*
- * Ignore everything else about the session's own
+ * Ignore everything about the session's own
* transaction: we are in the process of updating it.
*/
- if (i == session->id)
+ if (s == txn_state)
continue;
- if (id != WT_TXN_NONE &&
- (max_id == WT_TXN_NONE || TXNID_LT(id, max_id)))
+ if ((id = s->id) != WT_TXN_NONE) {
txn->snapshot[n++] = id;
- /* Ignore the session's own transaction. */
+ if (TXNID_LT(id, oldest_id))
+ oldest_id = id;
+ }
if ((id = s->snap_min) != WT_TXN_NONE &&
- TXNID_LT(id, oldest_snap_min))
- oldest_snap_min = id;
+ TXNID_LT(id, oldest_id))
+ oldest_id = id;
}
+ __txn_sort_snapshot(session, n, current_id + 1, oldest_id);
+
/*
* Ensure the snapshot reads are scheduled before re-checking
* the global current ID.
*/
WT_READ_BARRIER();
- } while (current_id != txn_global->current ||
- txn->last_gen != txn_global->gen);
-
- __txn_sort_snapshot(session, n,
- (max_id != WT_TXN_NONE) ? max_id : current_id + 1,
- oldest_snap_min);
- id = (my_id == WT_TXN_NONE || TXNID_LT(txn->snap_min, my_id)) ?
- txn->snap_min : my_id;
- WT_ASSERT(session, committing || TXNID_LE(oldest_snap_min, id));
- txn_state->snap_min = id;
+ } while (txn->last_id != txn_global->current ||
+ txn->last_gen != txn_global->gen ||
+ TXNID_LT(txn->snap_min, txn_global->oldest_id));
+
+done: WT_ASSERT(session,
+ TXNID_LE(txn_global->oldest_id, txn->snap_min));
+ txn_state->snap_min = txn->snap_min;
}
/*
@@ -196,17 +192,17 @@ __wt_txn_get_snapshot(WT_SESSION_IMPL *session,
void
__wt_txn_get_evict_snapshot(WT_SESSION_IMPL *session)
{
- WT_TXN *txn;
+ WT_TXN_GLOBAL *txn_global;
+ wt_txnid_t oldest_id;
- txn = &session->txn;
+ txn_global = &S2C(session)->txn_global;
/*
* The oldest active snapshot ID in the system should *not* be visible
* to eviction. Create a snapshot containing that ID.
*/
- __wt_txn_get_oldest(session);
- __txn_sort_snapshot(
- session, 0, txn->oldest_snap_min, txn->oldest_snap_min);
+ oldest_id = txn_global->oldest_id;
+ __txn_sort_snapshot(session, 0, oldest_id, oldest_id);
/*
* Note that we carefully don't update the global table with this
@@ -226,9 +222,7 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[])
WT_CONNECTION_IMPL *conn;
WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
- WT_TXN_STATE *s, *txn_state;
- wt_txnid_t id, oldest_snap_min;
- uint32_t i, n, session_cnt;
+ WT_TXN_STATE *txn_state;
conn = S2C(session);
txn = &session->txn;
@@ -248,66 +242,8 @@ __wt_txn_begin(WT_SESSION_IMPL *session, const char *cfg[])
TXN_ISO_READ_COMMITTED : TXN_ISO_READ_UNCOMMITTED;
F_SET(txn, TXN_RUNNING);
-
- do {
- /*
- * Allocate a transaction ID.
- *
- * We use an atomic increment to ensure that we get a unique
- * ID, then publish that to the global state table.
- *
- * If two threads race to allocate an ID, only the latest ID
- * will proceed. The winning thread can be sure its snapshot
- * contains all of the earlier active IDs. Threads that race
- * and get an earlier ID may not appear in the snapshot,
- * but they will loop and allocate a new ID before proceeding
- * to make any updates.
- *
- * This potentially wastes transaction IDs when threads race to
- * begin transactions, but that is the price we pay to keep
- * this path latch free.
- */
- do {
- txn->id = WT_ATOMIC_ADD(txn_global->current, 1);
- } while (txn->id == WT_TXN_NONE || txn->id == WT_TXN_ABORTED);
- WT_PUBLISH(txn_state->id, txn->id);
-
- /*
- * If we are starting a snapshot isolation transaction, get
- * a snapshot of the running transactions.
- *
- * If we already have a snapshot (e.g., for an auto-commit
- * operation), update it so that the newly-allocated ID is
- * visible.
- */
- if (txn->isolation == TXN_ISO_SNAPSHOT) {
- txn->last_gen = txn_global->gen;
- oldest_snap_min = txn->id;
-
- /* Copy the array of concurrent transactions. */
- WT_ORDERED_READ(session_cnt, conn->session_cnt);
- for (i = n = 0, s = txn_global->states;
- i < session_cnt;
- i++, s++) {
- if ((id = s->snap_min) != WT_TXN_NONE &&
- TXNID_LT(id, oldest_snap_min))
- oldest_snap_min = id;
- if ((id = s->id) != WT_TXN_NONE)
- txn->snapshot[n++] = id;
- }
-
- __txn_sort_snapshot(
- session, n, txn->id, oldest_snap_min);
- txn_state->snap_min = txn->snap_min;
- }
-
- /*
- * Ensure the snapshot reads are complete before re-checking
- * the global current ID.
- */
- WT_READ_BARRIER();
- } while (txn->id != txn_global->current);
-
+ __wt_txn_refresh(
+ session, WT_TXN_NONE, 1, txn->isolation == TXN_ISO_SNAPSHOT);
return (0);
}
@@ -373,8 +309,8 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
* the cursor. Get the new snapshot before releasing the ID for the
* commit.
*/
- if (session->ncursors > 0)
- __wt_txn_get_snapshot(session, txn->id + 1, WT_TXN_NONE, 1);
+ if (session->ncursors > 0 && txn->isolation != TXN_ISO_READ_UNCOMMITTED)
+ __wt_txn_refresh(session, txn->id + 1, 0, 1);
__wt_txn_release(session);
return (0);
}