diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-04-12 14:32:28 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-04-12 14:32:28 -0400 |
commit | c6620182aebd1b62d31879ce4d9456ff197aea22 (patch) | |
tree | d5e1c635b7054b0f4ee576d2fb10ff7206aa1b71 /src/mongo | |
parent | ec25294c8d0c1c60ff786ea99198749dc4788dd1 (diff) | |
download | mongo-c6620182aebd1b62d31879ce4d9456ff197aea22.tar.gz |
SERVER-34226 Implement FreeMonController - Registration piece
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/free_mon/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_commands.idl | 44 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_controller.cpp | 143 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_controller.h | 164 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_controller_test.cpp | 430 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_message.h | 184 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_mongod.cpp | 186 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_processor.cpp | 466 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_processor.h | 264 | ||||
-rw-r--r-- | src/mongo/db/ftdc/ftdc_server.h | 2 |
10 files changed, 1885 insertions, 9 deletions
diff --git a/src/mongo/db/free_mon/SConscript b/src/mongo/db/free_mon/SConscript index f9210015636..384d960c814 100644 --- a/src/mongo/db/free_mon/SConscript +++ b/src/mongo/db/free_mon/SConscript @@ -10,12 +10,15 @@ fmEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) fmEnv.Library( target='free_mon', source=[ + 'free_mon_processor.cpp', 'free_mon_queue.cpp', 'free_mon_storage.cpp', + 'free_mon_controller.cpp', env.Idlc('free_mon_protocol.idl')[0], + env.Idlc('free_mon_commands.idl')[0], env.Idlc('free_mon_storage.idl')[0], ], - LIBDEPS_PRIVATE=[ + LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/dbhelpers', @@ -36,6 +39,11 @@ if get_option("enable-free-mon") == "on": ], LIBDEPS=[ 'free_mon', + '$BUILD_DIR/mongo/db/ftdc/ftdc_server', + '$BUILD_DIR/mongo/util/options_parser/options_parser', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/commands/test_commands_enabled', ], SYSLIBDEPS=[ 'libcurld' if env.TargetOSIs('windows') else 'curl', @@ -54,6 +62,7 @@ else: env.CppUnitTest( target='free_mon_test', source=[ + 'free_mon_controller_test.cpp', 'free_mon_queue_test.cpp', 'free_mon_storage_test.cpp', ], diff --git a/src/mongo/db/free_mon/free_mon_commands.idl b/src/mongo/db/free_mon/free_mon_commands.idl new file mode 100644 index 00000000000..0e41e96a9f8 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_commands.idl @@ -0,0 +1,44 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +global: + cpp_namespace: "mongo" + + +imports: + - "mongo/idl/basic_types.idl" + + +enums: + SetFreeMonAction: + description: "Action types" + type: string + values: + enable: "enable" + disable: "disable" + + +commands: + setFreeMonitoring: + description: "setFreeMonitoring Command" + namespace: ignored + fields: + action: + description: "Action to take" + type: SetFreeMonAction + + getFreeMonitoringStatus: + description: "getFreeMonitoringStatus Command" + namespace: ignored + diff --git a/src/mongo/db/free_mon/free_mon_controller.cpp b/src/mongo/db/free_mon/free_mon_controller.cpp new file mode 100644 index 00000000000..f8d17816db7 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_controller.cpp @@ -0,0 +1,143 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/db/free_mon/free_mon_controller.h" + +#include "mongo/db/ftdc/collector.h" +#include "mongo/logger/logstream_builder.h" +#include "mongo/util/log.h" + +namespace mongo { + +FreeMonNetworkInterface::~FreeMonNetworkInterface() = default; + +void FreeMonController::addRegistrationCollector( + std::unique_ptr<FreeMonCollectorInterface> collector) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kNotStarted); + + _registrationCollectors.add(std::move(collector)); + } +} + +void FreeMonController::addMetricsCollector(std::unique_ptr<FreeMonCollectorInterface> collector) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kNotStarted); + + _metricCollectors.add(std::move(collector)); + } +} + +void FreeMonController::registerServerStartup(RegistrationType registrationType, + std::vector<std::string>& tags) { + _enqueue(FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>::createNow( + FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>::payload_type( + registrationType, tags))); +} + +boost::optional<Status> FreeMonController::registerServerCommand(Milliseconds timeout) { + auto msg = FreeMonRegisterCommandMessage::createNow(std::vector<std::string>()); + _enqueue(msg); + + if (timeout > Milliseconds::min()) { + return msg->wait_for(timeout); + } + + return Status::OK(); +} + +void FreeMonController::_enqueue(std::shared_ptr<FreeMonMessage> msg) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kStarted); + } + + _processor->enqueue(std::move(msg)); +} + +void FreeMonController::start(RegistrationType registrationType) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + invariant(_state == State::kNotStarted); + } + + // Start the agent + _processor = std::make_shared<FreeMonProcessor>( + _registrationCollectors, _metricCollectors, _network.get()); + + _thread = stdx::thread([this] { _processor->run(); }); + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + invariant(_state == State::kNotStarted); + _state = State::kStarted; + } + + if (registrationType != RegistrationType::DoNotRegister) { + std::vector<std::string> vec; + registerServerStartup(registrationType, vec); + } +} + +void FreeMonController::stop() { + // Stop the agent + log() << "Shutting down free monitoring"; + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + bool started = (_state == State::kStarted); + + invariant(_state == State::kNotStarted || _state == State::kStarted); + + if (!started) { + _state = State::kDone; + return; + } + + _state = State::kStopRequested; + + // Tell the processor to stop + _processor->stop(); + } + + _thread.join(); + + _state = State::kDone; +} + + +} // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_controller.h b/src/mongo/db/free_mon/free_mon_controller.h new file mode 100644 index 00000000000..e02b9b71957 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_controller.h @@ -0,0 +1,164 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <memory> +#include <mutex> +#include <string> +#include <utility> +#include <vector> + +#include "mongo/base/status.h" +#include "mongo/db/free_mon/free_mon_message.h" +#include "mongo/db/free_mon/free_mon_network.h" +#include "mongo/db/free_mon/free_mon_processor.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/duration.h" + +namespace mongo { + +/** + * Manages and control Free Monitoring. This is the entry point for non-free monitoring components + * into free-monitoring. + */ +class FreeMonController { +public: + explicit FreeMonController(std::unique_ptr<FreeMonNetworkInterface> network) + : _network(std::move(network)) {} + + /** + * Initializes free monitoring. + * Start free monitoring thread in the background. + */ + void start(RegistrationType registrationType); + + /** + * Stops free monitoring thread. + */ + void stop(); + + /** + * Add a metric collector to collect on registration + */ + void addRegistrationCollector(std::unique_ptr<FreeMonCollectorInterface> collector); + + /** + * Add a metric collector to collect periodically + */ + void addMetricsCollector(std::unique_ptr<FreeMonCollectorInterface> collector); + + /** + * Get the FreeMonController from ServiceContext. + */ + static FreeMonController* get(ServiceContext* serviceContext); + + /** + * Start registration of mongod with remote service. + * + * Only sends one remote registration at a time. + * Returns after timeout if registrations is not complete. Registration continues though. + */ + void registerServerStartup(RegistrationType registrationType, std::vector<std::string>& tags); + + /** + * Start registration of mongod with remote service. + * + * Only sends one remote registration at a time. + * Returns after timeout if registrations is not complete. Registration continues though. + * Update is synchronous with 10sec timeout + * kicks off register, and once register is done kicks off metrics upload + */ + boost::optional<Status> registerServerCommand(Milliseconds timeout); + + // TODO - add these methods + // Status deregisterServerCommand(Milliseconds timeout); + + // void getStatus(BSONObjBuilder* builder); + // void getServerStatus(BSONObjBuilder* builder); + + // void notifyObserver(const BSONObj& doc); +private: + void _enqueue(std::shared_ptr<FreeMonMessage> msg); + +private: + /** + * Private enum to track state. + * + * +-----------------------------------------------------------+ + * | v + * +-------------+ +----------+ +----------------+ +-------+ + * | kNotStarted | --> | kStarted | --> | kStopRequested | --> | kDone | + * +-------------+ +----------+ +----------------+ +-------+ + */ + enum class State { + /** + * Initial state. Either start() or stop() can be called next. + */ + kNotStarted, + + /** + * start() has been called. stop() should be called next. + */ + kStarted, + + /** + * stop() has been called, and the background thread is in progress of shutting down + */ + kStopRequested, + + /** + * Controller has been stopped. + */ + kDone, + }; + + // Controller state + State _state{State::kNotStarted}; + + // Mutext to protect internal state + stdx::mutex _mutex; + + // Set of registration collectors + FreeMonCollectorCollection _registrationCollectors; + + // Set of metric collectors + FreeMonCollectorCollection _metricCollectors; + + // Network interface + std::unique_ptr<FreeMonNetworkInterface> _network; + + // Background thead for agent + stdx::thread _thread; + + // Background agent + std::shared_ptr<FreeMonProcessor> _processor; +}; + +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp new file mode 100644 index 00000000000..6ac7f582d13 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp @@ -0,0 +1,430 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kFTDC + + +#include "mongo/platform/basic.h" + +#include <boost/filesystem.hpp> +#include <future> +#include <iostream> + +#include "mongo/db/free_mon/free_mon_controller.h" +#include "mongo/db/free_mon/free_mon_storage.h" + +#include "mongo/base/data_type_validated.h" +#include "mongo/base/deinitializer_context.h" +#include "mongo/bson/bson_validate.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/client.h" +#include "mongo/db/ftdc/collector.h" +#include "mongo/db/ftdc/config.h" +#include "mongo/db/ftdc/constants.h" +#include "mongo/db/ftdc/controller.h" +#include "mongo/db/ftdc/ftdc_test.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/op_observer_noop.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/service_context.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/service_context_noop.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/barrier.h" +#include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/log.h" + + +namespace mongo { +namespace { + +class FreeMonNetworkInterfaceMock : public FreeMonNetworkInterface { +public: + struct Options { + bool failRegisterHttp{false}; + bool invalidRegister{false}; + }; + + explicit FreeMonNetworkInterfaceMock(executor::ThreadPoolTaskExecutor* threadPool, + Options options) + : _threadPool(threadPool), _options(options) {} + ~FreeMonNetworkInterfaceMock() final = default; + + Future<FreeMonRegistrationResponse> sendRegistrationAsync( + const FreeMonRegistrationRequest& req) final { + log() << "Sending Registration ..."; + + _registers.addAndFetch(1); + + Promise<FreeMonRegistrationResponse> promise; + auto future = promise.getFuture(); + auto shared_promise = promise.share(); + + auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this]( + const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { + + if (_options.failRegisterHttp) { + shared_promise.setError( + Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure")); + return; + } + + auto resp = FreeMonRegistrationResponse(); + resp.setVersion(1); + + if (_options.invalidRegister) { + resp.setVersion(42); + } + + if (req.getId().is_initialized()) { + resp.setId(req.getId().get()); + } else { + resp.setId(UUID::gen().toString()); + } + + resp.setReportingInterval(1); + + shared_promise.emplaceValue(resp); + }); + ASSERT_OK(swSchedule.getStatus()); + + return future; + } + + Future<FreeMonMetricsResponse> sendMetricsAsync(const FreeMonMetricsRequest& req) final { + log() << "Sending Metrics ..."; + ASSERT_FALSE(req.getId().empty()); + + _metrics.addAndFetch(1); + + Promise<FreeMonMetricsResponse> promise; + auto future = promise.getFuture(); + auto shared_promise = promise.share(); + + auto swSchedule = _threadPool->scheduleWork( + [shared_promise, req](const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { + auto resp = FreeMonMetricsResponse(); + resp.setVersion(1); + resp.setReportingInterval(1); + + shared_promise.emplaceValue(resp); + }); + ASSERT_OK(swSchedule.getStatus()); + + + return future; + } + + int32_t getRegistersCalls() const { + return _registers.load(); + } + int32_t getMetricsCalls() const { + return _metrics.load(); + } + +private: + AtomicInt32 _registers; + AtomicInt32 _metrics; + + executor::ThreadPoolTaskExecutor* _threadPool; + + Options _options; +}; + +class FreeMonControllerTest : public ServiceContextMongoDTest { + +private: + void setUp() final; + void tearDown() final; + +protected: + /** + * Looks up the current ReplicationCoordinator. + * The result is cast to a ReplicationCoordinatorMock to provide access to test features. + */ + repl::ReplicationCoordinatorMock* _getReplCoord() const; + + ServiceContext::UniqueOperationContext _opCtx; + + executor::NetworkInterfaceMock* _mockNetwork{nullptr}; + + std::unique_ptr<executor::ThreadPoolTaskExecutor> _mockThreadPool; +}; + +void FreeMonControllerTest::setUp() { + ServiceContextMongoDTest::setUp(); + auto service = getServiceContext(); + + repl::ReplicationCoordinator::set(service, + std::make_unique<repl::ReplicationCoordinatorMock>(service)); + + // Set up a NetworkInterfaceMock. Note, unlike NetworkInterfaceASIO, which has its own pool of + // threads, tasks in the NetworkInterfaceMock must be carried out synchronously by the (single) + // thread the unit test is running on. + auto netForFixedTaskExecutor = std::make_unique<executor::NetworkInterfaceMock>(); + _mockNetwork = netForFixedTaskExecutor.get(); + + // Set up a ThreadPoolTaskExecutor. Note, for local tasks this TaskExecutor uses a + // ThreadPoolMock, and for remote tasks it uses the NetworkInterfaceMock created above. However, + // note that the ThreadPoolMock uses the NetworkInterfaceMock's threads to run tasks, which is + // again just the (single) thread the unit test is running on. Therefore, all tasks, local and + // remote, must be carried out synchronously by the test thread. + _mockThreadPool = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor)); + + _mockThreadPool->startup(); + + _opCtx = cc().makeOperationContext(); + + //_storage = stdx::make_unique<repl::StorageInterfaceImpl>(); + repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>()); + + + // Transition to PRIMARY so that the server can accept writes. + ASSERT_OK(_getReplCoord()->setFollowerMode(repl::MemberState::RS_PRIMARY)); + + + // Create collection with one document. + CollectionOptions collectionOptions; + collectionOptions.uuid = UUID::gen(); + + auto statusCC = repl::StorageInterface::get(service)->createCollection( + _opCtx.get(), NamespaceString("admin", "system.version"), collectionOptions); + ASSERT_OK(statusCC); +} + +void FreeMonControllerTest::tearDown() { + _opCtx = {}; + ServiceContextMongoDTest::tearDown(); +} + +repl::ReplicationCoordinatorMock* FreeMonControllerTest::_getReplCoord() const { + auto replCoord = repl::ReplicationCoordinator::get(_opCtx.get()); + ASSERT(replCoord) << "No ReplicationCoordinator installed"; + auto replCoordMock = dynamic_cast<repl::ReplicationCoordinatorMock*>(replCoord); + ASSERT(replCoordMock) << "Unexpected type for installed ReplicationCoordinator"; + return replCoordMock; +} + +#define ASSERT_RANGE(target, lower, upper) \ + { \ + auto __x = counter.getNextDuration(); \ + ASSERT_GTE(__x, target + lower); \ + ASSERT_LTE(__x, target + upper); \ + } + +// Positive: Ensure deadlines sort properly +TEST(FreeMonRetryTest, TestRegistration) { + PseudoRandom random(0); + RegistrationRetryCounter counter(random); + counter.reset(); + + ASSERT_EQ(counter.getNextDuration(), Seconds(1)); + ASSERT_EQ(counter.getNextDuration(), Seconds(1)); + + for (int j = 0; j < 3; j++) { + // Fail requests + for (int i = 1; i <= 10; ++i) { + ASSERT_TRUE(counter.incrementError()); + + int64_t base = pow(2, i); + ASSERT_RANGE(Seconds(base), Seconds(2), Seconds(10)); + } + + ASSERT_TRUE(counter.incrementError()); + ASSERT_RANGE(Seconds(1024), Seconds(60), Seconds(120)); + ASSERT_TRUE(counter.incrementError()); + ASSERT_RANGE(Seconds(1024), Seconds(60), Seconds(120)); + + counter.reset(); + } +} + +// Positive: Ensure the response is validated correctly +TEST(AFreeMonProcessorTest, TestResponseValidation) { + ASSERT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: bad protocol version + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 42LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: halt uploading + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << true << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: large registartation id + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" << std::string(5000, 'a') + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: large URL + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << std::string(5000, 'b') + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: large message + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << std::string(5000, 'c') + << "reportingInterval" + << 1LL)))); + + // Negative: too small a reporting interval + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 0LL)))); + + // Negative: too large a reporting interval + ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << (60LL * 60 * 24 + 1LL))))); +} + +// Positive: Test Register works +TEST_F(FreeMonControllerTest, TestRegister) { + // FreeMonNetworkInterfaceMock network; + FreeMonController controller( + std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkInterfaceMock( + _mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()))); + + controller.start(RegistrationType::DoNotRegister); + + ASSERT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(5)))); + + ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty()); + + controller.stop(); +} + +// Negatve: Test Register times out if network stack drops messages +TEST_F(FreeMonControllerTest, TestRegisterTimeout) { + + FreeMonNetworkInterfaceMock::Options opts; + opts.failRegisterHttp = true; + auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>( + new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts)); + auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get()); + FreeMonController controller(std::move(networkUnique)); + + controller.start(RegistrationType::DoNotRegister); + + ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15)))); + + ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized()); + ASSERT_GTE(network->getRegistersCalls(), 2); + + controller.stop(); +} + +// Negatve: Test Register times out if the registration is wrong +TEST_F(FreeMonControllerTest, TestRegisterFail) { + + FreeMonNetworkInterfaceMock::Options opts; + opts.invalidRegister = true; + auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>( + new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts)); + auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get()); + FreeMonController controller(std::move(networkUnique)); + + controller.start(RegistrationType::DoNotRegister); + + ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15)))); + + ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized()); + ASSERT_EQ(network->getRegistersCalls(), 1); + + controller.stop(); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h index e2b4e727463..3ee260f7644 100644 --- a/src/mongo/db/free_mon/free_mon_message.h +++ b/src/mongo/db/free_mon/free_mon_message.h @@ -31,6 +31,7 @@ #include <condition_variable> #include <vector> +#include "mongo/db/free_mon/free_mon_protocol_gen.h" #include "mongo/util/duration.h" #include "mongo/util/time_support.h" @@ -74,6 +75,26 @@ enum class FreeMonMessageType { }; /** + * Supported types of registration that occur on server startup. + */ +enum class RegistrationType { + /** + * Do not register on start because it was not configured via commandline/config file. + */ + DoNotRegister, + + /** + * Register immediately on start since we are a standalone. + */ + RegisterOnStart, + + /** + * Register after transition to becoming primary because we are in a replica set. + */ + RegisterAfterOnTransitionToPrimary, +}; + +/** * Message class that encapsulate a message to the FreeMonMessageProcessor * * Has a type and a deadline for when to start processing the message. @@ -126,4 +147,167 @@ private: }; +/** + * Most messages have a simple payload, and this template ensures we create type-safe messages for + * each message type without copy-pasting repeatedly. + */ +template <FreeMonMessageType typeT> +struct FreeMonPayloadForMessage { + using payload_type = void; +}; + +template <> +struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncRegisterComplete> { + using payload_type = FreeMonRegistrationResponse; +}; + +template <> +struct FreeMonPayloadForMessage<FreeMonMessageType::RegisterServer> { + using payload_type = std::pair<RegistrationType, std::vector<std::string>>; +}; + +template <> +struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncRegisterFail> { + using payload_type = Status; +}; + +/** + * Message with a generic payload based on the type of message. + */ +template <FreeMonMessageType typeT> +class FreeMonMessageWithPayload : public FreeMonMessage { +public: + using payload_type = typename FreeMonPayloadForMessage<typeT>::payload_type; + + /** + * Create a message that should processed immediately. + */ + static std::shared_ptr<FreeMonMessageWithPayload> createNow(payload_type t) { + return std::make_shared<FreeMonMessageWithPayload>(t, Date_t::min()); + } + + /** + * Get message payload. + */ + const payload_type& getPayload() const { + return _t; + } + +public: + FreeMonMessageWithPayload(payload_type t, Date_t deadline) + : FreeMonMessage(typeT, deadline), _t(std::move(t)) {} + +private: + // Message payload + payload_type _t; +}; + +/** + * Single-shot class that encapsulates a Status and allows a caller to wait for a time. + * + * Basically, a single producer, single consumer queue with one event. + */ +class WaitableResult { +public: + WaitableResult() : _status(Status::OK()) {} + + /** + * Set Status and signal waiter. + */ + void set(Status status) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + invariant(!_set); + if (!_set) { + _set = true; + _status = std::move(status); + _condvar.notify_one(); + } + } + + /** + * Waits for duration until status has been set. + * + * Returns boost::none on timeout. + */ + boost::optional<Status> wait_for(Milliseconds duration) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + + if (!_condvar.wait_for(lock, duration.toSystemDuration(), [this]() { return _set; })) { + return {}; + } + + return _status; + } + +private: + // Condition variable to signal consumer + std::condition_variable _condvar; + + // Lock for condition variable and to protect state + std::mutex _mutex; + + // Indicates whether _status has been set + bool _set{false}; + + // Provided status + Status _status; +}; + +/** + * Custom waitable message for Register Command message. + */ +class FreeMonRegisterCommandMessage : public FreeMonMessage { +public: + /** + * Create a message that should processed immediately. + */ + static std::shared_ptr<FreeMonRegisterCommandMessage> createNow( + const std::vector<std::string>& tags) { + return std::make_shared<FreeMonRegisterCommandMessage>(tags, Date_t::min()); + } + + /** + * Create a message that should processed after the specified deadline. + */ + static std::shared_ptr<FreeMonRegisterCommandMessage> createWithDeadline( + const std::vector<std::string>& tags, Date_t deadline) { + return std::make_shared<FreeMonRegisterCommandMessage>(tags, deadline); + } + + /** + * Get tags. + */ + const std::vector<std::string>& getTags() const { + return _tags; + } + + /** + * Set Status and signal waiter. + */ + void setStatus(Status status) { + _waitable.set(std::move(status)); + } + + /** + * Waits for duration until status has been set. + * + * Returns boost::none on timeout. + */ + boost::optional<Status> wait_for(Milliseconds duration) { + return _waitable.wait_for(duration); + } + +public: + FreeMonRegisterCommandMessage(std::vector<std::string> tags, Date_t deadline) + : FreeMonMessage(FreeMonMessageType::RegisterCommand, deadline), _tags(std::move(tags)) {} + +private: + // WaitaleResult to notify caller + WaitableResult _waitable{}; + + // Tags + const std::vector<std::string> _tags; +}; + } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp index aa4008973d5..9d41ba96d38 100644 --- a/src/mongo/db/free_mon/free_mon_mongod.cpp +++ b/src/mongo/db/free_mon/free_mon_mongod.cpp @@ -26,6 +26,8 @@ * then also delete it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/free_mon/free_mon_mongod.h" @@ -42,9 +44,11 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/bsontypes.h" #include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/free_mon/free_mon_controller.h" #include "mongo/db/free_mon/free_mon_http.h" #include "mongo/db/free_mon/free_mon_message.h" #include "mongo/db/free_mon/free_mon_network.h" +#include "mongo/db/free_mon/free_mon_options.h" #include "mongo/db/free_mon/free_mon_protocol_gen.h" #include "mongo/db/free_mon/free_mon_storage.h" #include "mongo/db/ftdc/ftdc_server.h" @@ -56,11 +60,24 @@ #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/future.h" +#include "mongo/util/log.h" namespace mongo { namespace { +const auto getFreeMonController = + ServiceContext::declareDecoration<std::unique_ptr<FreeMonController>>(); + +FreeMonController* getGlobalFreeMonController() { + if (!hasGlobalServiceContext()) { + return nullptr; + } + + return getFreeMonController(getGlobalServiceContext()).get(); +} + + /** * Expose cloudFreeMonitoringEndpointURL set parameter to URL for free monitoring. */ @@ -74,7 +91,7 @@ public: Status setFromString(const std::string& str) final { // Check for http, not https here because testEnabled may not be set yet - if (!str.compare(0, 4, "http")) { + if (str.compare(0, 4, "http") != 0) { return Status(ErrorCodes::BadValue, "ExportedFreeMonEndpointURL only supports https:// URLs"); } @@ -94,7 +111,8 @@ public: const FreeMonRegistrationRequest& req) override { BSONObj reqObj = req.toBSON(); - return _client->postAsync(exportedExportedFreeMonEndpointURL.getURL() + "/register", reqObj) + return _client + ->postAsync(exportedExportedFreeMonEndpointURL.getLocked() + "/register", reqObj) .then([](std::vector<uint8_t> blob) { if (blob.empty()) { @@ -119,7 +137,8 @@ public: Future<FreeMonMetricsResponse> sendMetricsAsync(const FreeMonMetricsRequest& req) override { BSONObj reqObj = req.toBSON(); - return _client->postAsync(exportedExportedFreeMonEndpointURL.getURL() + "/metrics", reqObj) + return _client + ->postAsync(exportedExportedFreeMonEndpointURL.getLocked() + "/metrics", reqObj) .then([](std::vector<uint8_t> blob) { if (blob.empty()) { @@ -145,6 +164,76 @@ private: std::unique_ptr<FreeMonHttpClientInterface> _client; }; +/** + * Collect the mms-automation state document from local.clustermanager during registration. + */ +class FreeMonLocalClusterManagerCollector : public FreeMonCollectorInterface { +public: + std::string name() const final { + return "clustermanager"; + } + + void collect(OperationContext* opCtx, BSONObjBuilder& builder) { + auto optionalObj = FreeMonStorage::readClusterManagerState(opCtx); + if (optionalObj.is_initialized()) { + builder.appendElements(optionalObj.get()); + } + } +}; + +/** + * Get the "storageEngine" section of "serverStatus" during registration. + */ +class FreeMonLocalStorageEngineStatusCollector : public FTDCSimpleInternalCommandCollector { +public: + FreeMonLocalStorageEngineStatusCollector() + : FTDCSimpleInternalCommandCollector( + "serverStatus", + "serverStatus", + "", + // Try to filter server status to make it cheaper to collect. Harmless if we gather + // extra + BSON("serverStatus" << 1 << "storageEngine" << true << "extra_info" << false + << "opLatencies" + << false + << "opcountersRepl" + << false + << "opcounters" + << false + << "transactions" + << false + << "connections" + << false + << "network" + << false + << "tcMalloc" + << false + << "network" + << false + << "wiredTiger" + << false + << "sharding" + << false + << "metrics" + << false)) {} + + std::string name() const final { + return "storageEngine"; + } + + void collect(OperationContext* opCtx, BSONObjBuilder& builder) { + BSONObjBuilder localBuilder; + + FTDCSimpleInternalCommandCollector::collect(opCtx, localBuilder); + + BSONObj obj = localBuilder.obj(); + + builder.appendElements(obj["storageEngine"].Obj()); + } +}; + +} // namespace + auto makeTaskExecutor(ServiceContext* /*serviceContext*/) { ThreadPool::Options tpOptions; @@ -158,10 +247,59 @@ auto makeTaskExecutor(ServiceContext* /*serviceContext*/) { executor::makeNetworkInterface("NetworkInterfaceASIO-FreeMon")); } -} // namespace +void registerCollectors(FreeMonController* controller) { + // These are collected only at registration + // + // CmdBuildInfo + controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "buildInfo", "buildInfo", "", BSON("buildInfo" << 1))); + + // HostInfoCmd + controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "hostInfo", "hostInfo", "", BSON("hostInfo" << 1))); + + // Add storageEngine section from serverStatus + controller->addRegistrationCollector( + stdx::make_unique<FreeMonLocalStorageEngineStatusCollector>()); + + // Gather one document from local.clustermanager + controller->addRegistrationCollector(stdx::make_unique<FreeMonLocalClusterManagerCollector>()); + + // These are periodically for metrics upload + // + controller->addMetricsCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "getDiagnosticData", "diagnosticData", "", BSON("getDiagnosticData" << 1))); + + // These are collected at registration and as metrics periodically + // + if (repl::ReplicationCoordinator::get(getGlobalServiceContext())->getReplicationMode() != + repl::ReplicationCoordinator::modeNone) { + // CmdReplSetGetConfig + controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "replSetGetConfig", "replSetGetConfig", "", BSON("replSetGetConfig" << 1))); + + controller->addMetricsCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "replSetGetConfig", "replSetGetConfig", "", BSON("replSetGetConfig" << 1))); + } + controller->addRegistrationCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "isMaster", "isMaster", "", BSON("isMaster" << 1))); + + controller->addMetricsCollector(stdx::make_unique<FTDCSimpleInternalCommandCollector>( + "isMaster", "isMaster", "", BSON("isMaster" << 1))); +} void startFreeMonitoring(ServiceContext* serviceContext) { + if (globalFreeMonParams.freeMonitoringState == EnableCloudStateEnum::kOff) { + return; + } + + // Check for http, not https here because testEnabled may not be set yet + if (!getTestCommandsEnabled()) { + uassert(50774, + "ExportedFreeMonEndpointURL only supports https:// URLs", + exportedExportedFreeMonEndpointURL.getLocked().compare(0, 5, "https") == 0); + } auto executor = makeTaskExecutor(serviceContext); @@ -175,12 +313,46 @@ void startFreeMonitoring(ServiceContext* serviceContext) { auto network = std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkHttp(std::move(http))); + + auto controller = stdx::make_unique<FreeMonController>(std::move(network)); + + registerCollectors(controller.get()); + + // Install the new controller + auto& staticFreeMon = getFreeMonController(serviceContext); + + staticFreeMon = std::move(controller); + + RegistrationType registrationType = RegistrationType::DoNotRegister; + if (globalFreeMonParams.freeMonitoringState == EnableCloudStateEnum::kOn) { + // If replication is enabled, we may need to register on becoming primary + if (repl::ReplicationCoordinator::get(getGlobalServiceContext())->getReplicationMode() != + repl::ReplicationCoordinator::modeNone) { + registrationType = RegistrationType::RegisterAfterOnTransitionToPrimary; + } else { + registrationType = RegistrationType::RegisterOnStart; + } + } + + staticFreeMon->start(registrationType); } -void stopFreeMonitoring() {} +void stopFreeMonitoring() { + if (globalFreeMonParams.freeMonitoringState == EnableCloudStateEnum::kOff) { + return; + } -FreeMonHttpClientInterface::~FreeMonHttpClientInterface() = default; + auto controller = getGlobalFreeMonController(); -FreeMonNetworkInterface::~FreeMonNetworkInterface() = default; + if (controller != nullptr) { + controller->stop(); + } +} + +FreeMonController* FreeMonController::get(ServiceContext* serviceContext) { + return getFreeMonController(serviceContext).get(); +} + +FreeMonHttpClientInterface::~FreeMonHttpClientInterface() = default; } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp new file mode 100644 index 00000000000..2713e19be32 --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_processor.cpp @@ -0,0 +1,466 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/db/free_mon/free_mon_processor.h" + +#include <functional> +#include <tuple> +#include <utility> + +#include "mongo/base/data_range.h" +#include "mongo/base/status.h" +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/free_mon/free_mon_storage.h" +#include "mongo/db/service_context.h" +#include "mongo/idl/idl_parser.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +constexpr auto kProtocolVersion = 1; + +constexpr auto kRegistrationIdMaxLength = 4096; +constexpr auto kInformationalURLMaxLength = 4096; +constexpr auto kInformationalMessageMaxLength = 4096; +constexpr auto kUserReminderMaxLength = 4096; + +constexpr auto kReportingIntervalMinutesMin = 1; +constexpr auto kReportingIntervalMinutesMax = 60 * 60 * 24; + +int64_t randomJitter(PseudoRandom& random, int64_t min, int64_t max) { + dassert(max > min); + return (std::abs(random.nextInt64()) % (max - min)) + min; +} + +} // namespace + +void RegistrationRetryCounter::reset() { + _current = _min; + _base = _min; + _retryCount = 0; + _total = Hours(0); +} + +bool RegistrationRetryCounter::incrementError() { + if (_retryCount < kStage1RetryCountMax) { + _base = 2 * _base; + _current = _base + Seconds(randomJitter(_random, kStage1JitterMin, kStage1JitterMax)); + ++_retryCount; + } else { + _base = _base; + _current = _base + Seconds(randomJitter(_random, kStage2JitterMin, kStage2JitterMax)); + } + + _total += _current; + + if (_total > kStage2DurationMax) { + return false; + } + + return true; +} + +void FreeMonProcessor::enqueue(std::shared_ptr<FreeMonMessage> msg) { + _queue.enqueue(std::move(msg)); +} + +void FreeMonProcessor::stop() { + _queue.stop(); +} + +void FreeMonProcessor::run() { + try { + + Client::initThread("free_mon"); + Client* client = &cc(); + + while (true) { + auto item = _queue.dequeue(client->getServiceContext()->getPreciseClockSource()); + if (!item.is_initialized()) { + // Shutdown was triggered + return; + } + + // Do work here + switch (item.get()->getType()) { + case FreeMonMessageType::RegisterCommand: { + doCommandRegister(client, item.get()); + break; + } + case FreeMonMessageType::RegisterServer: { + doServerRegister( + client, + checked_cast< + FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>*>( + item.get().get())); + break; + } + case FreeMonMessageType::AsyncRegisterComplete: { + doAsyncRegisterComplete( + client, + checked_cast< + FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>*>( + item.get().get())); + break; + } + case FreeMonMessageType::AsyncRegisterFail: { + doAsyncRegisterFail( + client, + checked_cast< + FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>*>( + item.get().get())); + break; + } + default: + MONGO_UNREACHABLE; + } + } + } catch (...) { + // Stop the queue + _queue.stop(); + + warning() << "Uncaught exception in '" << exceptionToStatus() + << "' in free monitoring subsystem. Shutting down the " + "free monitoring subsystem."; + } +} + +void FreeMonProcessor::readState(Client* client) { + + auto optCtx = client->makeOperationContext(); + + auto state = FreeMonStorage::read(optCtx.get()); + + _lastReadState = state; + + if (state.is_initialized()) { + invariant(state.get().getVersion() == kProtocolVersion); + + _state = state.get(); + } else if (!state.is_initialized()) { + // Default the state + _state.setVersion(kProtocolVersion); + _state.setState(StorageStateEnum::enabled); + _state.setRegistrationId(""); + _state.setInformationalURL(""); + _state.setMessage(""); + _state.setUserReminder(""); + } +} + +void FreeMonProcessor::writeState(Client* client) { + + // Do a compare and swap + // Verify the document is the same as the one on disk, if it is the same, then do the update + // If the local document is different, then oh-well we do nothing, and wait until the next round + + // Has our in-memory state changed, if so consider writing + if (_lastReadState != _state) { + + // The read and write are bound the same operation context + { + auto optCtx = client->makeOperationContext(); + + auto state = FreeMonStorage::read(optCtx.get()); + + // If our in-memory copy matches the last read, then write it to disk + if (state == _lastReadState) { + FreeMonStorage::replace(optCtx.get(), _state); + } + } + } +} + +void FreeMonProcessor::doServerRegister( + Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg) { + + // If we are asked to register now, then kick off a registration request + if (msg->getPayload().first == RegistrationType::RegisterOnStart) { + enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); + } else if (msg->getPayload().first == RegistrationType::RegisterAfterOnTransitionToPrimary) { + // Check if we need to wait to become primary + // If the 'admin.system.version' has content, do not wait and just re-register + // If the collection is empty, wait until we become primary + // If we become secondary, OpObserver hooks will tell us our registration id + + auto optCtx = client->makeOperationContext(); + + // Check if there is an existing document + auto state = FreeMonStorage::read(optCtx.get()); + + // If there is no document, we may be in a replica set and may need to register after + // becoming primary + // since we cannot record the registration id until after becoming primary + if (!state.is_initialized()) { + // TODO: hook OnTransitionToPrimary instead of this hack + enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); + } else { + // If we have state, then we can do the normal register on startup + enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); + } + } +} + +namespace { +template <typename T> +std::unique_ptr<Future<void>> doAsyncCallback(FreeMonProcessor* proc, + Future<T> future, + std::function<void(const T&)> onSuccess, + std::function<void(Status)> onErrorFunc) { + + // Grab a weak_ptr to be sure that FreeMonProcessor is alive during the callback + std::weak_ptr<FreeMonProcessor> wpProc(proc->shared_from_this()); + + auto spError = std::make_shared<bool>(false); + + return std::make_unique<Future<void>>(std::move(future) + .onError([=](Status s) { + *(spError.get()) = true; + if (auto spProc = wpProc.lock()) { + onErrorFunc(s); + } + + return T(); + }) + .then([=](const auto& resp) { + // If we hit an error, then do not call onSuccess + if (*(spError.get()) == true) { + return; + } + + // Use a shared pointer here because the callback + // could return after we disappear + if (auto spProc = wpProc.lock()) { + onSuccess(resp); + } + })); +} +} // namespace + +void FreeMonProcessor::doCommandRegister(Client* client, + std::shared_ptr<FreeMonMessage> sharedMsg) { + auto msg = checked_cast<FreeMonRegisterCommandMessage*>(sharedMsg.get()); + + if (_futureRegistrationResponse) { + msg->setStatus(Status(ErrorCodes::FreeMonHttpInFlight, + "Free Monitoring Registration request in-flight already")); + return; + } + + _pendingRegisters.push_back(sharedMsg); + + readState(client); + + FreeMonRegistrationRequest req; + + if (!_state.getRegistrationId().empty()) { + req.setId(_state.getRegistrationId()); + } + + req.setVersion(kProtocolVersion); + + if (!msg->getTags().empty()) { + // Cache the tags for subsequent retries + _tags = msg->getTags(); + } + + if (!_tags.empty()) { + req.setTag(transformVector(msg->getTags())); + } + + // Collect the data + auto collect = _registration.collect(client); + + req.setPayload(std::get<0>(collect)); + + // Send the async request + _futureRegistrationResponse = doAsyncCallback<FreeMonRegistrationResponse>( + this, + _network->sendRegistrationAsync(req), + [this](const auto& resp) { + this->enqueue( + FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>::createNow( + resp)); + }, + [this](Status s) { + this->enqueue( + FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>::createNow(s)); + }); +} + +Status FreeMonProcessor::validateRegistrationResponse(const FreeMonRegistrationResponse& resp) { + // Any validation failure stops registration from proceeding to upload + if (resp.getVersion() != kProtocolVersion) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() + << "Unexpected registration response protocol version, expected '" + << kProtocolVersion + << "', received '" + << resp.getVersion() + << "'"); + } + + if (resp.getId().size() >= kRegistrationIdMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Id is '" << resp.getId().size() + << "' bytes in length, maximum allowed length is '" + << kRegistrationIdMaxLength + << "'"); + } + + if (resp.getInformationalURL().size() >= kInformationalURLMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "InformationURL is '" << resp.getInformationalURL().size() + << "' bytes in length, maximum allowed length is '" + << kInformationalURLMaxLength + << "'"); + } + + if (resp.getMessage().size() >= kInformationalMessageMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Message is '" << resp.getMessage().size() + << "' bytes in length, maximum allowed length is '" + << kInformationalMessageMaxLength + << "'"); + } + + if (resp.getUserReminder().is_initialized() && + resp.getUserReminder().get().size() >= kUserReminderMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "UserReminder is '" << resp.getUserReminder().get().size() + << "' bytes in length, maximum allowed length is '" + << kUserReminderMaxLength + << "'"); + } + + if (resp.getReportingInterval() < kReportingIntervalMinutesMin || + resp.getReportingInterval() > kReportingIntervalMinutesMax) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Reporting Interval '" << resp.getReportingInterval() + << "' must be in the range [" + << kReportingIntervalMinutesMin + << "," + << kReportingIntervalMinutesMax + << "]"); + } + + // Did cloud ask us to stop uploading? + if (resp.getHaltMetricsUploading()) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Halting metrics upload due to response"); + } + + return Status::OK(); +} + + +void FreeMonProcessor::notifyPendingRegisters(const Status s) { + for (auto&& pendingRegister : _pendingRegisters) { + (checked_cast<FreeMonRegisterCommandMessage*>(pendingRegister.get()))->setStatus(s); + } + _pendingRegisters.clear(); +} + +void FreeMonProcessor::doAsyncRegisterComplete( + Client* client, + const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg) { + + // Our request is no longer in-progress so delete it + _futureRegistrationResponse.reset(); + + auto& resp = msg->getPayload(); + + Status s = validateRegistrationResponse(resp); + if (!s.isOK()) { + warning() << "Free Monitoring registration halted due to " << s; + + notifyPendingRegisters(s); + + // If validation fails, we do not retry + return; + } + + // Update in-memory state + _registrationRetry.setMin(Seconds(resp.getReportingInterval())); + + _state.setRegistrationId(resp.getId()); + + if (resp.getUserReminder().is_initialized()) { + _state.setUserReminder(resp.getUserReminder().get()); + } else { + _state.setUserReminder(""); + } + + _state.setMessage(resp.getMessage()); + _state.setInformationalURL(resp.getInformationalURL()); + + // Persist state + writeState(client); + + // Reset retry counter + _registrationRetry.reset(); + + // Notify waiters + notifyPendingRegisters(Status::OK()); + + // TODO: Enqueue next metrics upload + // enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCallTimer, + // _registrationRetry.getNextDeadline(client))); +} + +void FreeMonProcessor::doAsyncRegisterFail( + Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>* msg) { + + // Our request is no longer in-progress so delete it + _futureRegistrationResponse.reset(); + + if (!_registrationRetry.incrementError()) { + // We have exceeded our retry + warning() << "Free Monitoring is abandoning registration after excess retries"; + return; + } + + LOG(1) << "Free Monitoring Registration Failed, " << msg->getPayload() << ", retrying in " + << _registrationRetry.getNextDuration(); + + // Enqueue a register retry + enqueue(FreeMonRegisterCommandMessage::createWithDeadline( + _tags, _registrationRetry.getNextDeadline(client))); +} + +void FreeMonProcessor::doUnregister(Client* /*client*/) {} + +} // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_processor.h b/src/mongo/db/free_mon/free_mon_processor.h new file mode 100644 index 00000000000..ec09286addd --- /dev/null +++ b/src/mongo/db/free_mon/free_mon_processor.h @@ -0,0 +1,264 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ +#pragma once + +#include <boost/optional.hpp> +#include <cstdint> +#include <memory> +#include <ratio> +#include <string> +#include <vector> + +#include "mongo/db/client.h" +#include "mongo/db/free_mon/free_mon_message.h" +#include "mongo/db/free_mon/free_mon_network.h" +#include "mongo/db/free_mon/free_mon_processor.h" +#include "mongo/db/free_mon/free_mon_protocol_gen.h" +#include "mongo/db/free_mon/free_mon_queue.h" +#include "mongo/db/free_mon/free_mon_storage_gen.h" +#include "mongo/db/ftdc/collector.h" +#include "mongo/db/service_context.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/duration.h" +#include "mongo/util/future.h" +#include "mongo/util/time_support.h" + +namespace mongo { +using FreeMonCollectorInterface = FTDCCollectorInterface; +using FreeMonCollectorCollection = FTDCCollectorCollection; + +/** + * Reponsible for tracking when to send the next retry after errors are encountered. + */ +class RetryCounter { + const int64_t kMax = 60 * 60 * 24; + +public: + RetryCounter() : _min(1), _max(kMax) {} + virtual ~RetryCounter() = default; + + /** + * Set Minimum rety interval + */ + void setMin(Seconds s) { + _min = s; + reset(); + } + + /** + * Reset the retry interval, typically occurs after a succesfull message is sent. + */ + virtual void reset() = 0; + + /** + * Increment the error count and compute the next interval. + */ + virtual bool incrementError() = 0; + + /** + * Get the next retry duration. + */ + Seconds getNextDuration() { + dassert(_current != Seconds(0)); + return _current; + } + + /** + * Get the next retry deadline + */ + Date_t getNextDeadline(Client* client) { + return client->getServiceContext()->getPreciseClockSource()->now() + _current; + } + +protected: + // Current retry interval + Seconds _current; + + // Minimum retry interval + Seconds _min; + + // Maximum retry interval + Seconds _max; +}; + +/** + * Manage retries for registrations + */ +class RegistrationRetryCounter : public RetryCounter { +public: + explicit RegistrationRetryCounter(PseudoRandom& random) : _random(random) {} + + void reset() final; + + bool incrementError() final; + +private: + // Random number generator for jitter + PseudoRandom& _random; + + // Retry count for stage 1 retry + size_t _retryCount{0}; + + // Total Seconds we have retried for + Seconds _total; + + // Last retry interval without jitter + Seconds _base; + + // Max Retry count + const size_t kStage1RetryCountMax{10}; + + const size_t kStage1JitterMin{2}; + const size_t kStage1JitterMax{10}; + + const Hours kStage2DurationMax{48}; + + const size_t kStage2JitterMin{60}; + const size_t kStage2JitterMax{120}; +}; + +/** + * Process in an Agent in a Agent/Message Passing model. + * + * Messages are given to it by enqueue, and the Processor processes messages with run(). + */ +class FreeMonProcessor : public std::enable_shared_from_this<FreeMonProcessor> { +public: + FreeMonProcessor(FreeMonCollectorCollection& registration, + FreeMonCollectorCollection& metrics, + FreeMonNetworkInterface* network) + : _registration(registration), + _metrics(metrics), + _network(network), + _random(Date_t::now().asInt64()), + _registrationRetry(_random) { + _registrationRetry.reset(); + } + + /** + * Enqueue a message to process + */ + void enqueue(std::shared_ptr<FreeMonMessage> msg); + + /** + * Stop processing messages. + */ + void stop(); + + /** + * Processes messages forever + */ + void run(); + + /** + * Validate the registration response. Public for unit testing. + */ + static Status validateRegistrationResponse(const FreeMonRegistrationResponse& resp); + +private: + /** + * Read the state from the database. + */ + void readState(Client* client); + + /** + * Write the state to disk if there are any changes. + */ + void writeState(Client* client); + + /** + * Process a registration from a command. + */ + void doCommandRegister(Client* client, std::shared_ptr<FreeMonMessage> sharedMsg); + + /** + * Process a registration from configuration. + */ + void doServerRegister(Client* client, + const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg); + + /** + * Process unregistration. + */ + void doUnregister(Client* client); + + /** + * Process a successful HTTP request. + */ + void doAsyncRegisterComplete( + Client* client, + const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg); + + /** + * Process an unsuccessful HTTP request. + */ + void doAsyncRegisterFail( + Client* client, + const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>* msg); + + /** + * Notify any command registers that are waiting. + */ + void notifyPendingRegisters(const Status s); + +private: + // Collection of collectors to send on registration + FreeMonCollectorCollection& _registration; + + // Collection of collectors to send on each metrics call + FreeMonCollectorCollection& _metrics; + + // HTTP Network interface + FreeMonNetworkInterface* _network; + + // Random number generator for retries + PseudoRandom _random; + + // Registration Retry logic + RegistrationRetryCounter _registrationRetry; + + // List of tags from server configuration registration + std::vector<std::string> _tags; + + // In-flight registration response + std::unique_ptr<Future<void>> _futureRegistrationResponse; + + // List of command registers waiting to be told about registration + std::vector<std::shared_ptr<FreeMonMessage>> _pendingRegisters; + + // Last read storage state + boost::optional<FreeMonStorageState> _lastReadState; + + // Pending update to disk + FreeMonStorageState _state; + + // Message queue + FreeMonMessageQueue _queue; +}; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/ftdc_server.h b/src/mongo/db/ftdc/ftdc_server.h index 02357c24e6f..c5cb2f83e18 100644 --- a/src/mongo/db/ftdc/ftdc_server.h +++ b/src/mongo/db/ftdc/ftdc_server.h @@ -81,7 +81,7 @@ void stopFTDC(); /** * A simple FTDC Collector that runs Commands. */ -class FTDCSimpleInternalCommandCollector final : public FTDCCollectorInterface { +class FTDCSimpleInternalCommandCollector : public FTDCCollectorInterface { public: FTDCSimpleInternalCommandCollector(StringData command, StringData name, |