From 35ddf24122bbdba9f6eb0ae4cdfcb2e9edd6d2de Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Tue, 23 Feb 2016 14:22:11 -0500 Subject: SERVER-23085 integrated Reporter into SyncSourceFeedback --- src/mongo/db/repl/sync_source_feedback.cpp | 275 ++++++++++++++++------------- src/mongo/db/repl/sync_source_feedback.h | 40 +---- 2 files changed, 160 insertions(+), 155 deletions(-) diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index 7ea085dcc23..43a9b077d83 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -32,23 +32,23 @@ #include "mongo/db/repl/sync_source_feedback.h" -#include "mongo/client/constants.h" -#include "mongo/client/dbclientcursor.h" -#include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/auth/authorization_manager_global.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/auth/internal_user_auth.h" -#include "mongo/db/commands.h" -#include "mongo/db/dbhelpers.h" +#include "mongo/db/client.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/replica_set_config.h" -#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/reporter.h" #include "mongo/db/operation_context.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/scopeguard.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -57,108 +57,95 @@ using std::string; namespace repl { -void SyncSourceFeedback::_resetConnection() { - LOG(1) << "resetting connection in sync source feedback"; - _connection.reset(); - _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle; -} - -bool SyncSourceFeedback::replAuthenticate() { - if (!getGlobalAuthorizationManager()->isAuthEnabled()) - return true; +namespace { - if (!isInternalAuthSet()) - return false; - return _connection->authenticateInternalUser(); +/** + * Calculates the keep alive interval based on the current configuration in the replication + * coordinator. + */ +Milliseconds calculateKeepAliveInterval(OperationContext* txn, stdx::mutex& mtx) { + stdx::lock_guard lock(mtx); + auto replCoord = repl::ReplicationCoordinator::get(txn); + auto rsConfig = replCoord->getConfig(); + auto keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2; + return keepAliveInterval; } -bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) { - if (hasConnection()) { - return true; - } - log() << "setting syncSourceFeedback to " << host.toString(); - _connection.reset( - new DBClientConnection(false, durationCount(OplogReader::kSocketTimeout))); - string errmsg; - try { - if (!_connection->connect(host, errmsg) || - (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) { - _resetConnection(); - log() << errmsg << endl; - return false; - } - } catch (const DBException& e) { - error() << "Error connecting to " << host.toString() << ": " << e.what(); - _resetConnection(); - return false; - } +/** + * Returns function to prepare update command + */ +Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePositionCommandFn( + OperationContext* txn, stdx::mutex& mtx, const HostAndPort& syncTarget) { + return [&mtx, syncTarget, txn]( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) + -> StatusWith { + auto currentSyncTarget = BackgroundSync::get()->getSyncTarget(); + if (currentSyncTarget != syncTarget) { + // Change in sync target + return Status(ErrorCodes::InvalidSyncSource, "Sync target is no longer valid"); + } - // Update keepalive value from config. - auto rsConfig = repl::ReplicationCoordinator::get(txn)->getConfig(); - _keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2; + stdx::lock_guard lock(mtx); + auto replCoord = repl::ReplicationCoordinator::get(txn); + if (replCoord->getMemberState().primary()) { + // Primary has no one to send updates to. + return Status(ErrorCodes::InvalidSyncSource, + "Currently primary - no one to send updates to"); + } - return hasConnection(); + return replCoord->prepareReplSetUpdatePositionCommand(commandStyle); + }; } -void SyncSourceFeedback::forwardSlaveProgress() { - stdx::lock_guard lock(_mtx); - _positionChanged = true; - _cond.notify_all(); -} +} // namespace -Status SyncSourceFeedback::updateUpstream( - OperationContext* txn, ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) { - auto replCoord = repl::ReplicationCoordinator::get(txn); - if (replCoord->getMemberState().primary()) { - // Primary has no one to send updates to. - return Status::OK(); - } - BSONObj cmd; +void SyncSourceFeedback::forwardSlaveProgress() { { stdx::unique_lock lock(_mtx); - // The command could not be created, likely because this node was removed from the set. - auto prepareResult = replCoord->prepareReplSetUpdatePositionCommand(commandStyle); - if (!prepareResult.isOK()) { - return Status::OK(); + _positionChanged = true; + _cond.notify_all(); + if (_reporter) { + auto triggerStatus = _reporter->trigger(); + if (!triggerStatus.isOK()) { + warning() << "unable to forward slave progress to " << _reporter->getTarget() + << ": " << triggerStatus; + } } - cmd = prepareResult.getValue(); } - BSONObj res; - - LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd; - try { - _connection->runCommand("admin", cmd, res); - } catch (const DBException& e) { - log() << "SyncSourceFeedback error sending " - << (commandStyle == - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle - ? "old style " - : "") << "update: " << e.what(); - // Blacklist sync target for .5 seconds and find a new one. - replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); - BackgroundSync::get()->clearSyncTarget(); - _resetConnection(); - return e.toStatus(); +} + +Status SyncSourceFeedback::_updateUpstream(OperationContext* txn) { + Reporter* reporter; + { + stdx::lock_guard lock(_mtx); + reporter = _reporter; + } + + auto syncTarget = reporter->getTarget(); + + auto triggerStatus = reporter->trigger(); + if (!triggerStatus.isOK()) { + warning() << "unable to schedule reporter to update replication progress on " << syncTarget + << ": " << triggerStatus; + return triggerStatus; } - Status status = Command::getStatusFromCommandResult(res); + auto status = reporter->join(); + if (!status.isOK()) { - log() << "SyncSourceFeedback error sending " - << (commandStyle == - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle - ? "old style " - : "") << "update, response: " << res.toString(); - if (status == ErrorCodes::BadValue && - commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) { - log() << "SyncSourceFeedback falling back to old style UpdatePosition command"; - _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle; - } else if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() || - res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) { - // Blacklist sync target for .5 seconds and find a new one, unless we were rejected due - // to the syncsource having a newer config. - replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500)); + log() << "SyncSourceFeedback error sending update to " << syncTarget << ": " << status; + + // Some errors should not cause result in blacklisting the sync source. + if (status != ErrorCodes::InvalidSyncSource) { + // The command could not be created because the node is now primary. + } else if (status != ErrorCodes::NodeNotFound) { + // The command could not be created, likely because this node was removed from the set. + } else { + // Blacklist sync target for .5 seconds and find a new one. + stdx::lock_guard lock(_mtx); + auto replCoord = repl::ReplicationCoordinator::get(txn); + replCoord->blacklistSyncSource(syncTarget, Date_t::now() + Milliseconds(500)); BackgroundSync::get()->clearSyncTarget(); - _resetConnection(); } } @@ -167,6 +154,9 @@ Status SyncSourceFeedback::updateUpstream( void SyncSourceFeedback::shutdown() { stdx::unique_lock lock(_mtx); + if (_reporter) { + _reporter->shutdown(); + } _shutdownSignaled = true; _cond.notify_all(); } @@ -174,12 +164,35 @@ void SyncSourceFeedback::shutdown() { void SyncSourceFeedback::run() { Client::initThread("SyncSourceFeedback"); + // Task executor used to run replSetUpdatePosition command on sync source. + auto net = executor::makeNetworkInterface("NetworkInterfaceASIO-SyncSourceFeedback"); + auto pool = stdx::make_unique(net.get()); + executor::ThreadPoolTaskExecutor executor(std::move(pool), std::move(net)); + executor.startup(); + ON_BLOCK_EXIT([&executor]() { + executor.shutdown(); + executor.join(); + }); + + HostAndPort syncTarget; + + // keepAliveInterval indicates how frequently to forward progress in the absence of updates. + Milliseconds keepAliveInterval(0); + while (true) { // breaks once _shutdownSignaled is true auto txn = cc().makeOperationContext(); + + if (keepAliveInterval == Milliseconds(0)) { + keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx); + } + { + // Take SyncSourceFeedback lock before calling into ReplicationCoordinator + // to avoid deadlock because ReplicationCoordinator could conceivably calling back into + // this class. stdx::unique_lock lock(_mtx); while (!_positionChanged && !_shutdownSignaled) { - if (_cond.wait_for(lock, _keepAliveInterval) == stdx::cv_status::timeout) { + if (_cond.wait_for(lock, keepAliveInterval) == stdx::cv_status::timeout) { MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); if (!(state.primary() || state.startup())) { break; @@ -194,39 +207,57 @@ void SyncSourceFeedback::run() { _positionChanged = false; } - MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); - if (state.primary() || state.startup()) { - _resetConnection(); - continue; + { + stdx::lock_guard lock(_mtx); + MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); + if (state.primary() || state.startup()) { + continue; + } } + const HostAndPort target = BackgroundSync::get()->getSyncTarget(); - if (_syncTarget != target) { - _resetConnection(); - _syncTarget = target; + // Log sync source changes. + if (target.empty()) { + if (syncTarget != target) { + syncTarget = target; + } + // Loop back around again; the keepalive functionality will cause us to retry + continue; } - if (!hasConnection()) { - // fix connection if need be - if (target.empty() || !_connect(txn.get(), target)) { - // Loop back around again; the keepalive functionality will cause us to retry - continue; + + if (syncTarget != target) { + LOG(1) << "setting syncSourceFeedback to " << target; + syncTarget = target; + + // Update keepalive value from config. + auto oldKeepAliveInterval = keepAliveInterval; + keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx); + if (oldKeepAliveInterval != keepAliveInterval) { + LOG(1) << "new syncSourceFeedback keep alive duration = " << keepAliveInterval + << " (previously " << oldKeepAliveInterval << ")"; } } - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle oldCommandStyle = _commandStyle; - Status status = updateUpstream(txn.get(), _commandStyle); + + Reporter reporter(&executor, + makePrepareReplSetUpdatePositionCommandFn(txn.get(), _mtx, syncTarget), + syncTarget, + keepAliveInterval); + { + stdx::lock_guard lock(_mtx); + _reporter = &reporter; + } + ON_BLOCK_EXIT([this]() { + stdx::lock_guard lock(_mtx); + _reporter = nullptr; + }); + + auto status = _updateUpstream(txn.get()); if (!status.isOK()) { - if (_commandStyle != oldCommandStyle) { - stdx::unique_lock lock(_mtx); - _positionChanged = true; - } else { - log() - << (_commandStyle == - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle - ? "old style " - : "") << "updateUpstream" - << " failed: " << status << ", will retry"; - } + LOG(1) << "The replication progress command (replSetUpdatePosition) failed and will be " + "retried: " << status; } } } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index f3fcdca0abc..3d3ea0e4747 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -29,17 +29,16 @@ #pragma once -#include "mongo/client/constants.h" -#include "mongo/client/dbclientcursor.h" -#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/base/status.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/net/hostandport.h" namespace mongo { +struct HostAndPort; class OperationContext; namespace repl { +class Reporter; class SyncSourceFeedback { public: @@ -58,47 +57,22 @@ public: void shutdown(); private: - void _resetConnection(); - - /** - * Authenticates _connection using the server's cluster-membership credentials. - * - * Returns true on successful authentication. - */ - bool replAuthenticate(); - /* Inform the sync target of our current position in the oplog, as well as the positions * of all secondaries chained through us. - * "commandStyle" indicates whether or not the upstream node is pre-3.2.4 and needs the older - * style - * ReplSetUpdatePosition commands as a result. */ - Status updateUpstream(OperationContext* txn, - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle); + Status _updateUpstream(OperationContext* txn); - bool hasConnection() { - return _connection.get(); - } - - /// Connect to sync target. - bool _connect(OperationContext* txn, const HostAndPort& host); - - // the member we are currently syncing from - HostAndPort _syncTarget; - // our connection to our sync target - std::unique_ptr _connection; // protects cond, _shutdownSignaled, _keepAliveInterval, and _positionChanged. stdx::mutex _mtx; // used to alert our thread of changes which need to be passed up the chain stdx::condition_variable _cond; - /// _keepAliveInterval indicates how frequently to forward progress in the absence of updates. - Milliseconds _keepAliveInterval = Milliseconds(100); // used to indicate a position change which has not yet been pushed along bool _positionChanged = false; // Once this is set to true the _run method will terminate bool _shutdownSignaled = false; - // Indicates version of the UpdatePosition command accepted by our syncSource. - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle _commandStyle; + // Reports replication progress to sync source. + Reporter* _reporter = nullptr; }; + } // namespace repl } // namespace mongo -- cgit v1.2.1