diff options
author | Mathias Stearn <mathias@10gen.com> | 2016-08-25 20:00:59 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2016-08-26 18:32:35 -0400 |
commit | 15c19250190932511229ac0e70bacb4c9b107b82 (patch) | |
tree | 3ddcec9e8aa141faebee8490fefc30b5f2f056af /src/mongo/db/repl/bgsync.cpp | |
parent | 6f96fbe7008283d458e7f72063e954c0fac2cc1c (diff) | |
download | mongo-15c19250190932511229ac0e70bacb4c9b107b82.tar.gz |
SERVER-25071 Flush data replication queue as part of clean shutdown
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 36 |
1 files changed, 22 insertions, 14 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index d31775f0001..42ec9938cdf 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -149,7 +149,9 @@ void BackgroundSync::startup(OperationContext* txn) { void BackgroundSync::shutdown(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lock(_mutex); - // Clear the buffer in case the producerThread is waiting in push() due to a full queue. + // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but + // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is + // waiting for an operation to be past the slaveDelay point. clearBuffer(txn); _stopped = true; @@ -472,30 +474,36 @@ void BackgroundSync::_produce(OperationContext* txn) { void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info) { - auto txn = cc().makeOperationContext(); - // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will // be one fewer than "networkDocumentCount" because the first document (which was applied // previously) is skipped. if (info.toApplyDocumentCount == 0) { - bufferCountGauge.increment(info.toApplyDocumentCount); - bufferSizeGauge.increment(info.toApplyDocumentBytes); - return; + return; // Nothing to do. } + auto txn = cc().makeOperationContext(); + // Wait for enough space. _oplogBuffer->waitForSpace(txn.get(), info.toApplyDocumentBytes); - OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes"; - } - - // Buffer docs for later application. - _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end); - - // Update last fetched info. { + // Don't add more to the buffer if we are in shutdown. Continue holding the lock until we + // are done to prevent going into shutdown. This avoids a race where shutdown() clears the + // buffer between the time we check _inShutdown and the point where we finish writing to the + // buffer. stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_inShutdown) { + return; + } + + OCCASIONALLY { + LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes"; + } + + // Buffer docs for later application. + _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end); + + // Update last fetched info. _lastFetchedHash = info.lastDocument.value; _lastOpTimeFetched = info.lastDocument.opTime; LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched; |