diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-06-18 19:45:08 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-06-18 19:45:08 -0400 |
commit | 99d48121c5bb18168eb08b20201065170a1d7213 (patch) | |
tree | 3b8ae64d939a58d8d0673135964cfa86fd3fb02d /src/mongo/db/free_mon | |
parent | 2fedb546eab17b0e6e6329f72428e5c979b3e624 (diff) | |
download | mongo-99d48121c5bb18168eb08b20201065170a1d7213.tar.gz |
SERVER-35652 Duplicate registrations should only trigger one metrics upload stream
Diffstat (limited to 'src/mongo/db/free_mon')
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.h | 41 |
2 files changed, 73 insertions, 6 deletions
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp index ea479ede84d..0715695f6b6 100644 --- a/src/mongo/db/free_mon/free_mon_queue.cpp +++ b/src/mongo/db/free_mon/free_mon_queue.cpp @@ -37,6 +37,38 @@ namespace mongo { +std::shared_ptr<FreeMonMessage> FreeMonPriorityQueue::top() const { + return _vector.front(); +} + +void FreeMonPriorityQueue::pop() { + std::pop_heap(_vector.begin(), _vector.end(), _comp); + _vector.pop_back(); +} + +void FreeMonPriorityQueue::push(std::shared_ptr<FreeMonMessage> item) { + _vector.push_back(item); + std::push_heap(_vector.begin(), _vector.end(), _comp); +} + +void FreeMonPriorityQueue::eraseByType(FreeMonMessageType type) { + + while (true) { + auto it = std::find_if(_vector.begin(), _vector.end(), [type](const auto& item) { + return item->getType() == type; + }); + + if (it == _vector.end()) { + break; + } + + _vector.erase(it); + } + + std::make_heap(_vector.begin(), _vector.end(), _comp); +} + + FreeMonMessage::~FreeMonMessage() {} void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) { @@ -48,7 +80,11 @@ void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) { return; } - _queue.emplace(msg); + if (msg->getType() == FreeMonMessageType::MetricsSend) { + _queue.eraseByType(FreeMonMessageType::MetricsSend); + } + + _queue.push(msg); // Signal the dequeue _condvar.notify_one(); diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h index d91aeb9e157..4b0909bcb4e 100644 --- a/src/mongo/db/free_mon/free_mon_queue.h +++ b/src/mongo/db/free_mon/free_mon_queue.h @@ -52,6 +52,41 @@ struct FreeMonMessageGreater { }; /** + * Priority Queue with ability to remove items by filter. + */ +class FreeMonPriorityQueue { +public: + bool empty() const { + return _vector.empty(); + } + + /** + * Return the message at the top of the priority queue. + */ + std::shared_ptr<FreeMonMessage> top() const; + + /** + * Pop the message at the top of the priority queue. + */ + void pop(); + + /** + * Push a message into the priority queue. + */ + void push(std::shared_ptr<FreeMonMessage> item); + + /** + * Erase messages of a given type from the queue. + */ + void eraseByType(FreeMonMessageType type); + +private: + // Using shared_ptr because std::pop_heap does not support move-only types + std::vector<std::shared_ptr<FreeMonMessage>> _vector; + FreeMonMessageGreater _comp; +}; + +/** * A multi-producer, single-consumer queue with deadlines. * * The smallest deadline sorts first. Messages with deadlines can be use as a timer mechanism. @@ -95,11 +130,7 @@ private: bool _stop{false}; // Priority queue of messages with shortest deadline first - // Using shared_ptr because priority_queue does not support move-only types - std::priority_queue<std::shared_ptr<FreeMonMessage>, - std::vector<std::shared_ptr<FreeMonMessage>>, - FreeMonMessageGreater> - _queue; + FreeMonPriorityQueue _queue; // Use manual crank to process messages in-order instead of based on deadlines. bool _useCrank{false}; |