summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Benety Goh <benety@mongodb.com> 2018-03-13 17:31:39 -0400
committer Benety Goh <benety@mongodb.com> 2018-03-13 17:45:00 -0400
commit d988a58bcb09d45a841570e26e7d50a4e9c23de8 (patch)
tree 71ce8d6f762cab4b870c7d6cc953f66aa69a9f88 /src
parent a3909e15cf23edff53fdeb2ac3203e05d5ed9737 (diff)
download mongo-d988a58bcb09d45a841570e26e7d50a4e9c23de8.tar.gz
SERVER-32332 decouple BackgroundSync from SyncTail
Explicit shutdown() functions for SyncTail and RSDataSync. BackgroundSync implements OplogApplier::Observer. OplogBuffer for steady state replication is now cleared in ReplicationCoordinatorExternalStateImpl::shutdown() between shutting down and joining BackgroundSync/SyncTail.
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 35
-rw-r--r-- src/mongo/db/repl/bgsync.h 20
-rw-r--r-- src/mongo/db/repl/oplog_applier.h 6
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp 18
-rw-r--r-- src/mongo/db/repl/rs_sync.cpp 19
-rw-r--r-- src/mongo/db/repl/rs_sync.h 13
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 58
-rw-r--r-- src/mongo/db/repl/sync_tail.h 48
8 files changed, 145 insertions, 72 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index b63fc5fd45d..eab353f2a39 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -164,10 +164,6 @@ void BackgroundSync::startup(OperationContext* opCtx) {
void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
- // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is
- // waiting for an operation to be past the slaveDelay point.
- clearBuffer(opCtx);
_state = ProducerState::Stopped;
if (_syncSourceResolver) {
@@ -216,7 +212,8 @@ void BackgroundSync::_run() {
fassertFailed(28546);
}
}
- stop(true);
+ // No need to reset optimes here because we are shutting down.
+ stop(false);
}
void BackgroundSync::_runProducer() {
@@ -562,28 +559,9 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi
return Status::OK();
}
-bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) {
- return _oplogBuffer->peek(opCtx, op);
-}
-
-void BackgroundSync::waitForMore() {
- // Block for one second before timing out.
- _oplogBuffer->waitForData(Seconds(1));
-}
-
-void BackgroundSync::consume(OperationContext* opCtx) {
- // this is just to get the op off the queue, it's been peeked at
- // and queued for application already
- BSONObj op;
- if (_oplogBuffer->tryPop(opCtx, &op)) {
- bufferCountGauge.decrement(1);
- bufferSizeGauge.decrement(getSize(op));
- } else {
- invariant(inShutdown());
- // This means that shutdown() was called between the consumer's calls to peek() and
- // consume(). shutdown() cleared the buffer so there is nothing for us to consume here.
- // Since our postcondition is already met, it is safe to return successfully.
- }
+void BackgroundSync::onOperationConsumed(const BSONObj& op) {
+ bufferCountGauge.decrement(1);
+ bufferSizeGauge.decrement(getSize(op));
}
void BackgroundSync::_runRollback(OperationContext* opCtx,
@@ -763,8 +741,7 @@ void BackgroundSync::start(OperationContext* opCtx) {
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
-void BackgroundSync::clearBuffer(OperationContext* opCtx) {
- _oplogBuffer->clear(opCtx);
+void BackgroundSync::onBufferCleared() {
const auto count = bufferCountGauge.get();
bufferCountGauge.decrement(count);
const auto size = bufferSizeGauge.get();
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 127c49fe8ec..704464c109b 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -34,6 +34,7 @@
#include "mongo/base/status_with.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/oplog_interface_remote.h"
@@ -59,7 +60,7 @@ class ReplicationCoordinatorExternalState;
class ReplicationProcess;
class StorageInterface;
-class BackgroundSync {
+class BackgroundSync : public OplogApplier::Observer {
MONGO_DISALLOW_COPYING(BackgroundSync);
public:
@@ -119,19 +120,16 @@ public:
HostAndPort getSyncTarget() const;
- // Interface implementation
+ /**
+ * This is called while shutting down to reset the counters for the OplogBuffer.
+ */
+ void onBufferCleared();
- bool peek(OperationContext* opCtx, BSONObj* op);
- void consume(OperationContext* opCtx);
void clearSyncTarget();
- void waitForMore();
// For monitoring
BSONObj getCounters();
- // Clears any fetched and buffered oplog entries.
- void clearBuffer(OperationContext* opCtx);
-
/**
* Returns true if any of the following is true:
* 1) We are shutting down;
@@ -145,6 +143,12 @@ public:
// Starts the producer if it's stopped. Otherwise, let it keep running.
void startProducerIfStopped();
+ // OplogApplier::Observer functions
+ void onBatchBegin(const OplogApplier::Operations&) final {}
+ void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {}
+ void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {}
+ void onOperationConsumed(const BSONObj& op) final;
+
private:
bool _inShutdown_inlock() const;
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index bbc5f29eafa..2fc88e449f8 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -136,6 +136,12 @@ public:
using FetchInfo = std::pair<OplogEntry, BSONObj>;
virtual void onMissingDocumentsFetchedAndInserted(
const std::vector<FetchInfo>& documentsFetchedAndInserted) = 0;
+
+ /**
+ * Used primarily by BackgroundSync to update server statistics during steady state replication.
+ * TODO: remove this function. See SERVER-33864.
+ */
+ virtual void onOperationConsumed(const BSONObj& op) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 00f2d968489..d5baf0d959b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -241,7 +241,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
log() << "Starting replication applier thread";
invariant(!_applierThread);
- _applierThread.reset(new RSDataSync{_bgSync.get(), replCoord});
+ _applierThread.reset(new RSDataSync{_bgSync.get(), _oplogBuffer.get(), replCoord});
_applierThread->startup();
log() << "Starting replication reporter thread";
invariant(!_syncSourceFeedbackThread);
@@ -286,13 +286,27 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat
if (oldApplier) {
log() << "Stopping replication applier thread";
- oldApplier->join();
+ oldApplier->shutdown();
+ }
+
+ // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
+ // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it is
+ // waiting for an operation to be past the slaveDelay point.
+ if (oldOplogBuffer) {
+ oldOplogBuffer->clear(opCtx);
+ if (oldBgSync) {
+ oldBgSync->onBufferCleared();
+ }
}
if (oldBgSync) {
oldBgSync->join(opCtx);
}
+ if (oldApplier) {
+ oldApplier->join();
+ }
+
if (oldOplogBuffer) {
oldOplogBuffer->shutdown(opCtx);
}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 4b16baef4c9..4aadfa69330 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -44,8 +44,13 @@
namespace mongo {
namespace repl {
-RSDataSync::RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord)
- : _bgsync(bgsync), _replCoord(replCoord) {}
+RSDataSync::RSDataSync(OplogApplier::Observer* observer,
+ OplogBuffer* oplogBuffer,
+ ReplicationCoordinator* replCoord)
+ : _oplogBuffer(oplogBuffer),
+ _replCoord(replCoord),
+ _writerPool(SyncTail::makeWriterPool()),
+ _syncTail(observer, multiSyncApply, _writerPool.get()) {}
RSDataSync::~RSDataSync() {
DESTRUCTOR_GUARD(join(););
@@ -56,9 +61,13 @@ void RSDataSync::startup() {
_runThread = stdx::thread(&RSDataSync::_run, this);
}
+void RSDataSync::shutdown() {
+ _syncTail.shutdown();
+}
+
void RSDataSync::join() {
if (_runThread.joinable()) {
- invariant(_bgsync->inShutdown());
+ invariant(_syncTail.inShutdown());
_runThread.join();
}
}
@@ -79,9 +88,7 @@ void RSDataSync::_run() {
try {
// Once we call into SyncTail::oplogApplication we never return, so this code only runs at
// startup.
- auto writerPool = SyncTail::makeWriterPool();
- SyncTail syncTail(_bgsync, multiSyncApply, writerPool.get());
- syncTail.oplogApplication(_replCoord);
+ _syncTail.oplogApplication(_oplogBuffer, _replCoord);
} catch (...) {
auto status = exceptionToStatus();
severe() << "Exception thrown in RSDataSync: " << redact(status);
diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h
index c9f3f17fa0d..424d7091060 100644
--- a/src/mongo/db/repl/rs_sync.h
+++ b/src/mongo/db/repl/rs_sync.h
@@ -28,11 +28,13 @@
#pragma once
+#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/oplog_buffer.h"
+#include "mongo/db/repl/sync_tail.h"
#include "mongo/stdx/thread.h"
namespace mongo {
namespace repl {
-class BackgroundSync;
class ReplicationCoordinator;
/**
@@ -41,9 +43,12 @@ class ReplicationCoordinator;
*/
class RSDataSync {
public:
- RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord);
+ RSDataSync(OplogApplier::Observer* observer,
+ OplogBuffer* oplogBuffer,
+ ReplicationCoordinator* replCoord);
~RSDataSync();
void startup();
+ void shutdown();
void join();
private:
@@ -51,8 +56,10 @@ private:
void _run();
stdx::thread _runThread;
- BackgroundSync* _bgsync;
+ OplogBuffer* _oplogBuffer;
ReplicationCoordinator* _replCoord;
+ std::unique_ptr<ThreadPool> _writerPool;
+ SyncTail _syncTail;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 7a0700a6aa9..25dedfa09a3 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -274,8 +274,10 @@ NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op)
} // namespace
-SyncTail::SyncTail(BackgroundSync* bgsync, MultiSyncApplyFunc func, ThreadPool* writerPool)
- : _bgsync(bgsync), _applyFunc(func), _writerPool(writerPool) {}
+SyncTail::SyncTail(OplogApplier::Observer* observer,
+ MultiSyncApplyFunc func,
+ ThreadPool* writerPool)
+ : _observer(observer), _applyFunc(func), _writerPool(writerPool) {}
SyncTail::~SyncTail() {}
@@ -300,10 +302,6 @@ std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) {
return pool;
}
-bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) {
- return _bgsync->peek(opCtx, op);
-}
-
// static
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
@@ -691,7 +689,8 @@ class SyncTail::OpQueueBatcher {
MONGO_DISALLOW_COPYING(OpQueueBatcher);
public:
- OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([this] { run(); }) {}
+ OpQueueBatcher(SyncTail* syncTail, OplogBuffer* oplogBuffer)
+ : _syncTail(syncTail), _oplogBuffer(oplogBuffer), _thread([this] { run(); }) {}
~OpQueueBatcher() {
invariant(_isDead);
_thread.join();
@@ -758,7 +757,8 @@ private:
// tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early.
{
auto opCtx = cc().makeOperationContext();
- while (!_syncTail->tryPopAndWaitForMore(opCtx.get(), &ops, batchLimits)) {
+ while (!_syncTail->tryPopAndWaitForMore(
+ opCtx.get(), _oplogBuffer, &ops, batchLimits)) {
}
}
@@ -779,6 +779,7 @@ private:
}
SyncTail* const _syncTail;
+ OplogBuffer* const _oplogBuffer;
stdx::mutex _mutex; // Guards _ops.
stdx::condition_variable _cv;
@@ -791,8 +792,8 @@ private:
stdx::thread _thread; // Must be last so all other members are initialized before starting.
};
-void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
- OpQueueBatcher batcher(this);
+void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord) {
+ OpQueueBatcher batcher(this, oplogBuffer);
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()
@@ -817,7 +818,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
// Tests should not trigger clean shutdown while that failpoint is active. If we
// think we need this, we need to think hard about what the behavior should be.
- if (_bgsync->inShutdown()) {
+ if (inShutdown()) {
severe() << "Turn off rsSyncApplyStop before attempting clean shutdown";
fassertFailedNoTrace(40304);
}
@@ -916,21 +917,22 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
// queue. We don't block forever so that we can periodically check for things like shutdown or
// reconfigs.
bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
+ OplogBuffer* oplogBuffer,
SyncTail::OpQueue* ops,
const BatchLimits& limits) {
{
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
- bool peek_success = peek(opCtx, &op);
+ bool peek_success = oplogBuffer->peek(opCtx, &op);
if (!peek_success) {
// If we don't have anything in the queue, wait a bit for something to appear.
if (ops->empty()) {
- if (_bgsync->inShutdown()) {
+ if (inShutdown()) {
ops->setMustShutdownFlag();
} else {
// Block up to 1 second. We still return true in this case because we want this
// op to be the first in a new batch with a new start time.
- _bgsync->waitForMore();
+ oplogBuffer->waitForData(Seconds(1));
}
}
@@ -981,7 +983,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
if (entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
- _bgsync->consume(opCtx);
+ _consume(opCtx, oplogBuffer);
} else {
// This op must be processed alone, but we already had ops in the queue so we can't
// include it in this batch. Since we didn't call consume(), we'll see this again next
@@ -994,12 +996,36 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
}
// We are going to apply this Op.
- _bgsync->consume(opCtx);
+ _consume(opCtx, oplogBuffer);
// Go back for more ops, unless we've hit the limit.
return ops->getCount() >= limits.ops;
}
+void SyncTail::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
+ // This is just to get the op off the queue; it's been peeked at and queued for application
+ // already.
+ BSONObj op;
+ if (oplogBuffer->tryPop(opCtx, &op)) {
+ _observer->onOperationConsumed(op);
+ } else {
+ invariant(inShutdown());
+ // This means that shutdown() was called between the consumer's calls to peek() and
+ // consume(). shutdown() cleared the buffer so there is nothing for us to consume here.
+ // Since our postcondition is already met, it is safe to return successfully.
+ }
+}
+
+void SyncTail::shutdown() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _inShutdown = true;
+}
+
+bool SyncTail::inShutdown() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _inShutdown;
+}
+
void SyncTail::setHostname(const std::string& hostname) {
_hostname = hostname;
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index d456107fcd7..660fb27a364 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -35,8 +35,11 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
@@ -46,7 +49,6 @@ class OperationContext;
struct MultikeyPathInfo;
namespace repl {
-class BackgroundSync;
class ReplicationCoordinator;
class OpTime;
@@ -102,12 +104,12 @@ public:
*
* Constructs a SyncTail.
* During steady state replication, oplogApplication() obtains batches of operations to apply
- * from 'bgsync'. It is not required to provide 'bgsync' at construction if we do not plan on
- * using oplogApplication(). During the oplog application phase, the batch of operations is
+ * from the OplogBuffer passed to it and reports consumed operations to 'observer'. Providing
+ * 'observer' is not required if oplogApplication() is never used. When applying, each batch is
* distributed across writer threads in 'writerPool'. Each writer thread applies its own vector
* of operations using 'func'. The writer thread pool is not owned by us.
*/
- SyncTail(BackgroundSync* bgsync, MultiSyncApplyFunc func, ThreadPool* writerPool);
+ SyncTail(OplogApplier::Observer* observer, MultiSyncApplyFunc func, ThreadPool* writerPool);
virtual ~SyncTail();
/**
@@ -132,8 +134,23 @@ public:
const BSONObj& o,
OplogApplication::Mode oplogApplicationMode);
- void oplogApplication(ReplicationCoordinator* replCoord);
- bool peek(OperationContext* opCtx, BSONObj* obj);
+ /**
+ * Runs oplog application in a loop until shutdown() is called.
+ * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using
+ * multiApply().
+ */
+ void oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord);
+
+ /**
+ * Shuts down oplogApplication() processing.
+ */
+ void shutdown();
+
+ /**
+ * Returns true if we are shutting down.
+ */
+ bool inShutdown() const;
+
class OpQueue {
public:
@@ -218,7 +235,10 @@ public:
* If ops is empty on entry and nothing can be added yet, will wait up to a second before
* returning true.
*/
- bool tryPopAndWaitForMore(OperationContext* opCtx, OpQueue* ops, const BatchLimits& limits);
+ bool tryPopAndWaitForMore(OperationContext* opCtx,
+ OplogBuffer* oplogBuffer,
+ OpQueue* ops,
+ const BatchLimits& limits);
/**
* Fetch a single document referenced in the operation from the sync source.
@@ -256,11 +276,17 @@ protected:
static const int replBatchLimitSeconds = 1;
private:
+ /**
+ * Pops the operation at the front of the OplogBuffer.
+ * Notifies the observer (BackgroundSync during steady state replication) to update its stats.
+ */
+ void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer);
+
class OpQueueBatcher;
std::string _hostname;
- BackgroundSync* _bgsync;
+ OplogApplier::Observer* const _observer;
// Function to use during applyOps
MultiSyncApplyFunc _applyFunc;
@@ -268,6 +294,12 @@ private:
// Pool of worker threads for writing ops to the databases.
// Not owned by us.
ThreadPool* const _writerPool;
+
+ // Protects member data of SyncTail.
+ mutable stdx::mutex _mutex;
+
+ // Set to true if shutdown() has been called.
+ bool _inShutdown = false;
};
// These free functions are used by the thread pool workers to write ops to the db.