summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-02-23 14:22:11 -0500
committerBenety Goh <benety@mongodb.com>2016-03-16 10:38:27 -0400
commit35ddf24122bbdba9f6eb0ae4cdfcb2e9edd6d2de (patch)
treeb66c2f17a47ee1b10233d7a2c011b2352803767e
parent0f70440c1d61ecce39d54fb176cd0280b82d030c (diff)
downloadmongo-35ddf24122bbdba9f6eb0ae4cdfcb2e9edd6d2de.tar.gz
SERVER-23085 integrated Reporter into SyncSourceFeedback
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp275
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h40
2 files changed, 160 insertions, 155 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index 7ea085dcc23..43a9b077d83 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -32,23 +32,23 @@
#include "mongo/db/repl/sync_source_feedback.h"
-#include "mongo/client/constants.h"
-#include "mongo/client/dbclientcursor.h"
-#include "mongo/db/auth/authorization_manager.h"
-#include "mongo/db/auth/authorization_manager_global.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/auth/internal_user_auth.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/dbhelpers.h"
+#include "mongo/db/client.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
-#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/replica_set_config.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/reporter.h"
#include "mongo/db/operation_context.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/scopeguard.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -57,108 +57,95 @@ using std::string;
namespace repl {
-void SyncSourceFeedback::_resetConnection() {
- LOG(1) << "resetting connection in sync source feedback";
- _connection.reset();
- _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle;
-}
-
-bool SyncSourceFeedback::replAuthenticate() {
- if (!getGlobalAuthorizationManager()->isAuthEnabled())
- return true;
+namespace {
- if (!isInternalAuthSet())
- return false;
- return _connection->authenticateInternalUser();
+/**
+ * Calculates the keep alive interval based on the current configuration in the replication
+ * coordinator.
+ */
+Milliseconds calculateKeepAliveInterval(OperationContext* txn, stdx::mutex& mtx) {
+ stdx::lock_guard<stdx::mutex> lock(mtx);
+ auto replCoord = repl::ReplicationCoordinator::get(txn);
+ auto rsConfig = replCoord->getConfig();
+ auto keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2;
+ return keepAliveInterval;
}
-bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) {
- if (hasConnection()) {
- return true;
- }
- log() << "setting syncSourceFeedback to " << host.toString();
- _connection.reset(
- new DBClientConnection(false, durationCount<Seconds>(OplogReader::kSocketTimeout)));
- string errmsg;
- try {
- if (!_connection->connect(host, errmsg) ||
- (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) {
- _resetConnection();
- log() << errmsg << endl;
- return false;
- }
- } catch (const DBException& e) {
- error() << "Error connecting to " << host.toString() << ": " << e.what();
- _resetConnection();
- return false;
- }
+/**
+ * Returns function to prepare update command
+ */
+Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePositionCommandFn(
+ OperationContext* txn, stdx::mutex& mtx, const HostAndPort& syncTarget) {
+ return [&mtx, syncTarget, txn](
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
+ -> StatusWith<BSONObj> {
+ auto currentSyncTarget = BackgroundSync::get()->getSyncTarget();
+ if (currentSyncTarget != syncTarget) {
+ // Change in sync target
+ return Status(ErrorCodes::InvalidSyncSource, "Sync target is no longer valid");
+ }
- // Update keepalive value from config.
- auto rsConfig = repl::ReplicationCoordinator::get(txn)->getConfig();
- _keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2;
+ stdx::lock_guard<stdx::mutex> lock(mtx);
+ auto replCoord = repl::ReplicationCoordinator::get(txn);
+ if (replCoord->getMemberState().primary()) {
+ // Primary has no one to send updates to.
+ return Status(ErrorCodes::InvalidSyncSource,
+ "Currently primary - no one to send updates to");
+ }
- return hasConnection();
+ return replCoord->prepareReplSetUpdatePositionCommand(commandStyle);
+ };
}
-void SyncSourceFeedback::forwardSlaveProgress() {
- stdx::lock_guard<stdx::mutex> lock(_mtx);
- _positionChanged = true;
- _cond.notify_all();
-}
+} // namespace
-Status SyncSourceFeedback::updateUpstream(
- OperationContext* txn, ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) {
- auto replCoord = repl::ReplicationCoordinator::get(txn);
- if (replCoord->getMemberState().primary()) {
- // Primary has no one to send updates to.
- return Status::OK();
- }
- BSONObj cmd;
+void SyncSourceFeedback::forwardSlaveProgress() {
{
stdx::unique_lock<stdx::mutex> lock(_mtx);
- // The command could not be created, likely because this node was removed from the set.
- auto prepareResult = replCoord->prepareReplSetUpdatePositionCommand(commandStyle);
- if (!prepareResult.isOK()) {
- return Status::OK();
+ _positionChanged = true;
+ _cond.notify_all();
+ if (_reporter) {
+ auto triggerStatus = _reporter->trigger();
+ if (!triggerStatus.isOK()) {
+ warning() << "unable to forward slave progress to " << _reporter->getTarget()
+ << ": " << triggerStatus;
+ }
}
- cmd = prepareResult.getValue();
}
- BSONObj res;
-
- LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd;
- try {
- _connection->runCommand("admin", cmd, res);
- } catch (const DBException& e) {
- log() << "SyncSourceFeedback error sending "
- << (commandStyle ==
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle
- ? "old style "
- : "") << "update: " << e.what();
- // Blacklist sync target for .5 seconds and find a new one.
- replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
- BackgroundSync::get()->clearSyncTarget();
- _resetConnection();
- return e.toStatus();
+}
+
+Status SyncSourceFeedback::_updateUpstream(OperationContext* txn) {
+ Reporter* reporter;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mtx);
+ reporter = _reporter;
+ }
+
+ auto syncTarget = reporter->getTarget();
+
+ auto triggerStatus = reporter->trigger();
+ if (!triggerStatus.isOK()) {
+ warning() << "unable to schedule reporter to update replication progress on " << syncTarget
+ << ": " << triggerStatus;
+ return triggerStatus;
}
- Status status = Command::getStatusFromCommandResult(res);
+ auto status = reporter->join();
+
if (!status.isOK()) {
- log() << "SyncSourceFeedback error sending "
- << (commandStyle ==
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle
- ? "old style "
- : "") << "update, response: " << res.toString();
- if (status == ErrorCodes::BadValue &&
- commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) {
- log() << "SyncSourceFeedback falling back to old style UpdatePosition command";
- _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle;
- } else if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() ||
- res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) {
- // Blacklist sync target for .5 seconds and find a new one, unless we were rejected due
- // to the syncsource having a newer config.
- replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
+ log() << "SyncSourceFeedback error sending update to " << syncTarget << ": " << status;
+
+ // Some errors should not cause result in blacklisting the sync source.
+ if (status != ErrorCodes::InvalidSyncSource) {
+ // The command could not be created because the node is now primary.
+ } else if (status != ErrorCodes::NodeNotFound) {
+ // The command could not be created, likely because this node was removed from the set.
+ } else {
+ // Blacklist sync target for .5 seconds and find a new one.
+ stdx::lock_guard<stdx::mutex> lock(_mtx);
+ auto replCoord = repl::ReplicationCoordinator::get(txn);
+ replCoord->blacklistSyncSource(syncTarget, Date_t::now() + Milliseconds(500));
BackgroundSync::get()->clearSyncTarget();
- _resetConnection();
}
}
@@ -167,6 +154,9 @@ Status SyncSourceFeedback::updateUpstream(
void SyncSourceFeedback::shutdown() {
stdx::unique_lock<stdx::mutex> lock(_mtx);
+ if (_reporter) {
+ _reporter->shutdown();
+ }
_shutdownSignaled = true;
_cond.notify_all();
}
@@ -174,12 +164,35 @@ void SyncSourceFeedback::shutdown() {
void SyncSourceFeedback::run() {
Client::initThread("SyncSourceFeedback");
+ // Task executor used to run replSetUpdatePosition command on sync source.
+ auto net = executor::makeNetworkInterface("NetworkInterfaceASIO-SyncSourceFeedback");
+ auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
+ executor::ThreadPoolTaskExecutor executor(std::move(pool), std::move(net));
+ executor.startup();
+ ON_BLOCK_EXIT([&executor]() {
+ executor.shutdown();
+ executor.join();
+ });
+
+ HostAndPort syncTarget;
+
+ // keepAliveInterval indicates how frequently to forward progress in the absence of updates.
+ Milliseconds keepAliveInterval(0);
+
while (true) { // breaks once _shutdownSignaled is true
auto txn = cc().makeOperationContext();
+
+ if (keepAliveInterval == Milliseconds(0)) {
+ keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx);
+ }
+
{
+ // Take SyncSourceFeedback lock before calling into ReplicationCoordinator
+ // to avoid deadlock because ReplicationCoordinator could conceivably calling back into
+ // this class.
stdx::unique_lock<stdx::mutex> lock(_mtx);
while (!_positionChanged && !_shutdownSignaled) {
- if (_cond.wait_for(lock, _keepAliveInterval) == stdx::cv_status::timeout) {
+ if (_cond.wait_for(lock, keepAliveInterval) == stdx::cv_status::timeout) {
MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
if (!(state.primary() || state.startup())) {
break;
@@ -194,39 +207,57 @@ void SyncSourceFeedback::run() {
_positionChanged = false;
}
- MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
- if (state.primary() || state.startup()) {
- _resetConnection();
- continue;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mtx);
+ MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
+ if (state.primary() || state.startup()) {
+ continue;
+ }
}
+
const HostAndPort target = BackgroundSync::get()->getSyncTarget();
- if (_syncTarget != target) {
- _resetConnection();
- _syncTarget = target;
+ // Log sync source changes.
+ if (target.empty()) {
+ if (syncTarget != target) {
+ syncTarget = target;
+ }
+ // Loop back around again; the keepalive functionality will cause us to retry
+ continue;
}
- if (!hasConnection()) {
- // fix connection if need be
- if (target.empty() || !_connect(txn.get(), target)) {
- // Loop back around again; the keepalive functionality will cause us to retry
- continue;
+
+ if (syncTarget != target) {
+ LOG(1) << "setting syncSourceFeedback to " << target;
+ syncTarget = target;
+
+ // Update keepalive value from config.
+ auto oldKeepAliveInterval = keepAliveInterval;
+ keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx);
+ if (oldKeepAliveInterval != keepAliveInterval) {
+ LOG(1) << "new syncSourceFeedback keep alive duration = " << keepAliveInterval
+ << " (previously " << oldKeepAliveInterval << ")";
}
}
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle oldCommandStyle = _commandStyle;
- Status status = updateUpstream(txn.get(), _commandStyle);
+
+ Reporter reporter(&executor,
+ makePrepareReplSetUpdatePositionCommandFn(txn.get(), _mtx, syncTarget),
+ syncTarget,
+ keepAliveInterval);
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mtx);
+ _reporter = &reporter;
+ }
+ ON_BLOCK_EXIT([this]() {
+ stdx::lock_guard<stdx::mutex> lock(_mtx);
+ _reporter = nullptr;
+ });
+
+ auto status = _updateUpstream(txn.get());
if (!status.isOK()) {
- if (_commandStyle != oldCommandStyle) {
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- _positionChanged = true;
- } else {
- log()
- << (_commandStyle ==
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle
- ? "old style "
- : "") << "updateUpstream"
- << " failed: " << status << ", will retry";
- }
+ LOG(1) << "The replication progress command (replSetUpdatePosition) failed and will be "
+ "retried: " << status;
}
}
}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
index f3fcdca0abc..3d3ea0e4747 100644
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -29,17 +29,16 @@
#pragma once
-#include "mongo/client/constants.h"
-#include "mongo/client/dbclientcursor.h"
-#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/base/status.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/net/hostandport.h"
namespace mongo {
+struct HostAndPort;
class OperationContext;
namespace repl {
+class Reporter;
class SyncSourceFeedback {
public:
@@ -58,47 +57,22 @@ public:
void shutdown();
private:
- void _resetConnection();
-
- /**
- * Authenticates _connection using the server's cluster-membership credentials.
- *
- * Returns true on successful authentication.
- */
- bool replAuthenticate();
-
/* Inform the sync target of our current position in the oplog, as well as the positions
* of all secondaries chained through us.
- * "commandStyle" indicates whether or not the upstream node is pre-3.2.4 and needs the older
- * style
- * ReplSetUpdatePosition commands as a result.
*/
- Status updateUpstream(OperationContext* txn,
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle);
+ Status _updateUpstream(OperationContext* txn);
- bool hasConnection() {
- return _connection.get();
- }
-
- /// Connect to sync target.
- bool _connect(OperationContext* txn, const HostAndPort& host);
-
- // the member we are currently syncing from
- HostAndPort _syncTarget;
- // our connection to our sync target
- std::unique_ptr<DBClientConnection> _connection;
// protects cond, _shutdownSignaled, _keepAliveInterval, and _positionChanged.
stdx::mutex _mtx;
// used to alert our thread of changes which need to be passed up the chain
stdx::condition_variable _cond;
- /// _keepAliveInterval indicates how frequently to forward progress in the absence of updates.
- Milliseconds _keepAliveInterval = Milliseconds(100);
// used to indicate a position change which has not yet been pushed along
bool _positionChanged = false;
// Once this is set to true the _run method will terminate
bool _shutdownSignaled = false;
- // Indicates version of the UpdatePosition command accepted by our syncSource.
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle _commandStyle;
+ // Reports replication progress to sync source.
+ Reporter* _reporter = nullptr;
};
+
} // namespace repl
} // namespace mongo