summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_source_feedback.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_source_feedback.cpp')
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp241
1 files changed, 119 insertions, 122 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index 49c70c3c2b7..602523a5471 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -51,161 +51,158 @@
namespace mongo {
- using std::endl;
- using std::string;
+using std::endl;
+using std::string;
namespace repl {
- SyncSourceFeedback::SyncSourceFeedback() : _positionChanged(false),
- _shutdownSignaled(false) {}
- SyncSourceFeedback::~SyncSourceFeedback() {}
+SyncSourceFeedback::SyncSourceFeedback() : _positionChanged(false), _shutdownSignaled(false) {}
+SyncSourceFeedback::~SyncSourceFeedback() {}
- void SyncSourceFeedback::_resetConnection() {
- LOG(1) << "resetting connection in sync source feedback";
- _connection.reset();
- }
+void SyncSourceFeedback::_resetConnection() {
+ LOG(1) << "resetting connection in sync source feedback";
+ _connection.reset();
+}
- bool SyncSourceFeedback::replAuthenticate() {
- if (!getGlobalAuthorizationManager()->isAuthEnabled())
- return true;
+bool SyncSourceFeedback::replAuthenticate() {
+ if (!getGlobalAuthorizationManager()->isAuthEnabled())
+ return true;
- if (!isInternalAuthSet())
- return false;
- return authenticateInternalUser(_connection.get());
- }
+ if (!isInternalAuthSet())
+ return false;
+ return authenticateInternalUser(_connection.get());
+}
- bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) {
- if (hasConnection()) {
- return true;
- }
- log() << "setting syncSourceFeedback to " << host.toString();
- _connection.reset(new DBClientConnection(false, OplogReader::tcp_timeout));
- string errmsg;
- try {
- if (!_connection->connect(host, errmsg) ||
- (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) {
- _resetConnection();
- log() << errmsg << endl;
- return false;
- }
- }
- catch (const DBException& e) {
- error() << "Error connecting to " << host.toString() << ": " << e.what();
+bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) {
+ if (hasConnection()) {
+ return true;
+ }
+ log() << "setting syncSourceFeedback to " << host.toString();
+ _connection.reset(new DBClientConnection(false, OplogReader::tcp_timeout));
+ string errmsg;
+ try {
+ if (!_connection->connect(host, errmsg) ||
+ (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) {
_resetConnection();
+ log() << errmsg << endl;
return false;
}
-
- return hasConnection();
+ } catch (const DBException& e) {
+ error() << "Error connecting to " << host.toString() << ": " << e.what();
+ _resetConnection();
+ return false;
}
- void SyncSourceFeedback::forwardSlaveProgress() {
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- _positionChanged = true;
- _cond.notify_all();
- }
+ return hasConnection();
+}
+
+void SyncSourceFeedback::forwardSlaveProgress() {
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ _positionChanged = true;
+ _cond.notify_all();
+}
- Status SyncSourceFeedback::updateUpstream(OperationContext* txn) {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- if (replCoord->getMemberState().primary()) {
- // primary has no one to update to
+Status SyncSourceFeedback::updateUpstream(OperationContext* txn) {
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ if (replCoord->getMemberState().primary()) {
+ // primary has no one to update to
+ return Status::OK();
+ }
+ BSONObjBuilder cmd;
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ // the command could not be created, likely because the node was removed from the set
+ if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) {
return Status::OK();
}
- BSONObjBuilder cmd;
- {
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- // the command could not be created, likely because the node was removed from the set
- if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) {
- return Status::OK();
- }
- }
- BSONObj res;
+ }
+ BSONObj res;
+
+ LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd.done();
+ try {
+ _connection->runCommand("admin", cmd.obj(), res);
+ } catch (const DBException& e) {
+ log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
+ // blacklist sync target for .5 seconds and find a new one
+ replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
+ BackgroundSync::get()->clearSyncTarget();
+ _resetConnection();
+ return e.toStatus();
+ }
- LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd.done();
- try {
- _connection->runCommand("admin", cmd.obj(), res);
- }
- catch (const DBException& e) {
- log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
- // blacklist sync target for .5 seconds and find a new one
+ Status status = Command::getStatusFromCommandResult(res);
+ if (!status.isOK()) {
+ log() << "SyncSourceFeedback error sending update, response: " << res.toString() << endl;
+ // blacklist sync target for .5 seconds and find a new one, unless we were rejected due
+ // to the syncsource having a newer config
+ if (status != ErrorCodes::InvalidReplicaSetConfig || res["cfgver"].eoo() ||
+ res["cfgver"].numberLong() < replCoord->getConfig().getConfigVersion()) {
replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
BackgroundSync::get()->clearSyncTarget();
_resetConnection();
- return e.toStatus();
- }
-
- Status status = Command::getStatusFromCommandResult(res);
- if (!status.isOK()) {
- log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl;
- // blacklist sync target for .5 seconds and find a new one, unless we were rejected due
- // to the syncsource having a newer config
- if (status != ErrorCodes::InvalidReplicaSetConfig || res["cfgver"].eoo() ||
- res["cfgver"].numberLong() < replCoord->getConfig().getConfigVersion()) {
- replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
- BackgroundSync::get()->clearSyncTarget();
- _resetConnection();
- }
}
-
- return status;
}
- void SyncSourceFeedback::shutdown() {
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- _shutdownSignaled = true;
- _cond.notify_all();
- }
+ return status;
+}
- void SyncSourceFeedback::run() {
- Client::initThread("SyncSourceFeedback");
+void SyncSourceFeedback::shutdown() {
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ _shutdownSignaled = true;
+ _cond.notify_all();
+}
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- while (true) { // breaks once _shutdownSignaled is true
- {
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- while (!_positionChanged && !_shutdownSignaled) {
- _cond.wait(lock);
- }
+void SyncSourceFeedback::run() {
+ Client::initThread("SyncSourceFeedback");
- if (_shutdownSignaled) {
- break;
- }
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ while (true) { // breaks once _shutdownSignaled is true
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ while (!_positionChanged && !_shutdownSignaled) {
+ _cond.wait(lock);
+ }
- _positionChanged = false;
+ if (_shutdownSignaled) {
+ break;
}
- auto txn = cc().makeOperationContext();
- MemberState state = replCoord->getMemberState();
- if (state.primary() || state.startup()) {
- _resetConnection();
+ _positionChanged = false;
+ }
+
+ auto txn = cc().makeOperationContext();
+ MemberState state = replCoord->getMemberState();
+ if (state.primary() || state.startup()) {
+ _resetConnection();
+ continue;
+ }
+ const HostAndPort target = BackgroundSync::get()->getSyncTarget();
+ if (_syncTarget != target) {
+ _resetConnection();
+ _syncTarget = target;
+ }
+ if (!hasConnection()) {
+ // fix connection if need be
+ if (target.empty()) {
+ sleepmillis(500);
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ _positionChanged = true;
continue;
}
- const HostAndPort target = BackgroundSync::get()->getSyncTarget();
- if (_syncTarget != target) {
- _resetConnection();
- _syncTarget = target;
- }
- if (!hasConnection()) {
- // fix connection if need be
- if (target.empty()) {
- sleepmillis(500);
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- _positionChanged = true;
- continue;
- }
- if (!_connect(txn.get(), target)) {
- sleepmillis(500);
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- _positionChanged = true;
- continue;
- }
- }
- Status status = updateUpstream(txn.get());
- if (!status.isOK()) {
+ if (!_connect(txn.get(), target)) {
sleepmillis(500);
stdx::unique_lock<stdx::mutex> lock(_mtx);
_positionChanged = true;
+ continue;
}
}
+ Status status = updateUpstream(txn.get());
+ if (!status.isOK()) {
+ sleepmillis(500);
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ _positionChanged = true;
+ }
}
-} // namespace repl
-} // namespace mongo
+}
+} // namespace repl
+} // namespace mongo