summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/core/views/views_all_commands.js1
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js1
-rw-r--r--jstests/replsets/pin_history_after_restart.js91
-rw-r--r--jstests/replsets/unpin_history_after_rollback.js61
-rw-r--r--jstests/sharding/read_write_concern_defaults_application.js1
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/commands/SConscript3
-rw-r--r--src/mongo/db/commands/test_commands.cpp110
-rw-r--r--src/mongo/db/commands/test_commands.h51
-rw-r--r--src/mongo/db/mongod_main.cpp21
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_op_observer.cpp128
-rw-r--r--src/mongo/db/s/resharding/resharding_op_observer.h19
-rw-r--r--src/mongo/db/storage/SConscript3
-rw-r--r--src/mongo/db/storage/durable_history_pin.cpp104
-rw-r--r--src/mongo/db/storage/durable_history_pin.h82
-rw-r--r--src/mongo/db/storage/kv/kv_engine.h3
-rw-r--r--src/mongo/db/storage/storage_engine.h6
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp10
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h3
-rw-r--r--src/mongo/db/storage/storage_engine_mock.h3
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp56
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h11
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp20
24 files changed, 764 insertions, 27 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index ede2af1fbfb..b537ddc91f9 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -414,6 +414,7 @@ let viewsCommandTests = {
movePrimary: {skip: "Tested in sharding/movePrimary1.js"},
multicast: {skip: isUnrelated},
netstat: {skip: isAnInternalCommand},
+ pinHistoryReplicated: {skip: isAnInternalCommand},
ping: {command: {ping: 1}},
planCacheClear: {command: {planCacheClear: "view"}, expectFailure: true},
planCacheClearFilters: {command: {planCacheClearFilters: "view"}, expectFailure: true},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index c636fcecc20..bfdcaa1eb47 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -239,6 +239,7 @@ const allCommands = {
},
mergeChunks: {skip: isPrimaryOnly},
moveChunk: {skip: isPrimaryOnly},
+ pinHistoryReplicated: {skip: isAnInternalCommand},
ping: {skip: isNotAUserDataRead},
planCacheClear: {skip: isNotAUserDataRead},
planCacheClearFilters: {skip: isNotAUserDataRead},
diff --git a/jstests/replsets/pin_history_after_restart.js b/jstests/replsets/pin_history_after_restart.js
new file mode 100644
index 00000000000..38c0265484e
--- /dev/null
+++ b/jstests/replsets/pin_history_after_restart.js
@@ -0,0 +1,91 @@
+/**
+ * This test uses the test only `pinHistoryReplicated` command to exercise the DurableHistoryPins
+ * across restart.
+ *
+ * The `pinHistoryReplicated` command will pin the oldest timestamp at the requested time (with an
+ * optional rounding up to oldest). If the pin is successful, the pinned time is written to a
+ * document inside `mdb_testing.pinned_timestamp`.
+ *
+ * When the `TestingDurableHistoryPin` is registered at startup, it will repin the oldest timestamp
+ * at the minimum of all documents written to `mdb_testing.pinned_timestamp`.
+ *
+ * This test does the following:
+ *
+ * 1) Pin the timestamp with an artificially small value to take advantage of rounding.
+ * 2) See that the timestamp is pinned via serverStatus.
+ * 3) Restart the node, see the pin persist.
+ * 4) Make a new pin at "original pin" + 1
+ * 5) Remove the pin at "original pin"
+ * 6) Restart the node, see the incremented pin value from serverStatus.
+ *
+ * @tags: [requires_fcv_49, requires_majority_read_concern, requires_persistence]
+ */
+
+(function() {
+"use strict";
+
+/**
+ * Returns a new Timestamp with the same `t` component as `ts` and its `i` component incremented by
+ * one.
+ */
+function incTs(ts) {
+ return Timestamp(ts.t, ts.i + 1);
+}
+
+let replTest = new ReplSetTest({
+ name: "use_history_after_restart",
+ nodes: 1,
+ nodeOptions: {
+ setParameter: {
+ // Set the history window to zero to more aggressively advance the oldest timestamp.
+ minSnapshotHistoryWindowInSeconds: 0,
+ logComponentVerbosity: tojson({storage: {recovery: 2}}),
+ }
+ }
+});
+let nodes = replTest.startSet();
+replTest.initiate();
+let primary = replTest.getPrimary();
+
+// Pin with an arbitrarily small timestamp. Let the rounding tell us where the pin ended up. The
+// write to the `mdb_testing.pinned_timestamp` collection is not logged/replayed during replication
+// recovery. Repinning across startup happens before replication recovery. Do a majority write for
+// predictability of the test.
+let result = assert.commandWorked(primary.adminCommand(
+ {"pinHistoryReplicated": Timestamp(100, 1), round: true, writeConcern: {w: "majority"}}));
+let origPinTs = result["pinTs"];
+jsTestLog({"First pin result": result});
+
+// Do some additional writes that would traditionally advance the oldest timestamp.
+for (var idx = 0; idx < 10; ++idx) {
+ assert.commandWorked(primary.getDB("test")["coll"].insert({}));
+}
+assert.commandWorked(primary.getDB("test")["coll"].insert({}, {writeConcern: {w: "majority"}}));
+
+// Observe that the pinned timestamp matches the command response.
+let serverStatus = assert.commandWorked(primary.adminCommand("serverStatus"));
+let pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
+assert.eq(origPinTs, pinnedTs);
+
+// Restarting the primary should preserve the pin.
+replTest.restart(primary);
+primary = replTest.getPrimary();
+serverStatus = assert.commandWorked(primary.adminCommand("serverStatus"));
+pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
+assert.eq(origPinTs, pinnedTs);
+
+// Create a new pin at "ts + 1". This should succeed, but have no effect.
+result = assert.commandWorked(
+ primary.adminCommand({"pinHistoryReplicated": incTs(result["pinTs"]), round: false}));
+jsTestLog({"Second pin result": result});
+let newPinTs = result["pinTs"];
+assert.eq(newPinTs, incTs(origPinTs));
+
+// Remove the old pin at "ts".
+assert.commandWorked(primary.getDB("mdb_testing")["pinned_timestamp"].remove(
+ {"pinTs": origPinTs}, {writeConcern: {w: "majority"}}));
+
+// Restarting the node should observe a pin at "ts + 1".
+replTest.restart(primary);
+primary = replTest.getPrimary();
+serverStatus = assert.commandWorked(primary.adminCommand("serverStatus"));
+pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
+assert.eq(newPinTs, pinnedTs);
+replTest.stopSet();
+})();
diff --git a/jstests/replsets/unpin_history_after_rollback.js b/jstests/replsets/unpin_history_after_rollback.js
new file mode 100644
index 00000000000..32e68260659
--- /dev/null
+++ b/jstests/replsets/unpin_history_after_rollback.js
@@ -0,0 +1,61 @@
+/**
+ * This test uses the test only `pinHistoryReplicated` command to exercise DurableHistoryPins
+ * across rollback.
+ *
+ * The `pinHistoryReplicated` command will pin the oldest timestamp at the requested time (with an
+ * optional rounding up to oldest). If the pin is successful, the pinned time is written to a
+ * document inside `mdb_testing.pinned_timestamp`.
+ *
+ * For the purposes of this test, the write is timestamped as it's replicated in the oplog. If the
+ * write gets rolled back, this test ensures any pinning effect it had is removed.
+ *
+ * @tags: [requires_fcv_49, requires_majority_read_concern, requires_persistence]
+ */
+(function() {
+"use strict";
+
+load("jstests/replsets/libs/rollback_test.js");
+
+let rst = new ReplSetTest({
+ name: "history_rollback_test",
+ nodes: 3,
+ useBridge: true,
+ nodeOptions: {setParameter: {logComponentVerbosity: tojson({storage: {recovery: 2}})}}
+});
+rst.startSet();
+const config = rst.getReplSetConfig();
+config.members[2].priority = 0;
+config.settings = {
+ chainingAllowed: false
+};
+rst.initiateWithHighElectionTimeout(config);
+
+let rollbackTest = new RollbackTest("history_rollback_test", rst);
+let rollbackNode = rollbackTest.getPrimary();
+rollbackTest.transitionToRollbackOperations();
+
+let serverStatus = rollbackNode.adminCommand("serverStatus");
+// When there is no pin, the `min pinned timestamp` value is `Timestamp::max()`. I don't believe
+// there is a JS constant for `Timestamp::max()`, so we capture it now for later.
+let maxTimestampValue =
+ serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
+
+// Perform a write that pins history. This write will be rolled back.
+let result = assert.commandWorked(
+ rollbackNode.adminCommand({"pinHistoryReplicated": Timestamp(100, 1), round: true}));
+let origPinTs = result["pinTs"];
+
+serverStatus = rollbackNode.adminCommand("serverStatus");
+let pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
+assert.eq(origPinTs, pinnedTs);
+
+rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
+rollbackTest.transitionToSyncSourceOperationsDuringRollback();
+rollbackTest.transitionToSteadyStateOperations();
+
+serverStatus = rollbackNode.adminCommand("serverStatus");
+pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
+assert.eq(maxTimestampValue, pinnedTs);
+
+rst.stopSet();
+})();
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index bf09b3a0b2c..24d2b31bb94 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -495,6 +495,7 @@ let testCases = {
movePrimary: {skip: "does not accept read or write concern"},
multicast: {skip: "does not accept read or write concern"},
netstat: {skip: "internal command"},
+ pinHistoryReplicated: {skip: "internal command"},
ping: {skip: "does not accept read or write concern"},
planCacheClear: {skip: "does not accept read or write concern"},
planCacheClearFilters: {skip: "does not accept read or write concern"},
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 721ebd015f6..23a53b5b336 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2131,6 +2131,7 @@ env.Library(
# please add that library as a private libdep of
# mongod_initializers.
'$BUILD_DIR/mongo/client/clientdriver_minimal',
+ '$BUILD_DIR/mongo/db/commands/shell_protocol',
'$BUILD_DIR/mongo/db/repl/tenant_migration_donor_service',
'$BUILD_DIR/mongo/s/grid',
'$BUILD_DIR/mongo/s/sessions_collection_sharded',
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 6aa23bb7b07..a1e50e001e8 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -365,7 +365,6 @@ env.Library(
],
)
-# Commands required by the shell to connect
env.Library(
target="shell_protocol",
source=[
@@ -376,6 +375,8 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/collection',
'$BUILD_DIR/mongo/db/commands',
+ '$BUILD_DIR/mongo/db/db_raii',
+ '$BUILD_DIR/mongo/db/ops/write_ops_exec',
'test_commands_enabled',
],
)
diff --git a/src/mongo/db/commands/test_commands.cpp b/src/mongo/db/commands/test_commands.cpp
index 58d75f67e71..1f3913e5277 100644
--- a/src/mongo/db/commands/test_commands.cpp
+++ b/src/mongo/db/commands/test_commands.cpp
@@ -33,6 +33,8 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/commands/test_commands.h"
+
#include "mongo/base/init.h"
#include "mongo/db/catalog/capped_utils.h"
#include "mongo/db/catalog/collection.h"
@@ -41,13 +43,20 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index_builds_coordinator.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/ops/insert.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
namespace mongo {
+namespace {
+const NamespaceString kDurableHistoryTestNss("mdb_testing.pinned_timestamp");
+const std::string kTestingDurableHistoryPinName = "_testing";
+} // namespace
+
using repl::UnreplicatedWritesBlock;
using std::endl;
using std::string;
@@ -213,4 +222,105 @@ public:
};
MONGO_REGISTER_TEST_COMMAND(EmptyCapped);
+
+/**
+ * Test-only `pinHistoryReplicated` command. The first element of the command object is the
+ * requested pin timestamp; the optional `round` field is forwarded to the storage engine's
+ * `pinOldestTimestamp`. On success the resulting pin is inserted as `{pinTs: <Timestamp>}` into
+ * `mdb_testing.pinned_timestamp` and both `requestedPinTs` and `pinTs` are returned to the caller.
+ */
+class DurableHistoryReplicatedTestCmd : public BasicCommand {
+public:
+    DurableHistoryReplicatedTestCmd() : BasicCommand("pinHistoryReplicated") {}
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+
+    bool supportsWriteConcern(const BSONObj& cmd) const override {
+        return true;
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    bool requiresAuth() const override {
+        return false;
+    }
+
+    // No auth needed because it only works when enabled via command line.
+    void addRequiredPrivileges(const std::string& dbname,
+                               const BSONObj& cmdObj,
+                               std::vector<Privilege>* out) const override {}
+
+    std::string help() const override {
+        return "pins the oldest timestamp";
+    }
+
+    bool run(OperationContext* opCtx,
+             const std::string& dbname,
+             const BSONObj& cmdObj,
+             BSONObjBuilder& result) override {
+        const Timestamp requestedPinTs = cmdObj.firstElement().timestamp();
+        const bool round = cmdObj["round"].booleanSafe();
+
+        AutoGetOrCreateDb db(opCtx, kDurableHistoryTestNss.db(), MODE_IX);
+        Lock::CollectionLock collLock(opCtx, kDurableHistoryTestNss, MODE_IX);
+        if (!CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(
+                opCtx,
+                kDurableHistoryTestNss)) {  // someone else may have beat us to it.
+            uassertStatusOK(userAllowedCreateNS(kDurableHistoryTestNss));
+            WriteUnitOfWork wuow(opCtx);
+            CollectionOptions defaultCollectionOptions;
+            uassertStatusOK(
+                db.getDb()->userCreateNS(opCtx, kDurableHistoryTestNss, defaultCollectionOptions));
+            wuow.commit();
+        }
+
+        AutoGetCollection autoColl(opCtx, kDurableHistoryTestNss, MODE_IX);
+        WriteUnitOfWork wuow(opCtx);
+
+        // Note, this write will replicate to secondaries, but a secondary will not in-turn pin the
+        // oldest timestamp. The write otherwise must be timestamped in a storage engine table with
+        // logging disabled. This is to test that rolling back the written document also results in
+        // the pin being lifted.
+        Timestamp pinTs =
+            uassertStatusOK(opCtx->getServiceContext()->getStorageEngine()->pinOldestTimestamp(
+                opCtx, kTestingDurableHistoryPinName, requestedPinTs, round));
+
+        // Surface a failure to prepare the document as a command error, consistent with the other
+        // status checks in this function, instead of calling getValue() on an unchecked result.
+        uassertStatusOK(autoColl->insertDocument(
+            opCtx,
+            InsertStatement(
+                uassertStatusOK(fixDocumentForInsert(opCtx, BSON("pinTs" << pinTs)))),
+            nullptr));
+        wuow.commit();
+
+        result.append("requestedPinTs", requestedPinTs);
+        result.append("pinTs", pinTs);
+        return true;
+    }
+};
+
+MONGO_REGISTER_TEST_COMMAND(DurableHistoryReplicatedTestCmd);
+
+std::string TestingDurableHistoryPin::getName() {
+ return kTestingDurableHistoryPinName;
+}
+
+/**
+ * Returns the minimum `pinTs` value across all documents in `mdb_testing.pinned_timestamp`, or
+ * boost::none when the collection is not found or holds no documents.
+ */
+boost::optional<Timestamp> TestingDurableHistoryPin::calculatePin(OperationContext* opCtx) {
+    AutoGetCollectionForRead autoColl(opCtx, kDurableHistoryTestNss);
+    if (!autoColl) {
+        return boost::none;
+    }
+
+    Timestamp ret = Timestamp::max();
+    auto cursor = autoColl->getCursor(opCtx);
+    for (auto doc = cursor->next(); doc; doc = cursor->next()) {
+        const BSONObj obj = doc.get().data.toBson();
+        const Timestamp ts = obj["pinTs"].timestamp();
+        ret = std::min(ret, ts);
+    }
+
+    // `ret` starts at Timestamp::max() and only ever decreases, so an untouched max() means no
+    // document was seen. Comparing against Timestamp::min() here would report max() as a pin for
+    // an empty collection.
+    if (ret == Timestamp::max()) {
+        return boost::none;
+    }
+
+    return ret;
+}
+
+
} // namespace mongo
diff --git a/src/mongo/db/commands/test_commands.h b/src/mongo/db/commands/test_commands.h
new file mode 100644
index 00000000000..c3b9759d3d6
--- /dev/null
+++ b/src/mongo/db/commands/test_commands.h
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional/optional.hpp>
+
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/durable_history_pin.h"
+
+namespace mongo {
+
+/**
+ * This hook pairs with the `pinHistoryReplicated` test-only command. The test command will pin the
+ * oldest timestamp and perform a write into `mdb_testing.pinned_timestamp`. This hook knows how to
+ * read that collection and re-pin any history requests after a restart or across rollback.
+ */
+class TestingDurableHistoryPin : public DurableHistoryPin {
+public:
+ std::string getName() override;
+
+ boost::optional<Timestamp> calculatePin(OperationContext* opCtx) override;
+};
+} // namespace mongo
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index de25ae2da20..76c491e3016 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -67,6 +67,8 @@
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/feature_compatibility_version_gen.h"
#include "mongo/db/commands/shutdown.h"
+#include "mongo/db/commands/test_commands.h"
+#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/flow_control_ticketholder.h"
#include "mongo/db/concurrency/lock_state.h"
@@ -153,6 +155,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
#include "mongo/db/storage/control/storage_control.h"
+#include "mongo/db/storage/durable_history_pin.h"
#include "mongo/db/storage/encryption_hooks.h"
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/storage/flow_control_parameters_gen.h"
@@ -492,6 +495,11 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
LOGV2(20536, "Flow Control is enabled on this deployment");
}
+ {
+ Lock::GlobalWrite globalLk(startupOpCtx.get());
+ DurableHistoryRegistry::get(serviceContext)->reconcilePins(startupOpCtx.get());
+ }
+
// Notify the storage engine that startup is completed before repair exits below, as repair sets
// the upgrade flag to true.
serviceContext->getStorageEngine()->notifyStartupComplete();
@@ -998,8 +1006,11 @@ void setUpReplication(ServiceContext* serviceContext) {
void setUpObservers(ServiceContext* serviceContext) {
auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ DurableHistoryRegistry::get(serviceContext)
+ ->registerPin(std::make_unique<ReshardingHistoryHook>());
opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>());
opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>());
+ opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
} else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
opObserverRegistry->addObserver(std::make_unique<ConfigServerOpObserver>());
@@ -1381,6 +1392,16 @@ int mongod_main(int argc, char* argv[]) {
}
}();
+ {
+ // Create the durable history registry prior to calling the `setUp*` methods. They may
+ // depend on it existing at this point.
+ DurableHistoryRegistry::set(service, std::make_unique<DurableHistoryRegistry>());
+ DurableHistoryRegistry* registry = DurableHistoryRegistry::get(service);
+ if (getTestCommandsEnabled()) {
+ registry->registerPin(std::make_unique<TestingDurableHistoryPin>());
+ }
+ }
+
setUpCollectionShardingState(service);
setUpCatalog(service);
setUpReplication(service);
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index d1f825f0f54..84a3cb1242b 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -111,8 +111,6 @@ Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) {
uassertStatusOK(waitForWriteConcern(
opCtx.get(), generatedOpTime, WriteConcerns::kMajorityWriteConcern, &result));
- // TODO notify storage engine to pin the minFetchTimestamp
-
return generatedOpTime.getTimestamp();
}
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp
index 979016f5320..05144408e12 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp
@@ -33,6 +33,9 @@
#include "mongo/db/s/resharding/resharding_op_observer.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/s/resharding/donor_document_gen.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
#include "mongo/logv2/log.h"
@@ -50,13 +53,126 @@ std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver(
return (*instance)->getObserver();
}
+// Parses `obj` as a ReshardingDonorDocument and returns its minFetchTimestamp only while the donor
+// is in the kDonatingInitialData state; any other state yields boost::none.
+boost::optional<Timestamp> parseNewMinFetchTimestampValue(const BSONObj& obj) {
+    auto donorDoc = ReshardingDonorDocument::parse(IDLParserErrorContext("Resharding"), obj);
+    if (donorDoc.getState() != DonorStateEnum::kDonatingInitialData) {
+        return boost::none;
+    }
+    return donorDoc.getMinFetchTimestamp().get();
+}
+
+/**
+ * Scans every document in the resharding donor collection and returns the smallest timestamp that
+ * `parseNewMinFetchTimestampValue` yields for them. Returns boost::none when the collection lookup
+ * comes back empty or when no document yields a timestamp.
+ */
+boost::optional<Timestamp> _calculatePin(OperationContext* opCtx) {
+ // We recalculate the pin by looking at all documents inside the resharding donor
+ // collection. The caller may or may not be in a transaction. If the caller is in a transaction,
+ // we intentionally read any uncommitted writes it has made.
+ //
+ // If there are concurrent transactions updating different keys in the donor collection, there
+ // can be write skew resulting in the wrong pin, including leaking a resource. We enforce the
+ // collection is held in exclusive mode to prevent this.
+
+ // TODO: Uncomment
+ // invariant(opCtx->lockState()->isCollectionLockedForMode(
+ // NamespaceString::kDonorReshardingOperationsNamespace, LockMode::MODE_X));
+
+ // If the RecoveryUnit already had an open snapshot, keep the snapshot open. Otherwise abandon
+ // the snapshot when exiting the function.
+ auto scopeGuard = makeGuard([&] { opCtx->recoveryUnit()->abandonSnapshot(); });
+ if (opCtx->recoveryUnit()->isActive()) {
+ scopeGuard.dismiss();
+ }
+
+ AutoGetCollectionForRead autoColl(opCtx, NamespaceString::kDonorReshardingOperationsNamespace);
+ if (!autoColl) {
+ return boost::none;
+ }
+
+ // Track the minimum across all documents; Timestamp::max() doubles as the "nothing found"
+ // sentinel checked below.
+ Timestamp ret = Timestamp::max();
+ auto cursor = autoColl->getCursor(opCtx);
+ for (auto doc = cursor->next(); doc; doc = cursor->next()) {
+ if (auto fetchTs = parseNewMinFetchTimestampValue(doc.get().data.toBson()); fetchTs) {
+ ret = std::min(ret, fetchTs.get());
+ }
+ }
+
+ if (ret == Timestamp::max()) {
+ return boost::none;
+ }
+
+ return ret;
+}
+
+// Recomputes the resharding pin and applies it to the storage engine: with no pin to hold, the
+// "resharding" pin is released; otherwise it is (re)pinned at the computed timestamp. A failed pin
+// is only escalated when this node is a replica set primary.
+void _doPin(OperationContext* opCtx) {
+    auto engine = opCtx->getServiceContext()->getStorageEngine();
+    boost::optional<Timestamp> pin = _calculatePin(opCtx);
+    auto coordinator = repl::ReplicationCoordinator::get(opCtx);
+
+    if (!pin) {
+        engine->unpinOldestTimestamp(ReshardingHistoryHook::kName.toString());
+        return;
+    }
+
+    StatusWith<Timestamp> pinResult = engine->pinOldestTimestamp(
+        opCtx, ReshardingHistoryHook::kName.toString(), pin.get(), false);
+    if (pinResult.isOK()) {
+        return;
+    }
+
+    if (coordinator->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeReplSet) {
+        // The pin has failed, but we're in standalone mode. Ignore the error.
+        return;
+    }
+
+    const auto memberState = coordinator->getMemberState();
+    if (memberState.primary()) {
+        // If we're a primary, the pin can fail and the error should bubble up and fail
+        // resharding.
+        uassertStatusOK(pinResult);
+        return;
+    }
+
+    if (memberState.secondary()) {
+        // The pin timestamp can be earlier than the oplog entry being processed. Thus
+        // the oldest timestamp can race ahead of the pin request. It's not ideal this
+        // node cannot participate in donating documents for the cloning phase, but this
+        // is the most robust path forward. Ignore this case.
+        LOGV2_WARNING(5384104,
+                      "This node is unable to pin history for resharding",
+                      "requestedTs"_attr = pin.get());
+        return;
+    }
+
+    // For recovery cases we also ignore the error. The expected scenario is the pin
+    // request is no longer needed, but the write to delete the pin was rolled
+    // back. The write to delete the pin won't be issued until the collection
+    // cloning phase of resharding is majority committed. Thus there should be no
+    // consequence to observing this error. Ignore this case.
+    LOGV2(5384103,
+          "The requested pin was unavailable, but should also be unnecessary",
+          "requestedTs"_attr = pin.get());
+}
+
} // namespace
+// Delegates to the file-local `_calculatePin` helper, which is also used by `_doPin`.
+boost::optional<Timestamp> ReshardingHistoryHook::calculatePin(OperationContext* opCtx) {
+ return _calculatePin(opCtx);
+}
+
ReshardingOpObserver::ReshardingOpObserver() = default;
ReshardingOpObserver::~ReshardingOpObserver() = default;
+// Intentionally a no-op: inserts never change the resharding history pin (see the rationale in the
+// body). None of the parameters are used.
+void ReshardingOpObserver::onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) {
+ // If a document is inserted into the resharding donor collection with a `minFetchTimestamp`, we
+ // assume the document was inserted as part of initial sync and do nothing to pin history.
+ return;
+}
+
void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
+ if (args.nss == NamespaceString::kDonorReshardingOperationsNamespace) {
+ // Primaries and secondaries should execute pinning logic when observing changes to the
+ // donor resharding document.
+ _doPin(opCtx);
+ }
+
// This is a no-op if either replication is not enabled or this node is a secondary
if (!repl::ReplicationCoordinator::get(opCtx)->isReplEnabled() ||
!opCtx->writesAreReplicated()) {
@@ -77,4 +193,16 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn
}
}
+// Recomputes the resharding history pin when a document is deleted from the donor resharding
+// operations collection. Deletes in any other namespace are ignored.
+void ReshardingOpObserver::onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ StmtId stmtId,
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) {
+ if (nss == NamespaceString::kDonorReshardingOperationsNamespace) {
+ _doPin(opCtx);
+ }
+}
+
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h
index a944c4f84ee..cf886deee5c 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.h
+++ b/src/mongo/db/s/resharding/resharding_op_observer.h
@@ -31,8 +31,23 @@
#include "mongo/db/op_observer.h"
+#include <boost/optional.hpp>
+
+#include "mongo/db/storage/durable_history_pin.h"
+
namespace mongo {
+/**
+ * DurableHistoryPin implementation for resharding, identified by the name "resharding".
+ * `calculatePin` is defined in resharding_op_observer.cpp.
+ */
+class ReshardingHistoryHook : public DurableHistoryPin {
+public:
+ // Name under which this pin is passed to the storage engine's pin/unpin calls.
+ static constexpr StringData kName = "resharding"_sd;
+
+ std::string getName() override {
+ return kName.toString();
+ }
+
+ boost::optional<Timestamp> calculatePin(OperationContext* opCtx) override;
+};
+
/**
* OpObserver for observing writes to internal resharding collections. This includes collections
* such as config.reshardingOperations, config.localReshardingOperations.donor, and
@@ -82,7 +97,7 @@ public:
OptionalCollectionUUID uuid,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
- bool fromMigrate) override {}
+ bool fromMigrate) override;
void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override;
@@ -95,7 +110,7 @@ public:
OptionalCollectionUUID uuid,
StmtId stmtId,
bool fromMigrate,
- const boost::optional<BSONObj>& deletedDoc) override {}
+ const boost::optional<BSONObj>& deletedDoc) override;
void onInternalOpMessage(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 23d13b1a904..0754221bec8 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -118,6 +118,7 @@ env.Library(
target='storage_control',
source=[
'control/storage_control.cpp',
+ 'durable_history_pin.cpp',
],
LIBDEPS=[
'checkpointer',
@@ -125,6 +126,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/service_context',
'storage_options',
],
@@ -586,6 +588,7 @@ env.Library(
'$BUILD_DIR/mongo/db/resumable_index_builds_idl',
'$BUILD_DIR/mongo/db/storage/storage_repair_observer',
'$BUILD_DIR/mongo/db/vector_clock',
+ 'storage_control',
'storage_util',
'two_phase_index_build_knobs_idl',
],
diff --git a/src/mongo/db/storage/durable_history_pin.cpp b/src/mongo/db/storage/durable_history_pin.cpp
new file mode 100644
index 00000000000..09e4e97bcd4
--- /dev/null
+++ b/src/mongo/db/storage/durable_history_pin.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
+#define LOGV2_FOR_RECOVERY(ID, DLEVEL, MESSAGE, ...) \
+ LOGV2_DEBUG_OPTIONS(ID, DLEVEL, {logv2::LogComponent::kStorageRecovery}, MESSAGE, ##__VA_ARGS__)
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/durable_history_pin.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/service_context.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+namespace {
+// ServiceContext decoration that owns the DurableHistoryRegistry for that context. It is assigned
+// by `DurableHistoryRegistry::set` and read by the `DurableHistoryRegistry::get` overloads.
+// NOTE(review): presumably holds a null pointer until `set` is called -- confirm.
+const auto getDurableHistoryRegistry =
+ ServiceContext::declareDecoration<std::unique_ptr<DurableHistoryRegistry>>();
+
+} // namespace
+
+// Returns the registry stored on `service`, as a non-owning pointer.
+DurableHistoryRegistry* DurableHistoryRegistry::get(ServiceContext* service) {
+    auto& ownedRegistry = getDurableHistoryRegistry(service);
+    return ownedRegistry.get();
+}
+
+// Reference-taking overload: returns the registry stored on `service`, as a non-owning pointer.
+DurableHistoryRegistry* DurableHistoryRegistry::get(ServiceContext& service) {
+    auto& ownedRegistry = getDurableHistoryRegistry(service);
+    return ownedRegistry.get();
+}
+
+// Returns the registry stored on the ServiceContext of the client that owns `opCtx`.
+DurableHistoryRegistry* DurableHistoryRegistry::get(OperationContext* opCtx) {
+    auto* const service = opCtx->getClient()->getServiceContext();
+    return get(service);
+}
+
+// Installs `registry` on `service`, taking ownership of it and replacing whatever registry the
+// service context held before.
+void DurableHistoryRegistry::set(ServiceContext* service,
+                                 std::unique_ptr<DurableHistoryRegistry> registry) {
+    getDurableHistoryRegistry(service) = std::move(registry);
+}
+
+// Takes ownership of `pin` and appends it to the list of pins visited by `reconcilePins`.
+void DurableHistoryRegistry::registerPin(std::unique_ptr<DurableHistoryPin> pin) {
+    _pins.emplace_back(std::move(pin));
+}
+
+// Asks every registered pin for the timestamp it wants held and forwards the answer to the storage
+// engine: an engaged timestamp is pinned under the pin's name, boost::none releases that name's
+// pin. Does nothing when the storage engine reports no support for recovery timestamps.
+void DurableHistoryRegistry::reconcilePins(OperationContext* opCtx) {
+    StorageEngine* const storageEngine = opCtx->getServiceContext()->getStorageEngine();
+    if (!storageEngine->supportsRecoveryTimestamp()) {
+        return;
+    }
+
+    for (auto& registeredPin : _pins) {
+        boost::optional<Timestamp> desiredTs = registeredPin->calculatePin(opCtx);
+        LOGV2_FOR_RECOVERY(5384102,
+                           2,
+                           "Reconciling timestamp pin.",
+                           "name"_attr = registeredPin->getName(),
+                           "ts"_attr = desiredTs);
+
+        // Nothing requested: release whatever the engine holds under this pin's name.
+        if (!desiredTs) {
+            storageEngine->unpinOldestTimestamp(registeredPin->getName());
+            continue;
+        }
+
+        // Request the exact timestamp without rounding up; a failure is only logged.
+        auto swPinned = storageEngine->pinOldestTimestamp(
+            opCtx, registeredPin->getName(), desiredTs.get(), false);
+        if (!swPinned.isOK()) {
+            LOGV2_WARNING(5384105,
+                          "Unable to repin oldest timestamp",
+                          "service"_attr = registeredPin->getName(),
+                          "request"_attr = desiredTs.get(),
+                          "error"_attr = swPinned.getStatus());
+        }
+    }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/durable_history_pin.h b/src/mongo/db/storage/durable_history_pin.h
new file mode 100644
index 00000000000..0a202772579
--- /dev/null
+++ b/src/mongo/db/storage/durable_history_pin.h
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+/**
+ * Interface for a service that wants the storage engine to keep history back to a timestamp of
+ * the service's choosing. `DurableHistoryRegistry::reconcilePins` queries each registered
+ * implementation and forwards its answer to the storage engine.
+ */
+class DurableHistoryPin {
+public:
+    virtual ~DurableHistoryPin() = default;
+
+    /**
+     * Returns the name under which this pin is requested from, and released by, the storage
+     * engine.
+     */
+    virtual std::string getName() = 0;
+
+    /**
+     * Returns the timestamp that should be pinned, or boost::none when nothing should be pinned
+     * on behalf of this name.
+     */
+    virtual boost::optional<Timestamp> calculatePin(OperationContext* opCtx) = 0;
+};
+
+/**
+ * Holds the `DurableHistoryPin` instances registered for a ServiceContext.
+ *
+ * Services that want to preserve storage engine history across restarts or replication rollback
+ * should create a class that implements `DurableHistoryPin` and register an instance of that class
+ * at startup prior to the first call of `reconcilePins`.
+ */
+class DurableHistoryRegistry {
+public:
+ /**
+ * Return the registry installed on the given ServiceContext, or on the ServiceContext of the
+ * client that owns the given OperationContext. The returned pointer is non-owning.
+ * NOTE(review): looks like these return nullptr when `set` has not been called -- confirm.
+ */
+ static DurableHistoryRegistry* get(ServiceContext* service);
+ static DurableHistoryRegistry* get(ServiceContext& service);
+ static DurableHistoryRegistry* get(OperationContext* ctx);
+
+ /**
+ * Installs `registry` on `service`, taking ownership of it and replacing any registry that was
+ * installed before.
+ */
+ static void set(ServiceContext* service, std::unique_ptr<DurableHistoryRegistry> registry);
+
+ /**
+ * Takes ownership of `pin` and adds it to the pins consulted by `reconcilePins`.
+ */
+ void registerPin(std::unique_ptr<DurableHistoryPin> pin);
+
+ /**
+ * Iterates through each registered pin and takes one of two actions:
+ *
+ * 1) If the pin returns an engaged boost::optional<Timestamp>, forward that pinned timestamp to
+ * the storage engine using the pin's name.
+ * 2) If the pin returns boost::none, unpin any resources held by the storage engine on behalf
+ * of the pin's name.
+ *
+ * If a requested pin fails, a log will be issued, but the process will otherwise continue.
+ * Returns without doing anything if the storage engine does not support a recovery timestamp.
+ */
+ void reconcilePins(OperationContext* opCtx);
+
+private:
+ // Registered pins, in registration order; owned by the registry.
+ std::vector<std::unique_ptr<DurableHistoryPin>> _pins;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 22ecf3727fb..acf5562c1b4 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -386,7 +386,8 @@ public:
MONGO_UNREACHABLE;
}
- virtual StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
+ virtual StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
+ const std::string& requestingServiceName,
Timestamp requestedTimestamp,
bool roundUpIfTooOld) {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index a40d26fda2e..1c43ea90235 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -675,8 +675,12 @@ public:
* | requested >= oldest | false/true | <OK, requested timestamp> |
* | requested < oldest | false | <SnapshotTooOld> |
* | requested < oldest | true | <OK, oldest timestamp> |
+ *
+ * If the input OperationContext is in a WriteUnitOfWork, an `onRollback` handler will be
+ * registered to return the pin for the `requestingServiceName` to the previous value.
*/
- virtual StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
+ virtual StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
+ const std::string& requestingServiceName,
Timestamp requestedTimestamp,
bool roundUpIfTooOld) = 0;
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index fe4152e0995..9d0b40c520b 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
#include "mongo/db/storage/durable_catalog_feature_tracker.h"
+#include "mongo/db/storage/durable_history_pin.h"
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/kv/temporary_kv_record_store.h"
#include "mongo/db/storage/storage_repair_observer.h"
@@ -1002,6 +1003,7 @@ StatusWith<Timestamp> StorageEngineImpl::recoverToStableTimestamp(OperationConte
}
catalog::openCatalog(opCtx, state, swTimestamp.getValue());
+ DurableHistoryRegistry::get(opCtx)->reconcilePins(opCtx);
LOGV2(22259,
"recoverToStableTimestamp successful",
@@ -1253,8 +1255,12 @@ int64_t StorageEngineImpl::sizeOnDiskForDb(OperationContext* opCtx, StringData d
}
StatusWith<Timestamp> StorageEngineImpl::pinOldestTimestamp(
- const std::string& requestingServiceName, Timestamp requestedTimestamp, bool roundUpIfTooOld) {
- return _engine->pinOldestTimestamp(requestingServiceName, requestedTimestamp, roundUpIfTooOld);
+ OperationContext* opCtx,
+ const std::string& requestingServiceName,
+ Timestamp requestedTimestamp,
+ bool roundUpIfTooOld) {
+ return _engine->pinOldestTimestamp(
+ opCtx, requestingServiceName, requestedTimestamp, roundUpIfTooOld);
}
void StorageEngineImpl::unpinOldestTimestamp(const std::string& requestingServiceName) {
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 9055797e22e..cc6b0b4f84e 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -363,7 +363,8 @@ public:
return _options.directoryPerDB;
}
- StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
+ StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
+ const std::string& requestingServiceName,
Timestamp requestedTimestamp,
bool roundUpIfTooOld) override;
diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h
index 9c2b61e22ad..11f350ee9fb 100644
--- a/src/mongo/db/storage/storage_engine_mock.h
+++ b/src/mongo/db/storage/storage_engine_mock.h
@@ -196,7 +196,8 @@ public:
return nullptr;
}
- StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
+ StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
+ const std::string& requestingServiceName,
Timestamp requestedTimestamp,
bool roundUpIfTooOld) final {
return Status::OK();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 8419f7debc5..76a85bc5d81 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -532,8 +532,13 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
// 1) The feature stores the desired pin timestamp in some local collection.
// 2) This temporary pinning lasts long enough for the catalog to be loaded and
// accessed.
- uassertStatusOK(pinOldestTimestamp(
- kPinOldestTimestampAtStartupName, Timestamp(_oldestTimestamp.load()), false));
+ {
+ stdx::lock_guard<Latch> lk(_oldestTimestampPinRequestsMutex);
+ uassertStatusOK(_pinOldestTimestamp(lk,
+ kPinOldestTimestampAtStartupName,
+ Timestamp(_oldestTimestamp.load()),
+ false));
+ }
setStableTimestamp(_recoveryTimestamp, false);
@@ -2342,7 +2347,10 @@ Timestamp WiredTigerKVEngine::getPinnedOplog() const {
}
StatusWith<Timestamp> WiredTigerKVEngine::pinOldestTimestamp(
- const std::string& requestingServiceName, Timestamp requestedTimestamp, bool roundUpIfTooOld) {
+ OperationContext* opCtx,
+ const std::string& requestingServiceName,
+ Timestamp requestedTimestamp,
+ bool roundUpIfTooOld) {
stdx::lock_guard<Latch> lock(_oldestTimestampPinRequestsMutex);
Timestamp oldest = getOldestTimestamp();
LOGV2(5380104,
@@ -2352,6 +2360,40 @@ StatusWith<Timestamp> WiredTigerKVEngine::pinOldestTimestamp(
"roundUpIfTooOld"_attr = roundUpIfTooOld,
"currOldestTs"_attr = oldest);
+    // Capture the service's existing pin (a null Timestamp if it has none) so a rollback can
+    // restore it. Use `find` rather than `operator[]`: indexing would insert a null entry for a
+    // service without a pin, and that entry would linger in the map if the request below fails.
+    const auto previousPinIt = _oldestTimestampPinRequests.find(requestingServiceName);
+    const Timestamp previousTimestamp =
+        previousPinIt == _oldestTimestampPinRequests.end() ? Timestamp() : previousPinIt->second;
+    auto swPinnedTimestamp =
+        _pinOldestTimestamp(lock, requestingServiceName, requestedTimestamp, roundUpIfTooOld);
+    if (!swPinnedTimestamp.isOK()) {
+        return swPinnedTimestamp;
+    }
+
+    if (opCtx->lockState()->inAWriteUnitOfWork()) {
+        // If we've moved the pin and are in a `WriteUnitOfWork`, assume the caller has a write that
+        // should be atomic with this pin request. If the `WriteUnitOfWork` is rolled back, either
+        // unpin the oldest timestamp or repin the previous value.
+        opCtx->recoveryUnit()->onRollback(
+            [this, svcName = requestingServiceName, previousTimestamp]() {
+                if (previousTimestamp.isNull()) {
+                    // The service held no pin before this request; drop the one just added.
+                    unpinOldestTimestamp(svcName);
+                } else {
+                    stdx::lock_guard<Latch> lock(_oldestTimestampPinRequestsMutex);
+                    // When a write is updating the value from an earlier pin to a later one, use
+                    // rounding to make a best effort to repin the earlier value.
+                    invariant(_pinOldestTimestamp(lock, svcName, previousTimestamp, true).isOK());
+                }
+            });
+    }
+
+    return swPinnedTimestamp;
+}
+
+StatusWith<Timestamp> WiredTigerKVEngine::_pinOldestTimestamp(
+ WithLock,
+ const std::string& requestingServiceName,
+ Timestamp requestedTimestamp,
+ bool roundUpIfTooOld) {
+
+ Timestamp oldest = getOldestTimestamp();
if (requestedTimestamp < oldest) {
if (roundUpIfTooOld) {
requestedTimestamp = oldest;
@@ -2363,7 +2405,6 @@ StatusWith<Timestamp> WiredTigerKVEngine::pinOldestTimestamp(
}
_oldestTimestampPinRequests[requestingServiceName] = requestedTimestamp;
-
return {requestedTimestamp};
}
@@ -2371,9 +2412,10 @@ void WiredTigerKVEngine::unpinOldestTimestamp(const std::string& requestingServi
stdx::lock_guard<Latch> lock(_oldestTimestampPinRequestsMutex);
auto it = _oldestTimestampPinRequests.find(requestingServiceName);
if (it == _oldestTimestampPinRequests.end()) {
- LOGV2_WARNING(5380105,
- "The requested service had nothing to unpin",
- "service"_attr = requestingServiceName);
+        // LOGV2_DEBUG takes the unique log ID first and the debug verbosity level second.
+        LOGV2_DEBUG(5380105,
+                    2,
+                    "The requested service had nothing to unpin",
+                    "service"_attr = requestingServiceName);
return;
}
LOGV2(5380103,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 09dfe563399..b9f6e1fb635 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -46,6 +46,7 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/platform/mutex.h"
+#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
@@ -345,10 +346,18 @@ public:
return _clockSource;
}
- StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
+ StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
+ const std::string& requestingServiceName,
Timestamp requestedTimestamp,
bool roundUpIfTooOld) override;
+private:
+ // Lock-held helper behind `pinOldestTimestamp`. The `WithLock` argument attests that the caller
+ // already holds `_oldestTimestampPinRequestsMutex`. It takes no OperationContext; the public
+ // `pinOldestTimestamp` wraps this call and adds the rollback handling.
+ StatusWith<Timestamp> _pinOldestTimestamp(WithLock,
+ const std::string& requestingServiceName,
+ Timestamp requestedTimestamp,
+ bool roundUpIfTooOld);
+
+public:
void unpinOldestTimestamp(const std::string& requestingServiceName) override;
std::map<std::string, Timestamp> getPinnedTimestampRequests();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
index d2e51d60600..49619f6cee8 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
@@ -392,6 +392,7 @@ TEST_F(WiredTigerKVEngineTest, IdentDrop) {
}
TEST_F(WiredTigerKVEngineTest, TestBasicPinOldestTimestamp) {
+ auto opCtxRaii = makeOperationContext();
const Timestamp initTs = Timestamp(1, 0);
// Initialize the oldest timestamp.
@@ -406,8 +407,8 @@ TEST_F(WiredTigerKVEngineTest, TestBasicPinOldestTimestamp) {
// as. This error case is not exercised in this test.
const bool roundUpIfTooOld = false;
// Pin the oldest timestamp to "3".
- auto pinnedTs =
- unittest::assertGet(_engine->pinOldestTimestamp("A", initTs + 3, roundUpIfTooOld));
+ auto pinnedTs = unittest::assertGet(
+ _engine->pinOldestTimestamp(opCtxRaii.get(), "A", initTs + 3, roundUpIfTooOld));
// Assert that the pinning method returns the same timestamp as was requested.
ASSERT_EQ(initTs + 3, pinnedTs);
// Assert that pinning the oldest timestamp does not advance it.
@@ -432,6 +433,7 @@ TEST_F(WiredTigerKVEngineTest, TestBasicPinOldestTimestamp) {
* of all active requests will be obeyed.
*/
TEST_F(WiredTigerKVEngineTest, TestMultiPinOldestTimestamp) {
+ auto opCtxRaii = makeOperationContext();
const Timestamp initTs = Timestamp(1, 0);
_engine->setOldestTimestamp(initTs, false);
@@ -441,13 +443,14 @@ TEST_F(WiredTigerKVEngineTest, TestMultiPinOldestTimestamp) {
// as. This error case is not exercised in this test.
const bool roundUpIfTooOld = false;
// Have "A" pin the timestamp to "1".
- auto pinnedTs =
- unittest::assertGet(_engine->pinOldestTimestamp("A", initTs + 1, roundUpIfTooOld));
+ auto pinnedTs = unittest::assertGet(
+ _engine->pinOldestTimestamp(opCtxRaii.get(), "A", initTs + 1, roundUpIfTooOld));
ASSERT_EQ(initTs + 1, pinnedTs);
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
// Have "B" pin the timestamp to "2".
- pinnedTs = unittest::assertGet(_engine->pinOldestTimestamp("B", initTs + 2, roundUpIfTooOld));
+ pinnedTs = unittest::assertGet(
+ _engine->pinOldestTimestamp(opCtxRaii.get(), "B", initTs + 2, roundUpIfTooOld));
ASSERT_EQ(initTs + 2, pinnedTs);
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
@@ -471,6 +474,7 @@ TEST_F(WiredTigerKVEngineTest, TestMultiPinOldestTimestamp) {
* relative to the current oldest timestamp.
*/
TEST_F(WiredTigerKVEngineTest, TestPinOldestTimestampErrors) {
+ auto opCtxRaii = makeOperationContext();
const Timestamp initTs = Timestamp(10, 0);
_engine->setOldestTimestamp(initTs, false);
@@ -482,13 +486,13 @@ TEST_F(WiredTigerKVEngineTest, TestPinOldestTimestampErrors) {
// When rounding on error, the pin will succeed, but the return value will be the current oldest
// timestamp instead of the requested value.
- auto pinnedTs =
- unittest::assertGet(_engine->pinOldestTimestamp("A", initTs - 1, roundUpIfTooOld));
+ auto pinnedTs = unittest::assertGet(
+ _engine->pinOldestTimestamp(opCtxRaii.get(), "A", initTs - 1, roundUpIfTooOld));
ASSERT_EQ(initTs, pinnedTs);
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
// Using "fail on error" will result in a not-OK return value.
- ASSERT_NOT_OK(_engine->pinOldestTimestamp("B", initTs - 1, failOnError));
+ ASSERT_NOT_OK(_engine->pinOldestTimestamp(opCtxRaii.get(), "B", initTs - 1, failOnError));
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
}