summaryrefslogtreecommitdiff
path: root/src/mongo/db/free_mon
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2018-06-18 19:45:08 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2018-06-18 21:44:20 -0400
commit28f341edb7ca112f80d2c6bf0bd23d17b4ed330a (patch)
treeaa1af792e1a93f2a63a52ca143edc67262ab4f49 /src/mongo/db/free_mon
parent770e0ab727a053a69873fecc55844ff5b7e6d5db (diff)
downloadmongo-28f341edb7ca112f80d2c6bf0bd23d17b4ed330a.tar.gz
SERVER-35652 Duplicate registrations should only trigger one metrics upload stream
(cherry picked from commit 99d48121c5bb18168eb08b20201065170a1d7213)
Diffstat (limited to 'src/mongo/db/free_mon')
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.cpp38
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.h41
2 files changed, 73 insertions, 6 deletions
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp
index ea479ede84d..0715695f6b6 100644
--- a/src/mongo/db/free_mon/free_mon_queue.cpp
+++ b/src/mongo/db/free_mon/free_mon_queue.cpp
@@ -37,6 +37,38 @@
namespace mongo {
+std::shared_ptr<FreeMonMessage> FreeMonPriorityQueue::top() const {
+ return _vector.front();
+}
+
+void FreeMonPriorityQueue::pop() {
+ std::pop_heap(_vector.begin(), _vector.end(), _comp);
+ _vector.pop_back();
+}
+
+void FreeMonPriorityQueue::push(std::shared_ptr<FreeMonMessage> item) {
+ _vector.push_back(item);
+ std::push_heap(_vector.begin(), _vector.end(), _comp);
+}
+
+void FreeMonPriorityQueue::eraseByType(FreeMonMessageType type) {
+
+ while (true) {
+ auto it = std::find_if(_vector.begin(), _vector.end(), [type](const auto& item) {
+ return item->getType() == type;
+ });
+
+ if (it == _vector.end()) {
+ break;
+ }
+
+ _vector.erase(it);
+ }
+
+ std::make_heap(_vector.begin(), _vector.end(), _comp);
+}
+
+
FreeMonMessage::~FreeMonMessage() {}
void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) {
@@ -48,7 +80,11 @@ void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) {
return;
}
- _queue.emplace(msg);
+ if (msg->getType() == FreeMonMessageType::MetricsSend) {
+ _queue.eraseByType(FreeMonMessageType::MetricsSend);
+ }
+
+ _queue.push(msg);
// Signal the dequeue
_condvar.notify_one();
diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h
index d91aeb9e157..4b0909bcb4e 100644
--- a/src/mongo/db/free_mon/free_mon_queue.h
+++ b/src/mongo/db/free_mon/free_mon_queue.h
@@ -52,6 +52,41 @@ struct FreeMonMessageGreater {
};
/**
+ * Priority Queue with ability to remove items by filter.
+ */
+class FreeMonPriorityQueue {
+public:
+ bool empty() const {
+ return _vector.empty();
+ }
+
+ /**
+ * Return the message at the top of the priority queue.
+ */
+ std::shared_ptr<FreeMonMessage> top() const;
+
+ /**
+ * Pop the message at the top of the priority queue.
+ */
+ void pop();
+
+ /**
+ * Push a message into the priority queue.
+ */
+ void push(std::shared_ptr<FreeMonMessage> item);
+
+ /**
+ * Erase messages of a given type from the queue.
+ */
+ void eraseByType(FreeMonMessageType type);
+
+private:
+ // Using shared_ptr because std::pop_heap does not support move-only types
+ std::vector<std::shared_ptr<FreeMonMessage>> _vector;
+ FreeMonMessageGreater _comp;
+};
+
+/**
* A multi-producer, single-consumer queue with deadlines.
*
* The smallest deadline sorts first. Messages with deadlines can be use as a timer mechanism.
@@ -95,11 +130,7 @@ private:
bool _stop{false};
// Priority queue of messages with shortest deadline first
- // Using shared_ptr because priority_queue does not support move-only types
- std::priority_queue<std::shared_ptr<FreeMonMessage>,
- std::vector<std::shared_ptr<FreeMonMessage>>,
- FreeMonMessageGreater>
- _queue;
+ FreeMonPriorityQueue _queue;
// Use manual crank to process messages in-order instead of based on deadlines.
bool _useCrank{false};