diff options
author | Pavithra Vetriselvan <pavithra.vetriselvan@mongodb.com> | 2017-10-24 13:52:04 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-11-17 15:14:15 -0500 |
commit | 78a87f7580736c7fbe7ee48d8c7c00716fbc913e (patch) | |
tree | 0bcd6c3d2a57af98131ca27e21c8deb112e4efda | |
parent | 85eec1d61a2bc6a7b7a1391fd787ad00eadc5060 (diff) | |
download | mongo-78a87f7580736c7fbe7ee48d8c7c00716fbc913e.tar.gz |
SERVER-31589 Replaces OpContext pts with ReplicationCoordinator ptrs.
(cherry picked from commit 608f38a4fb0de96546dd0aa3eea2f244bbcf2bef)
9 files changed, 26 insertions, 22 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index de8ee0011e1..27b65898f19 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -50,6 +50,7 @@ namespace repl { class LastVote; class ReplSettings; +class ReplicationCoordinator; /** * This class represents the interface the ReplicationCoordinator uses to interact with the @@ -69,7 +70,7 @@ public: * * NOTE: Only starts threads if they are not already started, */ - virtual void startThreads(const ReplSettings& settings) = 0; + virtual void startThreads(const ReplSettings& settings, ReplicationCoordinator* replCoord) = 0; /** * Starts the Master/Slave threads and sets up logOp diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index f0e4cb4e0ee..2e336dcd3bc 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -96,7 +96,8 @@ ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl : _startedThreads(false), _nextThreadId(0) {} ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} -void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) { +void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings, + ReplicationCoordinator* replCoord) { stdx::lock_guard<stdx::mutex> lk(_threadMutex); if (_startedThreads) { return; @@ -106,7 +107,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s BackgroundSync* bgsync = BackgroundSync::get(); _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); _syncSourceFeedbackThread.reset( - new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback))); + new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback, replCoord))); if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) { _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index fb3135b8592..90d2fd5edf5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -51,7 +51,8 @@ class ReplicationCoordinatorExternalStateImpl : public ReplicationCoordinatorExt public: ReplicationCoordinatorExternalStateImpl(); virtual ~ReplicationCoordinatorExternalStateImpl(); - virtual void startThreads(const ReplSettings& settings) override; + virtual void startThreads(const ReplSettings& settings, + ReplicationCoordinator* replCoord) override; virtual void startMasterSlave(OperationContext* txn); virtual void shutdown(OperationContext* txn); virtual Status initializeReplSetStorage(OperationContext* txn, diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index a05a7e7ae06..d10d73205d4 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -58,7 +58,8 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {} -void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& settings) { +void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& settings, + ReplicationCoordinator* replCoord) { _threadsStarted = true; } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 1434fc34bab..a26d5273c9d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -52,7 +52,8 @@ public: ReplicationCoordinatorExternalStateMock(); virtual ~ReplicationCoordinatorExternalStateMock(); - virtual void startThreads(const ReplSettings& settings) override; + virtual void startThreads(const ReplSettings& settings, + ReplicationCoordinator* replCoord) override; virtual void startMasterSlave(OperationContext*); virtual void shutdown(OperationContext*); virtual Status initializeReplSetStorage(OperationContext* txn, diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a16abdbceb5..e0e88d1a92a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -475,7 +475,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( } _performPostMemberStateUpdateAction(action); if (!isArbiter) { - _externalState->startThreads(_settings); + _externalState->startThreads(_settings, this); } } @@ -2532,7 +2532,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, // A configuration passed to replSetInitiate() with the current node as an arbiter // will fail validation with a "replSet initiate got ... while validating" reason. invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter()); - _externalState->startThreads(_settings); + _externalState->startThreads(_settings, this); } return Status::OK(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 8a3d9532d1a..6c781f982d3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -483,7 +483,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 && newConfig.getMemberAt(myIndex.getValue()).isArbiter(); if (!isArbiter) { - _externalState->startThreads(_settings); + _externalState->startThreads(_settings, this); } } diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index beb73bebbdd..8b7a1e392fa 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -72,7 +72,7 @@ bool SyncSourceFeedback::replAuthenticate() { return _connection->authenticateInternalUser(); } -bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) { +bool SyncSourceFeedback::_connect(const ReplicaSetConfig& rsConfig, const HostAndPort& host) { if (hasConnection()) { return true; } @@ -94,7 +94,6 @@ bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host } // Update keepalive value from config. - auto rsConfig = repl::ReplicationCoordinator::get(txn)->getConfig(); _keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2; return hasConnection(); @@ -106,8 +105,7 @@ void SyncSourceFeedback::forwardSlaveProgress() { _cond.notify_all(); } -Status SyncSourceFeedback::updateUpstream(OperationContext* txn, bool oldStyle) { - auto replCoord = repl::ReplicationCoordinator::get(txn); +Status SyncSourceFeedback::updateUpstream(ReplicationCoordinator* replCoord, bool oldStyle) { if (replCoord->getMemberState().primary()) { // Primary has no one to send updates to. return Status::OK(); @@ -164,16 +162,16 @@ void SyncSourceFeedback::shutdown() { _cond.notify_all(); } -void SyncSourceFeedback::run() { +void SyncSourceFeedback::run(ReplicationCoordinator* replCoord) { Client::initThread("SyncSourceFeedback"); while (true) { // breaks once _shutdownSignaled is true - auto txn = cc().makeOperationContext(); + { stdx::unique_lock<stdx::mutex> lock(_mtx); while (!_positionChanged && !_shutdownSignaled) { if (_cond.wait_for(lock, _keepAliveInterval) == stdx::cv_status::timeout) { - MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); + MemberState state = replCoord->getMemberState(); if (!(state.primary() || state.startup())) { break; } @@ -187,7 +185,7 @@ void SyncSourceFeedback::run() { _positionChanged = false; } - MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState(); + MemberState state = replCoord->getMemberState(); if (state.primary() || state.startup()) { _resetConnection(); continue; @@ -199,13 +197,13 @@ void SyncSourceFeedback::run() { } if (!hasConnection()) { // fix connection if need be - if (target.empty() || !_connect(txn.get(), target)) { + if (target.empty() || !_connect(replCoord->getConfig(), target)) { // Loop back around again; the keepalive functionality will cause us to retry continue; } } bool oldFallBackValue = _fallBackToOldUpdatePosition; - Status status = updateUpstream(txn.get(), _fallBackToOldUpdatePosition); + Status status = updateUpstream(replCoord, _fallBackToOldUpdatePosition); if (!status.isOK()) { if (_fallBackToOldUpdatePosition != oldFallBackValue) { stdx::unique_lock<stdx::mutex> lock(_mtx); diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index ed45b59a752..51938fb0142 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -31,6 +31,7 @@ #include "mongo/client/constants.h" #include "mongo/client/dbclientcursor.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" @@ -51,7 +52,7 @@ public: * update occurs within the _keepAliveInterval, progress is forwarded to let the upstream node * know that this node, along with the alive nodes chaining through it, are still alive. */ - void run(); + void run(ReplicationCoordinator* replCoord); /// Signals the run() method to terminate. void shutdown(); @@ -71,14 +72,14 @@ private: * "oldStyle" indicates whether or not the upstream node is pre-3.2.2 and needs the older style * ReplSetUpdatePosition commands as a result. */ - Status updateUpstream(OperationContext* txn, bool oldStyle); + Status updateUpstream(ReplicationCoordinator* replCoord, bool oldStyle); bool hasConnection() { return _connection.get(); } /// Connect to sync target. - bool _connect(OperationContext* txn, const HostAndPort& host); + bool _connect(const ReplicaSetConfig& rsConfig, const HostAndPort& host); // the member we are currently syncing from HostAndPort _syncTarget; |