diff options
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.h | 41 |
2 files changed, 73 insertions, 6 deletions
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp index ea479ede84d..0715695f6b6 100644 --- a/src/mongo/db/free_mon/free_mon_queue.cpp +++ b/src/mongo/db/free_mon/free_mon_queue.cpp @@ -37,6 +37,38 @@ namespace mongo { +std::shared_ptr<FreeMonMessage> FreeMonPriorityQueue::top() const { + return _vector.front(); +} + +void FreeMonPriorityQueue::pop() { + std::pop_heap(_vector.begin(), _vector.end(), _comp); + _vector.pop_back(); +} + +void FreeMonPriorityQueue::push(std::shared_ptr<FreeMonMessage> item) { + _vector.push_back(item); + std::push_heap(_vector.begin(), _vector.end(), _comp); +} + +void FreeMonPriorityQueue::eraseByType(FreeMonMessageType type) { + + while (true) { + auto it = std::find_if(_vector.begin(), _vector.end(), [type](const auto& item) { + return item->getType() == type; + }); + + if (it == _vector.end()) { + break; + } + + _vector.erase(it); + } + + std::make_heap(_vector.begin(), _vector.end(), _comp); +} + + FreeMonMessage::~FreeMonMessage() {} void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) { @@ -48,7 +80,11 @@ void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) { return; } - _queue.emplace(msg); + if (msg->getType() == FreeMonMessageType::MetricsSend) { + _queue.eraseByType(FreeMonMessageType::MetricsSend); + } + + _queue.push(msg); // Signal the dequeue _condvar.notify_one(); diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h index d91aeb9e157..4b0909bcb4e 100644 --- a/src/mongo/db/free_mon/free_mon_queue.h +++ b/src/mongo/db/free_mon/free_mon_queue.h @@ -52,6 +52,41 @@ struct FreeMonMessageGreater { }; /** + * Priority Queue with ability to remove items by filter. + */ +class FreeMonPriorityQueue { +public: + bool empty() const { + return _vector.empty(); + } + + /** + * Return the message at the top of the priority queue. + */ + std::shared_ptr<FreeMonMessage> top() const; + + /** + * Pop the message at the top of the priority queue. + */ + void pop(); + + /** + * Push a message into the priority queue. + */ + void push(std::shared_ptr<FreeMonMessage> item); + + /** + * Erase messages of a given type from the queue. + */ + void eraseByType(FreeMonMessageType type); + +private: + // Using shared_ptr because std::pop_heap does not support move-only types + std::vector<std::shared_ptr<FreeMonMessage>> _vector; + FreeMonMessageGreater _comp; +}; + +/** * A multi-producer, single-consumer queue with deadlines. * * The smallest deadline sorts first. Messages with deadlines can be use as a timer mechanism. @@ -95,11 +130,7 @@ private: bool _stop{false}; // Priority queue of messages with shortest deadline first - // Using shared_ptr because priority_queue does not support move-only types - std::priority_queue<std::shared_ptr<FreeMonMessage>, - std::vector<std::shared_ptr<FreeMonMessage>>, - FreeMonMessageGreater> - _queue; + FreeMonPriorityQueue _queue; // Use manual crank to process messages in-order instead of based on deadlines. bool _useCrank{false}; |