diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-04-20 11:59:06 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2018-04-20 12:38:28 -0400 |
commit | 70e47745a78fffcc20b70f8e09a8b80a2b62e3bf (patch) | |
tree | 81ed086d27b6f84f6a336b270e83687c66ecfc57 | |
parent | 697ed6fa176220f770231ab5f7ac337328a5a53c (diff) | |
download | mongo-70e47745a78fffcc20b70f8e09a8b80a2b62e3bf.tar.gz |
SERVER-34227 FreeMonController - Metrics
19 files changed, 1797 insertions, 208 deletions
diff --git a/jstests/free_mon/free_mon_metrics_halt.js b/jstests/free_mon/free_mon_metrics_halt.js new file mode 100644 index 00000000000..0059b85705b --- /dev/null +++ b/jstests/free_mon/free_mon_metrics_halt.js @@ -0,0 +1,31 @@ +// Ensure free monitoring gives up if metrics returns halt +// +load("jstests/free_mon/libs/free_mon.js"); + +(function() { + 'use strict'; + + let mock_web = new FreeMonWebServer(FAULT_HALT_METRICS_5); + + mock_web.start(); + + let options = { + setParameter: "cloudFreeMonitoringEndpointURL=" + mock_web.getURL(), + enableFreeMonitoring: "on", + verbose: 1, + }; + + const conn = MongoRunner.runMongod(options); + assert.neq(null, conn, 'mongod was unable to start up'); + + mock_web.waitMetrics(6); + + // It gets marked as disabled on halt + const reg = FreeMonGetRegistration(conn); + print(tojson(reg)); + assert.eq(reg.state, "disabled"); + + MongoRunner.stopMongod(conn); + + mock_web.stop(); +})(); diff --git a/jstests/free_mon/free_mon_metrics_perm_del.js b/jstests/free_mon/free_mon_metrics_perm_del.js new file mode 100644 index 00000000000..369a788159d --- /dev/null +++ b/jstests/free_mon/free_mon_metrics_perm_del.js @@ -0,0 +1,31 @@ +// Ensure free monitoring gives up if metrics returns permanently delete +// +load("jstests/free_mon/libs/free_mon.js"); + +(function() { + 'use strict'; + + let mock_web = new FreeMonWebServer(FAULT_PERMANENTLY_DELETE_AFTER_3); + + mock_web.start(); + + let options = { + setParameter: "cloudFreeMonitoringEndpointURL=" + mock_web.getURL(), + enableFreeMonitoring: "on", + verbose: 1, + }; + + const conn = MongoRunner.runMongod(options); + assert.neq(null, conn, 'mongod was unable to start up'); + + mock_web.waitMetrics(4); + + // Make sure the registration document gets removed + const reg = FreeMonGetRegistration(conn); + print(tojson(reg)); + assert.eq(reg, undefined); + + MongoRunner.stopMongod(conn); + + mock_web.stop(); +})(); diff --git a/jstests/free_mon/free_mon_register.js 
b/jstests/free_mon/free_mon_register.js index 362cc74cb04..5524691a698 100644 --- a/jstests/free_mon/free_mon_register.js +++ b/jstests/free_mon/free_mon_register.js @@ -34,6 +34,13 @@ load("jstests/free_mon/libs/free_mon.js"); assert.eq(last_register.payload.storageEngine.readOnly, false); assert.eq(last_register.payload.isMaster.ok, 1); + mock_web.waitMetrics(2); + + const last_metrics = mock_web.query("last_metrics"); + print(tojson(last_metrics)); + + assert.eq(last_metrics.version, 1); + MongoRunner.stopMongod(conn); mock_web.stop(); diff --git a/jstests/free_mon/libs/free_mon.js b/jstests/free_mon/libs/free_mon.js index 5e507ce3641..68665424c5d 100644 --- a/jstests/free_mon/libs/free_mon.js +++ b/jstests/free_mon/libs/free_mon.js @@ -7,6 +7,8 @@ // SUPPORTED_FAULT_TYPES list in mock_http_server.py const FAULT_FAIL_REGISTER = "fail_register"; const FAULT_INVALID_REGISTER = "invalid_register"; +const FAULT_HALT_METRICS_5 = "halt_metrics_5"; +const FAULT_PERMANENTLY_DELETE_AFTER_3 = "permanently_delete_after_3"; class FreeMonWebServer { /** @@ -138,6 +140,21 @@ class FreeMonWebServer { return stats.registers == count; }, "Failed to web server register", 60 * 1000); } + + /** + * Wait for N metrics calls to be received by web server. + * + * @throws assert.soon() exception + */ + waitMetrics(count) { + const qs = this.queryStats.bind(this); + // Wait for metrics uploads to occur + assert.soon(function() { + const stats = qs(); + print("QS : " + tojson(stats)); + return stats.metrics == count; + }, "Failed to web server metrics", 60 * 1000); + } } /** @@ -155,3 +172,15 @@ function WaitForRegistration(conn) { return da.length != 0; }, "Failed to register", 60 * 1000); } + +/** + * Get registration document. + * + * @param {object} registration document + */ +function FreeMonGetRegistration(conn) { + const admin = conn.getDB("admin"); + const docs = admin.system.version.find({_id: "free_monitoring"}); + const da = docs.toArray(); + return da[0]; +}
\ No newline at end of file diff --git a/jstests/free_mon/libs/mock_http_server.py b/jstests/free_mon/libs/mock_http_server.py index cafdf989550..a90c045177f 100644 --- a/jstests/free_mon/libs/mock_http_server.py +++ b/jstests/free_mon/libs/mock_http_server.py @@ -30,10 +30,18 @@ FAULT_FAIL_REGISTER = "fail_register" """Fault which causes the server to return a response with a document with a bad version.""" FAULT_INVALID_REGISTER = "invalid_register" +"""Fault which causes metrics to return halt after 5 metric uploads have occurred.""" +FAULT_HALT_METRICS_5 = "halt_metrics_5" + +"""Fault which causes metrics to return permanentlyDelete = true after 3 uploads.""" +FAULT_PERMANENTLY_DELETE_AFTER_3 = "permanently_delete_after_3" + # List of supported fault types SUPPORTED_FAULT_TYPES = [ FAULT_FAIL_REGISTER, - FAULT_INVALID_REGISTER + FAULT_INVALID_REGISTER, + FAULT_HALT_METRICS_5, + FAULT_PERMANENTLY_DELETE_AFTER_3, ] # Supported POST URL types @@ -132,14 +140,33 @@ class FreeMonHandler(http.server.BaseHTTPRequestHandler): decoded_doc = bson.BSON.decode(raw_input) last_metrics = dumps(decoded_doc) - data = bson.BSON.encode({ - 'version': bson.int64.Int64(1), - 'haltMetricsUploading': False, - 'permanentlyDelete': False, - 'id': 'mock123', - 'reportingInterval': bson.int64.Int64(1), - 'message': 'Thanks for all the metrics', - }) + if stats.metrics_calls > 5 and fault_type == FAULT_HALT_METRICS_5: + data = bson.BSON.encode({ + 'version': bson.int64.Int64(1), + 'haltMetricsUploading': True, + 'permanentlyDelete': False, + 'id': 'mock123', + 'reportingInterval': bson.int64.Int64(1), + 'message': 'Thanks for all the metrics', + }) + elif stats.metrics_calls > 3 and fault_type == FAULT_PERMANENTLY_DELETE_AFTER_3: + data = bson.BSON.encode({ + 'version': bson.int64.Int64(1), + 'haltMetricsUploading': False, + 'permanentlyDelete': True, + 'id': 'mock123', + 'reportingInterval': bson.int64.Int64(1), + 'message': 'Thanks for all the metrics', + }) + else: + data = 
bson.BSON.encode({ + 'version': bson.int64.Int64(1), + 'haltMetricsUploading': False, + 'permanentlyDelete': False, + 'id': 'mock123', + 'reportingInterval': bson.int64.Int64(1), + 'message': 'Thanks for all the metrics', + }) # TODO: test what if header is sent first? self._send_header() diff --git a/src/mongo/db/free_mon/SConscript b/src/mongo/db/free_mon/SConscript index 3047553317a..e2b22b7fc80 100644 --- a/src/mongo/db/free_mon/SConscript +++ b/src/mongo/db/free_mon/SConscript @@ -66,7 +66,7 @@ else: ) -env.CppUnitTest( +fmEnv.CppUnitTest( target='free_mon_test', source=[ 'free_mon_controller_test.cpp', diff --git a/src/mongo/db/free_mon/free_mon_commands.cpp b/src/mongo/db/free_mon/free_mon_commands.cpp index 1e78bc5b768..c04af62777e 100644 --- a/src/mongo/db/free_mon/free_mon_commands.cpp +++ b/src/mongo/db/free_mon/free_mon_commands.cpp @@ -123,7 +123,7 @@ public: if (cmd.getAction() == SetFreeMonActionEnum::enable) { optStatus = controller->registerServerCommand(kRegisterSyncTimeout); } else { - optStatus = controller->unregisterServerCommand(); + optStatus = controller->unregisterServerCommand(kRegisterSyncTimeout); } if (optStatus) { diff --git a/src/mongo/db/free_mon/free_mon_controller.cpp b/src/mongo/db/free_mon/free_mon_controller.cpp index a1fcb4fd214..3c50d193d07 100644 --- a/src/mongo/db/free_mon/free_mon_controller.cpp +++ b/src/mongo/db/free_mon/free_mon_controller.cpp @@ -77,8 +77,15 @@ boost::optional<Status> FreeMonController::registerServerCommand(Milliseconds ti return Status::OK(); } -Status FreeMonController::unregisterServerCommand() { - _enqueue(FreeMonMessage::createNow(FreeMonMessageType::UnregisterCommand)); +boost::optional<Status> FreeMonController::unregisterServerCommand(Milliseconds timeout) { + auto msg = + FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>::createNow(true); + _enqueue(msg); + + if (timeout > Milliseconds::min()) { + return msg->wait_for(timeout); + } + return Status::OK(); } @@ -100,7 
+107,7 @@ void FreeMonController::start(RegistrationType registrationType) { // Start the agent _processor = std::make_shared<FreeMonProcessor>( - _registrationCollectors, _metricCollectors, _network.get()); + _registrationCollectors, _metricCollectors, _network.get(), _useCrankForTest); _thread = stdx::thread([this] { _processor->run(); }); @@ -144,4 +151,15 @@ void FreeMonController::stop() { _state = State::kDone; } +void FreeMonController::turnCrankForTest(size_t countMessagesToIgnore) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kStarted); + } + + log() << "Turning Crank: " << countMessagesToIgnore; + + _processor->turnCrankForTest(countMessagesToIgnore); +} + } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_controller.h b/src/mongo/db/free_mon/free_mon_controller.h index 69c1f811934..0e3aece0694 100644 --- a/src/mongo/db/free_mon/free_mon_controller.h +++ b/src/mongo/db/free_mon/free_mon_controller.h @@ -50,8 +50,9 @@ namespace mongo { */ class FreeMonController { public: - explicit FreeMonController(std::unique_ptr<FreeMonNetworkInterface> network) - : _network(std::move(network)) {} + explicit FreeMonController(std::unique_ptr<FreeMonNetworkInterface> network, + bool useCrankForTest = false) + : _network(std::move(network)), _useCrankForTest(useCrankForTest) {} /** * Initializes free monitoring. @@ -65,6 +66,11 @@ public: void stop(); /** + * Turn the crank of the message queue by ignoring deadlines for N messages. + */ + void turnCrankForTest(size_t countMessagesToIgnore); + + /** * Add a metric collector to collect on registration */ void addRegistrationCollector(std::unique_ptr<FreeMonCollectorInterface> collector); @@ -103,7 +109,7 @@ public: * As with registerServerCommand() above, but undoes registration. * On complettion of this command, no further metrics will be transmitted. 
*/ - Status unregisterServerCommand(); + boost::optional<Status> unregisterServerCommand(Milliseconds timeout); // TODO - add these methods // void getServerStatus(BSONObjBuilder* builder); @@ -162,6 +168,9 @@ private: // Background thead for agent stdx::thread _thread; + // Crank for test + bool _useCrankForTest; + // Background agent std::shared_ptr<FreeMonProcessor> _processor; }; diff --git a/src/mongo/db/free_mon/free_mon_controller_test.cpp b/src/mongo/db/free_mon/free_mon_controller_test.cpp index 9b7af519558..28ac5506f92 100644 --- a/src/mongo/db/free_mon/free_mon_controller_test.cpp +++ b/src/mongo/db/free_mon/free_mon_controller_test.cpp @@ -26,14 +26,14 @@ * then also delete it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kFTDC - +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl #include "mongo/platform/basic.h" #include <boost/filesystem.hpp> #include <future> #include <iostream> +#include <snappy.h> #include "mongo/db/free_mon/free_mon_controller.h" #include "mongo/db/free_mon/free_mon_storage.h" @@ -61,6 +61,7 @@ #include "mongo/db/service_context_noop.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/rpc/object_check.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/temp_dir.h" @@ -72,16 +73,168 @@ namespace mongo { namespace { + +class FreeMonMetricsCollectorMock : public FreeMonCollectorInterface { +public: + ~FreeMonMetricsCollectorMock() { + // ASSERT_TRUE(_state == State::kStarted); + } + + void collect(OperationContext* opCtx, BSONObjBuilder& builder) final { + _state = State::kStarted; + + builder.append("mock", "some data"); + + { + stdx::lock_guard<stdx::mutex> lck(_mutex); + + ++_counter; + + if (_counter == _wait) { + _condvar.notify_all(); + } + } + } + + std::string name() const final { + return "mock"; + } + + void 
setSignalOnCount(int c) { + _wait = c; + } + + std::uint32_t count() { + stdx::lock_guard<stdx::mutex> lck(_mutex); + return _counter; + } + + void wait() { + stdx::unique_lock<stdx::mutex> lck(_mutex); + while (_counter < _wait) { + _condvar.wait(lck); + } + } + +private: + /** + * Private enum to ensure caller uses class correctly. + */ + enum class State { + kNotStarted, + kStarted, + }; + + // state + State _state{State::kNotStarted}; + + std::uint32_t _counter{0}; + + stdx::mutex _mutex; + stdx::condition_variable _condvar; + std::uint32_t _wait{0}; +}; + +std::vector<BSONObj> decompressMetrics(ConstDataRange cdr) { + std::string outBuffer; + snappy::Uncompress(cdr.data(), cdr.length(), &outBuffer); + + std::vector<BSONObj> metrics; + ConstDataRangeCursor cdrc(outBuffer.data(), outBuffer.data() + outBuffer.size()); + while (!cdrc.empty()) { + auto swDoc = cdrc.readAndAdvance<Validated<BSONObj>>(); + ASSERT_OK(swDoc.getStatus()); + metrics.emplace_back(swDoc.getValue().val.getOwned()); + } + + return metrics; +} + +/** + * Countdown latch that propagates a message. + */ +template <typename T> +class CountdownLatchResult { +public: + CountdownLatchResult(uint32_t count) : _count(count) {} + + /** + * Set the count of events to wait for. + */ + void reset(uint32_t count) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + ASSERT_EQ(_count, 0UL); + ASSERT_GT(count, 0UL); + + _count = count; + _payload = T(); + } + + /** + * Set the payload and signal waiter. + */ + void set(T payload) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + if (_count > 0) { + --_count; + _payload = std::move(payload); + _condvar.notify_one(); + } + } + + /** + * Waits for duration until N events have occured. + * + * Returns boost::none on timeout. 
+ */ + boost::optional<T> wait_for(Milliseconds duration) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + + if (!_condvar.wait_for( + lock, duration.toSystemDuration(), [this]() { return _count == 0; })) { + return {}; + } + + return _payload; + } + +private: + // Condition variable to signal consumer + stdx::condition_variable _condvar; + + // Lock for condition variable and to protect state + stdx::mutex _mutex; + + // Count to wait fore + uint32_t _count; + + // Provided payload + T _payload; +}; + class FreeMonNetworkInterfaceMock : public FreeMonNetworkInterface { public: struct Options { + // If sync = true, then execute the callback immediately and the subsequent future chain + // This allows us to ensure the follow up functions to a network request are executed + // before anything else is processed by FreeMonProcessor + bool doSync{false}; + + // Faults to inject for registration bool failRegisterHttp{false}; bool invalidRegister{false}; + bool haltRegister{false}; + + // Faults to inject for metrics + bool haltMetrics{false}; + bool fail2MetricsUploads{false}; + bool permanentlyDeleteAfter3{false}; }; explicit FreeMonNetworkInterfaceMock(executor::ThreadPoolTaskExecutor* threadPool, Options options) - : _threadPool(threadPool), _options(options) {} + : _threadPool(threadPool), _options(options), _countdownMetrics(0) {} ~FreeMonNetworkInterfaceMock() final = default; Future<FreeMonRegistrationResponse> sendRegistrationAsync( @@ -92,77 +245,146 @@ public: Promise<FreeMonRegistrationResponse> promise; auto future = promise.getFuture(); - auto shared_promise = promise.share(); + if (_options.doSync) { + promise.setFrom(doRegister(req)); + } else { + auto shared_promise = promise.share(); - auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this]( - const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { + auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this]( + const executor::TaskExecutor::CallbackArgs& cbArgs) 
mutable { - if (_options.failRegisterHttp) { - shared_promise.setError( - Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure")); - return; - } + auto swResp = doRegister(req); + if (!swResp.isOK()) { + shared_promise.setError(swResp.getStatus()); + } else { + shared_promise.emplaceValue(swResp.getValue()); + } + + }); - auto resp = FreeMonRegistrationResponse(); - resp.setVersion(1); + ASSERT_OK(swSchedule.getStatus()); + } - if (_options.invalidRegister) { - resp.setVersion(42); - } + return future; + } - if (req.getId().is_initialized()) { - resp.setId(req.getId().get()); - } else { - resp.setId(UUID::gen().toString()); - } + StatusWith<FreeMonRegistrationResponse> doRegister(const FreeMonRegistrationRequest& req) { - resp.setReportingInterval(1); + if (_options.failRegisterHttp) { + return Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure"); + } - shared_promise.emplaceValue(resp); - }); - ASSERT_OK(swSchedule.getStatus()); + auto resp = FreeMonRegistrationResponse(); + resp.setVersion(1); - return future; + if (_options.invalidRegister) { + resp.setVersion(42); + } + + resp.setId("regId123"); + + if (_options.haltRegister) { + resp.setHaltMetricsUploading(true); + } + + resp.setReportingInterval(1); + + return resp; } + Future<FreeMonMetricsResponse> sendMetricsAsync(const FreeMonMetricsRequest& req) final { log() << "Sending Metrics ..."; - ASSERT_FALSE(req.getId().empty()); _metrics.addAndFetch(1); Promise<FreeMonMetricsResponse> promise; auto future = promise.getFuture(); - auto shared_promise = promise.share(); + if (_options.doSync) { + promise.setFrom(doMetrics(req)); + } else { + auto shared_promise = promise.share(); - auto swSchedule = _threadPool->scheduleWork( - [shared_promise, req](const executor::TaskExecutor::CallbackArgs& cbArgs) mutable { - auto resp = FreeMonMetricsResponse(); - resp.setVersion(1); - resp.setReportingInterval(1); + auto swSchedule = _threadPool->scheduleWork([shared_promise, req, this]( + const 
executor::TaskExecutor::CallbackArgs& cbArgs) mutable { + + auto swResp = doMetrics(req); + if (!swResp.isOK()) { + shared_promise.setError(swResp.getStatus()); + } else { + shared_promise.emplaceValue(swResp.getValue()); + } - shared_promise.emplaceValue(resp); }); - ASSERT_OK(swSchedule.getStatus()); + ASSERT_OK(swSchedule.getStatus()); + } return future; } + StatusWith<FreeMonMetricsResponse> doMetrics(const FreeMonMetricsRequest& req) { + auto cdr = req.getMetrics(); + + { + stdx::lock_guard<stdx::mutex> lock(_metricsLock); + auto metrics = decompressMetrics(cdr); + _lastMetrics = metrics; + _countdownMetrics.set(metrics); + } + + if (_options.fail2MetricsUploads && _metrics.loadRelaxed() < 3) { + return Status(ErrorCodes::FreeMonHttpTemporaryFailure, "Mock failure"); + } + + auto resp = FreeMonMetricsResponse(); + resp.setVersion(1); + resp.setReportingInterval(1); + + resp.setId("metricsId456"_sd); + + if (_options.haltMetrics) { + resp.setHaltMetricsUploading(true); + } + + if (_options.permanentlyDeleteAfter3 && _metrics.loadRelaxed() > 3) { + resp.setPermanentlyDelete(true); + } + + return resp; + } + int32_t getRegistersCalls() const { return _registers.load(); } + int32_t getMetricsCalls() const { return _metrics.load(); } + boost::optional<std::vector<BSONObj>> waitMetricsCalls(uint32_t count, Milliseconds wait) { + _countdownMetrics.reset(count); + return _countdownMetrics.wait_for(wait); + } + + std::vector<BSONObj> getLastMetrics() { + stdx::lock_guard<stdx::mutex> lock(_metricsLock); + return _lastMetrics; + } + + private: AtomicInt32 _registers; AtomicInt32 _metrics; executor::ThreadPoolTaskExecutor* _threadPool; + stdx::mutex _metricsLock; + std::vector<BSONObj> _lastMetrics; + Options _options; + + CountdownLatchResult<std::vector<BSONObj>> _countdownMetrics; }; class FreeMonControllerTest : public ServiceContextMongoDTest { @@ -271,10 +493,60 @@ TEST(FreeMonRetryTest, TestRegistration) { counter.reset(); } + + // Validate max timeout + for (int 
j = 0; j < 3; j++) { + // Fail requests + for (int i = 1; i <= 163; ++i) { + ASSERT_TRUE(counter.incrementError()); + } + ASSERT_FALSE(counter.incrementError()); + + counter.reset(); + } +} + +// Positive: Ensure deadlines sort properly +TEST(FreeMonRetryTest, TestMetrics) { + PseudoRandom random(0); + MetricsRetryCounter counter(random); + counter.reset(); + + ASSERT_EQ(counter.getNextDuration(), Seconds(1)); + ASSERT_EQ(counter.getNextDuration(), Seconds(1)); + + int32_t minTime = 1; + for (int j = 0; j < 3; j++) { + // Fail requests + for (int i = 0; i <= 6; ++i) { + ASSERT_TRUE(counter.incrementError()); + + int64_t base = pow(2, i); + ASSERT_RANGE(Seconds(base), Seconds(minTime / 2), Seconds(minTime)); + } + + ASSERT_TRUE(counter.incrementError()); + ASSERT_RANGE(Seconds(64), Seconds(minTime / 2), Seconds(minTime)); + ASSERT_TRUE(counter.incrementError()); + ASSERT_RANGE(Seconds(64), Seconds(minTime / 2), Seconds(minTime)); + + counter.reset(); + } + + // Validate max timeout + for (int j = 0; j < 3; j++) { + // Fail requests + for (int i = 1; i < 9456; ++i) { + ASSERT_TRUE(counter.incrementError()); + } + ASSERT_FALSE(counter.incrementError()); + + counter.reset(); + } } // Positive: Ensure the response is validated correctly -TEST(FreeMonProcessorTest, TestResponseValidation) { +TEST(FreeMonProcessorTest, TestRegistrationResponseValidation) { ASSERT_OK(FreeMonProcessor::validateRegistrationResponse(FreeMonRegistrationResponse::parse( IDLParserErrorContext("foo"), BSON("version" << 1LL << "haltMetricsUploading" << false << "id" @@ -370,20 +642,220 @@ TEST(FreeMonProcessorTest, TestResponseValidation) { << (60LL * 60 * 24 + 1LL))))); } + +// Positive: Ensure the response is validated correctly +TEST(FreeMonProcessorTest, TestMetricsResponseValidation) { + ASSERT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + + BSON("version" << 1LL << "haltMetricsUploading" << false << "permanentlyDelete" << false 
+ << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: bad protocol version + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 42LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: halt uploading + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << true << "permanentlyDelete" << false + << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: large registartation id + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + << "id" + << std::string(5000, 'a') + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: large URL + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse( + FreeMonMetricsResponse::parse(IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false + + << "permanentlyDelete" + << false + << "id" + << "mock123" + << "informationalURL" + << std::string(5000, 'b') + << "message" + << "msg456" + << "reportingInterval" + << 1LL)))); + + // Negative: large message + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + 
<< "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << std::string(5000, 'c') + << "reportingInterval" + << 1LL)))); + + // Negative: too small a reporting interval + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << 0LL)))); + + // Negative: too large a reporting interval + ASSERT_NOT_OK(FreeMonProcessor::validateMetricsResponse(FreeMonMetricsResponse::parse( + IDLParserErrorContext("foo"), + BSON("version" << 1LL << "haltMetricsUploading" << false << "permanentlyDelete" << false + << "id" + << "mock123" + << "informationalURL" + << "http://www.example.com/123" + << "message" + << "msg456" + << "reportingInterval" + << (60LL * 60 * 24 + 1LL))))); +} + +/** + * Fluent class that encapsulates how many turns of a crank is needed to do a particular operation. + * + * All commands take 1 turn except registerCommand and metricsSend since these have a HTTP send an + * HTTP receive. + */ +class Turner { +public: + Turner() = default; + + Turner& registerServer() { + return inc(1, 1); + } + + Turner& registerCommand(size_t count = 1) { + return inc(2, count); + } + + Turner& unRegisterCommand() { + return inc(1, 1); + } + + Turner& collect(size_t count = 1) { + return inc(1, count); + } + + Turner& metricsSend(size_t count = 1) { + return inc(2, count); + } + + operator size_t() { + return _count; + } + +private: + Turner& inc(size_t perOperatioCost, size_t numberOfOperations) { + _count += (perOperatioCost * numberOfOperations); + return *this; + } + +private: + size_t _count; +}; + +/** + * Utility class to manage controller setup and lifecycle for testing. 
+ */ +struct ControllerHolder { + ControllerHolder(executor::ThreadPoolTaskExecutor* pool, + FreeMonNetworkInterfaceMock::Options opts, + bool useCrankForTest = true) { + auto registerCollectorUnique = stdx::make_unique<FreeMonMetricsCollectorMock>(); + auto metricsCollectorUnique = stdx::make_unique<FreeMonMetricsCollectorMock>(); + + // If we want to manually turn the crank the queue, we must process the messages + // synchronously + if (useCrankForTest) { + opts.doSync = true; + } + + ASSERT_EQ(opts.doSync, useCrankForTest); + + auto networkUnique = + std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkInterfaceMock(pool, opts)); + network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get()); + controller = std::make_unique<FreeMonController>(std::move(networkUnique), useCrankForTest); + + registerCollector = registerCollectorUnique.get(); + metricsCollector = metricsCollectorUnique.get(); + + controller->addRegistrationCollector(std::move(registerCollectorUnique)); + controller->addMetricsCollector(std::move(metricsCollectorUnique)); + } + + ~ControllerHolder() { + controller->stop(); + } + + FreeMonController* operator->() { + return controller.get(); + } + + FreeMonMetricsCollectorMock* registerCollector; + FreeMonMetricsCollectorMock* metricsCollector; + FreeMonNetworkInterfaceMock* network; + + std::unique_ptr<FreeMonController> controller; +}; + // Positive: Test Register works TEST_F(FreeMonControllerTest, TestRegister) { - // FreeMonNetworkInterfaceMock network; - FreeMonController controller( - std::unique_ptr<FreeMonNetworkInterface>(new FreeMonNetworkInterfaceMock( - _mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()))); + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); - controller.start(RegistrationType::DoNotRegister); + controller->start(RegistrationType::DoNotRegister); - ASSERT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(5)))); + 
ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + + controller->turnCrankForTest(Turner().registerCommand()); ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty()); - controller.stop(); + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 0UL); } // Negatve: Test Register times out if network stack drops messages @@ -391,19 +863,17 @@ TEST_F(FreeMonControllerTest, TestRegisterTimeout) { FreeMonNetworkInterfaceMock::Options opts; opts.failRegisterHttp = true; - auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>( - new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts)); - auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get()); - FreeMonController controller(std::move(networkUnique)); - controller.start(RegistrationType::DoNotRegister); + ControllerHolder controller(_mockThreadPool.get(), opts); - ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15)))); + controller->start(RegistrationType::DoNotRegister); - ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized()); - ASSERT_GTE(network->getRegistersCalls(), 2); + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + controller->turnCrankForTest(Turner().registerCommand(2)); - controller.stop(); + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::pending); + ASSERT_GTE(controller.network->getRegistersCalls(), 2); + ASSERT_GTE(controller.registerCollector->count(), 2UL); } // Negatve: Test Register times out if the registration is wrong @@ -411,20 +881,345 @@ TEST_F(FreeMonControllerTest, TestRegisterFail) { FreeMonNetworkInterfaceMock::Options opts; opts.invalidRegister = true; - auto networkUnique = std::unique_ptr<FreeMonNetworkInterface>( - new FreeMonNetworkInterfaceMock(_mockThreadPool.get(), opts)); - auto network = static_cast<FreeMonNetworkInterfaceMock*>(networkUnique.get()); - 
FreeMonController controller(std::move(networkUnique)); + ControllerHolder controller(_mockThreadPool.get(), opts, false); + + controller->start(RegistrationType::DoNotRegister); + + ASSERT_NOT_OK(controller->registerServerCommand(duration_cast<Milliseconds>(Seconds(15)))); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled); + ASSERT_EQ(controller.network->getRegistersCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); +} + +// Positive: Ensure registration halts +TEST_F(FreeMonControllerTest, TestRegisterHalts) { + + FreeMonNetworkInterfaceMock::Options opts; + opts.haltRegister = true; + ControllerHolder controller(_mockThreadPool.get(), opts); + + controller->start(RegistrationType::DoNotRegister); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + controller->turnCrankForTest(Turner().registerCommand()); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled); + ASSERT_EQ(controller.network->getRegistersCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); +} + +// Positive: Test Metrics works on server register +TEST_F(FreeMonControllerTest, TestMetrics) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + controller->start(RegistrationType::RegisterOnStart); + + controller->turnCrankForTest( + Turner().registerServer().registerCommand().collect(2).metricsSend()); + + ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty()); + + ASSERT_GTE(controller.network->getRegistersCalls(), 1); + ASSERT_GTE(controller.network->getMetricsCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 1UL); +} + + +// Positive: Test Metrics is collected but no registration happens on empty storage +TEST_F(FreeMonControllerTest, TestMetricsWithEmptyStorage) { + ControllerHolder 
controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + controller->turnCrankForTest(Turner().registerServer().collect(4)); + + ASSERT_GTE(controller.network->getRegistersCalls(), 0); + ASSERT_GTE(controller.network->getMetricsCalls(), 0); + + ASSERT_EQ(controller.registerCollector->count(), 0UL); + ASSERT_GTE(controller.metricsCollector->count(), 4UL); +} + +FreeMonStorageState initStorage(StorageStateEnum e) { + FreeMonStorageState storage; + storage.setVersion(1UL); + + storage.setRegistrationId("Foo"); + storage.setState(e); + storage.setInformationalURL("http://www.example.com"); + storage.setMessage("Hello World"); + storage.setUserReminder(""); + return storage; +} + +// Positive: Test Metrics is collected and implicit registration happens when storage is initialized +TEST_F(FreeMonControllerTest, TestMetricsWithEnabledStorage) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::enabled)); + + controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + controller->turnCrankForTest( + Turner().registerServer().registerCommand().collect(2).metricsSend()); + + ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty()); + + ASSERT_GTE(controller.network->getRegistersCalls(), 1); + ASSERT_GTE(controller.network->getMetricsCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 1UL); +} + +// Positive: Test Metrics is collected but no registration happens on disabled storage +TEST_F(FreeMonControllerTest, TestMetricsWithDisabledStorage) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::disabled)); + + 
controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + controller->turnCrankForTest(Turner().registerServer().collect(4)); + + ASSERT_GTE(controller.network->getRegistersCalls(), 0); + ASSERT_GTE(controller.network->getMetricsCalls(), 0); + + ASSERT_EQ(controller.registerCollector->count(), 0UL); + ASSERT_GTE(controller.metricsCollector->count(), 4UL); +} + + +// Positive: Test Metrics is collected but no registration happens on disabled storage until user +// registers +TEST_F(FreeMonControllerTest, TestMetricsWithDisabledStorageThenRegister) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::disabled)); + + controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + controller->turnCrankForTest(Turner().registerServer().collect(4)); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + + controller->turnCrankForTest(Turner().registerCommand().collect(2).metricsSend()); + + ASSERT_GTE(controller.network->getRegistersCalls(), 1); + ASSERT_GTE(controller.network->getMetricsCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 4UL + 2UL); +} + +// Positive: Test Metrics is collected but no registration happens, then register, then Unregister, +// and finally register again +TEST_F(FreeMonControllerTest, TestMetricsWithDisabledStorageThenRegisterAndReregister) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + FreeMonStorage::replace(_opCtx.get(), initStorage(StorageStateEnum::disabled)); + + controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + controller->turnCrankForTest(Turner().registerServer().collect(4)); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + + 
controller->turnCrankForTest(Turner().registerCommand().collect(2).metricsSend()); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get())->getState() == StorageStateEnum::enabled); - controller.start(RegistrationType::DoNotRegister); + ASSERT_OK(controller->unregisterServerCommand(Milliseconds::min())); - ASSERT_NOT_OK(controller.registerServerCommand(duration_cast<Milliseconds>(Seconds(15)))); + controller->turnCrankForTest(Turner().unRegisterCommand().collect(3)); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get())->getState() == StorageStateEnum::disabled); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + + controller->turnCrankForTest(Turner().registerCommand().collect(2).metricsSend()); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get())->getState() == StorageStateEnum::enabled); + + ASSERT_GTE(controller.network->getRegistersCalls(), 2); + ASSERT_GTE(controller.network->getMetricsCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 2UL); + ASSERT_GTE(controller.metricsCollector->count(), 4UL + 3UL + 2UL); +} + +// Positive: Test DeRegister cancels a register that is in the middle of retrying +TEST_F(FreeMonControllerTest, TestMetricsUnregisterCancelsRegister) { + FreeMonNetworkInterfaceMock::Options opts; + opts.failRegisterHttp = true; + ControllerHolder controller(_mockThreadPool.get(), opts); + + controller->start(RegistrationType::DoNotRegister); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + controller->turnCrankForTest(Turner().registerCommand(2)); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::pending); + + ASSERT_GTE(controller.network->getRegistersCalls(), 2); + ASSERT_GTE(controller.registerCollector->count(), 2UL); + + ASSERT_OK(controller->unregisterServerCommand(Milliseconds::min())); + + controller->turnCrankForTest(Turner().unRegisterCommand()); + + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled); + + 
ASSERT_GTE(controller.network->getRegistersCalls(), 2); + ASSERT_GTE(controller.registerCollector->count(), 2UL); +} + +// Positive: Test Metrics halts +TEST_F(FreeMonControllerTest, TestMetricsHalt) { + FreeMonNetworkInterfaceMock::Options opts; + opts.haltMetrics = true; + ControllerHolder controller(_mockThreadPool.get(), opts); + + controller->start(RegistrationType::RegisterOnStart); + + controller->turnCrankForTest( + Turner().registerServer().registerCommand().collect(4).metricsSend()); + + ASSERT_TRUE(!FreeMonStorage::read(_opCtx.get()).get().getRegistrationId().empty()); + ASSERT_TRUE(FreeMonStorage::read(_opCtx.get()).get().getState() == StorageStateEnum::disabled); + + ASSERT_GTE(controller.network->getRegistersCalls(), 1); + ASSERT_GTE(controller.network->getMetricsCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 4UL); +} + + +// Positive: Test Metrics permanently deletes if requested +TEST_F(FreeMonControllerTest, TestMetricsPermanentlyDelete) { + FreeMonNetworkInterfaceMock::Options opts; + opts.permanentlyDeleteAfter3 = true; + ControllerHolder controller(_mockThreadPool.get(), opts); + + controller->start(RegistrationType::RegisterOnStart); + + controller->turnCrankForTest( + Turner().registerServer().registerCommand().collect(5).metricsSend(4)); ASSERT_FALSE(FreeMonStorage::read(_opCtx.get()).is_initialized()); - ASSERT_EQ(network->getRegistersCalls(), 1); - controller.stop(); + ASSERT_GTE(controller.network->getRegistersCalls(), 1); + ASSERT_GTE(controller.network->getMetricsCalls(), 3); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 3UL); +} + +// Positive: ensure registration id rotates +TEST_F(FreeMonControllerTest, TestRegistrationIdRotatesAfterRegistration) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + FreeMonStorage::replace(_opCtx.get(), 
initStorage(StorageStateEnum::enabled)); + + controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + controller->turnCrankForTest(Turner().registerServer().registerCommand().collect(2)); + + // Ensure registration rotated the id + ASSERT_EQ(FreeMonStorage::read(_opCtx.get())->getRegistrationId(), "regId123"); + + controller->turnCrankForTest(Turner().metricsSend().collect()); + + // Ensure metrics rotated the id + ASSERT_EQ(FreeMonStorage::read(_opCtx.get())->getRegistrationId(), "metricsId456"); + + ASSERT_GTE(controller.network->getRegistersCalls(), 1); + ASSERT_GTE(controller.network->getMetricsCalls(), 1); + + ASSERT_EQ(controller.registerCollector->count(), 1UL); + ASSERT_GTE(controller.metricsCollector->count(), 1UL); +} + +// Positive: ensure pre-registration metrics batching occurs +// Positive: ensure we only get two metrics each time +TEST_F(FreeMonControllerTest, TestPreRegistrationMetricBatching) { + ControllerHolder controller(_mockThreadPool.get(), FreeMonNetworkInterfaceMock::Options()); + + controller->start(RegistrationType::RegisterAfterOnTransitionToPrimary); + + controller->turnCrankForTest(Turner().registerServer().collect(3)); + + ASSERT_OK(controller->registerServerCommand(Milliseconds::min())); + + controller->turnCrankForTest(Turner().registerCommand().collect(1)); + + controller->turnCrankForTest(Turner().metricsSend().collect(1)); + + // Ensure we sent all the metrics batched before registration + ASSERT_EQ(controller.network->getLastMetrics().size(), 4UL); + + controller->turnCrankForTest(Turner().metricsSend().collect(1)); + + // Ensure we only send 2 metrics in the normal happy case + ASSERT_EQ(controller.network->getLastMetrics().size(), 2UL); +} + +// Negative: Test metrics buffers on failure, and retries +TEST_F(FreeMonControllerTest, TestMetricBatchingOnError) { + FreeMonNetworkInterfaceMock::Options opts; + opts.fail2MetricsUploads = true; + ControllerHolder controller(_mockThreadPool.get(), opts); + + 
controller->start(RegistrationType::RegisterOnStart); + + controller->turnCrankForTest(Turner().registerServer().registerCommand().collect(2)); + + controller->turnCrankForTest(Turner().metricsSend().collect()); + + // Ensure we sent all the metrics batched before registration + ASSERT_EQ(controller.network->getLastMetrics().size(), 2UL); + + controller->turnCrankForTest(Turner().metricsSend().collect()); + + // Ensure we resent all the failed metrics + ASSERT_EQ(controller.network->getLastMetrics().size(), 3UL); +} + +// Negative: Test metrics buffers on failure, and retries and ensure 2 metrics occurs after a blip +// of an error +// Note: this test operates in real-time because it needs to test multiple retries matched with +// metrics collection. +TEST_F(FreeMonControllerTest, TestMetricBatchingOnErrorRealtime) { + FreeMonNetworkInterfaceMock::Options opts; + opts.fail2MetricsUploads = true; + ControllerHolder controller(_mockThreadPool.get(), opts, false); + + controller->start(RegistrationType::RegisterOnStart); + + // Ensure the first upload sends 2 samples + ASSERT_TRUE(controller.network->waitMetricsCalls(1, Seconds(5)).is_initialized()); + ASSERT_EQ(controller.network->getLastMetrics().size(), 2UL); + + // Ensure the second upload sends 3 samples because first failed + ASSERT_TRUE(controller.network->waitMetricsCalls(1, Seconds(5)).is_initialized()); + ASSERT_EQ(controller.network->getLastMetrics().size(), 3UL); + + // Ensure the third upload sends 5 samples because second failed + // Since the second retry is 2s, we collected 2 samples + ASSERT_TRUE(controller.network->waitMetricsCalls(1, Seconds(5)).is_initialized()); + ASSERT_GTE(controller.network->getLastMetrics().size(), 4UL); + + // Ensure the fourth upload sends 2 samples + ASSERT_TRUE(controller.network->waitMetricsCalls(1, Seconds(5)).is_initialized()); + ASSERT_EQ(controller.network->getLastMetrics().size(), 2UL); } +// TODO: Positive: ensure optional fields are rotated + +// TODO: Positive: 
Test Metrics works on command register on primary +// TODO: Positive: Test Metrics works on startup register on secondary +// TODO: Positive: Test Metrics works on secondary after opObserver register +// TODO: Positive: Test Metrics works on secondary after opObserver de-register + } // namespace } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_message.h b/src/mongo/db/free_mon/free_mon_message.h index b7b514ee775..2247532b3be 100644 --- a/src/mongo/db/free_mon/free_mon_message.h +++ b/src/mongo/db/free_mon/free_mon_message.h @@ -56,25 +56,39 @@ enum class FreeMonMessageType { RegisterCommand, /** - * Internal: Generated when an async HTTP request completes succesfully. + * Internal: Generated when an async registration HTTP request completes succesfully. */ AsyncRegisterComplete, /** - * Internal: Generated when an async HTTP request completes with an error. + * Internal: Generated when an async registration HTTP request completes with an error. */ AsyncRegisterFail, /** - * Unregister server from server command. - */ + * Unregister server from server command. + */ UnregisterCommand, - // TODO - add metrics messages - // MetricsCollect - Cloud wants the "wait" time to calculated when the message processing - // starts, not ends - // AsyncMetricsComplete, - // AsyncMetricsFail, + /** + * Internal: Collect metrics and buffer them in-memory + */ + MetricsCollect, + + /** + * Internal: Send metrics to the cloud endpoint by beginning an async HTTP request. + */ + MetricsSend, + + /** + * Internal: Generated when an async metrics HTTP request completes succesfully. + */ + AsyncMetricsComplete, + + /** + * Internal: Generated when an async metrics HTTP request completes with an error. 
+ */ + AsyncMetricsFail, // TODO - add replication messages // OnPrimary, @@ -178,6 +192,17 @@ struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncRegisterFail> { using payload_type = Status; }; +template <> +struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncMetricsComplete> { + using payload_type = FreeMonMetricsResponse; +}; + +template <> +struct FreeMonPayloadForMessage<FreeMonMessageType::AsyncMetricsFail> { + using payload_type = Status; +}; + + /** * Message with a generic payload based on the type of message. */ @@ -262,31 +287,52 @@ private: }; /** - * Custom waitable message for Register Command message. + * For the messages that the caller needs to wait on, this provides a mechanism to wait on messages + * to be processed. +*/ +template <FreeMonMessageType typeT> +struct FreeMonWaitablePayloadForMessage { + using payload_type = void; +}; + +template <> +struct FreeMonWaitablePayloadForMessage<FreeMonMessageType::RegisterCommand> { + using payload_type = std::vector<std::string>; +}; + +template <> +struct FreeMonWaitablePayloadForMessage<FreeMonMessageType::UnregisterCommand> { + // The parameter is unused but most not be void. + using payload_type = bool; +}; + +/** + * Message with a generic payload based on the type of message. */ -class FreeMonRegisterCommandMessage : public FreeMonMessage { +template <FreeMonMessageType typeT> +class FreeMonWaitableMessageWithPayload : public FreeMonMessage { public: + using payload_type = typename FreeMonWaitablePayloadForMessage<typeT>::payload_type; + /** * Create a message that should processed immediately. 
*/ - static std::shared_ptr<FreeMonRegisterCommandMessage> createNow( - const std::vector<std::string>& tags) { - return std::make_shared<FreeMonRegisterCommandMessage>(tags, Date_t::min()); + static std::shared_ptr<FreeMonWaitableMessageWithPayload> createNow(payload_type t) { + return std::make_shared<FreeMonWaitableMessageWithPayload>(t, Date_t::min()); } /** - * Create a message that should processed after the specified deadline. + * Create a message that should processed immediately. */ - static std::shared_ptr<FreeMonRegisterCommandMessage> createWithDeadline( - const std::vector<std::string>& tags, Date_t deadline) { - return std::make_shared<FreeMonRegisterCommandMessage>(tags, deadline); + static std::shared_ptr<FreeMonWaitableMessageWithPayload> createWithDeadline(payload_type t, + Date_t deadline) { + return std::make_shared<FreeMonWaitableMessageWithPayload>(t, deadline); } - /** - * Get tags. + * Get message payload. */ - const std::vector<std::string>& getTags() const { - return _tags; + const payload_type& getPayload() const { + return _t; } /** @@ -306,15 +352,17 @@ public: } public: - FreeMonRegisterCommandMessage(std::vector<std::string> tags, Date_t deadline) - : FreeMonMessage(FreeMonMessageType::RegisterCommand, deadline), _tags(std::move(tags)) {} + FreeMonWaitableMessageWithPayload(payload_type t, Date_t deadline) + : FreeMonMessage(typeT, deadline), _t(std::move(t)) {} private: + // Message payload + payload_type _t; + // WaitaleResult to notify caller WaitableResult _waitable{}; - - // Tags - const std::vector<std::string> _tags; }; +using FreeMonRegisterCommandMessage = + FreeMonWaitableMessageWithPayload<FreeMonMessageType::RegisterCommand>; } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_mongod.cpp b/src/mongo/db/free_mon/free_mon_mongod.cpp index 4201d4e39ba..df72066aabf 100644 --- a/src/mongo/db/free_mon/free_mon_mongod.cpp +++ b/src/mongo/db/free_mon/free_mon_mongod.cpp @@ -33,6 +33,7 @@ #include 
"mongo/db/free_mon/free_mon_mongod.h" #include <mutex> +#include <snappy.h> #include <string> #include "mongo/base/data_type_validated.h" diff --git a/src/mongo/db/free_mon/free_mon_options.cpp b/src/mongo/db/free_mon/free_mon_options.cpp index 81293405b59..c99a899ccd9 100644 --- a/src/mongo/db/free_mon/free_mon_options.cpp +++ b/src/mongo/db/free_mon/free_mon_options.cpp @@ -70,7 +70,6 @@ StatusWith<EnableCloudStateEnum> EnableCloudState_parse(StringData value) { return EnableCloudStateEnum::kRuntime; } - // TODO return Status(ErrorCodes::InvalidOptions, "Unrecognized state"); } diff --git a/src/mongo/db/free_mon/free_mon_processor.cpp b/src/mongo/db/free_mon/free_mon_processor.cpp index 5f3e0121914..53e60ed316d 100644 --- a/src/mongo/db/free_mon/free_mon_processor.cpp +++ b/src/mongo/db/free_mon/free_mon_processor.cpp @@ -33,6 +33,8 @@ #include "mongo/db/free_mon/free_mon_processor.h" #include <functional> +#include <numeric> +#include <snappy.h> #include <tuple> #include <utility> @@ -57,6 +59,8 @@ constexpr auto kInformationalURLMaxLength = 4096; constexpr auto kInformationalMessageMaxLength = 4096; constexpr auto kUserReminderMaxLength = 4096; +constexpr Seconds kDefaultMetricsGatherInterval(1); + constexpr auto kReportingIntervalMinutesMin = 1; constexpr auto kReportingIntervalMinutesMax = 60 * 60 * 24; @@ -80,7 +84,6 @@ bool RegistrationRetryCounter::incrementError() { _current = _base + Seconds(randomJitter(_random, kStage1JitterMin, kStage1JitterMax)); ++_retryCount; } else { - _base = _base; _current = _base + Seconds(randomJitter(_random, kStage2JitterMin, kStage2JitterMax)); } @@ -93,6 +96,43 @@ bool RegistrationRetryCounter::incrementError() { return true; } +void MetricsRetryCounter::reset() { + _current = _min; + _base = _min; + _retryCount = 0; + _total = Hours(0); +} + +bool MetricsRetryCounter::incrementError() { + _base = static_cast<int>(pow(2, std::min(6, static_cast<int>(_retryCount)))) * _min; + _current = _base + 
Seconds(randomJitter(_random, _min.count() / 2, _min.count())); + ++_retryCount; + + _total += _current; + + if (_total > kDurationMax) { + return false; + } + + return true; +} + +FreeMonProcessor::FreeMonProcessor(FreeMonCollectorCollection& registration, + FreeMonCollectorCollection& metrics, + FreeMonNetworkInterface* network, + bool useCrankForTest) + : _registration(registration), + _metrics(metrics), + _network(network), + _random(Date_t::now().asInt64()), + _registrationRetry(_random), + _metricsRetry(_random), + _metricsGatherInterval(kDefaultMetricsGatherInterval), + _queue(useCrankForTest) { + _registrationRetry.reset(); + _metricsRetry.reset(); +} + void FreeMonProcessor::enqueue(std::shared_ptr<FreeMonMessage> msg) { _queue.enqueue(std::move(msg)); } @@ -101,6 +141,14 @@ void FreeMonProcessor::stop() { _queue.stop(); } +void FreeMonProcessor::turnCrankForTest(size_t countMessagesToIgnore) { + _countdown.reset(countMessagesToIgnore); + + _queue.turnCrankForTest(countMessagesToIgnore); + + _countdown.wait(); +} + void FreeMonProcessor::run() { try { @@ -114,10 +162,12 @@ void FreeMonProcessor::run() { return; } + auto msg = item.get(); + // Do work here - switch (item.get()->getType()) { + switch (msg->getType()) { case FreeMonMessageType::RegisterCommand: { - doCommandRegister(client, item.get()); + doCommandRegister(client, msg); break; } case FreeMonMessageType::RegisterServer: { @@ -125,7 +175,13 @@ void FreeMonProcessor::run() { client, checked_cast< FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>*>( - item.get().get())); + msg.get())); + break; + } + case FreeMonMessageType::UnregisterCommand: { + doCommandUnregister(client, + checked_cast<FreeMonWaitableMessageWithPayload< + FreeMonMessageType::UnregisterCommand>*>(msg.get())); break; } case FreeMonMessageType::AsyncRegisterComplete: { @@ -133,7 +189,7 @@ void FreeMonProcessor::run() { client, checked_cast< FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>*>( - 
item.get().get())); + msg.get())); break; } case FreeMonMessageType::AsyncRegisterFail: { @@ -141,16 +197,39 @@ void FreeMonProcessor::run() { client, checked_cast< FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>*>( - item.get().get())); + msg.get())); break; } - case FreeMonMessageType::UnregisterCommand: { - doUnregister(client); + case FreeMonMessageType::MetricsCollect: { + doMetricsCollect(client); + break; + } + case FreeMonMessageType::MetricsSend: { + doMetricsSend(client); + break; + } + case FreeMonMessageType::AsyncMetricsComplete: { + doAsyncMetricsComplete( + client, + checked_cast< + FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>*>( + msg.get())); + break; + } + case FreeMonMessageType::AsyncMetricsFail: { + doAsyncMetricsFail( + client, + checked_cast< + FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>*>( + msg.get())); break; } default: MONGO_UNREACHABLE; } + + // Record that we have finished processing the message for testing purposes. + _countdown.countDown(); } } catch (...) 
{ // Stop the queue @@ -203,6 +282,8 @@ void FreeMonProcessor::writeState(Client* client) { // If our in-memory copy matches the last read, then write it to disk if (state == _lastReadState) { FreeMonStorage::replace(optCtx.get(), _state); + + _lastReadState = _state; } } } @@ -215,7 +296,7 @@ void FreeMonProcessor::doServerRegister( if (msg->getPayload().first == RegistrationType::RegisterOnStart) { enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); } else if (msg->getPayload().first == RegistrationType::RegisterAfterOnTransitionToPrimary) { - // Check if we need to wait to become primary + // Check if we need to wait to become primary: // If the 'admin.system.version' has content, do not wait and just re-register // If the collection is empty, wait until we become primary // If we become secondary, OpObserver hooks will tell us our registration id @@ -225,17 +306,26 @@ void FreeMonProcessor::doServerRegister( // Check if there is an existing document auto state = FreeMonStorage::read(optCtx.get()); - // If there is no document, we may be in a replica set and may need to register after - // becoming primary - // since we cannot record the registration id until after becoming primary + // If there is no document, we may be: + // 1. in a replica set and may need to register after becoming primary since we cannot + // record the registration id until after becoming primary + // 2. 
a standalone which has never been registered + // if (!state.is_initialized()) { - // TODO: hook OnTransitionToPrimary instead of this hack - enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); + // TODO: hook OnTransitionToPrimary } else { - // If we have state, then we can do the normal register on startup - enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); + // We are standalone, if we have a registration id, then send a registration + // notification, else wait for the user to register us + if (state.get().getState() == StorageStateEnum::enabled) { + enqueue(FreeMonRegisterCommandMessage::createNow(msg->getPayload().second)); + } } + } else { + MONGO_UNREACHABLE; } + + // Enqueue the first metrics gather + enqueue(FreeMonMessage::createNow(FreeMonMessageType::MetricsCollect)); } namespace { @@ -296,13 +386,13 @@ void FreeMonProcessor::doCommandRegister(Client* client, req.setVersion(kProtocolVersion); - if (!msg->getTags().empty()) { + if (!msg->getPayload().empty()) { // Cache the tags for subsequent retries - _tags = msg->getTags(); + _tags = msg->getPayload(); } if (!_tags.empty()) { - req.setTag(transformVector(msg->getTags())); + req.setTag(transformVector(msg->getPayload())); } // Collect the data @@ -310,6 +400,11 @@ void FreeMonProcessor::doCommandRegister(Client* client, req.setPayload(std::get<0>(collect)); + // Record that the registration is pending + _state.setState(StorageStateEnum::pending); + + writeState(client); + // Send the async request _futureRegistrationResponse = doAsyncCallback<FreeMonRegistrationResponse>( this, @@ -398,6 +493,75 @@ void FreeMonProcessor::notifyPendingRegisters(const Status s) { _pendingRegisters.clear(); } + +Status FreeMonProcessor::validateMetricsResponse(const FreeMonMetricsResponse& resp) { + // Any validation failure stops registration from proceeding to upload + if (resp.getVersion() != kProtocolVersion) { + return 
Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Unexpected metrics response protocol version, expected '" + << kProtocolVersion + << "', received '" + << resp.getVersion() + << "'"); + } + + if (resp.getId().is_initialized() && resp.getId().get().size() >= kRegistrationIdMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Id is '" << resp.getId().get().size() + << "' bytes in length, maximum allowed length is '" + << kRegistrationIdMaxLength + << "'"); + } + + if (resp.getInformationalURL().is_initialized() && + resp.getInformationalURL().get().size() >= kInformationalURLMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "InformationURL is '" + << resp.getInformationalURL().get().size() + << "' bytes in length, maximum allowed length is '" + << kInformationalURLMaxLength + << "'"); + } + + if (resp.getMessage().is_initialized() && + resp.getMessage().get().size() >= kInformationalMessageMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Message is '" << resp.getMessage().get().size() + << "' bytes in length, maximum allowed length is '" + << kInformationalMessageMaxLength + << "'"); + } + + if (resp.getUserReminder().is_initialized() && + resp.getUserReminder().get().size() >= kUserReminderMaxLength) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "UserReminder is '" << resp.getUserReminder().get().size() + << "' bytes in length, maximum allowed length is '" + << kUserReminderMaxLength + << "'"); + } + + if (resp.getReportingInterval() < kReportingIntervalMinutesMin || + resp.getReportingInterval() > kReportingIntervalMinutesMax) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Reporting Interval '" << resp.getReportingInterval() + << "' must be in the range [" + << kReportingIntervalMinutesMin + << "," + << kReportingIntervalMinutesMax + << "]"); + } + + // Did cloud ask 
us to stop uploading? + if (resp.getHaltMetricsUploading()) { + return Status(ErrorCodes::FreeMonHttpPermanentFailure, + str::stream() << "Halting metrics upload due to response"); + } + + return Status::OK(); +} + + void FreeMonProcessor::doAsyncRegisterComplete( Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg) { @@ -405,12 +569,24 @@ void FreeMonProcessor::doAsyncRegisterComplete( // Our request is no longer in-progress so delete it _futureRegistrationResponse.reset(); + if (_state.getState() != StorageStateEnum::pending) { + notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled")); + + return; + } + auto& resp = msg->getPayload(); Status s = validateRegistrationResponse(resp); if (!s.isOK()) { warning() << "Free Monitoring registration halted due to " << s; + // Disable on any error + _state.setState(StorageStateEnum::disabled); + + // Persist state + writeState(client); + notifyPendingRegisters(s); // If validation fails, we do not retry @@ -431,6 +607,8 @@ void FreeMonProcessor::doAsyncRegisterComplete( _state.setMessage(resp.getMessage()); _state.setInformationalURL(resp.getInformationalURL()); + _state.setState(StorageStateEnum::enabled); + // Persist state writeState(client); @@ -440,9 +618,11 @@ void FreeMonProcessor::doAsyncRegisterComplete( // Notify waiters notifyPendingRegisters(Status::OK()); - // TODO: Enqueue next metrics upload - // enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCallTimer, - // _registrationRetry.getNextDeadline(client))); + log() << "Free Monitoring is Enabled. 
Frequency: " << resp.getReportingInterval() << " seconds"; + + // Enqueue next metrics upload + enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, + _registrationRetry.getNextDeadline(client))); } void FreeMonProcessor::doAsyncRegisterFail( @@ -451,6 +631,12 @@ void FreeMonProcessor::doAsyncRegisterFail( // Our request is no longer in-progress so delete it _futureRegistrationResponse.reset(); + if (_state.getState() != StorageStateEnum::pending) { + notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled")); + + return; + } + if (!_registrationRetry.incrementError()) { // We have exceeded our retry warning() << "Free Monitoring is abandoning registration after excess retries"; @@ -465,6 +651,167 @@ void FreeMonProcessor::doAsyncRegisterFail( _tags, _registrationRetry.getNextDeadline(client))); } -void FreeMonProcessor::doUnregister(Client* /*client*/) {} +void FreeMonProcessor::doCommandUnregister( + Client* client, FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>* msg) { + // Treat this request as idempotent + if (_state.getState() != StorageStateEnum::disabled) { + + _state.setState(StorageStateEnum::disabled); + + writeState(client); + + log() << "Free Monitoring is Disabled"; + } + + msg->setStatus(Status::OK()); +} + +void FreeMonProcessor::doMetricsCollect(Client* client) { + // Collect the time at the beginning so the time to collect does not affect the schedule + Date_t now = client->getServiceContext()->getPreciseClockSource()->now(); + + // Collect the data + auto collect = _metrics.collect(client); + + _metricsBuffer.push(std::get<0>(collect)); + + // Enqueue the next metrics collect based on when we started processing the last collection. 
+ enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCollect, + now + _metricsGatherInterval)); +} + +std::string compressMetrics(MetricsBuffer& buffer) { + + std::vector<char> rawBuffer; + + size_t totalReserve = std::accumulate( + buffer.begin(), buffer.end(), 0, [](size_t sum, auto& o) { return sum + o.objsize(); }); + + rawBuffer.reserve(totalReserve); + + int count = 0; + for (const auto& obj : buffer) { + ++count; + std::copy(obj.objdata(), obj.objdata() + obj.objsize(), std::back_inserter(rawBuffer)); + } + + std::string outBuffer; + + snappy::Compress(rawBuffer.data(), rawBuffer.size(), &outBuffer); + + return outBuffer; +} + +void FreeMonProcessor::doMetricsSend(Client* client) { + readState(client); + + if (_state.getState() != StorageStateEnum::enabled) { + // If we are recently disabled, then stop sending metrics + return; + } + + // Build outbound request + FreeMonMetricsRequest req; + invariant(!_state.getRegistrationId().empty()); + + req.setVersion(kProtocolVersion); + req.setEncoding(MetricsEncodingEnum::snappy); + + req.setId(_state.getRegistrationId()); + + // Get the buffered metrics + auto metrics = compressMetrics(_metricsBuffer); + req.setMetrics(ConstDataRange(metrics.data(), metrics.size())); + + // Send the async request + doAsyncCallback<FreeMonMetricsResponse>( + this, + _network->sendMetricsAsync(req), + [this](const auto& resp) { + this->enqueue( + FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>::createNow( + resp)); + }, + [this](Status s) { + this->enqueue( + FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>::createNow(s)); + }); +} + +void FreeMonProcessor::doAsyncMetricsComplete( + Client* client, + const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>* msg) { + + auto& resp = msg->getPayload(); + + Status s = validateMetricsResponse(resp); + if (!s.isOK()) { + warning() << "Free Monitoring metrics uploading halted due to " << s; + + // Disable free monitoring 
on validation errors + _state.setState(StorageStateEnum::disabled); + writeState(client); + + // If validation fails, we do not retry + return; + } + + // If cloud said delete, not just halt, so erase state + if (resp.getPermanentlyDelete() == true) { + auto opCtxUnique = client->makeOperationContext(); + FreeMonStorage::deleteState(opCtxUnique.get()); + + return; + } + + // Update in-memory state of buffered metrics + // TODO: do we reset only the metrics we send or all pending on success? + + _metricsBuffer.reset(); + _metricsRetry.setMin(Seconds(resp.getReportingInterval())); + + if (resp.getId().is_initialized()) { + _state.setRegistrationId(resp.getId().get()); + } + + if (resp.getUserReminder().is_initialized()) { + _state.setUserReminder(resp.getUserReminder().get()); + } + + if (resp.getInformationalURL().is_initialized()) { + _state.setInformationalURL(resp.getInformationalURL().get()); + } + + if (resp.getMessage().is_initialized()) { + _state.setMessage(resp.getMessage().get()); + } + + // Persist state + writeState(client); + + // Reset retry counter + _metricsRetry.reset(); + + // Enqueue next metrics upload + enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, + _registrationRetry.getNextDeadline(client))); +} + +void FreeMonProcessor::doAsyncMetricsFail( + Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>* msg) { + + if (!_metricsRetry.incrementError()) { + // We have exceeded our retry + warning() << "Free Monitoring is abandoning metrics upload after excess retries"; + return; + } + + LOG(1) << "Free Monitoring Metrics upload failed with status " << msg->getPayload() + << ", retrying in " << _metricsRetry.getNextDuration(); + + // Enqueue next metrics upload + enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, + _metricsRetry.getNextDeadline(client))); +} } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_processor.h 
b/src/mongo/db/free_mon/free_mon_processor.h
index ec09286addd..ade74fc09e6 100644
--- a/src/mongo/db/free_mon/free_mon_processor.h
+++ b/src/mongo/db/free_mon/free_mon_processor.h
@@ -29,6 +29,7 @@
 
 #include <boost/optional.hpp>
 #include <cstdint>
+#include <deque>
 #include <memory>
 #include <ratio>
 #include <string>
@@ -143,6 +144,125 @@ private:
 };
 
 /**
+ * Manage retries for metrics uploads.
+ */
+class MetricsRetryCounter : public RetryCounter {
+public:
+    explicit MetricsRetryCounter(PseudoRandom& random) : _random(random) {}
+
+    void reset() final;
+
+    bool incrementError() final;
+
+private:
+    // Random number generator for jitter
+    PseudoRandom& _random;
+
+    // Retry count for stage 1 retry
+    size_t _retryCount{0};
+
+    // Total Seconds we have retried for
+    Seconds _total;
+
+    // Last retry interval without jitter
+    Seconds _base;
+
+    // Max Duration
+    const Hours kDurationMax{7 * 24};
+};
+
+/**
+ * Simple bounded buffer of metrics to upload.
+ */
+class MetricsBuffer {
+public:
+    using container_type = std::deque<BSONObj>;
+
+    /**
+     * Add a metric to the buffer. Oldest metric will be discarded if buffer is at capacity.
+     */
+    void push(BSONObj obj) {
+        if (_queue.size() == kMaxElements) {
+            _queue.pop_front();
+        }
+
+        _queue.push_back(obj);
+    }
+
+    /**
+     * Flush the buffer down to kMinElements entries. The last entries are held for cloud.
+     */
+    void reset() {
+        while (_queue.size() > kMinElements) {
+            _queue.pop_front();
+        }
+    }
+
+    container_type::iterator begin() {
+        return _queue.begin();
+    }
+    container_type::iterator end() {
+        return _queue.end();
+    }
+
+private:
+    // Bounded queue of metrics
+    container_type _queue;
+
+    const size_t kMinElements = 1;
+    const size_t kMaxElements = 10;
+};
+
+/**
+ * Countdown latch for test support in FreeMonProcessor so that a crank can be turned manually.
+ */
+class FreeMonCountdownLatch {
+public:
+    explicit FreeMonCountdownLatch() : _count(0) {}
+
+    /**
+     * Reset the countdown latch to wait for N events. The previous countdown must be finished.
+     */
+    void reset(uint32_t count) {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        dassert(_count == 0);
+        dassert(count > 0);
+        _count = count;
+    }
+
+    /**
+     * Count down an event. All waiters are woken once the count reaches zero.
+     */
+    void countDown() {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+        if (_count > 0 && --_count == 0) {
+            // Final event: release every thread blocked in wait(), not just one of them.
+            _condvar.notify_all();
+        }
+    }
+
+    /**
+     * Wait until the N events specified in reset have occurred.
+     */
+    void wait() {
+        stdx::unique_lock<stdx::mutex> lock(_mutex);
+        _condvar.wait(lock, [&] { return _count == 0; });
+    }
+
+private:
+    // mutex protecting the count and cond var
+    stdx::mutex _mutex;
+
+    // cond var to signal and wait on
+    stdx::condition_variable _condvar;
+
+    // count of events to wait for
+    size_t _count;
+};
+
+
+/**
  * Process in an Agent in a Agent/Message Passing model.
  *
  * Messages are given to it by enqueue, and the Processor processes messages with run().
@@ -151,14 +271,8 @@ class FreeMonProcessor : public std::enable_shared_from_this<FreeMonProcessor> {
 public:
     FreeMonProcessor(FreeMonCollectorCollection& registration,
                      FreeMonCollectorCollection& metrics,
-                     FreeMonNetworkInterface* network)
-        : _registration(registration),
-          _metrics(metrics),
-          _network(network),
-          _random(Date_t::now().asInt64()),
-          _registrationRetry(_random) {
-        _registrationRetry.reset();
-    }
+                     FreeMonNetworkInterface* network,
+                     bool useCrankForTest);
 
     /**
      * Enqueue a message to process
@@ -171,6 +285,11 @@ public:
     void stop();
 
     /**
+     * Turn the crank of the message queue by ignoring deadlines for N messages.
+     */
+    void turnCrankForTest(size_t countMessagesToIgnore);
+
+    /**
      * Processes messages forever
      */
     void run();
@@ -180,6 +299,11 @@ public:
      */
     static Status validateRegistrationResponse(const FreeMonRegistrationResponse& resp);
 
+    /**
+     * Validate the metrics response. Public for unit testing.
+     */
+    static Status validateMetricsResponse(const FreeMonMetricsResponse& resp);
+
 private:
     /**
      * Read the state from the database.
@@ -203,9 +327,11 @@ private:
         const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg);
 
     /**
-     * Process unregistration.
+     * Process unregistration from a command.
      */
-    void doUnregister(Client* client);
+    void doCommandUnregister(
+        Client* client,
+        FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>* msg);
 
     /**
      * Process a successful HTTP request.
@@ -226,6 +352,29 @@ private:
      */
     void notifyPendingRegisters(const Status s);
 
+    /**
+     * Upload collected metrics.
+     */
+    void doMetricsCollect(Client* client);
+
+    /**
+     * Upload gathered metrics.
+     */
+    void doMetricsSend(Client* client);
+
+    /**
+     * Process a successful metrics upload response.
+     */
+    void doAsyncMetricsComplete(
+        Client* client,
+        const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>* msg);
+
+    /**
+     * Process a failed metrics upload.
+     */
+    void doAsyncMetricsFail(
+        Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>* msg);
+
 private:
     // Collection of collectors to send on registration
     FreeMonCollectorCollection& _registration;
@@ -242,6 +391,15 @@ private:
     // Registration Retry logic
     RegistrationRetryCounter _registrationRetry;
 
+    // Metrics Retry logic
+    MetricsRetryCounter _metricsRetry;
+
+    // Interval for gathering metrics
+    Seconds _metricsGatherInterval;
+
+    // Buffer of metrics to upload
+    MetricsBuffer _metricsBuffer;
+
     // List of tags from server configuration registration
     std::vector<std::string> _tags;
 
@@ -257,6 +415,9 @@ private:
     // Pending update to disk
     FreeMonStorageState _state;
 
+    // Countdown latch to support manual cranking
+    FreeMonCountdownLatch _countdown;
+
     // Message queue
     FreeMonMessageQueue _queue;
 };
diff --git a/src/mongo/db/free_mon/free_mon_queue.cpp b/src/mongo/db/free_mon/free_mon_queue.cpp
index c8a0b74aa87..d1246b677a5 100644
--- a/src/mongo/db/free_mon/free_mon_queue.cpp
+++ b/src/mongo/db/free_mon/free_mon_queue.cpp
@@ -62,35 +62,79 @@ boost::optional<std::shared_ptr<FreeMonMessage>> FreeMonMessageQueue::dequeue(
         return {};
     }
 
-    Date_t deadlineCV = Date_t::max();
-    if (!_queue.empty()) {
-        deadlineCV = _queue.top()->getDeadline();
-    } else {
-        deadlineCV = clockSource->now() + Hours(24);
-    }
+    while (true) {
+        Date_t deadlineCV = Date_t::max();
+        if (_useCrank) {
+            if (!_queue.empty() && _countMessagesIgnored < _countMessagesToIgnore) {
+                // For testing purposes, ignore the deadline
+                deadlineCV = Date_t();
+            } else {
+                deadlineCV = clockSource->now() + Hours(1);
+            }
+        } else {
+            if (!_queue.empty()) {
+                deadlineCV = _queue.top()->getDeadline();
+            } else {
+                deadlineCV = clockSource->now() + Hours(24);
+            }
+        }
+
+        _condvar.wait_until(lock, deadlineCV.toSystemTimePoint(), [this, clockSource]() {
+            if (_stop) {
+                return true;
+            }
+
+            if (this->_queue.empty()) {
+                return false;
+            }
+
+            // In test mode, wake only while cranked messages remain to be delivered
+            if (_useCrank) {
+                if (_countMessagesIgnored < _countMessagesToIgnore) {
+                    return true;
+                } else {
+                    dassert(_countMessagesIgnored == _countMessagesToIgnore);
+                    return false;
+                }
+            }
+
+            auto deadlineMessage = this->_queue.top()->getDeadline();
+            if (deadlineMessage == Date_t::min()) {
+                return true;
+            }
+
+            auto now = clockSource->now();
+
+            bool check = deadlineMessage < now;
+            return check;
+        });
 
-    _condvar.wait_until(lock, deadlineCV.toSystemTimePoint(), [this, clockSource]() {
         if (_stop) {
-            return true;
+            return {};
         }
 
-        if (this->_queue.empty()) {
-            return false;
-        }
+        // We were woken up by a message being enqueued; go back to sleep and wait until the
+        // crank is installed and turned.
+        if (_useCrank) {
+            if (_countMessagesIgnored == _countMessagesToIgnore) {
+                continue;
+            }
 
-        auto deadlineMessage = this->_queue.top()->getDeadline();
-        if (deadlineMessage == Date_t::min()) {
-            return true;
+            dassert(_countMessagesIgnored <= _countMessagesToIgnore);
         }
 
-        auto now = clockSource->now();
-
-        bool check = deadlineMessage < now;
-        return check;
-    });
+        // If the queue is not empty, return the message
+        // otherwise we need to go back to sleep in the hope we get a message.
+        if (!_queue.empty()) {
+            break;
+        } else if (_useCrank) {
+            dassert(0, "Was asked to wait for more messages then available");
+        }
+    }
 
-    if (_stop || _queue.empty()) {
-        return {};
+    _countMessagesIgnored++;
+    if (_useCrank && _countMessagesIgnored == _countMessagesToIgnore && _waitable) {
+        _waitable->set(Status::OK());
     }
 
     auto item = _queue.top();
@@ -113,4 +157,20 @@ void FreeMonMessageQueue::stop() {
     }
 }
 
+void FreeMonMessageQueue::turnCrankForTest(size_t countMessagesToIgnore) {
+    invariant(_useCrank);
+
+    {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+        _waitable = std::make_unique<WaitableResult>();
+
+        _countMessagesIgnored = 0;
+        _countMessagesToIgnore = countMessagesToIgnore;
+
+        _condvar.notify_one();
+    }
+
+    //_waitable->wait_for(Seconds(10));
+}
 }  // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_queue.h b/src/mongo/db/free_mon/free_mon_queue.h
