diff options
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_blocking_queue.h | 2 |
2 files changed, 26 insertions, 2 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp index 72eb401ddb5..c0c91ea3694 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp @@ -45,7 +45,9 @@ size_t getDocumentSize(const BSONObj& o) { } // namespace -OplogBufferBlockingQueue::OplogBufferBlockingQueue() : _queue(kOplogBufferSize, &getDocumentSize) {} +OplogBufferBlockingQueue::OplogBufferBlockingQueue() : OplogBufferBlockingQueue(nullptr) {} +OplogBufferBlockingQueue::OplogBufferBlockingQueue(Counters* counters) + : _counters(counters), _queue(kOplogBufferSize, &getDocumentSize) {} void OplogBufferBlockingQueue::startup(OperationContext*) {} @@ -55,16 +57,27 @@ void OplogBufferBlockingQueue::shutdown(OperationContext* opCtx) { void OplogBufferBlockingQueue::pushEvenIfFull(OperationContext*, const Value& value) { _queue.pushEvenIfFull(value); + if (_counters) { + _counters->increment(value); + } } void OplogBufferBlockingQueue::push(OperationContext*, const Value& value) { _queue.push(value); + if (_counters) { + _counters->increment(value); + } } void OplogBufferBlockingQueue::pushAllNonBlocking(OperationContext*, Batch::const_iterator begin, Batch::const_iterator end) { _queue.pushAllNonBlocking(begin, end); + if (_counters) { + for (auto i = begin; i != end; ++i) { + _counters->increment(*i); + } + } } void OplogBufferBlockingQueue::waitForSpace(OperationContext*, std::size_t size) { @@ -89,10 +102,19 @@ std::size_t OplogBufferBlockingQueue::getCount() const { void OplogBufferBlockingQueue::clear(OperationContext*) { _queue.clear(); + if (_counters) { + _counters->clear(); + } } bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) { - return _queue.tryPop(*value); + if (!_queue.tryPop(*value)) { + return false; + } + if (_counters) { + _counters->decrement(*value); + } + return true; } bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) { diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h index 68c74779b0e..1bd0fe5278c 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h @@ -40,6 +40,7 @@ namespace repl { class OplogBufferBlockingQueue final : public OplogBuffer { public: OplogBufferBlockingQueue(); + explicit OplogBufferBlockingQueue(Counters* counters); void startup(OperationContext* opCtx) override; void shutdown(OperationContext* opCtx) override; @@ -60,6 +61,7 @@ public: boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override; private: + Counters* const _counters; BlockingQueue<BSONObj> _queue; }; |