summaryrefslogtreecommitdiff
path: root/src/mongo/db/free_mon/free_mon_processor.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2018-05-01 13:37:31 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2018-05-01 13:37:31 -0400
commit2187223b39c7824d9edc33cccbe84c577081cd81 (patch)
treefbb8f06dbc9256bdca1507c1db00e4473f769b32 /src/mongo/db/free_mon/free_mon_processor.cpp
parent404b3532261866aec6a05a8e4fb7e35c212986d5 (diff)
downloadmongo-2187223b39c7824d9edc33cccbe84c577081cd81.tar.gz
SERVER-34228 Free Monitoring Replica Set support
Diffstat (limited to 'src/mongo/db/free_mon/free_mon_processor.cpp')
-rw-r--r--src/mongo/db/free_mon/free_mon_processor.cpp111
1 files changed, 107 insertions, 4 deletions
diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp
index e5db7630cb8..04620d1cf23 100644
--- a/src/mongo/db/free_mon/free_mon_processor.cpp
+++ b/src/mongo/db/free_mon/free_mon_processor.cpp
@@ -53,6 +53,7 @@ namespace mongo {
namespace {
constexpr auto kProtocolVersion = 1;
+constexpr auto kStorageVersion = 1;
constexpr auto kRegistrationIdMaxLength = 4096;
constexpr auto kInformationalURLMaxLength = 4096;
@@ -224,6 +225,26 @@ void FreeMonProcessor::run() {
msg.get()));
break;
}
+ case FreeMonMessageType::OnTransitionToPrimary: {
+ doOnTransitionToPrimary(client);
+ break;
+ }
+ case FreeMonMessageType::NotifyOnUpsert: {
+ doNotifyOnUpsert(
+ client,
+ checked_cast<
+ FreeMonMessageWithPayload<FreeMonMessageType::NotifyOnUpsert>*>(
+ msg.get()));
+ break;
+ }
+ case FreeMonMessageType::NotifyOnDelete: {
+ doNotifyOnDelete(client);
+ break;
+ }
+ case FreeMonMessageType::NotifyOnRollback: {
+ doNotifyOnRollback(client);
+ break;
+ }
default:
MONGO_UNREACHABLE;
}
@@ -256,7 +277,7 @@ void FreeMonProcessor::readState(Client* client) {
} else if (!state.is_initialized()) {
// Default the state
_state.setVersion(kProtocolVersion);
- _state.setState(StorageStateEnum::enabled);
+ _state.setState(StorageStateEnum::disabled);
_state.setRegistrationId("");
_state.setInformationalURL("");
_state.setMessage("");
@@ -312,7 +333,7 @@ void FreeMonProcessor::doServerRegister(
// 2. a standalone which has never been registered
//
if (!state.is_initialized()) {
- // TODO: hook OnTransitionToPrimary
+ _registerOnTransitionToPrimary = true;
} else {
// We are standalone, if we have a registration id, then send a registration
// notification, else wait for the user to register us
@@ -324,7 +345,7 @@ void FreeMonProcessor::doServerRegister(
MONGO_UNREACHABLE;
}
- // Enqueue the first metrics gather
+ // Enqueue the first metrics gather unless we are not going to register
enqueue(FreeMonMessage::createNow(FreeMonMessageType::MetricsCollect));
}
@@ -768,7 +789,6 @@ void FreeMonProcessor::doAsyncMetricsComplete(
// TODO: do we reset only the metrics we send or all pending on success?
_metricsBuffer.reset();
- _metricsRetry.setMin(Seconds(resp.getReportingInterval()));
if (resp.getId().is_initialized()) {
_state.setRegistrationId(resp.getId().get());
@@ -790,6 +810,7 @@ void FreeMonProcessor::doAsyncMetricsComplete(
writeState(client);
// Reset retry counter
+ _metricsRetry.setMin(Seconds(resp.getReportingInterval()));
_metricsRetry.reset();
// Enqueue next metrics upload
@@ -814,4 +835,86 @@ void FreeMonProcessor::doAsyncMetricsFail(
_metricsRetry.getNextDeadline(client)));
}
+void FreeMonProcessor::doOnTransitionToPrimary(Client* client) {
+ if (_registerOnTransitionToPrimary) {
+ enqueue(FreeMonRegisterCommandMessage::createNow(std::vector<std::string>()));
+
+ // On transition to primary once
+ _registerOnTransitionToPrimary = false;
+ }
+}
+
+void FreeMonProcessor::processInMemoryStateChange(const FreeMonStorageState& originalState,
+ const FreeMonStorageState& newState) {
+ // Are we transition from disabled -> enabled?
+ if (originalState.getState() != newState.getState()) {
+ if (originalState.getState() != StorageStateEnum::enabled &&
+ newState.getState() == StorageStateEnum::enabled) {
+
+ // Secondary needs to start registration
+ enqueue(FreeMonRegisterCommandMessage::createNow(std::vector<std::string>()));
+ }
+ }
+}
+
+
+void FreeMonProcessor::doNotifyOnUpsert(
+ Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::NotifyOnUpsert>* msg) {
+ try {
+ const BSONObj& doc = msg->getPayload();
+ auto newState = FreeMonStorageState::parse(IDLParserErrorContext("free_mon_storage"), doc);
+
+ // Likely, the update changed something
+ if (newState != _state) {
+ uassert(50839,
+ str::stream() << "Unexpected free monitoring storage version "
+ << newState.getVersion(),
+ newState.getVersion() == kStorageVersion);
+
+ processInMemoryStateChange(_state, newState);
+
+ // Note: enabled -> disabled is handled implicitly by register and send metrics checks
+ // after _state is updated below
+
+ // Copy the fields
+ _state = newState;
+ }
+
+ } catch (...) {
+
+ // Stop the queue
+ _queue.stop();
+
+ warning() << "Uncaught exception in '" << exceptionToStatus()
+ << "' in free monitoring op observer. Shutting down the "
+ "free monitoring subsystem.";
+ }
+}
+
+void FreeMonProcessor::doNotifyOnDelete(Client* client) {
+ // The config document was either deleted or the entire collection was dropped, we treat them
+ // the same and stop free monitoring. We continue collecting though.
+
+ // So we mark the internal state as disabled which stop registration and metrics send
+ _state.setState(StorageStateEnum::disabled);
+}
+
+void FreeMonProcessor::doNotifyOnRollback(Client* client) {
+ // We have rolled back, the state on disk reflects our new reality
+ // We should re-read the disk state and proceed.
+
+ // copy the in-memory state
+ auto originalState = _state;
+
+ // Re-read state from disk
+ readState(client);
+
+ auto& newState = _state;
+
+ if (newState != originalState) {
+ processInMemoryStateChange(originalState, newState);
+ }
+}
+
+
} // namespace mongo