summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-03-13 17:31:39 -0400
committerBenety Goh <benety@mongodb.com>2018-03-13 17:45:00 -0400
commitd988a58bcb09d45a841570e26e7d50a4e9c23de8 (patch)
tree71ce8d6f762cab4b870c7d6cc953f66aa69a9f88 /src/mongo/db/repl/bgsync.cpp
parenta3909e15cf23edff53fdeb2ac3203e05d5ed9737 (diff)
downloadmongo-d988a58bcb09d45a841570e26e7d50a4e9c23de8.tar.gz
SERVER-32332 decouple BackgroundSync from SyncTail
Explicit shutdown() functions for SyncTail and RSDataSync. BackgroundSync implements OplogApplier::Observer. OplogBuffer for steady state replication is now cleared in ReplicationCoordinatorExternalStateImpl::shutdown() between shutting down and joining BackgroundSync/SyncTail.
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp35
1 files changed, 6 insertions, 29 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index b63fc5fd45d..eab353f2a39 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -164,10 +164,6 @@ void BackgroundSync::startup(OperationContext* opCtx) {
void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
- // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is
- // waiting for an operation to be past the slaveDelay point.
- clearBuffer(opCtx);
_state = ProducerState::Stopped;
if (_syncSourceResolver) {
@@ -216,7 +212,8 @@ void BackgroundSync::_run() {
fassertFailed(28546);
}
}
- stop(true);
+ // No need to reset optimes here because we are shutting down.
+ stop(false);
}
void BackgroundSync::_runProducer() {
@@ -562,28 +559,9 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi
return Status::OK();
}
-bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) {
- return _oplogBuffer->peek(opCtx, op);
-}
-
-void BackgroundSync::waitForMore() {
- // Block for one second before timing out.
- _oplogBuffer->waitForData(Seconds(1));
-}
-
-void BackgroundSync::consume(OperationContext* opCtx) {
- // this is just to get the op off the queue, it's been peeked at
- // and queued for application already
- BSONObj op;
- if (_oplogBuffer->tryPop(opCtx, &op)) {
- bufferCountGauge.decrement(1);
- bufferSizeGauge.decrement(getSize(op));
- } else {
- invariant(inShutdown());
- // This means that shutdown() was called between the consumer's calls to peek() and
- // consume(). shutdown() cleared the buffer so there is nothing for us to consume here.
- // Since our postcondition is already met, it is safe to return successfully.
- }
+void BackgroundSync::onOperationConsumed(const BSONObj& op) {
+ bufferCountGauge.decrement(1);
+ bufferSizeGauge.decrement(getSize(op));
}
void BackgroundSync::_runRollback(OperationContext* opCtx,
@@ -763,8 +741,7 @@ void BackgroundSync::start(OperationContext* opCtx) {
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
-void BackgroundSync::clearBuffer(OperationContext* opCtx) {
- _oplogBuffer->clear(opCtx);
+void BackgroundSync::onBufferCleared() {
const auto count = bufferCountGauge.get();
bufferCountGauge.decrement(count);
const auto size = bufferSizeGauge.get();