/**
* Copyright (C) 2018 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include "mongo/db/client.h"
#include "mongo/db/free_mon/free_mon_message.h"
#include "mongo/db/free_mon/free_mon_network.h"
#include "mongo/db/free_mon/free_mon_processor.h"
#include "mongo/db/free_mon/free_mon_protocol_gen.h"
#include "mongo/db/free_mon/free_mon_queue.h"
#include "mongo/db/free_mon/free_mon_storage_gen.h"
#include "mongo/db/ftdc/collector.h"
#include "mongo/db/service_context.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/duration.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
namespace mongo {
// Free monitoring reuses the FTDC collector abstractions; these aliases give them
// free-monitoring-specific names.

// A single collector, as consumed by free monitoring.
using FreeMonCollectorInterface = FTDCCollectorInterface;

// A set of collectors gathered together, as consumed by free monitoring.
using FreeMonCollectorCollection = FTDCCollectorCollection;
/**
 * Responsible for tracking when to send the next retry after errors are encountered.
 *
 * This is an abstract base: subclasses define the retry policy by overriding reset() and
 * incrementError(). The base class stores the current, minimum, and maximum retry intervals.
 */
class RetryCounter {
    // Default maximum retry interval in seconds: 60 * 60 * 24, i.e. one day.
    // Declared before _max so it is initialized before the constructor's _max(kMax) reads it.
    const int64_t kMax = 60 * 60 * 24;

public:
    RetryCounter() : _min(1), _max(kMax) {}
    virtual ~RetryCounter() = default;

    /**
     * Set the minimum retry interval, then reset the counter by calling reset().
     */
    void setMin(Seconds s) {
        _min = s;
        reset();
    }

    /**
     * Reset the retry interval. Typically called after a message is sent successfully.
     */
    virtual void reset() = 0;

    /**
     * Increment the error count and compute the next interval.
     *
     * The meaning of the returned bool is defined by each subclass; presumably it reports
     * whether another retry should be attempted. TODO confirm against the implementations.
     */
    virtual bool incrementError() = 0;

    /**
     * Get the next retry duration. Debug builds assert that it is non-zero.
     */
    Seconds getNextDuration() const {
        dassert(_current != Seconds(0));
        return _current;
    }

    /**
     * Get the next retry deadline: the service context's precise clock "now" plus the
     * current retry interval.
     */
    Date_t getNextDeadline(Client* client) const {
        return client->getServiceContext()->getPreciseClockSource()->now() + _current;
    }

protected:
    // Current retry interval
    Seconds _current;

    // Minimum retry interval
    Seconds _min;

    // Maximum retry interval
    Seconds _max;
};
/**
 * Manage retries for registrations.
 *
 * The constants suggest a two-stage schedule. Stage 1 makes a bounded number of retries with
 * a small jitter range. Stage 2 retries use a larger jitter range and are bounded by a maximum
 * total duration. The schedule itself is implemented in reset()/incrementError(), which are
 * defined outside this header.
 */
class RegistrationRetryCounter : public RetryCounter {
public:
    /**
     * @param random Random number generator used for jitter. It is held by reference, so it
     *        must outlive this counter.
     */
    explicit RegistrationRetryCounter(PseudoRandom& random) : _random(random) {}

    void reset() final;

    bool incrementError() final;

    /**
     * Get the current stage 1 retry count.
     */
    size_t getCount() const {
        return _retryCount;
    }

private:
    // Random number generator for jitter (borrowed, not owned)
    PseudoRandom& _random;

    // Retry count for stage 1 retry
    size_t _retryCount{0};

    // Total Seconds we have retried for
    Seconds _total;

    // Last retry interval without jitter
    Seconds _base;

    // Maximum number of stage 1 retries
    const size_t kStage1RetryCountMax{10};

    // Jitter bounds for stage 1 retries (units presumably seconds; confirm in incrementError())
    const size_t kStage1JitterMin{2};
    const size_t kStage1JitterMax{10};

    // Maximum total duration of stage 2 retries
    const Hours kStage2DurationMax{48};

    // Jitter bounds for stage 2 retries (units presumably seconds; confirm in incrementError())
    const size_t kStage2JitterMin{60};
    const size_t kStage2JitterMax{120};
};
/**
 * Manage retries for metrics uploads.
 *
 * Unlike RegistrationRetryCounter, this counter declares no per-stage constants, only a
 * single maximum duration. The policy is implemented in reset()/incrementError(), which are
 * defined outside this header.
 */
class MetricsRetryCounter : public RetryCounter {
public:
    /**
     * @param random Random number generator used for jitter. It is held by reference, so it
     *        must outlive this counter.
     */
    explicit MetricsRetryCounter(PseudoRandom& random) : _random(random) {}

