diff options
author | Benety Goh <benety@mongodb.com> | 2018-03-13 17:31:39 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-03-13 17:45:00 -0400 |
commit | d988a58bcb09d45a841570e26e7d50a4e9c23de8 (patch) | |
tree | 71ce8d6f762cab4b870c7d6cc953f66aa69a9f88 /src/mongo/db/repl | |
parent | a3909e15cf23edff53fdeb2ac3203e05d5ed9737 (diff) | |
download | mongo-d988a58bcb09d45a841570e26e7d50a4e9c23de8.tar.gz |
SERVER-32332 decouple BackgroundSync from SyncTail
Explicit shutdown() functions for SyncTail and RSDataSync.
BackgroundSync implements OplogApplier::Observer.
OplogBuffer for steady state replication is now cleared in
ReplicationCoordinatorExternalStateImpl::shutdown() between shutting down
and joining BackgroundSync/SyncTail.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 48 |
8 files changed, 145 insertions, 72 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index b63fc5fd45d..eab353f2a39 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -164,10 +164,6 @@ void BackgroundSync::startup(OperationContext* opCtx) { void BackgroundSync::shutdown(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); - // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but - // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is - // waiting for an operation to be past the slaveDelay point. - clearBuffer(opCtx); _state = ProducerState::Stopped; if (_syncSourceResolver) { @@ -216,7 +212,8 @@ void BackgroundSync::_run() { fassertFailed(28546); } } - stop(true); + // No need to reset optimes here because we are shutting down. + stop(false); } void BackgroundSync::_runProducer() { @@ -562,28 +559,9 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi return Status::OK(); } -bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) { - return _oplogBuffer->peek(opCtx, op); -} - -void BackgroundSync::waitForMore() { - // Block for one second before timing out. - _oplogBuffer->waitForData(Seconds(1)); -} - -void BackgroundSync::consume(OperationContext* opCtx) { - // this is just to get the op off the queue, it's been peeked at - // and queued for application already - BSONObj op; - if (_oplogBuffer->tryPop(opCtx, &op)) { - bufferCountGauge.decrement(1); - bufferSizeGauge.decrement(getSize(op)); - } else { - invariant(inShutdown()); - // This means that shutdown() was called between the consumer's calls to peek() and - // consume(). shutdown() cleared the buffer so there is nothing for us to consume here. - // Since our postcondition is already met, it is safe to return successfully. - } +void BackgroundSync::onOperationConsumed(const BSONObj& op) { + bufferCountGauge.decrement(1); + bufferSizeGauge.decrement(getSize(op)); } void BackgroundSync::_runRollback(OperationContext* opCtx, @@ -763,8 +741,7 @@ void BackgroundSync::start(OperationContext* opCtx) { LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; } -void BackgroundSync::clearBuffer(OperationContext* opCtx) { - _oplogBuffer->clear(opCtx); +void BackgroundSync::onBufferCleared() { const auto count = bufferCountGauge.get(); bufferCountGauge.decrement(count); const auto size = bufferSizeGauge.get(); diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 127c49fe8ec..704464c109b 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -34,6 +34,7 @@ #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/oplog_interface_remote.h" @@ -59,7 +60,7 @@ class ReplicationCoordinatorExternalState; class ReplicationProcess; class StorageInterface; -class BackgroundSync { +class BackgroundSync : public OplogApplier::Observer { MONGO_DISALLOW_COPYING(BackgroundSync); public: @@ -119,19 +120,16 @@ public: HostAndPort getSyncTarget() const; - // Interface implementation + /** + * This is called while shutting down to reset the counters for the OplogBuffer. + */ + void onBufferCleared(); - bool peek(OperationContext* opCtx, BSONObj* op); - void consume(OperationContext* opCtx); void clearSyncTarget(); - void waitForMore(); // For monitoring BSONObj getCounters(); - // Clears any fetched and buffered oplog entries. - void clearBuffer(OperationContext* opCtx); - /** * Returns true if any of the following is true: * 1) We are shutting down; @@ -145,6 +143,12 @@ public: // Starts the producer if it's stopped. Otherwise, let it keep running. void startProducerIfStopped(); + // OplogApplier::Observer functions + void onBatchBegin(const OplogApplier::Operations&) final {} + void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {} + void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {} + void onOperationConsumed(const BSONObj& op) final; + private: bool _inShutdown_inlock() const; diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index bbc5f29eafa..2fc88e449f8 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -136,6 +136,12 @@ public: using FetchInfo = std::pair<OplogEntry, BSONObj>; virtual void onMissingDocumentsFetchedAndInserted( const std::vector<FetchInfo>& documentsFetchedAndInserted) = 0; + + /** + * Used primarily by BackgroundSync to update server statistics during steady state replication. + * TODO: remove this function. See SERVER-33864. + */ + virtual void onOperationConsumed(const BSONObj& op) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 00f2d968489..d5baf0d959b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -241,7 +241,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( log() << "Starting replication applier thread"; invariant(!_applierThread); - _applierThread.reset(new RSDataSync{_bgSync.get(), replCoord}); + _applierThread.reset(new RSDataSync{_bgSync.get(), _oplogBuffer.get(), replCoord}); _applierThread->startup(); log() << "Starting replication reporter thread"; invariant(!_syncSourceFeedbackThread); @@ -286,13 +286,27 @@ void ReplicationCoordinatorExternalStateImpl::_stopDataReplication_inlock(Operat if (oldApplier) { log() << "Stopping replication applier thread"; - oldApplier->join(); + oldApplier->shutdown(); + } + + // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but + // ensures that it won't add anything. It will also unblock the OplogApplier pipeline if it is + // waiting for an operation to be past the slaveDelay point. + if (oldOplogBuffer) { + oldOplogBuffer->clear(opCtx); + if (oldBgSync) { + oldBgSync->onBufferCleared(); + } } if (oldBgSync) { oldBgSync->join(opCtx); } + if (oldApplier) { + oldApplier->join(); + } + if (oldOplogBuffer) { oldOplogBuffer->shutdown(opCtx); } diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 4b16baef4c9..4aadfa69330 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -44,8 +44,13 @@ namespace mongo { namespace repl { -RSDataSync::RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord) - : _bgsync(bgsync), _replCoord(replCoord) {} +RSDataSync::RSDataSync(OplogApplier::Observer* observer, + OplogBuffer* oplogBuffer, + ReplicationCoordinator* replCoord) + : _oplogBuffer(oplogBuffer), + _replCoord(replCoord), + _writerPool(SyncTail::makeWriterPool()), + _syncTail(observer, multiSyncApply, _writerPool.get()) {} RSDataSync::~RSDataSync() { DESTRUCTOR_GUARD(join();); @@ -56,9 +61,13 @@ void RSDataSync::startup() { _runThread = stdx::thread(&RSDataSync::_run, this); } +void RSDataSync::shutdown() { + _syncTail.shutdown(); +} + void RSDataSync::join() { if (_runThread.joinable()) { - invariant(_bgsync->inShutdown()); + invariant(_syncTail.inShutdown()); _runThread.join(); } } @@ -79,9 +88,7 @@ void RSDataSync::_run() { try { // Once we call into SyncTail::oplogApplication we never return, so this code only runs at // startup. - auto writerPool = SyncTail::makeWriterPool(); - SyncTail syncTail(_bgsync, multiSyncApply, writerPool.get()); - syncTail.oplogApplication(_replCoord); + _syncTail.oplogApplication(_oplogBuffer, _replCoord); } catch (...) { auto status = exceptionToStatus(); severe() << "Exception thrown in RSDataSync: " << redact(status); diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h index c9f3f17fa0d..424d7091060 100644 --- a/src/mongo/db/repl/rs_sync.h +++ b/src/mongo/db/repl/rs_sync.h @@ -28,11 +28,13 @@ #pragma once +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/oplog_buffer.h" +#include "mongo/db/repl/sync_tail.h" #include "mongo/stdx/thread.h" namespace mongo { namespace repl { -class BackgroundSync; class ReplicationCoordinator; /** @@ -41,9 +43,12 @@ class ReplicationCoordinator; */ class RSDataSync { public: - RSDataSync(BackgroundSync* bgsync, ReplicationCoordinator* replCoord); + RSDataSync(OplogApplier::Observer* observer, + OplogBuffer* oplogBuffer, + ReplicationCoordinator* replCoord); ~RSDataSync(); void startup(); + void shutdown(); void join(); private: @@ -51,8 +56,10 @@ private: void _run(); stdx::thread _runThread; - BackgroundSync* _bgsync; + OplogBuffer* _oplogBuffer; ReplicationCoordinator* _replCoord; + std::unique_ptr<ThreadPool> _writerPool; + SyncTail _syncTail; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 7a0700a6aa9..25dedfa09a3 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -274,8 +274,10 @@ NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op) } // namespace -SyncTail::SyncTail(BackgroundSync* bgsync, MultiSyncApplyFunc func, ThreadPool* writerPool) - : _bgsync(bgsync), _applyFunc(func), _writerPool(writerPool) {} +SyncTail::SyncTail(OplogApplier::Observer* observer, + MultiSyncApplyFunc func, + ThreadPool* writerPool) + : _observer(observer), _applyFunc(func), _writerPool(writerPool) {} SyncTail::~SyncTail() {} @@ -300,10 +302,6 @@ std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) { return pool; } -bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) { - return _bgsync->peek(opCtx, op); -} - // static Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, @@ -691,7 +689,8 @@ class SyncTail::OpQueueBatcher { MONGO_DISALLOW_COPYING(OpQueueBatcher); public: - OpQueueBatcher(SyncTail* syncTail) : _syncTail(syncTail), _thread([this] { run(); }) {} + OpQueueBatcher(SyncTail* syncTail, OplogBuffer* oplogBuffer) + : _syncTail(syncTail), _oplogBuffer(oplogBuffer), _thread([this] { run(); }) {} ~OpQueueBatcher() { invariant(_isDead); _thread.join(); @@ -758,7 +757,8 @@ private: // tryPopAndWaitForMore adds to ops and returns true when we need to end a batch early. { auto opCtx = cc().makeOperationContext(); - while (!_syncTail->tryPopAndWaitForMore(opCtx.get(), &ops, batchLimits)) { + while (!_syncTail->tryPopAndWaitForMore( + opCtx.get(), _oplogBuffer, &ops, batchLimits)) { } } @@ -779,6 +779,7 @@ private: } SyncTail* const _syncTail; + OplogBuffer* const _oplogBuffer; stdx::mutex _mutex; // Guards _ops. stdx::condition_variable _cv; @@ -791,8 +792,8 @@ private: stdx::thread _thread; // Must be last so all other members are initialized before starting. }; -void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { - OpQueueBatcher batcher(this); +void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord) { + OpQueueBatcher batcher(this, oplogBuffer); std::unique_ptr<ApplyBatchFinalizer> finalizer{ getGlobalServiceContext()->getGlobalStorageEngine()->isDurable() @@ -817,7 +818,7 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { while (MONGO_FAIL_POINT(rsSyncApplyStop)) { // Tests should not trigger clean shutdown while that failpoint is active. If we // think we need this, we need to think hard about what the behavior should be. - if (_bgsync->inShutdown()) { + if (inShutdown()) { severe() << "Turn off rsSyncApplyStop before attempting clean shutdown"; fassertFailedNoTrace(40304); } @@ -916,21 +917,22 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { // queue. We don't block forever so that we can periodically check for things like shutdown or // reconfigs. bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, + OplogBuffer* oplogBuffer, SyncTail::OpQueue* ops, const BatchLimits& limits) { { BSONObj op; // Check to see if there are ops waiting in the bgsync queue - bool peek_success = peek(opCtx, &op); + bool peek_success = oplogBuffer->peek(opCtx, &op); if (!peek_success) { // If we don't have anything in the queue, wait a bit for something to appear. if (ops->empty()) { - if (_bgsync->inShutdown()) { + if (inShutdown()) { ops->setMustShutdownFlag(); } else { // Block up to 1 second. We still return true in this case because we want this // op to be the first in a new batch with a new start time. - _bgsync->waitForMore(); + oplogBuffer->waitForData(Seconds(1)); } } @@ -981,7 +983,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, if (entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) { if (ops->getCount() == 1) { // apply commands one-at-a-time - _bgsync->consume(opCtx); + _consume(opCtx, oplogBuffer); } else { // This op must be processed alone, but we already had ops in the queue so we can't // include it in this batch. Since we didn't call consume(), we'll see this again next @@ -994,12 +996,36 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, } // We are going to apply this Op. - _bgsync->consume(opCtx); + _consume(opCtx, oplogBuffer); // Go back for more ops, unless we've hit the limit. return ops->getCount() >= limits.ops; } +void SyncTail::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { + // This is just to get the op off the queue; it's been peeked at and queued for application + // already. + BSONObj op; + if (oplogBuffer->tryPop(opCtx, &op)) { + _observer->onOperationConsumed(op); + } else { + invariant(inShutdown()); + // This means that shutdown() was called between the consumer's calls to peek() and + // consume(). shutdown() cleared the buffer so there is nothing for us to consume here. + // Since our postcondition is already met, it is safe to return successfully. + } +} + +void SyncTail::shutdown() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _inShutdown = true; +} + +bool SyncTail::inShutdown() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _inShutdown; +} + void SyncTail::setHostname(const std::string& hostname) { _hostname = hostname; } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index d456107fcd7..660fb27a364 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -35,8 +35,11 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" namespace mongo { @@ -46,7 +49,6 @@ class OperationContext; struct MultikeyPathInfo; namespace repl { -class BackgroundSync; class ReplicationCoordinator; class OpTime; @@ -102,12 +104,12 @@ public: * * Constructs a SyncTail. * During steady state replication, oplogApplication() obtains batches of operations to apply - * from 'bgsync'. It is not required to provide 'bgsync' at construction if we do not plan on - * using oplogApplication(). During the oplog application phase, the batch of operations is + * from 'observer'. It is not required to provide 'observer' at construction if we do not plan + * on using oplogApplication(). During the oplog application phase, the batch of operations is * distributed across writer threads in 'writerPool'. Each writer thread applies its own vector * of operations using 'func'. The writer thread pool is not owned by us. */ - SyncTail(BackgroundSync* bgsync, MultiSyncApplyFunc func, ThreadPool* writerPool); + SyncTail(OplogApplier::Observer* observer, MultiSyncApplyFunc func, ThreadPool* writerPool); virtual ~SyncTail(); /** @@ -132,8 +134,23 @@ public: const BSONObj& o, OplogApplication::Mode oplogApplicationMode); - void oplogApplication(ReplicationCoordinator* replCoord); - bool peek(OperationContext* opCtx, BSONObj* obj); + /** + * Runs oplog application in a loop until shutdown() is called. + * Retrieves operations from the OplogBuffer in batches that will be applied in parallel using + * multiApply(). + */ + void oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator* replCoord); + + /** + * Shuts down oplogApplication() processing. + */ + void shutdown(); + + /** + * Returns true if we are shutting down. + */ + bool inShutdown() const; + class OpQueue { public: @@ -218,7 +235,10 @@ public: * If ops is empty on entry and nothing can be added yet, will wait up to a second before * returning true. */ - bool tryPopAndWaitForMore(OperationContext* opCtx, OpQueue* ops, const BatchLimits& limits); + bool tryPopAndWaitForMore(OperationContext* opCtx, + OplogBuffer* oplogBuffer, + OpQueue* ops, + const BatchLimits& limits); /** * Fetch a single document referenced in the operation from the sync source. @@ -256,11 +276,17 @@ protected: static const int replBatchLimitSeconds = 1; private: + /** + * Pops the operation at the front of the OplogBuffer. + * Updates stats on BackgroundSync. + */ + void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer); + class OpQueueBatcher; std::string _hostname; - BackgroundSync* _bgsync; + OplogApplier::Observer* const _observer; // Function to use during applyOps MultiSyncApplyFunc _applyFunc; @@ -268,6 +294,12 @@ private: // Pool of worker threads for writing ops to the databases. // Not owned by us. ThreadPool* const _writerPool; + + // Protects member data of SyncTail. + mutable stdx::mutex _mutex; + + // Set to true if shutdown() has been called. + bool _inShutdown = false; }; // These free functions are used by the thread pool workers to write ops to the db. |