diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 162 |
1 files changed, 118 insertions, 44 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 7e2587b4561..37ba22bf1a7 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -54,6 +54,8 @@ #include "mongo/db/repl/rs_sync.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/executor/network_interface_factory.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/thread_pool.h" @@ -242,6 +244,8 @@ void BackgroundSync::_producerThread() { return; } + invariant(!state.rollback()); + // We need to wait until initial sync has started. if (_replCoord->getMyLastAppliedOpTime().isNull()) { sleepsecs(1); @@ -296,7 +300,9 @@ void BackgroundSync::_produce(OperationContext* txn) { minValid = minValidSaved; } } - syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord); + + int rbid; + syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord, &rbid); // no server found if (syncSourceReader.getHost().empty()) { @@ -348,7 +354,8 @@ void BackgroundSync::_produce(OperationContext* txn) { lastOpTimeFetched, lastHashFetched, fetcherMaxTimeMS, - &fetcherReturnStatus); + &fetcherReturnStatus, + rbid); BSONObjBuilder cmdBob; @@ -436,19 +443,8 @@ void BackgroundSync::_produce(OperationContext* txn) { } } } - // check that we are at minvalid, otherwise we cannot roll back as we may be in an - // inconsistent state - const auto minValid = getMinValid(txn); - if (lastApplied < minValid) { - fassertNoTrace(18750, - Status(ErrorCodes::UnrecoverableRollbackError, - str::stream() - << "need to rollback, but in inconsistent state. " - << "minvalid: " << minValid.toString() - << " > our last optime: " << lastApplied.toString())); - } - _rollback(txn, source, getConnection); + _rollback(txn, source, rbid, getConnection); stop(); } else if (!fetcherReturnStatus.isOK()) { warning() << "Fetcher error querying oplog: " << fetcherReturnStatus.toString(); @@ -461,7 +457,8 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& OpTime lastOpTimeFetched, long long lastFetchedHash, Milliseconds fetcherMaxTimeMS, - Status* returnStatus) { + Status* returnStatus, + int rbid) { // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor if (!result.isOK()) { @@ -515,6 +512,46 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { + // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back + // since that could cause it to not have our required minValid point. The cursor will be + // killed if the upstream node rolls back so we don't need to keep checking. This must be + // blocking since the Fetcher doesn't give us a way to defer sending the getmores after we + // return. + auto handle = _threadPoolTaskExecutor.scheduleRemoteCommand( + {source, "admin", BSON("replSetGetRBID" << 1)}, + [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) { + *returnStatus = rbidReply.response.getStatus(); + if (!returnStatus->isOK()) + return; + + const auto& rbidReplyObj = rbidReply.response.getValue().data; + *returnStatus = getStatusFromCommandResult(rbidReplyObj); + if (!returnStatus->isOK()) + return; + + const auto rbidElem = rbidReplyObj["rbid"]; + if (rbidElem.type() != NumberInt) { + *returnStatus = + Status(ErrorCodes::BadValue, + str::stream() << "Upstream node returned an " + << "rbid with invalid type " << rbidElem.type()); + return; + } + if (rbidElem.Int() != rbid) { + *returnStatus = Status(ErrorCodes::BadValue, + "Upstream node rolled back after verifying " + "that it had our MinValid point. Retrying."); + } + }); + if (!handle.isOK()) { + *returnStatus = handle.getStatus(); + return; + } + + _threadPoolTaskExecutor.wait(handle.getValue()); + if (!returnStatus->isOK()) + return; + auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> { if (firstDocToApply == lastDocToApply) { return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing"); @@ -702,40 +739,77 @@ void BackgroundSync::consume() { void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, + boost::optional<int> requiredRBID, stdx::function<DBClientBase*()> getConnection) { - // Abort only when syncRollback detects we are in a unrecoverable state. - // In other cases, we log the message contained in the error status and retry later. - auto status = syncRollback(txn, - OplogInterfaceLocal(txn, rsOplogName), - RollbackSourceImpl(getConnection, source, rsOplogName), - _replCoord); - if (status.isOK()) { - // When the syncTail thread sees there is no new data by adding something to the buffer. - _signalNoNewDataForApplier(); - // Wait until the buffer is empty. - // This is an indication that syncTail has removed the sentinal marker from the buffer - // and reset its local lastAppliedOpTime via the replCoord. - while (!_buffer.empty()) { - sleepmillis(10); - if (inShutdown()) { - return; - } + // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from + // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any + // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or + // after transitioning to RECOVERING. We always transition to RECOVERING regardless of success + // or (recoverable) failure since we may be in an inconsistent state. If rollback failed before + // writing anything, SyncTail will quickly take us to SECONDARY since are are still at our + // original MinValid, which is fine because we may choose a sync source that doesn't require + // rollback. If it failed after we wrote to MinValid, then we will pick a sync source that will + // cause us to roll back to the same common point, which is fine. If we succeeded, we will be + // consistent as soon as we apply up to/through MinValid and SyncTail will make us SECONDARY + // then. + { + log() << "rollback 0"; + Lock::GlobalWrite globalWrite(txn->lockState()); + if (!_replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { + log() << "Cannot transition from " << _replCoord->getMemberState().toString() << " to " + << MemberState(MemberState::RS_ROLLBACK).toString(); + return; } + } - // It is now safe to clear the ROLLBACK state, which may result in the applier thread - // transitioning to SECONDARY. This is safe because the applier thread has now reloaded - // the new rollback minValid from the database. - if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { - warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) - << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) - << " but found self in " << _replCoord->getMemberState(); + try { + auto status = syncRollback(txn, + OplogInterfaceLocal(txn, rsOplogName), + RollbackSourceImpl(getConnection, source, rsOplogName), + requiredRBID, + _replCoord); + + // Abort only when syncRollback detects we are in a unrecoverable state. + // WARNING: these statuses sometimes have location codes which are lost with uassertStatusOK + // so we need to check here first. + if (ErrorCodes::UnrecoverableRollbackError == status.code()) { + severe() << "Unable to complete rollback. A full resync may be needed: " << status; + fassertFailedNoTrace(28723); } - return; - } - if (ErrorCodes::UnrecoverableRollbackError == status.code()) { - fassertNoTrace(28723, status); + + // In other cases, we log the message contained in the error status and retry later. + uassertStatusOK(status); + } catch (const DBException& ex) { + // UnrecoverableRollbackError should only come from a returned status which is handled + // above. + invariant(ex.getCode() != ErrorCodes::UnrecoverableRollbackError); + + warning() << "rollback cannot complete at this time (retrying later): " << ex + << " appliedThrough=" << _replCoord->getMyLastAppliedOpTime() + << " minvalid=" << getMinValid(txn); + + // Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If + // we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this + // will also prevent us from hot-looping and putting too much load on upstream nodes. + sleepsecs(5); // 5 seconds was chosen as a completely arbitrary amount of time. + } catch (...) { + std::terminate(); + } + + // At this point we are about to leave rollback. Before we do, wait for any writes done + // as part of rollback to be durable, and then do any necessary checks that we didn't + // wind up rolling back something illegal. We must wait for the rollback to be durable + // so that if we wind up shutting down uncleanly in response to something we rolled back + // we know that we won't wind up right back in the same situation when we start back up + // because the rollback wasn't durable. + txn->recoveryUnit()->waitUntilDurable(); + + if (!_replCoord->setFollowerMode(MemberState::RS_RECOVERING)) { + severe() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) + << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) + << " but found self in " << _replCoord->getMemberState(); + fassertFailedNoTrace(40364); } - warning() << "rollback cannot proceed at this time (retrying later): " << status; } HostAndPort BackgroundSync::getSyncTarget() { |