summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.cpp38
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.h41
2 files changed, 73 insertions, 6 deletions
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp
index ea479ede84d..0715695f6b6 100644
--- a/src/mongo/db/free_mon/free_mon_queue.cpp
+++ b/src/mongo/db/free_mon/free_mon_queue.cpp
@@ -37,6 +37,38 @@
namespace mongo {
+std::shared_ptr<FreeMonMessage> FreeMonPriorityQueue::top() const {
+ return _vector.front();
+}
+
+void FreeMonPriorityQueue::pop() {
+ std::pop_heap(_vector.begin(), _vector.end(), _comp);
+ _vector.pop_back();
+}
+
+void FreeMonPriorityQueue::push(std::shared_ptr<FreeMonMessage> item) {
+ _vector.push_back(item);
+ std::push_heap(_vector.begin(), _vector.end(), _comp);
+}
+
+void FreeMonPriorityQueue::eraseByType(FreeMonMessageType type) {
+
+ while (true) {
+ auto it = std::find_if(_vector.begin(), _vector.end(), [type](const auto& item) {
+ return item->getType() == type;
+ });
+
+ if (it == _vector.end()) {
+ break;
+ }
+
+ _vector.erase(it);
+ }
+
+ std::make_heap(_vector.begin(), _vector.end(), _comp);
+}
+
+
FreeMonMessage::~FreeMonMessage() {}
void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) {
@@ -48,7 +80,11 @@ void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) {
return;
}
- _queue.emplace(msg);
+ if (msg->getType() == FreeMonMessageType::MetricsSend) {
+ _queue.eraseByType(FreeMonMessageType::MetricsSend);
+ }
+
+ _queue.push(msg);
// Signal the dequeue
_condvar.notify_one();
diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h
index d91aeb9e157..4b0909bcb4e 100644
--- a/src/mongo/db/free_mon/free_mon_queue.h
+++ b/src/mongo/db/free_mon/free_mon_queue.h
@@ -52,6 +52,41 @@ struct FreeMonMessageGreater {
};
/**
+ * Priority Queue with ability to remove items by filter.
+ */
+class FreeMonPriorityQueue {
+public:
+ bool empty() const {
+ return _vector.empty();
+ }
+
+ /**
+ * Return the message at the top of the priority queue.
+ */
+ std::shared_ptr<FreeMonMessage> top() const;
+
+ /**
+ * Pop the message at the top of the priority queue.
+ */
+ void pop();
+
+ /**
+ * Push a message into the priority queue.
+ */
+ void push(std::shared_ptr<FreeMonMessage> item);
+
+ /**
+ * Erase messages of a given type from the queue.
+ */
+ void eraseByType(FreeMonMessageType type);
+
+private:
+ // Using shared_ptr because std::pop_heap does not support move-only types
+ std::vector<std::shared_ptr<FreeMonMessage>> _vector;
+ FreeMonMessageGreater _comp;
+};
+
+/**
* A multi-producer, single-consumer queue with deadlines.
*
* The smallest deadline sorts first. Messages with deadlines can be use as a timer mechanism.
@@ -95,11 +130,7 @@ private:
bool _stop{false};
// Priority queue of messages with shortest deadline first
- // Using shared_ptr because priority_queue does not support move-only types
- std::priority_queue<std::shared_ptr<FreeMonMessage>,
- std::vector<std::shared_ptr<FreeMonMessage>>,
- FreeMonMessageGreater>
- _queue;
+ FreeMonPriorityQueue _queue;
// Use manual crank to process messages in-order instead of based on deadlines.
bool _useCrank{false};