diff options
author | Mathias Stearn <mathias@10gen.com> | 2016-11-15 15:24:22 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-01-03 16:02:19 -0500 |
commit | 0b76764eac7651ddba4c82c504aa7e8d785087c2 (patch) | |
tree | f90fce58d2781a48afaee696ee3fb9e6f8fefedc /src/mongo/db/repl/bgsync.cpp | |
parent | 506c8af1269c76fcd730e121e37b82a18347ac70 (diff) | |
download | mongo-0b76764eac7651ddba4c82c504aa7e8d785087c2.tar.gz |
SERVER-27050 Ensure upstream node doesn't roll back after checking MinValid
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 109 |
1 files changed, 64 insertions, 45 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index d5fdd001921..84676fd60c8 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -50,6 +50,7 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" @@ -287,39 +288,32 @@ void BackgroundSync::_produce(OperationContext* txn) { OpTime lastOpTimeFetched; HostAndPort source; SyncSourceResolverResponse syncSourceResp; - SyncSourceResolver* syncSourceResolver; - OpTime minValid; - OpTime minValidSaved; - if (_replCoord->getMemberState().recovering()) { - minValidSaved = StorageInterface::get(txn)->getMinValid(txn); - } { + const OpTime minValidSaved = StorageInterface::get(txn)->getMinValid(txn); + stdx::lock_guard<stdx::mutex> lock(_mutex); - if (minValidSaved > _lastOpTimeFetched) { - minValid = minValidSaved; - } + const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime(); lastOpTimeFetched = _lastOpTimeFetched; _syncSourceHost = HostAndPort(); _syncSourceResolver = stdx::make_unique<SyncSourceResolver>( _replicationCoordinatorExternalState->getTaskExecutor(), _replCoord, lastOpTimeFetched, - minValid, + requiredOpTime, [&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; }); - syncSourceResolver = _syncSourceResolver.get(); } // This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls // ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to // acquired before BackgroundSync's. // It is safe to call startup() outside the mutex on this instance of SyncSourceResolver because - // we do not destroy this instance outside of this function. + // we do not destroy this instance outside of this function which is only called from a single + // thread. auto status = _syncSourceResolver->startup(); if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) { return; } fassertStatusOK(40349, status); - syncSourceResolver->join(); - syncSourceResolver = nullptr; + _syncSourceResolver->join(); { stdx::lock_guard<stdx::mutex> lock(_mutex); _syncSourceResolver.reset(); @@ -388,6 +382,7 @@ void BackgroundSync::_produce(OperationContext* txn) { Status fetcherReturnStatus = Status::OK(); DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( _replCoord, _replicationCoordinatorExternalState, this); + auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this. OplogFetcher* oplogFetcher; try { auto executor = _replicationCoordinatorExternalState->getTaskExecutor(); @@ -410,7 +405,8 @@ void BackgroundSync::_produce(OperationContext* txn) { this, stdx::placeholders::_1, stdx::placeholders::_2, - stdx::placeholders::_3), + stdx::placeholders::_3, + &rbidCopyForFetcher), onOplogFetcherShutdownCallbackFn); oplogFetcher = _oplogFetcher.get(); } catch (const mongo::DBException& ex) { @@ -484,20 +480,8 @@ void BackgroundSync::_produce(OperationContext* txn) { } } } - // check that we are at minvalid, otherwise we cannot roll back as we may be in an - // inconsistent state - const auto minValid = StorageInterface::get(txn)->getMinValid(txn); - if (lastApplied < minValid) { - fassertNoTrace(18750, - Status(ErrorCodes::UnrecoverableRollbackError, - str::stream() << "need to rollback, but in inconsistent state. " - << "minvalid: " - << minValid.toString() - << " > our last optime: " - << lastApplied.toString())); - } - _rollback(txn, source, getConnection); + _rollback(txn, source, syncSourceResp.rbid, getConnection); stop(); } else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) { Seconds blacklistDuration(60); @@ -510,14 +494,57 @@ void BackgroundSync::_produce(OperationContext* txn) { } } -void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, - Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo& info) { +Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info, + boost::optional<int>* requiredRBID) { + // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back + // since that could cause it to not have our required minValid point. The cursor will be killed + // if the upstream node rolls back so we don't need to keep checking. This must be blocking + // since the Fetcher doesn't give us a way to defer sending the getmores after we return. + if (*requiredRBID) { + auto rbidStatus = Status(ErrorCodes::InternalError, ""); + auto handle = + _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand( + {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr}, + [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) { + rbidStatus = rbidReply.response.status; + if (!rbidStatus.isOK()) + return; + + rbidStatus = getStatusFromCommandResult(rbidReply.response.data); + if (!rbidStatus.isOK()) + return; + + const auto rbidElem = rbidReply.response.data["rbid"]; + if (rbidElem.type() != NumberInt) { + rbidStatus = Status(ErrorCodes::BadValue, + str::stream() << "Upstream node returned an " + << "rbid with invalid type " + << rbidElem.type()); + return; + } + if (rbidElem.Int() != **requiredRBID) { + rbidStatus = Status(ErrorCodes::BadValue, + "Upstream node rolled back after verifying " + "that it had our MinValid point. Retrying."); + } + }); + if (!handle.isOK()) + return handle.getStatus(); + + _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue()); + if (!rbidStatus.isOK()) + return rbidStatus; + + requiredRBID->reset(); // Don't come back to this block while on this cursor. + } + // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will // be one fewer than "networkDocumentCount" because the first document (which was applied // previously) is skipped. if (info.toApplyDocumentCount == 0) { - return; // Nothing to do. + return Status::OK(); // Nothing to do. } auto txn = cc().makeOperationContext(); @@ -532,7 +559,7 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, // buffer. stdx::unique_lock<stdx::mutex> lock(_mutex); if (_inShutdown) { - return; + return Status::OK(); } OCCASIONALLY { @@ -561,6 +588,8 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, // The inference here is basically if the batch is really small, we are "caught up". sleepmillis(kSleepToAllowBatchingMillis); } + + return Status::OK(); } bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) { @@ -589,26 +618,16 @@ void BackgroundSync::consume(OperationContext* txn) { void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, + boost::optional<int> requiredRBID, stdx::function<DBClientBase*()> getConnection) { // Abort only when syncRollback detects we are in a unrecoverable state. // In other cases, we log the message contained in the error status and retry later. auto status = syncRollback(txn, OplogInterfaceLocal(txn, rsOplogName), RollbackSourceImpl(getConnection, source, rsOplogName), + requiredRBID, _replCoord); if (status.isOK()) { - // When the syncTail thread sees there is no new data by adding something to the buffer. - _signalNoNewDataForApplier(txn); - // Wait until the buffer is empty. - // This is an indication that syncTail has removed the sentinal marker from the buffer - // and reset its local lastAppliedOpTime via the replCoord. - while (!_oplogBuffer->isEmpty()) { - sleepmillis(10); - if (inShutdown()) { - return; - } - } - // At this point we are about to leave rollback. Before we do, wait for any writes done // as part of rollback to be durable, and then do any necessary checks that we didn't // wind up rolling back something illegal. We must wait for the rollback to be durable |