summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/commands/dbcommands.cpp3
-rw-r--r--src/mongo/db/repl/SConscript12
-rw-r--r--src/mongo/db/repl/repl_set_commands.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator.h14
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp44
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp23
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h7
-rw-r--r--src/mongo/db/repl/replication_info.cpp7
-rw-r--r--src/mongo/db/repl/rollback.idl44
-rw-r--r--src/mongo/db/repl/rollback_test_fixture.cpp1
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp5
-rw-r--r--src/mongo/db/repl/storage_interface.h7
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp74
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h7
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp153
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp29
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h7
-rw-r--r--src/mongo/db/repl/sync_source_resolver.cpp1
-rw-r--r--src/mongo/s/sharding_mongod_test_fixture.cpp5
21 files changed, 375 insertions, 99 deletions
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index aefee2bb4df..da379905d84 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -1313,7 +1313,8 @@ void appendReplyMetadata(OperationContext* opCtx,
// Attach our own last opTime.
repl::OpTime lastOpTimeFromClient =
repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- replCoord->prepareReplMetadata(request.getMetadata(), lastOpTimeFromClient, metadataBob);
+ replCoord->prepareReplMetadata(
+ opCtx, request.getMetadata(), lastOpTimeFromClient, metadataBob);
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18236
if (isShardingAware || isConfig) {
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 9cc1ace7a04..b38283f4448 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -135,6 +135,17 @@ env.CppUnitTest(
)
env.Library(
+ target='rollback_idl',
+ source=[
+ env.Idlc('rollback.idl')[0],
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ ],
+)
+
+env.Library(
target='storage_interface',
source=[
'storage_interface.cpp',
@@ -153,6 +164,7 @@ env.Library(
'collection_bulk_loader_impl.cpp',
],
LIBDEPS=[
+ 'rollback_idl',
'storage_interface',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/exec/exec',
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index 86da2bec9db..bd39f2e18e9 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -169,8 +169,13 @@ public:
if (!status.isOK())
return appendCommandStatus(result, status);
- status = getGlobalReplicationCoordinator()->processReplSetGetRBID(&result);
- return appendCommandStatus(result, status);
+ auto rbid = StorageInterface::get(opCtx)->getRollbackID(opCtx);
+
+ // We should always have a Rollback ID since it is created at startup.
+ fassertStatusOK(40426, rbid.getStatus());
+
+ result.append("rbid", rbid.getValue());
+ return appendCommandStatus(result, Status::OK());
}
} cmdReplSetRBID;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index c53f5a7b655..a9ccd9f642a 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -633,17 +633,6 @@ public:
const BSONObj& configObj,
BSONObjBuilder* resultObj) = 0;
- /*
- * Handles an incoming replSetGetRBID command.
- * Adds BSON to 'resultObj'; returns a Status with either OK or an error message.
- */
- virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) = 0;
-
- /**
- * Increments this process's rollback id. Called every time a rollback occurs.
- */
- virtual void incrementRollbackID() = 0;
-
/**
* Arguments to the replSetFresh command.
*/
@@ -765,7 +754,8 @@ public:
* Prepares a metadata object with the ReplSetMetadata and the OplogQueryMetadata depending
* on what has been requested.
*/
- virtual void prepareReplMetadata(const BSONObj& metadataRequestObj,
+ virtual void prepareReplMetadata(OperationContext* opCtx,
+ const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 72c45415344..e538718650c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -329,13 +329,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
return;
}
- std::unique_ptr<SecureRandom> rbidGenerator(SecureRandom::create());
- _rbid = static_cast<int>(rbidGenerator->nextInt64());
- if (_rbid < 0) {
- // Ensure _rbid is always positive
- _rbid = -_rbid;
- }
-
// Make sure there is always an entry in _slaveInfo for ourself.
SlaveInfo selfInfo;
selfInfo.self = true;
@@ -430,6 +423,19 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx)
_topCoord->loadLastVote(lastVote.getValue());
}
+ // Check that we have a local Rollback ID. If we do not have one, create one.
+ auto rbid = _storage->getRollbackID(opCtx);
+ if (!rbid.isOK()) {
+ if (rbid.getStatus() == ErrorCodes::NamespaceNotFound) {
+ log() << "Did not find local Rollback ID document at startup. Creating one.";
+ auto initializingStatus = _storage->initializeRollbackID(opCtx);
+ fassertStatusOK(40424, initializingStatus);
+ } else {
+ severe() << "Error loading local Rollback ID document at startup; " << rbid.getStatus();
+ fassertFailedNoTrace(40428);
+ }
+ }
+
StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(opCtx);
if (!cfg.isOK()) {
log() << "Did not find local replica set configuration document at startup; "
@@ -2867,17 +2873,6 @@ void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_externalState->stopProducer();
}
-Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- resultObj->append("rbid", _rbid);
- return Status::OK();
-}
-
-void ReplicationCoordinatorImpl::incrementRollbackID() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- ++_rbid;
-}
-
Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -3329,7 +3324,8 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return Status::OK();
}
-void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj,
+void ReplicationCoordinatorImpl::prepareReplMetadata(OperationContext* opCtx,
+ const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const {
@@ -3340,6 +3336,9 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ
return;
}
+ auto rbid = _storage->getRollbackID(opCtx);
+ fassertStatusOK(40427, rbid.getStatus());
+
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (hasReplSetMetadata) {
@@ -3347,7 +3346,7 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ
}
if (hasOplogQueryMetadata) {
- _prepareOplogQueryMetadata_inlock(builder);
+ _prepareOplogQueryMetadata_inlock(rbid.getValue(), builder);
}
}
@@ -3359,10 +3358,11 @@ void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& la
metadata.writeToMetadata(builder);
}
-void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const {
+void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid,
+ BSONObjBuilder* builder) const {
OpTime lastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
auto metadata =
- _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, _rbid);
+ _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, rbid);
metadata.writeToMetadata(builder);
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index a34d7bb8df9..04c22e2e829 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -234,10 +234,6 @@ public:
const BSONObj& configObj,
BSONObjBuilder* resultObj) override;
- virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override;
-
- virtual void incrementRollbackID() override;
-
virtual Status processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) override;
@@ -282,7 +278,8 @@ public:
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) override;
- virtual void prepareReplMetadata(const BSONObj& metadataRequestObj,
+ virtual void prepareReplMetadata(OperationContext* opCtx,
+ const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const override;
@@ -1091,7 +1088,7 @@ private:
/**
* Prepares a metadata object for OplogQueryMetadata.
*/
- void _prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const;
+ void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const;
/**
* Blesses a snapshot to be used for new committed reads.
@@ -1290,10 +1287,6 @@ private:
// updates upstream. Set once in startReplication() and then never modified again.
OID _myRID; // (M)
- // Rollback ID. Used to check if a rollback happened during some interval of time
- // TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
- int _rbid; // (M)
-
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
WaiterList _replicationWaiterList; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index f1c0f7b43b7..2fd396c1218 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -190,7 +190,7 @@ TEST_F(ReplCoordTest, NodeEntersStartupStateWhenStartingUpWithNoLocalConfig) {
startCapturingLogMessages();
start();
stopCapturingLogMessages();
- ASSERT_EQUALS(2, countLogLinesContaining("Did not find local "));
+ ASSERT_EQUALS(3, countLogLinesContaining("Did not find local "));
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
}
@@ -583,19 +583,6 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenCheckReplEnabledForCommandAfterReceivingA
ASSERT_TRUE(result.obj().isEmpty());
}
-TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCalled) {
- start();
- BSONObjBuilder result;
- getReplCoord()->processReplSetGetRBID(&result);
- long long initialValue = result.obj()["rbid"].Int();
- getReplCoord()->incrementRollbackID();
-
- BSONObjBuilder result2;
- getReplCoord()->processReplSetGetRBID(&result2);
- long long incrementedValue = result2.obj()["rbid"].Int();
- ASSERT_EQUALS(incrementedValue, initialValue + 1);
-}
-
TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) {
init("");
auto opCtx = makeOperationContext();
@@ -4071,13 +4058,11 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
getReplCoord()->advanceCommitPoint(optime1);
getReplCoord()->setMyLastAppliedOpTime(optime2);
- // Get current rbid to check against.
- BSONObjBuilder result;
- getReplCoord()->processReplSetGetRBID(&result);
- int initialValue = result.obj()["rbid"].Int();
+ auto opCtx = makeOperationContext();
BSONObjBuilder metadataBob;
getReplCoord()->prepareReplMetadata(
+ opCtx.get(),
BSON(rpc::kOplogQueryMetadataFieldName << 1 << rpc::kReplSetMetadataFieldName << 1),
OpTime(),
&metadataBob);
@@ -4089,7 +4074,7 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
ASSERT_OK(oqMetadata.getStatus());
ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted(), optime1);
ASSERT_EQ(oqMetadata.getValue().getLastOpApplied(), optime2);
- ASSERT_EQ(oqMetadata.getValue().getRBID(), initialValue);
+ ASSERT_EQ(oqMetadata.getValue().getRBID(), 100);
ASSERT_EQ(oqMetadata.getValue().getSyncSourceIndex(), -1);
ASSERT_EQ(oqMetadata.getValue().getPrimaryIndex(), -1);
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 18397d23a23..5938c283371 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -326,12 +326,6 @@ Status ReplicationCoordinatorMock::processReplSetInitiate(OperationContext* opCt
return Status::OK();
}
-Status ReplicationCoordinatorMock::processReplSetGetRBID(BSONObjBuilder* resultObj) {
- return Status::OK();
-}
-
-void ReplicationCoordinatorMock::incrementRollbackID() {}
-
Status ReplicationCoordinatorMock::processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) {
return Status::OK();
@@ -416,7 +410,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes(
return Status::OK();
}
-void ReplicationCoordinatorMock::prepareReplMetadata(const BSONObj& metadataRequestObj,
+void ReplicationCoordinatorMock::prepareReplMetadata(OperationContext* opCtx,
+ const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const {}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 72bc120f226..93dea9521de 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -185,10 +185,6 @@ public:
const BSONObj& configObj,
BSONObjBuilder* resultObj);
- virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj);
-
- virtual void incrementRollbackID();
-
virtual Status processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj);
virtual Status processReplSetElect(const ReplSetElectArgs& args, BSONObjBuilder* resultObj);
@@ -226,7 +222,8 @@ public:
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response);
- void prepareReplMetadata(const BSONObj& metadataRequestObj,
+ void prepareReplMetadata(OperationContext* opCtx,
+ const BSONObj& metadataRequestObj,
const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const override;
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index 18ad4ab2d11..a852ff23041 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/storage_options.h"
@@ -168,7 +169,11 @@ public:
BSONObjBuilder result;
appendReplicationInfo(opCtx, result, level);
- getGlobalReplicationCoordinator()->processReplSetGetRBID(&result);
+
+ auto rbid = StorageInterface::get(opCtx)->getRollbackID(opCtx);
+ if (rbid.isOK()) {
+ result.append("rbid", rbid.getValue());
+ }
return result.obj();
}
diff --git a/src/mongo/db/repl/rollback.idl b/src/mongo/db/repl/rollback.idl
new file mode 100644
index 00000000000..62208d617d1
--- /dev/null
+++ b/src/mongo/db/repl/rollback.idl
@@ -0,0 +1,44 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# Rollback IDL File
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ rollbackID:
+ description: A document in which the server stores its Rollback ID on disk.
+ fields:
+ _id:
+ type: string
+ description: "The _id of the document stored in the database"
+ rollbackId:
+ type: int
+ description: "The Rollback ID stored in the document"
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 764a84a281d..39b73b2ecf5 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -76,6 +76,7 @@ void RollbackTest::setUp() {
setOplogCollectionName();
_storageInterface.setAppliedThrough(_opCtx.get(), OpTime{});
_storageInterface.setMinValid(_opCtx.get(), OpTime{});
+ _storageInterface.initializeRollbackID(_opCtx.get());
_threadPoolExecutorTest.launchExecutorThread();
}
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 54e233728f1..8fc56ae9aca 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -826,7 +826,10 @@ Status _syncRollback(OperationContext* opCtx,
log() << "rollback common point is " << how.commonPoint;
log() << "rollback 3 fixup";
try {
- ON_BLOCK_EXIT([&] { replCoord->incrementRollbackID(); });
+ ON_BLOCK_EXIT([&] {
+ auto status = storageInterface->incrementRollbackID(opCtx);
+ fassertStatusOK(40425, status);
+ });
syncFixUp(opCtx, how, rollbackSource, replCoord, storageInterface);
} catch (const RSFatalException& e) {
return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753);
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 2c8a2c2635e..12f0d25507f 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -135,6 +135,13 @@ public:
virtual void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) = 0;
/**
+ * Rollback ID is an increasing counter of how many rollbacks have occurred on this server.
+ */
+ virtual StatusWith<int> getRollbackID(OperationContext* opCtx) = 0;
+ virtual Status initializeRollbackID(OperationContext* opCtx) = 0;
+ virtual Status incrementRollbackID(OperationContext* opCtx) = 0;
+
+ /**
* On startup all oplog entries with a value >= the oplog delete from point should be deleted.
* If null, no documents should be deleted.
*/
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 3c2325ae78a..c187c0871ca 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -69,6 +69,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/repl/task_runner.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
@@ -83,6 +84,9 @@ const char StorageInterfaceImpl::kDefaultMinValidNamespace[] = "local.replset.mi
const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync";
const char StorageInterfaceImpl::kBeginFieldName[] = "begin";
const char StorageInterfaceImpl::kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint";
+const char StorageInterfaceImpl::kDefaultRollbackIdNamespace[] = "local.system.rollback.id";
+const char StorageInterfaceImpl::kRollbackIdFieldName[] = "rollbackId";
+const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId";
namespace {
using UniqueLock = stdx::unique_lock<stdx::mutex>;
@@ -97,7 +101,8 @@ StorageInterfaceImpl::StorageInterfaceImpl()
: StorageInterfaceImpl(NamespaceString(StorageInterfaceImpl::kDefaultMinValidNamespace)) {}
StorageInterfaceImpl::StorageInterfaceImpl(const NamespaceString& minValidNss)
- : _minValidNss(minValidNss) {}
+ : _minValidNss(minValidNss),
+ _rollbackIdNss(StorageInterfaceImpl::kDefaultRollbackIdNamespace) {}
NamespaceString StorageInterfaceImpl::getMinValidNss() const {
return _minValidNss;
@@ -129,6 +134,73 @@ void StorageInterfaceImpl::updateMinValidDocument(OperationContext* opCtx,
opCtx, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns());
}
+StatusWith<int> StorageInterfaceImpl::getRollbackID(OperationContext* opCtx) {
+ BSONObjBuilder bob;
+ bob.append("_id", kRollbackIdDocumentId);
+ auto id = bob.obj();
+
+ try {
+ auto rbidDoc = findById(opCtx, _rollbackIdNss, id["_id"]);
+ if (!rbidDoc.isOK()) {
+ return rbidDoc.getStatus();
+ }
+
+ auto rbid = RollbackID::parse(IDLParserErrorContext("RollbackID"), rbidDoc.getValue());
+ invariant(rbid.get_id() == kRollbackIdDocumentId);
+ return rbid.getRollbackId();
+ } catch (...) {
+ return exceptionToStatus();
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+Status StorageInterfaceImpl::initializeRollbackID(OperationContext* opCtx) {
+ auto status = createCollection(opCtx, _rollbackIdNss, CollectionOptions());
+ if (!status.isOK()) {
+ return status;
+ }
+
+ RollbackID rbid;
+ rbid.set_id(kRollbackIdDocumentId);
+ rbid.setRollbackId(0);
+
+ BSONObjBuilder bob;
+ rbid.serialize(&bob);
+ return insertDocument(opCtx, _rollbackIdNss, bob.done());
+}
+
+Status StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) {
+ // This is safe because this is only called during rollback, and you can not have two
+ // rollbacks at once.
+ auto rbid = getRollbackID(opCtx);
+ if (!rbid.isOK()) {
+ return rbid.getStatus();
+ }
+
+ // If we would go over the integer limit, reset the Rollback ID to 0.
+ BSONObjBuilder updateBob;
+ if (rbid.getValue() == std::numeric_limits<int>::max()) {
+ BSONObjBuilder setBob(updateBob.subobjStart("$set"));
+ setBob.append(kRollbackIdFieldName, 0);
+ } else {
+ BSONObjBuilder incBob(updateBob.subobjStart("$inc"));
+ incBob.append(kRollbackIdFieldName, 1);
+ }
+
+ // Since the Rollback ID is in a singleton collection, we can fix the _id field.
+ BSONObjBuilder bob;
+ bob.append("_id", kRollbackIdDocumentId);
+ auto id = bob.obj();
+ Status status = upsertById(opCtx, _rollbackIdNss, id["_id"], updateBob.obj());
+
+ // We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
+ if (status.isOK()) {
+ opCtx->recoveryUnit()->waitUntilDurable();
+ }
+ return status;
+}
+
bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* opCtx) const {
const BSONObj doc = getMinValidDocument(opCtx);
const auto flag = doc[kInitialSyncFlagFieldName].trueValue();
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index adc372f23fd..f2f52ea4e5e 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -49,6 +49,9 @@ public:
static const char kInitialSyncFlagFieldName[];
static const char kBeginFieldName[];
static const char kOplogDeleteFromPointFieldName[];
+ static const char kDefaultRollbackIdNamespace[];
+ static const char kRollbackIdFieldName[];
+ static const char kRollbackIdDocumentId[];
StorageInterfaceImpl();
explicit StorageInterfaceImpl(const NamespaceString& minValidNss);
@@ -67,6 +70,9 @@ public:
OpTime getMinValid(OperationContext* opCtx) const override;
void setMinValid(OperationContext* opCtx, const OpTime& minValid) override;
void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) override;
+ StatusWith<int> getRollbackID(OperationContext* opCtx) override;
+ Status initializeRollbackID(OperationContext* opCtx) override;
+ Status incrementRollbackID(OperationContext* opCtx) override;
void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) override;
void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
@@ -151,6 +157,7 @@ private:
void updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec);
const NamespaceString _minValidNss;
+ const NamespaceString _rollbackIdNss;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 534641674de..ab06431d442 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -337,6 +337,143 @@ TEST_F(StorageInterfaceImplTest, MinValid) {
ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
}
+TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsNamespaceNotFoundOnMissingCollection) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, storage.getRollbackID(opCtx).getStatus());
+}
+
+TEST_F(StorageInterfaceImplTest, IncrementRollbackIDReturnsNamespaceNotFoundOnMissingCollection) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, storage.incrementRollbackID(opCtx));
+}
+
+TEST_F(StorageInterfaceImplTest, InitializeRollbackIDReturnsNamespaceExistsOnExistingCollection) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+
+ createCollection(opCtx, NamespaceString(StorageInterfaceImpl::kDefaultRollbackIdNamespace));
+ ASSERT_EQUALS(ErrorCodes::NamespaceExists, storage.initializeRollbackID(opCtx));
+}
+
+TEST_F(StorageInterfaceImplTest,
+ InitializeRollbackIDReturnsNamespaceExistsIfItHasAlreadyBeenInitialized) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+
+ ASSERT_OK(storage.initializeRollbackID(opCtx));
+ ASSERT_EQUALS(ErrorCodes::NamespaceExists, storage.initializeRollbackID(opCtx));
+}
+
+/**
+ * Check collection contents. OplogInterface returns documents in reverse natural order.
+ */
+void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) {
+ std::vector<BSONObj> reversedDocs(docs);
+ std::reverse(reversedDocs.begin(), reversedDocs.end());
+ OplogInterfaceLocal oplog(opCtx, nss.ns());
+ auto iter = oplog.makeIterator();
+ for (const auto& doc : reversedDocs) {
+ ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first);
+ }
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
+}
+
+/**
+ * Check collection contents for a singleton Rollback ID document.
+ */
+void _assertRollbackIDDocument(OperationContext* opCtx, int id) {
+ _assertDocumentsInCollectionEquals(
+ opCtx,
+ NamespaceString(StorageInterfaceImpl::kDefaultRollbackIdNamespace),
+ {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
+ << StorageInterfaceImpl::kRollbackIdFieldName
+ << id)});
+}
+
+TEST_F(StorageInterfaceImplTest, RollbackIdInitializesIncrementsAndReadsProperly) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+
+ ASSERT_OK(storage.initializeRollbackID(opCtx));
+ _assertRollbackIDDocument(opCtx, 0);
+
+ auto rbid = unittest::assertGet(storage.getRollbackID(opCtx));
+ ASSERT_EQUALS(rbid, 0);
+
+ ASSERT_OK(storage.incrementRollbackID(opCtx));
+ _assertRollbackIDDocument(opCtx, 1);
+
+ rbid = unittest::assertGet(storage.getRollbackID(opCtx));
+ ASSERT_EQUALS(rbid, 1);
+
+ ASSERT_OK(storage.incrementRollbackID(opCtx));
+ _assertRollbackIDDocument(opCtx, 2);
+
+ rbid = unittest::assertGet(storage.getRollbackID(opCtx));
+ ASSERT_EQUALS(rbid, 2);
+}
+
+TEST_F(StorageInterfaceImplTest, IncrementRollbackIDRollsToZeroWhenExceedingMaxInt) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+ NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace);
+ createCollection(opCtx, nss);
+ auto maxDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
+ << StorageInterfaceImpl::kRollbackIdFieldName
+ << std::numeric_limits<int>::max())};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, maxDoc));
+ _assertRollbackIDDocument(opCtx, std::numeric_limits<int>::max());
+
+ auto rbid = unittest::assertGet(storage.getRollbackID(opCtx));
+ ASSERT_EQUALS(rbid, std::numeric_limits<int>::max());
+
+ ASSERT_OK(storage.incrementRollbackID(opCtx));
+ _assertRollbackIDDocument(opCtx, 0);
+
+ rbid = unittest::assertGet(storage.getRollbackID(opCtx));
+ ASSERT_EQUALS(rbid, 0);
+
+ ASSERT_OK(storage.incrementRollbackID(opCtx));
+ _assertRollbackIDDocument(opCtx, 1);
+
+ rbid = unittest::assertGet(storage.getRollbackID(opCtx));
+ ASSERT_EQUALS(rbid, 1);
+}
+
+TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfDocumentHasBadField) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+ NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace);
+
+ createCollection(opCtx, nss);
+
+ auto badDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId << "bad field" << 3)};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, badDoc));
+ ASSERT_EQUALS(mongo::AssertionException::convertExceptionCode(40415),
+ storage.getRollbackID(opCtx).getStatus());
+}
+
+TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfRollbackIDIsNotInt) {
+ StorageInterfaceImpl storage;
+ auto opCtx = getOperationContext();
+ NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace);
+
+ createCollection(opCtx, nss);
+
+ auto badDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
+ << StorageInterfaceImpl::kRollbackIdFieldName
+ << "bad id")};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, badDoc));
+ ASSERT_EQUALS(mongo::AssertionException::convertExceptionCode(40410),
+ storage.getRollbackID(opCtx).getStatus());
+}
+
TEST_F(StorageInterfaceImplTest, SnapshotSupported) {
auto opCtx = getOperationContext();
Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
@@ -765,22 +902,6 @@ std::string _toString(const std::vector<BSONObj>& docs) {
}
/**
- * Check collection contents. OplogInterface returns documents in reverse natural order.
- */
-void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<BSONObj>& docs) {
- std::vector<BSONObj> reversedDocs(docs);
- std::reverse(reversedDocs.begin(), reversedDocs.end());
- OplogInterfaceLocal oplog(opCtx, nss.ns());
- auto iter = oplog.makeIterator();
- for (const auto& doc : reversedDocs) {
- ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first);
- }
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
-}
-
-/**
* Check StatusWith<std::vector<BSONObj>> value.
*/
void _assertDocumentsEqual(const StatusWith<std::vector<BSONObj>>& statusWithDocs,
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index f067a759032..531e4b17310 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -69,6 +69,35 @@ void StorageInterfaceMock::setMinValidToAtLeast(OperationContext* opCtx, const O
_minValid = std::max(_minValid, minValid);
}
+StatusWith<int> StorageInterfaceMock::getRollbackID(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ if (!_rbidInitialized) {
+ return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized");
+ }
+ return _rbid;
+}
+
+Status StorageInterfaceMock::initializeRollbackID(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ if (_rbidInitialized) {
+ return Status(ErrorCodes::NamespaceExists, "Rollback ID already initialized");
+ }
+ _rbidInitialized = true;
+
+ // Start the mock RBID at a very high number to differentiate it from uninitialized RBIDs.
+ _rbid = 100;
+ return Status::OK();
+}
+
+Status StorageInterfaceMock::incrementRollbackID(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ if (!_rbidInitialized) {
+ return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized");
+ }
+ _rbid++;
+ return Status::OK();
+}
+
void StorageInterfaceMock::setOplogDeleteFromPoint(OperationContext* opCtx,
const Timestamp& timestamp) {
stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 33b25731dcb..17a17b20b87 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -129,6 +129,9 @@ public:
OpTime getMinValid(OperationContext* opCtx) const override;
void setMinValid(OperationContext* opCtx, const OpTime& minValid) override;
void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override;
+ StatusWith<int> getRollbackID(OperationContext* opCtx) override;
+ Status initializeRollbackID(OperationContext* opCtx) override;
+ Status incrementRollbackID(OperationContext* opCtx) override;
void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) override;
void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
@@ -215,7 +218,7 @@ public:
const NamespaceString& nss,
const BSONElement& idKey,
const BSONObj& update) override {
- return Status{ErrorCodes::IllegalOperation, "upsertbyId not implemented."};
+ return Status{ErrorCodes::IllegalOperation, "upsertById not implemented."};
}
Status deleteByFilter(OperationContext* opCtx,
@@ -298,6 +301,8 @@ private:
mutable stdx::mutex _minValidBoundariesMutex;
OpTime _appliedThrough;
OpTime _minValid;
+ int _rbid;
+ bool _rbidInitialized = false;
Timestamp _oplogDeleteFromPoint;
};
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index 948a471b245..d14be44f6fe 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -477,7 +477,6 @@ Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
SyncSourceResolverResponse response;
response.syncSourceStatus = std::move(result);
if (response.isOK() && !response.getSyncSource().empty()) {
- invariant(_requiredOpTime.isNull() || _rbid);
response.rbid = _rbid;
}
return _finishCallback(response);
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index fe85fdab89b..9cac75212fd 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor_pool.h"
@@ -123,6 +124,10 @@ void ShardingMongodTestFixture::setUp() {
repl::ReplicationCoordinator::set(service, std::move(replCoordPtr));
+ auto storagePtr = stdx::make_unique<repl::StorageInterfaceMock>();
+ storagePtr->initializeRollbackID(_opCtx.get());
+ repl::StorageInterface::set(service, std::move(storagePtr));
+
service->setOpObserver(stdx::make_unique<OpObserverImpl>());
repl::setOplogCollectionName();
repl::createOplog(_opCtx.get());