summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-06 17:23:10 -0400
committerBenety Goh <benety@mongodb.com>2019-04-06 17:23:25 -0400
commitb66c0d34088dae2a01a42c936396fc7a8f750201 (patch)
tree39713198ac9a80071eaf49a29424e02969bd130c /src/mongo/db
parent83383eb160f904c699b399ac59ccbbf103ad6102 (diff)
downloadmongo-b66c0d34088dae2a01a42c936396fc7a8f750201.tar.gz
SERVER-39950 make OplogApplier::getNextApplierBatch() shutdown-aware
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp27
-rw-r--r--src/mongo/db/repl/oplog_applier.h17
2 files changed, 38 insertions, 6 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index da1544e4b53..da6370425df 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -105,6 +105,14 @@ Future<void> OplogApplier::startup() {
void OplogApplier::shutdown() {
_shutdown();
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _inShutdown = true;
+}
+
+bool OplogApplier::inShutdown() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _inShutdown;
}
/**
@@ -230,9 +238,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
if (mustProcessStandalone(entry)) {
if (ops.empty()) {
ops.push_back(std::move(entry));
- BSONObj opToPopAndDiscard;
- invariant(_oplogBuffer->tryPop(opCtx, &opToPopAndDiscard));
- dassert(ops.back() == OplogEntry(opToPopAndDiscard));
+ _consume(opCtx, _oplogBuffer);
}
// Otherwise, apply what we have so far and come back for this entry.
@@ -252,9 +258,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
// Add op to buffer.
totalBytes += entry.getRawObjSizeBytes();
ops.push_back(std::move(entry));
- BSONObj opToPopAndDiscard;
- invariant(_oplogBuffer->tryPop(opCtx, &opToPopAndDiscard));
- dassert(ops.back() == OplogEntry(opToPopAndDiscard));
+ _consume(opCtx, _oplogBuffer);
}
return std::move(ops);
}
@@ -266,5 +270,16 @@ StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations
return lastApplied;
}
+void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
+ // This is just to get the op off the queue; it's been peeked at and queued for application
+ // already.
+ // If we failed to get an op off the queue, this means that shutdown() was called between the
+ // consumer's calls to peek() and consume(). shutdown() cleared the buffer so there is nothing
+ // for us to consume here. Since our postcondition is already met, it is safe to return
+ // successfully.
+ BSONObj opToPopAndDiscard;
+ invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || inShutdown());
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 290043c8e80..49477ac03ec 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -39,6 +39,7 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
@@ -153,6 +154,11 @@ public:
void shutdown();
/**
+ * Returns true if we are shutting down.
+ */
+ bool inShutdown() const;
+
+ /**
* Pushes operations read into oplog buffer.
* Accepts both Operations (OplogEntry) and OplogBuffer::Batch (BSONObj) iterators.
* This supports current implementations of OplogFetcher and OplogBuffer which work in terms of
@@ -194,6 +200,11 @@ public:
private:
/**
+ * Pops the operation at the front of the OplogBuffer.
+ */
+ void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer);
+
+ /**
* Called from startup() to run oplog application loop.
* Currently applicable to steady state replication only.
* Implemented in subclasses but not visible otherwise.
@@ -222,6 +233,12 @@ private:
// Not owned by us.
Observer* const _observer;
+
+ // Protects member data of OplogApplier.
+ mutable stdx::mutex _mutex;
+
+ // Set to true if shutdown() has been called.
+ bool _inShutdown = false;
};
/**