diff options
author | Benety Goh <benety@mongodb.com> | 2018-03-13 17:31:39 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-03-13 17:45:00 -0400 |
commit | d988a58bcb09d45a841570e26e7d50a4e9c23de8 (patch) | |
tree | 71ce8d6f762cab4b870c7d6cc953f66aa69a9f88 /src/mongo/db/repl/bgsync.cpp | |
parent | a3909e15cf23edff53fdeb2ac3203e05d5ed9737 (diff) | |
download | mongo-d988a58bcb09d45a841570e26e7d50a4e9c23de8.tar.gz |
SERVER-32332 decouple BackgroundSync from SyncTail
Explicit shutdown() functions for SyncTail and RSDataSync.
BackgroundSync implements OplogApplier::Observer.
OplogBuffer for steady state replication is now cleared in
ReplicationCoordinatorExternalStateImpl::shutdown() between shutting down
and joining BackgroundSync/SyncTail.
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 35 |
1 files changed, 6 insertions, 29 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index b63fc5fd45d..eab353f2a39 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -164,10 +164,6 @@ void BackgroundSync::startup(OperationContext* opCtx) { void BackgroundSync::shutdown(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); - // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but - // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is - // waiting for an operation to be past the slaveDelay point. - clearBuffer(opCtx); _state = ProducerState::Stopped; if (_syncSourceResolver) { @@ -216,7 +212,8 @@ void BackgroundSync::_run() { fassertFailed(28546); } } - stop(true); + // No need to reset optimes here because we are shutting down. + stop(false); } void BackgroundSync::_runProducer() { @@ -562,28 +559,9 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi return Status::OK(); } -bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) { - return _oplogBuffer->peek(opCtx, op); -} - -void BackgroundSync::waitForMore() { - // Block for one second before timing out. - _oplogBuffer->waitForData(Seconds(1)); -} - -void BackgroundSync::consume(OperationContext* opCtx) { - // this is just to get the op off the queue, it's been peeked at - // and queued for application already - BSONObj op; - if (_oplogBuffer->tryPop(opCtx, &op)) { - bufferCountGauge.decrement(1); - bufferSizeGauge.decrement(getSize(op)); - } else { - invariant(inShutdown()); - // This means that shutdown() was called between the consumer's calls to peek() and - // consume(). shutdown() cleared the buffer so there is nothing for us to consume here. - // Since our postcondition is already met, it is safe to return successfully. - } +void BackgroundSync::onOperationConsumed(const BSONObj& op) { + bufferCountGauge.decrement(1); + bufferSizeGauge.decrement(getSize(op)); } void BackgroundSync::_runRollback(OperationContext* opCtx, @@ -763,8 +741,7 @@ void BackgroundSync::start(OperationContext* opCtx) { LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; } -void BackgroundSync::clearBuffer(OperationContext* opCtx) { - _oplogBuffer->clear(opCtx); +void BackgroundSync::onBufferCleared() { const auto count = bufferCountGauge.get(); bufferCountGauge.decrement(count); const auto size = bufferSizeGauge.get(); |