summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.cpp26
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.h2
2 files changed, 26 insertions, 2 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index 72eb401ddb5..c0c91ea3694 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -45,7 +45,9 @@ size_t getDocumentSize(const BSONObj& o) {
} // namespace
-OplogBufferBlockingQueue::OplogBufferBlockingQueue() : _queue(kOplogBufferSize, &getDocumentSize) {}
+OplogBufferBlockingQueue::OplogBufferBlockingQueue() : OplogBufferBlockingQueue(nullptr) {}
+OplogBufferBlockingQueue::OplogBufferBlockingQueue(Counters* counters)
+ : _counters(counters), _queue(kOplogBufferSize, &getDocumentSize) {}
void OplogBufferBlockingQueue::startup(OperationContext*) {}
@@ -55,16 +57,27 @@ void OplogBufferBlockingQueue::shutdown(OperationContext* opCtx) {
void OplogBufferBlockingQueue::pushEvenIfFull(OperationContext*, const Value& value) {
_queue.pushEvenIfFull(value);
+ if (_counters) {
+ _counters->increment(value);
+ }
}
void OplogBufferBlockingQueue::push(OperationContext*, const Value& value) {
_queue.push(value);
+ if (_counters) {
+ _counters->increment(value);
+ }
}
void OplogBufferBlockingQueue::pushAllNonBlocking(OperationContext*,
Batch::const_iterator begin,
Batch::const_iterator end) {
_queue.pushAllNonBlocking(begin, end);
+ if (_counters) {
+ for (auto i = begin; i != end; ++i) {
+ _counters->increment(*i);
+ }
+ }
}
void OplogBufferBlockingQueue::waitForSpace(OperationContext*, std::size_t size) {
@@ -89,10 +102,19 @@ std::size_t OplogBufferBlockingQueue::getCount() const {
void OplogBufferBlockingQueue::clear(OperationContext*) {
_queue.clear();
+ if (_counters) {
+ _counters->clear();
+ }
}
bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) {
- return _queue.tryPop(*value);
+ if (!_queue.tryPop(*value)) {
+ return false;
+ }
+ if (_counters) {
+ _counters->decrement(*value);
+ }
+ return true;
}
bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) {
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 68c74779b0e..1bd0fe5278c 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -40,6 +40,7 @@ namespace repl {
class OplogBufferBlockingQueue final : public OplogBuffer {
public:
OplogBufferBlockingQueue();
+ explicit OplogBufferBlockingQueue(Counters* counters);
void startup(OperationContext* opCtx) override;
void shutdown(OperationContext* opCtx) override;
@@ -60,6 +61,7 @@ public:
boost::optional<Value> lastObjectPushed(OperationContext* opCtx) const override;
private:
+ Counters* const _counters;
BlockingQueue<BSONObj> _queue;
};