summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-03-10 21:30:24 -0500
committerBenety Goh <benety@mongodb.com>2016-03-10 21:30:24 -0500
commit81e73bae1413668fc53183ac693d87341561dc0f (patch)
tree94aa6a4b25e528fc39abe59899098a14e12f688f
parent052c463c90eb19e73f2e9b2f6fbd4eb669c83855 (diff)
downloadmongo-81e73bae1413668fc53183ac693d87341561dc0f.tar.gz
Revert "SERVER-18029 integrated Reporter into SyncSourceFeedback"
This reverts commit 052c463c90eb19e73f2e9b2f6fbd4eb669c83855.
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp275
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h40
2 files changed, 155 insertions, 160 deletions
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index 43a9b077d83..7ea085dcc23 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -32,23 +32,23 @@
#include "mongo/db/repl/sync_source_feedback.h"
-#include "mongo/db/client.h"
+#include "mongo/client/constants.h"
+#include "mongo/client/dbclientcursor.h"
+#include "mongo/db/auth/authorization_manager.h"
+#include "mongo/db/auth/authorization_manager_global.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/internal_user_auth.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/replica_set_config.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/reporter.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/operation_context.h"
-#include "mongo/executor/network_interface_factory.h"
-#include "mongo/executor/network_interface_thread_pool.h"
-#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
-#include "mongo/util/scopeguard.h"
-#include "mongo/util/time_support.h"
namespace mongo {
@@ -57,95 +57,108 @@ using std::string;
namespace repl {
-namespace {
+void SyncSourceFeedback::_resetConnection() {
+ LOG(1) << "resetting connection in sync source feedback";
+ _connection.reset();
+ _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle;
+}
-/**
- * Calculates the keep alive interval based on the current configuration in the replication
- * coordinator.
- */
-Milliseconds calculateKeepAliveInterval(OperationContext* txn, stdx::mutex& mtx) {
- stdx::lock_guard<stdx::mutex> lock(mtx);
- auto replCoord = repl::ReplicationCoordinator::get(txn);
- auto rsConfig = replCoord->getConfig();
- auto keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2;
- return keepAliveInterval;
+bool SyncSourceFeedback::replAuthenticate() {
+ if (!getGlobalAuthorizationManager()->isAuthEnabled())
+ return true;
+
+ if (!isInternalAuthSet())
+ return false;
+ return _connection->authenticateInternalUser();
}
-/**
- * Returns function to prepare update command
- */
-Reporter::PrepareReplSetUpdatePositionCommandFn makePrepareReplSetUpdatePositionCommandFn(
- OperationContext* txn, stdx::mutex& mtx, const HostAndPort& syncTarget) {
- return [&mtx, syncTarget, txn](
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
- -> StatusWith<BSONObj> {
- auto currentSyncTarget = BackgroundSync::get()->getSyncTarget();
- if (currentSyncTarget != syncTarget) {
- // Change in sync target
- return Status(ErrorCodes::InvalidSyncSource, "Sync target is no longer valid");
- }
+bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) {
+ if (hasConnection()) {
+ return true;
+ }
+ log() << "setting syncSourceFeedback to " << host.toString();
+ _connection.reset(
+ new DBClientConnection(false, durationCount<Seconds>(OplogReader::kSocketTimeout)));
+ string errmsg;
+ try {
+ if (!_connection->connect(host, errmsg) ||
+ (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate())) {
+ _resetConnection();
+ log() << errmsg << endl;
+ return false;
+ }
+ } catch (const DBException& e) {
+ error() << "Error connecting to " << host.toString() << ": " << e.what();
+ _resetConnection();
+ return false;
+ }
- stdx::lock_guard<stdx::mutex> lock(mtx);
- auto replCoord = repl::ReplicationCoordinator::get(txn);
- if (replCoord->getMemberState().primary()) {
- // Primary has no one to send updates to.
- return Status(ErrorCodes::InvalidSyncSource,
- "Currently primary - no one to send updates to");
- }
+ // Update keepalive value from config.
+ auto rsConfig = repl::ReplicationCoordinator::get(txn)->getConfig();
+ _keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2;
- return replCoord->prepareReplSetUpdatePositionCommand(commandStyle);
- };
+ return hasConnection();
}
-} // namespace
-
void SyncSourceFeedback::forwardSlaveProgress() {
- {
- stdx::unique_lock<stdx::mutex> lock(_mtx);
- _positionChanged = true;
- _cond.notify_all();
- if (_reporter) {
- auto triggerStatus = _reporter->trigger();
- if (!triggerStatus.isOK()) {
- warning() << "unable to forward slave progress to " << _reporter->getTarget()
- << ": " << triggerStatus;
- }
- }
- }
+ stdx::lock_guard<stdx::mutex> lock(_mtx);
+ _positionChanged = true;
+ _cond.notify_all();
}
-Status SyncSourceFeedback::_updateUpstream(OperationContext* txn) {
- Reporter* reporter;
+Status SyncSourceFeedback::updateUpstream(
+ OperationContext* txn, ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) {
+ auto replCoord = repl::ReplicationCoordinator::get(txn);
+ if (replCoord->getMemberState().primary()) {
+ // Primary has no one to send updates to.
+ return Status::OK();
+ }
+ BSONObj cmd;
{
- stdx::lock_guard<stdx::mutex> lock(_mtx);
- reporter = _reporter;
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ // The command could not be created, likely because this node was removed from the set.
+ auto prepareResult = replCoord->prepareReplSetUpdatePositionCommand(commandStyle);
+ if (!prepareResult.isOK()) {
+ return Status::OK();
+ }
+ cmd = prepareResult.getValue();
}
-
- auto syncTarget = reporter->getTarget();
-
- auto triggerStatus = reporter->trigger();
- if (!triggerStatus.isOK()) {
- warning() << "unable to schedule reporter to update replication progress on " << syncTarget
- << ": " << triggerStatus;
- return triggerStatus;
+ BSONObj res;
+
+ LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd;
+ try {
+ _connection->runCommand("admin", cmd, res);
+ } catch (const DBException& e) {
+ log() << "SyncSourceFeedback error sending "
+ << (commandStyle ==
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle
+ ? "old style "
+ : "") << "update: " << e.what();
+ // Blacklist sync target for .5 seconds and find a new one.
+ replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
+ BackgroundSync::get()->clearSyncTarget();
+ _resetConnection();
+ return e.toStatus();
}
- auto status = reporter->join();
-
+ Status status = Command::getStatusFromCommandResult(res);
if (!status.isOK()) {
- log() << "SyncSourceFeedback error sending update to " << syncTarget << ": " << status;
-
- // Some errors should not cause result in blacklisting the sync source.
- if (status != ErrorCodes::InvalidSyncSource) {
- // The command could not be created because the node is now primary.
- } else if (status != ErrorCodes::NodeNotFound) {
- // The command could not be created, likely because this node was removed from the set.
- } else {
- // Blacklist sync target for .5 seconds and find a new one.
- stdx::lock_guard<stdx::mutex> lock(_mtx);
- auto replCoord = repl::ReplicationCoordinator::get(txn);
- replCoord->blacklistSyncSource(syncTarget, Date_t::now() + Milliseconds(500));
+ log() << "SyncSourceFeedback error sending "
+ << (commandStyle ==
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle
+ ? "old style "
+ : "") << "update, response: " << res.toString();
+ if (status == ErrorCodes::BadValue &&
+ commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) {
+ log() << "SyncSourceFeedback falling back to old style UpdatePosition command";
+ _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle;
+ } else if (status != ErrorCodes::InvalidReplicaSetConfig || res["configVersion"].eoo() ||
+ res["configVersion"].numberLong() < replCoord->getConfig().getConfigVersion()) {
+ // Blacklist sync target for .5 seconds and find a new one, unless we were rejected due
+ // to the syncsource having a newer config.
+ replCoord->blacklistSyncSource(_syncTarget, Date_t::now() + Milliseconds(500));
BackgroundSync::get()->clearSyncTarget();
+ _resetConnection();
}
}
@@ -154,9 +167,6 @@ Status SyncSourceFeedback::_updateUpstream(OperationContext* txn) {
void SyncSourceFeedback::shutdown() {
stdx::unique_lock<stdx::mutex> lock(_mtx);
- if (_reporter) {
- _reporter->shutdown();
- }
_shutdownSignaled = true;
_cond.notify_all();
}
@@ -164,35 +174,12 @@ void SyncSourceFeedback::shutdown() {
void SyncSourceFeedback::run() {
Client::initThread("SyncSourceFeedback");
- // Task executor used to run replSetUpdatePosition command on sync source.
- auto net = executor::makeNetworkInterface("NetworkInterfaceASIO-SyncSourceFeedback");
- auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
- executor::ThreadPoolTaskExecutor executor(std::move(pool), std::move(net));
- executor.startup();
- ON_BLOCK_EXIT([&executor]() {
- executor.shutdown();
- executor.join();
- });
-
- HostAndPort syncTarget;
-
- // keepAliveInterval indicates how frequently to forward progress in the absence of updates.
- Milliseconds keepAliveInterval(0);
-
while (true) { // breaks once _shutdownSignaled is true
auto txn = cc().makeOperationContext();
-
- if (keepAliveInterval == Milliseconds(0)) {
- keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx);
- }
-
{
- // Take SyncSourceFeedback lock before calling into ReplicationCoordinator
- // to avoid deadlock because ReplicationCoordinator could conceivably calling back into
- // this class.
stdx::unique_lock<stdx::mutex> lock(_mtx);
while (!_positionChanged && !_shutdownSignaled) {
- if (_cond.wait_for(lock, keepAliveInterval) == stdx::cv_status::timeout) {
+ if (_cond.wait_for(lock, _keepAliveInterval) == stdx::cv_status::timeout) {
MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
if (!(state.primary() || state.startup())) {
break;
@@ -207,57 +194,39 @@ void SyncSourceFeedback::run() {
_positionChanged = false;
}
- {
- stdx::lock_guard<stdx::mutex> lock(_mtx);
- MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
- if (state.primary() || state.startup()) {
- continue;
- }
+ MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
+ if (state.primary() || state.startup()) {
+ _resetConnection();
+ continue;
}
-
const HostAndPort target = BackgroundSync::get()->getSyncTarget();
- // Log sync source changes.
- if (target.empty()) {
- if (syncTarget != target) {
- syncTarget = target;
- }
- // Loop back around again; the keepalive functionality will cause us to retry
- continue;
+ if (_syncTarget != target) {
+ _resetConnection();
+ _syncTarget = target;
}
-
- if (syncTarget != target) {
- LOG(1) << "setting syncSourceFeedback to " << target;
- syncTarget = target;
-
- // Update keepalive value from config.
- auto oldKeepAliveInterval = keepAliveInterval;
- keepAliveInterval = calculateKeepAliveInterval(txn.get(), _mtx);
- if (oldKeepAliveInterval != keepAliveInterval) {
- LOG(1) << "new syncSourceFeedback keep alive duration = " << keepAliveInterval
- << " (previously " << oldKeepAliveInterval << ")";
+ if (!hasConnection()) {
+ // fix connection if need be
+ if (target.empty() || !_connect(txn.get(), target)) {
+ // Loop back around again; the keepalive functionality will cause us to retry
+ continue;
}
}
-
- Reporter reporter(&executor,
- makePrepareReplSetUpdatePositionCommandFn(txn.get(), _mtx, syncTarget),
- syncTarget,
- keepAliveInterval);
- {
- stdx::lock_guard<stdx::mutex> lock(_mtx);
- _reporter = &reporter;
- }
- ON_BLOCK_EXIT([this]() {
- stdx::lock_guard<stdx::mutex> lock(_mtx);
- _reporter = nullptr;
- });
-
- auto status = _updateUpstream(txn.get());
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle oldCommandStyle = _commandStyle;
+ Status status = updateUpstream(txn.get(), _commandStyle);
if (!status.isOK()) {
- LOG(1) << "The replication progress command (replSetUpdatePosition) failed and will be "
- "retried: " << status;
+ if (_commandStyle != oldCommandStyle) {
+ stdx::unique_lock<stdx::mutex> lock(_mtx);
+ _positionChanged = true;
+ } else {
+ log()
+ << (_commandStyle ==
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle
+ ? "old style "
+ : "") << "updateUpstream"
+ << " failed: " << status << ", will retry";
+ }
}
}
}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
index 3d3ea0e4747..f3fcdca0abc 100644
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -29,16 +29,17 @@
#pragma once
-#include "mongo/base/status.h"
+#include "mongo/client/constants.h"
+#include "mongo/client/dbclientcursor.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/net/hostandport.h"
namespace mongo {
-struct HostAndPort;
class OperationContext;
namespace repl {
-class Reporter;
class SyncSourceFeedback {
public:
@@ -57,22 +58,47 @@ public:
void shutdown();
private:
+ void _resetConnection();
+
+ /**
+ * Authenticates _connection using the server's cluster-membership credentials.
+ *
+ * Returns true on successful authentication.
+ */
+ bool replAuthenticate();
+
/* Inform the sync target of our current position in the oplog, as well as the positions
* of all secondaries chained through us.
+ * "commandStyle" indicates whether or not the upstream node is pre-3.2.4 and needs the older
+ * style
+ * ReplSetUpdatePosition commands as a result.
*/
- Status _updateUpstream(OperationContext* txn);
+ Status updateUpstream(OperationContext* txn,
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle);
+ bool hasConnection() {
+ return _connection.get();
+ }
+
+ /// Connect to sync target.
+ bool _connect(OperationContext* txn, const HostAndPort& host);
+
+ // the member we are currently syncing from
+ HostAndPort _syncTarget;
+ // our connection to our sync target
+ std::unique_ptr<DBClientConnection> _connection;
// protects cond, _shutdownSignaled, _keepAliveInterval, and _positionChanged.
stdx::mutex _mtx;
// used to alert our thread of changes which need to be passed up the chain
stdx::condition_variable _cond;
+ /// _keepAliveInterval indicates how frequently to forward progress in the absence of updates.
+ Milliseconds _keepAliveInterval = Milliseconds(100);
// used to indicate a position change which has not yet been pushed along
bool _positionChanged = false;
// Once this is set to true the _run method will terminate
bool _shutdownSignaled = false;
- // Reports replication progress to sync source.
- Reporter* _reporter = nullptr;
+ // Indicates version of the UpdatePosition command accepted by our syncSource.
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle _commandStyle;
};
-
} // namespace repl
} // namespace mongo