summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2018-04-12 14:32:28 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2018-04-12 14:32:28 -0400
commitc6620182aebd1b62d31879ce4d9456ff197aea22 (patch)
treed5e1c635b7054b0f4ee576d2fb10ff7206aa1b71 /src/mongo
parentec25294c8d0c1c60ff786ea99198749dc4788dd1 (diff)
downloadmongo-c6620182aebd1b62d31879ce4d9456ff197aea22.tar.gz
SERVER-34226 Implement FreeMonController - Registration piece
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/free_mon/SConscript11
-rw-r--r--src/mongo/db/free_mon/free_mon_commands.idl44
-rw-r--r--src/mongo/db/free_mon/free_mon_controller.cpp143
-rw-r--r--src/mongo/db/free_mon/free_mon_controller.h164
-rw-r--r--src/mongo/db/free_mon/free_mon_controller_test.cpp430
-rw-r--r--src/mongo/db/free_mon/free_mon_message.h184
-rw-r--r--src/mongo/db/free_mon/free_mon_mongod.cpp186
-rw-r--r--src/mongo/db/free_mon/free_mon_processor.cpp466
-rw-r--r--src/mongo/db/free_mon/free_mon_processor.h264
-rw-r--r--src/mongo/db/ftdc/ftdc_server.h2
10 files changed, 1885 insertions, 9 deletions
diff --git a/src/mongo/db/free_mon/SConscript b/src/mongo/db/free_mon/SConscript
index f9210015636..384d960c814 100644
--- a/src/mongo/db/free_mon/SConscript
+++ b/src/mongo/db/free_mon/SConscript
@@ -10,12 +10,15 @@ fmEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
fmEnv.Library(
target='free_mon',
source=[
+ 'free_mon_processor.cpp',
'free_mon_queue.cpp',
'free_mon_storage.cpp',
+ 'free_mon_controller.cpp',
env.Idlc('free_mon_protocol.idl')[0],
+ env.Idlc('free_mon_commands.idl')[0],
env.Idlc('free_mon_storage.idl')[0],
],
- LIBDEPS_PRIVATE=[
+ LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/dbhelpers',
@@ -36,6 +39,11 @@ if get_option("enable-free-mon") == "on":
],
LIBDEPS=[
'free_mon',
+ '$BUILD_DIR/mongo/db/ftdc/ftdc_server',
+ '$BUILD_DIR/mongo/util/options_parser/options_parser',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
],
SYSLIBDEPS=[
'libcurld' if env.TargetOSIs('windows') else 'curl',
@@ -54,6 +62,7 @@ else:
env.CppUnitTest(
target='free_mon_test',
source=[
+ 'free_mon_controller_test.cpp',
'free_mon_queue_test.cpp',
'free_mon_storage_test.cpp',
],
diff --git a/src/mongo/db/free_mon/free_mon_commands.idl b/src/mongo/db/free_mon/free_mon_commands.idl
new file mode 100644
index 00000000000..0e41e96a9f8
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_commands.idl
@@ -0,0 +1,44 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+global:
+ cpp_namespace: "mongo"
+
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+
+enums:
+ SetFreeMonAction:
+ description: "Action types"
+ type: string
+ values:
+ enable: "enable"
+ disable: "disable"
+
+
+commands:
+ setFreeMonitoring:
+ description: "setFreeMonitoring Command"
+ namespace: ignored
+ fields:
+ action:
+ description: "Action to take"
+ type: SetFreeMonAction
+
+ getFreeMonitoringStatus:
+ description: "getFreeMonitoringStatus Command"
+ namespace: ignored
+
diff --git a/src/mongo/db/free_mon/free_mon_controller.cpp b/src/mongo/db/free_mon/free_mon_controller.cpp
new file mode 100644
index 00000000000..f8d17816db7
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_controller.cpp
@@ -0,0 +1,143 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/free_mon/free_mon_controller.h"
+
+#include "mongo/db/ftdc/collector.h"
+#include "mongo/logger/logstream_builder.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+FreeMonNetworkInterface::~FreeMonNetworkInterface() = default;
+
+void FreeMonController::addRegistrationCollector(
+ std::unique_ptr<FreeMonCollectorInterface> collector) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kNotStarted);
+
+ _registrationCollectors.add(std::move(collector));
+ }
+}
+
+void FreeMonController::addMetricsCollector(std::unique_ptr<FreeMonCollectorInterface> collector) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kNotStarted);
+
+ _metricCollectors.add(std::move(collector));
+ }
+}
+
+void FreeMonController::registerServerStartup(RegistrationType registrationType,
+ std::vector<std::string>& tags) {
+ _enqueue(FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>::createNow(
+ FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>::payload_type(
+ registrationType, tags)));
+}
+
+boost::optional<Status> FreeMonController::registerServerCommand(Milliseconds timeout) {
+ auto msg = FreeMonRegisterCommandMessage::createNow(std::vector<std::string>());
+ _enqueue(msg);
+
+ if (timeout > Milliseconds::min()) {
+ return msg->wait_for(timeout);
+ }
+
+ return Status::OK();
+}
+
+void FreeMonController::_enqueue(std::shared_ptr<FreeMonMessage> msg) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kStarted);
+ }
+
+ _processor->enqueue(std::move(msg));
+}
+
+void FreeMonController::start(RegistrationType registrationType) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ invariant(_state == State::kNotStarted);
+ }
+
+ // Start the agent
+ _processor = std::make_shared<FreeMonProcessor>(
+ _registrationCollectors, _metricCollectors, _network.get());
+
+ _thread = stdx::thread([this] { _processor->run(); });
+
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ invariant(_state == State::kNotStarted);
+ _state = State::kStarted;
+ }
+
+ if (registrationType != RegistrationType::DoNotRegister) {
+ std::vector<std::string> vec;
+ registerServerStartup(registrationType, vec);
+ }
+}
+
+void FreeMonController::stop() {
+ // Stop the agent
+ log() << "Shutting down free monitoring";
+
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ bool started = (_state == State::kStarted);
+
+ invariant(_state == State::kNotStarted || _state == State::kStarted);
+
+ if (!started) {
+ _state = State::kDone;
+ return;
+ }
+
+ _state = State::kStopRequested;
+
+ // Tell the processor to stop
+ _processor->stop();
+ }
+
+ _thread.join();
+
+ _state = State::kDone;
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_controller.h b/src/mongo/db/free_mon/free_mon_controller.h
new file mode 100644
index 00000000000..e02b9b71957
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_controller.h
@@ -0,0 +1,164 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/db/free_mon/free_mon_message.h"
+#include "mongo/db/free_mon/free_mon_network.h"
+#include "mongo/db/free_mon/free_mon_processor.h"
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/duration.h"
+
+namespace mongo {
+
+/**
+ * Manages and control Free Monitoring. This is the entry point for non-free monitoring components
+ * into free-monitoring.
+ */
+class FreeMonController {
+public:
+ explicit FreeMonController(std::unique_ptr<FreeMonNetworkInterface> network)
+ : _network(std::move(network)) {}
+
+ /**
+ * Initializes free monitoring.
+ * Start free monitoring thread in the background.
+ */
+ void start(RegistrationType registrationType);
+
+ /**
+ * Stops free monitoring thread.
+ */
+ void stop();
+
+ /**
+ * Add a metric collector to collect on registration
+ */
+ void addRegistrationCollector(std::unique_ptr<FreeMonCollectorInterface> collector);
+
+ /**
+ * Add a metric collector to collect periodically
+ */
+ void addMetricsCollector(std::unique_ptr<FreeMonCollectorInterface> collector);
+
+ /**
+ * Get the FreeMonController from ServiceContext.
+ */
+ static FreeMonController* get(ServiceContext* serviceContext);
+
+ /**
+ * Start registration of mongod with remote service.
+ *
+ * Only sends one remote registration at a time.
+ * Returns after timeout if registrations is not complete. Registration continues though.
+ */
+ void registerServerStartup(RegistrationType registrationType, std::vector<std::string>& tags);
+
+ /**
+ * Start registration of mongod with remote service.
+ *
+ * Only sends one remote registration at a time.
+ * Returns after timeout if registrations is not complete. Registration continues though.
+ * Update is synchronous with 10sec timeout
+ * kicks off register, and once register is done kicks off metrics upload
+ */
+ boost::optional<Status> registerServerCommand(Milliseconds timeout);
+
+ // TODO - add these methods
+ // Status deregisterServerCommand(Milliseconds timeout);
+
+ // void getStatus(BSONObjBuilder* builder);
+ // void getServerStatus(BSONObjBuilder* builder);
+
+ // void notifyObserver(const BSONObj& doc);
+private:
+ void _enqueue(std::shared_ptr<FreeMonMessage> msg);
+
+private:
+ /**
+ * Private enum to track state.
+ *
+ * +-----------------------------------------------------------+
+ * | v
+ * +-------------+ +----------+ +----------------+ +-------+
+ * | kNotStarted | --> | kStarted | --> | kStopRequested | --> | kDone |
+ * +-------------+ +----------+ +----------------+ +-------+
+ */
+ enum class State {
+ /**
+ * Initial state. Either start() or stop() can be called next.
+ */
+ kNotStarted,
+
+ /**
+ * start() has been called. stop() should be called next.
+ */
+ kStarted,
+
+ /**
+ * stop() has been called, and the background thread is in progress of shutting down
+ */
+ kStopRequested,
+
+ /**
+ * Controller has been stopped.
+ */
+ kDone,
+ };
+
+ // Controller state
+ State _state{State::kNotStarted};
+
+ // Mutext to protect internal state
+ stdx::mutex _mutex;
+
+ // Set of registration collectors
+ FreeMonCollectorCollection _registrationCollectors;
+
+ // Set of metric collectors
+ FreeMonCollectorCollection _metricCollectors;
+
+ // Network interface
+ std::unique_ptr<FreeMonNetworkInterface> _network;
+
+ // Background thead for agent
+ stdx::thread _thread;
+
+ // Background agent
+ std::shared_ptr<FreeMonProcessor> _processor;
+};
+
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp
new file mode 100644
index 00000000000..6ac7f582d13
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp
@@ -0,0 +1,430 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kFTDC
+
+
+#include "mongo/platform/basic.h"
+
+#include <boost/filesystem.hpp>
+#include <future>
+#include <iostream>
+
+#include "mongo/db/free_mon/free_mon_controller.h"
+#include "mongo/db/free_mon/free_mon_storage.h"
+
+#include "mongo/base/data_type_validated.h"
+#include "mongo/base/deinitializer_context.h"
+#include "mongo/bson/bson_validate.h"
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
+#include "mongo/db/ftdc/collector.h"
+#include "mongo/db/ftdc/config.h"
+#include "mongo/db/ftdc/constants.h"
+#include "mongo/db/ftdc/controller.h"
+#include "mongo/db/ftdc/ftdc_test.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/op_observer_noop.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/service_context_noop.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/log.h"
+
+
+namespace mongo {
+namespace {
+
+class FreeMonNetworkInterfaceMock : public FreeMonNetworkInterface {
+public:
+ struct Options {
+ bool failRegisterHttp{false};
+ bool invalidRegister{false};
+ };
+
+ explicit FreeMonNetworkInterfaceMock(executor::ThreadPoolTaskExecutor* threadPool,
+ Options options)
+ : _threadPool(threadPool), _options(options) {}
+ ~FreeMonNetworkInterfaceMock() final = default;
+
+ Future<FreeMonRegistrationResponse> sendRegistrationAsync(
+ const FreeMonRegistrationRequest& req) final {
+ log() << "Sending Registration ...";
+
+ _registers.addAndFetch(1);
+
+ Promise<FreeMonRegistrationResponse> promise;
+ auto future = promise.getFuture();
+ auto shared_promise = promise.share();
+
+ auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this](
+ const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
+
+ if (_options.failRegisterHttp) {
+ shared_promise.setError(
+ Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure"));
+ return;
+ }
+
+ auto resp = FreeMonRegistrationResponse();
+ resp.setVersion(1);
+
+ if (_options.invalidRegister) {
+ resp.setVersion(42);
+ }
+
+ if (req.getId().is_initialized()) {
+ resp.setId(req.getId().get());
+ } else {
+ resp.setId(UUID::gen().toString());
+ }
+
+ resp.setReportingInterval(1);
+
+ shared_promise.emplaceValue(resp);
+ });
+ ASSERT_OK(swSchedule.getStatus());
+
+ return future;
+ }
+
+ Future<FreeMonMetricsResponse> sendMetricsAsync(const FreeMonMetricsRequest& req) final {
+ log() << "Sending Metrics ...";
+ ASSERT_FALSE(req.getId().empty());
+
+ _metrics.addAndFetch(1);
+
+ Promise<FreeMonMetricsResponse> promise;
+ auto future = promise.getFuture();
+ auto shared_promise = promise.share();
+
+ auto swSchedule = _threadPool->scheduleWork(
+ [shared_promise, req](const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
+ auto resp = FreeMonMetricsResponse();
+ resp.setVersion(1);
+ resp.setReportingInterval(1);
+
+ shared_promise.emplaceValue(resp);
+ });
+ ASSERT_OK(swSchedule.getStatus());
+
+
+ return future;
+ }
+
+ int32_t getRegistersCalls() const {
+ return _registers.load();
+ }
+ int32_t getMetricsCalls() const {
+ return _metrics.load();
+ }
+
+private:
+ AtomicInt32 _registers;
+ AtomicInt32 _metrics;
+
+ executor::ThreadPoolTaskExecutor* _threadPool;
+
+ Options _options;
+};
+
+class FreeMonControllerTest : public ServiceContextMongoDTest {
+
+private:
+ void setUp() final;
+ void tearDown() final;
+
+protected:
+ /**
+ * Looks up the current ReplicationCoordinator.
+ * The result is cast to a ReplicationCoordinatorMock to provide access to test features.
+ */
+ repl::ReplicationCoordinatorMock* _getReplCoord() const;
+
+ ServiceContext::UniqueOperationContext _opCtx;
+
+ executor::NetworkInterfaceMock* _mockNetwork{nullptr};
+
+ std::unique_ptr<executor::ThreadPoolTaskExecutor> _mockThreadPool;
+};
+
+void FreeMonControllerTest::setUp() {
+ ServiceContextMongoDTest::setUp();
+ auto service = getServiceContext();
+
+ repl::ReplicationCoordinator::set(service,
+ std::make_unique<repl::ReplicationCoordinatorMock>(service));
+
+ // Set up a NetworkInterfaceMock. Note, unlike NetworkInterfaceASIO, which has its own pool of
+ // threads, tasks in the NetworkInterfaceMock must be carried out synchronously by the (single)
+ // thread the unit test is running on.
+ auto netForFixedTaskExecutor = std::make_unique<executor::NetworkInterfaceMock>();
+ _mockNetwork = netForFixedTaskExecutor.get();
+
+ // Set up a ThreadPoolTaskExecutor. Note, for local tasks this TaskExecutor uses a
+ // ThreadPoolMock, and for remote tasks it uses the NetworkInterfaceMock created above. However,
+ // note that the ThreadPoolMock uses the NetworkInterfaceMock's threads to run tasks, which is
+ // again just the (single) thread the unit test is running on. Therefore, all tasks, local and
+ // remote, must be carried out synchronously by the test thread.
+ _mockThreadPool = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor));
+
+ _mockThreadPool->startup();
+
+ _opCtx = cc().makeOperationContext();
+
+ //_storage = stdx::make_unique<repl::StorageInterfaceImpl>();
+ repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>());
+
+
+ // Transition to PRIMARY so that the server can accept writes.
+ ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_PRIMARY));
+
+
+ // Create collection with one document.
+ CollectionOptions collectionOptions;
+ collectionOptions.uuid = UUID::gen();
+
+ auto statusCC = repl::StorageInterface::get(service)->createCollection(
+ _opCtx.get(), NamespaceString("admin", "system.version"), collectionOptions);
+ ASSERT_OK(statusCC);
+}
+
+void FreeMonControllerTest::tearDown() {
+ _opCtx = {};
+ ServiceContextMongoDTest::tearDown();
+}
+
+repl::ReplicationCoordinatorMock* FreeMonControllerTest::_getReplCoord() const {
+ auto replCoord = repl::ReplicationCoordinator::get(_opCtx.get());
+ ASSERT(replCoord) << "No ReplicationCoordinator installed";
+ auto replCoordMock = dynamic_cast<repl::ReplicationCoordinatorMock*>(replCoord);
+ ASSERT(replCoordMock) << "Unexpected type for installed ReplicationCoordinator";
+ return replCoordMock;
+}
+
+#define ASSERT_RANGE(target, lower, upper) \
+ { \
+ auto __x = counter.getNextDuration(); \
+ ASSERT_GTE(__x, target + lower); \
+ ASSERT_LTE(__x, target + upper); \
+ }
+
+// Positive: Ensure deadlines sort properly
+TEST(FreeMonRetryTest, TestRegistration) {
+ PseudoRandom random(0);
+ RegistrationRetryCounter counter(random);
+ counter.reset();
+
+ ASSERT_EQ(counter.getNextDuration(), Seconds(1));
+ ASSERT_EQ(counter.getNextDuration(), Seconds(1));
+
+ for (int j = 0; j < 3; j++) {
+ // Fail requests
+ for (int i = 1; i <= 10; ++i) {
+ ASSERT_TRUE(counter.incrementError());
+
+ int64_t base = pow(2, i);
+ ASSERT_RANGE(Seconds(base), Seconds(2), Seconds(10));
+ }
+
+ ASSERT_TRUE(counter.incrementError());
+ ASSERT_RANGE(Seconds(1024), Seconds(60), Seconds(120));
+ ASSERT_TRUE(counter.incrementError());
+ ASSERT_RANGE(Seconds(1024), Seconds(60), Seconds(120));
+
+ counter.reset();
+ }
+}
+
+// Positive: Ensure the response is validated correctly
+TEST(AFreeMonProcessorTest, TestResponseValidation) {
+ ASSERT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << false << "id"
+ << "mock123"
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << 1LL))));
+
+ // Negative: bad protocol version
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 42LL << "haltMetricsUploading" << false << "id"
+ << "mock123"
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << 1LL))));
+
+ // Negative: halt uploading
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << true << "id"
+ << "mock123"
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << 1LL))));
+
+ // Negative: large registartation id
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << false << "id" << std::string(5000, 'a')
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << 1LL))));
+
+ // Negative: large URL
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << false << "id"
+ << "mock123"
+ << "informationalURL"
+ << std::string(5000, 'b')
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << 1LL))));
+
+ // Negative: large message
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << false << "id"
+ << "mock123"
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << std::string(5000, 'c')
+ << "reportingInterval"
+ << 1LL))));
+
+ // Negative: too small a reporting interval
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << false << "id"
+ << "mock123"
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << 0LL))));
+
+ // Negative: too large a reporting interval
+ ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
+ IDLParserErrorContext("foo"),
+ BSON("version" << 1LL << "haltMetricsUploading" << false << "id"
+ << "mock123"
+ << "informationalURL"
+ << "http://www.example.com/123"
+ << "message"
+ << "msg456"
+ << "reportingInterval"
+ << (60LL * 60 * 24 + 1LL)))));
+}
+
+// Positive: Test Register works
+TEST_F(FreeMonControllerTest, TestRegister) {
+ // FreeMonNetworkInterfaceMock network;
+ FreeMonController controller(
+ std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkInterfaceMock(
+ _mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options())));
+
+ controller.start(RegistrationType::DoNotRegister);
+
+ ASSERT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(5))));
+
+ ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty());
+
+ controller.stop();
+}
+
+// Negatve: Test Register times out if network stack drops messages
+TEST_F(FreeMonControllerTest, TestRegisterTimeout) {
+
+ FreeMonNetworkInterfaceMock::Options opts;
+ opts.failRegisterHttp = true;
+ auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>(
+ new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts));
+ auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get());
+ FreeMonController controller(std::move(networkUnique));
+
+ controller.start(RegistrationType::DoNotRegister);
+
+ ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15))));
+
+ ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized());
+ ASSERT_GTE(network->getRegistersCalls(), 2);
+
+ controller.stop();
+}
+
+// Negatve: Test Register times out if the registration is wrong
+TEST_F(FreeMonControllerTest, TestRegisterFail) {
+
+ FreeMonNetworkInterfaceMock::Options opts;
+ opts.invalidRegister = true;
+ auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>(
+ new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts));
+ auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get());
+ FreeMonController controller(std::move(networkUnique));
+
+ controller.start(RegistrationType::DoNotRegister);
+
+ ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15))));
+
+ ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized());
+ ASSERT_EQ(network->getRegistersCalls(), 1);
+
+ controller.stop();
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h
index e2b4e727463..3ee260f7644 100644
--- a/src/mongo/db/free_mon/free_mon_message.h
+++ b/src/mongo/db/free_mon/free_mon_message.h
@@ -31,6 +31,7 @@
#include <condition_variable>
#include <vector>
+#include "mongo/db/free_mon/free_mon_protocol_gen.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
@@ -74,6 +75,26 @@ enum class FreeMonMessageType {
};
/**
+ * Supported types of registration that occur on server startup.
+ */
+enum class RegistrationType {
+ /**
+ * Do not register on start because it was not configured via commandline/config file.
+ */
+ DoNotRegister,
+
+ /**
+ * Register immediately on start since we are a standalone.
+ */
+ RegisterOnStart,
+
+ /**
+ * Register after transition to becoming primary because we are in a replica set.
+ */
+ RegisterAfterOnTransitionToPrimary,
+};
+
+/**
* Message class that encapsulate a message to the FreeMonMessageProcessor
*
* Has a type and a deadline for when to start processing the message.
@@ -126,4 +147,167 @@ private:
};
+/**
+ * Most messages have a simple payload, and this template ensures we create type-safe messages for
+ * each message type without copy-pasting repeatedly.
+ */
+template <FreeMonMessageType typeT>
+struct FreeMonPayloadForMessage {
+ using payload_type = void;
+};
+
+template <>
+struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncRegisterComplete> {
+ using payload_type = FreeMonRegistrationResponse;
+};
+
+template <>
+struct FreeMonPayloadForMessage<FreeMonMessageType::RegisterServer> {
+ using payload_type = std::pair<RegistrationType, std::vector<std::string>>;
+};
+
+template <>
+struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncRegisterFail> {
+ using payload_type = Status;
+};
+
+/**
+ * Message with a generic payload based on the type of message.
+ */
+template <FreeMonMessageType typeT>
+class FreeMonMessageWithPayload : public FreeMonMessage {
+public:
+ using payload_type = typename FreeMonPayloadForMessage<typeT>::payload_type;
+
+ /**
+ * Create a message that should processed immediately.
+ */
+ static std::shared_ptr<FreeMonMessageWithPayload> createNow(payload_type t) {
+ return std::make_shared<FreeMonMessageWithPayload>(t, Date_t::min());
+ }
+
+ /**
+ * Get message payload.
+ */
+ const payload_type& getPayload() const {
+ return _t;
+ }
+
+public:
+ FreeMonMessageWithPayload(payload_type t, Date_t deadline)
+ : FreeMonMessage(typeT, deadline), _t(std::move(t)) {}
+
+private:
+ // Message payload
+ payload_type _t;
+};
+
+/**
+ * Single-shot class that encapsulates a Status and allows a caller to wait for a time.
+ *
+ * Basically, a single producer, single consumer queue with one event.
+ */
+class WaitableResult {
+public:
+ WaitableResult() : _status(Status::OK()) {}
+
+ /**
+ * Set Status and signal waiter.
+ */
+ void set(Status status) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ invariant(!_set);
+ if (!_set) {
+ _set = true;
+ _status = std::move(status);
+ _condvar.notify_one();
+ }
+ }
+
+ /**
+ * Waits for duration until status has been set.
+ *
+ * Returns boost::none on timeout.
+ */
+ boost::optional<Status> wait_for(Milliseconds duration) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+ if (!_condvar.wait_for(lock, duration.toSystemDuration(), [this]() { return _set; })) {
+ return {};
+ }
+
+ return _status;
+ }
+
+private:
+ // Condition variable to signal consumer
+ std::condition_variable _condvar;
+
+ // Lock for condition variable and to protect state
+ std::mutex _mutex;
+
+ // Indicates whether _status has been set
+ bool _set{false};
+
+ // Provided status
+ Status _status;
+};
+
+/**
+ * Custom waitable message for Register Command message.
+ */
+class FreeMonRegisterCommandMessage : public FreeMonMessage {
+public:
+ /**
+ * Create a message that should processed immediately.
+ */
+ static std::shared_ptr<FreeMonRegisterCommandMessage> createNow(
+ const std::vector<std::string>& tags) {
+ return std::make_shared<FreeMonRegisterCommandMessage>(tags, Date_t::min());
+ }
+
+ /**
+ * Create a message that should processed after the specified deadline.
+ */
+ static std::shared_ptr<FreeMonRegisterCommandMessage> createWithDeadline(
+ const std::vector<std::string>& tags, Date_t deadline) {
+ return std::make_shared<FreeMonRegisterCommandMessage>(tags, deadline);
+ }
+
+ /**
+ * Get tags.
+ */
+ const std::vector<std::string>& getTags() const {
+ return _tags;
+ }
+
+ /**
+ * Set Status and signal waiter.
+ */
+ void setStatus(Status status) {
+ _waitable.set(std::move(status));
+ }
+
+ /**
+ * Waits for duration until status has been set.
+ *
+ * Returns boost::none on timeout.
+ */
+ boost::optional<Status> wait_for(Milliseconds duration) {
+ return _waitable.wait_for(duration);
+ }
+
+public:
+ FreeMonRegisterCommandMessage(std::vector<std::string> tags, Date_t deadline)
+ : FreeMonMessage(FreeMonMessageType::RegisterCommand, deadline), _tags(std::move(tags)) {}
+
+private:
+ // WaitaleResult to notify caller
+ WaitableResult _waitable{};
+
+ // Tags
+ const std::vector<std::string> _tags;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp
index aa4008973d5..9d41ba96d38 100644
--- a/src/mongo/db/free_mon/free_mon_mongod.cpp
+++ b/src/mongo/db/free_mon/free_mon_mongod.cpp
@@ -26,6 +26,8 @@
* then also delete it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/free_mon/free_mon_mongod.h"
@@ -42,9 +44,11 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/bsontypes.h"
#include "mongo/db/commands/test_commands_enabled.h"
+#include "mongo/db/free_mon/free_mon_controller.h"
#include "mongo/db/free_mon/free_mon_http.h"
#include "mongo/db/free_mon/free_mon_message.h"
#include "mongo/db/free_mon/free_mon_network.h"
+#include "mongo/db/free_mon/free_mon_options.h"
#include "mongo/db/free_mon/free_mon_protocol_gen.h"
#include "mongo/db/free_mon/free_mon_storage.h"
#include "mongo/db/ftdc/ftdc_server.h"
@@ -56,11 +60,24 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace {
+const auto getFreeMonController =
+ ServiceContext::declareDecoration<std::unique_ptr<FreeMonController>>();
+
+FreeMonController* getGlobalFreeMonController() {
+ if (!hasGlobalServiceContext()) {
+ return nullptr;
+ }
+
+ return getFreeMonController(getGlobalServiceContext()).get();
+}
+
+
/**
* Expose cloudFreeMonitoringEndpointURL set parameter to URL for free monitoring.
*/
@@ -74,7 +91,7 @@ public:
Status setFromString(const std::string& str) final {
// Check for http, not https here because testEnabled may not be set yet
- if (!str.compare(0, 4, "http")) {
+ if (str.compare(0, 4, "http") != 0) {
return Status(ErrorCodes::BadValue,
"ExportedFreeMonEndpointURL only supports https:// URLs");
}
@@ -94,7 +111,8 @@ public:
const FreeMonRegistrationRequest& req) override {
BSONObj reqObj = req.toBSON();
- return _client->postAsync(exportedExportedFreeMonEndpointURL.getURL() + "/register", reqObj)
+ return _client
+ ->postAsync(exportedExportedFreeMonEndpointURL.getLocked() + "/register", reqObj)
.then([](std::vector<uint8_t> blob) {
if (blob.empty()) {
@@ -119,7 +137,8 @@ public:
Future<FreeMonMetricsResponse> sendMetricsAsync(const FreeMonMetricsRequest& req) override {
BSONObj reqObj = req.toBSON();
- return _client->postAsync(exportedExportedFreeMonEndpointURL.getURL() + "/metrics", reqObj)
+ return _client
+ ->postAsync(exportedExportedFreeMonEndpointURL.getLocked() + "/metrics", reqObj)
.then([](std::vector<uint8_t> blob) {
if (blob.empty()) {
@@ -145,6 +164,76 @@ private:
std::unique_ptr<FreeMonHttpClientInterface> _client;
};
+/**
+ * Collect the mms-automation state document from local.clustermanager during registration.
+ */
+class FreeMonLocalClusterManagerCollector : public FreeMonCollectorInterface {
+public:
+ std::string name() const final {
+ return "clustermanager";
+ }
+
+ void collect(OperationContext* opCtx, BSONObjBuilder& builder) {
+ auto optionalObj = FreeMonStorage::readClusterManagerState(opCtx);
+ if (optionalObj.is_initialized()) {
+ builder.appendElements(optionalObj.get());
+ }
+ }
+};
+
+/**
+ * Get the "storageEngine" section of "serverStatus" during registration.
+ */
+class FreeMonLocalStorageEngineStatusCollector : public FTDCSimpleInternalCommandCollector {
+public:
+ FreeMonLocalStorageEngineStatusCollector()
+ : FTDCSimpleInternalCommandCollector(
+ "serverStatus",
+ "serverStatus",
+ "",
+ // Try to filter server status to make it cheaper to collect. Harmless if we gather
+ // extra
+ BSON("serverStatus" << 1 << "storageEngine" << true << "extra_info" << false
+ << "opLatencies"
+ << false
+ << "opcountersRepl"
+ << false
+ << "opcounters"
+ << false
+ << "transactions"
+ << false
+ << "connections"
+ << false
+ << "network"
+ << false
+ << "tcMalloc"
+ << false
+ << "network"
+ << false
+ << "wiredTiger"
+ << false
+ << "sharding"
+ << false
+ << "metrics"
+ << false)) {}
+
+ std::string name() const final {
+ return "storageEngine";
+ }
+
+ void collect(OperationContext* opCtx, BSONObjBuilder& builder) {
+ BSONObjBuilder localBuilder;
+
+ FTDCSimpleInternalCommandCollector::collect(opCtx, localBuilder);
+
+ BSONObj obj = localBuilder.obj();
+
+ builder.appendElements(obj["storageEngine"].Obj());
+ }
+};
+
+} // namespace
+
auto makeTaskExecutor(ServiceContext* /*serviceContext*/) {
ThreadPool::Options tpOptions;
@@ -158,10 +247,59 @@ auto makeTaskExecutor(ServiceContext* /*serviceContext*/) {
executor::makeNetworkInterface("NetworkInterfaceASIO-FreeMon"));
}
-} // namespace
+void registerCollectors(FreeMonController* controller) {
+ // These are collected only at registration
+ //
+ // CmdBuildInfo
+ controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "buildInfo", "buildInfo", "", BSON("buildInfo" << 1)));
+
+ // HostInfoCmd
+ controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "hostInfo", "hostInfo", "", BSON("hostInfo" << 1)));
+
+ // Add storageEngine section from serverStatus
+ controller->addRegistrationCollector(
+ stdx::make_unique<FreeMonLocalStorageEngineStatusCollector>());
+
+ // Gather one document from local.clustermanager
+ controller->addRegistrationCollector(stdx::make_unique<FreeMonLocalClusterManagerCollector>());
+
+ // These are periodically for metrics upload
+ //
+ controller->addMetricsCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "getDiagnosticData", "diagnosticData", "", BSON("getDiagnosticData" << 1)));
+
+ // These are collected at registration and as metrics periodically
+ //
+ if (repl::ReplicationCoordinator::get(getGlobalServiceContext())->getReplicationMode() !=
+ repl::ReplicationCoordinator::modeNone) {
+ // CmdReplSetGetConfig
+ controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "replSetGetConfig", "replSetGetConfig", "", BSON("replSetGetConfig" << 1)));
+
+ controller->addMetricsCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "replSetGetConfig", "replSetGetConfig", "", BSON("replSetGetConfig" << 1)));
+ }
+ controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "isMaster", "isMaster", "", BSON("isMaster" << 1)));
+
+ controller->addMetricsCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>(
+ "isMaster", "isMaster", "", BSON("isMaster" << 1)));
+}
void startFreeMonitoring(ServiceContext* serviceContext) {
+ if (globalFreeMonParams.freeMonitoringState == EnableCloudStateEnum::kOff) {
+ return;
+ }
+
+ // Check for http, not https here because testEnabled may not be set yet
+ if (!getTestCommandsEnabled()) {
+ uassert(50774,
+ "ExportedFreeMonEndpointURL only supports https:// URLs",
+ exportedExportedFreeMonEndpointURL.getLocked().compare(0, 5, "https") == 0);
+ }
auto executor = makeTaskExecutor(serviceContext);
@@ -175,12 +313,46 @@ void startFreeMonitoring(ServiceContext* serviceContext) {
auto network =
std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkHttp(std::move(http)));
+
+ auto controller = stdx::make_unique<FreeMonController>(std::move(network));
+
+ registerCollectors(controller.get());
+
+ // Install the new controller
+ auto& staticFreeMon = getFreeMonController(serviceContext);
+
+ staticFreeMon = std::move(controller);
+
+ RegistrationType registrationType = RegistrationType::DoNotRegister;
+ if (globalFreeMonParams.freeMonitoringState == EnableCloudStateEnum::kOn) {
+ // If replication is enabled, we may need to register on becoming primary
+ if (repl::ReplicationCoordinator::get(getGlobalServiceContext())->getReplicationMode() !=
+ repl::ReplicationCoordinator::modeNone) {
+ registrationType = RegistrationType::RegisterAfterOnTransitionToPrimary;
+ } else {
+ registrationType = RegistrationType::RegisterOnStart;
+ }
+ }
+
+ staticFreeMon->start(registrationType);
}
-void stopFreeMonitoring() {}
+void stopFreeMonitoring() {
+ if (globalFreeMonParams.freeMonitoringState == EnableCloudStateEnum::kOff) {
+ return;
+ }
-FreeMonHttpClientInterface::~FreeMonHttpClientInterface() = default;
+ auto controller = getGlobalFreeMonController();
-FreeMonNetworkInterface::~FreeMonNetworkInterface() = default;
+ if (controller != nullptr) {
+ controller->stop();
+ }
+}
+
+FreeMonController* FreeMonController::get(ServiceContext* serviceContext) {
+ return getFreeMonController(serviceContext).get();
+}
+
+FreeMonHttpClientInterface::~FreeMonHttpClientInterface() = default;
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp
new file mode 100644
index 00000000000..2713e19be32
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_processor.cpp
@@ -0,0 +1,466 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/free_mon/free_mon_processor.h"
+
+#include <functional>
+#include <tuple>
+#include <utility>
+
+#include "mongo/base/data_range.h"
+#include "mongo/base/status.h"
+#include "mongo/base/string_data.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/free_mon/free_mon_storage.h"
+#include "mongo/db/service_context.h"
+#include "mongo/idl/idl_parser.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+
+constexpr auto kProtocolVersion = 1;
+
+constexpr auto kRegistrationIdMaxLength = 4096;
+constexpr auto kInformationalURLMaxLength = 4096;
+constexpr auto kInformationalMessageMaxLength = 4096;
+constexpr auto kUserReminderMaxLength = 4096;
+
+constexpr auto kReportingIntervalMinutesMin = 1;
+constexpr auto kReportingIntervalMinutesMax = 60 * 60 * 24;
+
+int64_t randomJitter(PseudoRandom& random, int64_t min, int64_t max) {
+ dassert(max > min);
+ return (std::abs(random.nextInt64()) % (max - min)) + min;
+}
+
+} // namespace
+
+void RegistrationRetryCounter::reset() {
+ _current = _min;
+ _base = _min;
+ _retryCount = 0;
+ _total = Hours(0);
+}
+
+bool RegistrationRetryCounter::incrementError() {
+ if (_retryCount < kStage1RetryCountMax) {
+ _base = 2 * _base;
+ _current = _base + Seconds(randomJitter(_random, kStage1JitterMin, kStage1JitterMax));
+ ++_retryCount;
+ } else {
+ _base = _base;
+ _current = _base + Seconds(randomJitter(_random, kStage2JitterMin, kStage2JitterMax));
+ }
+
+ _total += _current;
+
+ if (_total > kStage2DurationMax) {
+ return false;
+ }
+
+ return true;
+}
+
+void FreeMonProcessor::enqueue(std::shared_ptr<FreeMonMessage> msg) {
+ _queue.enqueue(std::move(msg));
+}
+
+void FreeMonProcessor::stop() {
+ _queue.stop();
+}
+
+void FreeMonProcessor::run() {
+ try {
+
+ Client::initThread("free_mon");
+ Client* client = &cc();
+
+ while (true) {
+ auto item = _queue.dequeue(client->getServiceContext()->getPreciseClockSource());
+ if (!item.is_initialized()) {
+ // Shutdown was triggered
+ return;
+ }
+
+ // Do work here
+ switch (item.get()->getType()) {
+ case FreeMonMessageType::RegisterCommand: {
+ doCommandRegister(client, item.get());
+ break;
+ }
+ case FreeMonMessageType::RegisterServer: {
+ doServerRegister(
+ client,
+ checked_cast<
+ FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>*>(
+ item.get().get()));
+ break;
+ }
+ case FreeMonMessageType::AsyncRegisterComplete: {
+ doAsyncRegisterComplete(
+ client,
+ checked_cast<
+ FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>*>(
+ item.get().get()));
+ break;
+ }
+ case FreeMonMessageType::AsyncRegisterFail: {
+ doAsyncRegisterFail(
+ client,
+ checked_cast<
+ FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>*>(
+ item.get().get()));
+ break;
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+ } catch (...) {
+ // Stop the queue
+ _queue.stop();
+
+ warning() << "Uncaught exception in '" << exceptionToStatus()
+ << "' in free monitoring subsystem. Shutting down the "
+ "free monitoring subsystem.";
+ }
+}
+
+void FreeMonProcessor::readState(Client* client) {
+
+ auto optCtx = client->makeOperationContext();
+
+ auto state = FreeMonStorage::read(optCtx.get());
+
+ _lastReadState = state;
+
+ if (state.is_initialized()) {
+ invariant(state.get().getVersion() == kProtocolVersion);
+
+ _state = state.get();
+ } else if (!state.is_initialized()) {
+ // Default the state
+ _state.setVersion(kProtocolVersion);
+ _state.setState(StorageStateEnum::enabled);
+ _state.setRegistrationId("");
+ _state.setInformationalURL("");
+ _state.setMessage("");
+ _state.setUserReminder("");
+ }
+}
+
+void FreeMonProcessor::writeState(Client* client) {
+
+ // Do a compare and swap
+ // Verify the document is the same as the one on disk, if it is the same, then do the update
+ // If the local document is different, then oh-well we do nothing, and wait until the next round
+
+ // Has our in-memory state changed, if so consider writing
+ if (_lastReadState != _state) {
+
+ // The read and write are bound the same operation context
+ {
+ auto optCtx = client->makeOperationContext();
+
+ auto state = FreeMonStorage::read(optCtx.get());
+
+ // If our in-memory copy matches the last read, then write it to disk
+ if (state == _lastReadState) {
+ FreeMonStorage::replace(optCtx.get(), _state);
+ }
+ }
+ }
+}
+
+void FreeMonProcessor::doServerRegister(
+ Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg) {
+
+ // If we are asked to register now, then kick off a registration request
+ if (msg->getPayload().first == RegistrationType::RegisterOnStart) {
+ enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
+ } else if (msg->getPayload().first == RegistrationType::RegisterAfterOnTransitionToPrimary) {
+ // Check if we need to wait to become primary
+ // If the 'admin.system.version' has content, do not wait and just re-register
+ // If the collection is empty, wait until we become primary
+ // If we become secondary, OpObserver hooks will tell us our registration id
+
+ auto optCtx = client->makeOperationContext();
+
+ // Check if there is an existing document
+ auto state = FreeMonStorage::read(optCtx.get());
+
+ // If there is no document, we may be in a replica set and may need to register after
+ // becoming primary
+ // since we cannot record the registration id until after becoming primary
+ if (!state.is_initialized()) {
+ // TODO: hook OnTransitionToPrimary instead of this hack
+ enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
+ } else {
+ // If we have state, then we can do the normal register on startup
+ enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
+ }
+ }
+}
+
+namespace {
+template <typename T>
+std::unique_ptr<Future<void>> doAsyncCallback(FreeMonProcessor* proc,
+ Future<T> future,
+ std::function<void(const T&)> onSuccess,
+ std::function<void(Status)> onErrorFunc) {
+
+ // Grab a weak_ptr to be sure that FreeMonProcessor is alive during the callback
+ std::weak_ptr<FreeMonProcessor> wpProc(proc->shared_from_this());
+
+ auto spError = std::make_shared<bool>(false);
+
+ return std::make_unique<Future<void>>(std::move(future)
+ .onError([=](Status s) {
+ *(spError.get()) = true;
+ if (auto spProc = wpProc.lock()) {
+ onErrorFunc(s);
+ }
+
+ return T();
+ })
+ .then([=](const auto& resp) {
+ // If we hit an error, then do not call onSuccess
+ if (*(spError.get()) == true) {
+ return;
+ }
+
+ // Use a shared pointer here because the callback
+ // could return after we disappear
+ if (auto spProc = wpProc.lock()) {
+ onSuccess(resp);
+ }
+ }));
+}
+} // namespace
+
+void FreeMonProcessor::doCommandRegister(Client* client,
+ std::shared_ptr<FreeMonMessage> sharedMsg) {
+ auto msg = checked_cast<FreeMonRegisterCommandMessage*>(sharedMsg.get());
+
+ if (_futureRegistrationResponse) {
+ msg->setStatus(Status(ErrorCodes::FreeMonHttpInFlight,
+ "Free Monitoring Registration request in-flight already"));
+ return;
+ }
+
+ _pendingRegisters.push_back(sharedMsg);
+
+ readState(client);
+
+ FreeMonRegistrationRequest req;
+
+ if (!_state.getRegistrationId().empty()) {
+ req.setId(_state.getRegistrationId());
+ }
+
+ req.setVersion(kProtocolVersion);
+
+ if (!msg->getTags().empty()) {
+ // Cache the tags for subsequent retries
+ _tags = msg->getTags();
+ }
+
+ if (!_tags.empty()) {
+ req.setTag(transformVector(msg->getTags()));
+ }
+
+ // Collect the data
+ auto collect = _registration.collect(client);
+
+ req.setPayload(std::get<0>(collect));
+
+ // Send the async request
+ _futureRegistrationResponse = doAsyncCallback<FreeMonRegistrationResponse>(
+ this,
+ _network->sendRegistrationAsync(req),
+ [this](const auto& resp) {
+ this->enqueue(
+ FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>::createNow(
+ resp));
+ },
+ [this](Status s) {
+ this->enqueue(
+ FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>::createNow(s));
+ });
+}
+
+Status FreeMonProcessor::validateRegistrationResponse(const FreeMonRegistrationResponse& resp) {
+ // Any validation failure stops registration from proceeding to upload
+ if (resp.getVersion() != kProtocolVersion) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream()
+ << "Unexpected registration response protocol version, expected '"
+ << kProtocolVersion
+ << "', received '"
+ << resp.getVersion()
+ << "'");
+ }
+
+ if (resp.getId().size() >= kRegistrationIdMaxLength) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream() << "Id is '" << resp.getId().size()
+ << "' bytes in length, maximum allowed length is '"
+ << kRegistrationIdMaxLength
+ << "'");
+ }
+
+ if (resp.getInformationalURL().size() >= kInformationalURLMaxLength) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream() << "InformationURL is '" << resp.getInformationalURL().size()
+ << "' bytes in length, maximum allowed length is '"
+ << kInformationalURLMaxLength
+ << "'");
+ }
+
+ if (resp.getMessage().size() >= kInformationalMessageMaxLength) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream() << "Message is '" << resp.getMessage().size()
+ << "' bytes in length, maximum allowed length is '"
+ << kInformationalMessageMaxLength
+ << "'");
+ }
+
+ if (resp.getUserReminder().is_initialized() &&
+ resp.getUserReminder().get().size() >= kUserReminderMaxLength) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream() << "UserReminder is '" << resp.getUserReminder().get().size()
+ << "' bytes in length, maximum allowed length is '"
+ << kUserReminderMaxLength
+ << "'");
+ }
+
+ if (resp.getReportingInterval() < kReportingIntervalMinutesMin ||
+ resp.getReportingInterval() > kReportingIntervalMinutesMax) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream() << "Reporting Interval '" << resp.getReportingInterval()
+ << "' must be in the range ["
+ << kReportingIntervalMinutesMin
+ << ","
+ << kReportingIntervalMinutesMax
+ << "]");
+ }
+
+ // Did cloud ask us to stop uploading?
+ if (resp.getHaltMetricsUploading()) {
+ return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+ str::stream() << "Halting metrics upload due to response");
+ }
+
+ return Status::OK();
+}
+
+
+void FreeMonProcessor::notifyPendingRegisters(const Status s) {
+ for (auto&& pendingRegister : _pendingRegisters) {
+ (checked_cast<FreeMonRegisterCommandMessage*>(pendingRegister.get()))->setStatus(s);
+ }
+ _pendingRegisters.clear();
+}
+
+void FreeMonProcessor::doAsyncRegisterComplete(
+ Client* client,
+ const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg) {
+
+ // Our request is no longer in-progress so delete it
+ _futureRegistrationResponse.reset();
+
+ auto& resp = msg->getPayload();
+
+ Status s = validateRegistrationResponse(resp);
+ if (!s.isOK()) {
+ warning() << "Free Monitoring registration halted due to " << s;
+
+ notifyPendingRegisters(s);
+
+ // If validation fails, we do not retry
+ return;
+ }
+
+ // Update in-memory state
+ _registrationRetry.setMin(Seconds(resp.getReportingInterval()));
+
+ _state.setRegistrationId(resp.getId());
+
+ if (resp.getUserReminder().is_initialized()) {
+ _state.setUserReminder(resp.getUserReminder().get());
+ } else {
+ _state.setUserReminder("");
+ }
+
+ _state.setMessage(resp.getMessage());
+ _state.setInformationalURL(resp.getInformationalURL());
+
+ // Persist state
+ writeState(client);
+
+ // Reset retry counter
+ _registrationRetry.reset();
+
+ // Notify waiters
+ notifyPendingRegisters(Status::OK());
+
+ // TODO: Enqueue next metrics upload
+ // enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCallTimer,
+ // _registrationRetry.getNextDeadline(client)));
+}
+
+void FreeMonProcessor::doAsyncRegisterFail(
+ Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>* msg) {
+
+ // Our request is no longer in-progress so delete it
+ _futureRegistrationResponse.reset();
+
+ if (!_registrationRetry.incrementError()) {
+ // We have exceeded our retry
+ warning() << "Free Monitoring is abandoning registration after excess retries";
+ return;
+ }
+
+ LOG(1) << "Free Monitoring Registration Failed, " << msg->getPayload() << ", retrying in "
+ << _registrationRetry.getNextDuration();
+
+ // Enqueue a register retry
+ enqueue(FreeMonRegisterCommandMessage::createWithDeadline(
+ _tags, _registrationRetry.getNextDeadline(client)));
+}
+
+void FreeMonProcessor::doUnregister(Client* /*client*/) {}
+
+} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_processor.h b/src/mongo/db/free_mon/free_mon_processor.h
new file mode 100644
index 00000000000..ec09286addd
--- /dev/null
+++ b/src/mongo/db/free_mon/free_mon_processor.h
@@ -0,0 +1,264 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+#pragma once
+
+#include <boost/optional.hpp>
+#include <cstdint>
+#include <memory>
+#include <ratio>
+#include <string>
+#include <vector>
+
+#include "mongo/db/client.h"
+#include "mongo/db/free_mon/free_mon_message.h"
+#include "mongo/db/free_mon/free_mon_network.h"
+#include "mongo/db/free_mon/free_mon_processor.h"
+#include "mongo/db/free_mon/free_mon_protocol_gen.h"
+#include "mongo/db/free_mon/free_mon_queue.h"
+#include "mongo/db/free_mon/free_mon_storage_gen.h"
+#include "mongo/db/ftdc/collector.h"
+#include "mongo/db/service_context.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+using FreeMonCollectorInterface = FTDCCollectorInterface;
+using FreeMonCollectorCollection = FTDCCollectorCollection;
+
+/**
+ * Reponsible for tracking when to send the next retry after errors are encountered.
+ */
+class RetryCounter {
+ const int64_t kMax = 60 * 60 * 24;
+
+public:
+ RetryCounter() : _min(1), _max(kMax) {}
+ virtual ~RetryCounter() = default;
+
+ /**
+ * Set Minimum rety interval
+ */
+ void setMin(Seconds s) {
+ _min = s;
+ reset();
+ }
+
+ /**
+ * Reset the retry interval, typically occurs after a succesfull message is sent.
+ */
+ virtual void reset() = 0;
+
+ /**
+ * Increment the error count and compute the next interval.
+ */
+ virtual bool incrementError() = 0;
+
+ /**
+ * Get the next retry duration.
+ */
+ Seconds getNextDuration() {
+ dassert(_current != Seconds(0));
+ return _current;
+ }
+
+ /**
+ * Get the next retry deadline
+ */
+ Date_t getNextDeadline(Client* client) {
+ return client->getServiceContext()->getPreciseClockSource()->now() + _current;
+ }
+
+protected:
+ // Current retry interval
+ Seconds _current;
+
+ // Minimum retry interval
+ Seconds _min;
+
+ // Maximum retry interval
+ Seconds _max;
+};
+
+/**
+ * Manage retries for registrations
+ */
+class RegistrationRetryCounter : public RetryCounter {
+public:
+ explicit RegistrationRetryCounter(PseudoRandom& random) : _random(random) {}
+
+ void reset() final;
+
+ bool incrementError() final;
+
+private:
+ // Random number generator for jitter
+ PseudoRandom& _random;
+
+ // Retry count for stage 1 retry
+ size_t _retryCount{0};
+
+ // Total Seconds we have retried for
+ Seconds _total;
+
+ // Last retry interval without jitter
+ Seconds _base;
+
+ // Max Retry count
+ const size_t kStage1RetryCountMax{10};
+
+ const size_t kStage1JitterMin{2};
+ const size_t kStage1JitterMax{10};
+
+ const Hours kStage2DurationMax{48};
+
+ const size_t kStage2JitterMin{60};
+ const size_t kStage2JitterMax{120};
+};
+
+/**
+ * Process in an Agent in a Agent/Message Passing model.
+ *
+ * Messages are given to it by enqueue, and the Processor processes messages with run().
+ */
+class FreeMonProcessor : public std::enable_shared_from_this<FreeMonProcessor> {
+public:
+ FreeMonProcessor(FreeMonCollectorCollection& registration,
+ FreeMonCollectorCollection& metrics,
+ FreeMonNetworkInterface* network)
+ : _registration(registration),
+ _metrics(metrics),
+ _network(network),
+ _random(Date_t::now().asInt64()),
+ _registrationRetry(_random) {
+ _registrationRetry.reset();
+ }
+
+ /**
+ * Enqueue a message to process
+ */
+ void enqueue(std::shared_ptr<FreeMonMessage> msg);
+
+ /**
+ * Stop processing messages.
+ */
+ void stop();
+
+ /**
+ * Processes messages forever
+ */
+ void run();
+
+ /**
+ * Validate the registration response. Public for unit testing.
+ */
+ static Status validateRegistrationResponse(const FreeMonRegistrationResponse& resp);
+
+private:
+ /**
+ * Read the state from the database.
+ */
+ void readState(Client* client);
+
+ /**
+ * Write the state to disk if there are any changes.
+ */
+ void writeState(Client* client);
+
+ /**
+ * Process a registration from a command.
+ */
+ void doCommandRegister(Client* client, std::shared_ptr<FreeMonMessage> sharedMsg);
+
+ /**
+ * Process a registration from configuration.
+ */
+ void doServerRegister(Client* client,
+ const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg);
+
+ /**
+ * Process unregistration.
+ */
+ void doUnregister(Client* client);
+
+ /**
+ * Process a successful HTTP request.
+ */
+ void doAsyncRegisterComplete(
+ Client* client,
+ const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg);
+
+ /**
+ * Process an unsuccessful HTTP request.
+ */
+ void doAsyncRegisterFail(
+ Client* client,
+ const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>* msg);
+
+ /**
+ * Notify any command registers that are waiting.
+ */
+ void notifyPendingRegisters(const Status s);
+
+private:
+ // Collection of collectors to send on registration
+ FreeMonCollectorCollection& _registration;
+
+ // Collection of collectors to send on each metrics call
+ FreeMonCollectorCollection& _metrics;
+
+ // HTTP Network interface
+ FreeMonNetworkInterface* _network;
+
+ // Random number generator for retries
+ PseudoRandom _random;
+
+ // Registration Retry logic
+ RegistrationRetryCounter _registrationRetry;
+
+ // List of tags from server configuration registration
+ std::vector<std::string> _tags;
+
+ // In-flight registration response
+ std::unique_ptr<Future<void>> _futureRegistrationResponse;
+
+ // List of command registers waiting to be told about registration
+ std::vector<std::shared_ptr<FreeMonMessage>> _pendingRegisters;
+
+ // Last read storage state
+ boost::optional<FreeMonStorageState> _lastReadState;
+
+ // Pending update to disk
+ FreeMonStorageState _state;
+
+ // Message queue
+ FreeMonMessageQueue _queue;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/ftdc/ftdc_server.h b/src/mongo/db/ftdc/ftdc_server.h
index 02357c24e6f..c5cb2f83e18 100644
--- a/src/mongo/db/ftdc/ftdc_server.h
+++ b/src/mongo/db/ftdc/ftdc_server.h
@@ -81,7 +81,7 @@ void stopFTDC();
/**
* A simple FTDC Collector that runs Commands.
*/
-class FTDCSimpleInternalCommandCollector final : public FTDCCollectorInterface {
+class FTDCSimpleInternalCommandCollector : public FTDCCollectorInterface {
public:
FTDCSimpleInternalCommandCollector(StringData command,
StringData name,