summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2018-10-26 11:36:37 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2018-10-26 11:36:37 -0400
commiteb2157a3d7c6b02d7d8f873afafd630f3d9aed4f (patch)
treed82e14ca27cd428979adcf66bffb02daf0d5f2de
parenta15cf62de1d5de973160ec1a23609aaf6f0f500f (diff)
downloadmongo-eb2157a3d7c6b02d7d8f873afafd630f3d9aed4f.tar.gz
SERVER-37685 Ensure free monitoring queue preserves FIFO on messages with same deadline
-rw-r--r--src/mongo/db/free_mon/free_mon_message.h18
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.cpp3
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.h14
-rw-r--r--src/mongo/db/free_mon/free_mon_queue_test.cpp22
4 files changed, 56 insertions, 1 deletions
diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h
index 197cea4d300..f5857c973a4 100644
--- a/src/mongo/db/free_mon/free_mon_message.h
+++ b/src/mongo/db/free_mon/free_mon_message.h
@@ -180,6 +180,20 @@ public:
return _deadline;
}
+ /**
+ * Get the unique message id for FIFO ordering messages with the same deadline.
+ */
+ uint64_t getId() const {
+ return _id;
+ }
+
+ /**
+ * Set the unique message id.
+ */
+ void setId(uint64_t id) {
+ _id = id;
+ }
+
public:
FreeMonMessage(FreeMonMessageType type, Date_t deadline) : _type(type), _deadline(deadline) {}
@@ -189,6 +203,10 @@ private:
// Deadline for when to process message
Date_t _deadline;
+
+ // Process-wide unique message id to ensure messages with the same deadlines are processed in
+ // FIFO order.
+ uint64_t _id{0};
};
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp
index ba67d5bfe16..44bdc1290e6 100644
--- a/src/mongo/db/free_mon/free_mon_queue.cpp
+++ b/src/mongo/db/free_mon/free_mon_queue.cpp
@@ -82,6 +82,9 @@ void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) {
return;
}
+ ++_counter;
+ msg->setId(_counter);
+
if (msg->getType() == FreeMonMessageType::MetricsSend) {
_queue.eraseByType(FreeMonMessageType::MetricsSend);
}
diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h
index c36636b4c96..d8f90d32b60 100644
--- a/src/mongo/db/free_mon/free_mon_queue.h
+++ b/src/mongo/db/free_mon/free_mon_queue.h
@@ -49,7 +49,15 @@ namespace mongo {
struct FreeMonMessageGreater {
bool operator()(const std::shared_ptr<FreeMonMessage>& left,
const std::shared_ptr<FreeMonMessage>& right) const {
- return (left->getDeadline() > right->getDeadline());
+ if (left->getDeadline() > right->getDeadline()) {
+ return true;
+ }
+
+ if (left->getDeadline() == right->getDeadline()) {
+ return left->getId() > right->getId();
+ }
+
+ return false;
}
};
@@ -137,6 +145,10 @@ private:
// Use manual crank to process messages in-order instead of based on deadlines.
bool _useCrank{false};
+ // Stamp each message with a unique counter. This ensures that if two messages are queued with
+ // the same deadline, FIFO is achieved.
+ uint64_t _counter;
+
// Number of messages to ignore
size_t _countMessagesToIgnore{0};
diff --git a/src/mongo/db/free_mon/free_mon_queue_test.cpp b/src/mongo/db/free_mon/free_mon_queue_test.cpp
index e41b646e8df..3f181f3c720 100644
--- a/src/mongo/db/free_mon/free_mon_queue_test.cpp
+++ b/src/mongo/db/free_mon/free_mon_queue_test.cpp
@@ -115,6 +115,28 @@ TEST_F(FreeMonQueueTest, TestDeadlinePriority) {
ASSERT(item->getType() == FreeMonMessageType::RegisterServer);
}
+// Positive: Ensure deadlines sort properly when they have the same deadlines
+TEST_F(FreeMonQueueTest, TestFIFO) {
+ FreeMonMessageQueue queue;
+
+ queue.enqueue(
+ FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterServer, Date_t::min()));
+ queue.enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::AsyncRegisterComplete,
+ Date_t::min()));
+ queue.enqueue(
+ FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterCommand, Date_t::min()));
+
+ auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get();
+ ASSERT(item->getType() == FreeMonMessageType::RegisterServer);
+
+ item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get();
+ ASSERT(item->getType() == FreeMonMessageType::AsyncRegisterComplete);
+
+ item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get();
+ ASSERT(item->getType() == FreeMonMessageType::RegisterCommand);
+}
+
+
// Positive: Test Queue Stop
TEST_F(FreeMonQueueTest, TestQueueStop) {
FreeMonMessageQueue queue;