diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-03-08 17:14:26 -0500 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-03-08 17:14:26 -0500 |
commit | c05f900dd80342d0899f6461f845dc97fe942b01 (patch) | |
tree | 8a540d49401bfecced41b6cd5fd6a1659d4123a6 /src | |
parent | d44517bc2cfefbc3c1a68626d511f0e2ade559f6 (diff) | |
download | mongo-c05f900dd80342d0899f6461f845dc97fe942b01.tar.gz |
SERVER-27403 Ensure sync source is ahead and has not rolled back after first OplogFetcher batch
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 84 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 291 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_checker.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_checker.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_checker_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 35 |
16 files changed, 421 insertions, 160 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 56e5ab0e0c0..c0211a21022 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -270,6 +270,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; HostAndPort source; + HostAndPort oldSource = _syncSourceHost; SyncSourceResolverResponse syncSourceResp; { const OpTime minValidSaved = storageInterface->getMinValid(opCtx); @@ -332,6 +333,16 @@ void BackgroundSync::_produce(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); _syncSourceHost = syncSourceResp.getSyncSource(); source = _syncSourceHost; + + // If our sync source has not changed, it is likely caused by our heartbeat data map being + // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting + // for our map to update. + if (oldSource == source) { + log() << "Chose same sync source candidate as last time, " << source + << ". Sleeping for 1 second to avoid immediately choosing a new sync source for " + "the same reason as last time."; + sleepsecs(1); + } } else { if (!syncSourceResp.isOK()) { log() << "failed to find sync source, received error " @@ -367,7 +378,6 @@ void BackgroundSync::_produce(OperationContext* opCtx) { Status fetcherReturnStatus = Status::OK(); DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( _replCoord, _replicationCoordinatorExternalState, this); - auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this. OplogFetcher* oplogFetcher; try { auto executor = _replicationCoordinatorExternalState->getTaskExecutor(); @@ -385,13 +395,14 @@ void BackgroundSync::_produce(OperationContext* opCtx) { NamespaceString(rsOplogName), config, _replicationCoordinatorExternalState->getOplogFetcherMaxFetcherRestarts(), + syncSourceResp.rbid, + true /* requireFresherSyncSource */, &dataReplicatorExternalState, stdx::bind(&BackgroundSync::_enqueueDocuments, this, stdx::placeholders::_1, stdx::placeholders::_2, - stdx::placeholders::_3, - &rbidCopyForFetcher), + stdx::placeholders::_3), onOplogFetcherShutdownCallbackFn); oplogFetcher = _oplogFetcher.get(); } catch (const mongo::DBException& ex) { @@ -428,8 +439,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { // Do not blacklist the server here, it will be blacklisted when we try to reuse it, // if it can't return a matching oplog start from the last fetch oplog ts field. return; - } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || - fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { + } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) { if (_replCoord->getMemberState().primary()) { // TODO: Abort catchup mode early if rollback detected. warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; @@ -503,50 +513,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo& info, - boost::optional<int>* requiredRBID) { - // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back - // since that could cause it to not have our required minValid point. The cursor will be killed - // if the upstream node rolls back so we don't need to keep checking. This must be blocking - // since the Fetcher doesn't give us a way to defer sending the getmores after we return. - if (*requiredRBID) { - auto rbidStatus = Status(ErrorCodes::InternalError, ""); - auto handle = - _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand( - {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr}, - [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) { - rbidStatus = rbidReply.response.status; - if (!rbidStatus.isOK()) - return; - - rbidStatus = getStatusFromCommandResult(rbidReply.response.data); - if (!rbidStatus.isOK()) - return; - - const auto rbidElem = rbidReply.response.data["rbid"]; - if (rbidElem.type() != NumberInt) { - rbidStatus = Status(ErrorCodes::BadValue, - str::stream() << "Upstream node returned an " - << "rbid with invalid type " - << rbidElem.type()); - return; - } - if (rbidElem.Int() != **requiredRBID) { - rbidStatus = Status(ErrorCodes::BadValue, - "Upstream node rolled back after verifying " - "that it had our MinValid point. Retrying."); - } - }); - if (!handle.isOK()) - return handle.getStatus(); - - _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue()); - if (!rbidStatus.isOK()) - return rbidStatus; - - requiredRBID->reset(); // Don't come back to this block while on this cursor. - } - + const OplogFetcher::DocumentsInfo& info) { // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will // be one fewer than "networkDocumentCount" because the first document (which was applied // previously) is skipped. diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index e46d7842936..f0c50f870e9 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -158,8 +158,7 @@ private: */ Status _enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo& info, - boost::optional<int>* requiredRBID); + const OplogFetcher::DocumentsInfo& info); // restart syncing void start(OperationContext* opCtx); diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 92e64cbb993..81d63055d72 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -682,6 +682,8 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp( _opts.remoteOplogNS, config, _opts.oplogFetcherMaxFetcherRestarts, + _rollbackChecker->getBaseRBID(), + false /* requireFresherSyncSource */, _dataReplicatorExternalState.get(), stdx::bind(&DataReplicator::_enqueueDocuments, this, diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 61a5dcb8f2d..f6eaae1fa99 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -476,10 +476,17 @@ BSONObj makeRollbackCheckerResponse(int rollbackId) { /** * Generates a cursor response for a Fetcher to consume. */ -BSONObj makeCursorResponse(CursorId cursorId, - const NamespaceString& nss, - std::vector<BSONObj> docs, - bool isFirstBatch = true) { +RemoteCommandResponse makeCursorResponse(CursorId cursorId, + const NamespaceString& nss, + std::vector<BSONObj> docs, + bool isFirstBatch = true, + int rbid = 1) { + OpTime futureOpTime(Timestamp(1000, 1000), 1000); + rpc::OplogQueryMetadata oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0); + BSONObjBuilder metadataBob; + ASSERT_OK(oqMetadata.writeToMetadata(&metadataBob)); + auto metadataObj = metadataBob.obj(); + BSONObjBuilder bob; { BSONObjBuilder cursorBob(bob.subobjStart("cursor")); @@ -494,7 +501,7 @@ BSONObj makeCursorResponse(CursorId cursorId, } } bob.append("ok", 1); - return bob.obj(); + return {bob.obj(), metadataObj, Milliseconds(0)}; } /** @@ -2889,11 +2896,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierScheduleError) // MultiApplier next time _getNextApplierBatchCallback() runs. net->scheduleSuccessfulResponse( oplogFetcherNoi, - executor::RemoteCommandResponse( - makeCursorResponse( - 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}), - BSONObj(), - Milliseconds(0))); + makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)})); net->runReadyNetworkOperations(); // Ignore OplogFetcher's getMore request. diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index d8aacca5a04..052de5cba7f 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -144,21 +144,75 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) { * Checks the first batch of results from query. * 'documents' are the first batch of results returned from tailing the remote oplog. * 'lastFetched' optime and hash should be consistent with the predicate in the query. - * Returns RemoteOplogStale if the oplog query has no results. + * 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for + * compatibility with 3.4 servers that do not send OplogQueryMetadata. + * 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to + * guarantee we have not rolled back since we confirmed the sync source had our minValid. + * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional + * for compatibility with 3.4 servers that do not send OplogQueryMetadata. + * 'requireFresherSyncSource' is a boolean indicating whether we should require the sync source's + * oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point + * as ours, but still cannot be behind ours. + * + * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in mongodb 3.8. + * * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in * the remote oplog. */ -Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) { +Status checkRemoteOplogStart(const Fetcher::Documents& documents, + OpTimeWithHash lastFetched, + boost::optional<OpTime> remoteLastOpApplied, + int requiredRBID, + boost::optional<int> remoteRBID, + bool requireFresherSyncSource) { + // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back + // since that could cause it to not have our required minValid point. The cursor will be + // killed if the upstream node rolls back so we don't need to keep checking once the cursor + // is established. + if (remoteRBID && (*remoteRBID != requiredRBID)) { + return Status(ErrorCodes::InvalidSyncSource, + "Upstream node rolled back after choosing it as a sync source. Choosing " + "new sync source."); + } + + // The SyncSourceResolver never checks that the sync source candidate is actually ahead of + // us. Rather than have it check there with an extra network roundtrip, we check here. + if (requireFresherSyncSource && remoteLastOpApplied && + (*remoteLastOpApplied <= lastFetched.opTime)) { + return Status(ErrorCodes::InvalidSyncSource, + str::stream() << "Sync source's last applied OpTime " + << remoteLastOpApplied->toString() + << " is not greater than our last fetched OpTime " + << lastFetched.opTime.toString() + << ". Choosing new sync source."); + } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) { + // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied + // since we fetch the sync source's last applied OpTime to determine where to start our + // OplogFetcher. This is fine since no other node can sync off of an initial syncing node + // and thus cannot form a sync source cycle. To account for this, we must relax the + // constraint on our sync source being fresher. + return Status(ErrorCodes::InvalidSyncSource, + str::stream() << "Sync source's last applied OpTime " + << remoteLastOpApplied->toString() + << " is older than our last fetched OpTime " + << lastFetched.opTime.toString() + << ". Choosing new sync source."); + } + + // At this point we know that our sync source has our minValid and is ahead of us, so if our + // history diverges from our sync source's we should prefer its history and roll back ours. + + // Since we checked for rollback and our sync source is ahead of us, an empty batch means that + // we have a higher timestamp on our last fetched OpTime than our sync source's last applied + // OpTime, but a lower term. When this occurs, we must roll back our inconsistent oplog entry. if (documents.empty()) { - // The GTE query from upstream returns nothing, so we're ahead of the upstream. - return Status(ErrorCodes::RemoteOplogStale, - str::stream() << "We are ahead of the sync source. Our last op time fetched: " - << lastFetched.opTime.toString()); + return Status(ErrorCodes::OplogStartMissing, "Received an empty batch from sync source."); } + const auto& o = documents.front(); auto opTimeResult = OpTime::parseFromOplogEntry(o); if (!opTimeResult.isOK()) { - return Status(ErrorCodes::OplogStartMissing, + return Status(ErrorCodes::InvalidBSON, str::stream() << "our last op time fetched: " << lastFetched.opTime.toString() << " (hash: " << lastFetched.value @@ -277,6 +331,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, NamespaceString nss, ReplSetConfig config, std::size_t maxFetcherRestarts, + int requiredRBID, + bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn) @@ -285,6 +341,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, _nss(nss), _metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))), _maxFetcherRestarts(maxFetcherRestarts), + _requiredRBID(requiredRBID), + _requireFresherSyncSource(requireFresherSyncSource), _dataReplicatorExternalState(dataReplicatorExternalState), _enqueueDocumentsFn(enqueueDocumentsFn), _awaitDataTimeout(calculateAwaitDataTimeout(config)), @@ -480,9 +538,17 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { - auto status = checkRemoteOplogStart(documents, opTimeWithHash); + auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none; + auto remoteLastApplied = + oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none; + auto status = checkRemoteOplogStart(documents, + opTimeWithHash, + remoteLastApplied, + _requiredRBID, + remoteRBID, + _requireFresherSyncSource); if (!status.isOK()) { - // Stop oplog fetcher and execute rollback. + // Stop oplog fetcher and execute rollback if necessary. _finishCallback(status, opTimeWithHash); return; } diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 9ed1fdbe63a..5d739bfae0c 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -136,6 +136,8 @@ public: NamespaceString nss, ReplSetConfig config, std::size_t maxFetcherRestarts, + int requiredRBID, + bool requireFresherSyncSource, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn); @@ -256,6 +258,15 @@ private: // Maximum number of times to consecutively restart the fetcher on non-cancellation errors. const std::size_t _maxFetcherRestarts; + // Rollback ID that the sync source is required to have after the first batch. + int _requiredRBID; + + // A boolean indicating whether we should error if the sync source is not ahead of our initial + // last fetched OpTime on the first batch. Most of the time this should be set to true, + // but there are certain special cases, namely during initial sync, where it's acceptable for + // our sync source to have no ops newer than _lastFetched. + bool _requireFresherSyncSource; + DataReplicatorExternalState* const _dataReplicatorExternalState; const EnqueueDocumentsFn _enqueueDocumentsFn; const Milliseconds _awaitDataTimeout; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 5419e7d7f13..827cbd86e26 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -91,8 +91,18 @@ protected: * the oplog query and shuts down. * Returns shutdown state. */ - std::unique_ptr<ShutdownState> processSingleBatch(RemoteCommandResponse response); - std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj); + std::unique_ptr<ShutdownState> processSingleBatch(RemoteCommandResponse response, + bool requireFresherSyncSource = true); + std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj, + bool requireFresherSyncSource = true); + + /** + * Makes an OplogQueryMetadata object with the given fields and a stale committed OpTime. + */ + BSONObj makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, + int rbid, + int primaryIndex, + int syncSourceIndex); /** * Tests checkSyncSource result handling. @@ -107,6 +117,9 @@ protected: RemoteCommandRequest testTwoBatchHandling(bool isV1ElectionProtocol); OpTimeWithHash lastFetched; + OpTime remoteNewerOpTime; + OpTime staleOpTime; + int rbid; std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState; @@ -135,6 +148,9 @@ void OplogFetcherTest::setUp() { launchExecutorThread(); lastFetched = {456LL, {{123, 0}, 1}}; + remoteNewerOpTime = {{124, 1}, 2}; + staleOpTime = {{1, 1}, 0}; + rbid = 2; dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); dataReplicatorExternalState->currentTerm = lastFetched.opTime.getTerm(); @@ -174,6 +190,17 @@ RemoteCommandRequest OplogFetcherTest::processNetworkResponse( expectReadyRequestsAfterProcessing); } +BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, + int rbid, + int primaryIndex, + int syncSourceIndex) { + rpc::OplogQueryMetadata oqMetadata( + staleOpTime, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex); + BSONObjBuilder bob; + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + return bob.obj(); +} + HostAndPort source("localhost:12345"); NamespaceString nss("local.oplog.rs"); @@ -200,8 +227,8 @@ ReplSetConfig _createConfig(bool isV1ElectionProtocol) { return config; } -std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch( - RemoteCommandResponse response) { +std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteCommandResponse response, + bool requireFresherSyncSource) { auto shutdownState = stdx::make_unique<ShutdownState>(); OplogFetcher oplogFetcher(&getExecutor(), @@ -210,6 +237,8 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch( nss, _createConfig(true), 0, + rbid, + requireFresherSyncSource, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(*shutdownState)); @@ -229,8 +258,10 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch( return shutdownState; } -std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj) { - return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)}); +std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj, + bool requireFresherSyncSource) { + return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)}, + requireFresherSyncSource); } TEST_F(OplogFetcherTest, InvalidConstruction) { @@ -241,6 +272,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}), @@ -255,6 +288,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), OplogFetcher::EnqueueDocumentsFn(), [](Status, OpTimeWithHash) {}), @@ -269,6 +304,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) { nss, ReplSetConfig(), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}), @@ -283,6 +320,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, OplogFetcher::OnShutdownCallbackFn()), @@ -298,6 +337,8 @@ TEST_F(OplogFetcherTest, StartupWhenActiveReturnsIllegalOperation) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}); @@ -316,6 +357,8 @@ TEST_F(OplogFetcherTest, ShutdownAfterStartupTransitionsToShuttingDownState) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}); @@ -333,6 +376,8 @@ TEST_F(OplogFetcherTest, StartupWhenShuttingDownReturnsShutdownInProgress) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}); @@ -358,6 +403,8 @@ TEST_F( nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) @@ -379,6 +426,8 @@ TEST_F( nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) @@ -397,6 +446,8 @@ TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersio nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) @@ -413,6 +464,8 @@ TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) { nss, _createConfig(false), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) @@ -430,6 +483,8 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProt nss, config, 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) @@ -444,6 +499,8 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0) nss, _createConfig(false), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) @@ -461,6 +518,8 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarti nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [](Status, OpTimeWithHash) {}); @@ -485,6 +544,8 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) { nss, _createConfig(true), 0, + -1, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(shutdownState)); @@ -562,6 +623,7 @@ TEST_F(OplogFetcherTest, BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); + ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), metadataObj, Milliseconds(0)}) @@ -574,7 +636,7 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -590,6 +652,95 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex()); } +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid + 1, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched.opTime, rbid, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(OplogFetcherTest, + MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + metadataObj, + Milliseconds(0)}, + false) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(OplogFetcherTest, + MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched.opTime, rbid, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + + auto entry = makeNoopOplogEntry(lastFetched); + auto shutdownState = + processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false); + ASSERT_OK(shutdownState->getStatus()); + ASSERT(dataReplicatorExternalState->metadataWasProcessed); + ASSERT_EQUALS(OpTimeWithHash(entry["h"].numberLong(), + unittest::assertGet(OpTime::parseFromOplogEntry(entry))), + shutdownState->getLastFetched()); +} + TEST_F(OplogFetcherTest, MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); @@ -607,7 +758,7 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -629,64 +780,80 @@ TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) { ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) { - ASSERT_EQUALS(ErrorCodes::RemoteOplogStale, +TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) { + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch(makeCursorResponse(0, {}))->getStatus()); } -TEST_F(OplogFetcherTest, - MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { - ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch(makeCursorResponse(0, {BSONObj()}))->getStatus()); +TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + ASSERT_EQUALS( + ErrorCodes::InvalidBSON, + processSingleBatch({makeCursorResponse(0, {BSONObj()}), metadataObj, Milliseconds(0)}) + ->getStatus()); } TEST_F( OplogFetcherTest, LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch( - makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)})) + {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), + metadataObj, + Milliseconds(0)}) ->getStatus()); } TEST_F(OplogFetcherTest, LastHashFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); ASSERT_EQUALS( ErrorCodes::OplogStartMissing, processSingleBatch( - makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)})) + {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)}), + metadataObj, + Milliseconds(0)}) ->getStatus()); } TEST_F(OplogFetcherTest, MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { - ASSERT_EQUALS( - ErrorCodes::NoSuchKey, - processSingleBatch(makeCursorResponse(0, - {makeNoopOplogEntry(lastFetched), - BSON("o" << BSON("msg" - << "oplog entry without optime"))})) - ->getStatus()); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, + processSingleBatch( + {makeCursorResponse(0, + {makeNoopOplogEntry(lastFetched), + BSON("o" << BSON("msg" + << "oplog entry without optime"))}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); } TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, - processSingleBatch(makeCursorResponse(0, - {makeNoopOplogEntry(lastFetched), - makeNoopOplogEntry(Seconds(1000), 1), - makeNoopOplogEntry(Seconds(2000), 1), - makeNoopOplogEntry(Seconds(1500), 1)})) + processSingleBatch({makeCursorResponse(0, + {makeNoopOplogEntry(lastFetched), + makeNoopOplogEntry(Seconds(1000), 1), + makeNoopOplogEntry(Seconds(2000), 1), + makeNoopOplogEntry(Seconds(1500), 1)}), + metadataObj, + Milliseconds(0)}) ->getStatus()); } TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) { + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; - auto shutdownState = processSingleBatch( - {makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), Milliseconds(0)}); + auto shutdownState = + processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)}); ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); @@ -713,6 +880,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE } TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); @@ -724,8 +893,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { return Status(ErrorCodes::InternalError, "my custom error"); }; - auto shutdownState = processSingleBatch( - {makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), Milliseconds(0)}); + auto shutdownState = + processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)}); ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error")); } @@ -790,7 +959,8 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithReplSetMetadataStopsTheOplogFe TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { rpc::ReplSetMetadata replMetadata( lastFetched.opTime.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata({{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, 2); + rpc::OplogQueryMetadata oqMetadata( + {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, 2); testSyncSourceChecking(&replMetadata, &oqMetadata); @@ -828,7 +998,7 @@ TEST_F(OplogFetcherTest, 2, 2); rpc::OplogQueryMetadata oqMetadata( - {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, -1); + {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, -1); testSyncSourceChecking(&replMetadata, &oqMetadata); @@ -851,6 +1021,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro nss, _createConfig(isV1ElectionProtocol), 0, + rbid, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(shutdownState)); @@ -862,7 +1034,11 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); - processNetworkResponse(makeCursorResponse(cursorId, {firstEntry, secondEntry}), true); + + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + processNetworkResponse( + {makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj, Milliseconds(0)}, + true); ASSERT_EQUALS(1U, lastEnqueuedDocuments.size()); ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); @@ -1084,6 +1260,8 @@ TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMo nss, _createConfig(true), maxFetcherRestarts, + rbid, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(*shutdownState)); @@ -1091,9 +1269,13 @@ TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMo ASSERT_OK(oplogFetcher.startup()); + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); + // Send first batch from FIND. _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + ops[0], + processNetworkResponse( + {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true)); // Send error during GETMORE. processNetworkResponse({ErrorCodes::CursorNotFound, "blah"}, true); @@ -1101,7 +1283,10 @@ TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMo // Send first batch from FIND, and Check that it started from the end of the last FIND response. // Check that the optimes match for the query and last oplog entry. _assertFindCommandTimestampEquals( - ops[2], processNetworkResponse(makeCursorResponse(0, {ops[2], ops[3], ops[4]}), false)); + ops[2], + processNetworkResponse( + {makeCursorResponse(0, {ops[2], ops[3], ops[4]}), metadataObj, Milliseconds(0)}, + false)); // Done. oplogFetcher.join(); @@ -1119,6 +1304,8 @@ TEST_F(OplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReach nss, _createConfig(true), maxFetcherRestarts, + rbid, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(*shutdownState)); @@ -1127,8 +1314,13 @@ TEST_F(OplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReach ASSERT_OK(oplogFetcher.startup()); unittest::log() << "processing find request from first fetcher"; + + + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + ops[0], + processNetworkResponse( + {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true)); unittest::log() << "sending error response to getMore request from first fetcher"; assertRemoteCommandNameEquals( @@ -1157,6 +1349,8 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResp nss, _createConfig(true), maxFetcherRestarts, + rbid, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(*shutdownState)); @@ -1165,8 +1359,13 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResp ASSERT_OK(oplogFetcher.startup()); unittest::log() << "processing find request from first fetcher"; + + + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + ops[0], + processNetworkResponse( + {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true)); unittest::log() << "sending error response to getMore request from first fetcher"; assertRemoteCommandNameEquals( @@ -1174,7 +1373,9 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResp unittest::log() << "processing find request from second fetcher"; _assertFindCommandTimestampEquals( - ops[2], processNetworkResponse(makeCursorResponse(1, {ops[2], ops[3], ops[4]}), true)); + ops[2], + processNetworkResponse( + {makeCursorResponse(1, {ops[2], ops[3], ops[4]}), metadataObj, Milliseconds(0)}, true)); unittest::log() << "sending error response to getMore request from second fetcher"; assertRemoteCommandNameEquals( @@ -1228,6 +1429,8 @@ TEST_F(OplogFetcherTest, OplogFetcherAbortsWithOriginalResponseErrorOnFailureToS nss, _createConfig(true), maxFetcherRestarts, + rbid, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, stdx::ref(*shutdownState)); @@ -1237,8 +1440,12 @@ TEST_F(OplogFetcherTest, OplogFetcherAbortsWithOriginalResponseErrorOnFailureToS ASSERT_TRUE(oplogFetcher.isActive()); unittest::log() << "processing find request from first fetcher"; + + auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); _assertFindCommandTimestampEquals( - ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true)); + ops[0], + processNetworkResponse( + {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true)); unittest::log() << "sending error response to getMore request from first fetcher"; shouldFailSchedule = true; @@ -1274,6 +1481,8 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFunctionOnCompletio nss, _createConfig(true), 0, + rbid, + true, dataReplicatorExternalState.get(), enqueueDocumentsFn, [&callbackInvoked, sharedCallbackData, &status]( diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp index 4c2966acb96..589ba0e6444 100644 --- a/src/mongo/db/repl/rollback_checker.cpp +++ b/src/mongo/db/repl/rollback_checker.cpp @@ -119,7 +119,7 @@ Status RollbackChecker::reset_sync() { return resetStatus; } -int RollbackChecker::getBaseRBID_forTest() { +int RollbackChecker::getBaseRBID() { stdx::lock_guard<stdx::mutex> lk(_mutex); return _baseRBID; } diff --git a/src/mongo/db/repl/rollback_checker.h b/src/mongo/db/repl/rollback_checker.h index d1c6ea5bc5c..17d8d8d0d08 100644 --- a/src/mongo/db/repl/rollback_checker.h +++ b/src/mongo/db/repl/rollback_checker.h @@ -92,10 +92,10 @@ public: // Synchronously calls reset and returns the Status of the command. Status reset_sync(); - // ================== Test Support API =================== - // Returns the current baseline rbid. - int getBaseRBID_forTest(); + int getBaseRBID(); + + // ================== Test Support API =================== // Returns the last rbid seen. int getLastRBID_forTest(); diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp index 717eaaf6c7e..9b29fde9874 100644 --- a/src/mongo/db/repl/rollback_checker_test.cpp +++ b/src/mongo/db/repl/rollback_checker_test.cpp @@ -108,7 +108,7 @@ TEST_F(RollbackCheckerTest, reset) { getNet()->exitNetwork(); getReplExecutor().wait(cbh); - ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); } TEST_F(RollbackCheckerTest, RollbackRBID) { @@ -124,7 +124,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) { getNet()->scheduleSuccessfulResponse(commandResponse); getNet()->runReadyNetworkOperations(); getReplExecutor().wait(refreshCBH); - ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); { LockGuard lk(_mutex); ASSERT_TRUE(_hasCalledCallback); @@ -143,7 +143,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) { getReplExecutor().wait(rbCBH); ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 4); - ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); LockGuard lk(_mutex); ASSERT_TRUE(_hasCalledCallback); ASSERT_TRUE(unittest::assertGet(_hasRolledBackResult)); @@ -162,7 +162,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) { getNet()->scheduleSuccessfulResponse(commandResponse); getNet()->runReadyNetworkOperations(); getReplExecutor().wait(refreshCBH); - ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); { LockGuard lk(_mutex); ASSERT_TRUE(_hasCalledCallback); @@ -181,7 +181,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) { getReplExecutor().wait(rbCBH); ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 3); - ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3); LockGuard lk(_mutex); ASSERT_TRUE(_hasCalledCallback); ASSERT_FALSE(unittest::assertGet(_hasRolledBackResult)); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 2653c08812e..88fbf63f036 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -790,7 +790,7 @@ void syncFixUp(OperationContext* opCtx, Status _syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - boost::optional<int> requiredRBID, + int requiredRBID, ReplicationCoordinator* replCoord, StorageInterface* storageInterface) { invariant(!opCtx->lockState()->isLocked()); @@ -798,11 +798,8 @@ Status _syncRollback(OperationContext* opCtx, FixUpInfo how; log() << "rollback 1"; how.rbid = rollbackSource.getRollbackId(); - if (requiredRBID) { - uassert(40362, - "Upstream node rolled back. Need to retry our rollback.", - how.rbid == *requiredRBID); - } + uassert( + 40362, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID); log() << "rollback 2 FindCommonPoint"; try { @@ -861,7 +858,7 @@ Status _syncRollback(OperationContext* opCtx, Status syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - boost::optional<int> requiredRBID, + int requiredRBID, ReplicationCoordinator* replCoord, StorageInterface* storageInterface) { invariant(opCtx); @@ -881,7 +878,7 @@ Status syncRollback(OperationContext* opCtx, void rollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - boost::optional<int> requiredRBID, + int requiredRBID, ReplicationCoordinator* replCoord, StorageInterface* storageInterface, stdx::function<void(int)> sleepSecsFn) { diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index 9234cb4c063..e305c076e44 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -61,7 +61,7 @@ class StorageInterface; void rollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - boost::optional<int> requiredRBID, + int requiredRBID, ReplicationCoordinator* replCoord, StorageInterface* storageInterface, stdx::function<void(int)> sleepSecsFn = [](int secs) { sleepsecs(secs); }); @@ -85,6 +85,7 @@ void rollback(OperationContext* opCtx, * @param rollbackSource interface for sync source: * provides oplog; and * supports fetching documents and copying collections. + * @param requiredRBID Rollback ID we are required to have throughout rollback. * @param replCoord Used to track the rollback ID and to change the follower state * @param storageInterface Used to update minValid. * @@ -98,7 +99,7 @@ void rollback(OperationContext* opCtx, Status syncRollback(OperationContext* opCtx, const OplogInterface& localOplog, const RollbackSource& rollbackSource, - boost::optional<int> requiredRBID, + int requiredRBID, ReplicationCoordinator* replCoord, StorageInterface* storageInterface); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 2e8c199c277..8d8919a1ae1 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -247,7 +247,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) { OplogInterfaceMock({operation}), RollbackSourceLocal(std::unique_ptr<OplogInterface>( new OplogInterfaceMock(kEmptyMockOperations))), - {1}, + 1, _coordinator, &_storageInterface), UserException, @@ -796,7 +796,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommandFailsIfRBIDChangesWhileSynci ASSERT_THROWS_CODE(syncRollback(_opCtx.get(), OplogInterfaceMock({dropCollectionOperation, commonOperation}), rollbackSource, - {0}, + 0, _coordinator, &_storageInterface), DBException, diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index 33818e14c50..948a471b245 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -311,12 +311,7 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback( return; } - if (!_requiredOpTime.isNull()) { - _scheduleRBIDRequest(candidate, earliestOpTimeSeen); - return; - } - - _finishCallback(candidate); + _scheduleRBIDRequest(candidate, earliestOpTimeSeen); } void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) { @@ -362,12 +357,16 @@ void SyncSourceResolver::_rbidRequestCallback( return; } - // Schedule fetcher to look for '_requiredOpTime' in the remote oplog. - // Unittest requires that this kind of failure be handled specially. - auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen)); - if (!status.isOK()) { - _finishCallback(status); + if (!_requiredOpTime.isNull()) { + // Schedule fetcher to look for '_requiredOpTime' in the remote oplog. + // Unittest requires that this kind of failure be handled specially. + auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen)); + if (!status.isOK()) { + _finishCallback(status); + } + return; } + _finishCallback(candidate); } Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h index 48f7e968a8d..201658caacf 100644 --- a/src/mongo/db/repl/sync_source_resolver.h +++ b/src/mongo/db/repl/sync_source_resolver.h @@ -70,10 +70,10 @@ struct SyncSourceResolverResponse { // Contains the new MinValid boundry if syncSourceStatus is ErrorCodes::OplogStartMissing. OpTime earliestOpTimeSeen; - // Rollback ID of the selected sync source. Only filled in when there is a required optime. + // Rollback ID of the selected sync source. // The rbid is fetched before the required optime so callers can be sure that as long as the // rbid is the same, the required optime is still present. - boost::optional<int> rbid; + int rbid; bool isOK() { return syncSourceStatus.isOK(); @@ -230,7 +230,7 @@ private: const OnCompletionFn _onCompletion; // The rbid we will return to our caller. - boost::optional<int> _rbid; + int _rbid; // Protects members of this sync source resolver defined below. mutable stdx::mutex _mutex; diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index fb27a2cc681..bab9d57bb89 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -303,6 +303,19 @@ void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net net, selector, currentSyncSource, nextSyncSource, {BSON("ts" << ts << "t" << 0)}); } +void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net, + HostAndPort currentSyncSource, + const BSONObj& reply = BSON("ok" << 1 << "rbid" << 1)) { + executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto request = net->scheduleSuccessfulResponse(reply); + ASSERT_EQUALS(currentSyncSource, request.target); + ASSERT_EQUALS("admin", request.dbname); + ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout); + ASSERT_BSONOBJ_EQ(BSON("replSetGetRBID" << 1), request.cmdObj); + net->runReadyNetworkOperations(); +} + TEST_F(SyncSourceResolverTest, SyncSourceResolverReturnsStatusOkAndTheFoundHostWhenAnEligibleSyncSourceExists) { HostAndPort candidate1("node1", 12345); @@ -312,6 +325,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate1); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -332,6 +346,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate1); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -382,6 +397,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -447,6 +463,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -512,6 +529,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -536,6 +554,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -561,6 +580,7 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -614,19 +634,6 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net, {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())}); } -void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net, - HostAndPort currentSyncSource, - const BSONObj& reply = BSON("ok" << 1 << "rbid" << 1)) { - executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net); - ASSERT_TRUE(net->hasReadyRequests()); - auto request = net->scheduleSuccessfulResponse(reply); - ASSERT_EQUALS(currentSyncSource, request.target); - ASSERT_EQUALS("admin", request.dbname); - ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout); - ASSERT_BSONOBJ_EQ(BSON("replSetGetRBID" << 1), request.cmdObj); - net->runReadyNetworkOperations(); -} - const OpTime requiredOpTime(Timestamp(200, 1U), 1LL); TEST_F( @@ -651,7 +658,7 @@ TEST_F( ASSERT_FALSE(_resolver->isActive()); ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); ASSERT(_response.rbid); - ASSERT_EQ(*_response.rbid, 7); + ASSERT_EQ(_response.rbid, 7); } TEST_F(SyncSourceResolverTest, |