diff options
author | Benety Goh <benety@mongodb.com> | 2019-04-06 17:23:10 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2019-04-06 17:23:25 -0400 |
commit | b66c0d34088dae2a01a42c936396fc7a8f750201 (patch) | |
tree | 39713198ac9a80071eaf49a29424e02969bd130c /src/mongo/db | |
parent | 83383eb160f904c699b399ac59ccbbf103ad6102 (diff) | |
download | mongo-b66c0d34088dae2a01a42c936396fc7a8f750201.tar.gz |
SERVER-39950 make OplogApplier::getNextApplierBatch() shutdown-aware
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 17 |
2 files changed, 38 insertions, 6 deletions
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index da1544e4b53..da6370425df 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -105,6 +105,14 @@ Future<void> OplogApplier::startup() { void OplogApplier::shutdown() { _shutdown(); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + _inShutdown = true; +} + +bool OplogApplier::inShutdown() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _inShutdown; } /** @@ -230,9 +238,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( if (mustProcessStandalone(entry)) { if (ops.empty()) { ops.push_back(std::move(entry)); - BSONObj opToPopAndDiscard; - invariant(_oplogBuffer->tryPop(opCtx, &opToPopAndDiscard)); - dassert(ops.back() == OplogEntry(opToPopAndDiscard)); + _consume(opCtx, _oplogBuffer); } // Otherwise, apply what we have so far and come back for this entry. @@ -252,9 +258,7 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( // Add op to buffer. totalBytes += entry.getRawObjSizeBytes(); ops.push_back(std::move(entry)); - BSONObj opToPopAndDiscard; - invariant(_oplogBuffer->tryPop(opCtx, &opToPopAndDiscard)); - dassert(ops.back() == OplogEntry(opToPopAndDiscard)); + _consume(opCtx, _oplogBuffer); } return std::move(ops); } @@ -266,5 +270,16 @@ StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations return lastApplied; } +void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { + // This is just to get the op off the queue; it's been peeked at and queued for application + // already. + // If we failed to get an op off the queue, this means that shutdown() was called between the + // consumer's calls to peek() and consume(). shutdown() cleared the buffer so there is nothing + // for us to consume here. Since our postcondition is already met, it is safe to return + // successfully. + BSONObj opToPopAndDiscard; + invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || inShutdown()); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 290043c8e80..49477ac03ec 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -39,6 +39,7 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/executor/task_executor.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/functional.h" #include "mongo/util/future.h" @@ -153,6 +154,11 @@ public: void shutdown(); /** + * Returns true if we are shutting down. + */ + bool inShutdown() const; + + /** * Pushes operations read into oplog buffer. * Accepts both Operations (OplogEntry) and OplogBuffer::Batch (BSONObj) iterators. * This supports current implementations of OplogFetcher and OplogBuffer which work in terms of @@ -194,6 +200,11 @@ public: private: /** + * Pops the operation at the front of the OplogBuffer. + */ + void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer); + + /** * Called from startup() to run oplog application loop. * Currently applicable to steady state replication only. * Implemented in subclasses but not visible otherwise. @@ -222,6 +233,12 @@ private: // Not owned by us. Observer* const _observer; + + // Protects member data of OplogApplier. + mutable stdx::mutex _mutex; + + // Set to true if shutdown() has been called. + bool _inShutdown = false; }; /** |