summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2016-08-25 20:00:59 -0400
committerMathias Stearn <mathias@10gen.com>2016-08-26 18:32:35 -0400
commit15c19250190932511229ac0e70bacb4c9b107b82 (patch)
tree3ddcec9e8aa141faebee8490fefc30b5f2f056af /src/mongo/db/repl/bgsync.cpp
parent6f96fbe7008283d458e7f72063e954c0fac2cc1c (diff)
downloadmongo-15c19250190932511229ac0e70bacb4c9b107b82.tar.gz
SERVER-25071 Flush data replication queue as part of clean shutdown
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp36
1 files changed, 22 insertions, 14 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d31775f0001..42ec9938cdf 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -149,7 +149,9 @@ void BackgroundSync::startup(OperationContext* txn) {
void BackgroundSync::shutdown(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- // Clear the buffer in case the producerThread is waiting in push() due to a full queue.
+ // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
+ // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is
+ // waiting for an operation to be past the slaveDelay point.
clearBuffer(txn);
_stopped = true;
@@ -472,30 +474,36 @@ void BackgroundSync::_produce(OperationContext* txn) {
void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info) {
- auto txn = cc().makeOperationContext();
-
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
if (info.toApplyDocumentCount == 0) {
- bufferCountGauge.increment(info.toApplyDocumentCount);
- bufferSizeGauge.increment(info.toApplyDocumentBytes);
- return;
+ return; // Nothing to do.
}
+ auto txn = cc().makeOperationContext();
+
// Wait for enough space.
_oplogBuffer->waitForSpace(txn.get(), info.toApplyDocumentBytes);
- OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes";
- }
-
- // Buffer docs for later application.
- _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end);
-
- // Update last fetched info.
{
+ // Don't add more to the buffer if we are in shutdown. Continue holding the lock until we
+ // are done to prevent going into shutdown. This avoids a race where shutdown() clears the
+ // buffer between the time we check _inShutdown and the point where we finish writing to the
+ // buffer.
stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (_inShutdown) {
+ return;
+ }
+
+ OCCASIONALLY {
+ LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes";
+ }
+
+ // Buffer docs for later application.
+ _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end);
+
+ // Update last fetched info.
_lastFetchedHash = info.lastDocument.value;
_lastOpTimeFetched = info.lastDocument.opTime;
LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched;