diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-04-21 10:27:59 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-04-28 19:52:01 -0400 |
commit | 7358c66cbf77203fa0803417a4442e35f11bf3f7 (patch) | |
tree | 947ec510b25c7643e3eb8c9e948fdaa75c163888 /src | |
parent | 60636b4d3ae60a24c080c7250459814eef5e7c87 (diff) | |
download | mongo-7358c66cbf77203fa0803417a4442e35f11bf3f7.tar.gz |
SERVER-27659 Persist Rollback Id
Diffstat (limited to 'src')
21 files changed, 375 insertions, 99 deletions
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index aefee2bb4df..da379905d84 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -1313,7 +1313,8 @@ void appendReplyMetadata(OperationContext* opCtx, // Attach our own last opTime. repl::OpTime lastOpTimeFromClient = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - replCoord->prepareReplMetadata(request.getMetadata(), lastOpTimeFromClient, metadataBob); + replCoord->prepareReplMetadata( + opCtx, request.getMetadata(), lastOpTimeFromClient, metadataBob); // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18236 if (isShardingAware || isConfig) { diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 9cc1ace7a04..b38283f4448 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -135,6 +135,17 @@ env.CppUnitTest( ) env.Library( + target='rollback_idl', + source=[ + env.Idlc('rollback.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/idl/idl_parser', + ], +) + +env.Library( target='storage_interface', source=[ 'storage_interface.cpp', @@ -153,6 +164,7 @@ env.Library( 'collection_bulk_loader_impl.cpp', ], LIBDEPS=[ + 'rollback_idl', 'storage_interface', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/exec/exec', diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index 86da2bec9db..bd39f2e18e9 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -169,8 +169,13 @@ public: if (!status.isOK()) return appendCommandStatus(result, status); - status = getGlobalReplicationCoordinator()->processReplSetGetRBID(&result); - return appendCommandStatus(result, status); + auto rbid = StorageInterface::get(opCtx)->getRollbackID(opCtx); + + // We should always have a Rollback ID since it is created at startup. + fassertStatusOK(40426, rbid.getStatus()); + + result.append("rbid", rbid.getValue()); + return appendCommandStatus(result, Status::OK()); } } cmdReplSetRBID; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index c53f5a7b655..a9ccd9f642a 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -633,17 +633,6 @@ public: const BSONObj& configObj, BSONObjBuilder* resultObj) = 0; - /* - * Handles an incoming replSetGetRBID command. - * Adds BSON to 'resultObj'; returns a Status with either OK or an error message. - */ - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) = 0; - - /** - * Increments this process's rollback id. Called every time a rollback occurs. - */ - virtual void incrementRollbackID() = 0; - /** * Arguments to the replSetFresh command. */ @@ -765,7 +754,8 @@ public: * Prepares a metadata object with the ReplSetMetadata and the OplogQueryMetadata depending * on what has been requested. */ - virtual void prepareReplMetadata(const BSONObj& metadataRequestObj, + virtual void prepareReplMetadata(OperationContext* opCtx, + const BSONObj& metadataRequestObj, const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const = 0; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 72c45415344..e538718650c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -329,13 +329,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( return; } - std::unique_ptr<SecureRandom> rbidGenerator(SecureRandom::create()); - _rbid = static_cast<int>(rbidGenerator->nextInt64()); - if (_rbid < 0) { - // Ensure _rbid is always positive - _rbid = -_rbid; - } - // Make sure there is always an entry in _slaveInfo for ourself. SlaveInfo selfInfo; selfInfo.self = true; @@ -430,6 +423,19 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* opCtx) _topCoord->loadLastVote(lastVote.getValue()); } + // Check that we have a local Rollback ID. If we do not have one, create one. + auto rbid = _storage->getRollbackID(opCtx); + if (!rbid.isOK()) { + if (rbid.getStatus() == ErrorCodes::NamespaceNotFound) { + log() << "Did not find local Rollback ID document at startup. Creating one."; + auto initializingStatus = _storage->initializeRollbackID(opCtx); + fassertStatusOK(40424, initializingStatus); + } else { + severe() << "Error loading local Rollback ID document at startup; " << rbid.getStatus(); + fassertFailedNoTrace(40428); + } + } + StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(opCtx); if (!cfg.isOK()) { log() << "Did not find local replica set configuration document at startup; " @@ -2867,17 +2873,6 @@ void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _externalState->stopProducer(); } -Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - resultObj->append("rbid", _rbid); - return Status::OK(); -} - -void ReplicationCoordinatorImpl::incrementRollbackID() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - ++_rbid; -} - Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -3329,7 +3324,8 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return Status::OK(); } -void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj, +void ReplicationCoordinatorImpl::prepareReplMetadata(OperationContext* opCtx, + const BSONObj& metadataRequestObj, const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const { @@ -3340,6 +3336,9 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ return; } + auto rbid = _storage->getRollbackID(opCtx); + fassertStatusOK(40427, rbid.getStatus()); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (hasReplSetMetadata) { @@ -3347,7 +3346,7 @@ void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequ } if (hasOplogQueryMetadata) { - _prepareOplogQueryMetadata_inlock(builder); + _prepareOplogQueryMetadata_inlock(rbid.getValue(), builder); } } @@ -3359,10 +3358,11 @@ void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& la metadata.writeToMetadata(builder); } -void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const { +void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(int rbid, + BSONObjBuilder* builder) const { OpTime lastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); auto metadata = - _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, _rbid); + _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, rbid); metadata.writeToMetadata(builder); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index a34d7bb8df9..04c22e2e829 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -234,10 +234,6 @@ public: const BSONObj& configObj, BSONObjBuilder* resultObj) override; - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj) override; - - virtual void incrementRollbackID() override; - virtual Status processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj) override; @@ -282,7 +278,8 @@ public: const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) override; - virtual void prepareReplMetadata(const BSONObj& metadataRequestObj, + virtual void prepareReplMetadata(OperationContext* opCtx, + const BSONObj& metadataRequestObj, const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const override; @@ -1091,7 +1088,7 @@ private: /** * Prepares a metadata object for OplogQueryMetadata. */ - void _prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const; + void _prepareOplogQueryMetadata_inlock(int rbid, BSONObjBuilder* builder) const; /** * Blesses a snapshot to be used for new committed reads. @@ -1290,10 +1287,6 @@ private: // updates upstream. Set once in startReplication() and then never modified again. OID _myRID; // (M) - // Rollback ID. Used to check if a rollback happened during some interval of time - // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. - int _rbid; // (M) - // list of information about clients waiting on replication. Does *not* own the WaiterInfos. WaiterList _replicationWaiterList; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index f1c0f7b43b7..2fd396c1218 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -190,7 +190,7 @@ TEST_F(ReplCoordTest, NodeEntersStartupStateWhenStartingUpWithNoLocalConfig) { startCapturingLogMessages(); start(); stopCapturingLogMessages(); - ASSERT_EQUALS(2, countLogLinesContaining("Did not find local ")); + ASSERT_EQUALS(3, countLogLinesContaining("Did not find local ")); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); } @@ -583,19 +583,6 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenCheckReplEnabledForCommandAfterReceivingA ASSERT_TRUE(result.obj().isEmpty()); } -TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCalled) { - start(); - BSONObjBuilder result; - getReplCoord()->processReplSetGetRBID(&result); - long long initialValue = result.obj()["rbid"].Int(); - getReplCoord()->incrementRollbackID(); - - BSONObjBuilder result2; - getReplCoord()->processReplSetGetRBID(&result2); - long long incrementedValue = result2.obj()["rbid"].Int(); - ASSERT_EQUALS(incrementedValue, initialValue + 1); -} - TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) { init(""); auto opCtx = makeOperationContext(); @@ -4071,13 +4058,11 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { getReplCoord()->advanceCommitPoint(optime1); getReplCoord()->setMyLastAppliedOpTime(optime2); - // Get current rbid to check against. - BSONObjBuilder result; - getReplCoord()->processReplSetGetRBID(&result); - int initialValue = result.obj()["rbid"].Int(); + auto opCtx = makeOperationContext(); BSONObjBuilder metadataBob; getReplCoord()->prepareReplMetadata( + opCtx.get(), BSON(rpc::kOplogQueryMetadataFieldName << 1 << rpc::kReplSetMetadataFieldName << 1), OpTime(), &metadataBob); @@ -4089,7 +4074,7 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { ASSERT_OK(oqMetadata.getStatus()); ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted(), optime1); ASSERT_EQ(oqMetadata.getValue().getLastOpApplied(), optime2); - ASSERT_EQ(oqMetadata.getValue().getRBID(), initialValue); + ASSERT_EQ(oqMetadata.getValue().getRBID(), 100); ASSERT_EQ(oqMetadata.getValue().getSyncSourceIndex(), -1); ASSERT_EQ(oqMetadata.getValue().getPrimaryIndex(), -1); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 18397d23a23..5938c283371 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -326,12 +326,6 @@ Status ReplicationCoordinatorMock::processReplSetInitiate(OperationContext* opCt return Status::OK(); } -Status ReplicationCoordinatorMock::processReplSetGetRBID(BSONObjBuilder* resultObj) { - return Status::OK(); -} - -void ReplicationCoordinatorMock::incrementRollbackID() {} - Status ReplicationCoordinatorMock::processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj) { return Status::OK(); @@ -416,7 +410,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes( return Status::OK(); } -void ReplicationCoordinatorMock::prepareReplMetadata(const BSONObj& metadataRequestObj, +void ReplicationCoordinatorMock::prepareReplMetadata(OperationContext* opCtx, + const BSONObj& metadataRequestObj, const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const {} diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 72bc120f226..93dea9521de 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -185,10 +185,6 @@ public: const BSONObj& configObj, BSONObjBuilder* resultObj); - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj); - - virtual void incrementRollbackID(); - virtual Status processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj); virtual Status processReplSetElect(const ReplSetElectArgs& args, BSONObjBuilder* resultObj); @@ -226,7 +222,8 @@ public: const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response); - void prepareReplMetadata(const BSONObj& metadataRequestObj, + void prepareReplMetadata(OperationContext* opCtx, + const BSONObj& metadataRequestObj, const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const override; diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 18ad4ab2d11..a852ff23041 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage/storage_options.h" @@ -168,7 +169,11 @@ public: BSONObjBuilder result; appendReplicationInfo(opCtx, result, level); - getGlobalReplicationCoordinator()->processReplSetGetRBID(&result); + + auto rbid = StorageInterface::get(opCtx)->getRollbackID(opCtx); + if (rbid.isOK()) { + result.append("rbid", rbid.getValue()); + } return result.obj(); } diff --git a/src/mongo/db/repl/rollback.idl b/src/mongo/db/repl/rollback.idl new file mode 100644 index 00000000000..62208d617d1 --- /dev/null +++ b/src/mongo/db/repl/rollback.idl @@ -0,0 +1,44 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +# Rollback IDL File + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + rollbackID: + description: A document in which the server stores its Rollback ID on disk. + fields: + _id: + type: string + description: "The _id of the document stored in the database" + rollbackId: + type: int + description: "The Rollback ID stored in the document" diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 764a84a281d..39b73b2ecf5 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -76,6 +76,7 @@ void RollbackTest::setUp() { setOplogCollectionName(); _storageInterface.setAppliedThrough(_opCtx.get(), OpTime{}); _storageInterface.setMinValid(_opCtx.get(), OpTime{}); + _storageInterface.initializeRollbackID(_opCtx.get()); _threadPoolExecutorTest.launchExecutorThread(); } diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 54e233728f1..8fc56ae9aca 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -826,7 +826,10 @@ Status _syncRollback(OperationContext* opCtx, log() << "rollback common point is " << how.commonPoint; log() << "rollback 3 fixup"; try { - ON_BLOCK_EXIT([&] { replCoord->incrementRollbackID(); }); + ON_BLOCK_EXIT([&] { + auto status = storageInterface->incrementRollbackID(opCtx); + fassertStatusOK(40425, status); + }); syncFixUp(opCtx, how, rollbackSource, replCoord, storageInterface); } catch (const RSFatalException& e) { return Status(ErrorCodes::UnrecoverableRollbackError, e.what(), 18753); diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 2c8a2c2635e..12f0d25507f 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -135,6 +135,13 @@ public: virtual void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) = 0; /** + * Rollback ID is an increasing counter of how many rollbacks have occurred on this server. + */ + virtual StatusWith<int> getRollbackID(OperationContext* opCtx) = 0; + virtual Status initializeRollbackID(OperationContext* opCtx) = 0; + virtual Status incrementRollbackID(OperationContext* opCtx) = 0; + + /** * On startup all oplog entries with a value >= the oplog delete from point should be deleted. * If null, no documents should be deleted. */ diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 3c2325ae78a..c187c0871ca 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -69,6 +69,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/rollback_gen.h" #include "mongo/db/repl/task_runner.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" @@ -83,6 +84,9 @@ const char StorageInterfaceImpl::kDefaultMinValidNamespace[] = "local.replset.mi const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync"; const char StorageInterfaceImpl::kBeginFieldName[] = "begin"; const char StorageInterfaceImpl::kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint"; +const char StorageInterfaceImpl::kDefaultRollbackIdNamespace[] = "local.system.rollback.id"; +const char StorageInterfaceImpl::kRollbackIdFieldName[] = "rollbackId"; +const char StorageInterfaceImpl::kRollbackIdDocumentId[] = "rollbackId"; namespace { using UniqueLock = stdx::unique_lock<stdx::mutex>; @@ -97,7 +101,8 @@ StorageInterfaceImpl::StorageInterfaceImpl() : StorageInterfaceImpl(NamespaceString(StorageInterfaceImpl::kDefaultMinValidNamespace)) {} StorageInterfaceImpl::StorageInterfaceImpl(const NamespaceString& minValidNss) - : _minValidNss(minValidNss) {} + : _minValidNss(minValidNss), + _rollbackIdNss(StorageInterfaceImpl::kDefaultRollbackIdNamespace) {} NamespaceString StorageInterfaceImpl::getMinValidNss() const { return _minValidNss; @@ -129,6 +134,73 @@ void StorageInterfaceImpl::updateMinValidDocument(OperationContext* opCtx, opCtx, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns()); } +StatusWith<int> StorageInterfaceImpl::getRollbackID(OperationContext* opCtx) { + BSONObjBuilder bob; + bob.append("_id", kRollbackIdDocumentId); + auto id = bob.obj(); + + try { + auto rbidDoc = findById(opCtx, _rollbackIdNss, id["_id"]); + if (!rbidDoc.isOK()) { + return rbidDoc.getStatus(); + } + + auto rbid = RollbackID::parse(IDLParserErrorContext("RollbackID"), rbidDoc.getValue()); + invariant(rbid.get_id() == kRollbackIdDocumentId); + return rbid.getRollbackId(); + } catch (...) { + return exceptionToStatus(); + } + + MONGO_UNREACHABLE; +} + +Status StorageInterfaceImpl::initializeRollbackID(OperationContext* opCtx) { + auto status = createCollection(opCtx, _rollbackIdNss, CollectionOptions()); + if (!status.isOK()) { + return status; + } + + RollbackID rbid; + rbid.set_id(kRollbackIdDocumentId); + rbid.setRollbackId(0); + + BSONObjBuilder bob; + rbid.serialize(&bob); + return insertDocument(opCtx, _rollbackIdNss, bob.done()); +} + +Status StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) { + // This is safe because this is only called during rollback, and you can not have two + // rollbacks at once. + auto rbid = getRollbackID(opCtx); + if (!rbid.isOK()) { + return rbid.getStatus(); + } + + // If we would go over the integer limit, reset the Rollback ID to 0. + BSONObjBuilder updateBob; + if (rbid.getValue() == std::numeric_limits<int>::max()) { + BSONObjBuilder setBob(updateBob.subobjStart("$set")); + setBob.append(kRollbackIdFieldName, 0); + } else { + BSONObjBuilder incBob(updateBob.subobjStart("$inc")); + incBob.append(kRollbackIdFieldName, 1); + } + + // Since the Rollback ID is in a singleton collection, we can fix the _id field. + BSONObjBuilder bob; + bob.append("_id", kRollbackIdDocumentId); + auto id = bob.obj(); + Status status = upsertById(opCtx, _rollbackIdNss, id["_id"], updateBob.obj()); + + // We wait until durable so that we are sure the Rollback ID is updated before rollback ends. + if (status.isOK()) { + opCtx->recoveryUnit()->waitUntilDurable(); + } + return status; +} + bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* opCtx) const { const BSONObj doc = getMinValidDocument(opCtx); const auto flag = doc[kInitialSyncFlagFieldName].trueValue(); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index adc372f23fd..f2f52ea4e5e 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -49,6 +49,9 @@ public: static const char kInitialSyncFlagFieldName[]; static const char kBeginFieldName[]; static const char kOplogDeleteFromPointFieldName[]; + static const char kDefaultRollbackIdNamespace[]; + static const char kRollbackIdFieldName[]; + static const char kRollbackIdDocumentId[]; StorageInterfaceImpl(); explicit StorageInterfaceImpl(const NamespaceString& minValidNss); @@ -67,6 +70,9 @@ public: OpTime getMinValid(OperationContext* opCtx) const override; void setMinValid(OperationContext* opCtx, const OpTime& minValid) override; void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& endOpTime) override; + StatusWith<int> getRollbackID(OperationContext* opCtx) override; + Status initializeRollbackID(OperationContext* opCtx) override; + Status incrementRollbackID(OperationContext* opCtx) override; void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override; Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) override; void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override; @@ -151,6 +157,7 @@ private: void updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec); const NamespaceString _minValidNss; + const NamespaceString _rollbackIdNss; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 534641674de..ab06431d442 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -337,6 +337,143 @@ TEST_F(StorageInterfaceImplTest, MinValid) { ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled); } +TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsNamespaceNotFoundOnMissingCollection) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, storage.getRollbackID(opCtx).getStatus()); +} + +TEST_F(StorageInterfaceImplTest, IncrementRollbackIDReturnsNamespaceNotFoundOnMissingCollection) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, storage.incrementRollbackID(opCtx)); +} + +TEST_F(StorageInterfaceImplTest, InitializeRollbackIDReturnsNamespaceExistsOnExistingCollection) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + + createCollection(opCtx, NamespaceString(StorageInterfaceImpl::kDefaultRollbackIdNamespace)); + ASSERT_EQUALS(ErrorCodes::NamespaceExists, storage.initializeRollbackID(opCtx)); +} + +TEST_F(StorageInterfaceImplTest, + InitializeRollbackIDReturnsNamespaceExistsIfItHasAlreadyBeenInitialized) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + + ASSERT_OK(storage.initializeRollbackID(opCtx)); + ASSERT_EQUALS(ErrorCodes::NamespaceExists, storage.initializeRollbackID(opCtx)); +} + +/** + * Check collection contents. OplogInterface returns documents in reverse natural order. + */ +void _assertDocumentsInCollectionEquals(OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) { + std::vector<BSONObj> reversedDocs(docs); + std::reverse(reversedDocs.begin(), reversedDocs.end()); + OplogInterfaceLocal oplog(opCtx, nss.ns()); + auto iter = oplog.makeIterator(); + for (const auto& doc : reversedDocs) { + ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first); + } + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); +} + +/** + * Check collection contents for a singleton Rollback ID document. + */ +void _assertRollbackIDDocument(OperationContext* opCtx, int id) { + _assertDocumentsInCollectionEquals( + opCtx, + NamespaceString(StorageInterfaceImpl::kDefaultRollbackIdNamespace), + {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId + << StorageInterfaceImpl::kRollbackIdFieldName + << id)}); +} + +TEST_F(StorageInterfaceImplTest, RollbackIdInitializesIncrementsAndReadsProperly) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + + ASSERT_OK(storage.initializeRollbackID(opCtx)); + _assertRollbackIDDocument(opCtx, 0); + + auto rbid = unittest::assertGet(storage.getRollbackID(opCtx)); + ASSERT_EQUALS(rbid, 0); + + ASSERT_OK(storage.incrementRollbackID(opCtx)); + _assertRollbackIDDocument(opCtx, 1); + + rbid = unittest::assertGet(storage.getRollbackID(opCtx)); + ASSERT_EQUALS(rbid, 1); + + ASSERT_OK(storage.incrementRollbackID(opCtx)); + _assertRollbackIDDocument(opCtx, 2); + + rbid = unittest::assertGet(storage.getRollbackID(opCtx)); + ASSERT_EQUALS(rbid, 2); +} + +TEST_F(StorageInterfaceImplTest, IncrementRollbackIDRollsToZeroWhenExceedingMaxInt) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace); + createCollection(opCtx, nss); + auto maxDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId + << StorageInterfaceImpl::kRollbackIdFieldName + << std::numeric_limits<int>::max())}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, maxDoc)); + _assertRollbackIDDocument(opCtx, std::numeric_limits<int>::max()); + + auto rbid = unittest::assertGet(storage.getRollbackID(opCtx)); + ASSERT_EQUALS(rbid, std::numeric_limits<int>::max()); + + ASSERT_OK(storage.incrementRollbackID(opCtx)); + _assertRollbackIDDocument(opCtx, 0); + + rbid = unittest::assertGet(storage.getRollbackID(opCtx)); + ASSERT_EQUALS(rbid, 0); + + ASSERT_OK(storage.incrementRollbackID(opCtx)); + _assertRollbackIDDocument(opCtx, 1); + + rbid = unittest::assertGet(storage.getRollbackID(opCtx)); + ASSERT_EQUALS(rbid, 1); +} + +TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfDocumentHasBadField) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace); + + createCollection(opCtx, nss); + + auto badDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId << "bad field" << 3)}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, badDoc)); + ASSERT_EQUALS(mongo::AssertionException::convertExceptionCode(40415), + storage.getRollbackID(opCtx).getStatus()); +} + +TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfRollbackIDIsNotInt) { + StorageInterfaceImpl storage; + auto opCtx = getOperationContext(); + NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace); + + createCollection(opCtx, nss); + + auto badDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId + << StorageInterfaceImpl::kRollbackIdFieldName + << "bad id")}; + ASSERT_OK(storage.insertDocuments(opCtx, nss, badDoc)); + ASSERT_EQUALS(mongo::AssertionException::convertExceptionCode(40410), + storage.getRollbackID(opCtx).getStatus()); +} + TEST_F(StorageInterfaceImplTest, SnapshotSupported) { auto opCtx = getOperationContext(); Status status = opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); @@ -765,22 +902,6 @@ std::string _toString(const std::vector<BSONObj>& docs) { } /** - * Check collection contents. OplogInterface returns documents in reverse natural order. - */ -void _assertDocumentsInCollectionEquals(OperationContext* opCtx, - const NamespaceString& nss, - const std::vector<BSONObj>& docs) { - std::vector<BSONObj> reversedDocs(docs); - std::reverse(reversedDocs.begin(), reversedDocs.end()); - OplogInterfaceLocal oplog(opCtx, nss.ns()); - auto iter = oplog.makeIterator(); - for (const auto& doc : reversedDocs) { - ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first); - } - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); -} - -/** * Check StatusWith<std::vector<BSONObj>> value. */ void _assertDocumentsEqual(const StatusWith<std::vector<BSONObj>>& statusWithDocs, diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index f067a759032..531e4b17310 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -69,6 +69,35 @@ void StorageInterfaceMock::setMinValidToAtLeast(OperationContext* opCtx, const O _minValid = std::max(_minValid, minValid); } +StatusWith<int> StorageInterfaceMock::getRollbackID(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + if (!_rbidInitialized) { + return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized"); + } + return _rbid; +} + +Status StorageInterfaceMock::initializeRollbackID(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + if (_rbidInitialized) { + return Status(ErrorCodes::NamespaceExists, "Rollback ID already initialized"); + } + _rbidInitialized = true; + + // Start the mock RBID at a very high number to differentiate it from uninitialized RBIDs. + _rbid = 100; + return Status::OK(); +} + +Status StorageInterfaceMock::incrementRollbackID(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + if (!_rbidInitialized) { + return Status(ErrorCodes::NamespaceNotFound, "Rollback ID not initialized"); + } + _rbid++; + return Status::OK(); +} + void StorageInterfaceMock::setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) { stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 33b25731dcb..17a17b20b87 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -129,6 +129,9 @@ public: OpTime getMinValid(OperationContext* opCtx) const override; void setMinValid(OperationContext* opCtx, const OpTime& minValid) override; void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override; + StatusWith<int> getRollbackID(OperationContext* opCtx) override; + Status initializeRollbackID(OperationContext* opCtx) override; + Status incrementRollbackID(OperationContext* opCtx) override; void setOplogDeleteFromPoint(OperationContext* opCtx, const Timestamp& timestamp) override; Timestamp getOplogDeleteFromPoint(OperationContext* opCtx) override; void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override; @@ -215,7 +218,7 @@ public: const NamespaceString& nss, const BSONElement& idKey, const BSONObj& update) override { - return Status{ErrorCodes::IllegalOperation, "upsertbyId not implemented."}; + return Status{ErrorCodes::IllegalOperation, "upsertById not implemented."}; } Status deleteByFilter(OperationContext* opCtx, @@ -298,6 +301,8 @@ private: mutable stdx::mutex _minValidBoundariesMutex; OpTime _appliedThrough; OpTime _minValid; + int _rbid; + bool _rbidInitialized = false; Timestamp _oplogDeleteFromPoint; }; diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index 948a471b245..d14be44f6fe 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -477,7 +477,6 @@ Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) { SyncSourceResolverResponse response; response.syncSourceStatus = std::move(result); if (response.isOK() && !response.getSyncSource().empty()) { - invariant(_requiredOpTime.isNull() || _rbid); response.rbid = _rbid; } return _finishCallback(response); diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp index fe85fdab89b..9cac75212fd 100644 --- a/src/mongo/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/s/sharding_mongod_test_fixture.cpp @@ -48,6 +48,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor_pool.h" @@ -123,6 +124,10 @@ void ShardingMongodTestFixture::setUp() { repl::ReplicationCoordinator::set(service, std::move(replCoordPtr)); + auto storagePtr = stdx::make_unique<repl::StorageInterfaceMock>(); + storagePtr->initializeRollbackID(_opCtx.get()); + repl::StorageInterface::set(service, std::move(storagePtr)); + service->setOpObserver(stdx::make_unique<OpObserverImpl>()); repl::setOplogCollectionName(); repl::createOplog(_opCtx.get()); |