summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_source_feedback.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_source_feedback.cpp')
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp47
1 files changed, 33 insertions, 14 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index 7c08a04be27..43b84f45e38 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -60,6 +60,7 @@ namespace repl {
void SyncSourceFeedback::_resetConnection() {
LOG(1) << "resetting connection in sync source feedback";
_connection.reset();
+ _fallBackToOldUpdatePosition = false;
}
bool SyncSourceFeedback::replAuthenticate() {
@@ -105,18 +106,24 @@ void SyncSourceFeedback::forwardSlaveProgress() {
_cond.notify_all();
}
-Status SyncSourceFeedback::updateUpstream(OperationContext* txn) {
+Status SyncSourceFeedback::updateUpstream(OperationContext* txn, bool oldStyle) {
auto replCoord = repl::ReplicationCoordinator::get(txn);
if (replCoord->getMemberState().primary()) {
- // primary has no one to update to
+ // Primary has no one to send updates to.
return Status::OK();
}
BSONObjBuilder cmd;
{
stdx::unique_lock<stdx::mutex> lock(_mtx);
- // the command could not be created, likely because the node was removed from the set
- if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) {
- return Status::OK();
+ // The command could not be created, likely because this node was removed from the set.
+ if (!oldStyle) {
+ if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) {
+ return Status::OK();
+ }
+ } else {
+ if (!replCoord->prepareOldReplSetUpdatePositionCommand(&cmd)) {
+ return Status::OK();
+ }
}
}
BSONObj res;
@@ -125,8 +132,9 @@ Status SyncSourceFeedback::updateUpstream(OperationContext* txn) {
try {
_connection->runCommand("admin", cmd.obj(), res);
} catch (const DBException& e) {
- log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
- // blacklist sync target for .5 seconds and find a new one
+ log() << "SyncSourceFeedback error sending " << (oldStyle ? "old style " : "")
+ << "update: " << e.what();
+ // Blacklist sync target for .5 seconds and find a new one.
replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
BackgroundSync::get()->clearSyncTarget();
_resetConnection();
@@ -135,11 +143,15 @@ Status SyncSourceFeedback::updateUpstream(OperationContext* txn) {
Status status = Command::getStatusFromCommandResult(res);
if (!status.isOK()) {
- log() << "SyncSourceFeedback error sending update, response: " << res.toString() << endl;
- // blacklist sync target for .5 seconds and find a new one, unless we were rejected due
- // to the syncsource having a newer config
- if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() ||
- res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) {
+ log() << "SyncSourceFeedback error sending " << (oldStyle ? "old style " : "")
+ << "update, response: " << res.toString();
+ if (status == ErrorCodes::BadValue && !oldStyle) {
+ log() << "SyncSourceFeedback falling back to old style UpdatePosition command";
+ _fallBackToOldUpdatePosition = true;
+ } else if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() ||
+ res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) {
+ // Blacklist sync target for .5 seconds and find a new one, unless we were rejected due
+ // to the syncsource having a newer config.
replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
BackgroundSync::get()->clearSyncTarget();
_resetConnection();
@@ -195,9 +207,16 @@ void SyncSourceFeedback::run() {
continue;
}
}
- Status status = updateUpstream(txn.get());
+ bool oldFallBackValue = _fallBackToOldUpdatePosition;
+ Status status = updateUpstream(txn.get(), _fallBackToOldUpdatePosition);
if (!status.isOK()) {
- log() << "updateUpstream failed: " << status << ", will retry";
+ if (_fallBackToOldUpdatePosition != oldFallBackValue) {
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ _positionChanged = true;
+ } else {
+ log() << (_fallBackToOldUpdatePosition ? "old style " : "") << "updateUpstream"
+ << " failed: " << status << ", will retry";
+ }
}
}
}