summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavithra Vetriselvan <pavithra.vetriselvan@mongodb.com>2017-10-24 13:52:04 -0400
committerBenety Goh <benety@mongodb.com>2017-11-17 15:14:15 -0500
commit78a87f7580736c7fbe7ee48d8c7c00716fbc913e (patch)
tree0bcd6c3d2a57af98131ca27e21c8deb112e4efda
parent85eec1d61a2bc6a7b7a1391fd787ad00eadc5060 (diff)
downloadmongo-78a87f7580736c7fbe7ee48d8c7c00716fbc913e.tar.gz
SERVER-31589 Replaces OpContext pts with ReplicationCoordinator ptrs.
(cherry picked from commit 608f38a4fb0de96546dd0aa3eea2f244bbcf2bef)
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp18
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h7
9 files changed, 26 insertions, 22 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index de8ee0011e1..27b65898f19 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -50,6 +50,7 @@ namespace repl {
class LastVote;
class ReplSettings;
+class ReplicationCoordinator;
/**
* This class represents the interface the ReplicationCoordinator uses to interact with the
@@ -69,7 +70,7 @@ public:
*
* NOTE: Only starts threads if they are not already started,
*/
- virtual void startThreads(const ReplSettings& settings) = 0;
+ virtual void startThreads(const ReplSettings& settings, ReplicationCoordinator* replCoord) = 0;
/**
* Starts the Master/Slave threads and sets up logOp
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index f0e4cb4e0ee..2e336dcd3bc 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -96,7 +96,8 @@ ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl
: _startedThreads(false), _nextThreadId(0) {}
ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
-void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings) {
+void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& settings,
+ ReplicationCoordinator* replCoord) {
stdx::lock_guard<stdx::mutex> lk(_threadMutex);
if (_startedThreads) {
return;
@@ -106,7 +107,7 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
BackgroundSync* bgsync = BackgroundSync::get();
_producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));
_syncSourceFeedbackThread.reset(
- new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback)));
+ new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback, replCoord)));
if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) {
_snapshotThread = SnapshotThread::start(getGlobalServiceContext());
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index fb3135b8592..90d2fd5edf5 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -51,7 +51,8 @@ class ReplicationCoordinatorExternalStateImpl : public ReplicationCoordinatorExt
public:
ReplicationCoordinatorExternalStateImpl();
virtual ~ReplicationCoordinatorExternalStateImpl();
- virtual void startThreads(const ReplSettings& settings) override;
+ virtual void startThreads(const ReplSettings& settings,
+ ReplicationCoordinator* replCoord) override;
virtual void startMasterSlave(OperationContext* txn);
virtual void shutdown(OperationContext* txn);
virtual Status initializeReplSetStorage(OperationContext* txn,
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index a05a7e7ae06..d10d73205d4 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -58,7 +58,8 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock
ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {}
-void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& settings) {
+void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& settings,
+ ReplicationCoordinator* replCoord) {
_threadsStarted = true;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 1434fc34bab..a26d5273c9d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -52,7 +52,8 @@ public:
ReplicationCoordinatorExternalStateMock();
virtual ~ReplicationCoordinatorExternalStateMock();
- virtual void startThreads(const ReplSettings& settings) override;
+ virtual void startThreads(const ReplSettings& settings,
+ ReplicationCoordinator* replCoord) override;
virtual void startMasterSlave(OperationContext*);
virtual void shutdown(OperationContext*);
virtual Status initializeReplSetStorage(OperationContext* txn,
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a16abdbceb5..e0e88d1a92a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -475,7 +475,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
}
_performPostMemberStateUpdateAction(action);
if (!isArbiter) {
- _externalState->startThreads(_settings);
+ _externalState->startThreads(_settings, this);
}
}
@@ -2532,7 +2532,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
// A configuration passed to replSetInitiate() with the current node as an arbiter
// will fail validation with a "replSet initiate got ... while validating" reason.
invariant(!newConfig.getMemberAt(myIndex.getValue()).isArbiter());
- _externalState->startThreads(_settings);
+ _externalState->startThreads(_settings, this);
}
return Status::OK();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 8a3d9532d1a..6c781f982d3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -483,7 +483,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 &&
newConfig.getMemberAt(myIndex.getValue()).isArbiter();
if (!isArbiter) {
- _externalState->startThreads(_settings);
+ _externalState->startThreads(_settings, this);
}
}
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index beb73bebbdd..8b7a1e392fa 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -72,7 +72,7 @@ bool SyncSourceFeedback::replAuthenticate() {
return _connection->authenticateInternalUser();
}
-bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host) {
+bool SyncSourceFeedback::_connect(const ReplicaSetConfig& rsConfig, const HostAndPort& host) {
if (hasConnection()) {
return true;
}
@@ -94,7 +94,6 @@ bool SyncSourceFeedback::_connect(OperationContext* txn, const HostAndPort& host
}
// Update keepalive value from config.
- auto rsConfig = repl::ReplicationCoordinator::get(txn)->getConfig();
_keepAliveInterval = rsConfig.getElectionTimeoutPeriod() / 2;
return hasConnection();
@@ -106,8 +105,7 @@ void SyncSourceFeedback::forwardSlaveProgress() {
_cond.notify_all();
}
-Status SyncSourceFeedback::updateUpstream(OperationContext* txn, bool oldStyle) {
- auto replCoord = repl::ReplicationCoordinator::get(txn);
+Status SyncSourceFeedback::updateUpstream(ReplicationCoordinator* replCoord, bool oldStyle) {
if (replCoord->getMemberState().primary()) {
// Primary has no one to send updates to.
return Status::OK();
@@ -164,16 +162,16 @@ void SyncSourceFeedback::shutdown() {
_cond.notify_all();
}
-void SyncSourceFeedback::run() {
+void SyncSourceFeedback::run(ReplicationCoordinator* replCoord) {
Client::initThread("SyncSourceFeedback");
while (true) { // breaks once _shutdownSignaled is true
- auto txn = cc().makeOperationContext();
+
{
stdx::unique_lock<stdx::mutex> lock(_mtx);
while (!_positionChanged && !_shutdownSignaled) {
if (_cond.wait_for(lock, _keepAliveInterval) == stdx::cv_status::timeout) {
- MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
+ MemberState state = replCoord->getMemberState();
if (!(state.primary() || state.startup())) {
break;
}
@@ -187,7 +185,7 @@ void SyncSourceFeedback::run() {
_positionChanged = false;
}
- MemberState state = ReplicationCoordinator::get(txn.get())->getMemberState();
+ MemberState state = replCoord->getMemberState();
if (state.primary() || state.startup()) {
_resetConnection();
continue;
@@ -199,13 +197,13 @@ void SyncSourceFeedback::run() {
}
if (!hasConnection()) {
// fix connection if need be
- if (target.empty() || !_connect(txn.get(), target)) {
+ if (target.empty() || !_connect(replCoord->getConfig(), target)) {
// Loop back around again; the keepalive functionality will cause us to retry
continue;
}
}
bool oldFallBackValue = _fallBackToOldUpdatePosition;
- Status status = updateUpstream(txn.get(), _fallBackToOldUpdatePosition);
+ Status status = updateUpstream(replCoord, _fallBackToOldUpdatePosition);
if (!status.isOK()) {
if (_fallBackToOldUpdatePosition != oldFallBackValue) {
stdx::unique_lock<stdx::mutex> lock(_mtx);
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
index ed45b59a752..51938fb0142 100644
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -31,6 +31,7 @@
#include "mongo/client/constants.h"
#include "mongo/client/dbclientcursor.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
@@ -51,7 +52,7 @@ public:
* update occurs within the _keepAliveInterval, progress is forwarded to let the upstream node
* know that this node, along with the alive nodes chaining through it, are still alive.
*/
- void run();
+ void run(ReplicationCoordinator* replCoord);
/// Signals the run() method to terminate.
void shutdown();
@@ -71,14 +72,14 @@ private:
* "oldStyle" indicates whether or not the upstream node is pre-3.2.2 and needs the older style
* ReplSetUpdatePosition commands as a result.
*/
- Status updateUpstream(OperationContext* txn, bool oldStyle);
+ Status updateUpstream(ReplicationCoordinator* replCoord, bool oldStyle);
bool hasConnection() {
return _connection.get();
}
/// Connect to sync target.
- bool _connect(OperationContext* txn, const HostAndPort& host);
+ bool _connect(const ReplicaSetConfig& rsConfig, const HostAndPort& host);
// the member we are currently syncing from
HostAndPort _syncTarget;