summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Mark Benvenuto <mark.benvenuto@mongodb.com> 2018-04-20 11:59:06 -0400
committer: Mark Benvenuto <mark.benvenuto@mongodb.com> 2018-04-20 12:38:28 -0400
commit: 70e47745a78fffcc20b70f8e09a8b80a2b62e3bf (patch)
tree: 81ed086d27b6f84f6a336b270e83687c66ecfc57 /src
parent: 697ed6fa176220f770231ab5f7ac337328a5a53c (diff)
download: mongo-70e47745a78fffcc20b70f8e09a8b80a2b62e3bf.tar.gz
SERVER-34227 FreeMonController - Metrics
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/free_mon/SConscript | 2
-rw-r--r-- src/mongo/db/free_mon/free_mon_commands.cpp | 2
-rw-r--r-- src/mongo/db/free_mon/free_mon_controller.cpp | 24
-rw-r--r-- src/mongo/db/free_mon/free_mon_controller.h | 15
-rw-r--r-- src/mongo/db/free_mon/free_mon_controller_test.cpp | 915
-rw-r--r-- src/mongo/db/free_mon/free_mon_message.h | 102
-rw-r--r-- src/mongo/db/free_mon/free_mon_mongod.cpp | 1
-rw-r--r-- src/mongo/db/free_mon/free_mon_options.cpp | 1
-rw-r--r-- src/mongo/db/free_mon/free_mon_processor.cpp | 393
-rw-r--r-- src/mongo/db/free_mon/free_mon_processor.h | 181
-rw-r--r-- src/mongo/db/free_mon/free_mon_queue.cpp | 102
-rw-r--r-- src/mongo/db/free_mon/free_mon_queue.h | 19
-rw-r--r-- src/mongo/db/free_mon/free_mon_storage.idl | 1
-rw-r--r-- src/mongo/db/free_mon/http_client_curl.cpp | 104
14 files changed, 1663 insertions, 199 deletions
diff --git a/src/mongo/db/free_mon/SConscript b/src/mongo/db/free_mon/SConscript
index 3047553317a..e2b22b7fc80 100644
--- a/src/mongo/db/free_mon/SConscript
+++ b/src/mongo/db/free_mon/SConscript
@@ -66,7 +66,7 @@ else:
)
-env.CppUnitTest(
+fmEnv.CppUnitTest(
target='free_mon_test',
source=[
'free_mon_controller_test.cpp',
diff --git a/src/mongo/db/free_mon/free_mon_commands.cpp b/src/mongo/db/free_mon/free_mon_commands.cpp
index 1e78bc5b768..c04af62777e 100644
--- a/src/mongo/db/free_mon/free_mon_commands.cpp
+++ b/src/mongo/db/free_mon/free_mon_commands.cpp
@@ -123,7 +123,7 @@ public:
if (cmd.getAction() == SetFreeMonActionEnum::enable) {
optStatus = controller->registerServerCommand(kRegisterSyncTimeout);
} else {
- optStatus = controller->unregisterServerCommand();
+ optStatus = controller->unregisterServerCommand(kRegisterSyncTimeout);
}
if (optStatus) {
diff --git a/src/mongo/db/free_mon/free_mon_controller.cpp b/src/mongo/db/free_mon/free_mon_controller.cpp
index a1fcb4fd214..3c50d193d07 100644
--- a/src/mongo/db/free_mon/free_mon_controller.cpp
+++ b/src/mongo/db/free_mon/free_mon_controller.cpp
@@ -77,8 +77,15 @@ boost::optional<Status> FreeMonController::registerServerCommand(Milliseconds ti
return Status::OK();
}
-Status FreeMonController::unregisterServerCommand() {
- _enqueue(FreeMonMessage::createNow(FreeMonMessageType::UnregisterCommand));
+boost::optional<Status> FreeMonController::unregisterServerCommand(Milliseconds timeout) {
+ auto msg =
+ FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>::createNow(true);
+ _enqueue(msg);  // enqueue the waitable UnregisterCommand message
+ // Only a timeout above Milliseconds::min() blocks on the message; min() means "do not wait".
+ if (timeout > Milliseconds::min()) {
+ return msg->wait_for(timeout);  // result of waiting on the message, bounded by `timeout`
+ }
+ // Not waiting: the message has only been enqueued, so report OK without its outcome.
return Status::OK();
}
@@ -100,7 +107,7 @@ void FreeMonController::start(RegistrationType registrationType) {
// Start the agent
_processor = std::make_shared<FreeMonProcessor>(
- _registrationCollectors, _metricCollectors, _network.get());
+ _registrationCollectors, _metricCollectors, _network.get(), _useCrankForTest);
_thread = stdx::thread([this] { _processor->run(); });
@@ -144,4 +151,15 @@ void FreeMonController::stop() {
_state = State::kDone;
}
+void FreeMonController::turnCrankForTest(size_t countMessagesToIgnore) {
+    // The state check needs the mutex, but the lock is released before touching the processor
+    stdx::unique_lock<stdx::mutex> stateLock(_mutex);
+    invariant(_state == State::kStarted);
+    stateLock.unlock();
+
+    log() << "Turning Crank: " << countMessagesToIgnore;
+
+    // Forward the request to the background processor
+    _processor->turnCrankForTest(countMessagesToIgnore);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_controller.h b/src/mongo/db/free_mon/free_mon_controller.h
index 69c1f811934..0e3aece0694 100644
--- a/src/mongo/db/free_mon/free_mon_controller.h
+++ b/src/mongo/db/free_mon/free_mon_controller.h
@@ -50,8 +50,9 @@ namespace mongo {
*/
class FreeMonController {
public:
- explicit FreeMonController(std::unique_ptr<FreeMonNetworkInterface> network)
- : _network(std::move(network)) {}
+ explicit FreeMonController(std::unique_ptr<FreeMonNetworkInterface> network,
+ bool useCrankForTest = false)
+ : _network(std::move(network)), _useCrankForTest(useCrankForTest) {}
/**
* Initializes free monitoring.
@@ -65,6 +66,11 @@ public:
void stop();
/**
+ * Turn the crank of the message queue by ignoring deadlines for N messages.
+ */
+ void turnCrankForTest(size_t countMessagesToIgnore);
+
+ /**
* Add a metric collector to collect on registration
*/
void addRegistrationCollector(std::unique_ptr<FreeMonCollectorInterface> collector);
@@ -103,7 +109,7 @@ public:
* As with registerServerCommand() above, but undoes registration.
* On completion of this command, no further metrics will be transmitted.
*/
- Status unregisterServerCommand();
+ boost::optional<Status> unregisterServerCommand(Milliseconds timeout);
// TODO - add these methods
// void getServerStatus(BSONObjBuilder* builder);
@@ -162,6 +168,9 @@ private:
// Background thread for agent
stdx::thread _thread;
+ // Crank for test
+ bool _useCrankForTest;
+
// Background agent
std::shared_ptr<FreeMonProcessor> _processor;
};
diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp
index 9b7af519558..28ac5506f92 100644
--- a/src/mongo/db/free_mon/free_mon_controller_test.cpp
+++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp
@@ -26,14 +26,14 @@
* then also delete it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kFTDC
-
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
#include "mongo/platform/basic.h"
#include <boost/filesystem.hpp>
#include <future>
#include <iostream>
+#include <snappy.h>
#include "mongo/db/free_mon/free_mon_controller.h"
#include "mongo/db/free_mon/free_mon_storage.h"
@@ -61,6 +61,7 @@
#include "mongo/db/service_context_noop.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/rpc/object_check.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/temp_dir.h"
@@ -72,16 +73,168 @@
namespace mongo {
namespace {
+
+/**
+ * Mock collector that appends a fixed field to every collection and counts how many times
+ * collect() has run, so a test can block until a chosen number of collections has happened.
+ *
+ * All mutable state is guarded by _mutex.
+ */
+class FreeMonMetricsCollectorMock : public FreeMonCollectorInterface {
+public:
+    ~FreeMonMetricsCollectorMock() {
+        // ASSERT_TRUE(_state == State::kStarted);
+    }
+
+    /**
+     * Appends the mock payload, bumps the collection counter and wakes waiters when the counter
+     * reaches the value set through setSignalOnCount().
+     */
+    void collect(OperationContext* opCtx, BSONObjBuilder& builder) final {
+        builder.append("mock", "some data");
+
+        stdx::lock_guard<stdx::mutex> lck(_mutex);
+
+        _state = State::kStarted;
+        ++_counter;
+
+        if (_counter == _wait) {
+            _condvar.notify_all();
+        }
+    }
+
+    std::string name() const final {
+        return "mock";
+    }
+
+    /**
+     * Sets the collection count that wait() blocks for.
+     *
+     * Taken under the mutex because collect() and wait() read _wait while holding it.
+     */
+    void setSignalOnCount(int c) {
+        stdx::lock_guard<stdx::mutex> lck(_mutex);
+        _wait = c;
+    }
+
+    /**
+     * Returns the number of collect() calls so far.
+     */
+    std::uint32_t count() {
+        stdx::lock_guard<stdx::mutex> lck(_mutex);
+        return _counter;
+    }
+
+    /**
+     * Blocks until at least the configured number of collections has happened.
+     */
+    void wait() {
+        stdx::unique_lock<stdx::mutex> lck(_mutex);
+        while (_counter < _wait) {
+            _condvar.wait(lck);
+        }
+    }
+
+private:
+    /**
+     * Private enum to ensure caller uses class correctly.
+     */
+    enum class State {
+        kNotStarted,
+        kStarted,
+    };
+
+    // Whether collect() has been called at least once
+    State _state{State::kNotStarted};
+
+    // Number of collect() calls
+    std::uint32_t _counter{0};
+
+    // Protects _state, _counter and _wait
+    stdx::mutex _mutex;
+    stdx::condition_variable _condvar;
+
+    // Collection count that wait() blocks for
+    std::uint32_t _wait{0};
+};
+
+/**
+ * Snappy-decompresses `cdr` and parses the result as a sequence of back-to-back BSON documents.
+ *
+ * Fails the calling test if the buffer does not decompress or if any document fails validation.
+ * The returned objects own their data.
+ */
+std::vector<BSONObj> decompressMetrics(ConstDataRange cdr) {
+    std::string outBuffer;
+    // Uncompress() signals corrupt input only through its return value, so it must be checked;
+    // otherwise a bad payload would be treated as an empty or partial document stream.
+    ASSERT_TRUE(snappy::Uncompress(cdr.data(), cdr.length(), &outBuffer));
+
+    std::vector<BSONObj> metrics;
+    ConstDataRangeCursor cdrc(outBuffer.data(), outBuffer.data() + outBuffer.size());
+    while (!cdrc.empty()) {
+        auto swDoc = cdrc.readAndAdvance<Validated<BSONObj>>();
+        ASSERT_OK(swDoc.getStatus());
+        // getOwned() detaches the document from outBuffer, which dies with this function
+        metrics.emplace_back(swDoc.getValue().val.getOwned());
+    }
+
+    return metrics;
+}
+
+/**
+ * Countdown latch that propagates a payload.
+ *
+ * A producer calls set() once per event; a consumer calls wait_for() to block until the
+ * configured number of events has been seen and then receives the most recently set payload.
+ */
+template <typename T>
+class CountdownLatchResult {
+public:
+    // explicit: an integer must never convert silently into a latch
+    explicit CountdownLatchResult(uint32_t count) : _count(count) {}
+
+    /**
+     * Set the count of events to wait for and clear the stored payload.
+     *
+     * The previous countdown must have reached zero and `count` must be positive.
+     */
+    void reset(uint32_t count) {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        ASSERT_EQ(_count, 0UL);
+        ASSERT_GT(count, 0UL);
+
+        _count = count;
+        _payload = T();
+    }
+
+    /**
+     * Record one event: store the payload, decrement the count and signal the waiter.
+     *
+     * Events arriving while the count is already zero are ignored.
+     */
+    void set(T payload) {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+        if (_count > 0) {
+            --_count;
+            _payload = std::move(payload);
+            _condvar.notify_one();
+        }
+    }
+
+    /**
+     * Waits for duration until N events have occurred.
+     *
+     * Returns the last stored payload, or boost::none on timeout.
+     */
+    boost::optional<T> wait_for(Milliseconds duration) {
+        stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+        if (!_condvar.wait_for(
+                lock, duration.toSystemDuration(), [this]() { return _count == 0; })) {
+            return {};
+        }
+
+        return _payload;
+    }
+
+private:
+    // Condition variable to signal consumer
+    stdx::condition_variable _condvar;
+
+    // Lock for condition variable and to protect state
+    stdx::mutex _mutex;
+
+    // Count to wait for
+    uint32_t _count;
+
+    // Provided payload
+    T _payload;
+};
+
class FreeMonNetworkInterfaceMock : public FreeMonNetworkInterface {
public:
struct Options {
+ // If sync = true, then execute the callback immediately and the subsequent future chain
+ // This allows us to ensure the follow up functions to a network request are executed
+ // before anything else is processed by FreeMonProcessor
+ bool doSync{false};
+
+ // Faults to inject for registration
bool failRegisterHttp{false};
bool invalidRegister{false};
+ bool haltRegister{false};
+
+ // Faults to inject for metrics
+ bool haltMetrics{false};
+ bool fail2MetricsUploads{false};
+ bool permanentlyDeleteAfter3{false};
};
explicit FreeMonNetworkInterfaceMock(executor::ThreadPoolTaskExecutor* threadPool,
Options options)
- : _threadPool(threadPool), _options(options) {}
+ : _threadPool(threadPool), _options(options), _countdownMetrics(0) {}
~FreeMonNetworkInterfaceMock() final = default;
Future<FreeMonRegistrationResponse> sendRegistrationAsync(
@@ -92,77 +245,146 @@ public:
Promise<FreeMonRegistrationResponse> promise;
auto future = promise.getFuture();
- auto shared_promise = promise.share();
+ if (_options.doSync) {
+ promise.setFrom(doRegister(req));
+ } else {
+ auto shared_promise = promise.share();
- auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this](
- const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
+ auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this](
+ const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
- if (_options.failRegisterHttp) {
- shared_promise.setError(
- Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure"));
- return;
- }
+ auto swResp = doRegister(req);
+ if (!swResp.isOK()) {
+ shared_promise.setError(swResp.getStatus());
+ } else {
+ shared_promise.emplaceValue(swResp.getValue());
+ }
+
+ });
- auto resp = FreeMonRegistrationResponse();
- resp.setVersion(1);
+ ASSERT_OK(swSchedule.getStatus());
+ }
- if (_options.invalidRegister) {
- resp.setVersion(42);
- }
+ return future;
+ }
- if (req.getId().is_initialized()) {
- resp.setId(req.getId().get());
- } else {
- resp.setId(UUID::gen().toString());
- }
+ StatusWith<FreeMonRegistrationResponse> doRegister(const FreeMonRegistrationRequest& req) {  // builds the mock registration reply, applying the faults selected in _options
- resp.setReportingInterval(1);
+ if (_options.failRegisterHttp) {
+ return Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure");  // injected fault: no response is built
+ }
- shared_promise.emplaceValue(resp);
- });
- ASSERT_OK(swSchedule.getStatus());
+ auto resp = FreeMonRegistrationResponse();
+ resp.setVersion(1);
- return future;
+ if (_options.invalidRegister) {
+ resp.setVersion(42);  // injected fault: replaces the version set above
+ }
+ // The id is a fixed string; `req` is not consulted when building the reply.
+ resp.setId("regId123");
+ // Injected fault: mark the reply with haltMetricsUploading.
+ if (_options.haltRegister) {
+ resp.setHaltMetricsUploading(true);
+ }
+
+ resp.setReportingInterval(1);
+
+ return resp;
}
+
Future<FreeMonMetricsResponse> sendMetricsAsync(const FreeMonMetricsRequest& req) final {  // counts the call, then answers through doMetrics() inline or on the thread pool
log() << "Sending Metrics ...";
- ASSERT_FALSE(req.getId().empty());
_metrics.addAndFetch(1);  // call counter read by getMetricsCalls() and by doMetrics()
Promise<FreeMonMetricsResponse> promise;
auto future = promise.getFuture();
- auto shared_promise = promise.share();
+ if (_options.doSync) {
+ promise.setFrom(doMetrics(req));  // synchronous mode: the promise is fulfilled before this function returns
+ } else {
+ auto shared_promise = promise.share();  // captured by the scheduled callback below
- auto swSchedule = _threadPool->scheduleWork(
- [shared_promise, req](const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
- auto resp = FreeMonMetricsResponse();
- resp.setVersion(1);
- resp.setReportingInterval(1);
+ auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this](
+ const executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
+ // Runs on the thread pool: translate doMetrics()'s StatusWith into the promise.
+ auto swResp = doMetrics(req);
+ if (!swResp.isOK()) {
+ shared_promise.setError(swResp.getStatus());
+ } else {
+ shared_promise.emplaceValue(swResp.getValue());
+ }
- shared_promise.emplaceValue(resp);
});
- ASSERT_OK(swSchedule.getStatus());
+ ASSERT_OK(swSchedule.getStatus());  // scheduling itself must succeed
+ }
return future;
}
+    /**
+     * Records the uploaded metrics, then builds the mock metrics reply, applying the faults
+     * selected in _options.
+     */
+    StatusWith<FreeMonMetricsResponse> doMetrics(const FreeMonMetricsRequest& req) {
+        {
+            // Decode and publish the payload before deciding how to answer
+            stdx::lock_guard<stdx::mutex> guard(_metricsLock);
+            _lastMetrics = decompressMetrics(req.getMetrics());
+            _countdownMetrics.set(_lastMetrics);
+        }
+
+        const bool injectFailure = _options.fail2MetricsUploads && _metrics.loadRelaxed() < 3;
+        if (injectFailure) {
+            return Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure");
+        }
+
+        auto response = FreeMonMetricsResponse();
+        response.setVersion(1);
+        response.setReportingInterval(1);
+
+        response.setId("metricsId456"_sd);
+
+        if (_options.haltMetrics) {
+            response.setHaltMetricsUploading(true);
+        }
+
+        const bool requestDelete = _options.permanentlyDeleteAfter3 && _metrics.loadRelaxed() > 3;
+        if (requestDelete) {
+            response.setPermanentlyDelete(true);
+        }
+
+        return response;
+    }
+
int32_t getRegistersCalls() const {
return _registers.load();
}
+
int32_t getMetricsCalls() const {
return _metrics.load();
}
+ boost::optional<std::vector<BSONObj>> waitMetricsCalls(uint32_t count, Milliseconds wait) {  // blocks until `count` further uploads reach doMetrics() or `wait` elapses
+ _countdownMetrics.reset(count);  // re-arms the latch; reset() asserts the prior countdown hit zero and count > 0
+ return _countdownMetrics.wait_for(wait);  // last uploaded metrics, or boost::none on timeout
+ }
+
+ std::vector<BSONObj> getLastMetrics() {  // copy of the documents decoded by the most recent doMetrics() call
+ stdx::lock_guard<stdx::mutex> lock(_metricsLock);  // doMetrics() writes _lastMetrics under the same lock
+ return _lastMetrics;
+ }
+
+
private:
AtomicInt32 _registers;
AtomicInt32 _metrics;
executor::ThreadPoolTaskExecutor* _threadPool;
+ stdx::mutex _metricsLock;
+ std::vector<BSONObj> _lastMetrics;
+
Options _options;
+
+ CountdownLatchResult<std::vector<BSONObj>> _countdownMetrics;
};
class FreeMonControllerTest : public ServiceContextMongoDTest {
@@ -271,10 +493,60 @@ TEST(FreeMonRetryTest, TestRegistration) {
counter.reset();
}
+
+ // Validate max timeout
+ for (int j = 0; j < 3; j++) {
+ // Fail requests
+ for (int i = 1; i <= 163; ++i) {
+ ASSERT_TRUE(counter.incrementError());
+ }
+ ASSERT_FALSE(counter.incrementError());
+
+ counter.reset();
+ }
+}
+
+// Positive: Ensure the metrics retry counter reports the expected durations and error limit
+TEST(FreeMonRetryTest, TestMetrics) {
+    PseudoRandom random(0);
+    MetricsRetryCounter counter(random);
+    counter.reset();
+
+    // Without errors the duration stays at one second
+    ASSERT_EQ(counter.getNextDuration(), Seconds(1));
+    ASSERT_EQ(counter.getNextDuration(), Seconds(1));
+
+    int32_t minTime = 1;
+    for (int j = 0; j < 3; j++) {
+        // Fail requests
+        for (int i = 0; i <= 6; ++i) {
+            ASSERT_TRUE(counter.incrementError());
+
+            // 2^i computed as an exact integer instead of going through floating-point pow()
+            int64_t base = int64_t{1} << i;
+            ASSERT_RANGE(Seconds(base), Seconds(minTime / 2), Seconds(minTime));
+        }
+
+        // Further errors are checked against the same 64 second base
+        ASSERT_TRUE(counter.incrementError());
+        ASSERT_RANGE(Seconds(64), Seconds(minTime / 2), Seconds(minTime));
+        ASSERT_TRUE(counter.incrementError());
+        ASSERT_RANGE(Seconds(64), Seconds(minTime / 2), Seconds(minTime));
+
+        counter.reset();
+    }
+
+    // Validate max timeout
+    for (int j = 0; j < 3; j++) {
+        // Fail requests
+        for (int i = 1; i < 9456; ++i) {
+            ASSERT_TRUE(counter.incrementError());
+        }
+        ASSERT_FALSE(counter.incrementError());
+
+        counter.reset();
+    }
+}
// Positive: Ensure the response is validated correctly
-TEST(FreeMonProcessorTest, TestResponseValidation) {
+TEST(FreeMonProcessorTest, TestRegistrationResponseValidation) {
ASSERT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse(
IDLParserErrorContext("foo"),
BSON("version" << 1LL << "haltMetricsUploading" << false << "id"
@@ -370,20 +642,220 @@ TEST(FreeMonProcessorTest, TestResponseValidation) {
<< (60LL * 60 * 24 + 1LL)))));
}
+
+// Positive: Ensure the metrics response is validated correctly
+TEST(FreeMonProcessorTest, TestMetricsResponseValidation) {
+    // Builds a metrics response document from the given field values, parses it and returns
+    // the validation result. permanentlyDelete is false in every case below.
+    auto validate = [](long long version,
+                       bool haltMetricsUploading,
+                       const std::string& id,
+                       const std::string& informationalURL,
+                       const std::string& message,
+                       long long reportingInterval) {
+        return FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse(
+            IDLParserErrorContext("foo"),
+            BSON("version" << version << "haltMetricsUploading" << haltMetricsUploading
+                           << "permanentlyDelete"
+                           << false
+                           << "id"
+                           << id
+                           << "informationalURL"
+                           << informationalURL
+                           << "message"
+                           << message
+                           << "reportingInterval"
+                           << reportingInterval)));
+    };
+
+    const std::string goodId = "mock123";
+    const std::string goodUrl = "http://www.example.com/123";
+    const std::string goodMessage = "msg456";
+
+    // Positive: every field within limits
+    ASSERT_OK(validate(1LL, false, goodId, goodUrl, goodMessage, 1LL));
+
+    // Negative: bad protocol version
+    ASSERT_NOT_OK(validate(42LL, false, goodId, goodUrl, goodMessage, 1LL));
+
+    // Negative: halt uploading
+    ASSERT_NOT_OK(validate(1LL, true, goodId, goodUrl, goodMessage, 1LL));
+
+    // Negative: large registration id
+    ASSERT_NOT_OK(validate(1LL, false, std::string(5000, 'a'), goodUrl, goodMessage, 1LL));
+
+    // Negative: large URL
+    ASSERT_NOT_OK(validate(1LL, false, goodId, std::string(5000, 'b'), goodMessage, 1LL));
+
+    // Negative: large message
+    ASSERT_NOT_OK(validate(1LL, false, goodId, goodUrl, std::string(5000, 'c'), 1LL));
+
+    // Negative: too small a reporting interval
+    ASSERT_NOT_OK(validate(1LL, false, goodId, goodUrl, goodMessage, 0LL));
+
+    // Negative: too large a reporting interval
+    ASSERT_NOT_OK(validate(1LL, false, goodId, goodUrl, goodMessage, (60LL * 60 * 24 + 1LL)));
+}
+
+/**
+ * Fluent class that encapsulates how many turns of a crank are needed to do a particular
+ * operation.
+ *
+ * All commands take 1 turn except registerCommand and metricsSend, since these have an HTTP send
+ * and an HTTP receive.
+ */
+class Turner {
+public:
+    Turner() = default;
+
+    Turner& registerServer() {
+        return inc(1, 1);
+    }
+
+    Turner& registerCommand(size_t count = 1) {
+        return inc(2, count);
+    }
+
+    Turner& unRegisterCommand() {
+        return inc(1, 1);
+    }
+
+    Turner& collect(size_t count = 1) {
+        return inc(1, count);
+    }
+
+    Turner& metricsSend(size_t count = 1) {
+        return inc(2, count);
+    }
+
+    /**
+     * Total number of crank turns accumulated so far.
+     */
+    operator size_t() const {
+        return _count;
+    }
+
+private:
+    /**
+     * Adds `numberOfOperations` operations costing `perOperationCost` turns each.
+     */
+    Turner& inc(size_t perOperationCost, size_t numberOfOperations) {
+        _count += (perOperationCost * numberOfOperations);
+        return *this;
+    }
+
+private:
+    // Initialized here so that `Turner t;` starts at zero, not only the value-initialized
+    // `Turner()` temporary.
+    size_t _count{0};
+};
+
+/**
+ * Test helper that wires a FreeMonController to a mock network and two mock collectors, keeps
+ * raw pointers to the mocks for inspection, and stops the controller when it goes out of scope.
+ */
+struct ControllerHolder {
+    ControllerHolder(executor::ThreadPoolTaskExecutor* pool,
+                     FreeMonNetworkInterfaceMock::Options opts,
+                     bool useCrankForTest = true) {
+        auto ownedRegisterCollector = stdx::make_unique<FreeMonMetricsCollectorMock>();
+        auto ownedMetricsCollector = stdx::make_unique<FreeMonMetricsCollectorMock>();
+
+        // Turning the crank by hand only works when the mock network answers synchronously
+        opts.doSync = opts.doSync || useCrankForTest;
+
+        ASSERT_EQ(opts.doSync, useCrankForTest);
+
+        std::unique_ptr<FreeMonNetworkInterface> ownedNetwork(
+            new FreeMonNetworkInterfaceMock(pool, opts));
+        network = static_cast<FreeMonNetworkInterfaceMock*>(ownedNetwork.get());
+        controller = std::make_unique<FreeMonController>(std::move(ownedNetwork), useCrankForTest);
+
+        // Remember the mocks before ownership moves into the controller
+        registerCollector = ownedRegisterCollector.get();
+        metricsCollector = ownedMetricsCollector.get();
+
+        controller->addRegistrationCollector(std::move(ownedRegisterCollector));
+        controller->addMetricsCollector(std::move(ownedMetricsCollector));
+    }
+
+    ~ControllerHolder() {
+        controller->stop();
+    }
+
+    // Lets the holder be used like a pointer to the controller
+    FreeMonController* operator->() {
+        return controller.get();
+    }
+
+    // Non-owning views of the mocks handed to the controller
+    FreeMonMetricsCollectorMock* registerCollector;
+    FreeMonMetricsCollectorMock* metricsCollector;
+    FreeMonNetworkInterfaceMock* network;
+
+    std::unique_ptr<FreeMonController> controller;
+};
+
// Positive: Test Register works
TEST_F(FreeMonControllerTest, TestRegister) {
- // FreeMonNetworkInterfaceMock network;
- FreeMonController controller(
- std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkInterfaceMock(
- _mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options())));
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
- controller.start(RegistrationType::DoNotRegister);
+ controller->start(RegistrationType::DoNotRegister);
- ASSERT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(5))));
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+ // The queue is cranked by hand: registerCommand costs 2 turns (see Turner).
+ controller->turnCrankForTest(Turner().registerCommand());
ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty());
- controller.stop();
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 0UL);  // NOTE(review): count() is unsigned, so this cannot fail
}
// Negative: Test Register times out if network stack drops messages
@@ -391,19 +863,17 @@ TEST_F(FreeMonControllerTest, TestRegisterTimeout) {
FreeMonNetworkInterfaceMock::Options opts;
opts.failRegisterHttp = true;
- auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>(
- new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts));
- auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get());
- FreeMonController controller(std::move(networkUnique));
- controller.start(RegistrationType::DoNotRegister);
+ ControllerHolder controller(_mockThreadPool.get(), opts);
- ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15))));
+ controller->start(RegistrationType::DoNotRegister);
- ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized());
- ASSERT_GTE(network->getRegistersCalls(), 2);
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+ controller->turnCrankForTest(Turner().registerCommand(2));
- controller.stop();
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::pending);
+ ASSERT_GTE(controller.network->getRegistersCalls(), 2);
+ ASSERT_GTE(controller.registerCollector->count(), 2UL);
}
// Negative: Test Register times out if the registration is wrong
@@ -411,20 +881,345 @@ TEST_F(FreeMonControllerTest, TestRegisterFail) {
FreeMonNetworkInterfaceMock::Options opts;
opts.invalidRegister = true;
- auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>(
- new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts));
- auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get());
- FreeMonController controller(std::move(networkUnique));
+ ControllerHolder controller(_mockThreadPool.get(), opts, false);
+
+ controller->start(RegistrationType::DoNotRegister);
+
+ ASSERT_NOT_OK(controller->registerServerCommand(duration_cast<Milliseconds>(Seconds(15))));
+
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled);
+ ASSERT_EQ(controller.network->getRegistersCalls(), 1);
+
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+}
+
+// Positive: Ensure registration halts when the registration response sets haltMetricsUploading
+TEST_F(FreeMonControllerTest, TestRegisterHalts) {
+ // haltRegister makes the mock mark its registration response with haltMetricsUploading.
+ FreeMonNetworkInterfaceMock::Options opts;
+ opts.haltRegister = true;
+ ControllerHolder controller(_mockThreadPool.get(), opts);
+
+ controller->start(RegistrationType::DoNotRegister);
+
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+ controller->turnCrankForTest(Turner().registerCommand());  // 2 crank turns
+ // Storage ends up disabled after exactly one registration attempt.
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled);
+ ASSERT_EQ(controller.network->getRegistersCalls(), 1);
+
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+}
+
+// Positive: Test Metrics works on server register
+TEST_F(FreeMonControllerTest, TestMetrics) {
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+
+ controller->start(RegistrationType::RegisterOnStart);
+ // Turner total: registerServer 1 + registerCommand 2 + collect 2 + metricsSend 2 = 7 turns.
+ controller->turnCrankForTest(
+ Turner().registerServer().registerCommand().collect(2).metricsSend());
+
+ ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty());
+ // Network counters are lower bounds; the registration collector must run exactly once.
+ ASSERT_GTE(controller.network->getRegistersCalls(), 1);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 1);
+
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 1UL);
+}
+
+
+// Positive: Test Metrics is collected but no registration happens on empty storage
+TEST_F(FreeMonControllerTest, TestMetricsWithEmptyStorage) {
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+ // Nothing is written to FreeMonStorage before start in this test.
+ controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+ controller->turnCrankForTest(Turner().registerServer().collect(4));  // 1 + 4 = 5 crank turns
+ // NOTE(review): ">= 0" cannot fail for these call counters; ASSERT_EQ(..., 0) may be intended - confirm.
+ ASSERT_GTE(controller.network->getRegistersCalls(), 0);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 0);
+ // The registration collector never runs, while the metrics collector runs on each collect.
+ ASSERT_EQ(controller.registerCollector->count(), 0UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 4UL);
+}
+
+/**
+ * Builds a fully populated storage document whose state is `e`; all other fields are fixed
+ * test values.
+ */
+FreeMonStorageState initStorage(StorageStateEnum e) {
+    FreeMonStorageState doc;
+
+    // The only caller-controlled field
+    doc.setState(e);
+
+    // Fixed test values
+    doc.setVersion(1UL);
+    doc.setRegistrationId("Foo");
+    doc.setInformationalURL("http://www.example.com");
+    doc.setMessage("Hello World");
+    doc.setUserReminder("");
+
+    return doc;
+}
+
+// Positive: Test Metrics is collected and implicit registration happens when storage is initialized
+TEST_F(FreeMonControllerTest, TestMetricsWithEnabledStorage) {
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+ // Seed storage in the enabled state before the controller starts.
+ FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::enabled));
+
+ controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+ controller->turnCrankForTest(
+ Turner().registerServer().registerCommand().collect(2).metricsSend());  // 1 + 2 + 2 + 2 = 7 crank turns
+
+ ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty());
+ // Registration is expected although registerServerCommand() is never called here.
+ ASSERT_GTE(controller.network->getRegistersCalls(), 1);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 1);
+
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 1UL);
+}
+
+// Positive: Test Metrics is collected but no registration happens on disabled storage
+TEST_F(FreeMonControllerTest, TestMetricsWithDisabledStorage) {
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+ // Seed storage in the disabled state before the controller starts.
+ FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::disabled));
+
+ controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+ controller->turnCrankForTest(Turner().registerServer().collect(4));  // 1 + 4 = 5 crank turns
+ // NOTE(review): ">= 0" cannot fail for these call counters; ASSERT_EQ(..., 0) may be intended - confirm.
+ ASSERT_GTE(controller.network->getRegistersCalls(), 0);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 0);
+
+ ASSERT_EQ(controller.registerCollector->count(), 0UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 4UL);
+}
+
+
+// Positive: Test Metrics is collected but no registration happens on disabled storage until user
+// registers
+TEST_F(FreeMonControllerTest, TestMetricsWithDisabledStorageThenRegister) {
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+ // Seed storage in the disabled state before the controller starts.
+ FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::disabled));
+
+ controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+ controller->turnCrankForTest(Turner().registerServer().collect(4));  // phase 1: 1 + 4 = 5 crank turns
+ // Phase 2: the user asks for registration explicitly.
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+
+ controller->turnCrankForTest(Turner().registerCommand().collect(2).metricsSend());  // 2 + 2 + 2 = 6 crank turns
+
+ ASSERT_GTE(controller.network->getRegistersCalls(), 1);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 1);
+ // Collections from both phases add up: 4 from phase 1 plus 2 from phase 2.
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 4UL + 2UL);
+}
+
+// Positive: Test Metrics is collected but no registration happens, then register, then Unregister,
+// and finally register again
+TEST_F(FreeMonControllerTest, TestMetricsWithDisabledStorageThenRegisterAndReregister) {
+ ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+ // Seed storage in the disabled state before the controller starts.
+ FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::disabled));
+
+ controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+ controller->turnCrankForTest(Turner().registerServer().collect(4));  // phase 1: 1 + 4 = 5 crank turns
+ // Phase 2: first registration.
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+
+ controller->turnCrankForTest(Turner().registerCommand().collect(2).metricsSend());  // 2 + 2 + 2 = 6 crank turns
+
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get())->getState() == StorageStateEnum::enabled);
- controller.start(RegistrationType::DoNotRegister);
+ ASSERT_OK(controller->unregisterServerCommand(Milliseconds::min()));  // phase 3: unregister
- ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15))));
+ controller->turnCrankForTest(Turner().unRegisterCommand().collect(3));  // 1 + 3 = 4 crank turns
+
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get())->getState() == StorageStateEnum::disabled);
+ // Phase 4: register again.
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+
+ controller->turnCrankForTest(Turner().registerCommand().collect(2).metricsSend());  // 2 + 2 + 2 = 6 crank turns
+
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get())->getState() == StorageStateEnum::enabled);
+
+ ASSERT_GTE(controller.network->getRegistersCalls(), 2);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 1);
+ // Two registrations in total; the collection lower bound sums phases 1, 3 and one register phase.
+ ASSERT_EQ(controller.registerCollector->count(), 2UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 4UL + 3UL + 2UL);
+}
+
+// Positive: Test DeRegister cancels a register that is in the middle of retrying
+TEST_F(FreeMonControllerTest, TestMetricsUnregisterCancelsRegister) {
+ FreeMonNetworkInterfaceMock::Options opts;
+ opts.failRegisterHttp = true;  // every registration attempt fails with FreeMonHttpTemporaryFailure
+ ControllerHolder controller(_mockThreadPool.get(), opts);
+
+ controller->start(RegistrationType::DoNotRegister);
+
+ ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+ controller->turnCrankForTest(Turner().registerCommand(2));  // 2 x 2 = 4 crank turns
+ // After two failed attempts the registration is still pending.
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::pending);
+
+ ASSERT_GTE(controller.network->getRegistersCalls(), 2);
+ ASSERT_GTE(controller.registerCollector->count(), 2UL);
+
+ ASSERT_OK(controller->unregisterServerCommand(Milliseconds::min()));
+
+ controller->turnCrankForTest(Turner().unRegisterCommand());  // 1 crank turn
+ // Unregistering moves the pending registration to disabled.
+ ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled);
+
+ ASSERT_GTE(controller.network->getRegistersCalls(), 2);
+ ASSERT_GTE(controller.registerCollector->count(), 2UL);
+}
+
+// Positive: Test that metrics uploading halts when the metrics response asks for it
+TEST_F(FreeMonControllerTest, TestMetricsHalt) {
+    FreeMonNetworkInterfaceMock::Options networkOptions;
+    networkOptions.haltMetrics = true;
+    ControllerHolder controller(_mockThreadPool.get(), networkOptions);
+
+    controller->start(RegistrationType::RegisterOnStart);
+
+    controller->turnCrankForTest(
+        Turner().registerServer().registerCommand().collect(4).metricsSend());
+
+    // The registration id is retained, but the persisted state is disabled
+    const auto stored = FreeMonStorage::read(_opCtx.get());
+    ASSERT_TRUE(!stored->getRegistrationId().empty());
+    ASSERT_TRUE(stored->getState() == StorageStateEnum::disabled);
+
+    ASSERT_GTE(controller.network->getRegistersCalls(), 1);
+    ASSERT_GTE(controller.network->getMetricsCalls(), 1);
+
+    ASSERT_EQ(controller.registerCollector->count(), 1UL);
+    ASSERT_GTE(controller.metricsCollector->count(), 4UL);
+}
+
+
+// Positive: Test that the stored state is permanently deleted when a metrics response requests it
+TEST_F(FreeMonControllerTest, TestMetricsPermanentlyDelete) {
+ FreeMonNetworkInterfaceMock::Options opts;
+ opts.permanentlyDeleteAfter3 = true;
+ ControllerHolder controller(_mockThreadPool.get(), opts);
+
+ controller->start(RegistrationType::RegisterOnStart);
+
+ controller->turnCrankForTest(
+ Turner().registerServer().registerCommand().collect(5).metricsSend(4));
ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized());
- ASSERT_EQ(network->getRegistersCalls(), 1);
- controller.stop();
+ ASSERT_GTE(controller.network->getRegistersCalls(), 1);
+ ASSERT_GTE(controller.network->getMetricsCalls(), 3);
+
+ ASSERT_EQ(controller.registerCollector->count(), 1UL);
+ ASSERT_GTE(controller.metricsCollector->count(), 3UL);
+}
+
+// Positive: ensure the registration id is rotated by the registration and metrics responses
+TEST_F(FreeMonControllerTest, TestRegistrationIdRotatesAfterRegistration) {
+    ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+
+    // Begin with a stored document that is already enabled
+    FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::enabled));
+
+    controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+    controller->turnCrankForTest(Turner().registerServer().registerCommand().collect(2));
+
+    // Ensure registration rotated the id
+    {
+        const auto afterRegistration = FreeMonStorage::read(_opCtx.get());
+        ASSERT_EQ(afterRegistration->getRegistrationId(), "regId123");
+    }
+
+    controller->turnCrankForTest(Turner().metricsSend().collect());
+
+    // Ensure metrics rotated the id
+    {
+        const auto afterMetrics = FreeMonStorage::read(_opCtx.get());
+        ASSERT_EQ(afterMetrics->getRegistrationId(), "metricsId456");
+    }
+
+    ASSERT_GTE(controller.network->getRegistersCalls(), 1);
+    ASSERT_GTE(controller.network->getMetricsCalls(), 1);
+
+    ASSERT_EQ(controller.registerCollector->count(), 1UL);
+    ASSERT_GTE(controller.metricsCollector->count(), 1UL);
+}
+
+// Positive: ensure metrics collected before registration are batched into the first upload
+// Positive: ensure later uploads carry only two metrics each
+TEST_F(FreeMonControllerTest, TestPreRegistrationMetricBatching) {
+    ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options());
+
+    // Number of samples in the most recent upload seen by the mock network
+    const auto lastUploadSize = [&controller] {
+        return controller.network->getLastMetrics().size();
+    };
+
+    controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary);
+
+    // Collect samples before any registration has been requested
+    controller->turnCrankForTest(Turner().registerServer().collect(3));
+
+    ASSERT_OK(controller->registerServerCommand(Milliseconds::min()));
+
+    controller->turnCrankForTest(Turner().registerCommand().collect(1));
+
+    controller->turnCrankForTest(Turner().metricsSend().collect(1));
+
+    // Ensure we sent all the metrics batched before registration
+    ASSERT_EQ(lastUploadSize(), 4UL);
+
+    controller->turnCrankForTest(Turner().metricsSend().collect(1));
+
+    // Ensure we only send 2 metrics in the normal happy case
+    ASSERT_EQ(lastUploadSize(), 2UL);
+}
+
+// Negative: Test that metrics stay buffered when an upload fails and are resent on retry
+TEST_F(FreeMonControllerTest, TestMetricBatchingOnError) {
+    FreeMonNetworkInterfaceMock::Options networkOptions;
+    networkOptions.fail2MetricsUploads = true;
+    ControllerHolder controller(_mockThreadPool.get(), networkOptions);
+
+    // Number of samples in the most recent upload seen by the mock network
+    const auto lastUploadSize = [&controller] {
+        return controller.network->getLastMetrics().size();
+    };
+
+    controller->start(RegistrationType::RegisterOnStart);
+
+    controller->turnCrankForTest(Turner().registerServer().registerCommand().collect(2));
+
+    controller->turnCrankForTest(Turner().metricsSend().collect());
+
+    // Ensure the first upload attempt carried the two samples collected so far
+    ASSERT_EQ(lastUploadSize(), 2UL);
+
+    controller->turnCrankForTest(Turner().metricsSend().collect());
+
+    // Ensure we resent all the failed metrics
+    ASSERT_EQ(lastUploadSize(), 3UL);
+}
+
+// Negative: Test that metrics stay buffered across failed uploads, and that uploads return to 2
+// samples once the errors stop
+// Note: this test operates in real-time because it needs to test multiple retries matched with
+// metrics collection.
+TEST_F(FreeMonControllerTest, TestMetricBatchingOnErrorRealtime) {
+    FreeMonNetworkInterfaceMock::Options networkOptions;
+    networkOptions.fail2MetricsUploads = true;
+    ControllerHolder controller(_mockThreadPool.get(), networkOptions, false);
+
+    // Waits up to five seconds for a metrics call on the mock network
+    const auto sawUpload = [&controller] {
+        return controller.network->waitMetricsCalls(1, Seconds(5)).is_initialized();
+    };
+
+    // Number of samples in the most recent upload seen by the mock network
+    const auto lastUploadSize = [&controller] {
+        return controller.network->getLastMetrics().size();
+    };
+
+    controller->start(RegistrationType::RegisterOnStart);
+
+    // Ensure the first upload sends 2 samples
+    ASSERT_TRUE(sawUpload());
+    ASSERT_EQ(lastUploadSize(), 2UL);
+
+    // Ensure the second upload sends 3 samples because first failed
+    ASSERT_TRUE(sawUpload());
+    ASSERT_EQ(lastUploadSize(), 3UL);
+
+    // Ensure the third upload sends at least 4 samples because second failed. The second retry
+    // interval is longer, so more than one extra sample may have been collected by then.
+    ASSERT_TRUE(sawUpload());
+    ASSERT_GTE(lastUploadSize(), 4UL);
+
+    // Ensure the fourth upload sends 2 samples
+    ASSERT_TRUE(sawUpload());
+    ASSERT_EQ(lastUploadSize(), 2UL);
}
+// TODO: Positive: ensure optional fields are rotated
+
+// TODO: Positive: Test Metrics works on command register on primary
+// TODO: Positive: Test Metrics works on startup register on secondary
+// TODO: Positive: Test Metrics works on secondary after opObserver register
+// TODO: Positive: Test Metrics works on secondary after opObserver de-register
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h
index b7b514ee775..2247532b3be 100644
--- a/src/mongo/db/free_mon/free_mon_message.h
+++ b/src/mongo/db/free_mon/free_mon_message.h
@@ -56,25 +56,39 @@ enum class FreeMonMessageType {
RegisterCommand,
/**
- * Internal: Generated when an async HTTP request completes succesfully.
+ * Internal: Generated when an async registration HTTP request completes successfully.
*/
AsyncRegisterComplete,
/**
- * Internal: Generated when an async HTTP request completes with an error.
+ * Internal: Generated when an async registration HTTP request completes with an error.
*/
AsyncRegisterFail,
/**
- * Unregister server from server command.
- */
+ * Unregister server from server command.
+ */
UnregisterCommand,
- // TODO - add metrics messages
- // MetricsCollect - Cloud wants the "wait" time to calculated when the message processing
- // starts, not ends
- // AsyncMetricsComplete,
- // AsyncMetricsFail,
+ /**
+ * Internal: Collect metrics and buffer them in-memory
+ */
+ MetricsCollect,
+
+ /**
+ * Internal: Send metrics to the cloud endpoint by beginning an async HTTP request.
+ */
+ MetricsSend,
+
+ /**
+ * Internal: Generated when an async metrics HTTP request completes successfully.
+ */
+ AsyncMetricsComplete,
+
+ /**
+ * Internal: Generated when an async metrics HTTP request completes with an error.
+ */
+ AsyncMetricsFail,
// TODO - add replication messages
// OnPrimary,
@@ -178,6 +192,17 @@ struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncRegisterFail> {
using payload_type = Status;
};
+// Payload for AsyncMetricsComplete: the metrics response produced by the async HTTP request.
+template <>
+struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncMetricsComplete> {
+ using payload_type = FreeMonMetricsResponse;
+};
+
+// Payload for AsyncMetricsFail: the error Status reported by the async HTTP request.
+template <>
+struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncMetricsFail> {
+ using payload_type = Status;
+};
+
+
/**
* Message with a generic payload based on the type of message.
*/
@@ -262,31 +287,52 @@ private:
};
/**
- * Custom waitable message for Register Command message.
+ * For the messages that the caller needs to wait on, this provides a mechanism to wait on messages
+ * to be processed.
+*/
+// Primary template: message types without a specialization have no waitable payload.
+template <FreeMonMessageType typeT>
+struct FreeMonWaitablePayloadForMessage {
+ using payload_type = void;
+};
+
+// RegisterCommand carries a list of strings; the processor treats them as registration tags.
+template <>
+struct FreeMonWaitablePayloadForMessage<FreeMonMessageType::RegisterCommand> {
+ using payload_type = std::vector<std::string>;
+};
+
+template <>
+struct FreeMonWaitablePayloadForMessage<FreeMonMessageType::UnregisterCommand> {
+ // The parameter is unused but must not be void.
+ using payload_type = bool;
+};
+
+/**
+ * Message with a generic payload based on the type of message.
*/
-class FreeMonRegisterCommandMessage : public FreeMonMessage {
+template <FreeMonMessageType typeT>
+class FreeMonWaitableMessageWithPayload : public FreeMonMessage {
public:
+ using payload_type = typename FreeMonWaitablePayloadForMessage<typeT>::payload_type;
+
/**
* Create a message that should processed immediately.
*/
- static std::shared_ptr<FreeMonRegisterCommandMessage> createNow(
- const std::vector<std::string>& tags) {
- return std::make_shared<FreeMonRegisterCommandMessage>(tags, Date_t::min());
+ static std::shared_ptr<FreeMonWaitableMessageWithPayload> createNow(payload_type t) {
+ return std::make_shared<FreeMonWaitableMessageWithPayload>(t, Date_t::min());
}
/**
- * Create a message that should processed after the specified deadline.
+ * Create a message that should be processed after the specified deadline.
*/
- static std::shared_ptr<FreeMonRegisterCommandMessage> createWithDeadline(
- const std::vector<std::string>& tags, Date_t deadline) {
- return std::make_shared<FreeMonRegisterCommandMessage>(tags, deadline);
+ static std::shared_ptr<FreeMonWaitableMessageWithPayload> createWithDeadline(payload_type t,
+ Date_t deadline) {
+ return std::make_shared<FreeMonWaitableMessageWithPayload>(t, deadline);
}
-
/**
- * Get tags.
+ * Get message payload.
*/
- const std::vector<std::string>& getTags() const {
- return _tags;
+ const payload_type& getPayload() const {
+ return _t;
}
/**
@@ -306,15 +352,17 @@ public:
}
public:
- FreeMonRegisterCommandMessage(std::vector<std::string> tags, Date_t deadline)
- : FreeMonMessage(FreeMonMessageType::RegisterCommand, deadline), _tags(std::move(tags)) {}
+ FreeMonWaitableMessageWithPayload(payload_type t, Date_t deadline)
+ : FreeMonMessage(typeT, deadline), _t(std::move(t)) {}
private:
+ // Message payload
+ payload_type _t;
+
// WaitaleResult to notify caller
WaitableResult _waitable{};
-
- // Tags
- const std::vector<std::string> _tags;
};
+using FreeMonRegisterCommandMessage =
+ FreeMonWaitableMessageWithPayload<FreeMonMessageType::RegisterCommand>;
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp
index 4201d4e39ba..df72066aabf 100644
--- a/src/mongo/db/free_mon/free_mon_mongod.cpp
+++ b/src/mongo/db/free_mon/free_mon_mongod.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/free_mon/free_mon_mongod.h"
#include <mutex>
+#include <snappy.h>
#include <string>
#include "mongo/base/data_type_validated.h"
diff --git a/src/mongo/db/free_mon/free_mon_options.cpp b/src/mongo/db/free_mon/free_mon_options.cpp
index 81293405b59..c99a899ccd9 100644
--- a/src/mongo/db/free_mon/free_mon_options.cpp
+++ b/src/mongo/db/free_mon/free_mon_options.cpp
@@ -70,7 +70,6 @@ StatusWith<EnableCloudStateEnum> EnableCloudState_parse(StringData value) {
return EnableCloudStateEnum::kRuntime;
}
- // TODO
return Status(ErrorCodes::InvalidOptions, "Unrecognized state");
}
diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp
index 5f3e0121914..53e60ed316d 100644
--- a/src/mongo/db/free_mon/free_mon_processor.cpp
+++ b/src/mongo/db/free_mon/free_mon_processor.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/free_mon/free_mon_processor.h"
#include <functional>
+#include <numeric>
+#include <snappy.h>
#include <tuple>
#include <utility>
@@ -57,6 +59,8 @@ constexpr auto kInformationalURLMaxLength = 4096;
constexpr auto kInformationalMessageMaxLength = 4096;
constexpr auto kUserReminderMaxLength = 4096;
+constexpr Seconds kDefaultMetricsGatherInterval(1);
+
constexpr auto kReportingIntervalMinutesMin = 1;
constexpr auto kReportingIntervalMinutesMax = 60 * 60 * 24;
@@ -80,7 +84,6 @@ bool RegistrationRetryCounter::incrementError() {
_current = _base + Seconds(randomJitter(_random, kStage1JitterMin, kStage1JitterMax));
++_retryCount;
} else {
- _base = _base;
_current = _base + Seconds(randomJitter(_random, kStage2JitterMin, kStage2JitterMax));
}
@@ -93,6 +96,43 @@ bool RegistrationRetryCounter::incrementError() {
return true;
}
+void MetricsRetryCounter::reset() {
+    // Forget the failures accumulated so far
+    _retryCount = 0;
+    _total = Hours(0);
+
+    // Restart the backoff from the minimum interval
+    _base = _min;
+    _current = _min;
+}
+
+/**
+ * Record a failed metrics upload and compute the next retry interval.
+ *
+ * The base interval is the minimum interval multiplied by 2^retryCount, with the exponent capped
+ * at 6, plus a random jitter derived from the minimum interval.
+ *
+ * Returns false once the accumulated retry time exceeds kDurationMax, true otherwise.
+ */
+bool MetricsRetryCounter::incrementError() {
+    // Use an integer shift for the power of two instead of a floating point pow() round trip.
+    const int exponent = static_cast<int>(std::min<size_t>(_retryCount, 6));
+    _base = (1 << exponent) * _min;
+
+    _current = _base + Seconds(randomJitter(_random, _min.count() / 2, _min.count()));
+    ++_retryCount;
+
+    _total += _current;
+
+    // Keep retrying only while the total time spent retrying is within the allowed maximum
+    return !(_total > kDurationMax);
+}
+
+// Construct a processor over the given registration and metrics collectors and network interface.
+// The random generator is seeded from the current time and shared by both retry counters.
+// 'useCrankForTest' is forwarded to the message queue.
+FreeMonProcessor::FreeMonProcessor(FreeMonCollectorCollection& registration,
+ FreeMonCollectorCollection& metrics,
+ FreeMonNetworkInterface* network,
+ bool useCrankForTest)
+ : _registration(registration),
+ _metrics(metrics),
+ _network(network),
+ _random(Date_t::now().asInt64()),
+ _registrationRetry(_random),
+ _metricsRetry(_random),
+ _metricsGatherInterval(kDefaultMetricsGatherInterval),
+ _queue(useCrankForTest) {
+ // Put both retry counters into their initial state
+ _registrationRetry.reset();
+ _metricsRetry.reset();
+}
+
void FreeMonProcessor::enqueue(std::shared_ptr<FreeMonMessage> msg) {
_queue.enqueue(std::move(msg));
}
@@ -101,6 +141,14 @@ void FreeMonProcessor::stop() {
_queue.stop();
}
+// Test support: arm the countdown latch for 'countMessagesToIgnore' messages, turn the queue's
+// crank for that many messages, then block until the latch has counted down to zero.
+void FreeMonProcessor::turnCrankForTest(size_t countMessagesToIgnore) {
+ // The latch must be armed before the queue is cranked
+ _countdown.reset(countMessagesToIgnore);
+
+ _queue.turnCrankForTest(countMessagesToIgnore);
+
+ _countdown.wait();
+}
+
void FreeMonProcessor::run() {
try {
@@ -114,10 +162,12 @@ void FreeMonProcessor::run() {
return;
}
+ auto msg = item.get();
+
// Do work here
- switch (item.get()->getType()) {
+ switch (msg->getType()) {
case FreeMonMessageType::RegisterCommand: {
- doCommandRegister(client, item.get());
+ doCommandRegister(client, msg);
break;
}
case FreeMonMessageType::RegisterServer: {
@@ -125,7 +175,13 @@ void FreeMonProcessor::run() {
client,
checked_cast<
FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>*>(
- item.get().get()));
+ msg.get()));
+ break;
+ }
+ case FreeMonMessageType::UnregisterCommand: {
+ doCommandUnregister(client,
+ checked_cast<FreeMonWaitableMessageWithPayload<
+ FreeMonMessageType::UnregisterCommand>*>(msg.get()));
break;
}
case FreeMonMessageType::AsyncRegisterComplete: {
@@ -133,7 +189,7 @@ void FreeMonProcessor::run() {
client,
checked_cast<
FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>*>(
- item.get().get()));
+ msg.get()));
break;
}
case FreeMonMessageType::AsyncRegisterFail: {
@@ -141,16 +197,39 @@ void FreeMonProcessor::run() {
client,
checked_cast<
FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>*>(
- item.get().get()));
+ msg.get()));
break;
}
- case FreeMonMessageType::UnregisterCommand: {
- doUnregister(client);
+ case FreeMonMessageType::MetricsCollect: {
+ doMetricsCollect(client);
+ break;
+ }
+ case FreeMonMessageType::MetricsSend: {
+ doMetricsSend(client);
+ break;
+ }
+ case FreeMonMessageType::AsyncMetricsComplete: {
+ doAsyncMetricsComplete(
+ client,
+ checked_cast<
+ FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>*>(
+ msg.get()));
+ break;
+ }
+ case FreeMonMessageType::AsyncMetricsFail: {
+ doAsyncMetricsFail(
+ client,
+ checked_cast<
+ FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>*>(
+ msg.get()));
break;
}
default:
MONGO_UNREACHABLE;
}
+
+ // Record that we have finished processing the message for testing purposes.
+ _countdown.countDown();
}
} catch (...) {
// Stop the queue
@@ -203,6 +282,8 @@ void FreeMonProcessor::writeState(Client* client) {
// If our in-memory copy matches the last read, then write it to disk
if (state == _lastReadState) {
FreeMonStorage::replace(optCtx.get(), _state);
+
+ _lastReadState = _state;
}
}
}
@@ -215,7 +296,7 @@ void FreeMonProcessor::doServerRegister(
if (msg->getPayload().first == RegistrationType::RegisterOnStart) {
enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
} else if (msg->getPayload().first == RegistrationType::RegisterAfterOnTransitionToPrimary) {
- // Check if we need to wait to become primary
+ // Check if we need to wait to become primary:
// If the 'admin.system.version' has content, do not wait and just re-register
// If the collection is empty, wait until we become primary
// If we become secondary, OpObserver hooks will tell us our registration id
@@ -225,17 +306,26 @@ void FreeMonProcessor::doServerRegister(
// Check if there is an existing document
auto state = FreeMonStorage::read(optCtx.get());
- // If there is no document, we may be in a replica set and may need to register after
- // becoming primary
- // since we cannot record the registration id until after becoming primary
+ // If there is no document, we may be:
+ // 1. in a replica set and may need to register after becoming primary since we cannot
+ // record the registration id until after becoming primary
+ // 2. a standalone which has never been registered
+ //
if (!state.is_initialized()) {
- // TODO: hook OnTransitionToPrimary instead of this hack
- enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
+ // TODO: hook OnTransitionToPrimary
} else {
- // If we have state, then we can do the normal register on startup
- enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
+ // We are standalone, if we have a registration id, then send a registration
+ // notification, else wait for the user to register us
+ if (state.get().getState() == StorageStateEnum::enabled) {
+ enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second));
+ }
}
+ } else {
+ MONGO_UNREACHABLE;
}
+
+ // Enqueue the first metrics gather
+ enqueue(FreeMonMessage::createNow(FreeMonMessageType::MetricsCollect));
}
namespace {
@@ -296,13 +386,13 @@ void FreeMonProcessor::doCommandRegister(Client* client,
req.setVersion(kProtocolVersion);
- if (!msg->getTags().empty()) {
+ if (!msg->getPayload().empty()) {
// Cache the tags for subsequent retries
- _tags = msg->getTags();
+ _tags = msg->getPayload();
}
if (!_tags.empty()) {
- req.setTag(transformVector(msg->getTags()));
+ req.setTag(transformVector(msg->getPayload()));
}
// Collect the data
@@ -310,6 +400,11 @@ void FreeMonProcessor::doCommandRegister(Client* client,
req.setPayload(std::get<0>(collect));
+ // Record that the registration is pending
+ _state.setState(StorageStateEnum::pending);
+
+ writeState(client);
+
// Send the async request
_futureRegistrationResponse = doAsyncCallback<FreeMonRegistrationResponse>(
this,
@@ -398,6 +493,75 @@ void FreeMonProcessor::notifyPendingRegisters(const Status s) {
_pendingRegisters.clear();
}
+
+/**
+ * Validate a metrics response.
+ *
+ * Returns FreeMonHttpPermanentFailure when the protocol version is unexpected, an optional string
+ * field reaches its maximum length, the reporting interval is out of range, or the response asks
+ * to halt metrics uploading. Returns OK otherwise.
+ */
+Status FreeMonProcessor::validateMetricsResponse(const FreeMonMetricsResponse& resp) {
+    // True when an optional string field is present and at least 'maxLength' bytes long
+    const auto isTooLong = [](const auto& field, size_t maxLength) {
+        return field.is_initialized() && field.get().size() >= maxLength;
+    };
+
+    // Any validation failure stops metrics uploading
+    const auto version = resp.getVersion();
+    if (version != kProtocolVersion) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "Unexpected metrics response protocol version, expected '"
+                                    << kProtocolVersion
+                                    << "', received '"
+                                    << version
+                                    << "'");
+    }
+
+    if (isTooLong(resp.getId(), kRegistrationIdMaxLength)) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "Id is '" << resp.getId().get().size()
+                                    << "' bytes in length, maximum allowed length is '"
+                                    << kRegistrationIdMaxLength
+                                    << "'");
+    }
+
+    if (isTooLong(resp.getInformationalURL(), kInformationalURLMaxLength)) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "InformationURL is '"
+                                    << resp.getInformationalURL().get().size()
+                                    << "' bytes in length, maximum allowed length is '"
+                                    << kInformationalURLMaxLength
+                                    << "'");
+    }
+
+    if (isTooLong(resp.getMessage(), kInformationalMessageMaxLength)) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "Message is '" << resp.getMessage().get().size()
+                                    << "' bytes in length, maximum allowed length is '"
+                                    << kInformationalMessageMaxLength
+                                    << "'");
+    }
+
+    if (isTooLong(resp.getUserReminder(), kUserReminderMaxLength)) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "UserReminder is '" << resp.getUserReminder().get().size()
+                                    << "' bytes in length, maximum allowed length is '"
+                                    << kUserReminderMaxLength
+                                    << "'");
+    }
+
+    const auto interval = resp.getReportingInterval();
+    const bool intervalInRange =
+        !(interval < kReportingIntervalMinutesMin || interval > kReportingIntervalMinutesMax);
+    if (!intervalInRange) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "Reporting Interval '" << interval
+                                    << "' must be in the range ["
+                                    << kReportingIntervalMinutesMin
+                                    << ","
+                                    << kReportingIntervalMinutesMax
+                                    << "]");
+    }
+
+    // Did cloud ask us to stop uploading?
+    if (resp.getHaltMetricsUploading()) {
+        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
+                      str::stream() << "Halting metrics upload due to response");
+    }
+
+    return Status::OK();
+}
+
+
void FreeMonProcessor::doAsyncRegisterComplete(
Client* client,
const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg) {
@@ -405,12 +569,24 @@ void FreeMonProcessor::doAsyncRegisterComplete(
// Our request is no longer in-progress so delete it
_futureRegistrationResponse.reset();
+ if (_state.getState() != StorageStateEnum::pending) {
+ notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled"));
+
+ return;
+ }
+
auto& resp = msg->getPayload();
Status s = validateRegistrationResponse(resp);
if (!s.isOK()) {
warning() << "Free Monitoring registration halted due to " << s;
+ // Disable on any error
+ _state.setState(StorageStateEnum::disabled);
+
+ // Persist state
+ writeState(client);
+
notifyPendingRegisters(s);
// If validation fails, we do not retry
@@ -431,6 +607,8 @@ void FreeMonProcessor::doAsyncRegisterComplete(
_state.setMessage(resp.getMessage());
_state.setInformationalURL(resp.getInformationalURL());
+ _state.setState(StorageStateEnum::enabled);
+
// Persist state
writeState(client);
@@ -440,9 +618,11 @@ void FreeMonProcessor::doAsyncRegisterComplete(
// Notify waiters
notifyPendingRegisters(Status::OK());
- // TODO: Enqueue next metrics upload
- // enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCallTimer,
- // _registrationRetry.getNextDeadline(client)));
+ log() << "Free Monitoring is Enabled. Frequency: " << resp.getReportingInterval() << " seconds";
+
+ // Enqueue next metrics upload
+ enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend,
+ _registrationRetry.getNextDeadline(client)));
}
void FreeMonProcessor::doAsyncRegisterFail(
@@ -451,6 +631,12 @@ void FreeMonProcessor::doAsyncRegisterFail(
// Our request is no longer in-progress so delete it
_futureRegistrationResponse.reset();
+ if (_state.getState() != StorageStateEnum::pending) {
+ notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled"));
+
+ return;
+ }
+
if (!_registrationRetry.incrementError()) {
// We have exceeded our retry
warning() << "Free Monitoring is abandoning registration after excess retries";
@@ -465,6 +651,167 @@ void FreeMonProcessor::doAsyncRegisterFail(
_tags, _registrationRetry.getNextDeadline(client)));
}
-void FreeMonProcessor::doUnregister(Client* /*client*/) {}
+/**
+ * Handle an unregister command: mark free monitoring as disabled, persist the state, and report
+ * success to the waiting caller.
+ */
+void FreeMonProcessor::doCommandUnregister(
+    Client* client, FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>* msg) {
+    // Treat this request as idempotent: only touch the state when it is not already disabled
+    const bool alreadyDisabled = (_state.getState() == StorageStateEnum::disabled);
+
+    if (!alreadyDisabled) {
+        _state.setState(StorageStateEnum::disabled);
+
+        writeState(client);
+
+        log() << "Free Monitoring is Disabled";
+    }
+
+    // The caller is always told the command succeeded
+    msg->setStatus(Status::OK());
+}
+
+/**
+ * Collect one metrics sample, buffer it in memory, and schedule the next collection.
+ */
+void FreeMonProcessor::doMetricsCollect(Client* client) {
+    // Note the start time first so the time spent collecting does not affect the schedule
+    const Date_t collectionStart = client->getServiceContext()->getPreciseClockSource()->now();
+
+    // Gather the sample and append it to the in-memory buffer
+    auto sample = _metrics.collect(client);
+    _metricsBuffer.push(std::get<0>(sample));
+
+    // Schedule the next collection relative to when this one started
+    const auto nextCollection = collectionStart + _metricsGatherInterval;
+    enqueue(
+        FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCollect, nextCollection));
+}
+
+/**
+ * Concatenate the raw bytes of every document in 'buffer' and compress them with snappy.
+ *
+ * Returns the compressed bytes as a string.
+ */
+std::string compressMetrics(MetricsBuffer& buffer) {
+    // Seed the sum with a size_t; an int literal would make std::accumulate keep its running
+    // total in an int regardless of the lambda's parameter type.
+    const size_t totalReserve = std::accumulate(
+        buffer.begin(), buffer.end(), size_t{0}, [](size_t sum, const auto& obj) {
+            return sum + obj.objsize();
+        });
+
+    // Copy each document's bytes back to back into one contiguous buffer
+    std::vector<char> rawBuffer;
+    rawBuffer.reserve(totalReserve);
+
+    for (const auto& obj : buffer) {
+        rawBuffer.insert(rawBuffer.end(), obj.objdata(), obj.objdata() + obj.objsize());
+    }
+
+    std::string outBuffer;
+
+    snappy::Compress(rawBuffer.data(), rawBuffer.size(), &outBuffer);
+
+    return outBuffer;
+}
+
+/**
+ * Upload the buffered metrics with an async request, provided free monitoring is enabled.
+ * Completion or failure of the request is delivered back to the processor as a message.
+ */
+void FreeMonProcessor::doMetricsSend(Client* client) {
+    readState(client);
+
+    // If we are recently disabled, then stop sending metrics
+    const bool uploadsEnabled = (_state.getState() == StorageStateEnum::enabled);
+    if (!uploadsEnabled) {
+        return;
+    }
+
+    // An enabled state is expected to carry a registration id
+    invariant(!_state.getRegistrationId().empty());
+
+    // Build outbound request
+    FreeMonMetricsRequest request;
+    request.setVersion(kProtocolVersion);
+    request.setEncoding(MetricsEncodingEnum::snappy);
+    request.setId(_state.getRegistrationId());
+
+    // Get the buffered metrics. NOTE(review): ConstDataRange is presumably a non-owning view, so
+    // 'compressed' is kept alive until the request has been handed to the network.
+    auto compressed = compressMetrics(_metricsBuffer);
+    request.setMetrics(ConstDataRange(compressed.data(), compressed.size()));
+
+    // Send the async request; the callbacks turn the outcome into processor messages
+    doAsyncCallback<FreeMonMetricsResponse>(
+        this,
+        _network->sendMetricsAsync(request),
+        [this](const auto& response) {
+            this->enqueue(
+                FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>::createNow(
+                    response));
+        },
+        [this](Status status) {
+            this->enqueue(
+                FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>::createNow(
+                    status));
+        });
+}
+
+/**
+ * Process the response of a completed metrics upload.
+ *
+ * An invalid response disables free monitoring without retrying. A response that requests
+ * permanent deletion erases the stored state. Otherwise the buffered metrics are flushed, the
+ * in-memory state is updated from the response and persisted, and the next upload is scheduled.
+ */
+void FreeMonProcessor::doAsyncMetricsComplete(
+    Client* client,
+    const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>* msg) {
+
+    auto& resp = msg->getPayload();
+
+    Status s = validateMetricsResponse(resp);
+    if (!s.isOK()) {
+        warning() << "Free Monitoring metrics uploading halted due to " << s;
+
+        // Disable free monitoring on validation errors
+        _state.setState(StorageStateEnum::disabled);
+        writeState(client);
+
+        // If validation fails, we do not retry
+        return;
+    }
+
+    // If cloud said delete, not just halt, so erase state
+    if (resp.getPermanentlyDelete() == true) {
+        auto opCtxUnique = client->makeOperationContext();
+        FreeMonStorage::deleteState(opCtxUnique.get());
+
+        return;
+    }
+
+    // Update in-memory state of buffered metrics
+    // TODO: do we reset only the metrics we send or all pending on success?
+
+    _metricsBuffer.reset();
+
+    // The reporting interval from the response becomes the minimum interval between uploads
+    _metricsRetry.setMin(Seconds(resp.getReportingInterval()));
+
+    if (resp.getId().is_initialized()) {
+        _state.setRegistrationId(resp.getId().get());
+    }
+
+    if (resp.getUserReminder().is_initialized()) {
+        _state.setUserReminder(resp.getUserReminder().get());
+    }
+
+    if (resp.getInformationalURL().is_initialized()) {
+        _state.setInformationalURL(resp.getInformationalURL().get());
+    }
+
+    if (resp.getMessage().is_initialized()) {
+        _state.setMessage(resp.getMessage().get());
+    }
+
+    // Persist state
+    writeState(client);
+
+    // Reset retry counter
+    _metricsRetry.reset();
+
+    // Enqueue next metrics upload. The deadline must come from the metrics retry counter, which
+    // was just updated with the reporting interval, not from the registration retry counter.
+    enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend,
+                                               _metricsRetry.getNextDeadline(client)));
+}
+
+/**
+ * Process a failed metrics upload: schedule a retry unless the retry counter says to give up.
+ */
+void FreeMonProcessor::doAsyncMetricsFail(
+    Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>* msg) {
+
+    const bool shouldRetry = _metricsRetry.incrementError();
+    if (!shouldRetry) {
+        // We have exceeded our retry
+        warning() << "Free Monitoring is abandoning metrics upload after excess retries";
+        return;
+    }
+
+    LOG(1) << "Free Monitoring Metrics upload failed with status " << msg->getPayload()
+           << ", retrying in " << _metricsRetry.getNextDuration();
+
+    // Enqueue next metrics upload at the deadline chosen by the retry counter
+    const auto retryDeadline = _metricsRetry.getNextDeadline(client);
+    enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, retryDeadline));
+}
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_processor.h b/src/mongo/db/free_mon/free_mon_processor.h
index ec09286addd..ade74fc09e6 100644
--- a/src/mongo/db/free_mon/free_mon_processor.h
+++ b/src/mongo/db/free_mon/free_mon_processor.h
@@ -29,6 +29,7 @@
#include <boost/optional.hpp>
#include <cstdint>
+#include <deque>
#include <memory>
#include <ratio>
#include <string>
@@ -143,6 +144,125 @@ private:
};
/**
+ * Manage retries for metrics
+ */
+class MetricsRetryCounter : public RetryCounter {
+public:
+ explicit MetricsRetryCounter(PseudoRandom& random) : _random(random) {}
+
+ // Return the counter to its initial state, restarting from the minimum interval
+ void reset() final;
+
+ // Record a failure and compute the next retry interval; returns false when retrying should stop
+ bool incrementError() final;
+
+private:
+ // Random number generator for jitter
+ PseudoRandom& _random;
+
+ // Number of failures recorded since the last reset; drives the exponential backoff
+ size_t _retryCount{0};
+
+ // Total Seconds we have retried for
+ Seconds _total;
+
+ // Last retry interval without jitter
+ Seconds _base;
+
+ // Max Duration
+ const Hours kDurationMax{7 * 24};
+};
+
+/**
+ * Bounded first-in, first-out buffer of metrics documents waiting to be uploaded.
+ */
+class MetricsBuffer {
+public:
+    using container_type = std::deque<BSONObj>;
+
+    /**
+     * Append a metric. If the buffer already holds kMaxElements entries, the oldest entry is
+     * dropped first to make room.
+     */
+    void push(BSONObj obj) {
+        const bool atCapacity = (_queue.size() == kMaxElements);
+        if (atCapacity) {
+            _queue.pop_front();
+        }
+
+        _queue.push_back(obj);
+    }
+
+    /**
+     * Drop the oldest entries until at most kMinElements remain. The newest entries are held
+     * for cloud.
+     */
+    void reset() {
+        if (_queue.size() > kMinElements) {
+            _queue.erase(_queue.begin(), _queue.end() - kMinElements);
+        }
+    }
+
+    container_type::iterator begin() {
+        return _queue.begin();
+    }
+    container_type::iterator end() {
+        return _queue.end();
+    }
+
+private:
+    // Bounded queue of metrics, oldest first
+    container_type _queue;
+
+    // Number of entries retained by reset(), and the capacity enforced by push()
+    const size_t kMinElements = 1;
+    const size_t kMaxElements = 10;
+};
+
+/**
+ * Countdown latch for test support in FreeMonProcessor so that a crank can be turned manually.
+ *
+ * reset() arms the latch with a number of events, countDown() records one event, and wait()
+ * blocks until the armed number of events has been recorded.
+ */
+class FreeMonCountdownLatch {
+public:
+    FreeMonCountdownLatch() = default;
+
+    /**
+     * Reset countdown latch to wait for N events.
+     *
+     * Takes a size_t so that it matches the stored counter and callers passing a size_t are not
+     * narrowed.
+     */
+    void reset(size_t count) {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        dassert(_count == 0);
+        dassert(count > 0);
+        _count = count;
+    }
+
+    /**
+     * Count down an event. Does nothing if the latch is already at zero.
+     */
+    void countDown() {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+        // Waiters can only make progress once the count reaches zero, so wake them only then, and
+        // wake all of them so that no waiter is left behind.
+        if (_count > 0 && --_count == 0) {
+            _condvar.notify_all();
+        }
+    }
+
+    /**
+     * Wait until the N events specified in reset have occurred.
+     */
+    void wait() {
+        stdx::unique_lock<stdx::mutex> lock(_mutex);
+        _condvar.wait(lock, [&] { return _count == 0; });
+    }
+
+private:
+    // mutex protecting the count and used with the cond var
+    stdx::mutex _mutex;
+
+    // cond var to signal and wait on
+    stdx::condition_variable _condvar;
+
+    // count of events to wait for
+    size_t _count{0};
+};
+
+
+/**
* Process in an Agent in a Agent/Message Passing model.
*
* Messages are given to it by enqueue, and the Processor processes messages with run().
@@ -151,14 +271,8 @@ class FreeMonProcessor : public std::enable_shared_from_this<FreeMonProcessor> {
public:
FreeMonProcessor(FreeMonCollectorCollection& registration,
FreeMonCollectorCollection& metrics,
- FreeMonNetworkInterface* network)
- : _registration(registration),
- _metrics(metrics),
- _network(network),
- _random(Date_t::now().asInt64()),
- _registrationRetry(_random) {
- _registrationRetry.reset();
- }
+ FreeMonNetworkInterface* network,
+ bool useCrankForTest);
/**
* Enqueue a message to process
@@ -171,6 +285,11 @@ public:
void stop();
/**
+ * Turn the crank of the message queue by ignoring deadlines for N messages.
+ */
+ void turnCrankForTest(size_t countMessagesToIgnore);
+
+ /**
* Processes messages forever
*/
void run();
@@ -180,6 +299,11 @@ public:
*/
static Status validateRegistrationResponse(const FreeMonRegistrationResponse& resp);
+ /**
+ * Validate the metrics response. Public for unit testing.
+ */
+ static Status validateMetricsResponse(const FreeMonMetricsResponse& resp);
+
private:
/**
* Read the state from the database.
@@ -203,9 +327,11 @@ private:
const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg);
/**
- * Process unregistration.
+ * Process unregistration from a command.
*/
- void doUnregister(Client* client);
+ void doCommandUnregister(
+ Client* client,
+ FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>* msg);
/**
* Process a successful HTTP request.
@@ -226,6 +352,29 @@ private:
*/
void notifyPendingRegisters(const Status s);
+ /**
+ * Collect metrics to be uploaded later by doMetricsSend.
+ */
+ void doMetricsCollect(Client* client);
+
+ /**
+ * Upload gathered metrics.
+ */
+ void doMetricsSend(Client* client);
+
+ /**
+ * Process a successful HTTP request.
+ */
+ void doAsyncMetricsComplete(
+ Client* client,
+ const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>* msg);
+
+ /**
+ * Process an unsuccessful HTTP request.
+ */
+ void doAsyncMetricsFail(
+ Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>* msg);
+
private:
// Collection of collectors to send on registration
FreeMonCollectorCollection& _registration;
@@ -242,6 +391,15 @@ private:
// Registration Retry logic
RegistrationRetryCounter _registrationRetry;
+ // Metrics Retry logic
+ MetricsRetryCounter _metricsRetry;
+
+ // Interval for gathering metrics
+ Seconds _metricsGatherInterval;
+
+ // Buffer of metrics to upload
+ MetricsBuffer _metricsBuffer;
+
// List of tags from server configuration registration
std::vector<std::string> _tags;
@@ -257,6 +415,9 @@ private:
// Pending update to disk
FreeMonStorageState _state;
+ // Countdown latch to support manual cranking
+ FreeMonCountdownLatch _countdown;
+
// Message queue
FreeMonMessageQueue _queue;
};
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp
index c8a0b74aa87..d1246b677a5 100644
--- a/src/mongo/db/free_mon/free_mon_queue.cpp
+++ b/src/mongo/db/free_mon/free_mon_queue.cpp
@@ -62,35 +62,79 @@ boost::optional<std::shared_ptr<FreeMonMessage>> FreeMonMessageQueue::dequeue(
return {};
}
- Date_t deadlineCV = Date_t::max();
- if (!_queue.empty()) {
- deadlineCV = _queue.top()->getDeadline();
- } else {
- deadlineCV = clockSource->now() + Hours(24);
- }
+ while (true) {
+ Date_t deadlineCV = Date_t::max();
+ if (_useCrank) {
+ if (!_queue.empty() && _countMessagesIgnored < _countMessagesToIgnore) {
+ // For testing purposes, ignore the deadline
+ deadlineCV = Date_t();
+ } else {
+ deadlineCV = clockSource->now() + Hours(1);
+ }
+ } else {
+ if (!_queue.empty()) {
+ deadlineCV = _queue.top()->getDeadline();
+ } else {
+ deadlineCV = clockSource->now() + Hours(24);
+ }
+ }
+
+ _condvar.wait_until(lock, deadlineCV.toSystemTimePoint(), [this, clockSource]() {
+ if (_stop) {
+ return true;
+ }
+
+ if (this->_queue.empty()) {
+ return false;
+ }
+
+ // Always wake in test mode
+ if (_useCrank) {
+ if (_countMessagesIgnored < _countMessagesToIgnore) {
+ return true;
+ } else {
+ dassert(_countMessagesIgnored == _countMessagesToIgnore);
+ return false;
+ }
+ }
+
+ auto deadlineMessage = this->_queue.top()->getDeadline();
+ if (deadlineMessage == Date_t::min()) {
+ return true;
+ }
+
+ auto now = clockSource->now();
+
+ bool check = deadlineMessage < now;
+ return check;
+ });
- _condvar.wait_until(lock, deadlineCV.toSystemTimePoint(), [this, clockSource]() {
if (_stop) {
- return true;
+ return {};
}
- if (this->_queue.empty()) {
- return false;
- }
+ // We were woken up by a message being enqueued; go back to sleep and wait until the crank is
+ // installed and turned.
+ if (_useCrank) {
+ if (_countMessagesIgnored == _countMessagesToIgnore) {
+ continue;
+ }
- auto deadlineMessage = this->_queue.top()->getDeadline();
- if (deadlineMessage == Date_t::min()) {
- return true;
+ dassert(_countMessagesIgnored <= _countMessagesToIgnore);
}
- auto now = clockSource->now();
-
- bool check = deadlineMessage < now;
- return check;
- });
+ // If the queue is not empty, return the message
+ // otherwise we need to go back to sleep in the hope we get a message.
+ if (!_queue.empty()) {
+ break;
+ } else if (_useCrank) {
+ dassert(0, "Was asked to wait for more messages then available");
+ }
+ }
- if (_stop || _queue.empty()) {
- return {};
+ _countMessagesIgnored++;
+ if (_useCrank && _countMessagesIgnored == _countMessagesToIgnore && _waitable) {
+ _waitable->set(Status::OK());
}
auto item = _queue.top();
@@ -113,4 +157,20 @@ void FreeMonMessageQueue::stop() {
}
}
+// Test-only: start a new crank round in which the consumer ignores message deadlines for the
+// next countMessagesToIgnore messages.
+void FreeMonMessageQueue::turnCrankForTest(size_t countMessagesToIgnore) {
+    // Only a queue constructed in crank mode may be cranked.
+    invariant(_useCrank);
+
+    stdx::lock_guard<stdx::mutex> guard(_mutex);
+
+    // Fresh result object for this round.
+    _waitable = std::make_unique<WaitableResult>();
+
+    // Restart the ignored-message accounting for the new round.
+    _countMessagesToIgnore = countMessagesToIgnore;
+    _countMessagesIgnored = 0;
+
+    // Wake the consumer so it re-evaluates the queue under the new counts.
+    _condvar.notify_one();
+
+    // This function does not block on the result; the wait below is left disabled.
+    //_waitable->wait_for(Seconds(10));
+}
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h
index 687e61c13bd..d91aeb9e157 100644
--- a/src/mongo/db/free_mon/free_mon_queue.h
+++ b/src/mongo/db/free_mon/free_mon_queue.h
@@ -58,6 +58,8 @@ struct FreeMonMessageGreater {
*/
class FreeMonMessageQueue {
public:
+ /**
+ * Construct a queue. When useCrankForTest is true the queue runs in manual-crank mode and is
+ * driven by turnCrankForTest. The constructor is explicit so that a bool is never silently
+ * converted into a queue.
+ */
+ explicit FreeMonMessageQueue(bool useCrankForTest = false) : _useCrank(useCrankForTest) {}
+
/**
* Enqueue a message and wake consumer if needed.
*
@@ -77,6 +79,11 @@ public:
*/
void stop();
+ /**
+ * Turn the crank of the message queue by ignoring deadlines for N messages.
+ */
+ void turnCrankForTest(size_t countMessagesToIgnore);
+
private:
// Condition variable to signal consumer
stdx::condition_variable _condvar;
@@ -93,6 +100,18 @@ private:
std::vector<std::shared_ptr<FreeMonMessage>>,
FreeMonMessageGreater>
_queue;
+
+ // Use manual crank to process messages in-order instead of based on deadlines.
+ bool _useCrank{false};
+
+ // Number of messages to ignore
+ size_t _countMessagesToIgnore{0};
+
+ // Number of messages that have been ignored
+ size_t _countMessagesIgnored{0};
+
+ // Waitable result for testing
+ std::unique_ptr<WaitableResult> _waitable;
};
diff --git a/src/mongo/db/free_mon/free_mon_storage.idl b/src/mongo/db/free_mon/free_mon_storage.idl
index a181d39f23a..2bd4f15185a 100644
--- a/src/mongo/db/free_mon/free_mon_storage.idl
+++ b/src/mongo/db/free_mon/free_mon_storage.idl
@@ -25,6 +25,7 @@ enums:
values:
disabled: disabled
enabled: enabled
+ pending: pending
structs:
FreeMonStorageState:
diff --git a/src/mongo/db/free_mon/http_client_curl.cpp b/src/mongo/db/free_mon/http_client_curl.cpp
index ad78618b9bd..1604c1c4d0f 100644
--- a/src/mongo/db/free_mon/http_client_curl.cpp
+++ b/src/mongo/db/free_mon/http_client_curl.cpp
@@ -140,75 +140,81 @@ private:
+ /**
+ * Synchronously POST the raw BSON bytes of obj to urlString and complete shared_promise with
+ * the response body, or with an error Status if curl fails or an exception is thrown.
+ */
static void doPost(SharedPromise<std::vector<uint8_t>> shared_promise,
const std::string& urlString,
const BSONObj& obj) {
- ConstDataRange data(obj.objdata(), obj.objdata() + obj.objsize());
+ try {
+ ConstDataRange data(obj.objdata(), obj.objdata() + obj.objsize());
- ConstDataRangeCursor cdrc(data);
+ ConstDataRangeCursor cdrc(data);
- std::unique_ptr<CURL, void (*)(CURL*)> myHandle(curl_easy_init(), curl_easy_cleanup);
+ std::unique_ptr<CURL, void (*)(CURL*)> myHandle(curl_easy_init(), curl_easy_cleanup);
- if (!myHandle) {
- shared_promise.setError({ErrorCodes::InternalError, "Curl initialization failed"});
- return;
- }
+ if (!myHandle) {
+ shared_promise.setError({ErrorCodes::InternalError, "Curl initialization failed"});
+ return;
+ }
- curl_easy_setopt(myHandle.get(), CURLOPT_URL, urlString.c_str());
- curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1);
+ curl_easy_setopt(myHandle.get(), CURLOPT_URL, urlString.c_str());
+ // libcurl reads numeric options as long through varargs, so pass long literals.
+ curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1L);
- // Allow http only if test commands are enabled
- if (getTestCommandsEnabled()) {
- curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
- } else {
- curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
- }
+ // Allow http only if test commands are enabled. This must stay the only place that sets
+ // CURLOPT_PROTOCOLS: setting the option again later replaces this value and would allow
+ // plain HTTP unconditionally.
+ if (getTestCommandsEnabled()) {
+ curl_easy_setopt(
+ myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
+ } else {
+ curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
+ }
- curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
- curl_easy_setopt(myHandle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
+ curl_easy_setopt(myHandle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
- DataBuilder dataBuilder(4096);
+ DataBuilder dataBuilder(4096);
- curl_easy_setopt(myHandle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
- curl_easy_setopt(myHandle.get(), CURLOPT_WRITEDATA, &dataBuilder);
+ curl_easy_setopt(myHandle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+ curl_easy_setopt(myHandle.get(), CURLOPT_WRITEDATA, &dataBuilder);
- curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback);
- curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc);
- curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length());
+ curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback);
+ curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc);
+ curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length());
- // CURLOPT_EXPECT_100_TIMEOUT_MS??
- curl_easy_setopt(myHandle.get(), CURLOPT_CONNECTTIMEOUT, kConnectionTimeoutSeconds);
- curl_easy_setopt(myHandle.get(), CURLOPT_TIMEOUT, kTotalRequestTimeoutSeconds);
+ // CURLOPT_EXPECT_100_TIMEOUT_MS??
+ curl_easy_setopt(myHandle.get(), CURLOPT_CONNECTTIMEOUT, kConnectionTimeoutSeconds);
+ curl_easy_setopt(myHandle.get(), CURLOPT_TIMEOUT, kTotalRequestTimeoutSeconds);
#if LIBCURL_VERSION_NUM > 0x072200
- // Requires >= 7.34.0
- curl_easy_setopt(myHandle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
+ // Requires >= 7.34.0
+ curl_easy_setopt(myHandle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
#endif
- curl_easy_setopt(myHandle.get(), CURLOPT_FOLLOWLOCATION, 0);
+ curl_easy_setopt(myHandle.get(), CURLOPT_FOLLOWLOCATION, 0L);
- curl_easy_setopt(myHandle.get(), CURLOPT_NOSIGNAL, 1);
- // TODO: consider making this configurable If server log level > 3
- // curl_easy_setopt(myHandle.get(), CURLOPT_VERBOSE, 1);
- // curl_easy_setopt(myHandle.get(), CURLOPT_DEBUGFUNCTION , ???);
+ curl_easy_setopt(myHandle.get(), CURLOPT_NOSIGNAL, 1L);
+ // TODO: consider making this configurable If server log level > 3
+ // curl_easy_setopt(myHandle.get(), CURLOPT_VERBOSE, 1);
+ // curl_easy_setopt(myHandle.get(), CURLOPT_DEBUGFUNCTION , ???);
- curl_slist* chunk = nullptr;
- chunk = curl_slist_append(chunk, "Content-Type: application/octet-stream");
- chunk = curl_slist_append(chunk, "Accept: application/octet-stream");
+ curl_slist* chunk = nullptr;
+ chunk = curl_slist_append(chunk, "Content-Type: application/octet-stream");
+ chunk = curl_slist_append(chunk, "Accept: application/octet-stream");
- // Send the empty expect because we do not need the server to respond with 100-Contine
- chunk = curl_slist_append(chunk, "Expect:");
+ // Send the empty expect because we do not need the server to respond with 100-Continue
+ chunk = curl_slist_append(chunk, "Expect:");
- std::unique_ptr<curl_slist, void (*)(curl_slist*)> chunkHolder(chunk, curl_slist_free_all);
+ std::unique_ptr<curl_slist, void (*)(curl_slist*)> chunkHolder(chunk,
+ curl_slist_free_all);
- curl_easy_setopt(myHandle.get(), CURLOPT_HTTPHEADER, chunk);
+ curl_easy_setopt(myHandle.get(), CURLOPT_HTTPHEADER, chunk);
- CURLcode result = curl_easy_perform(myHandle.get());
- if (result != CURLE_OK) {
- shared_promise.setError({ErrorCodes::OperationFailed,
- str::stream() << "Bad HTTP response from API server: "
- << curl_easy_strerror(result)});
- return;
- }
+ CURLcode result = curl_easy_perform(myHandle.get());
+ if (result != CURLE_OK) {
+ shared_promise.setError({ErrorCodes::OperationFailed,
+ str::stream() << "Bad HTTP response from API server: "
+ << curl_easy_strerror(result)});
+ return;
+ }
- auto d = dataBuilder.getCursor();
- shared_promise.emplaceValue(std::vector<uint8_t>(d.data(), d.data() + d.length()));
+ auto d = dataBuilder.getCursor();
+ shared_promise.emplaceValue(std::vector<uint8_t>(d.data(), d.data() + d.length()));
+ } catch (...) {
+ // Report any exception through the promise instead of letting it escape.
+ shared_promise.setError(exceptionToStatus());
+ }
}
private: