diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/free_mon/free_mon_controller_test.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_processor.cpp | 60 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_processor.h | 24 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_protocol.idl | 9 |
4 files changed, 157 insertions, 24 deletions
diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp index 76ba859fdee..43481e870e4 100644 --- a/src/mongo/db/free_mon/free_mon_controller_test.cpp +++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp @@ -228,6 +228,8 @@ public: bool haltMetrics{false}; bool fail2MetricsUploads{false}; bool permanentlyDeleteAfter3{false}; + + bool resendRegistrationAfter3{false}; }; explicit FreeMonNetworkInterfaceMock(executor::ThreadPoolTaskExecutor* threadPool, @@ -333,6 +335,10 @@ public: resp.setPermanentlyDelete(true); } + if (_options.resendRegistrationAfter3 && _metrics.loadRelaxed() == 3) { + resp.setResendRegistration(true); + } + return resp; } @@ -551,6 +557,29 @@ TEST(FreeMonProcessorTest, TestRegistrationResponseValidation) { << "reportingInterval" << 30 * 60 * 60 * 24LL)))); + // Positive: version 2 + ASSERT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 2LL << "haltMetricsUploading" << false << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Positive: empty registration id string + ASSERT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "id" + << "" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); // Negative: bad protocol version ASSERT_NOT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( @@ -652,7 +681,38 @@ TEST(FreeMonProcessorTest, TestMetricsResponseValidation) { << "reportingInterval" << 1LL)))); - // max reporting interval + // Positive: Support version 2 + ASSERT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + + BSON("version" << 2LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Positive: Add resendRegistration + ASSERT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + + BSON("version" << 2LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL + << "resendRegistration" + << true)))); + + + // Positive: max reporting interval ASSERT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( IDLParserErrorContext("foo"), @@ -1192,6 +1252,28 @@ TEST_F(FreeMonControllerTest, TestPreRegistrationMetricBatching) { ASSERT_EQ(controller.network->getLastMetrics().nFields(), 2); } +// Positive: resend registration in metrics response +TEST_F(FreeMonControllerTest, TestResendRegistration) { + FreeMonNetworkInterfaceMock::Options opts; + opts.resendRegistrationAfter3 = true; + + ControllerHolder controller(_mockThreadPool.get(), opts); + + controller.start(RegistrationType::RegisterAfterOnTransitionToPrimary); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + + controller->turnCrankForTest(Turner().registerServer().registerCommand().collect(2)); + + ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty()); + + controller->turnCrankForTest( + Turner().metricsSend(3).collect(3).registerCommand().metricsSend(1)); + + ASSERT_EQ(controller.registerCollector->count(), 2UL); + ASSERT_GTE(controller.metricsCollector->count(), 4UL); +} + #if 0 // Negative: Test metrics buffers on failure, and retries and ensure 2 metrics occurs after a blip // of an error @@ -1372,13 +1454,13 @@ TEST_F(FreeMonControllerRSTest, StepdownDuringRegistration) { // Finish registration controller->turnCrankForTest(1); - controller->turnCrankForTest(Turner().metricsSend().collect(1)); + controller->turnCrankForTest(Turner().metricsSend().collect(2)); // Registration cannot write back to the local store so remain in pending ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::pending); ASSERT_EQ(controller.registerCollector->count(), 1UL); - ASSERT_EQ(controller.metricsCollector->count(), 2UL); + ASSERT_EQ(controller.metricsCollector->count(), 3UL); } // Negative: Tricky: Primary becomes secondary during metrics send diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp index 6e73596b8a1..2c1452a8738 100644 --- a/src/mongo/db/free_mon/free_mon_processor.cpp +++ b/src/mongo/db/free_mon/free_mon_processor.cpp @@ -52,7 +52,8 @@ namespace mongo { namespace { -constexpr auto kProtocolVersion = 1; +constexpr auto kMinProtocolVersion = 1; +constexpr auto kMaxProtocolVersion = 2; constexpr auto kStorageVersion = 1; constexpr auto kRegistrationIdMaxLength = 4096; @@ -269,13 +270,13 @@ void FreeMonProcessor::readState(OperationContext* opCtx) { _lastReadState = state; if (state.is_initialized()) { - invariant(state.get().getVersion() == kProtocolVersion); + invariant(state.get().getVersion() == kStorageVersion); _state = state.get(); } else if (!state.is_initialized()) { // Default the state auto state = _state.synchronize(); - state->setVersion(kProtocolVersion); + state->setVersion(kStorageVersion); state->setState(StorageStateEnum::disabled); state->setRegistrationId(""); state->setInformationalURL(""); @@ -411,7 +412,7 @@ void FreeMonProcessor::doCommandRegister(Client* client, req.setId(regid); } - req.setVersion(kProtocolVersion); + req.setVersion(kMaxProtocolVersion); req.setLocalTime(client->getServiceContext()->getPreciseClockSource()->now()); @@ -431,6 +432,7 @@ void FreeMonProcessor::doCommandRegister(Client* client, // Record that the registration is pending _state->setState(StorageStateEnum::pending); + _registrationStatus = FreeMonRegistrationStatus::kPending; writeState(client); @@ -451,12 +453,14 @@ void FreeMonProcessor::doCommandRegister(Client* client, Status FreeMonProcessor::validateRegistrationResponse(const FreeMonRegistrationResponse& resp) { // Any validation failure stops registration from proceeding to upload - if (resp.getVersion() != kProtocolVersion) { + if (!(resp.getVersion() >= kMinProtocolVersion && resp.getVersion() <= kMaxProtocolVersion)) { return Status(ErrorCodes::FreeMonHttpPermanentFailure, str::stream() - << "Unexpected registration response protocol version, expected '" - << kProtocolVersion - << "', received '" + << "Unexpected registration response protocol version, expected (" + << kMinProtocolVersion + << ", " + << kMaxProtocolVersion + << "), received '" << resp.getVersion() << "'"); } @@ -525,11 +529,13 @@ void FreeMonProcessor::notifyPendingRegisters(const Status s) { Status FreeMonProcessor::validateMetricsResponse(const FreeMonMetricsResponse& resp) { // Any validation failure stops registration from proceeding to upload - if (resp.getVersion() != kProtocolVersion) { + if (!(resp.getVersion() >= kMinProtocolVersion && resp.getVersion() <= kMaxProtocolVersion)) { return Status(ErrorCodes::FreeMonHttpPermanentFailure, - str::stream() << "Unexpected metrics response protocol version, expected '" - << kProtocolVersion - << "', received '" + str::stream() << "Unexpected metrics response protocol version, expected (" + << kMinProtocolVersion + << ", " + << kMaxProtocolVersion + << "), received '" << resp.getVersion() << "'"); } @@ -598,7 +604,7 @@ void FreeMonProcessor::doAsyncRegisterComplete( // Our request is no longer in-progress so delete it _futureRegistrationResponse.reset(); - if (_state->getState() != StorageStateEnum::pending) { + if (_registrationStatus != FreeMonRegistrationStatus::kPending) { notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled")); return; @@ -612,6 +618,7 @@ void FreeMonProcessor::doAsyncRegisterComplete( // Disable on any error _state->setState(StorageStateEnum::disabled); + _registrationStatus = FreeMonRegistrationStatus::kDisabled; // Persist state writeState(client); @@ -642,6 +649,8 @@ void FreeMonProcessor::doAsyncRegisterComplete( state->setState(StorageStateEnum::enabled); } + _registrationStatus = FreeMonRegistrationStatus::kEnabled; + // Persist state writeState(client); @@ -663,7 +672,7 @@ void FreeMonProcessor::doAsyncRegisterFail( // Our request is no longer in-progress so delete it _futureRegistrationResponse.reset(); - if (_state->getState() != StorageStateEnum::pending) { + if (_registrationStatus != FreeMonRegistrationStatus::kPending) { notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled")); return; @@ -689,6 +698,7 @@ void FreeMonProcessor::doCommandUnregister( readState(client); _state->setState(StorageStateEnum::disabled); + _registrationStatus = FreeMonRegistrationStatus::kDisabled; writeState(client); @@ -733,16 +743,18 @@ std::string compressMetrics(MetricsBuffer& buffer) { void FreeMonProcessor::doMetricsSend(Client* client) { readState(client); - if (_state->getState() != StorageStateEnum::enabled) { + // Only continue metrics send if the local disk state (in-case user deleted local document) + // and in-memory status both say to continue. + if (_registrationStatus != FreeMonRegistrationStatus::kEnabled || + _state->getState() != StorageStateEnum::enabled) { // If we are recently disabled, then stop sending metrics return; } // Build outbound request FreeMonMetricsRequest req; - invariant(!_state->getRegistrationId().empty()); - req.setVersion(kProtocolVersion); + req.setVersion(kMaxProtocolVersion); req.setLocalTime(client->getServiceContext()->getPreciseClockSource()->now()); req.setEncoding(MetricsEncodingEnum::snappy); @@ -781,6 +793,8 @@ void FreeMonProcessor::doAsyncMetricsComplete( // Disable free monitoring on validation errors _state->setState(StorageStateEnum::disabled); + _registrationStatus = FreeMonRegistrationStatus::kDisabled; + writeState(client); // If validation fails, we do not retry @@ -793,6 +807,7 @@ void FreeMonProcessor::doAsyncMetricsComplete( FreeMonStorage::deleteState(opCtxUnique.get()); _state->setState(StorageStateEnum::pending); + _registrationStatus = FreeMonRegistrationStatus::kDisabled; // Clear out the in-memory state _lastReadState = boost::none; @@ -833,9 +848,13 @@ void FreeMonProcessor::doAsyncMetricsComplete( _metricsRetry->setMin(Seconds(resp.getReportingInterval())); _metricsRetry->reset(); - // Enqueue next metrics upload - enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, - _metricsRetry->getNextDeadline(client))); + if (resp.getResendRegistration().is_initialized() && resp.getResendRegistration()) { + enqueue(FreeMonRegisterCommandMessage::createNow(_tags)); + } else { + // Enqueue next metrics upload + enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, + _metricsRetry->getNextDeadline(client))); + } } void FreeMonProcessor::doAsyncMetricsFail( @@ -952,6 +971,7 @@ void FreeMonProcessor::doNotifyOnDelete(Client* client) { // So we mark the internal state as disabled which stop registration and metrics send _state->setState(StorageStateEnum::pending); + _registrationStatus = FreeMonRegistrationStatus::kDisabled; // Clear out the in-memory state _lastReadState = boost::none; diff --git a/src/mongo/db/free_mon/free_mon_processor.h b/src/mongo/db/free_mon/free_mon_processor.h index cfaf1b504ad..f16a810f5b7 100644 --- a/src/mongo/db/free_mon/free_mon_processor.h +++ b/src/mongo/db/free_mon/free_mon_processor.h @@ -272,6 +272,27 @@ private: size_t _count; }; +/** + * In-memory registration status + * + * Ensures primaries and secondaries register separately + */ +enum class FreeMonRegistrationStatus { + /** + * Free monitoring is not enabled - default state. + */ + kDisabled, + + /** + * Registration in progress. + */ + kPending, + + /** + * Free Monitoring is enabled. + */ + kEnabled, +}; /** * Process in an Agent in a Agent/Message Passing model. @@ -479,6 +500,9 @@ private: // Pending update to disk boost::synchronized_value<FreeMonStorageState> _state; + // In-memory registration status + FreeMonRegistrationStatus _registrationStatus{FreeMonRegistrationStatus::kDisabled}; + // Countdown launch to support manual cranking FreeMonCountdownLatch _countdown; diff --git a/src/mongo/db/free_mon/free_mon_protocol.idl b/src/mongo/db/free_mon/free_mon_protocol.idl index b08e0638a0f..30fd9081ad3 100644 --- a/src/mongo/db/free_mon/free_mon_protocol.idl +++ b/src/mongo/db/free_mon/free_mon_protocol.idl @@ -95,7 +95,10 @@ structs: description: "Metrics Blob" type: bindata_generic - + # History + # ------- + # Version 2 - added resendRegistration bool + # FreeMonMetricsResponse: description: "Metrics Response from Cloud Server" fields: @@ -127,3 +130,7 @@ structs: description: "Message to display to user to remind them about service" type: string optional: true + resendRegistration: + description: "If true, resend registration to server" + type: bool + optional: true |