author     A. Jesse Jiryu Davis <jesse@mongodb.com>  2019-03-07 17:11:12 -0500
committer  A. Jesse Jiryu Davis <jesse@mongodb.com>  2019-03-21 21:22:24 -0400
commit     78eaa3cf538764d5aa5a09c5997532a4c3b2bca8 (patch)
tree       1b5fcc32ad4b9cc2369b9fcc7ae95be2b09da3f7
parent     9fa4a356cc1d89adc1edd4321117503ce90e2d4b (diff)
download   mongo-78eaa3cf538764d5aa5a09c5997532a4c3b2bca8.tar.gz
SERVER-39679 Get oldest transaction time when snapshotting
-rw-r--r--  jstests/replsets/rollover_preserves_active_txns.js             | 126
-rw-r--r--  src/mongo/db/db.cpp                                             |  12
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                               |   2
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp                    |   2
-rw-r--r--  src/mongo/db/storage/kv/kv_engine.h                             |  11
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.cpp                   |  11
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.h                     |   7
-rw-r--r--  src/mongo/db/storage/storage_engine.h                           |  31
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp       | 157
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h         |  20
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp  |  41
-rw-r--r--  src/mongo/db/transaction_participant.cpp                        |  69
-rw-r--r--  src/mongo/db/transaction_participant.h                          |   9
-rw-r--r--  src/mongo/db/transaction_participant_test.cpp                   |  89
-rw-r--r--  src/mongo/dbtests/storage_timestamp_tests.cpp                   |   4
15 files changed, 480 insertions(+), 111 deletions(-)
diff --git a/jstests/replsets/rollover_preserves_active_txns.js b/jstests/replsets/rollover_preserves_active_txns.js
new file mode 100644
index 00000000000..e6743c5f25a
--- /dev/null
+++ b/jstests/replsets/rollover_preserves_active_txns.js
@@ -0,0 +1,126 @@
+/**
+ * When a primary's oplog size exceeds the configured maximum, it must truncate the oplog only up to
+ * the oldest active transaction timestamp at the time of the last stable checkpoint. The first
+ * oplog entry that belongs to a prepared, uncommitted transaction is preserved, along with all
+ * entries after it.
+ *
+ * This tests the oldestActiveTransactionTimestamp, which is calculated from the "startOpTime"
+ * field of documents in the config.transactions collection.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+ "use strict";
+ load("jstests/core/txns/libs/prepare_helpers.js");
+
+ const oplogSizeMB = 1;
+ const oplogSizeBytes = oplogSizeMB * 1024 * 1024;
+ const tenKB = new Array(10 * 1024).join("a");
+
+ // Use a new replica set for each of the commit and abort tests to ensure the same clean state.
+ function doTest(commitOrAbort) {
+ const replSet = new ReplSetTest({
+ // Oplog can be truncated each "sync" cycle. Increase its frequency to once per second.
+ nodeOptions: {syncdelay: 1},
+ nodes: 2
+ });
+
+ replSet.startSet({oplogSize: oplogSizeMB});
+ replSet.initiate();
+
+ const primary = replSet.getPrimary();
+ const secondary = replSet.getSecondary();
+ const primaryOplog = primary.getDB("local").oplog.rs;
+ assert.lte(primaryOplog.dataSize(), oplogSizeBytes);
+ const secondaryOplog = secondary.getDB("local").oplog.rs;
+ assert.lte(secondaryOplog.dataSize(), oplogSizeBytes);
+
+ const coll = primary.getDB("test").test;
+ assert.commandWorked(coll.insert({}, {writeConcern: {w: "majority"}}));
+
+ jsTestLog("Prepare a transaction");
+
+ const session = primary.startSession();
+ session.startTransaction();
+ assert.commandWorked(session.getDatabase("test").test.insert({myTransaction: 1}));
+ const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+
+ jsTestLog("Get transaction entry from config.transactions");
+
+ const txnEntry = primary.getDB("config").transactions.findOne();
+ assert.eq(txnEntry.startOpTime.ts, prepareTimestamp, tojson(txnEntry));
+
+ assert.soonNoExcept(() => {
+ const secondaryTxnEntry = secondary.getDB("config").transactions.findOne();
+ assert.eq(secondaryTxnEntry, txnEntry, tojson(secondaryTxnEntry));
+ return true;
+ });
+
+ jsTestLog("Find prepare oplog entry");
+
+ const oplogEntry = primaryOplog.findOne({prepare: true});
+ assert.eq(oplogEntry.ts, prepareTimestamp, tojson(oplogEntry));
+ // Must already be written on secondary, since the config.transactions entry is.
+ const secondaryOplogEntry = secondaryOplog.findOne({prepare: true});
+ assert.eq(secondaryOplogEntry.ts, prepareTimestamp, tojson(secondaryOplogEntry));
+
+ jsTestLog("Insert documents until oplog exceeds oplogSize");
+
+ // The oplog with a prepared txn grows without bound; let it reach twice its configured max size.
+ while (primaryOplog.dataSize() <= 2 * oplogSizeBytes) {
+ assert.commandWorked(coll.insert({tenKB: tenKB}));
+ }
+
+ jsTestLog(
+ `Oplog dataSize = ${primaryOplog.dataSize()}, check the prepare entry still exists`);
+
+ assert.eq(oplogEntry, primaryOplog.findOne({prepare: true}));
+ assert.soon(() => {
+ return secondaryOplog.dataSize() > oplogSizeBytes;
+ });
+ assert.eq(oplogEntry, secondaryOplog.findOne({prepare: true}));
+
+ if (commitOrAbort === "commit") {
+ jsTestLog("Commit prepared transaction and wait for oplog to shrink to max oplogSize");
+ PrepareHelpers.commitTransactionAfterPrepareTS(session, prepareTimestamp);
+ } else if (commitOrAbort === "abort") {
+ jsTestLog("Abort prepared transaction and wait for oplog to shrink to max oplogSize");
+ session.abortTransaction_forTesting();
+ } else {
+ throw new Error(`Unrecognized value for commitOrAbort: ${commitOrAbort}`);
+ }
+
+ jsTestLog("Add writes after transaction finished to trigger oplog reclamation");
+
+ // Old entries are reclaimed when the oplog size reaches a new milestone. With a 1MB oplog,
+ // milestones are every 0.1 MB (see WiredTigerRecordStore::OplogStones::OplogStones) so
+ // write about 0.2 MB to be certain.
+ for (var i = 0; i < 200; i++) {
+ assert.commandWorked(coll.insert({tenKB: tenKB}));
+ }
+
+ jsTestLog("Waiting for oplog to shrink to 1MB");
+
+ for (let [nodeName, oplog] of[["primary", primaryOplog], ["secondary", secondaryOplog]]) {
+ assert.soon(function() {
+ const dataSize = oplog.dataSize();
+ const prepareEntryRemoved = (oplog.findOne({prepare: true}) === null);
+ print(
+ `${nodeName} oplog dataSize: ${dataSize}, prepare entry removed: ${prepareEntryRemoved}`);
+ // The oplog milestone system allows the oplog to grow to 110% of its max size.
+ if (dataSize < 1.1 * oplogSizeBytes && prepareEntryRemoved) {
+ return true;
+ }
+
+ assert.commandWorked(coll.insert({tenKB: tenKB}, {writeConcern: {w: "majority"}}));
+ return false;
+ }, `waiting for ${nodeName} oplog reclamation`, ReplSetTest.kDefaultTimeoutMS, 1000);
+ }
+
+ replSet.stopSet();
+ }
+
+ doTest("commit");
+ doTest("abort");
+})();
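
The mechanism this test exercises is the rule the rest of the commit implements on the server side: the oplog truncation point is capped at the oldest active (prepared) transaction's first oplog entry, not merely at the stable timestamp. A minimal sketch of that cap, using hypothetical helper names (pinnedOplog, canTruncateBefore) rather than the real WiredTiger truncation code:

    #include <algorithm>
    #include <boost/optional.hpp>
    #include "mongo/bson/timestamp.h"

    // Hypothetical helpers illustrating the invariant the test asserts.
    mongo::Timestamp pinnedOplog(mongo::Timestamp stable,
                                 boost::optional<mongo::Timestamp> oldestActiveTxn) {
        // An active prepared transaction caps the pin at its first oplog entry.
        return oldestActiveTxn ? std::min(*oldestActiveTxn, stable) : stable;
    }

    bool canTruncateBefore(mongo::Timestamp candidate,
                           mongo::Timestamp stable,
                           boost::optional<mongo::Timestamp> oldestActiveTxn) {
        // The prepare entry is never older than pinnedOplog, so it always survives truncation.
        return candidate < pinnedOplog(stable, oldestActiveTxn);
    }

Once the transaction commits or aborts, the callback starts returning boost::none, the pin advances to the stable timestamp, and the next reclamation pass may drop the prepare entry, which is what the final assert.soon loop waits for.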
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index a3dc258684c..5df53cc0ef3 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -132,6 +132,7 @@
#include "mongo/db/storage/storage_engine_lock_file.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/system_index.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/db/ttl.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/network_connection_hook.h"
@@ -522,9 +523,16 @@ ExitCode _initAndListen(int listenPort) {
startFreeMonitoring(serviceContext);
+ auto replCoord = repl::ReplicationCoordinator::get(startupOpCtx.get());
+ invariant(replCoord);
+ if (replCoord->isReplEnabled()) {
+ storageEngine->setOldestActiveTransactionTimestampCallback(
+ TransactionParticipant::getOldestActiveTimestamp);
+ }
+
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
// Note: For replica sets, ShardingStateRecovery happens on transition to primary.
- if (!repl::ReplicationCoordinator::get(startupOpCtx.get())->isReplEnabled()) {
+ if (!replCoord->isReplEnabled()) {
if (ShardingState::get(startupOpCtx.get())->enabled()) {
uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get()));
}
@@ -553,7 +561,7 @@ ExitCode _initAndListen(int listenPort) {
stdx::make_unique<LogicalTimeValidator>(keyManager));
}
- repl::ReplicationCoordinator::get(startupOpCtx.get())->startup(startupOpCtx.get());
+ replCoord->startup(startupOpCtx.get());
if (getReplSetMemberInStandaloneMode(serviceContext)) {
log() << startupWarningsLog;
log() << "** WARNING: mongod started without --replSet yet document(s) are present in "
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 91e4bef476c..8a61b54f0b3 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -1423,7 +1423,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
log() << "Forcing the stable timestamp to the common point: "
<< fixUpInfo.commonPoint.getTimestamp();
opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(
- fixUpInfo.commonPoint.getTimestamp(), boost::none, force);
+ fixUpInfo.commonPoint.getTimestamp(), force);
// We must not take a stable checkpoint until it is guaranteed to include all writes from
// before the rollback (i.e. the stable timestamp is at least the local top of oplog). In
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 166a1a08a1c..da758476930 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1073,7 +1073,7 @@ Status StorageInterfaceImpl::upgradeNonReplicatedUniqueIndexes(OperationContext*
}
void StorageInterfaceImpl::setStableTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) {
- serviceCtx->getStorageEngine()->setStableTimestamp(snapshotName, boost::none);
+ serviceCtx->getStorageEngine()->setStableTimestamp(snapshotName);
}
void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index cdbd34697f9..621314e1a3d 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/storage/kv/kv_prefix.h"
#include "mongo/db/storage/record_store.h"
+#include "mongo/db/storage/storage_engine.h"
namespace mongo {
@@ -307,9 +308,7 @@ public:
/**
* See `StorageEngine::setStableTimestamp`
*/
- virtual void setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force) {}
+ virtual void setStableTimestamp(Timestamp stableTimestamp, bool force) {}
/**
* See `StorageEngine::setInitialDataTimestamp`
@@ -322,6 +321,12 @@ public:
virtual void setOldestTimestampFromStable() {}
/**
+ * See `StorageEngine::setOldestActiveTransactionTimestampCallback`
+ */
+ virtual void setOldestActiveTransactionTimestampCallback(
+ StorageEngine::OldestActiveTransactionTimestampCallback callback){};
+
+ /**
* See `StorageEngine::setOldestTimestamp`
*/
virtual void setOldestTimestamp(Timestamp newOldestTimestamp, bool force) {}
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index b10de937bb7..655935a3043 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -716,10 +716,8 @@ void KVStorageEngine::setJournalListener(JournalListener* jl) {
_engine->setJournalListener(jl);
}
-void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force) {
- _engine->setStableTimestamp(stableTimestamp, maximumTruncationTimestamp, force);
+void KVStorageEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) {
+ _engine->setStableTimestamp(stableTimestamp, force);
}
void KVStorageEngine::setInitialDataTimestamp(Timestamp initialDataTimestamp) {
@@ -736,6 +734,11 @@ void KVStorageEngine::setOldestTimestamp(Timestamp newOldestTimestamp) {
_engine->setOldestTimestamp(newOldestTimestamp, force);
}
+void KVStorageEngine::setOldestActiveTransactionTimestampCallback(
+ StorageEngine::OldestActiveTransactionTimestampCallback callback) {
+ _engine->setOldestActiveTransactionTimestampCallback(callback);
+}
+
bool KVStorageEngine::isCacheUnderPressure(OperationContext* opCtx) const {
return _engine->isCacheUnderPressure(opCtx);
}
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index d5d4f1dade5..c1c64a51fa8 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -146,9 +146,7 @@ public:
virtual void cleanShutdown();
- virtual void setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force = false) override;
+ virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) override;
virtual void setInitialDataTimestamp(Timestamp initialDataTimestamp) override;
@@ -156,6 +154,9 @@ public:
virtual void setOldestTimestamp(Timestamp newOldestTimestamp) override;
+ virtual void setOldestActiveTransactionTimestampCallback(
+ StorageEngine::OldestActiveTransactionTimestampCallback) override;
+
virtual bool isCacheUnderPressure(OperationContext* opCtx) const override;
virtual void setCachePressureForTest(int pressure) override;
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 1f3f111340b..74f6687976c 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -36,6 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/storage/temporary_record_store.h"
+#include "mongo/util/functional.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -57,6 +58,15 @@ class StorageEngineMetadata;
class StorageEngine {
public:
/**
+ * When the storage engine needs to know how much oplog to preserve for the sake of active
+ * transactions, it executes a callback that returns either the oldest active transaction
+ * timestamp, or boost::none if there is no active transaction, or an error if it fails.
+ */
+ using OldestActiveTransactionTimestampResult = StatusWith<boost::optional<Timestamp>>;
+ using OldestActiveTransactionTimestampCallback =
+ std::function<OldestActiveTransactionTimestampResult(Timestamp stableTimestamp)>;
+
+ /**
* The interface for creating new instances of storage engines.
*
* A storage engine provides an instance of this class (along with an associated
@@ -416,19 +426,8 @@ public:
* Sets the highest timestamp at which the storage engine is allowed to take a checkpoint. This
* timestamp must not decrease unless force=true is set, in which case we force the stable
* timestamp, the oldest timestamp, and the commit timestamp backward.
- *
- * The maximumTruncationTimestamp (and newer) must not be truncated from the oplog in order to
- * recover from the `stableTimestamp`. `boost::none` implies there are no additional
- * constraints to what may be truncated.
- *
- * For proper truncation of the oplog, this method requires min(stableTimestamp,
- * maximumTruncationTimestamp) to be monotonically increasing (where `min(stableTimestamp,
- * boost::none) => stableTimestamp`). Otherwise truncation can race and remove a document
- * before a call to this method protects it.
*/
- virtual void setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force = false) {}
+ virtual void setStableTimestamp(Timestamp stableTimestamp, bool force = false) {}
/**
* Tells the storage engine the timestamp of the data at startup. This is necessary because
@@ -455,6 +454,14 @@ public:
virtual void setOldestTimestamp(Timestamp timestamp) {}
/**
+ * Sets a callback which returns the timestamp of the oldest oplog entry involved in an
+ * active MongoDB transaction. The storage engine calls this function to determine how much
+ * oplog it must preserve.
+ */
+ virtual void setOldestActiveTransactionTimestampCallback(
+ OldestActiveTransactionTimestampCallback callback){};
+
+ /**
* Indicates whether the storage engine cache is under pressure.
*
* Retrieves a cache pressure value in the range [0, 100] from the storage engine, and compares
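
The callback contract defined above has three possible outcomes: a timestamp (the start of the oldest oplog entry an active transaction still needs), boost::none (no active transactions), or a non-OK Status (the query could not complete). A hedged sketch of a conforming callback; oldestPreparedTxnStartTs and registryIsReachable are hypothetical stand-ins, not functions from this commit:

    StorageEngine::OldestActiveTransactionTimestampCallback cb =
        [](Timestamp stableTimestamp) -> StorageEngine::OldestActiveTransactionTimestampResult {
            if (!registryIsReachable()) {
                // Could not consult the transaction registry in time.
                return Status(ErrorCodes::ExceededTimeLimit, "timed out finding oldest active txn");
            }
            // boost::none when no transaction is active as of 'stableTimestamp'.
            boost::optional<Timestamp> oldest = oldestPreparedTxnStartTs(stableTimestamp);
            return oldest;
        };

The real callback registered in this commit is TransactionParticipant::getOldestActiveTimestamp, shown later in the transaction_participant.cpp hunk.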
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index d9d07d00cd1..1655591d191 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -260,6 +260,20 @@ private:
AtomicWord<bool> _shuttingDown{false};
};
+std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult& r) {
+ if (r.isOK()) {
+ if (r.getValue()) {
+ // Timestamp.
+ return r.getValue().value().toString();
+ } else {
+ // boost::none.
+ return "null";
+ }
+ } else {
+ return r.getStatus().toString();
+ }
+}
+
class WiredTigerKVEngine::WiredTigerCheckpointThread : public BackgroundJob {
public:
explicit WiredTigerCheckpointThread(WiredTigerKVEngine* wiredTigerKVEngine,
@@ -285,28 +299,31 @@ public:
wiredTigerGlobalOptions.checkpointDelaySecs)));
}
+ // Might have been awakened by another thread shutting us down.
+ if (_shuttingDown.load()) {
+ break;
+ }
+
const Timestamp stableTimestamp = _wiredTigerKVEngine->getStableTimestamp();
const Timestamp initialDataTimestamp = _wiredTigerKVEngine->getInitialDataTimestamp();
// The amount of oplog to keep is primarily dictated by a user setting. However, in
// unexpected cases, durable, recover to a timestamp storage engines may need to play
// forward from an oplog entry that would otherwise be truncated by the user
- // setting. Furthermore with prepared transactions, oplog entries can refer to
- // previous oplog entries.
+ // setting. Furthermore, the entries in prepared or large transactions can refer to
+ // previous entries in the same transaction.
//
- // Live (replication) rollback will replay oplogs from exactly the stable
- // timestamp. With prepared transactions, it may require some additional entries prior
- // to the stable timestamp. These requirements are summarized in
- // `getOplogNeededForRollback`. Truncating the oplog at this point is sufficient for
- // in-memory configurations, but could cause an unrecoverable scenario if the node
- // crashed and has to play from the last stable checkpoint.
+ // Live (replication) rollback will replay oplogs from exactly the stable timestamp.
+ // With prepared or large transactions, it may require some additional entries prior to
+ // the stable timestamp. These requirements are summarized in getOplogNeededForRollback.
+ // Truncating the oplog at this point is sufficient for in-memory configurations, but
+ // could cause an unrecoverable scenario if the node crashed and has to play from the
+ // last stable checkpoint.
//
// By recording the oplog needed for rollback "now", then taking a stable checkpoint,
// we can safely assume that the oplog needed for crash recovery has caught up to the
// recorded value. After the checkpoint, this value will be published such that actors
// which truncate the oplog can read an updated value.
- const Timestamp oplogNeededForRollback =
- _wiredTigerKVEngine->getOplogNeededForRollback();
try {
// Three cases:
//
@@ -331,21 +348,22 @@ public:
<< stableTimestamp.toString()
<< " InitialDataTimestamp: " << initialDataTimestamp.toString();
} else {
- // 'stableTimestamp' is the smallest possible value at which WT will take a
- // stable checkpoint. A newer stable timestamp may be used by WT if one is
- // concurrently set.
- LOG_FOR_RECOVERY(2) << "Performing stable checkpoint. StableTimestamp: "
- << stableTimestamp;
+ auto oplogNeededForRollback = _wiredTigerKVEngine->getOplogNeededForRollback();
+
+ LOG_FOR_RECOVERY(2)
+ << "Performing stable checkpoint. StableTimestamp: " << stableTimestamp
+ << ", OplogNeededForRollback: " << toString(oplogNeededForRollback);
UniqueWiredTigerSession session = _sessionCache->getSession();
WT_SESSION* s = session->getSession();
invariantWTOK(s->checkpoint(s, "use_timestamp=true"));
- // Now that the checkpoint is durable, publish the oplog needed to recover
- // from it.
- {
+ if (oplogNeededForRollback.isOK()) {
+ // Now that the checkpoint is durable, publish the oplog needed to recover
+ // from it.
stdx::lock_guard<stdx::mutex> lk(_oplogNeededForCrashRecoveryMutex);
- _oplogNeededForCrashRecovery.store(oplogNeededForRollback.asULL());
+ _oplogNeededForCrashRecovery.store(
+ oplogNeededForRollback.getValue().asULL());
}
}
} catch (const WriteConflictException&) {
@@ -639,11 +657,14 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
_journalFlusher->go();
}
+ // Until the Replication layer installs a real callback, prevent truncating the oplog.
+ setOldestActiveTransactionTimestampCallback(
+ [](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); });
+
if (!_readOnly && !_ephemeral) {
if (!_recoveryTimestamp.isNull()) {
setInitialDataTimestamp(_recoveryTimestamp);
- // The `maximumTruncationTimestamp` is not persisted, so choose a conservative value.
- setStableTimestamp(_recoveryTimestamp, Timestamp::min(), false);
+ setStableTimestamp(_recoveryTimestamp, false);
}
_checkpointThread =
@@ -1046,6 +1067,12 @@ void WiredTigerKVEngine::syncSizeInfo(bool sync) const {
}
}
+void WiredTigerKVEngine::setOldestActiveTransactionTimestampCallback(
+ StorageEngine::OldestActiveTransactionTimestampCallback callback) {
+ stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex);
+ _oldestActiveTransactionTimestampCallback = std::move(callback);
+};
+
RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() {
return new WiredTigerRecoveryUnit(_sessionCache.get());
}
@@ -1528,9 +1555,7 @@ MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely);
} // namespace
-void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force) {
+void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) {
if (stableTimestamp.isNull()) {
return;
}
@@ -1578,26 +1603,9 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp,
invariant(static_cast<std::size_t>(size) < sizeof(stableTSConfigString));
invariantWTOK(_conn->set_timestamp(_conn, stableTSConfigString));
+ // After publishing a stable timestamp to WT, record it; getOplogNeededForRollback reads this
+ // value to compute how much oplog must be kept.
_stableTimestamp.store(stableTimestamp.asULL());
-
- // After publishing a stable timestamp to WT, we can publish the updated value for the
- // necessary oplog to keep. Calls to this method require the min(stableTimestamp,
- // maximumTruncationTimestamp) to be monotonically increasing. This allows us to safely record
- // and publish a single value without additional concurrency control.
- if (maximumTruncationTimestamp) {
- // Until we discover otherwise, assume callers expect to obey the contract for proper
- // oplog truncation.
- DEV invariant(_oplogNeededForRollback.load() <=
- std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL()));
- _oplogNeededForRollback.store(
- std::min(maximumTruncationTimestamp->asULL(), stableTimestamp.asULL()));
- } else {
- // If there is no maximumTruncationTimestamp at this stable timestamp, WT is free to
- // truncate the oplog to any value behind the last stable timestamp, once it is
- // checkpointed.
- _oplogNeededForRollback.store(stableTimestamp.asULL());
- }
-
if (_checkpointThread && !_checkpointThread->hasTriggeredFirstStableCheckpoint()) {
_checkpointThread->triggerFirstStableCheckpoint(
prevStable, Timestamp(_initialDataTimestamp.load()), stableTimestamp);
@@ -1840,8 +1848,29 @@ boost::optional<Timestamp> WiredTigerKVEngine::getLastStableRecoveryTimestamp()
return boost::none;
}
-Timestamp WiredTigerKVEngine::getOplogNeededForRollback() const {
- return Timestamp(_oplogNeededForRollback.load());
+StatusWith<Timestamp> WiredTigerKVEngine::getOplogNeededForRollback() const {
+ // Get the current stable timestamp and use it throughout this function, ignoring updates from
+ // another thread.
+ auto stableTimestamp = _stableTimestamp.load();
+
+ // Only one thread can set or execute this callback.
+ stdx::lock_guard<stdx::mutex> lk(_oldestActiveTransactionTimestampCallbackMutex);
+ boost::optional<Timestamp> oldestActiveTransactionTimestamp;
+ if (_oldestActiveTransactionTimestampCallback) {
+ auto status = _oldestActiveTransactionTimestampCallback(Timestamp(stableTimestamp));
+ if (status.isOK()) {
+ oldestActiveTransactionTimestamp.swap(status.getValue());
+ } else {
+ LOG(1) << "getting oldest active transaction timestamp: " << status.getStatus();
+ return status.getStatus();
+ }
+ }
+
+ if (oldestActiveTransactionTimestamp) {
+ return std::min(oldestActiveTransactionTimestamp.value(), Timestamp(stableTimestamp));
+ } else {
+ return Timestamp(stableTimestamp);
+ }
}
boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery() const {
@@ -1853,21 +1882,37 @@ boost::optional<Timestamp> WiredTigerKVEngine::getOplogNeededForCrashRecovery()
}
Timestamp WiredTigerKVEngine::getPinnedOplog() const {
- stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex);
- if (!storageGlobalParams.allowOplogTruncation) {
- // If oplog truncation is not allowed, then return the min timestamp so that no history is
- // ever allowed to be deleted.
- return Timestamp::min();
- }
- if (_oplogPinnedByBackup) {
- // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup.
- return _oplogPinnedByBackup.get();
+ {
+ stdx::lock_guard<stdx::mutex> lock(_oplogPinnedByBackupMutex);
+ if (!storageGlobalParams.allowOplogTruncation) {
+ // If oplog truncation is not allowed, then return the min timestamp so that no history is
+ // ever allowed to be deleted.
+ return Timestamp::min();
+ }
+ if (_oplogPinnedByBackup) {
+ // All the oplog since `_oplogPinnedByBackup` should remain intact during the backup.
+ return _oplogPinnedByBackup.get();
+ }
}
+
+ auto oplogNeededForCrashRecovery = getOplogNeededForCrashRecovery();
if (!_keepDataHistory) {
// We use rollbackViaRefetch, so we only need to pin oplog for crash recovery.
- return getOplogNeededForCrashRecovery().value_or(Timestamp::max());
+ return oplogNeededForCrashRecovery.value_or(Timestamp::max());
+ }
+
+ if (oplogNeededForCrashRecovery) {
+ return oplogNeededForCrashRecovery.value();
}
- return getOplogNeededForCrashRecovery().value_or(getOplogNeededForRollback());
+
+ auto status = getOplogNeededForRollback();
+ if (status.isOK()) {
+ return status.getValue();
+ }
+
+ // If getOplogNeededForRollback fails, don't truncate any oplog right now.
+ return Timestamp::min();
}
bool WiredTigerKVEngine::supportsReadConcernSnapshot() const {
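
Taken together, the WiredTiger changes replace the stored maximumTruncationTimestamp with an on-demand computation: getOplogNeededForRollback asks the callback for the oldest active transaction as of the current stable timestamp and returns the smaller of the two, and getPinnedOplog refuses to allow any truncation when that query fails. A simplified restatement of the decision ladder (locking, backup pins, allowOplogTruncation, and the rollbackViaRefetch branch are omitted; names match the diff):

    StatusWith<Timestamp> WiredTigerKVEngine::getOplogNeededForRollback() const {
        Timestamp stable(_stableTimestamp.load());
        auto swOldest = _oldestActiveTransactionTimestampCallback(stable);
        if (!swOldest.isOK())
            return swOldest.getStatus();  // e.g. timed out reading config.transactions
        auto oldestActiveTxn = swOldest.getValue();
        return oldestActiveTxn ? std::min(*oldestActiveTxn, stable) : stable;
    }

    Timestamp WiredTigerKVEngine::getPinnedOplog() const {
        if (auto needed = getOplogNeededForCrashRecovery())
            return *needed;  // published only after a stable checkpoint is durable
        auto swRollback = getOplogNeededForRollback();
        return swRollback.isOK() ? swRollback.getValue() : Timestamp::min();  // fail safe: keep all
    }

The checkpoint thread keeps the same record-then-publish order as before: it captures the rollback value, takes the stable checkpoint, and only then publishes it as the oplog needed for crash recovery; it now skips publishing when the callback returned an error.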
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 7c5328476e5..1b1c9af1648 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -39,6 +39,7 @@
#include "mongo/bson/ordering.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
@@ -94,6 +95,9 @@ public:
return _ephemeral;
}
+ void setOldestActiveTransactionTimestampCallback(
+ StorageEngine::OldestActiveTransactionTimestampCallback callback) override;
+
RecoveryUnit* newRecoveryUnit() override;
Status createRecordStore(OperationContext* opCtx,
@@ -193,9 +197,7 @@ public:
void setJournalListener(JournalListener* jl) final;
- void setStableTimestamp(Timestamp stableTimestamp,
- boost::optional<Timestamp> maximumTruncationTimestamp,
- bool force) override;
+ void setStableTimestamp(Timestamp stableTimestamp, bool force) override;
void setInitialDataTimestamp(Timestamp initialDataTimestamp) override;
@@ -321,10 +323,11 @@ public:
/**
* Returns the minimum possible Timestamp value in the oplog that replication may need for
- * recovery in the event of a rollback. This value gets updated on every `setStableTimestamp`
- * call.
+ * recovery in the event of a rollback. This value depends on the timestamp passed to
+ * `setStableTimestamp` and on the set of active MongoDB transactions. Returns an error if it
+ * times out querying the active transactions.
*/
- Timestamp getOplogNeededForRollback() const;
+ StatusWith<Timestamp> getOplogNeededForRollback() const;
/**
* Returns the minimum possible Timestamp value in the oplog that replication may need for
@@ -405,6 +408,10 @@ private:
std::uint64_t _getCheckpointTimestamp() const;
+ mutable stdx::mutex _oldestActiveTransactionTimestampCallbackMutex;
+ StorageEngine::OldestActiveTransactionTimestampCallback
+ _oldestActiveTransactionTimestampCallback;
+
WT_CONNECTION* _conn;
WiredTigerFileVersion _fileVersion;
WiredTigerEventHandler _eventHandler;
@@ -458,7 +465,6 @@ private:
// Tracks the stable and oldest timestamps we've set on the storage engine.
AtomicWord<std::uint64_t> _oldestTimestamp;
AtomicWord<std::uint64_t> _stableTimestamp;
- AtomicWord<std::uint64_t> _oplogNeededForRollback{Timestamp::min().asULL()};
// Timestamp of data at startup. Used internally to advise checkpointing and recovery to a
// timestamp. Provided by replication layer because WT does not persist timestamps.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
index a9b6f3a5d83..9229c4df0f8 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
@@ -240,13 +240,30 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) {
_engine->setInitialDataTimestamp(Timestamp(1, 1));
wiredTigerGlobalOptions.checkpointDelaySecs = 1;
+ // Simulate the callback that queries config.transactions for the oldest active transaction.
+ boost::optional<Timestamp> oldestActiveTxnTimestamp;
+ AtomicWord<bool> callbackShouldFail{false};
+ auto callback = [&](Timestamp stableTimestamp) {
+ using ResultType = StorageEngine::OldestActiveTransactionTimestampResult;
+ if (callbackShouldFail.load()) {
+ return ResultType(ErrorCodes::ExceededTimeLimit, "timeout");
+ }
+
+ return ResultType(oldestActiveTxnTimestamp);
+ };
+
+ _engine->setOldestActiveTransactionTimestampCallback(callback);
+
// A method that will poll the WiredTigerKVEngine until it sees the amount of oplog necessary
// for crash recovery exceeds the input.
auto assertPinnedMovesSoon = [this](Timestamp newPinned) {
// If the current oplog needed for rollback does not exceed the requested pinned out, we
// cannot expect the CheckpointThread to eventually publish a sufficient crash recovery
// value.
- ASSERT_TRUE(_engine->getOplogNeededForRollback() >= newPinned);
+ auto needed = _engine->getOplogNeededForRollback();
+ if (needed.isOK()) {
+ ASSERT_TRUE(needed.getValue() >= newPinned);
+ }
// Do 100 iterations that sleep for 100 milliseconds between polls. This will wait for up
// to 10 seconds to observe an asynchronous update that iterates once per second.
@@ -264,17 +281,31 @@ TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) {
FAIL("");
};
- _engine->setStableTimestamp(Timestamp(10, 1), boost::none, false);
+ oldestActiveTxnTimestamp = boost::none;
+ _engine->setStableTimestamp(Timestamp(10, 1), false);
assertPinnedMovesSoon(Timestamp(10, 1));
- _engine->setStableTimestamp(Timestamp(20, 1), Timestamp(15, 1), false);
+ oldestActiveTxnTimestamp = Timestamp(15, 1);
+ _engine->setStableTimestamp(Timestamp(20, 1), false);
assertPinnedMovesSoon(Timestamp(15, 1));
- _engine->setStableTimestamp(Timestamp(30, 1), Timestamp(19, 1), false);
+ oldestActiveTxnTimestamp = Timestamp(19, 1);
+ _engine->setStableTimestamp(Timestamp(30, 1), false);
assertPinnedMovesSoon(Timestamp(19, 1));
- _engine->setStableTimestamp(Timestamp(30, 1), boost::none, false);
+ oldestActiveTxnTimestamp = boost::none;
+ _engine->setStableTimestamp(Timestamp(30, 1), false);
assertPinnedMovesSoon(Timestamp(30, 1));
+
+ callbackShouldFail.store(true);
+ ASSERT_NOT_OK(_engine->getOplogNeededForRollback());
+ _engine->setStableTimestamp(Timestamp(40, 1), false);
+ // Await a new checkpoint. Oplog needed for rollback does not advance.
+ sleepmillis(1100);
+ ASSERT_EQ(_engine->getOplogNeededForCrashRecovery().get(), Timestamp(30, 1));
+ _engine->setStableTimestamp(Timestamp(30, 1), false);
+ callbackShouldFail.store(false);
+ assertPinnedMovesSoon(Timestamp(40, 1));
}
std::unique_ptr<KVHarnessHelper> makeHelper() {
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 7e5d3334b3d..16cec7c5e7a 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/transaction_participant.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/test_commands_enabled.h"
@@ -332,19 +333,63 @@ void TransactionParticipant::performNoopWrite(OperationContext* opCtx, StringDat
}
}
-boost::optional<Timestamp> TransactionParticipant::getOldestActiveTimestamp(
- OperationContext* opCtx) {
- DBDirectClient client(opCtx);
- Query q(BSON(SessionTxnRecord::kStateFieldName << "prepared"));
- q.sort(SessionTxnRecord::kStartOpTimeFieldName.toString());
- auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), q);
- if (result.isEmpty()) {
- return boost::none;
- }
+StorageEngine::OldestActiveTransactionTimestampResult
+TransactionParticipant::getOldestActiveTimestamp(Timestamp stableTimestamp) {
+ // Read from config.transactions at the stable timestamp for the oldest active transaction
+ // timestamp. Use a short timeout: another thread might hold the global lock, e.g. to shut down
+ // the server, and that thread would both block this one from querying config.transactions and
+ // wait for this thread to terminate.
+ auto client = getGlobalServiceContext()->makeClient("OldestActiveTxnTimestamp");
+ AlternativeClientRegion acr(client);
+
+ try {
+ auto opCtx = cc().makeOperationContext();
+ auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ auto deadline = Date_t::now() + Milliseconds(100);
+ Lock::DBLock dbLock(opCtx.get(), nss.db(), MODE_IS, deadline);
+ Lock::CollectionLock collLock(opCtx.get()->lockState(), nss.toString(), MODE_IS, deadline);
+
+ auto databaseHolder = DatabaseHolder::get(opCtx.get());
+ auto db = databaseHolder->getDb(opCtx.get(), nss.db());
+ if (!db) {
+ // There is no config database, so there cannot be any active transactions.
+ return boost::none;
+ }
+
+ auto collection = db->getCollection(opCtx.get(), nss);
+ if (!collection) {
+ return boost::none;
+ }
- auto txnRecord =
- SessionTxnRecord::parse(IDLParserErrorContext("parse oldest active txn record"), result);
- return txnRecord.getStartOpTime()->getTimestamp();
+ if (!stableTimestamp.isNull()) {
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+ stableTimestamp);
+ }
+
+ // Scan. We guess that occasional scans are cheaper than the write overhead of an index.
+ boost::optional<Timestamp> oldestTxnTimestamp;
+ auto cursor = collection->getCursor(opCtx.get());
+ while (auto record = cursor->next()) {
+ auto doc = record.get().data.toBson();
+ auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("parse oldest active txn record"), doc);
+ if (txnRecord.getState() != DurableTxnStateEnum::kPrepared) {
+ continue;
+ }
+
+ // A prepared transaction must have a start timestamp.
+ // TODO(SERVER-40013): Handle entries with state "prepared" and no "startTimestamp".
+ invariant(txnRecord.getStartOpTime());
+ auto ts = txnRecord.getStartOpTime()->getTimestamp();
+ if (!oldestTxnTimestamp || ts < oldestTxnTimestamp.value()) {
+ oldestTxnTimestamp = ts;
+ }
+ }
+
+ return oldestTxnTimestamp;
+ } catch (const DBException&) {
+ return exceptionToStatus();
+ }
}
const LogicalSessionId& TransactionParticipant::Observer::_sessionId() const {
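
The rewritten getOldestActiveTimestamp no longer runs a DBDirectClient query on the caller's OperationContext. It creates its own client, takes IS locks on config.transactions with a 100 ms deadline, reads the collection as of the stable timestamp, and keeps the minimum startOpTime among records in the "prepared" state. The core of the scan reduces to a min-fold; in this sketch 'records' is a hypothetical stand-in for the documents the real code reads through a collection cursor:

    boost::optional<Timestamp> oldestTxnTimestamp;
    for (const SessionTxnRecord& txnRecord : records) {
        if (txnRecord.getState() != DurableTxnStateEnum::kPrepared)
            continue;                           // only prepared transactions pin the oplog
        invariant(txnRecord.getStartOpTime());  // prepared implies startOpTime is present
        auto ts = txnRecord.getStartOpTime()->getTimestamp();
        if (!oldestTxnTimestamp || ts < *oldestTxnTimestamp)
            oldestTxnTimestamp = ts;            // keep the smallest start timestamp seen
    }
    return oldestTxnTimestamp;                  // boost::none if nothing is prepared

Any DBException, including a lock-acquisition timeout, is converted to a Status and returned, which the storage engine treats as "do not truncate anything for now."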
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 6d05089b284..2cbb0aaaf7d 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -48,6 +48,7 @@
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/single_transaction_stats.h"
#include "mongo/db/storage/recovery_unit.h"
+#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/transaction_metrics_observer.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/assert_util.h"
@@ -774,10 +775,12 @@ public:
/**
- * Returns the timestamp of the oldest oplog entry written across all open transactions.
- * Returns boost::none if there are no active transactions.
+ * Returns the timestamp of the oldest oplog entry written across all open transactions, at the
+ * time of the stable timestamp. Returns boost::none if there are no active transactions, or an
+ * error if it fails.
*/
- static boost::optional<Timestamp> getOldestActiveTimestamp(OperationContext* opCtx);
+ static StorageEngine::OldestActiveTransactionTimestampResult getOldestActiveTimestamp(
+ Timestamp stableTimestamp);
/**
* Append a no-op to the oplog, for cases where we haven't written in this unit of work but
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 3a20184e9dd..97070fb2fc6 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -29,6 +29,8 @@
#include "mongo/platform/basic.h"
+#include <boost/optional/optional_io.hpp>
+
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
@@ -3820,5 +3822,92 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfAborted) {
ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly());
}
+TEST_F(TxnParticipantTest, OldestActiveTransactionTimestamp) {
+ auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+
+ auto insertTxnRecord = [&](unsigned i) {
+ Timestamp ts(1, i);
+ SessionTxnRecord record;
+ record.setStartOpTime(repl::OpTime(ts, 0));
+ record.setState(DurableTxnStateEnum::kPrepared);
+ record.setSessionId(makeLogicalSessionIdForTest());
+ record.setTxnNum(1);
+ record.setLastWriteOpTime(repl::OpTime(ts, 0));
+ record.setLastWriteDate(Date_t::now());
+
+ AutoGetOrCreateDb autoDb(opCtx(), nss.db(), MODE_X);
+ WriteUnitOfWork wuow(opCtx());
+ auto coll = autoDb.getDb()->getCollection(opCtx(), nss.ns());
+ ASSERT(coll);
+ OpDebug* const nullOpDebug = nullptr;
+ ASSERT_OK(
+ coll->insertDocument(opCtx(), InsertStatement(record.toBSON()), nullOpDebug, false));
+ wuow.commit();
+ };
+
+ auto deleteTxnRecord = [&](unsigned i) {
+ Timestamp ts(1, i);
+ AutoGetOrCreateDb autoDb(opCtx(), nss.db(), MODE_X);
+ WriteUnitOfWork wuow(opCtx());
+ auto coll = autoDb.getDb()->getCollection(opCtx(), nss.ns());
+ ASSERT(coll);
+ auto cursor = coll->getCursor(opCtx());
+ while (auto record = cursor->next()) {
+ auto bson = record.get().data.toBson();
+ if (bson["state"].String() != "prepared"_sd) {
+ continue;
+ }
+
+ if (bson["startOpTime"]["ts"].timestamp() == ts) {
+ coll->deleteDocument(opCtx(), kUninitializedStmtId, record->id, nullptr);
+ wuow.commit();
+ return;
+ }
+ }
+ FAIL(mongoutils::str::stream() << "No prepared transaction with start timestamp (1, " << i
+ << ")");
+ };
+
+ auto oldestActiveTransactionTS = [&]() {
+ return TransactionParticipant::getOldestActiveTimestamp(Timestamp()).getValue();
+ };
+
+ auto assertOldestActiveTS = [&](boost::optional<unsigned> i) {
+ if (i.has_value()) {
+ ASSERT_EQ(Timestamp(1, i.value()), oldestActiveTransactionTS());
+ } else {
+ ASSERT_EQ(boost::none, oldestActiveTransactionTS());
+ }
+ };
+
+ assertOldestActiveTS(boost::none);
+ insertTxnRecord(1);
+ assertOldestActiveTS(1);
+ insertTxnRecord(2);
+ assertOldestActiveTS(1);
+ deleteTxnRecord(1);
+ assertOldestActiveTS(2);
+ deleteTxnRecord(2);
+ assertOldestActiveTS(boost::none);
+
+ // Add a newer transaction, then an older one, to test that order doesn't matter.
+ insertTxnRecord(4);
+ insertTxnRecord(3);
+ assertOldestActiveTS(3);
+ deleteTxnRecord(4);
+ assertOldestActiveTS(3);
+ deleteTxnRecord(3);
+ assertOldestActiveTS(boost::none);
+};
+
+TEST_F(TxnParticipantTest, OldestActiveTransactionTimestampTimeout) {
+ // Block getOldestActiveTimestamp() by locking the config database.
+ auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ AutoGetOrCreateDb autoDb(opCtx(), nss.db(), MODE_X);
+ auto statusWith = TransactionParticipant::getOldestActiveTimestamp(Timestamp());
+ ASSERT_FALSE(statusWith.isOK());
+ ASSERT_TRUE(ErrorCodes::isInterruption(statusWith.getStatus().code()));
+};
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 2a326ced824..6a45c1d0223 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -432,8 +432,8 @@ public:
void assertOldestActiveTxnTimestampEquals(const boost::optional<Timestamp>& ts,
const Timestamp& atTs) {
- OneOffRead oor(_opCtx, atTs);
- ASSERT_EQ(TransactionParticipant::getOldestActiveTimestamp(_opCtx), ts);
+ auto oldest = TransactionParticipant::getOldestActiveTimestamp(atTs);
+ ASSERT_EQ(oldest, ts);
}
void assertHasStartOpTime() {