summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/txn/txn_timestamp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/src/txn/txn_timestamp.c')
-rw-r--r--src/third_party/wiredtiger/src/txn/txn_timestamp.c104
1 files changed, 77 insertions, 27 deletions
diff --git a/src/third_party/wiredtiger/src/txn/txn_timestamp.c b/src/third_party/wiredtiger/src/txn/txn_timestamp.c
index 41ac970f14e..d07bfecd47c 100644
--- a/src/third_party/wiredtiger/src/txn/txn_timestamp.c
+++ b/src/third_party/wiredtiger/src/txn/txn_timestamp.c
@@ -212,11 +212,20 @@ __txn_global_query_timestamp(
/* Compare with the oldest running transaction. */
__wt_readlock(session, &txn_global->commit_timestamp_rwlock);
- txn = TAILQ_FIRST(&txn_global->commit_timestamph);
- if (txn != NULL &&
- __wt_timestamp_cmp(&txn->first_commit_timestamp, &ts) < 0) {
- __wt_timestamp_set(&ts, &txn->first_commit_timestamp);
- WT_ASSERT(session, !__wt_timestamp_iszero(&ts));
+ TAILQ_FOREACH(txn, &txn_global->commit_timestamph,
+ commit_timestampq) {
+ if (txn->clear_ts_queue)
+ continue;
+ /*
+ * Compare on the first real running transaction.
+ */
+ if (__wt_timestamp_cmp(
+ &txn->first_commit_timestamp, &ts) < 0) {
+ __wt_timestamp_set(
+ &ts, &txn->first_commit_timestamp);
+ WT_ASSERT(session, !__wt_timestamp_iszero(&ts));
+ }
+ break;
}
__wt_readunlock(session, &txn_global->commit_timestamp_rwlock);
} else if (WT_STRING_MATCH("oldest", cval.str, cval.len)) {
@@ -652,7 +661,7 @@ __wt_txn_set_timestamp(WT_SESSION_IMPL *session, const char *cfg[])
void
__wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session)
{
- WT_TXN *prev, *txn;
+ WT_TXN *qtxn, *txn, *txn_tmp;
WT_TXN_GLOBAL *txn_global;
wt_timestamp_t ts;
@@ -668,27 +677,65 @@ __wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session)
* fixed.
*/
__wt_timestamp_set(&ts, &txn->commit_timestamp);
- __wt_timestamp_set(&txn->first_commit_timestamp, &ts);
__wt_writelock(session, &txn_global->commit_timestamp_rwlock);
- prev = TAILQ_LAST(&txn_global->commit_timestamph, __wt_txn_cts_qh);
- if (prev == NULL)
- WT_STAT_CONN_INCR(session, txn_commit_queue_empty);
- for (; prev != NULL &&
- __wt_timestamp_cmp(&prev->first_commit_timestamp, &ts) > 0;
- prev = TAILQ_PREV(prev, __wt_txn_cts_qh, commit_timestampq))
- ;
- if (prev == NULL) {
+ /*
+ * If our transaction is on the queue remove it first. The timestamp
+ * may move earlier so we otherwise might not remove ourselves before
+ * finding where to insert ourselves (which would result in a list
+ * loop) and we don't want to walk more of the list than needed.
+ */
+ if (txn->clear_ts_queue) {
+ TAILQ_REMOVE(&txn_global->commit_timestamph,
+ txn, commit_timestampq);
+ WT_PUBLISH(txn->clear_ts_queue, false);
+ --txn_global->commit_timestampq_len;
+ }
+ /*
+ * Walk the list to look for where to insert our own transaction
+ * and remove any transactions that are not active. We stop when
+ * we get to the location where we want to insert.
+ */
+ if (TAILQ_EMPTY(&txn_global->commit_timestamph)) {
TAILQ_INSERT_HEAD(
&txn_global->commit_timestamph, txn, commit_timestampq);
- WT_STAT_CONN_INCR(session, txn_commit_queue_head);
- } else
- TAILQ_INSERT_AFTER(&txn_global->commit_timestamph,
- prev, txn, commit_timestampq);
+ WT_STAT_CONN_INCR(session, txn_commit_queue_empty);
+ } else {
+ TAILQ_FOREACH_SAFE(qtxn, &txn_global->commit_timestamph,
+ commit_timestampq, txn_tmp) {
+ if (qtxn->clear_ts_queue) {
+ TAILQ_REMOVE(&txn_global->commit_timestamph,
+ qtxn, commit_timestampq);
+ WT_PUBLISH(qtxn->clear_ts_queue, false);
+ --txn_global->commit_timestampq_len;
+ continue;
+ }
+ /*
+ * Only walk the list up until we get to the place where
+ * we want to insert our timestamp. Some other thread
+ * will remove any later transactions.
+ */
+ if (__wt_timestamp_cmp(
+ &qtxn->first_commit_timestamp, &ts) > 0)
+ break;
+ }
+ /*
+ * If we got to the end, then our timestamp is larger than
+ * the last element's timestamp. Insert at the end.
+ */
+ if (qtxn == NULL) {
+ TAILQ_INSERT_TAIL(&txn_global->commit_timestamph,
+ txn, commit_timestampq);
+ WT_STAT_CONN_INCR(session, txn_commit_queue_tail);
+ } else
+ TAILQ_INSERT_BEFORE(qtxn, txn, commit_timestampq);
+ }
+ __wt_timestamp_set(&txn->first_commit_timestamp, &ts);
++txn_global->commit_timestampq_len;
WT_STAT_CONN_INCR(session, txn_commit_queue_inserts);
- __wt_writeunlock(session, &txn_global->commit_timestamp_rwlock);
+ txn->clear_ts_queue = false;
F_SET(txn, WT_TXN_HAS_TS_COMMIT | WT_TXN_PUBLIC_TS_COMMIT);
+ __wt_writeunlock(session, &txn_global->commit_timestamp_rwlock);
}
/*
@@ -699,19 +746,22 @@ void
__wt_txn_clear_commit_timestamp(WT_SESSION_IMPL *session)
{
WT_TXN *txn;
- WT_TXN_GLOBAL *txn_global;
+ uint32_t flags;
txn = &session->txn;
- txn_global = &S2C(session)->txn_global;
if (!F_ISSET(txn, WT_TXN_PUBLIC_TS_COMMIT))
return;
+ flags = txn->flags;
+ LF_CLR(WT_TXN_PUBLIC_TS_COMMIT);
- __wt_writelock(session, &txn_global->commit_timestamp_rwlock);
- TAILQ_REMOVE(&txn_global->commit_timestamph, txn, commit_timestampq);
- --txn_global->commit_timestampq_len;
- __wt_writeunlock(session, &txn_global->commit_timestamp_rwlock);
- F_CLR(txn, WT_TXN_PUBLIC_TS_COMMIT);
+ /*
+ * Notify other threads that our transaction is inactive and can be
+ * cleaned up safely from the commit timestamp queue whenever the next
+ * thread walks the queue. We do not need to remove it now.
+ */
+ WT_PUBLISH(txn->clear_ts_queue, true);
+ WT_PUBLISH(txn->flags, flags);
}
/*