diff options
Diffstat (limited to 'src/third_party/wiredtiger/src/txn/txn_timestamp.c')
-rw-r--r-- | src/third_party/wiredtiger/src/txn/txn_timestamp.c | 104 |
1 files changed, 77 insertions, 27 deletions
diff --git a/src/third_party/wiredtiger/src/txn/txn_timestamp.c b/src/third_party/wiredtiger/src/txn/txn_timestamp.c index 41ac970f14e..d07bfecd47c 100644 --- a/src/third_party/wiredtiger/src/txn/txn_timestamp.c +++ b/src/third_party/wiredtiger/src/txn/txn_timestamp.c @@ -212,11 +212,20 @@ __txn_global_query_timestamp( /* Compare with the oldest running transaction. */ __wt_readlock(session, &txn_global->commit_timestamp_rwlock); - txn = TAILQ_FIRST(&txn_global->commit_timestamph); - if (txn != NULL && - __wt_timestamp_cmp(&txn->first_commit_timestamp, &ts) < 0) { - __wt_timestamp_set(&ts, &txn->first_commit_timestamp); - WT_ASSERT(session, !__wt_timestamp_iszero(&ts)); + TAILQ_FOREACH(txn, &txn_global->commit_timestamph, + commit_timestampq) { + if (txn->clear_ts_queue) + continue; + /* + * Compare on the first real running transaction. + */ + if (__wt_timestamp_cmp( + &txn->first_commit_timestamp, &ts) < 0) { + __wt_timestamp_set( + &ts, &txn->first_commit_timestamp); + WT_ASSERT(session, !__wt_timestamp_iszero(&ts)); + } + break; } __wt_readunlock(session, &txn_global->commit_timestamp_rwlock); } else if (WT_STRING_MATCH("oldest", cval.str, cval.len)) { @@ -652,7 +661,7 @@ __wt_txn_set_timestamp(WT_SESSION_IMPL *session, const char *cfg[]) void __wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session) { - WT_TXN *prev, *txn; + WT_TXN *qtxn, *txn, *txn_tmp; WT_TXN_GLOBAL *txn_global; wt_timestamp_t ts; @@ -668,27 +677,65 @@ __wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session) * fixed. */ __wt_timestamp_set(&ts, &txn->commit_timestamp); - __wt_timestamp_set(&txn->first_commit_timestamp, &ts); __wt_writelock(session, &txn_global->commit_timestamp_rwlock); - prev = TAILQ_LAST(&txn_global->commit_timestamph, __wt_txn_cts_qh); - if (prev == NULL) - WT_STAT_CONN_INCR(session, txn_commit_queue_empty); - for (; prev != NULL && - __wt_timestamp_cmp(&prev->first_commit_timestamp, &ts) > 0; - prev = TAILQ_PREV(prev, __wt_txn_cts_qh, commit_timestampq)) - ; - if (prev == NULL) { + /* + * If our transaction is on the queue remove it first. The timestamp + * may move earlier so we otherwise might not remove ourselves before + * finding where to insert ourselves (which would result in a list + * loop) and we don't want to walk more of the list than needed. + */ + if (txn->clear_ts_queue) { + TAILQ_REMOVE(&txn_global->commit_timestamph, + txn, commit_timestampq); + WT_PUBLISH(txn->clear_ts_queue, false); + --txn_global->commit_timestampq_len; + } + /* + * Walk the list to look for where to insert our own transaction + * and remove any transactions that are not active. We stop when + * we get to the location where we want to insert. + */ + if (TAILQ_EMPTY(&txn_global->commit_timestamph)) { TAILQ_INSERT_HEAD( &txn_global->commit_timestamph, txn, commit_timestampq); - WT_STAT_CONN_INCR(session, txn_commit_queue_head); - } else - TAILQ_INSERT_AFTER(&txn_global->commit_timestamph, - prev, txn, commit_timestampq); + WT_STAT_CONN_INCR(session, txn_commit_queue_empty); + } else { + TAILQ_FOREACH_SAFE(qtxn, &txn_global->commit_timestamph, + commit_timestampq, txn_tmp) { + if (qtxn->clear_ts_queue) { + TAILQ_REMOVE(&txn_global->commit_timestamph, + qtxn, commit_timestampq); + WT_PUBLISH(qtxn->clear_ts_queue, false); + --txn_global->commit_timestampq_len; + continue; + } + /* + * Only walk the list up until we get to the place where + * we want to insert our timestamp. Some other thread + * will remove any later transactions. + */ + if (__wt_timestamp_cmp( + &qtxn->first_commit_timestamp, &ts) > 0) + break; + } + /* + * If we got to the end, then our timestamp is larger than + * the last element's timestamp. Insert at the end. + */ + if (qtxn == NULL) { + TAILQ_INSERT_TAIL(&txn_global->commit_timestamph, + txn, commit_timestampq); + WT_STAT_CONN_INCR(session, txn_commit_queue_tail); + } else + TAILQ_INSERT_BEFORE(qtxn, txn, commit_timestampq); + } + __wt_timestamp_set(&txn->first_commit_timestamp, &ts); ++txn_global->commit_timestampq_len; WT_STAT_CONN_INCR(session, txn_commit_queue_inserts); - __wt_writeunlock(session, &txn_global->commit_timestamp_rwlock); + txn->clear_ts_queue = false; F_SET(txn, WT_TXN_HAS_TS_COMMIT | WT_TXN_PUBLIC_TS_COMMIT); + __wt_writeunlock(session, &txn_global->commit_timestamp_rwlock); } /* @@ -699,19 +746,22 @@ void __wt_txn_clear_commit_timestamp(WT_SESSION_IMPL *session) { WT_TXN *txn; - WT_TXN_GLOBAL *txn_global; + uint32_t flags; txn = &session->txn; - txn_global = &S2C(session)->txn_global; if (!F_ISSET(txn, WT_TXN_PUBLIC_TS_COMMIT)) return; + flags = txn->flags; + LF_CLR(WT_TXN_PUBLIC_TS_COMMIT); - __wt_writelock(session, &txn_global->commit_timestamp_rwlock); - TAILQ_REMOVE(&txn_global->commit_timestamph, txn, commit_timestampq); - --txn_global->commit_timestampq_len; - __wt_writeunlock(session, &txn_global->commit_timestamp_rwlock); - F_CLR(txn, WT_TXN_PUBLIC_TS_COMMIT); + /* + * Notify other threads that our transaction is inactive and can be + * cleaned up safely from the commit timestamp queue whenever the next + * thread walks the queue. We do not need to remove it now. + */ + WT_PUBLISH(txn->clear_ts_queue, true); + WT_PUBLISH(txn->flags, flags); } /* |