diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-10-26 11:36:37 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-10-29 17:47:05 -0400 |
commit | 556d4f8b0fabeaac4958c8f6d319367c1a10b543 (patch) | |
tree | 7b44c089d9d87d078a27d366e8df97af15da9689 | |
parent | cbb76539c47068f8836ed05283763e687cf126a7 (diff) | |
download | mongo-556d4f8b0fabeaac4958c8f6d319367c1a10b543.tar.gz |
SERVER-37685 Ensure free monitoring queue preserves FIFO on messages with same deadline
(cherry picked from commit eb2157a3d7c6b02d7d8f873afafd630f3d9aed4f)
-rw-r--r-- | src/mongo/db/free_mon/free_mon_message.h | 18 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.h | 14 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue_test.cpp | 22 |
4 files changed, 56 insertions, 1 deletions
diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h index 197cea4d300..f5857c973a4 100644 --- a/src/mongo/db/free_mon/free_mon_message.h +++ b/src/mongo/db/free_mon/free_mon_message.h @@ -180,6 +180,20 @@ public: return _deadline; } + /** + * Get the unique message id for FIFO ordering messages with the same deadline. + */ + uint64_t getId() const { + return _id; + } + + /** + * Set the unique message id. + */ + void setId(uint64_t id) { + _id = id; + } + public: FreeMonMessage(FreeMonMessageType type, Date_t deadline) : _type(type), _deadline(deadline) {} @@ -189,6 +203,10 @@ private: // Deadline for when to process message Date_t _deadline; + + // Process-wide unique message id to ensure messages with the same deadlines are processed in + // FIFO order. + uint64_t _id{0}; }; diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp index ba67d5bfe16..44bdc1290e6 100644 --- a/src/mongo/db/free_mon/free_mon_queue.cpp +++ b/src/mongo/db/free_mon/free_mon_queue.cpp @@ -82,6 +82,9 @@ void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) { return; } + ++_counter; + msg->setId(_counter); + if (msg->getType() == FreeMonMessageType::MetricsSend) { _queue.eraseByType(FreeMonMessageType::MetricsSend); } diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h index c36636b4c96..d8f90d32b60 100644 --- a/src/mongo/db/free_mon/free_mon_queue.h +++ b/src/mongo/db/free_mon/free_mon_queue.h @@ -49,7 +49,15 @@ namespace mongo { struct FreeMonMessageGreater { bool operator()(const std::shared_ptr<FreeMonMessage>& left, const std::shared_ptr<FreeMonMessage>& right) const { - return (left->getDeadline() > right->getDeadline()); + if (left->getDeadline() > right->getDeadline()) { + return true; + } + + if (left->getDeadline() == right->getDeadline()) { + return left->getId() > right->getId(); + } + + return false; } }; @@ -137,6 +145,10 @@ private: // Use manual crank to process messages in-order instead of based on deadlines. bool _useCrank{false}; + // Stamp each message with a unique counter. This ensures that if two messages are queued with + // the same deadline, FIFO is achieved. + uint64_t _counter; + // Number of messages to ignore size_t _countMessagesToIgnore{0}; diff --git a/src/mongo/db/free_mon/free_mon_queue_test.cpp b/src/mongo/db/free_mon/free_mon_queue_test.cpp index e41b646e8df..3f181f3c720 100644 --- a/src/mongo/db/free_mon/free_mon_queue_test.cpp +++ b/src/mongo/db/free_mon/free_mon_queue_test.cpp @@ -115,6 +115,28 @@ TEST_F(FreeMonQueueTest, TestDeadlinePriority) { ASSERT(item->getType() == FreeMonMessageType::RegisterServer); } +// Positive: Ensure deadlines sort properly when they have the same deadlines +TEST_F(FreeMonQueueTest, TestFIFO) { + FreeMonMessageQueue queue; + + queue.enqueue( + FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterServer, Date_t::min())); + queue.enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::AsyncRegisterComplete, + Date_t::min())); + queue.enqueue( + FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterCommand, Date_t::min())); + + auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get(); + ASSERT(item->getType() == FreeMonMessageType::RegisterServer); + + item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get(); + ASSERT(item->getType() == FreeMonMessageType::AsyncRegisterComplete); + + item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get(); + ASSERT(item->getType() == FreeMonMessageType::RegisterCommand); +} + + // Positive: Test Queue Stop TEST_F(FreeMonQueueTest, TestQueueStop) { FreeMonMessageQueue queue; |