index 687e61c13bd..d91aeb9e157 100644
--- a/src/mongo/db/free_mon/free_mon_queue.h
+++ b/src/mongo/db/free_mon/free_mon_queue.h
@@ -58,6 +58,8 @@ struct FreeMonMessageGreater {
  */
 class FreeMonMessageQueue {
 public:
+    explicit FreeMonMessageQueue(bool useCrankForTest = false) : _useCrank(useCrankForTest) {}
+
     /**
      * Enqueue a message and wake consumer if needed.
      *
@@ -77,6 +79,11 @@ public:
      */
     void stop();
 
+    /**
+     * Turn the crank of the message queue by ignoring deadlines for N messages.
+     */
+    void turnCrankForTest(size_t countMessagesToIgnore);
+
 private:
     // Condition variable to signal consumer
     stdx::condition_variable _condvar;
@@ -93,6 +100,18 @@ private:
                         std::vector<std::shared_ptr<FreeMonMessage>>,
                         FreeMonMessageGreater>
         _queue;
+
+    // Use manual crank to process messages in-order instead of based on deadlines.
+    bool _useCrank{false};
+
+    // Number of messages to ignore
+    size_t _countMessagesToIgnore{0};
+
+    // Number of messages that have been ignored
+    size_t _countMessagesIgnored{0};
+
+    // Waitable result for testing
+    std::unique_ptr<WaitableResult> _waitable;
 };
 
 
diff --git a/src/mongo/db/free_mon/free_mon_storage.idl b/src/mongo/db/free_mon/free_mon_storage.idl
index a181d39f23a..2bd4f15185a 100644
--- a/src/mongo/db/free_mon/free_mon_storage.idl
+++ b/src/mongo/db/free_mon/free_mon_storage.idl
@@ -25,6 +25,7 @@ enums:
         values:
             disabled: disabled
             enabled: enabled
+            pending: pending
 
 structs:
     FreeMonStorageState:
diff --git a/src/mongo/db/free_mon/http_client_curl.cpp b/src/mongo/db/free_mon/http_client_curl.cpp
index ad78618b9bd..1604c1c4d0f 100644
--- a/src/mongo/db/free_mon/http_client_curl.cpp
+++ b/src/mongo/db/free_mon/http_client_curl.cpp
@@ -140,75 +140,81 @@ private:
     static void doPost(SharedPromise<std::vector<uint8_t>> shared_promise,
                        const std::string& urlString,
                        const BSONObj& obj) {
-        ConstDataRange data(obj.objdata(), obj.objdata() + obj.objsize());
+        try {  // any exception thrown below is reported through the promise by the catch
+            ConstDataRange data(obj.objdata(), obj.objdata() + obj.objsize());
 
-        ConstDataRangeCursor cdrc(data);
+            ConstDataRangeCursor cdrc(data);
 
-        std::unique_ptr<CURL, void (*)(CURL*)> myHandle(curl_easy_init(), curl_easy_cleanup);
+            std::unique_ptr<CURL, void (*)(CURL*)> myHandle(curl_easy_init(), curl_easy_cleanup);
 
-        if (!myHandle) {
-            shared_promise.setError({ErrorCodes::InternalError, "Curl initialization failed"});
-            return;
-        }
+            if (!myHandle) {
+                shared_promise.setError({ErrorCodes::InternalError, "Curl initialization failed"});
+                return;
+            }
 
-        curl_easy_setopt(myHandle.get(), CURLOPT_URL, urlString.c_str());
-        curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1);
+            curl_easy_setopt(myHandle.get(), CURLOPT_URL, urlString.c_str());
+            curl_easy_setopt(myHandle.get(), CURLOPT_POST, 1L);
 
-        // Allow http only if test commands are enabled
-        if (getTestCommandsEnabled()) {
-            curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
-        } else {
-            curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
-        }
+            // Allow http only if test commands are enabled
+            if (getTestCommandsEnabled()) {
+                curl_easy_setopt(
+                    myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
+            } else {
+                curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS);
+            }
 
-        curl_easy_setopt(myHandle.get(), CURLOPT_PROTOCOLS, CURLPROTO_HTTPS | CURLPROTO_HTTP);
-        curl_easy_setopt(myHandle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
+            // Do not set CURLOPT_PROTOCOLS again here: it would re-enable HTTP unconditionally.
+            curl_easy_setopt(myHandle.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
 
-        DataBuilder dataBuilder(4096);
+            DataBuilder dataBuilder(4096);
 
-        curl_easy_setopt(myHandle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
-        curl_easy_setopt(myHandle.get(), CURLOPT_WRITEDATA, &dataBuilder);
+            curl_easy_setopt(myHandle.get(), CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+            curl_easy_setopt(myHandle.get(), CURLOPT_WRITEDATA, &dataBuilder);
 
-        curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback);
-        curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc);
-        curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length());
+            curl_easy_setopt(myHandle.get(), CURLOPT_READFUNCTION, ReadMemoryCallback);
+            curl_easy_setopt(myHandle.get(), CURLOPT_READDATA, &cdrc);
+            curl_easy_setopt(myHandle.get(), CURLOPT_POSTFIELDSIZE, (long)cdrc.length());
 
-        // CURLOPT_EXPECT_100_TIMEOUT_MS??
-        curl_easy_setopt(myHandle.get(), CURLOPT_CONNECTTIMEOUT, kConnectionTimeoutSeconds);
-        curl_easy_setopt(myHandle.get(), CURLOPT_TIMEOUT, kTotalRequestTimeoutSeconds);
+            // CURLOPT_EXPECT_100_TIMEOUT_MS??
+            curl_easy_setopt(myHandle.get(), CURLOPT_CONNECTTIMEOUT, kConnectionTimeoutSeconds);
+            curl_easy_setopt(myHandle.get(), CURLOPT_TIMEOUT, kTotalRequestTimeoutSeconds);
 
 #if LIBCURL_VERSION_NUM > 0x072200
-        // Requires >= 7.34.0
-        curl_easy_setopt(myHandle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
+            // Requires >= 7.34.0
+            curl_easy_setopt(myHandle.get(), CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
 #endif
-        curl_easy_setopt(myHandle.get(), CURLOPT_FOLLOWLOCATION, 0);
+            curl_easy_setopt(myHandle.get(), CURLOPT_FOLLOWLOCATION, 0L);
 
-        curl_easy_setopt(myHandle.get(), CURLOPT_NOSIGNAL, 1);
-        // TODO: consider making this configurable If server log level > 3
-        // curl_easy_setopt(myHandle.get(), CURLOPT_VERBOSE, 1);
-        // curl_easy_setopt(myHandle.get(), CURLOPT_DEBUGFUNCTION , ???);
+            curl_easy_setopt(myHandle.get(), CURLOPT_NOSIGNAL, 1L);
+            // TODO: consider making this configurable If server log level > 3
+            // curl_easy_setopt(myHandle.get(), CURLOPT_VERBOSE, 1);
+            // curl_easy_setopt(myHandle.get(), CURLOPT_DEBUGFUNCTION , ???);
 
-        curl_slist* chunk = nullptr;
-        chunk = curl_slist_append(chunk, "Content-Type: application/octet-stream");
-        chunk = curl_slist_append(chunk, "Accept: application/octet-stream");
+            curl_slist* chunk = nullptr;
+            chunk = curl_slist_append(chunk, "Content-Type: application/octet-stream");
+            chunk = curl_slist_append(chunk, "Accept: application/octet-stream");
 
-        // Send the empty expect because we do not need the server to respond with 100-Contine
-        chunk = curl_slist_append(chunk, "Expect:");
+            // Send the empty expect because we do not need the server to respond with 100-Continue
+            chunk = curl_slist_append(chunk, "Expect:");
 
-        std::unique_ptr<curl_slist, void (*)(curl_slist*)> chunkHolder(chunk, curl_slist_free_all);
+            std::unique_ptr<curl_slist, void (*)(curl_slist*)> chunkHolder(chunk,
+                                                                           curl_slist_free_all);
 
-        curl_easy_setopt(myHandle.get(), CURLOPT_HTTPHEADER, chunk);
+            curl_easy_setopt(myHandle.get(), CURLOPT_HTTPHEADER, chunk);
 
-        CURLcode result = curl_easy_perform(myHandle.get());
-        if (result != CURLE_OK) {
-            shared_promise.setError({ErrorCodes::OperationFailed,
-                                     str::stream() << "Bad HTTP response from API server: "
-                                                   << curl_easy_strerror(result)});
-            return;
-        }
+            CURLcode result = curl_easy_perform(myHandle.get());
+            if (result != CURLE_OK) {
+                shared_promise.setError({ErrorCodes::OperationFailed,
+                                         str::stream() << "Bad HTTP response from API server: "
+                                                       << curl_easy_strerror(result)});
+                return;
+            }
 
-        auto d = dataBuilder.getCursor();
-        shared_promise.emplaceValue(std::vector<uint8_t>(d.data(), d.data() + d.length()));
+            auto d = dataBuilder.getCursor();
+            shared_promise.emplaceValue(std::vector<uint8_t>(d.data(), d.data() + d.length()));
+        } catch (...) {
+            shared_promise.setError(exceptionToStatus());
+        }
     }
 
 private: |