summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2018-04-10 13:29:16 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2018-04-10 13:29:16 -0400
commit2dc3299c530e41f05c032e8eb42a5b3b8a14f93e (patch)
treede72c1be1a94ec600566660fe7f330ade1ad58f9 /src/mongo
parent84762192d91d5f892fe88f2b060a3f2030ab237c (diff)
downloadmongo-2dc3299c530e41f05c032e8eb42a5b3b8a14f93e.tar.gz
SERVER-34225 Implement FreeMonMessageQueue
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/free_mon/SConscript41
-rw-r--r--src/mongo/db/free_mon/free_mon_message.h129
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.cpp116
-rw-r--r--src/mongo/db/free_mon/free_mon_queue.h99
-rw-r--r--src/mongo/db/free_mon/free_mon_queue_test.cpp149
6 files changed, 535 insertions, 0 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9470b8ac509..a2c4a480407 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -20,6 +20,7 @@ env.SConscript(
'commands',
'concurrency',
'exec',
+ 'free_mon',
'fts',
'ftdc',
'geo',
diff --git a/src/mongo/db/free_mon/SConscript b/src/mongo/db/free_mon/SConscript
new file mode 100644
index 00000000000..0064e28495d
--- /dev/null
+++ b/src/mongo/db/free_mon/SConscript
@@ -0,0 +1,41 @@
+# -*- mode: python -*-
+Import("env")
+Import("get_option")
+
+env = env.Clone()
+
+fmEnv = env.Clone()
+fmEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
+
+fmEnv.Library(
+ target='free_mon',
+ source=[
+ 'free_mon_queue.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/dbhelpers',
+ '$BUILD_DIR/mongo/db/ftdc/ftdc',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/third_party/shim_snappy',
+ ],
+)
+
+env.CppUnitTest(
+ target='free_mon_test',
+ source=[
+ 'free_mon_queue_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/repl/replmocks',
+ '$BUILD_DIR/mongo/db/repl/storage_interface_impl',
+ '$BUILD_DIR/mongo/db/serveronly',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ '$BUILD_DIR/mongo/db/service_context_d',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/concurrency',
+ '$BUILD_DIR/mongo/util/clock_source_mock',
+ 'free_mon',
+ ],
+)
diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h
new file mode 100644
index 00000000000..e2b4e727463
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_message.h
@@ -0,0 +1,129 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <vector>
+
+#include "mongo/util/duration.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * Message types for free monitoring.
+ *
+ * Some are generated internally by FreeMonProcessor to handle async HTTP requests.
+ */
+enum class FreeMonMessageType {
+ /**
+ * Register server from command-line/config.
+ */
+ RegisterServer,
+
+ /**
+ * Register server from server command.
+ */
+ RegisterCommand,
+
+ /**
+ * Internal: Generated when an async HTTP request completes succesfully.
+ */
+ AsyncRegisterComplete,
+
+ /**
+ * Internal: Generated when an async HTTP request completes with an error.
+ */
+ AsyncRegisterFail,
+
+ // TODO - add metrics messages
+ // MetricsCollect - Cloud wants the "wait" time to calculated when the message processing
+ // starts, not ends
+ // AsyncMetricsComplete,
+ // AsyncMetricsFail,
+
+ // TODO - add replication messages
+ // OnPrimary,
+ // OpObserver,
+};
+
+/**
+ * Message class that encapsulate a message to the FreeMonMessageProcessor
+ *
+ * Has a type and a deadline for when to start processing the message.
+ */
+class FreeMonMessage {
+public:
+ virtual ~FreeMonMessage();
+
+ /**
+ * Create a message that should processed immediately.
+ */
+ static std::shared_ptr<FreeMonMessage> createNow(FreeMonMessageType type) {
+ return std::make_shared<FreeMonMessage>(type, Date_t::min());
+ }
+
+ /**
+ * Create a message that should processed after the specified deadline.
+ */
+ static std::shared_ptr<FreeMonMessage> createWithDeadline(FreeMonMessageType type,
+ Date_t deadline) {
+ return std::make_shared<FreeMonMessage>(type, deadline);
+ }
+
+ FreeMonMessage(const FreeMonMessage&) = delete;
+ FreeMonMessage(FreeMonMessage&&) = default;
+
+ /**
+ * Get the type of message.
+ */
+ FreeMonMessageType getType() const {
+ return _type;
+ }
+
+ /**
+ * Get the deadline for the message.
+ */
+ Date_t getDeadline() const {
+ return _deadline;
+ }
+
+public:
+ FreeMonMessage(FreeMonMessageType type, Date_t deadline) : _type(type), _deadline(deadline) {}
+
+private:
+ // Type of message
+ FreeMonMessageType _type;
+
+ // Deadline for when to process message
+ Date_t _deadline;
+};
+
+
+} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp
new file mode 100644
index 00000000000..c8a0b74aa87
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_queue.cpp
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/free_mon/free_mon_queue.h"
+
+#include <chrono>
+
+#include "mongo/util/duration.h"
+
+namespace mongo {
+
+FreeMonMessage::~FreeMonMessage() {}
+
+void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // If we were stopped, drop messages
+ if (_stop) {
+ return;
+ }
+
+ _queue.emplace(msg);
+
+ // Signal the dequeue
+ _condvar.notify_one();
+ }
+}
+
+boost::optional<std::shared_ptr<FreeMonMessage>> FreeMonMessageQueue::dequeue(
+ ClockSource* clockSource) {
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (_stop) {
+ return {};
+ }
+
+ Date_t deadlineCV = Date_t::max();
+ if (!_queue.empty()) {
+ deadlineCV = _queue.top()->getDeadline();
+ } else {
+ deadlineCV = clockSource->now() + Hours(24);
+ }
+
+ _condvar.wait_until(lock, deadlineCV.toSystemTimePoint(), [this, clockSource]() {
+ if (_stop) {
+ return true;
+ }
+
+ if (this->_queue.empty()) {
+ return false;
+ }
+
+ auto deadlineMessage = this->_queue.top()->getDeadline();
+ if (deadlineMessage == Date_t::min()) {
+ return true;
+ }
+
+ auto now = clockSource->now();
+
+ bool check = deadlineMessage < now;
+ return check;
+ });
+
+ if (_stop || _queue.empty()) {
+ return {};
+ }
+
+ auto item = _queue.top();
+ _queue.pop();
+ return item;
+ }
+}
+
+void FreeMonMessageQueue::stop() {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // We can be stopped twice in some situations:
+ // 1. Stop on unexpected error
+ // 2. Stop on clean shutdown
+ if (_stop == false) {
+ _stop = true;
+ _condvar.notify_one();
+ }
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h
new file mode 100644
index 00000000000..687e61c13bd
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_queue.h
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <vector>
+
+#include "mongo/db/free_mon/free_mon_message.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * Comparator for FreeMonMessage that will sort smallest deadlines at the beginning of a priority
+ * queue. The std::priority_queue is a max-heap.
+ */
+struct FreeMonMessageGreater {
+ bool operator()(const std::shared_ptr<FreeMonMessage>& left,
+ const std::shared_ptr<FreeMonMessage>& right) const {
+ return (left->getDeadline() > right->getDeadline());
+ }
+};
+
+/**
+ * A multi-producer, single-consumer queue with deadlines.
+ *
+ * The smallest deadline sorts first. Messages with deadlines can be use as a timer mechanism.
+ */
+class FreeMonMessageQueue {
+public:
+ /**
+ * Enqueue a message and wake consumer if needed.
+ *
+ * Messages are dropped if the queue has been stopped.
+ */
+ void enqueue(std::shared_ptr<FreeMonMessage> msg);
+
+ /**
+ * Deque a message from the queue.
+ *
+ * Waits for a message to arrive. Returns boost::none if the queue has been stopped.
+ */
+ boost::optional<std::shared_ptr<FreeMonMessage>> dequeue(ClockSource* clockSource);
+
+ /**
+ * Stop the queue.
+ */
+ void stop();
+
+private:
+ // Condition variable to signal consumer
+ stdx::condition_variable _condvar;
+
+ // Lock for condition variable and to protect state
+ stdx::mutex _mutex;
+
+ // Indicates whether queue has been stopped.
+ bool _stop{false};
+
+ // Priority queue of messages with shortest deadline first
+ // Using shared_ptr because priority_queue does not support move-only types
+ std::priority_queue<std::shared_ptr<FreeMonMessage>,
+ std::vector<std::shared_ptr<FreeMonMessage>>,
+ FreeMonMessageGreater>
+ _queue;
+};
+
+
+} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_queue_test.cpp b/src/mongo/db/free_mon/free_mon_queue_test.cpp
new file mode 100644
index 00000000000..b7dd61887f0
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_queue_test.cpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/free_mon/free_mon_message.h"
+#include "mongo/db/free_mon/free_mon_queue.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace {
+
+class FreeMonQueueTest : public ServiceContextMongoDTest {
+private:
+ void setUp() final;
+ void tearDown() final;
+
+protected:
+ ServiceContext::UniqueOperationContext _opCtx;
+
+ executor::NetworkInterfaceMock* _mockNetwork{nullptr};
+
+ std::unique_ptr<executor::ThreadPoolTaskExecutor> _mockThreadPool;
+};
+
+void FreeMonQueueTest::setUp() {
+ ServiceContextMongoDTest::setUp();
+
+ // Set up a NetworkInterfaceMock. Note, unlike NetworkInterfaceASIO, which has its own pool of
+ // threads, tasks in the NetworkInterfaceMock must be carried out synchronously by the (single)
+ // thread the unit test is running on.
+ auto netForFixedTaskExecutor = std::make_unique<executor::NetworkInterfaceMock>();
+ _mockNetwork = netForFixedTaskExecutor.get();
+
+ // Set up a ThreadPoolTaskExecutor. Note, for local tasks this TaskExecutor uses a
+ // ThreadPoolMock, and for remote tasks it uses the NetworkInterfaceMock created above. However,
+ // note that the ThreadPoolMock uses the NetworkInterfaceMock's threads to run tasks, which is
+ // again just the (single) thread the unit test is running on. Therefore, all tasks, local and
+ // remote, must be carried out synchronously by the test thread.
+ _mockThreadPool = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor));
+
+ _mockThreadPool->startup();
+
+ _opCtx = cc().makeOperationContext();
+}
+
+void FreeMonQueueTest::tearDown() {
+ _opCtx = {};
+
+ ServiceContextMongoDTest::tearDown();
+}
+
+// Postive: Can we enqueue and dequeue one item
+TEST_F(FreeMonQueueTest, TestBasic) {
+ FreeMonMessageQueue queue;
+
+ queue.enqueue(FreeMonMessage::createNow(FreeMonMessageType::RegisterServer));
+
+ auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource());
+
+ ASSERT(item.get()->getType() == FreeMonMessageType::RegisterServer);
+}
+
+Date_t fromNow(int millis) {
+ return getGlobalServiceContext()->getPreciseClockSource()->now() + Milliseconds(millis);
+}
+
+// Positive: Ensure deadlines sort properly
+TEST_F(FreeMonQueueTest, TestDeadlinePriority) {
+ FreeMonMessageQueue queue;
+
+ queue.enqueue(
+ FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterServer, fromNow(5000)));
+ queue.enqueue(
+ FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterCommand, fromNow(50)));
+
+ auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get();
+ ASSERT(item->getType() == FreeMonMessageType::RegisterCommand);
+
+ item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get();
+ ASSERT(item->getType() == FreeMonMessageType::RegisterServer);
+}
+
+// Positive: Test Queue Stop
+TEST_F(FreeMonQueueTest, TestQueueStop) {
+ FreeMonMessageQueue queue;
+
+ queue.enqueue(
+ FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterServer, fromNow(50000)));
+
+ unittest::Barrier barrier(2);
+
+ auto swSchedule =
+ _mockThreadPool->scheduleWork([&](const executor::TaskExecutor::CallbackArgs& cbArgs) {
+
+ barrier.countDownAndWait();
+
+ // Try to dequeue from a stopped task queue
+ auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource());
+ ASSERT_FALSE(item.is_initialized());
+
+ });
+
+ ASSERT_OK(swSchedule.getStatus());
+
+ // Stop the queue
+ queue.stop();
+
+ // Let our worker thread proceed
+ barrier.countDownAndWait();
+
+ _mockThreadPool->shutdown();
+ _mockThreadPool->join();
+}
+
+} // namespace
+} // namespace mongo