    void reset() final;

    bool incrementError() final;

    /**
     * Get the current retry count.
     */
    size_t getCount() const {
        return _retryCount;
    }

private:
    // Random number generator for jitter (borrowed, not owned)
    PseudoRandom& _random;

    // Retry count (this counter has no stages)
    size_t _retryCount{0};

    // Total Seconds we have retried for
    Seconds _total;

    // Last retry interval without jitter
    Seconds _base;

    // Maximum total retry duration: 7 * 24 hours, i.e. one week.
    // Presumably compared against _total in incrementError(); confirm.
    const Hours kDurationMax{7 * 24};
};
/**
* Simple bounded buffer of metrics to upload.
*/
class MetricsBuffer {
public:
using container_type = std::deque;
/**
* Add a metric to the buffer. Oldest metric will be discarded if buffer is at capacity.
*/
void push(BSONObj obj) {
if (_queue.size() == kMaxElements) {
_queue.pop_front();
}
_queue.push_back(obj);
}
/**
* Flush the buffer down to kMinElements entries. The last entries are held for cloud.
*/
void reset() {
while (_queue.size() > kMinElements) {
_queue.pop_front();
}
}
container_type::iterator begin() {
return _queue.begin();
}
container_type::iterator end() {
return _queue.end();
}
private:
// Bounded queue of metrics
container_type _queue;
const size_t kMinElements = 1;
const size_t kMaxElements = 10;
};
/**
* Countdown latch for test support in FreeMonProcessor so that a crank can be turned manually.
*/
class FreeMonCountdownLatch {
public:
explicit FreeMonCountdownLatch() : _count(0) {}
/**
* Reset countdown latch wait for N events.
*/
void reset(uint32_t count) {
stdx::lock_guard lock(_mutex);
dassert(_count == 0);
dassert(count > 0);
_count = count;
}
/**
* Count down an event.
*/
void countDown() {
stdx::lock_guard lock(_mutex);
if (_count > 0) {
--_count;
if (_count == 0) {
_condvar.notify_one();
}
}
}
/**
* Wait until the N events specified in reset have occured.
*/
void wait() {
stdx::unique_lock lock(_mutex);
_condvar.wait(lock, [&] { return _count == 0; });
}
private:
// mutex to break count and cond var
stdx::mutex _mutex;
// cond var to signal and wait on
stdx::condition_variable _condvar;
// count of events to wait for
size_t _count;
};
/**
 * Registration status kept in memory.
 *
 * Tracking this separately ensures primaries and secondaries register separately.
 */
enum class FreeMonRegistrationStatus {
    // Free monitoring is not enabled. This is the default state.
    kDisabled = 0,

    // A registration is in progress.
    kPending = 1,

