diff options
Diffstat (limited to 'src/mongo/db/repl/sync_source_feedback.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 241 |
1 files changed, 119 insertions, 122 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 49c70c3c2b7..602523a5471 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -51,161 +51,158 @@ namespace mongo { - using std::endl; - using std::string; +using std::endl; +using std::string; namespace repl { - SyncSourceFeedback::SyncSourceFeedback() : _positionChanged(false), - _shutdownSignaled(false) {} - SyncSourceFeedback::~SyncSourceFeedback() {} +SyncSourceFeedback::SyncSourceFeedback() : _positionChanged(false), _shutdownSignaled(false) {} +SyncSourceFeedback::~SyncSourceFeedback() {} - void SyncSourceFeedback::_resetConnection() { - LOG(1) << "resetting connection in sync source feedback"; - _connection.reset(); - } +void SyncSourceFeedback::_resetConnection() { + LOG(1) << "resetting connection in sync source feedback"; + _connection.reset(); +} - bool SyncSourceFeedback::replAuthenticate() { - if (!getGlobalAuthorizationManager()->isAuthEnabled()) - return true; +bool SyncSourceFeedback::replAuthenticate() { + if (!getGlobalAuthorizationManager()->isAuthEnabled()) + return true; - if (!isInternalAuthSet()) - return false; - return authenticateInternalUser(_connection.get()); - } + if (!isInternalAuthSet()) + return false; + return authenticateInternalUser(_connection.get()); +} - bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) { - if (hasConnection()) { - return true; - } - log() << "setting syncSourceFeedback to " << host.toString(); - _connection.reset(new DBClientConnection(false, OplogReader::tcp_timeout)); - string errmsg; - try { - if (!_connection->connect(host, errmsg) || - (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) { - _resetConnection(); - log() << errmsg << endl; - return false; - } - } - catch (const DBException& e) { - error() << "Error connecting to " << host.toString() << ": " << e.what(); +bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) { + if (hasConnection()) { + return true; + } + log() << "setting syncSourceFeedback to " << host.toString(); + _connection.reset(new DBClientConnection(false, OplogReader::tcp_timeout)); + string errmsg; + try { + if (!_connection->connect(host, errmsg) || + (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) { _resetConnection(); + log() << errmsg << endl; return false; } - - return hasConnection(); + } catch (const DBException& e) { + error() << "Error connecting to " << host.toString() << ": " << e.what(); + _resetConnection(); + return false; } - void SyncSourceFeedback::forwardSlaveProgress() { - stdx::unique_lock<stdx::mutex> lock(_mtx); - _positionChanged = true; - _cond.notify_all(); - } + return hasConnection(); +} + +void SyncSourceFeedback::forwardSlaveProgress() { + stdx::unique_lock<stdx::mutex> lock(_mtx); + _positionChanged = true; + _cond.notify_all(); +} - Status SyncSourceFeedback::updateUpstream(OperationContext* txn) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (replCoord->getMemberState().primary()) { - // primary has no one to update to +Status SyncSourceFeedback::updateUpstream(OperationContext* txn) { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + if (replCoord->getMemberState().primary()) { + // primary has no one to update to + return Status::OK(); + } + BSONObjBuilder cmd; + { + stdx::unique_lock<stdx::mutex> lock(_mtx); + // the command could not be created, likely because the node was removed from the set + if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) { return Status::OK(); } - BSONObjBuilder cmd; - { - stdx::unique_lock<stdx::mutex> lock(_mtx); - // the command could not be created, likely because the node was removed from the set - if (!replCoord->prepareReplSetUpdatePositionCommand(&cmd)) { - return Status::OK(); - } - } - BSONObj res; + } + BSONObj res; + + LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd.done(); + try { + _connection->runCommand("admin", cmd.obj(), res); + } catch (const DBException& e) { + log() << "SyncSourceFeedback error sending update: " << e.what() << endl; + // blacklist sync target for .5 seconds and find a new one + replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); + BackgroundSync::get()->clearSyncTarget(); + _resetConnection(); + return e.toStatus(); + } - LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd.done(); - try { - _connection->runCommand("admin", cmd.obj(), res); - } - catch (const DBException& e) { - log() << "SyncSourceFeedback error sending update: " << e.what() << endl; - // blacklist sync target for .5 seconds and find a new one + Status status = Command::getStatusFromCommandResult(res); + if (!status.isOK()) { + log() << "SyncSourceFeedback error sending update, response: " << res.toString() << endl; + // blacklist sync target for .5 seconds and find a new one, unless we were rejected due + // to the syncsource having a newer config + if (status != ErrorCodes::InvalidReplicaSetConfig || res["cfgver"].eoo() || + res["cfgver"].numberLong() < replCoord->getConfig().getConfigVersion()) { replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); BackgroundSync::get()->clearSyncTarget(); _resetConnection(); - return e.toStatus(); - } - - Status status = Command::getStatusFromCommandResult(res); - if (!status.isOK()) { - log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl; - // blacklist sync target for .5 seconds and find a new one, unless we were rejected due - // to the syncsource having a newer config - if (status != ErrorCodes::InvalidReplicaSetConfig || res["cfgver"].eoo() || - res["cfgver"].numberLong() < replCoord->getConfig().getConfigVersion()) { - replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); - BackgroundSync::get()->clearSyncTarget(); - _resetConnection(); - } } - - return status; } - void SyncSourceFeedback::shutdown() { - stdx::unique_lock<stdx::mutex> lock(_mtx); - _shutdownSignaled = true; - _cond.notify_all(); - } + return status; +} - void SyncSourceFeedback::run() { - Client::initThread("SyncSourceFeedback"); +void SyncSourceFeedback::shutdown() { + stdx::unique_lock<stdx::mutex> lock(_mtx); + _shutdownSignaled = true; + _cond.notify_all(); +} - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - while (true) { // breaks once _shutdownSignaled is true - { - stdx::unique_lock<stdx::mutex> lock(_mtx); - while (!_positionChanged && !_shutdownSignaled) { - _cond.wait(lock); - } +void SyncSourceFeedback::run() { + Client::initThread("SyncSourceFeedback"); - if (_shutdownSignaled) { - break; - } + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + while (true) { // breaks once _shutdownSignaled is true + { + stdx::unique_lock<stdx::mutex> lock(_mtx); + while (!_positionChanged && !_shutdownSignaled) { + _cond.wait(lock); + } - _positionChanged = false; + if (_shutdownSignaled) { + break; } - auto txn = cc().makeOperationContext(); - MemberState state = replCoord->getMemberState(); - if (state.primary() || state.startup()) { - _resetConnection(); + _positionChanged = false; + } + + auto txn = cc().makeOperationContext(); + MemberState state = replCoord->getMemberState(); + if (state.primary() || state.startup()) { + _resetConnection(); + continue; + } + const HostAndPort target = BackgroundSync::get()->getSyncTarget(); + if (_syncTarget != target) { + _resetConnection(); + _syncTarget = target; + } + if (!hasConnection()) { + // fix connection if need be + if (target.empty()) { + sleepmillis(500); + stdx::unique_lock<stdx::mutex> lock(_mtx); + _positionChanged = true; continue; } - const HostAndPort target = BackgroundSync::get()->getSyncTarget(); - if (_syncTarget != target) { - _resetConnection(); - _syncTarget = target; - } - if (!hasConnection()) { - // fix connection if need be - if (target.empty()) { - sleepmillis(500); - stdx::unique_lock<stdx::mutex> lock(_mtx); - _positionChanged = true; - continue; - } - if (!_connect(txn.get(), target)) { - sleepmillis(500); - stdx::unique_lock<stdx::mutex> lock(_mtx); - _positionChanged = true; - continue; - } - } - Status status = updateUpstream(txn.get()); - if (!status.isOK()) { + if (!_connect(txn.get(), target)) { sleepmillis(500); stdx::unique_lock<stdx::mutex> lock(_mtx); _positionChanged = true; + continue; } } + Status status = updateUpstream(txn.get()); + if (!status.isOK()) { + sleepmillis(500); + stdx::unique_lock<stdx::mutex> lock(_mtx); + _positionChanged = true; + } } -} // namespace repl -} // namespace mongo +} +} // namespace repl +} // namespace mongo |