diff options
author | Benety Goh <benety@mongodb.com> | 2016-03-10 21:30:24 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-03-10 21:30:24 -0500 |
commit | 81e73bae1413668fc53183ac693d87341561dc0f (patch) | |
tree | 94aa6a4b25e528fc39abe59899098a14e12f688f | |
parent | 052c463c90eb19e73f2e9b2f6fbd4eb669c83855 (diff) | |
download | mongo-81e73bae1413668fc53183ac693d87341561dc0f.tar.gz |
Revert "SERVER-18029 integrated Reporter into SyncSourceFeedback"
This reverts commit 052c463c90eb19e73f2e9b2f6fbd4eb669c83855.
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 275 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.h | 40 |
2 files changed, 155 insertions, 160 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 43a9b077d83..7ea085dcc23 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -32,23 +32,23 @@ #include "mongo/db/repl/sync_source_feedback.h" -#include "mongo/db/client.h" +#include "mongo/client/constants.h" +#include "mongo/client/dbclientcursor.h" +#include "mongo/db/auth/authorization_manager.h" +#include "mongo/db/auth/authorization_manager_global.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/internal_user_auth.h" +#include "mongo/db/commands.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/replica_set_config.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/reporter.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/operation_context.h" -#include "mongo/executor/network_interface_factory.h" -#include "mongo/executor/network_interface_thread_pool.h" -#include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/net/hostandport.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/time_support.h" namespace mongo { @@ -57,95 +57,108 @@ using std::string; namespace repl { -namespace { +void SyncSourceFeedback::_resetConnection() { + LOG(1) << "resetting connection in sync source feedback"; + _connection.reset(); + _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle; +} -/** - * Calculates the keep alive interval based on the current configuration in the replication - * coordinator. - */ -Milliseconds calculateKeepAliveInterval(OperationContext* txn, stdx::mutex& mtx) { - stdx::lock_guard<stdx::mutex> lock(mtx); - auto replCoord = repl::ReplicationCoordinator::get(txn); - auto rsConfig = replCoord->getConfig(); - auto keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2; - return keepAliveInterval; +bool SyncSourceFeedback::replAuthenticate() { + if (!getGlobalAuthorizationManager()->isAuthEnabled()) + return true; + + if (!isInternalAuthSet()) + return false; + return _connection->authenticateInternalUser(); } -/** - * Returns function to prepare update command - */ -Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePositionCommandFn( - OperationContext* txn, stdx::mutex& mtx, const HostAndPort& syncTarget) { - return [&mtx, syncTarget, txn]( - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) - -> StatusWith<BSONObj> { - auto currentSyncTarget = BackgroundSync::get()->getSyncTarget(); - if (currentSyncTarget != syncTarget) { - // Change in sync target - return Status(ErrorCodes::InvalidSyncSource, "Sync target is no longer valid"); - } +bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) { + if (hasConnection()) { + return true; + } + log() << "setting syncSourceFeedback to " << host.toString(); + _connection.reset( + new DBClientConnection(false, durationCount<Seconds>(OplogReader::kSocketTimeout))); + string errmsg; + try { + if (!_connection->connect(host, errmsg) || + (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) { + _resetConnection(); + log() << errmsg << endl; + return false; + } + } catch (const DBException& e) { + error() << "Error connecting to " << host.toString() << ": " << e.what(); + _resetConnection(); + return false; + } - stdx::lock_guard<stdx::mutex> lock(mtx); - auto replCoord = repl::ReplicationCoordinator::get(txn); - if (replCoord->getMemberState().primary()) { - // Primary has no one to send updates to. - return Status(ErrorCodes::InvalidSyncSource, - "Currently primary - no one to send updates to"); - } + // Update keepalive value from config. + auto rsConfig = repl::ReplicationCoordinator::get(txn)->getConfig(); + _keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2; - return replCoord->prepareReplSetUpdatePositionCommand(commandStyle); - }; + return hasConnection(); } -} // namespace - void SyncSourceFeedback::forwardSlaveProgress() { - { - stdx::unique_lock<stdx::mutex> lock(_mtx); - _positionChanged = true; - _cond.notify_all(); - if (_reporter) { - auto triggerStatus = _reporter->trigger(); - if (!triggerStatus.isOK()) { - warning() << "unable to forward slave progress to " << _reporter->getTarget() - << ": " << triggerStatus; - } - } - } + stdx::lock_guard<stdx::mutex> lock(_mtx); + _positionChanged = true; + _cond.notify_all(); } -Status SyncSourceFeedback::_updateUpstream(OperationContext* txn) { - Reporter* reporter; +Status SyncSourceFeedback::updateUpstream( + OperationContext* txn, ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) { + auto replCoord = repl::ReplicationCoordinator::get(txn); + if (replCoord->getMemberState().primary()) { + // Primary has no one to send updates to. + return Status::OK(); + } + BSONObj cmd; { - stdx::lock_guard<stdx::mutex> lock(_mtx); - reporter = _reporter; + stdx::unique_lock<stdx::mutex> lock(_mtx); + // The command could not be created, likely because this node was removed from the set. + auto prepareResult = replCoord->prepareReplSetUpdatePositionCommand(commandStyle); + if (!prepareResult.isOK()) { + return Status::OK(); + } + cmd = prepareResult.getValue(); } - - auto syncTarget = reporter->getTarget(); - - auto triggerStatus = reporter->trigger(); - if (!triggerStatus.isOK()) { - warning() << "unable to schedule reporter to update replication progress on " << syncTarget - << ": " << triggerStatus; - return triggerStatus; + BSONObj res; + + LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd; + try { + _connection->runCommand("admin", cmd, res); + } catch (const DBException& e) { + log() << "SyncSourceFeedback error sending " + << (commandStyle == + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle + ? "old style " + : "") << "update: " << e.what(); + // Blacklist sync target for .5 seconds and find a new one. + replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); + BackgroundSync::get()->clearSyncTarget(); + _resetConnection(); + return e.toStatus(); } - auto status = reporter->join(); - + Status status = Command::getStatusFromCommandResult(res); if (!status.isOK()) { - log() << "SyncSourceFeedback error sending update to " << syncTarget << ": " << status; - - // Some errors should not cause result in blacklisting the sync source. - if (status != ErrorCodes::InvalidSyncSource) { - // The command could not be created because the node is now primary. - } else if (status != ErrorCodes::NodeNotFound) { - // The command could not be created, likely because this node was removed from the set. - } else { - // Blacklist sync target for .5 seconds and find a new one. - stdx::lock_guard<stdx::mutex> lock(_mtx); - auto replCoord = repl::ReplicationCoordinator::get(txn); - replCoord->blacklistSyncSource(syncTarget, Date_t::now() + Milliseconds(500)); + log() << "SyncSourceFeedback error sending " + << (commandStyle == + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle + ? "old style " + : "") << "update, response: " << res.toString(); + if (status == ErrorCodes::BadValue && + commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) { + log() << "SyncSourceFeedback falling back to old style UpdatePosition command"; + _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle; + } else if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() || + res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) { + // Blacklist sync target for .5 seconds and find a new one, unless we were rejected due + // to the syncsource having a newer config. + replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); BackgroundSync::get()->clearSyncTarget(); + _resetConnection(); } } @@ -154,9 +167,6 @@ Status SyncSourceFeedback::_updateUpstream(OperationContext* txn) { void SyncSourceFeedback::shutdown() { stdx::unique_lock<stdx::mutex> lock(_mtx); - if (_reporter) { - _reporter->shutdown(); - } _shutdownSignaled = true; _cond.notify_all(); } @@ -164,35 +174,12 @@ void SyncSourceFeedback::shutdown() { void SyncSourceFeedback::run() { Client::initThread("SyncSourceFeedback"); - // Task executor used to run replSetUpdatePosition command on sync source. - auto net = executor::makeNetworkInterface("NetworkInterfaceASIO-SyncSourceFeedback"); - auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get()); - executor::ThreadPoolTaskExecutor executor(std::move(pool), std::move(net)); - executor.startup(); - ON_BLOCK_EXIT([&executor]() { - executor.shutdown(); - executor.join(); - }); - - HostAndPort syncTarget; - - // keepAliveInterval indicates how frequently to forward progress in the absence of updates. - Milliseconds keepAliveInterval(0); - while (true) { // breaks once _shutdownSignaled is true auto txn = cc().makeOperationContext(); - - if (keepAliveInterval == Milliseconds(0)) { - keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx); - } - { - // Take SyncSourceFeedback lock before calling into ReplicationCoordinator - // to avoid deadlock because ReplicationCoordinator could conceivably calling back into - // this class. stdx::unique_lock<stdx::mutex> lock(_mtx); while (!_positionChanged && !_shutdownSignaled) { - if (_cond.wait_for(lock, keepAliveInterval) == stdx::cv_status::timeout) { + if (_cond.wait_for(lock, _keepAliveInterval) == stdx::cv_status::timeout) { MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); if (!(state.primary() || state.startup())) { break; @@ -207,57 +194,39 @@ void SyncSourceFeedback::run() { _positionChanged = false; } - { - stdx::lock_guard<stdx::mutex> lock(_mtx); - MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); - if (state.primary() || state.startup()) { - continue; - } + MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); + if (state.primary() || state.startup()) { + _resetConnection(); + continue; } - const HostAndPort target = BackgroundSync::get()->getSyncTarget(); - // Log sync source changes. - if (target.empty()) { - if (syncTarget != target) { - syncTarget = target; - } - // Loop back around again; the keepalive functionality will cause us to retry - continue; + if (_syncTarget != target) { + _resetConnection(); + _syncTarget = target; } - - if (syncTarget != target) { - LOG(1) << "setting syncSourceFeedback to " << target; - syncTarget = target; - - // Update keepalive value from config. - auto oldKeepAliveInterval = keepAliveInterval; - keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx); - if (oldKeepAliveInterval != keepAliveInterval) { - LOG(1) << "new syncSourceFeedback keep alive duration = " << keepAliveInterval - << " (previously " << oldKeepAliveInterval << ")"; + if (!hasConnection()) { + // fix connection if need be + if (target.empty() || !_connect(txn.get(), target)) { + // Loop back around again; the keepalive functionality will cause us to retry + continue; } } - - Reporter reporter(&executor, - makePrepareReplSetUpdatePositionCommandFn(txn.get(), _mtx, syncTarget), - syncTarget, - keepAliveInterval); - { - stdx::lock_guard<stdx::mutex> lock(_mtx); - _reporter = &reporter; - } - ON_BLOCK_EXIT([this]() { - stdx::lock_guard<stdx::mutex> lock(_mtx); - _reporter = nullptr; - }); - - auto status = _updateUpstream(txn.get()); + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle oldCommandStyle = _commandStyle; + Status status = updateUpstream(txn.get(), _commandStyle); if (!status.isOK()) { - LOG(1) << "The replication progress command (replSetUpdatePosition) failed and will be " - "retried: " << status; + if (_commandStyle != oldCommandStyle) { + stdx::unique_lock<stdx::mutex> lock(_mtx); + _positionChanged = true; + } else { + log() + << (_commandStyle == + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle + ? "old style " + : "") << "updateUpstream" + << " failed: " << status << ", will retry"; + } } } } - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index 3d3ea0e4747..f3fcdca0abc 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -29,16 +29,17 @@ #pragma once -#include "mongo/base/status.h" +#include "mongo/client/constants.h" +#include "mongo/client/dbclientcursor.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/net/hostandport.h" namespace mongo { -struct HostAndPort; class OperationContext; namespace repl { -class Reporter; class SyncSourceFeedback { public: @@ -57,22 +58,47 @@ public: void shutdown(); private: + void _resetConnection(); + + /** + * Authenticates _connection using the server's cluster-membership credentials. + * + * Returns true on successful authentication. + */ + bool replAuthenticate(); + /* Inform the sync target of our current position in the oplog, as well as the positions * of all secondaries chained through us. + * "commandStyle" indicates whether or not the upstream node is pre-3.2.4 and needs the older + * style + * ReplSetUpdatePosition commands as a result. */ - Status _updateUpstream(OperationContext* txn); + Status updateUpstream(OperationContext* txn, + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle); + bool hasConnection() { + return _connection.get(); + } + + /// Connect to sync target. + bool _connect(OperationContext* txn, const HostAndPort& host); + + // the member we are currently syncing from + HostAndPort _syncTarget; + // our connection to our sync target + std::unique_ptr<DBClientConnection> _connection; // protects cond, _shutdownSignaled, _keepAliveInterval, and _positionChanged. stdx::mutex _mtx; // used to alert our thread of changes which need to be passed up the chain stdx::condition_variable _cond; + /// _keepAliveInterval indicates how frequently to forward progress in the absence of updates. + Milliseconds _keepAliveInterval = Milliseconds(100); // used to indicate a position change which has not yet been pushed along bool _positionChanged = false; // Once this is set to true the _run method will terminate bool _shutdownSignaled = false; - // Reports replication progress to sync source. - Reporter* _reporter = nullptr; + // Indicates version of the UpdatePosition command accepted by our syncSource. + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle _commandStyle; }; - } // namespace repl } // namespace mongo |