    // Free monitoring is enabled.
    kEnabled = 2,
};
/**
* The Processor is an Agent in an Agent/Message Passing model.
*
* Messages are given to it by enqueue(), and the Processor processes them with run().
*/
class FreeMonProcessor : public std::enable_shared_from_this {
public:
/**
* @param registration Collectors whose output is sent on registration (held by reference).
* @param metrics Collectors whose output is sent on each metrics call (held by reference).
* @param network HTTP network interface, stored as a raw pointer. Ownership is not shown
*        here; presumably the caller owns it and it must outlive the processor. TODO confirm.
* @param useCrankForTest Presumably enables manual cranking via turnCrankForTest(). Confirm
*        in the .cpp.
* @param metricsGatherInterval Interval for gathering metrics.
*/
FreeMonProcessor(FreeMonCollectorCollection& registration,
FreeMonCollectorCollection& metrics,
FreeMonNetworkInterface* network,
bool useCrankForTest,
Seconds metricsGatherInterval);
/**
* Enqueue a message to process.
*/
void enqueue(std::shared_ptr msg);
/**
* Stop processing messages.
*/
void stop();
/**
* Turn the crank of the message queue by ignoring deadlines for N messages.
*/
void turnCrankForTest(size_t countMessagesToIgnore);
/**
* Process messages in a loop. Presumably it returns once stop() has been called; confirm in
* the .cpp.
*/
void run();
/**
* Validate the registration response. Public for unit testing.
*/
static Status validateRegistrationResponse(const FreeMonRegistrationResponse& resp);
/**
* Validate the metrics response. Public for unit testing.
*/
static Status validateMetricsResponse(const FreeMonMetricsResponse& resp);
private:
/**
* Read the state from the database using the caller's operation context.
*/
void readState(OperationContext* opCtx);
/**
* Create a short-lived opCtx and read the state from the database.
*/
void readState(Client* client);
/**
* Write the state to disk if there are any changes.
*/
void writeState(Client* client);
/**
* Process a registration request that came from a command.
*/
void doCommandRegister(Client* client, std::shared_ptr sharedMsg);
/**
* Process a registration request that came from server configuration.
*/
void doServerRegister(Client* client,
const FreeMonMessageWithPayload* msg);
/**
* Process an unregistration request that came from a command.
*/
void doCommandUnregister(
Client* client,
FreeMonWaitableMessageWithPayload* msg);
/**
* Process a successful HTTP registration request.
*/
void doAsyncRegisterComplete(
Client* client,
const FreeMonMessageWithPayload* msg);
/**
* Process an unsuccessful HTTP registration request.
*/
void doAsyncRegisterFail(
Client* client,
const FreeMonMessageWithPayload* msg);
/**
* Notify any command registers that are waiting, reporting status s to them.
* Presumably these are the entries in _pendingRegisters; confirm.
*/
void notifyPendingRegisters(const Status s);
/**
* Collect metrics.
* NOTE(review): the original comment said "Upload collected metrics", but the name suggests
* this gathers metrics (presumably into _metricsBuffer) while doMetricsSend() uploads them.
* Confirm.
*/
void doMetricsCollect(Client* client);
/**
* Upload gathered metrics.
*/
void doMetricsSend(Client* client);
/**
* Process a successful HTTP metrics request.
*/
void doAsyncMetricsComplete(
Client* client,
const FreeMonMessageWithPayload* msg);
/**
* Process an unsuccessful HTTP metrics request.
*/
void doAsyncMetricsFail(
Client* client, const FreeMonMessageWithPayload* msg);
/**
* Process a change to become a replica set primary.
*/
void doOnTransitionToPrimary(Client* client);
/**
* Process a notification that storage has received an insert or update.
*/
void doNotifyOnUpsert(Client* client,
const FreeMonMessageWithPayload* msg);
/**
* Process a notification that storage has received a delete or a collection drop.
*/
void doNotifyOnDelete(Client* client);
/**
* Process a notification that storage has rolled back.
*/
void doNotifyOnRollback(Client* client);
/**
* Process an in-memory state transition from originalState to newState.
*/
void processInMemoryStateChange(const FreeMonStorageState& originalState,
const FreeMonStorageState& newState);
protected:
// FreeMonController is granted access to the protected/private members, e.g. getStatus().
friend class FreeMonController;
// Selects which status report getStatus() produces. Presumably kServerStatus is for
// serverStatus and kCommandStatus is for getFreeMonitoringStatus; confirm.
enum FreeMonGetStatusEnum {
kServerStatus,
kCommandStatus,
};
/**
* Populate results for the getFreeMonitoringStatus or serverStatus commands into status.
* mode selects which of the two is being produced.
*/
void getStatus(OperationContext* opCtx, BSONObjBuilder* status, FreeMonGetStatusEnum mode);
private:
// Collection of collectors to send on registration (borrowed reference)
FreeMonCollectorCollection& _registration;
// Collection of collectors to send on each metrics call (borrowed reference)
FreeMonCollectorCollection& _metrics;
// HTTP network interface (raw pointer; ownership not shown in this header)
FreeMonNetworkInterface* _network;
// Random number generator for retry jitter
PseudoRandom _random;
// Registration retry logic, wrapped in boost::synchronized_value
boost::synchronized_value _registrationRetry;
// Metrics retry logic, wrapped in boost::synchronized_value
boost::synchronized_value _metricsRetry;
// Interval for gathering metrics
Seconds _metricsGatherInterval;
// Buffer of metrics to upload
MetricsBuffer _metricsBuffer;
// When did we last send a metrics batch?
boost::synchronized_value> _lastMetricsSend;
// List of tags from server configuration registration
std::vector _tags;
// In-flight registration response
std::unique_ptr> _futureRegistrationResponse;
// List of command registers waiting to be told about the registration outcome
std::vector> _pendingRegisters;
// Last storage state read from the database
boost::synchronized_value> _lastReadState;
// When we become primary, do we register? Defaults to DoNotRegister.
RegistrationType _registerOnTransitionToPrimary{RegistrationType::DoNotRegister};
// Pending update to disk
boost::synchronized_value _state;
// In-memory registration status; starts as kDisabled
FreeMonRegistrationStatus _registrationStatus{FreeMonRegistrationStatus::kDisabled};
// Countdown latch to support manual cranking in tests
FreeMonCountdownLatch _countdown;
// Message queue
FreeMonMessageQueue _queue;
};
} // namespace mongo