| author | Mathias Stearn <mathias@10gen.com> | 2016-11-15 15:24:22 -0500 |
|---|---|---|
| committer | Mathias Stearn <mathias@10gen.com> | 2017-01-03 16:02:19 -0500 |
| commit | 0b76764eac7651ddba4c82c504aa7e8d785087c2 (patch) | |
| tree | f90fce58d2781a48afaee696ee3fb9e6f8fefedc /src/mongo | |
| parent | 506c8af1269c76fcd730e121e37b82a18347ac70 (diff) | |
| download | mongo-0b76764eac7651ddba4c82c504aa7e8d785087c2.tar.gz | |
SERVER-27050 Ensure upstream node doesn't roll back after checking MinValid
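
This commit closes a window in which the sync source could roll back between our minValid check and the start of oplog fetching: the resolver now records the source's rollback id (RBID) before verifying that the source has our required optime, and the oplog fetcher re-checks that RBID once its cursor is established. If the RBID changed in between, the earlier verification proves nothing and we re-resolve. A minimal standalone sketch of that ordering argument (illustrative only; the types and names below are hypothetical, not MongoDB source):

```cpp
// Sketch: why "record RBID, verify optime, re-check RBID" is sound.
#include <iostream>

struct SyncSource {
    int rbid = 1;                  // the server bumps this on every rollback
    long long minValidOpTime = 0;  // newest optime the source can serve us
    void rollBack() { ++rbid; minValidOpTime -= 10; }
};

// "Same RBID as before" implies "no rollback happened", which implies the
// previously verified optime is still present on the source.
bool sourceStillValid(const SyncSource& s, int rbidSeenBeforeCheck) {
    return s.rbid == rbidSeenBeforeCheck;
}

int main() {
    SyncSource source;
    source.minValidOpTime = 100;

    const int rbidBefore = source.rbid;  // 1) record the RBID first
    // 2) ... verify required optime, establish the oplog cursor ...
    source.rollBack();                   // upstream rolls back in the window
    if (!sourceStillValid(source, rbidBefore)) {  // 3) re-check the RBID
        std::cout << "source rolled back; re-resolving sync source\n";
    }
}
```

The diff applies this idea in two places: SyncSourceResolver fetches the rbid before probing for the required optime, and BackgroundSync::_enqueueDocuments re-checks it on the first batch from the new cursor.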
Diffstat (limited to 'src/mongo')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 109 |
| -rw-r--r-- | src/mongo/db/repl/bgsync.h | 14 |
| -rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 11 |
| -rw-r--r-- | src/mongo/db/repl/data_replicator.h | 9 |
| -rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 6 |
| -rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 6 |
| -rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 20 |
| -rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 21 |
| -rw-r--r-- | src/mongo/db/repl/rs_rollback.h | 2 |
| -rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 19 |
| -rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 66 |
| -rw-r--r-- | src/mongo/db/repl/sync_source_resolver.h | 20 |
| -rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 101 |
13 files changed, 331 insertions, 73 deletions
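
Several hunks below validate a replSetGetRBID reply with the same chain of checks: transport-level status, then the command's ok field, then the type and value of the rbid field. A condensed sketch of that chain (BsonLikeReply is a stand-in struct for illustration, not MongoDB's BSONObj):

```cpp
#include <iostream>
#include <optional>

// Stand-in for a parsed {ok: ..., rbid: ...} command reply.
struct BsonLikeReply {
    bool ok = false;          // the command-level {ok: 1} field
    std::optional<int> rbid;  // engaged only if the field was a NumberInt
};

enum class Check { Ok, CommandFailed, BadRbidType, SourceRolledBack };

Check validateRbidReply(const BsonLikeReply& reply, int requiredRBID) {
    if (!reply.ok)
        return Check::CommandFailed;     // getStatusFromCommandResult(...)
    if (!reply.rbid)
        return Check::BadRbidType;       // rbidElem.type() != NumberInt
    if (*reply.rbid != requiredRBID)
        return Check::SourceRolledBack;  // upstream rolled back; retry
    return Check::Ok;
}

int main() {
    BsonLikeReply reply{true, 7};
    std::cout << (validateRbidReply(reply, 7) == Check::Ok
                      ? "source still valid\n"
                      : "must re-resolve\n");
}
```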
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d5fdd001921..84676fd60c8 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -50,6 +50,7 @@
 #include "mongo/db/repl/rs_sync.h"
 #include "mongo/db/repl/storage_interface.h"
 #include "mongo/db/s/shard_identity_rollback_notifier.h"
+#include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/rpc/metadata/repl_set_metadata.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/fail_point_service.h"
@@ -287,39 +288,32 @@ void BackgroundSync::_produce(OperationContext* txn) {
     OpTime lastOpTimeFetched;
     HostAndPort source;
     SyncSourceResolverResponse syncSourceResp;
-    SyncSourceResolver* syncSourceResolver;
-    OpTime minValid;
-    OpTime minValidSaved;
-    if (_replCoord->getMemberState().recovering()) {
-        minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
-    }
     {
+        const OpTime minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
         stdx::lock_guard<stdx::mutex> lock(_mutex);
-        if (minValidSaved > _lastOpTimeFetched) {
-            minValid = minValidSaved;
-        }
+        const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
         lastOpTimeFetched = _lastOpTimeFetched;
         _syncSourceHost = HostAndPort();
         _syncSourceResolver = stdx::make_unique<SyncSourceResolver>(
             _replicationCoordinatorExternalState->getTaskExecutor(),
             _replCoord,
             lastOpTimeFetched,
-            minValid,
+            requiredOpTime,
             [&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
-        syncSourceResolver = _syncSourceResolver.get();
     }
     // This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls
     // ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to
     // acquired before BackgroundSync's.
     // It is safe to call startup() outside the mutex on this instance of SyncSourceResolver because
-    // we do not destroy this instance outside of this function.
+    // we do not destroy this instance outside of this function which is only called from a single
+    // thread.
     auto status = _syncSourceResolver->startup();
     if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
         return;
     }
     fassertStatusOK(40349, status);
-    syncSourceResolver->join();
-    syncSourceResolver = nullptr;
+    _syncSourceResolver->join();
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         _syncSourceResolver.reset();
@@ -388,6 +382,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
     Status fetcherReturnStatus = Status::OK();
     DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
         _replCoord, _replicationCoordinatorExternalState, this);
+    auto rbidCopyForFetcher = syncSourceResp.rbid;  // OplogFetcher's callback modifies this.
     OplogFetcher* oplogFetcher;
     try {
         auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
@@ -410,7 +405,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
                        this,
                        stdx::placeholders::_1,
                        stdx::placeholders::_2,
-                       stdx::placeholders::_3),
+                       stdx::placeholders::_3,
+                       &rbidCopyForFetcher),
            onOplogFetcherShutdownCallbackFn);
        oplogFetcher = _oplogFetcher.get();
    } catch (const mongo::DBException& ex) {
@@ -484,20 +480,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
                 }
             }
         }
-        // check that we are at minvalid, otherwise we cannot roll back as we may be in an
-        // inconsistent state
-        const auto minValid = StorageInterface::get(txn)->getMinValid(txn);
-        if (lastApplied < minValid) {
-            fassertNoTrace(18750,
-                           Status(ErrorCodes::UnrecoverableRollbackError,
-                                  str::stream() << "need to rollback, but in inconsistent state. "
-                                                << "minvalid: "
-                                                << minValid.toString()
-                                                << " > our last optime: "
-                                                << lastApplied.toString()));
-        }
-        _rollback(txn, source, getConnection);
+        _rollback(txn, source, syncSourceResp.rbid, getConnection);
         stop();
     } else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
         Seconds blacklistDuration(60);
@@ -510,14 +494,57 @@ void BackgroundSync::_produce(OperationContext* txn) {
     }
 }

-void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
-                                       Fetcher::Documents::const_iterator end,
-                                       const OplogFetcher::DocumentsInfo& info) {
+Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+                                         Fetcher::Documents::const_iterator end,
+                                         const OplogFetcher::DocumentsInfo& info,
+                                         boost::optional<int>* requiredRBID) {
+    // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+    // since that could cause it to not have our required minValid point. The cursor will be killed
+    // if the upstream node rolls back so we don't need to keep checking. This must be blocking
+    // since the Fetcher doesn't give us a way to defer sending the getmores after we return.
+    if (*requiredRBID) {
+        auto rbidStatus = Status(ErrorCodes::InternalError, "");
+        auto handle =
+            _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand(
+                {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr},
+                [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+                    rbidStatus = rbidReply.response.status;
+                    if (!rbidStatus.isOK())
+                        return;
+
+                    rbidStatus = getStatusFromCommandResult(rbidReply.response.data);
+                    if (!rbidStatus.isOK())
+                        return;
+
+                    const auto rbidElem = rbidReply.response.data["rbid"];
+                    if (rbidElem.type() != NumberInt) {
+                        rbidStatus = Status(ErrorCodes::BadValue,
+                                            str::stream() << "Upstream node returned an "
+                                                          << "rbid with invalid type "
+                                                          << rbidElem.type());
+                        return;
+                    }
+                    if (rbidElem.Int() != **requiredRBID) {
+                        rbidStatus = Status(ErrorCodes::BadValue,
+                                            "Upstream node rolled back after verifying "
+                                            "that it had our MinValid point. Retrying.");
+                    }
+                });
+        if (!handle.isOK())
+            return handle.getStatus();
+
+        _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue());
+        if (!rbidStatus.isOK())
+            return rbidStatus;
+
+        requiredRBID->reset();  // Don't come back to this block while on this cursor.
+    }
+
     // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
     // be one fewer than "networkDocumentCount" because the first document (which was applied
     // previously) is skipped.
     if (info.toApplyDocumentCount == 0) {
-        return;  // Nothing to do.
+        return Status::OK();  // Nothing to do.
     }

     auto txn = cc().makeOperationContext();
@@ -532,7 +559,7 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
         // buffer.
         stdx::unique_lock<stdx::mutex> lock(_mutex);
         if (_inShutdown) {
-            return;
+            return Status::OK();
         }

         OCCASIONALLY {
@@ -561,6 +588,8 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
         // The inference here is basically if the batch is really small, we are "caught up".
         sleepmillis(kSleepToAllowBatchingMillis);
     }
+
+    return Status::OK();
 }

 bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) {
@@ -589,26 +618,16 @@ void BackgroundSync::consume(OperationContext* txn) {

 void BackgroundSync::_rollback(OperationContext* txn,
                                const HostAndPort& source,
+                               boost::optional<int> requiredRBID,
                                stdx::function<DBClientBase*()> getConnection) {
     // Abort only when syncRollback detects we are in a unrecoverable state.
     // In other cases, we log the message contained in the error status and retry later.
     auto status = syncRollback(txn,
                                OplogInterfaceLocal(txn, rsOplogName),
                                RollbackSourceImpl(getConnection, source, rsOplogName),
+                               requiredRBID,
                                _replCoord);
     if (status.isOK()) {
-        // When the syncTail thread sees there is no new data by adding something to the buffer.
-        _signalNoNewDataForApplier(txn);
-        // Wait until the buffer is empty.
-        // This is an indication that syncTail has removed the sentinal marker from the buffer
-        // and reset its local lastAppliedOpTime via the replCoord.
-        while (!_oplogBuffer->isEmpty()) {
-            sleepmillis(10);
-            if (inShutdown()) {
-                return;
-            }
-        }
-
         // At this point we are about to leave rollback. Before we do, wait for any writes done
         // as part of rollback to be durable, and then do any necessary checks that we didn't
         // wind up rolling back something illegal. We must wait for the rollback to be durable
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 312acb15bc7..8177509e049 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -147,10 +147,13 @@ private:
     /**
      * Checks current background sync state before pushing operations into blocking queue and
      * updating metrics. If the queue is full, might block.
+     *
+     * requiredRBID is reset to empty after the first call.
      */
-    void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
-                           Fetcher::Documents::const_iterator end,
-                           const OplogFetcher::DocumentsInfo& info);
+    Status _enqueueDocuments(Fetcher::Documents::const_iterator begin,
+                             Fetcher::Documents::const_iterator end,
+                             const OplogFetcher::DocumentsInfo& info,
+                             boost::optional<int>* requiredRBID);

     /**
      * Executes a rollback.
@@ -158,6 +161,7 @@ private:
      */
     void _rollback(OperationContext* txn,
                    const HostAndPort& source,
+                   boost::optional<int> requiredRBID,
                    stdx::function<DBClientBase*()> getConnection);

     // restart syncing
@@ -195,11 +199,11 @@ private:
     HostAndPort _syncSourceHost;

     // Current sync source resolver validating sync source candidates.
-    // Owned by us.
+    // Pointer may be read on any thread that locks _mutex or unlocked on the BGSync thread. It can
+    // only be written to by the BGSync thread while holding _mutex.
     std::unique_ptr<SyncSourceResolver> _syncSourceResolver;

     // Current oplog fetcher tailing the oplog on the sync source.
-    // Owned by us.
     std::unique_ptr<OplogFetcher> _oplogFetcher;
 };
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index ad5a157b10f..0fb71dacd75 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1467,15 +1467,15 @@ StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
     return syncSource;
 }

-void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
-                                       Fetcher::Documents::const_iterator end,
-                                       const OplogFetcher::DocumentsInfo& info) {
+Status DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+                                         Fetcher::Documents::const_iterator end,
+                                         const OplogFetcher::DocumentsInfo& info) {
     if (info.toApplyDocumentCount == 0) {
-        return;
+        return Status::OK();
     }

     if (_isShuttingDown()) {
-        return;
+        return Status::OK();
     }

     invariant(_oplogBuffer);
@@ -1494,6 +1494,7 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
     _lastFetched = info.lastDocument;

     // TODO: updates metrics with "info".
+    return Status::OK();
 }

 DataReplicator::OnCompletionGuard::OnCompletionGuard(
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index adf3098bd87..4ee9c3082e9 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -510,10 +510,13 @@ private:
     /**
      * Pushes documents from oplog fetcher to blocking queue for
      * applier to consume.
+     *
+     * Returns a status even though it always returns OK, to conform the interface OplogFetcher
+     * expects for the EnqueueDocumentsFn.
      */
-    void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
-                           Fetcher::Documents::const_iterator end,
-                           const OplogFetcher::DocumentsInfo& info);
+    Status _enqueueDocuments(Fetcher::Documents::const_iterator begin,
+                             Fetcher::Documents::const_iterator end,
+                             const OplogFetcher::DocumentsInfo& info);

     BSONObj _getInitialSyncProgress_inlock() const;
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 2893a9099d6..7e31eb598d5 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -495,7 +495,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
         getmoreReplStats.recordMillis(durationCount<Milliseconds>(queryResponse.elapsedMillis));

     // TODO: back pressure handling will be added in SERVER-23499.
-    _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
+    auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
+    if (!status.isOK()) {
+        _finishCallback(status);
+        return;
+    }

     // Update last fetched info.
     if (firstDocToApply != documents.cend()) {
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 3d44a50fb67..998df06a07e 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -108,9 +108,9 @@ public:
      * Additional information on the operations is provided in a DocumentsInfo
      * struct.
      */
-    using EnqueueDocumentsFn = stdx::function<void(Fetcher::Documents::const_iterator begin,
-                                                   Fetcher::Documents::const_iterator end,
-                                                   const DocumentsInfo& info)>;
+    using EnqueueDocumentsFn = stdx::function<Status(Fetcher::Documents::const_iterator begin,
+                                                     Fetcher::Documents::const_iterator end,
+                                                     const DocumentsInfo& info)>;

     /**
      * Validates documents in current batch of results returned from tailing the remote oplog.
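
The oplog_fetcher.h hunk above is the pivot of the patch: EnqueueDocumentsFn changes from returning void to returning Status, which is what lets a consumer (here, the RBID re-check in BackgroundSync::_enqueueDocuments) make the fetcher abort its tailing cursor. A self-contained sketch of that contract, using simplified stand-ins for mongo::Status and Fetcher::Documents:

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Minimal stand-in for mongo::Status.
struct Status {
    bool ok = true;
    std::string reason;
    static Status OK() { return {}; }
};

using Documents = std::vector<std::string>;
using EnqueueDocumentsFn = std::function<Status(Documents::const_iterator,
                                                Documents::const_iterator)>;

// Fetcher-side loop: a non-OK status from the consumer stops fetching and is
// surfaced to the completion callback (_finishCallback in the real code)
// instead of being silently dropped.
void runFetchLoop(const EnqueueDocumentsFn& enqueue, const Documents& batch) {
    Status s = enqueue(batch.cbegin(), batch.cend());
    if (!s.ok) {
        std::cout << "fetcher shutting down: " << s.reason << '\n';
        return;
    }
    std::cout << "batch enqueued; scheduling next getMore\n";
}

int main() {
    Documents batch{"op1", "op2"};
    runFetchLoop([](auto, auto) { return Status::OK(); }, batch);
    runFetchLoop(
        [](auto, auto) { return Status{false, "upstream node rolled back"}; },
        batch);
}
```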
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 0c3e41e575f..f3ea0760d35 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -140,9 +140,10 @@ void OplogFetcherTest::setUp() {

     enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin,
                                 Fetcher::Documents::const_iterator end,
-                                const OplogFetcher::DocumentsInfo& info) {
+                                const OplogFetcher::DocumentsInfo& info) -> Status {
         lastEnqueuedDocuments = {begin, end};
         lastEnqueuedDocumentsInfo = info;
+        return Status::OK();
     };
 }

@@ -652,6 +653,23 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE
                   shutdownState->getLastFetched());
 }

+TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
+    auto firstEntry = makeNoopOplogEntry(lastFetched);
+    auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
+    auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
+    Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
+
+    enqueueDocumentsFn = [](Fetcher::Documents::const_iterator,
+                            Fetcher::Documents::const_iterator,
+                            const OplogFetcher::DocumentsInfo&) -> Status {
+        return Status(ErrorCodes::InternalError, "my custom error");
+    };
+
+    auto shutdownState = processSingleBatch(
+        {makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), Milliseconds(0)});
+    ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error"));
+}
+
 void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) {
     auto firstEntry = makeNoopOplogEntry(lastFetched);
     auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index ecca00a539b..7cb229e064f 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -828,6 +828,7 @@ void syncFixUp(OperationContext* txn,
 Status _syncRollback(OperationContext* txn,
                      const OplogInterface& localOplog,
                      const RollbackSource& rollbackSource,
+                     boost::optional<int> requiredRBID,
                      ReplicationCoordinator* replCoord,
                      const SleepSecondsFn& sleepSecondsFn) {
     invariant(!txn->lockState()->isLocked());
@@ -854,6 +855,14 @@ Status _syncRollback(OperationContext* txn,
     FixUpInfo how;
     log() << "rollback 1";
     how.rbid = rollbackSource.getRollbackId();
+    if (requiredRBID && how.rbid != requiredRBID) {
+        log() << "Upstream node rolled back. Need to retry our rollback.";
+        // Currently the transitive callers of this function require that we return Status::OK()
+        // for all recoverable errors such as this. Even though we aren't able to rollback we are
+        // still "OK" because the system is still in a consistent state.
+        return Status::OK();
+    }
+
     {
         log() << "rollback 2 FindCommonPoint";
         try {
@@ -928,6 +937,7 @@ Status _syncRollback(OperationContext* txn,
 Status syncRollback(OperationContext* txn,
                     const OplogInterface& localOplog,
                     const RollbackSource& rollbackSource,
+                    boost::optional<int> requiredRBID,
                     ReplicationCoordinator* replCoord,
                     const SleepSecondsFn& sleepSecondsFn) {
     invariant(txn);
@@ -937,7 +947,8 @@ Status syncRollback(OperationContext* txn,
     DisableDocumentValidation validationDisabler(txn);
     txn->setReplicatedWrites(false);
-    Status status = _syncRollback(txn, localOplog, rollbackSource, replCoord, sleepSecondsFn);
+    Status status =
+        _syncRollback(txn, localOplog, rollbackSource, requiredRBID, replCoord, sleepSecondsFn);

     log() << "rollback finished" << rsLog;
     return status;
@@ -946,10 +957,12 @@ Status syncRollback(OperationContext* txn,
 Status syncRollback(OperationContext* txn,
                     const OplogInterface& localOplog,
                     const RollbackSource& rollbackSource,
+                    boost::optional<int> requiredRBID,
                     ReplicationCoordinator* replCoord) {
-    return syncRollback(txn, localOplog, rollbackSource, replCoord, [](Seconds seconds) {
-        sleepsecs(durationCount<Seconds>(seconds));
-    });
+    return syncRollback(
+        txn, localOplog, rollbackSource, requiredRBID, replCoord, [](Seconds seconds) {
+            sleepsecs(durationCount<Seconds>(seconds));
+        });
 }

 }  // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index c88e9fd27c0..2c54ed6ba73 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -77,12 +77,14 @@ using SleepSecondsFn = stdx::function<void(Seconds)>;
 Status syncRollback(OperationContext* txn,
                     const OplogInterface& localOplog,
                     const RollbackSource& rollbackSource,
+                    boost::optional<int> requiredRBID,
                     ReplicationCoordinator* replCoord,
                     const SleepSecondsFn& sleepSecondsFn);

 Status syncRollback(OperationContext* txn,
                     const OplogInterface& localOplog,
                     const RollbackSource& rollbackSource,
+                    boost::optional<int> requiredRBID,
                     ReplicationCoordinator* replCoord);

 }  // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index edcf8668c16..852ce117712 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -173,6 +173,7 @@ TEST_F(RSRollbackTest, InconsistentMinValid) {
                                OplogInterfaceMock(kEmptyMockOperations),
                                RollbackSourceMock(std::unique_ptr<OplogInterface>(
                                    new OplogInterfaceMock(kEmptyMockOperations))),
+                               {},
                                _coordinator,
                                noSleep);
     ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
@@ -199,6 +200,7 @@ TEST_F(RSRollbackTest, SetFollowerModeFailed) {
                  OplogInterfaceMock(kEmptyMockOperations),
                  RollbackSourceMock(std::unique_ptr<OplogInterface>(
                      new OplogInterfaceMock(kEmptyMockOperations))),
+                 {},
                  _coordinator,
                  noSleep)
             .code());
@@ -215,6 +217,7 @@ TEST_F(RSRollbackTest, OplogStartMissing) {
                  RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                      operation,
                  }))),
+                 {},
                  _coordinator,
                  noSleep)
             .code());
@@ -228,6 +231,7 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) {
                                OplogInterfaceMock({operation}),
                                RollbackSourceMock(std::unique_ptr<OplogInterface>(
                                    new OplogInterfaceMock(kEmptyMockOperations))),
+                               {},
                                _coordinator,
                                noSleep);
     ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
@@ -250,6 +254,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
                            OplogInterfaceMock({operation}),
                            RollbackSourceLocal(std::unique_ptr<OplogInterface>(
                                new OplogInterfaceMock(kEmptyMockOperations))),
+                           {},
                            _coordinator,
                            noSleep),
        UserException,
@@ -267,6 +272,7 @@ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
                      RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                          operation,
                      }))),
+                     {},
                      _coordinator,
                      noSleep));
 }
@@ -335,6 +341,7 @@ int _testRollbackDelete(OperationContext* txn,
     ASSERT_OK(syncRollback(txn,
                            OplogInterfaceMock({deleteOperation, commonOperation}),
                            rollbackSource,
+                           {},
                            coordinator,
                            noSleep));
     ASSERT_TRUE(rollbackSource.called);
@@ -410,6 +417,7 @@ TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) {
     auto status = syncRollback(_txn.get(),
                                OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                                rollbackSource,
+                               {},
                                _coordinator,
                                noSleep);
     stopCapturingLogMessages();
@@ -474,6 +482,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) {
         _txn.get(),
         OplogInterfaceMock({insertDocumentOperation, insertDocumentOperation, commonOperation}),
         rollbackSource,
+        {},
         _coordinator,
         noSleep));
     stopCapturingLogMessages();
@@ -534,6 +543,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) {
     ASSERT_OK(syncRollback(_txn.get(),
                            OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                            rollbackSource,
+                           {},
                            _coordinator,
                            noSleep));
     stopCapturingLogMessages();
@@ -582,6 +592,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingNamespace) {
     auto status = syncRollback(_txn.get(),
                                OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                                rollbackSource,
+                               {},
                                _coordinator,
                                noSleep);
     stopCapturingLogMessages();
@@ -629,6 +640,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandInvalidNamespace) {
     auto status = syncRollback(_txn.get(),
                                OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                                rollbackSource,
+                               {},
                                _coordinator,
                                noSleep);
     stopCapturingLogMessages();
@@ -674,6 +686,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) {
     auto status = syncRollback(_txn.get(),
                                OplogInterfaceMock({insertDocumentOperation, commonOperation}),
                                rollbackSource,
+                               {},
                                _coordinator,
                                noSleep);
     stopCapturingLogMessages();
@@ -710,6 +723,7 @@ TEST_F(RSRollbackTest, RollbackUnknownCommand) {
                      RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
                          commonOperation,
                      }))),
+                     {},
                      _coordinator,
                      noSleep);
     ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
@@ -746,6 +760,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
     ASSERT_OK(syncRollback(_txn.get(),
                            OplogInterfaceMock({dropCollectionOperation, commonOperation}),
                            rollbackSource,
+                           {},
                            _coordinator,
                            noSleep));
     ASSERT_TRUE(rollbackSource.called);
@@ -868,6 +883,7 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) {
     ASSERT_OK(syncRollback(_txn.get(),
                            OplogInterfaceMock({applyOpsOperation, commonOperation}),
                            rollbackSource,
+                           {},
                            _coordinator,
                            noSleep));
     ASSERT_EQUALS(4U, rollbackSource.searchedIds.size());
@@ -908,6 +924,7 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
     ASSERT_OK(syncRollback(_txn.get(),
                            OplogInterfaceMock({createCollectionOperation, commonOperation}),
                            rollbackSource,
+                           {},
                            _coordinator,
                            noSleep));
     {
@@ -951,6 +968,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
     ASSERT_OK(syncRollback(_txn.get(),
                            OplogInterfaceMock({collectionModificationOperation, commonOperation}),
                            rollbackSource,
+                           {},
                            _coordinator,
                            noSleep));
     stopCapturingLogMessages();
@@ -992,6 +1010,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt
         syncRollback(_txn.get(),
                      OplogInterfaceMock({collectionModificationOperation, commonOperation}),
                      rollbackSource,
+                     {},
                      _coordinator,
                      noSleep);
     ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index 881206a7d5e..33818e14c50 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -35,6 +35,7 @@
 #include "mongo/db/jsobj.h"
 #include "mongo/db/repl/oplog_entry.h"
 #include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/rpc/metadata/server_selection_metadata.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/assert_util.h"
@@ -129,6 +130,9 @@ void SyncSourceResolver::shutdown() {
     if (_fetcher) {
         _fetcher->shutdown();
     }
+    if (_rbidCommandHandle) {
+        _taskExecutor->cancel(_rbidCommandHandle);
+    }
 }

 void SyncSourceResolver::join() {
@@ -196,6 +200,7 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP
 Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
+    // TODO SERVER-27499 need to check if _state is kShuttingDown inside the mutex.
     // Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task
     // executor.
     auto status = fetcher->schedule();
@@ -306,18 +311,65 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
         return;
     }

-    // Schedules fetcher to look for '_requiredOpTime' in the remote oplog.
     if (!_requiredOpTime.isNull()) {
-        auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
-        if (!status.isOK()) {
-            _finishCallback(status);
-        }
+        _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
         return;
     }

     _finishCallback(candidate);
 }

+void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
+    auto handle = _taskExecutor->scheduleRemoteCommand(
+        {candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout},
+        stdx::bind(&SyncSourceResolver::_rbidRequestCallback,
+                   this,
+                   candidate,
+                   earliestOpTimeSeen,
+                   stdx::placeholders::_1));
+
+    if (!handle.isOK()) {
+        _finishCallback(handle.getStatus());
+        return;
+    }
+
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    _rbidCommandHandle = std::move(handle.getValue());
+    if (_state == State::kShuttingDown) {
+        _taskExecutor->cancel(_rbidCommandHandle);
+    }
+}
+
+void SyncSourceResolver::_rbidRequestCallback(
+    HostAndPort candidate,
+    OpTime earliestOpTimeSeen,
+    const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
+    if (rbidReply.response.status == ErrorCodes::CallbackCanceled) {
+        _finishCallback(rbidReply.response.status);
+        return;
+    }
+
+    try {
+        uassertStatusOK(rbidReply.response.status);
+        uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data));
+        _rbid = rbidReply.response.data["rbid"].Int();
+    } catch (const DBException& ex) {
+        const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
+        log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for "
+              << kFetcherErrorBlacklistDuration << " until: " << until;
+        _syncSourceSelector->blacklistSyncSource(candidate, until);
+        _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+        return;
+    }
+
+    // Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
+    // Unittest requires that this kind of failure be handled specially.
+    auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+    if (!status.isOK()) {
+        _finishCallback(status);
+    }
+}
+
 Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
     const Fetcher::QueryResponse& queryResponse) {
     if (queryResponse.documents.empty()) {
@@ -425,6 +477,10 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe
 Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
     SyncSourceResolverResponse response;
     response.syncSourceStatus = std::move(result);
+    if (response.isOK() && !response.getSyncSource().empty()) {
+        invariant(_requiredOpTime.isNull() || _rbid);
+        response.rbid = _rbid;
+    }
     return _finishCallback(response);
 }
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index 099b56994e2..48f7e968a8d 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -70,6 +70,11 @@ struct SyncSourceResolverResponse {
     // Contains the new MinValid boundry if syncSourceStatus is ErrorCodes::OplogStartMissing.
     OpTime earliestOpTimeSeen;

+    // Rollback ID of the selected sync source. Only filled in when there is a required optime.
+    // The rbid is fetched before the required optime so callers can be sure that as long as the
+    // rbid is the same, the required optime is still present.
+    boost::optional<int> rbid;
+
     bool isOK() {
         return syncSourceStatus.isOK();
     }
@@ -172,6 +177,14 @@ private:
                                         OpTime earliestOpTimeSeen);

     /**
+     * Schedules a replSetGetRBID command against the candidate to fetch its current rollback id.
+     */
+    void _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen);
+    void _rbidRequestCallback(HostAndPort candidate,
+                              OpTime earliestOpTimeSeen,
+                              const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply);
+
+    /**
      * Checks query response for required optime.
      */
     Status _compareRequiredOpTimeWithQueryResponse(const Fetcher::QueryResponse& queryResponse);
@@ -216,7 +229,10 @@ private:
     // resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes.
     const OnCompletionFn _onCompletion;

-    // Protects members of this sync source resolver.
+    // The rbid we will return to our caller.
+    boost::optional<int> _rbid;
+
+    // Protects members of this sync source resolver defined below.
     mutable stdx::mutex _mutex;

     mutable stdx::condition_variable _condition;
@@ -233,6 +249,8 @@ private:
     // Holds reference to fetcher in the process of shutting down.
     std::unique_ptr<Fetcher> _shuttingDownFetcher;
+
+    executor::TaskExecutor::CallbackHandle _rbidCommandHandle;
 };

 }  // namespace repl
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 82ae8ab528c..fb27a2cc681 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -614,6 +614,19 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
         {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())});
 }

+void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net,
+                           HostAndPort currentSyncSource,
+                           const BSONObj& reply = BSON("ok" << 1 << "rbid" << 1)) {
+    executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+    ASSERT_TRUE(net->hasReadyRequests());
+    auto request = net->scheduleSuccessfulResponse(reply);
+    ASSERT_EQUALS(currentSyncSource, request.target);
+    ASSERT_EQUALS("admin", request.dbname);
+    ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
+    ASSERT_BSONOBJ_EQ(BSON("replSetGetRBID" << 1), request.cmdObj);
+    net->runReadyNetworkOperations();
+}
+
 const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);

 TEST_F(
@@ -631,11 +644,14 @@ TEST_F(
     ASSERT_TRUE(_resolver->isActive());

+    _scheduleRBIDResponse(getNet(), candidate1, BSON("ok" << 1 << "rbid" << 7));
     _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);

     _resolver->join();
     ASSERT_FALSE(_resolver->isActive());
     ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+    ASSERT(_response.rbid);
+    ASSERT_EQ(*_response.rbid, 7);
 }

 TEST_F(SyncSourceResolverTest,
@@ -653,6 +669,7 @@ TEST_F(SyncSourceResolverTest,
     ASSERT_TRUE(_resolver->isActive());

+    _scheduleRBIDResponse(getNet(), candidate1);
     _scheduleRequiredOpTimeFetcherResponse(
         getNet(),
         _selector.get(),
@@ -667,6 +684,7 @@ TEST_F(SyncSourceResolverTest,
     _scheduleFirstOplogEntryFetcherResponse(
         getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));

+    _scheduleRBIDResponse(getNet(), candidate2);
     _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);

     _resolver->join();
@@ -692,6 +710,7 @@ TEST_F(
     ASSERT_TRUE(_resolver->isActive());

+    _scheduleRBIDResponse(getNet(), candidate1);
     _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);

     ASSERT_TRUE(_resolver->isActive());
@@ -701,6 +720,7 @@ TEST_F(
     _scheduleFirstOplogEntryFetcherResponse(
         getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));

+    _scheduleRBIDResponse(getNet(), candidate2);
     _scheduleRequiredOpTimeFetcherResponse(
         getNet(), _selector.get(), candidate2, requireOpTimeWithUninitializedTerm);

@@ -724,6 +744,7 @@ TEST_F(SyncSourceResolverTest,
     ASSERT_TRUE(_resolver->isActive());

+    _scheduleRBIDResponse(getNet(), candidate1);
     _scheduleRequiredOpTimeFetcherResponse(
         getNet(), _selector.get(), candidate1, requiredOpTime, {});

@@ -734,6 +755,7 @@ TEST_F(SyncSourceResolverTest,
     _scheduleFirstOplogEntryFetcherResponse(
         getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));

+    _scheduleRBIDResponse(getNet(), candidate2);
     _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);

     _resolver->join();
@@ -756,6 +778,7 @@ TEST_F(SyncSourceResolverTest,
     ASSERT_TRUE(_resolver->isActive());

+    _scheduleRBIDResponse(getNet(), candidate1);
     _scheduleRequiredOpTimeFetcherResponse(
         getNet(),
         _selector.get(),
@@ -770,6 +793,7 @@ TEST_F(SyncSourceResolverTest,
     _scheduleFirstOplogEntryFetcherResponse(
         getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));

+    _scheduleRBIDResponse(getNet(), candidate2);
     _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);

     _resolver->join();
@@ -778,6 +802,27 @@ TEST_F(SyncSourceResolverTest,
 }

 TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverReturnsScheduleErrorWhenSchedulingRBIDCommandFails) {
+    _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+    _shouldFailRequest = [](const executor::RemoteCommandRequest& request) {
+        return request.cmdObj.firstElementFieldName() == "replSetGetRBID"_sd;
+    };
+
+    HostAndPort candidate1("node1", 12345);
+    _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
        SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) {
     _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);

@@ -792,6 +837,7 @@ TEST_F(SyncSourceResolverTest,
     _scheduleFirstOplogEntryFetcherResponse(
         getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));

+    _scheduleRBIDResponse(getNet(), candidate1);
     _resolver->join();
     ASSERT_FALSE(_resolver->isActive());
@@ -843,6 +889,60 @@ TEST_F(
     ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
 }

+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRBIDCommand) {
+    _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+    HostAndPort candidate1("node1", 12345);
+    _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+
+    _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());
+
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+                  _selector->getLastBlacklistExpiration_forTest());
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterRBIDCommandNotOk) {
+    _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+    HostAndPort candidate1("node1", 12345);
+    _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+
+    // _scheduleNetworkErrorForFirstNode does this for us.
+    _selector->setChooseNewSyncSourceResult_forTest(HostAndPort());
+
+    _scheduleRBIDResponse(getNet(),
+                          candidate1,
+                          BSON("ok" << 0 << "code" << 9000 << "errmsg"
+                                    << "I'm sorry Dave, I'm afraid I can't do that."));
+
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+                  _selector->getLastBlacklistExpiration_forTest());
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+}
+
 TEST_F(
     SyncSourceResolverTest,
     SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) {
@@ -856,6 +956,7 @@ TEST_F(
     _scheduleFirstOplogEntryFetcherResponse(
         getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));

+    _scheduleRBIDResponse(getNet(), candidate1);
     _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());

     ASSERT_FALSE(_resolver->isActive());
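
Taken together, the sync_source_resolver changes give each candidate probe a fixed order: first-oplog-entry check (staleness), then replSetGetRBID, then the required-optime find. Fetching the rbid before the optime probe is what lets callers treat "rbid unchanged" as "required optime still present". A compressed, synchronous sketch of that ordering (stand-in types; the real resolver is asynchronous and blacklists failed candidates):

```cpp
#include <iostream>
#include <optional>
#include <string>

struct Candidate {
    std::string host;
    long long earliestOpTime;  // first entry in the candidate's oplog
    int rbid;                  // its reply to replSetGetRBID
    long long newestOpTime;    // used to satisfy the required-optime probe
};

// Returns the candidate's rbid if it is usable, or nullopt so the caller can
// blacklist this candidate and try the next one.
std::optional<int> probe(const Candidate& c,
                         long long lastFetched,
                         long long requiredOpTime) {
    if (c.earliestOpTime > lastFetched)
        return std::nullopt;  // too far ahead: our last fetched op is gone
    const int rbid = c.rbid;  // fetched BEFORE the required-optime check...
    if (c.newestOpTime < requiredOpTime)
        return std::nullopt;  // required optime (minValid) not present
    return rbid;  // ...so "rbid unchanged later" implies "optime still there"
}

int main() {
    Candidate c{"node1:12345", 5, 7, 250};
    if (auto rbid = probe(c, /*lastFetched=*/100, /*requiredOpTime=*/200))
        std::cout << "sync from " << c.host << " with rbid " << *rbid << '\n';
}
```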