summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-03-12 17:55:28 -0400
committerBenety Goh <benety@mongodb.com>2018-03-12 17:56:19 -0400
commita1942a4ea0d79aaa7bb93978acbc4273af29a2eb (patch)
treef41d42642f7afa3e184cd62544a5f4f84d5c9220 /src
parent445d92f0880f4315528032916c6cb7a4567d817f (diff)
downloadmongo-a1942a4ea0d79aaa7bb93978acbc4273af29a2eb.tar.gz
SERVER-32332 move ownership of OplogBuffer from BackgroundSync to ReplicationCoordinatorExternalStateImpl
The new OplogApplier requires an OplogBuffer passed in at construction. Moving the ownership out of BackgroundSync makes it clear that the OplogBuffer is owned by neither BackgroundSync nor OplogApplier.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/bgsync.cpp7
-rw-r--r--src/mongo/db/repl/bgsync.h11
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp12
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h5
4 files changed, 25 insertions, 10 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 766885db842..b63fc5fd45d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -146,8 +146,8 @@ BackgroundSync::BackgroundSync(
ReplicationCoordinator* replicationCoordinator,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
ReplicationProcess* replicationProcess,
- std::unique_ptr<OplogBuffer> oplogBuffer)
- : _oplogBuffer(std::move(oplogBuffer)),
+ OplogBuffer* oplogBuffer)
+ : _oplogBuffer(oplogBuffer),
_replCoord(replicationCoordinator),
_replicationCoordinatorExternalState(replicationCoordinatorExternalState),
_replicationProcess(replicationProcess) {
@@ -157,8 +157,6 @@ BackgroundSync::BackgroundSync(
}
void BackgroundSync::startup(OperationContext* opCtx) {
- _oplogBuffer->startup(opCtx);
-
invariant(!_producerThread);
_producerThread.reset(new stdx::thread([this] { _run(); }));
}
@@ -189,7 +187,6 @@ void BackgroundSync::shutdown(OperationContext* opCtx) {
void BackgroundSync::join(OperationContext* opCtx) {
_producerThread->join();
- _oplogBuffer->shutdown(opCtx);
}
bool BackgroundSync::inShutdown() const {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 6f5c7fc116d..127c49fe8ec 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -77,10 +77,15 @@ public:
*/
enum class ProducerState { Starting, Running, Stopped };
+ /**
+ * Constructs a BackgroundSync to fetch oplog entries from a sync source.
+ * The BackgroundSync does not own any of the components referenced by the constructor
+ * arguments. All these components must outlive the BackgroundSync object.
+ */
BackgroundSync(ReplicationCoordinator* replicationCoordinator,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
ReplicationProcess* replicationProcess,
- std::unique_ptr<OplogBuffer> oplogBuffer);
+ OplogBuffer* oplogBuffer);
// stop syncing (when this node becomes a primary, e.g.)
// During stepdown, the last fetched optime is not reset in order to keep track of the lastest
@@ -203,8 +208,8 @@ private:
OpTimeWithHash _readLastAppliedOpTimeWithHash(OperationContext* opCtx);
- // Production thread
- std::unique_ptr<OplogBuffer> _oplogBuffer;
+ // This OplogBuffer holds oplog entries fetched from the sync source.
+ OplogBuffer* const _oplogBuffer;
// A pointer to the replication coordinator running the show.
ReplicationCoordinator* _replCoord;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 9f4916aa3bd..a43bf3df4ec 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -233,8 +233,11 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
invariant(replCoord);
invariant(!_bgSync);
log() << "Starting replication fetcher thread";
- _bgSync = stdx::make_unique<BackgroundSync>(
- replCoord, this, _replicationProcess, stdx::make_unique<OplogBufferBlockingQueue>());
+ _oplogBuffer = stdx::make_unique<OplogBufferBlockingQueue>();
+ _oplogBuffer->startup(opCtx);
+
+ _bgSync =
+ stdx::make_unique<BackgroundSync>(replCoord, this, _replicationProcess, _oplogBuffer.get());
_bgSync->startup(opCtx);
log() << "Starting replication applier thread";
@@ -264,6 +267,7 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat
_stoppingDataReplication = true;
auto oldSSF = std::move(_syncSourceFeedbackThread);
+ auto oldOplogBuffer = std::move(_oplogBuffer);
auto oldBgSync = std::move(_bgSync);
auto oldApplier = std::move(_applierThread);
lock->unlock();
@@ -290,6 +294,10 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat
oldBgSync->join(opCtx);
}
+ if (oldOplogBuffer) {
+ oldOplogBuffer->shutdown(opCtx);
+ }
+
lock->lock();
_stoppingDataReplication = false;
_dataReplicationStopped.notify_all();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index ca411fb658d..93e8149e3b3 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -176,6 +176,11 @@ private:
// replication.
SyncSourceFeedback _syncSourceFeedback;
+ // The OplogBuffer is used to hold operations read from the sync source.
+ // BackgroundSync adds operations to the OplogBuffer while the applier thread consumes these
+ // operations in batches during oplog application.
+ std::unique_ptr<OplogBuffer> _oplogBuffer;
+
// The BackgroundSync class is responsible for pulling ops off the network from the sync source
// and into a BlockingQueue.
// We can't create it on construction because it needs a fully constructed