diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-05-01 13:37:31 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-05-01 13:37:31 -0400 |
commit | 2187223b39c7824d9edc33cccbe84c577081cd81 (patch) | |
tree | fbb8f06dbc9256bdca1507c1db00e4473f769b32 /src/mongo/db/free_mon/free_mon_processor.cpp | |
parent | 404b3532261866aec6a05a8e4fb7e35c212986d5 (diff) | |
download | mongo-2187223b39c7824d9edc33cccbe84c577081cd81.tar.gz |
SERVER-34228 Free Monitoring Replica Set support
Diffstat (limited to 'src/mongo/db/free_mon/free_mon_processor.cpp')
-rw-r--r-- | src/mongo/db/free_mon/free_mon_processor.cpp | 111 |
1 files changed, 107 insertions, 4 deletions
diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp index e5db7630cb8..04620d1cf23 100644 --- a/src/mongo/db/free_mon/free_mon_processor.cpp +++ b/src/mongo/db/free_mon/free_mon_processor.cpp @@ -53,6 +53,7 @@ namespace mongo { namespace { constexpr auto kProtocolVersion = 1; +constexpr auto kStorageVersion = 1; constexpr auto kRegistrationIdMaxLength = 4096; constexpr auto kInformationalURLMaxLength = 4096; @@ -224,6 +225,26 @@ void FreeMonProcessor::run() { msg.get())); break; } + case FreeMonMessageType::OnTransitionToPrimary: { + doOnTransitionToPrimary(client); + break; + } + case FreeMonMessageType::NotifyOnUpsert: { + doNotifyOnUpsert( + client, + checked_cast< + FreeMonMessageWithPayload<FreeMonMessageType::NotifyOnUpsert>*>( + msg.get())); + break; + } + case FreeMonMessageType::NotifyOnDelete: { + doNotifyOnDelete(client); + break; + } + case FreeMonMessageType::NotifyOnRollback: { + doNotifyOnRollback(client); + break; + } default: MONGO_UNREACHABLE; } @@ -256,7 +277,7 @@ void FreeMonProcessor::readState(Client* client) { } else if (!state.is_initialized()) { // Default the state _state.setVersion(kProtocolVersion); - _state.setState(StorageStateEnum::enabled); + _state.setState(StorageStateEnum::disabled); _state.setRegistrationId(""); _state.setInformationalURL(""); _state.setMessage(""); @@ -312,7 +333,7 @@ void FreeMonProcessor::doServerRegister( // 2. a standalone which has never been registered // if (!state.is_initialized()) { - // TODO: hook OnTransitionToPrimary + _registerOnTransitionToPrimary = true; } else { // We are standalone, if we have a registration id, then send a registration // notification, else wait for the user to register us @@ -324,7 +345,7 @@ void FreeMonProcessor::doServerRegister( MONGO_UNREACHABLE; } - // Enqueue the first metrics gather + // Enqueue the first metrics gather unless we are not going to register enqueue(FreeMonMessage::createNow(FreeMonMessageType::MetricsCollect)); } @@ -768,7 +789,6 @@ void FreeMonProcessor::doAsyncMetricsComplete( // TODO: do we reset only the metrics we send or all pending on success? _metricsBuffer.reset(); - _metricsRetry.setMin(Seconds(resp.getReportingInterval())); if (resp.getId().is_initialized()) { _state.setRegistrationId(resp.getId().get()); @@ -790,6 +810,7 @@ void FreeMonProcessor::doAsyncMetricsComplete( writeState(client); // Reset retry counter + _metricsRetry.setMin(Seconds(resp.getReportingInterval())); _metricsRetry.reset(); // Enqueue next metrics upload @@ -814,4 +835,86 @@ void FreeMonProcessor::doAsyncMetricsFail( _metricsRetry.getNextDeadline(client))); } +void FreeMonProcessor::doOnTransitionToPrimary(Client* client) { + if (_registerOnTransitionToPrimary) { + enqueue(FreeMonRegisterCommandMessage::createNow(std::vector<std::string>())); + + // On transition to primary once + _registerOnTransitionToPrimary = false; + } +} + +void FreeMonProcessor::processInMemoryStateChange(const FreeMonStorageState& originalState, + const FreeMonStorageState& newState) { + // Are we transition from disabled -> enabled? + if (originalState.getState() != newState.getState()) { + if (originalState.getState() != StorageStateEnum::enabled && + newState.getState() == StorageStateEnum::enabled) { + + // Secondary needs to start registration + enqueue(FreeMonRegisterCommandMessage::createNow(std::vector<std::string>())); + } + } +} + + +void FreeMonProcessor::doNotifyOnUpsert( + Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::NotifyOnUpsert>* msg) { + try { + const BSONObj& doc = msg->getPayload(); + auto newState = FreeMonStorageState::parse(IDLParserErrorContext("free_mon_storage"), doc); + + // Likely, the update changed something + if (newState != _state) { + uassert(50839, + str::stream() << "Unexpected free monitoring storage version " + << newState.getVersion(), + newState.getVersion() == kStorageVersion); + + processInMemoryStateChange(_state, newState); + + // Note: enabled -> disabled is handled implicitly by register and send metrics checks + // after _state is updated below + + // Copy the fields + _state = newState; + } + + } catch (...) { + + // Stop the queue + _queue.stop(); + + warning() << "Uncaught exception in '" << exceptionToStatus() + << "' in free monitoring op observer. Shutting down the " + "free monitoring subsystem."; + } +} + +void FreeMonProcessor::doNotifyOnDelete(Client* client) { + // The config document was either deleted or the entire collection was dropped, we treat them + // the same and stop free monitoring. We continue collecting though. + + // So we mark the internal state as disabled which stop registration and metrics send + _state.setState(StorageStateEnum::disabled); +} + +void FreeMonProcessor::doNotifyOnRollback(Client* client) { + // We have rolled back, the state on disk reflects our new reality + // We should re-read the disk state and proceed. + + // copy the in-memory state + auto originalState = _state; + + // Re-read state from disk + readState(client); + + auto& newState = _state; + + if (newState != originalState) { + processInMemoryStateChange(originalState, newState); + } +} + + } // namespace mongo |