diff options
Diffstat (limited to 'src/mongo/db/repl/sync_source_feedback.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 47 |
1 files changed, 33 insertions, 14 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 7c08a04be27..43b84f45e38 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -60,6 +60,7 @@ namespace repl { void SyncSourceFeedback::_resetConnection() { LOG(1) << "resetting connection in sync source feedback"; _connection.reset(); + _fallBackToOldUpdatePosition = false; } bool SyncSourceFeedback::replAuthenticate() { @@ -105,18 +106,24 @@ void SyncSourceFeedback::forwardSlaveProgress() { _cond.notify_all(); } -Status SyncSourceFeedback::updateUpstream(OperationContext* txn) { +Status SyncSourceFeedback::updateUpstream(OperationContext* txn, bool oldStyle) { auto replCoord = repl::ReplicationCoordinator::get(txn); if (replCoord->getMemberState().primary()) { - // primary has no one to update to + // Primary has no one to send updates to. return Status::OK(); } BSONObjBuilder cmd; { stdx::unique_lock<stdx::mutex> lock(_mtx); - // the command could not be created, likely because the node was removed from the set - if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) { - return Status::OK(); + // The command could not be created, likely because this node was removed from the set. + if (!oldStyle) { + if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) { + return Status::OK(); + } + } else { + if (!replCoord->prepareOldReplSetUpdatePositionCommand(&cmd)) { + return Status::OK(); + } } } BSONObj res; @@ -125,8 +132,9 @@ Status SyncSourceFeedback::updateUpstream(OperationContext* txn) { try { _connection->runCommand("admin", cmd.obj(), res); } catch (const DBException& e) { - log() << "SyncSourceFeedback error sending update: " << e.what() << endl; - // blacklist sync target for .5 seconds and find a new one + log() << "SyncSourceFeedback error sending " << (oldStyle ? "old style " : "") + << "update: " << e.what(); + // Blacklist sync target for .5 seconds and find a new one. replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); BackgroundSync::get()->clearSyncTarget(); _resetConnection(); @@ -135,11 +143,15 @@ Status SyncSourceFeedback::updateUpstream(OperationContext* txn) { Status status = Command::getStatusFromCommandResult(res); if (!status.isOK()) { - log() << "SyncSourceFeedback error sending update, response: " << res.toString() << endl; - // blacklist sync target for .5 seconds and find a new one, unless we were rejected due - // to the syncsource having a newer config - if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() || - res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) { + log() << "SyncSourceFeedback error sending " << (oldStyle ? "old style " : "") + << "update, response: " << res.toString(); + if (status == ErrorCodes::BadValue && !oldStyle) { + log() << "SyncSourceFeedback falling back to old style UpdatePosition command"; + _fallBackToOldUpdatePosition = true; + } else if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() || + res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) { + // Blacklist sync target for .5 seconds and find a new one, unless we were rejected due + // to the syncsource having a newer config. replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); BackgroundSync::get()->clearSyncTarget(); _resetConnection(); @@ -195,9 +207,16 @@ void SyncSourceFeedback::run() { continue; } } - Status status = updateUpstream(txn.get()); + bool oldFallBackValue = _fallBackToOldUpdatePosition; + Status status = updateUpstream(txn.get(), _fallBackToOldUpdatePosition); if (!status.isOK()) { - log() << "updateUpstream failed: " << status << ", will retry"; + if (_fallBackToOldUpdatePosition != oldFallBackValue) { + stdx::unique_lock<stdx::mutex> lock(_mtx); + _positionChanged = true; + } else { + log() << (_fallBackToOldUpdatePosition ? "old style " : "") << "updateUpstream" + << " failed: " << status << ", will retry"; + } } } } |