Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--  src/mongo/db/repl/bgsync.cpp  65
1 file changed, 16 insertions(+), 49 deletions(-)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 56e5ab0e0c0..c0211a21022 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -270,6 +270,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
HostAndPort source;
+ HostAndPort oldSource = _syncSourceHost;
SyncSourceResolverResponse syncSourceResp;
{
const OpTime minValidSaved = storageInterface->getMinValid(opCtx);
@@ -332,6 +333,16 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_syncSourceHost = syncSourceResp.getSyncSource();
source = _syncSourceHost;
+
+ // If our sync source has not changed, the likely cause is that our heartbeat data map is
+ // out of date. In that case, sleep for 1 second to reduce the time spent spinning while
+ // waiting for the map to update.
+ if (oldSource == source) {
+ log() << "Chose same sync source candidate as last time, " << source
+ << ". Sleeping for 1 second to avoid immediately choosing a new sync source for "
+ "the same reason as last time.";
+ sleepsecs(1);
+ }
} else {
if (!syncSourceResp.isOK()) {
log() << "failed to find sync source, received error "
@@ -367,7 +378,6 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
- auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this.
OplogFetcher* oplogFetcher;
try {
auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
@@ -385,13 +395,14 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
NamespaceString(rsOplogName),
config,
_replicationCoordinatorExternalState->getOplogFetcherMaxFetcherRestarts(),
+ syncSourceResp.rbid,
+ true /* requireFresherSyncSource */,
&dataReplicatorExternalState,
stdx::bind(&BackgroundSync::_enqueueDocuments,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
- stdx::placeholders::_3,
- &rbidCopyForFetcher),
+ stdx::placeholders::_3),
onOplogFetcherShutdownCallbackFn);
oplogFetcher = _oplogFetcher.get();
} catch (const mongo::DBException& ex) {
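The constructor change above moves rollback-id (rbid) tracking into OplogFetcher itself: the rbid recorded when the sync source was resolved is now a constructor argument (alongside a requireFresherSyncSource flag), so the _enqueueDocuments callback no longer threads a boost::optional<int>* through every batch. A hedged sketch of that ownership shift, with simplified stand-ins for the Fetcher types:

    #include <functional>
    #include <optional>
    #include <string>
    #include <utility>
    #include <vector>

    using Document = std::string;  // stand-in for a BSON oplog entry
    using EnqueueFn = std::function<void(const std::vector<Document>&)>;

    class OplogFetcherSketch {
    public:
        // The required rbid arrives at construction time; the enqueue
        // callback shrinks back to the plain document-batch signature.
        OplogFetcherSketch(int requiredRBID,
                           bool /* requireFresherSyncSource, unused here */,
                           EnqueueFn enqueue)
            : _requiredRBID(requiredRBID), _enqueue(std::move(enqueue)) {}

        void onBatch(const std::vector<Document>& docs, int sourceRBID) {
            // One-shot check that the source has not rolled back since it was
            // verified to hold our minValid point; cleared after first batch.
            if (_requiredRBID && sourceRBID != *_requiredRBID) {
                return;  // real code surfaces an error status and restarts
            }
            _requiredRBID.reset();
            _enqueue(docs);
        }

    private:
        std::optional<int> _requiredRBID;
        EnqueueFn _enqueue;
    };

    int main() {
        OplogFetcherSketch fetcher(
            7 /* rbid recorded at sync-source resolution */, true,
            [](const std::vector<Document>& docs) {
                // enqueue the batch into the apply buffer (omitted here)
            });
        fetcher.onBatch({"op1", "op2"}, 7);  // rbid matches: batch enqueued
        fetcher.onBatch({"op3"}, 8);  // check already cleared: not re-run
        return 0;
    }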
@@ -428,8 +439,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// Do not blacklist the server here; it will be blacklisted when we try to reuse it,
// if it cannot return a matching oplog start from the last fetched oplog ts field.
return;
- } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
- fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+ } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
if (_replCoord->getMemberState().primary()) {
// TODO: Abort catchup mode early if rollback detected.
warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
@@ -503,50 +513,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info,
- boost::optional<int>* requiredRBID) {
- // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
- // since that could cause it to not have our required minValid point. The cursor will be killed
- // if the upstream node rolls back so we don't need to keep checking. This must be blocking
- // since the Fetcher doesn't give us a way to defer sending the getmores after we return.
- if (*requiredRBID) {
- auto rbidStatus = Status(ErrorCodes::InternalError, "");
- auto handle =
- _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand(
- {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr},
- [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
- rbidStatus = rbidReply.response.status;
- if (!rbidStatus.isOK())
- return;
-
- rbidStatus = getStatusFromCommandResult(rbidReply.response.data);
- if (!rbidStatus.isOK())
- return;
-
- const auto rbidElem = rbidReply.response.data["rbid"];
- if (rbidElem.type() != NumberInt) {
- rbidStatus = Status(ErrorCodes::BadValue,
- str::stream() << "Upstream node returned an "
- << "rbid with invalid type "
- << rbidElem.type());
- return;
- }
- if (rbidElem.Int() != **requiredRBID) {
- rbidStatus = Status(ErrorCodes::BadValue,
- "Upstream node rolled back after verifying "
- "that it had our MinValid point. Retrying.");
- }
- });
- if (!handle.isOK())
- return handle.getStatus();
-
- _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue());
- if (!rbidStatus.isOK())
- return rbidStatus;
-
- requiredRBID->reset(); // Don't come back to this block while on this cursor.
- }
-
+ const OplogFetcher::DocumentsInfo& info) {
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
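
For context on what the deleted block did: it issued a blocking replSetGetRBID command to the sync source on the first batch, validated the reply, and compared the returned rbid with the one recorded when the source was chosen, failing the batch if the source had rolled back in between. A hedged sketch of just that validation logic, with a plain struct standing in for the BSON reply and an exception standing in for the Status returns:

    #include <optional>
    #include <stdexcept>

    struct RBIDReply {
        bool ok = false;          // command-level success
        bool hasIntRBID = false;  // "rbid" field present as NumberInt
        int rbid = 0;
    };

    // Returns true when the upstream rbid still matches (or the check has
    // already passed on this cursor); throws on a malformed reply, mirroring
    // the BadValue statuses in the removed code.
    bool upstreamStillValid(const RBIDReply& reply, std::optional<int>& requiredRBID) {
        if (!requiredRBID) {
            return true;  // already verified on this cursor; skip the check
        }
        if (!reply.ok || !reply.hasIntRBID) {
            throw std::runtime_error("upstream node returned an invalid rbid");
        }
        if (reply.rbid != *requiredRBID) {
            // Source rolled back after verifying it had our minValid point.
            return false;
        }
        requiredRBID.reset();  // don't re-run the check while on this cursor
        return true;
    }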