diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 65 |
1 files changed, 16 insertions, 49 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 56e5ab0e0c0..c0211a21022 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -270,6 +270,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; HostAndPort source; + HostAndPort oldSource = _syncSourceHost; SyncSourceResolverResponse syncSourceResp; { const OpTime minValidSaved = storageInterface->getMinValid(opCtx); @@ -332,6 +333,16 @@ void BackgroundSync::_produce(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); _syncSourceHost = syncSourceResp.getSyncSource(); source = _syncSourceHost; + + // If our sync source has not changed, it is likely caused by our heartbeat data map being + // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting + // for our map to update. + if (oldSource == source) { + log() << "Chose same sync source candidate as last time, " << source + << ". Sleeping for 1 second to avoid immediately choosing a new sync source for " + "the same reason as last time."; + sleepsecs(1); + } } else { if (!syncSourceResp.isOK()) { log() << "failed to find sync source, received error " @@ -367,7 +378,6 @@ void BackgroundSync::_produce(OperationContext* opCtx) { Status fetcherReturnStatus = Status::OK(); DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( _replCoord, _replicationCoordinatorExternalState, this); - auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this. OplogFetcher* oplogFetcher; try { auto executor = _replicationCoordinatorExternalState->getTaskExecutor(); @@ -385,13 +395,14 @@ void BackgroundSync::_produce(OperationContext* opCtx) { NamespaceString(rsOplogName), config, _replicationCoordinatorExternalState->getOplogFetcherMaxFetcherRestarts(), + syncSourceResp.rbid, + true /* requireFresherSyncSource */, &dataReplicatorExternalState, stdx::bind(&BackgroundSync::_enqueueDocuments, this, stdx::placeholders::_1, stdx::placeholders::_2, - stdx::placeholders::_3, - &rbidCopyForFetcher), + stdx::placeholders::_3), onOplogFetcherShutdownCallbackFn); oplogFetcher = _oplogFetcher.get(); } catch (const mongo::DBException& ex) { @@ -428,8 +439,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { // Do not blacklist the server here, it will be blacklisted when we try to reuse it, // if it can't return a matching oplog start from the last fetch oplog ts field. return; - } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing || - fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) { + } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) { if (_replCoord->getMemberState().primary()) { // TODO: Abort catchup mode early if rollback detected. warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; @@ -503,50 +513,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) { Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, - const OplogFetcher::DocumentsInfo& info, - boost::optional<int>* requiredRBID) { - // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back - // since that could cause it to not have our required minValid point. The cursor will be killed - // if the upstream node rolls back so we don't need to keep checking. This must be blocking - // since the Fetcher doesn't give us a way to defer sending the getmores after we return. - if (*requiredRBID) { - auto rbidStatus = Status(ErrorCodes::InternalError, ""); - auto handle = - _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand( - {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr}, - [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) { - rbidStatus = rbidReply.response.status; - if (!rbidStatus.isOK()) - return; - - rbidStatus = getStatusFromCommandResult(rbidReply.response.data); - if (!rbidStatus.isOK()) - return; - - const auto rbidElem = rbidReply.response.data["rbid"]; - if (rbidElem.type() != NumberInt) { - rbidStatus = Status(ErrorCodes::BadValue, - str::stream() << "Upstream node returned an " - << "rbid with invalid type " - << rbidElem.type()); - return; - } - if (rbidElem.Int() != **requiredRBID) { - rbidStatus = Status(ErrorCodes::BadValue, - "Upstream node rolled back after verifying " - "that it had our MinValid point. Retrying."); - } - }); - if (!handle.isOK()) - return handle.getStatus(); - - _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue()); - if (!rbidStatus.isOK()) - return rbidStatus; - - requiredRBID->reset(); // Don't come back to this block while on this cursor. - } - + const OplogFetcher::DocumentsInfo& info) { // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will // be one fewer than "networkDocumentCount" because the first document (which was applied // previously) is skipped. |