summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Alex Gorrod <alexander.gorrod@mongodb.com> 2017-07-19 15:42:19 +1000
committer: GitHub <noreply@github.com> 2017-07-19 15:42:19 +1000
commit: 4a2d9ed4de6616e28203369e03e9188713379f94 (patch)
tree: 234b87f6b9b49865f531187a68beba85d0519b63
parent: 454655b0d663fc6cec73e177fbb3b5ed163ed27c (diff)
download: mongo-4a2d9ed4de6616e28203369e03e9188713379f94.tar.gz
WT-3381 Improve concurrency in the transaction subsystem (#3515)
Removes timestamps from WT_TXN_STATE as a step towards merging WT_TXN_STATE with WT_TXN. This change has transactions add themselves to global, sorted lists (one ordered by commit timestamp, the other ordered by read timestamp). Each list has its own rwlock, and scans have been replaced by peeking at the first (oldest) transaction in the list. The list's lock protects the relevant fields in WT_TXN (i.e., no thread will read txn->commit_timestamp unless the transaction is in the commit_timestamp list). This should reduce contention for txn_global->rwlock, which can be further decomposed in future (e.g., eliminating the scan for __wt_txn_update_oldest and replacing __wt_txn_get_snapshot with a loop that makes a copy of running transaction IDs in order).
-rw-r--r-- src/include/extern.h 4
-rw-r--r-- src/include/txn.h 17
-rw-r--r-- src/txn/txn.c 51
-rw-r--r-- src/txn/txn_ckpt.c 10
-rw-r--r-- src/txn/txn_timestamp.c 176
5 files changed, 184 insertions, 74 deletions
diff --git a/src/include/extern.h b/src/include/extern.h
index 416979142e1..8b48fd587bd 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -791,3 +791,7 @@ extern int __wt_txn_global_query_timestamp( WT_SESSION_IMPL *session, char *hex_
extern int __wt_txn_update_pinned_timestamp(WT_SESSION_IMPL *session) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_txn_global_set_timestamp(WT_SESSION_IMPL *session, const char *cfg[]) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
extern int __wt_txn_set_timestamp(WT_SESSION_IMPL *session, const char *cfg[]) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern void __wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session);
+extern void __wt_txn_clear_commit_timestamp(WT_SESSION_IMPL *session);
+extern void __wt_txn_set_read_timestamp(WT_SESSION_IMPL *session);
+extern void __wt_txn_clear_read_timestamp(WT_SESSION_IMPL *session);
diff --git a/src/include/txn.h b/src/include/txn.h
index bf2d9aa21ef..e4cc0b04046 100644
--- a/src/include/txn.h
+++ b/src/include/txn.h
@@ -74,9 +74,6 @@ struct __wt_txn_state {
volatile uint64_t pinned_id;
volatile uint64_t metadata_pinned;
- WT_DECL_TIMESTAMP(commit_timestamp)
- WT_DECL_TIMESTAMP(read_timestamp)
-
WT_CACHE_LINE_PAD_END
};
@@ -103,6 +100,14 @@ struct __wt_txn_global {
/* Protects the active transaction states. */
WT_RWLOCK rwlock;
+ /* List of transactions sorted by commit timestamp. */
+ WT_RWLOCK commit_timestamp_rwlock;
+ TAILQ_HEAD(__wt_txn_cts_qh, __wt_txn) commit_timestamph;
+
+ /* List of transactions sorted by read timestamp. */
+ WT_RWLOCK read_timestamp_rwlock;
+ TAILQ_HEAD(__wt_txn_rts_qh, __wt_txn) read_timestamph;
+
/*
* Track information about the running checkpoint. The transaction
* snapshot used when checkpointing are special. Checkpoints can run
@@ -117,6 +122,7 @@ struct __wt_txn_global {
volatile bool checkpoint_running; /* Checkpoint running */
volatile uint32_t checkpoint_id; /* Checkpoint's session ID */
WT_TXN_STATE checkpoint_state; /* Checkpoint's txn state */
+ WT_TXN *checkpoint_txn; /* Checkpoint's txn structure */
volatile uint64_t metadata_pinned; /* Oldest ID for metadata */
@@ -194,8 +200,11 @@ struct __wt_txn {
uint32_t snapshot_count;
uint32_t txn_logsync; /* Log sync configuration */
- WT_DECL_TIMESTAMP(read_timestamp)
WT_DECL_TIMESTAMP(commit_timestamp)
+ WT_DECL_TIMESTAMP(read_timestamp)
+
+ TAILQ_ENTRY(__wt_txn) commit_timestampq;
+ TAILQ_ENTRY(__wt_txn) read_timestampq;
/* Array of modifications by this transaction. */
WT_TXN_OP *mod;
diff --git a/src/txn/txn.c b/src/txn/txn.c
index 5fae673ec65..18a3c0021f0 100644
--- a/src/txn/txn.c
+++ b/src/txn/txn.c
@@ -441,25 +441,21 @@ __wt_txn_config(WT_SESSION_IMPL *session, const char *cfg[])
if (cval.len > 0) {
#ifdef HAVE_TIMESTAMPS
WT_TXN_GLOBAL *txn_global = &S2C(session)->txn_global;
- WT_TXN_STATE *txn_state = WT_SESSION_TXN_STATE(session);
wt_timestamp_t oldest_timestamp;
WT_RET(__wt_txn_parse_timestamp(
session, "read", txn->read_timestamp, &cval));
- __wt_writelock(session, &txn_global->rwlock);
+ __wt_readlock(session, &txn_global->rwlock);
__wt_timestamp_set(
oldest_timestamp, txn_global->oldest_timestamp);
+ __wt_readunlock(session, &txn_global->rwlock);
if (__wt_timestamp_cmp(
- txn->read_timestamp, oldest_timestamp) < 0) {
- __wt_writeunlock(session, &txn_global->rwlock);
+ txn->read_timestamp, oldest_timestamp) < 0)
WT_RET_MSG(session, EINVAL,
"read timestamp %.*s older than oldest timestamp",
(int)cval.len, cval.str);
- }
- __wt_timestamp_set(
- txn_state->read_timestamp, txn->read_timestamp);
- __wt_writeunlock(session, &txn_global->rwlock);
- F_SET(txn, WT_TXN_HAS_TS_READ);
+
+ __wt_txn_set_read_timestamp(session);
txn->isolation = WT_ISO_SNAPSHOT;
#else
WT_RET_MSG(session, EINVAL, "read_timestamp requires a "
@@ -532,24 +528,16 @@ __wt_txn_release(WT_SESSION_IMPL *session)
WT_ASSERT(session, txn_state->id != WT_TXN_NONE &&
txn->id != WT_TXN_NONE);
- __wt_writelock(session, &txn_global->rwlock);
WT_PUBLISH(txn_state->id, WT_TXN_NONE);
-#ifdef HAVE_TIMESTAMPS
- if (F_ISSET(txn, WT_TXN_HAS_TS_COMMIT)) {
- /*
- * We rely on a non-zero ID to protect our published
- * commit timestamp. Otherwise we would need a lock
- * here.
- */
- WT_WRITE_BARRIER();
- __wt_timestamp_set_zero(txn_state->commit_timestamp);
- }
-#endif
- __wt_writeunlock(session, &txn_global->rwlock);
txn->id = WT_TXN_NONE;
}
+#ifdef HAVE_TIMESTAMPS
+ __wt_txn_clear_commit_timestamp(session);
+ __wt_txn_clear_read_timestamp(session);
+#endif
+
/* Free the scratch buffer allocated for logging. */
__wt_logrec_free(session, &txn->logrec);
@@ -581,7 +569,6 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
WT_TXN_OP *op;
#ifdef HAVE_TIMESTAMPS
WT_TXN_GLOBAL *txn_global = &S2C(session)->txn_global;
- WT_TXN_STATE *txn_state = WT_SESSION_TXN_STATE(session);
wt_timestamp_t prev_commit_timestamp;
bool update_timestamp;
#endif
@@ -604,13 +591,7 @@ __wt_txn_commit(WT_SESSION_IMPL *session, const char *cfg[])
#ifdef HAVE_TIMESTAMPS
WT_ERR(__wt_txn_parse_timestamp(
session, "commit", txn->commit_timestamp, &cval));
- if (!F_ISSET(txn, WT_TXN_HAS_TS_COMMIT)) {
- __wt_writelock(session, &txn_global->rwlock);
- __wt_timestamp_set(txn_state->commit_timestamp,
- txn->commit_timestamp);
- __wt_writeunlock(session, &txn_global->rwlock);
- F_SET(txn, WT_TXN_HAS_TS_COMMIT);
- }
+ __wt_txn_set_commit_timestamp(session);
#else
WT_ERR_MSG(session, EINVAL, "commit_timestamp requires a "
"version of WiredTiger built with timestamp support");
@@ -947,8 +928,14 @@ __wt_txn_global_init(WT_SESSION_IMPL *session, const char *cfg[])
WT_RET(__wt_spin_init(
session, &txn_global->id_lock, "transaction id lock"));
WT_RET(__wt_rwlock_init(session, &txn_global->rwlock));
- WT_RET(__wt_rwlock_init(session, &txn_global->nsnap_rwlock));
+ WT_RET(__wt_rwlock_init(session, &txn_global->commit_timestamp_rwlock));
+ TAILQ_INIT(&txn_global->commit_timestamph);
+
+ WT_RET(__wt_rwlock_init(session, &txn_global->read_timestamp_rwlock));
+ TAILQ_INIT(&txn_global->read_timestamph);
+
+ WT_RET(__wt_rwlock_init(session, &txn_global->nsnap_rwlock));
txn_global->nsnap_oldest_id = WT_TXN_NONE;
TAILQ_INIT(&txn_global->nsnaph);
@@ -979,6 +966,8 @@ __wt_txn_global_destroy(WT_SESSION_IMPL *session)
__wt_spin_destroy(session, &txn_global->id_lock);
__wt_rwlock_destroy(session, &txn_global->rwlock);
+ __wt_rwlock_destroy(session, &txn_global->commit_timestamp_rwlock);
+ __wt_rwlock_destroy(session, &txn_global->read_timestamp_rwlock);
__wt_rwlock_destroy(session, &txn_global->nsnap_rwlock);
__wt_free(session, txn_global->states);
}
diff --git a/src/txn/txn_ckpt.c b/src/txn/txn_ckpt.c
index 73ad96b5518..519d3469865 100644
--- a/src/txn/txn_ckpt.c
+++ b/src/txn/txn_ckpt.c
@@ -625,6 +625,7 @@ __checkpoint_prepare(WT_SESSION_IMPL *session, const char *cfg[])
*/
__wt_writelock(session, &txn_global->rwlock);
txn_global->checkpoint_state = *txn_state;
+ txn_global->checkpoint_txn = txn;
txn_global->checkpoint_state.pinned_id = WT_MIN(txn->id, txn->snap_min);
/*
@@ -646,6 +647,15 @@ __checkpoint_prepare(WT_SESSION_IMPL *session, const char *cfg[])
txn_state->metadata_pinned = WT_TXN_NONE;
__wt_writeunlock(session, &txn_global->rwlock);
+#ifdef HAVE_TIMESTAMPS
+ /*
+ * Now that the checkpoint transaction is published, clear it from the
+ * regular lists.
+ */
+ __wt_txn_clear_commit_timestamp(session);
+ __wt_txn_clear_read_timestamp(session);
+#endif
+
/*
* Get a list of handles we want to flush; for named checkpoints this
* may pull closed objects into the session cache.
diff --git a/src/txn/txn_timestamp.c b/src/txn/txn_timestamp.c
index efaeac5a580..a975341c189 100644
--- a/src/txn/txn_timestamp.c
+++ b/src/txn/txn_timestamp.c
@@ -80,9 +80,8 @@ __txn_global_query_timestamp(
{
WT_CONNECTION_IMPL *conn;
WT_CONFIG_ITEM cval;
+ WT_TXN *txn;
WT_TXN_GLOBAL *txn_global;
- WT_TXN_STATE *txn_state;
- uint32_t i, session_cnt;
conn = S2C(session);
txn_global = &conn->txn_global;
@@ -93,41 +92,36 @@ __txn_global_query_timestamp(
return (WT_NOTFOUND);
__wt_readlock(session, &txn_global->rwlock);
__wt_timestamp_set(ts, txn_global->commit_timestamp);
- WT_ORDERED_READ(session_cnt, conn->session_cnt);
- for (i = 0, txn_state = txn_global->states; i < session_cnt;
- i++, txn_state++) {
- if (txn_state->id == WT_TXN_NONE ||
- __wt_timestamp_iszero(txn_state->commit_timestamp))
- continue;
- if (__wt_timestamp_cmp(
- txn_state->commit_timestamp, ts) < 0)
- __wt_timestamp_set(
- ts, txn_state->commit_timestamp);
- }
__wt_readunlock(session, &txn_global->rwlock);
+
+ /* Compare with the oldest running transaction. */
+ __wt_readlock(session, &txn_global->commit_timestamp_rwlock);
+ txn = TAILQ_FIRST(&txn_global->commit_timestamph);
+ if (txn != NULL &&
+ __wt_timestamp_cmp(txn->commit_timestamp, ts) < 0)
+ __wt_timestamp_set(ts, txn->commit_timestamp);
+ __wt_readunlock(session, &txn_global->commit_timestamp_rwlock);
} else if (WT_STRING_MATCH("oldest_reader", cval.str, cval.len)) {
if (!txn_global->has_oldest_timestamp)
return (WT_NOTFOUND);
__wt_readlock(session, &txn_global->rwlock);
__wt_timestamp_set(ts, txn_global->oldest_timestamp);
- /* Look at running checkpoints. */
- txn_state = &txn_global->checkpoint_state;
- if (txn_state->pinned_id != WT_TXN_NONE &&
- !__wt_timestamp_iszero(txn_state->read_timestamp) &&
- __wt_timestamp_cmp(txn_state->read_timestamp, ts) < 0)
- __wt_timestamp_set(ts, txn_state->read_timestamp);
- WT_ORDERED_READ(session_cnt, conn->session_cnt);
- for (i = 0, txn_state = txn_global->states; i < session_cnt;
- i++, txn_state++) {
- if (txn_state->pinned_id == WT_TXN_NONE ||
- __wt_timestamp_iszero(txn_state->read_timestamp))
- continue;
- if (__wt_timestamp_cmp(
- txn_state->read_timestamp, ts) < 0)
- __wt_timestamp_set(
- ts, txn_state->read_timestamp);
- }
+
+ /* Check for a running checkpoint */
+ txn = txn_global->checkpoint_txn;
+ if (txn_global->checkpoint_state.pinned_id != WT_TXN_NONE &&
+ !__wt_timestamp_iszero(txn->read_timestamp) &&
+ __wt_timestamp_cmp(txn->read_timestamp, ts) < 0)
+ __wt_timestamp_set(ts, txn->read_timestamp);
__wt_readunlock(session, &txn_global->rwlock);
+
+ /* Look for the oldest ordinary reader. */
+ __wt_readlock(session, &txn_global->read_timestamp_rwlock);
+ txn = TAILQ_FIRST(&txn_global->read_timestamph);
+ if (txn != NULL &&
+ __wt_timestamp_cmp(txn->read_timestamp, ts) < 0)
+ __wt_timestamp_set(ts, txn->read_timestamp);
+ __wt_readunlock(session, &txn_global->read_timestamp_rwlock);
} else
return (__wt_illegal_value(session, NULL));
@@ -149,6 +143,11 @@ __wt_txn_global_query_timestamp(
size_t len;
uint8_t *tsp;
+ /*
+ * Keep clang-analyzer happy: it can't tell that ts will be set
+ * whenever the call below succeeds.
+ */
+ WT_CLEAR(ts);
WT_RET(__txn_global_query_timestamp(session, ts, cfg));
/* Avoid memory allocation: set up an item guaranteed large enough. */
@@ -285,18 +284,10 @@ __wt_txn_set_timestamp(WT_SESSION_IMPL *session, const char *cfg[])
if (ret == 0 && cval.len != 0) {
#ifdef HAVE_TIMESTAMPS
WT_TXN *txn = &session->txn;
- WT_TXN_GLOBAL *txn_global = &S2C(session)->txn_global;
- WT_TXN_STATE *txn_state = WT_SESSION_TXN_STATE(session);
WT_RET(__wt_txn_parse_timestamp(
session, "commit", txn->commit_timestamp, &cval));
- if (!F_ISSET(txn, WT_TXN_HAS_TS_COMMIT)) {
- __wt_writelock(session, &txn_global->rwlock);
- __wt_timestamp_set(txn_state->commit_timestamp,
- txn->commit_timestamp);
- __wt_writeunlock(session, &txn_global->rwlock);
- F_SET(txn, WT_TXN_HAS_TS_COMMIT);
- }
+ __wt_txn_set_commit_timestamp(session);
#else
WT_RET_MSG(session, EINVAL, "commit_timestamp requires a "
"version of WiredTiger built with timestamp support");
@@ -306,3 +297,110 @@ __wt_txn_set_timestamp(WT_SESSION_IMPL *session, const char *cfg[])
return (0);
}
+
+/*
+ * __wt_txn_set_commit_timestamp --
+ *	Add the session's transaction to the global list that is kept sorted
+ *	by commit timestamp, unless it has already been added.
+ */
+void
+__wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session)
+{
+	WT_TXN *after, *txn;
+	WT_TXN_GLOBAL *txn_global;
+
+	txn_global = &S2C(session)->txn_global;
+	txn = &session->txn;
+
+	/* Already published: the existing list entry is left where it is. */
+	if (F_ISSET(txn, WT_TXN_HAS_TS_COMMIT))
+		return;
+
+	__wt_writelock(session, &txn_global->commit_timestamp_rwlock);
+
+	/*
+	 * Walk backwards from the tail until we reach an entry whose commit
+	 * timestamp is not greater than ours; we are linked right after it.
+	 */
+	after = TAILQ_LAST(&txn_global->commit_timestamph, __wt_txn_cts_qh);
+	while (after != NULL && __wt_timestamp_cmp(
+	    after->commit_timestamp, txn->commit_timestamp) > 0)
+		after = TAILQ_PREV(after, __wt_txn_cts_qh, commit_timestampq);
+
+	if (after != NULL)
+		TAILQ_INSERT_AFTER(&txn_global->commit_timestamph,
+		    after, txn, commit_timestampq);
+	else
+		/* The list is empty or every entry has a greater timestamp. */
+		TAILQ_INSERT_HEAD(
+		    &txn_global->commit_timestamph, txn, commit_timestampq);
+
+	__wt_writeunlock(session, &txn_global->commit_timestamp_rwlock);
+
+	/* Record that this transaction is now linked into the list. */
+	F_SET(txn, WT_TXN_HAS_TS_COMMIT);
+}
+
+/*
+ * __wt_txn_clear_commit_timestamp --
+ *	Clear a transaction's published commit timestamp.
+ *
+ *	If the session's transaction is flagged WT_TXN_HAS_TS_COMMIT, unlink
+ *	it from the global commit-timestamp list under that list's write lock
+ *	and clear the flag. Calling this again (or on a transaction that never
+ *	published a commit timestamp) is a no-op.
+ */
+void
+__wt_txn_clear_commit_timestamp(WT_SESSION_IMPL *session)
+{
+	WT_TXN *txn;
+	WT_TXN_GLOBAL *txn_global;
+
+	txn = &session->txn;
+	txn_global = &S2C(session)->txn_global;
+
+	/* Nothing to unlink unless the timestamp was published. */
+	if (!F_ISSET(txn, WT_TXN_HAS_TS_COMMIT))
+		return;
+
+	__wt_writelock(session, &txn_global->commit_timestamp_rwlock);
+	TAILQ_REMOVE(&txn_global->commit_timestamph, txn, commit_timestampq);
+	__wt_writeunlock(session, &txn_global->commit_timestamp_rwlock);
+
+	/*
+	 * Clear the flag, as __wt_txn_clear_read_timestamp does for its list.
+	 * The flag is the only guard on list membership here: leaving it set
+	 * would make a second call remove an entry that is no longer linked,
+	 * and would make __wt_txn_set_commit_timestamp return early instead
+	 * of republishing.
+	 *
+	 * NOTE(review): confirm no caller tests WT_TXN_HAS_TS_COMMIT after
+	 * this function returns and expects it to still be set.
+	 */
+	F_CLR(txn, WT_TXN_HAS_TS_COMMIT);
+}
+
+/*
+ * __wt_txn_set_read_timestamp --
+ *	Add the session's transaction to the global list that is kept sorted
+ *	by read timestamp, unless it has already been added.
+ */
+void
+__wt_txn_set_read_timestamp(WT_SESSION_IMPL *session)
+{
+	WT_TXN *after, *txn;
+	WT_TXN_GLOBAL *txn_global;
+
+	txn_global = &S2C(session)->txn_global;
+	txn = &session->txn;
+
+	/* Already published: the existing list entry is left where it is. */
+	if (F_ISSET(txn, WT_TXN_HAS_TS_READ))
+		return;
+
+	__wt_writelock(session, &txn_global->read_timestamp_rwlock);
+
+	/*
+	 * Walk backwards from the tail until we reach an entry whose read
+	 * timestamp is not greater than ours; we are linked right after it.
+	 */
+	after = TAILQ_LAST(&txn_global->read_timestamph, __wt_txn_rts_qh);
+	while (after != NULL && __wt_timestamp_cmp(
+	    after->read_timestamp, txn->read_timestamp) > 0)
+		after = TAILQ_PREV(after, __wt_txn_rts_qh, read_timestampq);
+
+	if (after != NULL)
+		TAILQ_INSERT_AFTER(
+		    &txn_global->read_timestamph, after, txn, read_timestampq);
+	else
+		/* The list is empty or every entry has a greater timestamp. */
+		TAILQ_INSERT_HEAD(
+		    &txn_global->read_timestamph, txn, read_timestampq);
+
+	__wt_writeunlock(session, &txn_global->read_timestamp_rwlock);
+
+	/* Record that this transaction is now linked into the list. */
+	F_SET(txn, WT_TXN_HAS_TS_READ);
+}
+
+/*
+ * __wt_txn_clear_read_timestamp --
+ *	Unlink the session's transaction from the global read-timestamp list
+ *	if it was published there, and drop the flag that records membership.
+ */
+void
+__wt_txn_clear_read_timestamp(WT_SESSION_IMPL *session)
+{
+	WT_TXN *txn;
+	WT_TXN_GLOBAL *txn_global;
+
+	txn_global = &S2C(session)->txn_global;
+	txn = &session->txn;
+
+	/* Only a transaction flagged as published is on the list. */
+	if (F_ISSET(txn, WT_TXN_HAS_TS_READ)) {
+		__wt_writelock(session, &txn_global->read_timestamp_rwlock);
+		TAILQ_REMOVE(
+		    &txn_global->read_timestamph, txn, read_timestampq);
+		__wt_writeunlock(session, &txn_global->read_timestamp_rwlock);
+
+		/* No longer linked: a repeated call becomes a no-op. */
+		F_CLR(txn, WT_TXN_HAS_TS_READ);
+	}
+}