diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-04-10 13:29:16 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-04-10 13:29:16 -0400 |
commit | 2dc3299c530e41f05c032e8eb42a5b3b8a14f93e (patch) | |
tree | de72c1be1a94ec600566660fe7f330ade1ad58f9 /src/mongo | |
parent | 84762192d91d5f892fe88f2b060a3f2030ab237c (diff) | |
download | mongo-2dc3299c530e41f05c032e8eb42a5b3b8a14f93e.tar.gz |
SERVER-34225 Implement FreeMonMessageQueue
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/free_mon/SConscript | 41 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_message.h | 129 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.cpp | 116 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue.h | 99 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_queue_test.cpp | 149 |
6 files changed, 535 insertions, 0 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9470b8ac509..a2c4a480407 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -20,6 +20,7 @@ env.SConscript( 'commands', 'concurrency', 'exec', + 'free_mon', 'fts', 'ftdc', 'geo', diff --git a/src/mongo/db/free_mon/SConscript b/src/mongo/db/free_mon/SConscript new file mode 100644 index 00000000000..0064e28495d --- /dev/null +++ b/src/mongo/db/free_mon/SConscript @@ -0,0 +1,41 @@ +# -*- mode: python -*- +Import("env") +Import("get_option") + +env = env.Clone() + +fmEnv = env.Clone() +fmEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) + +fmEnv.Library( + target='free_mon', + source=[ + 'free_mon_queue.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/dbhelpers', + '$BUILD_DIR/mongo/db/ftdc/ftdc', + '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/third_party/shim_snappy', + ], +) + +env.CppUnitTest( + target='free_mon_test', + source=[ + 'free_mon_queue_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/replmocks', + '$BUILD_DIR/mongo/db/repl/storage_interface_impl', + '$BUILD_DIR/mongo/db/serveronly', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', + '$BUILD_DIR/mongo/db/service_context_d', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/concurrency', + '$BUILD_DIR/mongo/util/clock_source_mock', + 'free_mon', + ], +) diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h new file mode 100644 index 00000000000..e2b4e727463 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_message.h @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <condition_variable> +#include <vector> + +#include "mongo/util/duration.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * Message types for free monitoring. + * + * Some are generated internally by FreeMonProcessor to handle async HTTP requests. + */ +enum class FreeMonMessageType { + /** + * Register server from command-line/config. + */ + RegisterServer, + + /** + * Register server from server command. + */ + RegisterCommand, + + /** + * Internal: Generated when an async HTTP request completes succesfully. + */ + AsyncRegisterComplete, + + /** + * Internal: Generated when an async HTTP request completes with an error. + */ + AsyncRegisterFail, + + // TODO - add metrics messages + // MetricsCollect - Cloud wants the "wait" time to calculated when the message processing + // starts, not ends + // AsyncMetricsComplete, + // AsyncMetricsFail, + + // TODO - add replication messages + // OnPrimary, + // OpObserver, +}; + +/** + * Message class that encapsulate a message to the FreeMonMessageProcessor + * + * Has a type and a deadline for when to start processing the message. + */ +class FreeMonMessage { +public: + virtual ~FreeMonMessage(); + + /** + * Create a message that should processed immediately. + */ + static std::shared_ptr<FreeMonMessage> createNow(FreeMonMessageType type) { + return std::make_shared<FreeMonMessage>(type, Date_t::min()); + } + + /** + * Create a message that should processed after the specified deadline. + */ + static std::shared_ptr<FreeMonMessage> createWithDeadline(FreeMonMessageType type, + Date_t deadline) { + return std::make_shared<FreeMonMessage>(type, deadline); + } + + FreeMonMessage(const FreeMonMessage&) = delete; + FreeMonMessage(FreeMonMessage&&) = default; + + /** + * Get the type of message. + */ + FreeMonMessageType getType() const { + return _type; + } + + /** + * Get the deadline for the message. + */ + Date_t getDeadline() const { + return _deadline; + } + +public: + FreeMonMessage(FreeMonMessageType type, Date_t deadline) : _type(type), _deadline(deadline) {} + +private: + // Type of message + FreeMonMessageType _type; + + // Deadline for when to process message + Date_t _deadline; +}; + + +} // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp new file mode 100644 index 00000000000..c8a0b74aa87 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_queue.cpp @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/free_mon/free_mon_queue.h" + +#include <chrono> + +#include "mongo/util/duration.h" + +namespace mongo { + +FreeMonMessage::~FreeMonMessage() {} + +void FreeMonMessageQueue::enqueue(std::shared_ptr<FreeMonMessage> msg) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + // If we were stopped, drop messages + if (_stop) { + return; + } + + _queue.emplace(msg); + + // Signal the dequeue + _condvar.notify_one(); + } +} + +boost::optional<std::shared_ptr<FreeMonMessage>> FreeMonMessageQueue::dequeue( + ClockSource* clockSource) { + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_stop) { + return {}; + } + + Date_t deadlineCV = Date_t::max(); + if (!_queue.empty()) { + deadlineCV = _queue.top()->getDeadline(); + } else { + deadlineCV = clockSource->now() + Hours(24); + } + + _condvar.wait_until(lock, deadlineCV.toSystemTimePoint(), [this, clockSource]() { + if (_stop) { + return true; + } + + if (this->_queue.empty()) { + return false; + } + + auto deadlineMessage = this->_queue.top()->getDeadline(); + if (deadlineMessage == Date_t::min()) { + return true; + } + + auto now = clockSource->now(); + + bool check = deadlineMessage < now; + return check; + }); + + if (_stop || _queue.empty()) { + return {}; + } + + auto item = _queue.top(); + _queue.pop(); + return item; + } +} + +void FreeMonMessageQueue::stop() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + // We can be stopped twice in some situations: + // 1. Stop on unexpected error + // 2. Stop on clean shutdown + if (_stop == false) { + _stop = true; + _condvar.notify_one(); + } + } +} + +} // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h new file mode 100644 index 00000000000..687e61c13bd --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_queue.h @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <memory> +#include <mutex> +#include <queue> +#include <vector> + +#include "mongo/db/free_mon/free_mon_message.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * Comparator for FreeMonMessage that will sort smallest deadlines at the beginning of a priority + * queue. The std::priority_queue is a max-heap. + */ +struct FreeMonMessageGreater { + bool operator()(const std::shared_ptr<FreeMonMessage>& left, + const std::shared_ptr<FreeMonMessage>& right) const { + return (left->getDeadline() > right->getDeadline()); + } +}; + +/** + * A multi-producer, single-consumer queue with deadlines. + * + * The smallest deadline sorts first. Messages with deadlines can be use as a timer mechanism. + */ +class FreeMonMessageQueue { +public: + /** + * Enqueue a message and wake consumer if needed. + * + * Messages are dropped if the queue has been stopped. + */ + void enqueue(std::shared_ptr<FreeMonMessage> msg); + + /** + * Deque a message from the queue. + * + * Waits for a message to arrive. Returns boost::none if the queue has been stopped. + */ + boost::optional<std::shared_ptr<FreeMonMessage>> dequeue(ClockSource* clockSource); + + /** + * Stop the queue. + */ + void stop(); + +private: + // Condition variable to signal consumer + stdx::condition_variable _condvar; + + // Lock for condition variable and to protect state + stdx::mutex _mutex; + + // Indicates whether queue has been stopped. + bool _stop{false}; + + // Priority queue of messages with shortest deadline first + // Using shared_ptr because priority_queue does not support move-only types + std::priority_queue<std::shared_ptr<FreeMonMessage>, + std::vector<std::shared_ptr<FreeMonMessage>>, + FreeMonMessageGreater> + _queue; +}; + + +} // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_queue_test.cpp b/src/mongo/db/free_mon/free_mon_queue_test.cpp new file mode 100644 index 00000000000..b7dd61887f0 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_queue_test.cpp @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/free_mon/free_mon_message.h" +#include "mongo/db/free_mon/free_mon_queue.h" +#include "mongo/db/service_context.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/unittest/barrier.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace { + +class FreeMonQueueTest : public ServiceContextMongoDTest { +private: + void setUp() final; + void tearDown() final; + +protected: + ServiceContext::UniqueOperationContext _opCtx; + + executor::NetworkInterfaceMock* _mockNetwork{nullptr}; + + std::unique_ptr<executor::ThreadPoolTaskExecutor> _mockThreadPool; +}; + +void FreeMonQueueTest::setUp() { + ServiceContextMongoDTest::setUp(); + + // Set up a NetworkInterfaceMock. Note, unlike NetworkInterfaceASIO, which has its own pool of + // threads, tasks in the NetworkInterfaceMock must be carried out synchronously by the (single) + // thread the unit test is running on. + auto netForFixedTaskExecutor = std::make_unique<executor::NetworkInterfaceMock>(); + _mockNetwork = netForFixedTaskExecutor.get(); + + // Set up a ThreadPoolTaskExecutor. Note, for local tasks this TaskExecutor uses a + // ThreadPoolMock, and for remote tasks it uses the NetworkInterfaceMock created above. However, + // note that the ThreadPoolMock uses the NetworkInterfaceMock's threads to run tasks, which is + // again just the (single) thread the unit test is running on. Therefore, all tasks, local and + // remote, must be carried out synchronously by the test thread. + _mockThreadPool = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor)); + + _mockThreadPool->startup(); + + _opCtx = cc().makeOperationContext(); +} + +void FreeMonQueueTest::tearDown() { + _opCtx = {}; + + ServiceContextMongoDTest::tearDown(); +} + +// Postive: Can we enqueue and dequeue one item +TEST_F(FreeMonQueueTest, TestBasic) { + FreeMonMessageQueue queue; + + queue.enqueue(FreeMonMessage::createNow(FreeMonMessageType::RegisterServer)); + + auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()); + + ASSERT(item.get()->getType() == FreeMonMessageType::RegisterServer); +} + +Date_t fromNow(int millis) { + return getGlobalServiceContext()->getPreciseClockSource()->now() + Milliseconds(millis); +} + +// Positive: Ensure deadlines sort properly +TEST_F(FreeMonQueueTest, TestDeadlinePriority) { + FreeMonMessageQueue queue; + + queue.enqueue( + FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterServer, fromNow(5000))); + queue.enqueue( + FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterCommand, fromNow(50))); + + auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get(); + ASSERT(item->getType() == FreeMonMessageType::RegisterCommand); + + item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()).get(); + ASSERT(item->getType() == FreeMonMessageType::RegisterServer); +} + +// Positive: Test Queue Stop +TEST_F(FreeMonQueueTest, TestQueueStop) { + FreeMonMessageQueue queue; + + queue.enqueue( + FreeMonMessage::createWithDeadline(FreeMonMessageType::RegisterServer, fromNow(50000))); + + unittest::Barrier barrier(2); + + auto swSchedule = + _mockThreadPool->scheduleWork([&](const executor::TaskExecutor::CallbackArgs& cbArgs) { + + barrier.countDownAndWait(); + + // Try to dequeue from a stopped task queue + auto item = queue.dequeue(_opCtx.get()->getServiceContext()->getPreciseClockSource()); + ASSERT_FALSE(item.is_initialized()); + + }); + + ASSERT_OK(swSchedule.getStatus()); + + // Stop the queue + queue.stop(); + + // Let our worker thread proceed + barrier.countDownAndWait(); + + _mockThreadPool->shutdown(); + _mockThreadPool->join(); +} + +} // namespace +} // namespace mongo |