summaryrefslogtreecommitdiff
path: root/src/mongo/db/free_mon/free_mon_processor.cpp
blob: 4498617172b36ff0eac15e8e79c87aa57b134a80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl

#include "mongo/platform/basic.h"

#include "mongo/db/free_mon/free_mon_processor.h"

#include <functional>
#include <numeric>
#include <snappy.h>
#include <tuple>
#include <utility>

#include "mongo/base/data_range.h"
#include "mongo/base/status.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/free_mon/free_mon_storage.h"
#include "mongo/db/service_context.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"

namespace mongo {

namespace {

// Range of wire protocol versions accepted in responses; requests are sent with the maximum.
constexpr auto kMinProtocolVersion = 1;
constexpr auto kMaxProtocolVersion = 2;

// Version of the persisted free monitoring state document this code reads and writes.
constexpr auto kStorageVersion = 1;

// Upper bounds (exclusive, in bytes) on string fields returned by the cloud endpoint.
constexpr auto kRegistrationIdMaxLength = 4096;
constexpr auto kInformationalURLMaxLength = 4096;
constexpr auto kInformationalMessageMaxLength = 4096;
constexpr auto kUserReminderMaxLength = 4096;

// Accepted range for the reporting interval returned by the cloud endpoint: 1 second to 30 days.
constexpr auto kReportingIntervalSecondsMin = 1;
constexpr auto kReportingIntervalSecondsMax = 30 * 60 * 60 * 24;

// Field name of the array that wraps the buffered metrics samples before compression.
constexpr auto kMetricsRequestArrayElement = "data"_sd;

/**
 * Returns a pseudo-random value in the half-open range [min, max).
 *
 * The magnitude of the raw 64-bit sample is computed in unsigned arithmetic. That avoids the
 * undefined behavior of std::abs(INT64_MIN). For every other sample the result is identical to
 * std::abs(sample) % (max - min) + min.
 */
int64_t randomJitter(PseudoRandom& random, int64_t min, int64_t max) {
    dassert(max > min);
    const int64_t sample = random.nextInt64();
    const uint64_t magnitude = sample < 0 ? uint64_t{0} - static_cast<uint64_t>(sample)
                                          : static_cast<uint64_t>(sample);
    const uint64_t range = static_cast<uint64_t>(max - min);
    return static_cast<int64_t>(magnitude % range) + min;
}

}  // namespace

void RegistrationRetryCounter::reset() {
    // Back to the starting point: no retries counted, no accumulated wait, and both the base and
    // the current delay at the configured minimum.
    _retryCount = 0;
    _total = Hours(0);
    _base = _min;
    _current = _min;
}

bool RegistrationRetryCounter::incrementError() {
    // Stage 1 (the first kStage1RetryCountMax retries) doubles the base delay every time and uses
    // the stage 1 jitter range. Stage 2 keeps the base fixed and uses the stage 2 jitter range.
    const bool inStage1 = _retryCount < kStage1RetryCountMax;
    if (inStage1) {
        _base = 2 * _base;
    }

    const auto jitterSeconds = inStage1
        ? randomJitter(_random, kStage1JitterMin, kStage1JitterMax)
        : randomJitter(_random, kStage2JitterMin, kStage2JitterMax);
    _current = _base + Seconds(jitterSeconds);

    if (inStage1) {
        ++_retryCount;
    }

    _total += _current;

    // Keep retrying only while the cumulative delay stays within the stage 2 budget.
    const bool budgetExhausted = _total > kStage2DurationMax;
    return !budgetExhausted;
}

void MetricsRetryCounter::reset() {
    // Forget all previous failures: zero retries, zero accumulated wait, delays at the minimum.
    _retryCount = 0;
    _total = Hours(0);
    _base = _min;
    _current = _min;
}

/**
 * Records a failed metrics upload and computes the next retry delay.
 *
 * The base delay is _min scaled by 2^retryCount, with the exponent capped at 6 (at most
 * 64 * _min). A random jitter in [_min / 2, _min) seconds is added on top.
 *
 * Returns false once the accumulated delay exceeds kDurationMax, meaning retries should stop.
 * Returns true otherwise.
 */
bool MetricsRetryCounter::incrementError() {
    // Exact integer power of two in [1, 64]. Avoids a floating-point pow() and a truncating cast.
    const int exponent = std::min(6, static_cast<int>(_retryCount));
    _base = (1 << exponent) * _min;
    _current = _base + Seconds(randomJitter(_random, _min.count() / 2, _min.count()));
    ++_retryCount;

    _total += _current;

    if (_total > kDurationMax) {
        return false;
    }

    return true;
}

/**
 * Builds a processor that collects registration/metrics data from the given collector
 * collections and sends it through 'network'.
 *
 * 'useCrankForTest' is forwarded to the message queue. 'metricsGatherInterval' is the initial
 * spacing between MetricsCollect messages.
 */
FreeMonProcessor::FreeMonProcessor(FreeMonCollectorCollection& registration,
                                   FreeMonCollectorCollection& metrics,
                                   FreeMonNetworkInterface* network,
                                   bool useCrankForTest,
                                   Seconds metricsGatherInterval)
    : _registration(registration),
      _metrics(metrics),
      _network(network),
      // Seeded from the current wall-clock time.
      _random(Date_t::now().asInt64()),
      // NOTE(review): both retry counters receive _random, which relies on _random being declared
      // before them in the class (the declaration is not visible here); confirm in the header.
      _registrationRetry(RegistrationRetryCounter(_random)),
      _metricsRetry(MetricsRetryCounter(_random)),
      _metricsGatherInterval(metricsGatherInterval),
      _queue(useCrankForTest) {
    // Put both backoff counters into their initial state.
    _registrationRetry->reset();
    _metricsRetry->reset();
}

void FreeMonProcessor::enqueue(std::shared_ptr<FreeMonMessage> msg) {
    // Ownership of the message passes straight to the processing queue.
    _queue.enqueue(std::move(msg));
}

void FreeMonProcessor::stop() {
    // Stopping the queue is what ends the run() loop (dequeue then yields no item).
    _queue.stop();
}

void FreeMonProcessor::turnCrankForTest(size_t countMessagesToIgnore) {
    // Test-only. Arm the countdown before releasing messages from the queue, then wait on it.
    // run() decrements the countdown once for every message it finishes processing.
    _countdown.reset(countMessagesToIgnore);
    _queue.turnCrankForTest(countMessagesToIgnore);
    _countdown.wait();
}

/**
 * Message loop of the free monitoring processor.
 *
 * Registers a Client named "FreeMonProcessor" for the calling thread, then repeatedly dequeues
 * messages and dispatches each one to its do* handler. Returns when the queue yields no item
 * (shutdown). Any exception escaping a handler stops the queue and is logged as a warning. The
 * exception is not rethrown, so the free monitoring subsystem simply shuts down.
 */
void FreeMonProcessor::run() {
    try {

        Client::initThread("FreeMonProcessor");
        Client* client = &cc();

        while (true) {
            auto item = _queue.dequeue(client->getServiceContext()->getPreciseClockSource());
            if (!item.is_initialized()) {
                // Shutdown was triggered
                return;
            }

            auto msg = item.get();

            // Dispatch on the message type. Messages that carry a payload are downcast with
            // checked_cast to their concrete message type before being handed to the handler.
            switch (msg->getType()) {
                case FreeMonMessageType::RegisterCommand: {
                    doCommandRegister(client, msg);
                    break;
                }
                case FreeMonMessageType::RegisterServer: {
                    doServerRegister(
                        client,
                        checked_cast<
                            FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>*>(
                            msg.get()));
                    break;
                }
                case FreeMonMessageType::UnregisterCommand: {
                    doCommandUnregister(client,
                                        checked_cast<FreeMonWaitableMessageWithPayload<
                                            FreeMonMessageType::UnregisterCommand>*>(msg.get()));
                    break;
                }
                case FreeMonMessageType::AsyncRegisterComplete: {
                    doAsyncRegisterComplete(
                        client,
                        checked_cast<
                            FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>*>(
                            msg.get()));
                    break;
                }
                case FreeMonMessageType::AsyncRegisterFail: {
                    doAsyncRegisterFail(
                        client,
                        checked_cast<
                            FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>*>(
                            msg.get()));
                    break;
                }
                case FreeMonMessageType::MetricsCollect: {
                    doMetricsCollect(client);
                    break;
                }
                case FreeMonMessageType::MetricsSend: {
                    doMetricsSend(client);
                    break;
                }
                case FreeMonMessageType::AsyncMetricsComplete: {
                    doAsyncMetricsComplete(
                        client,
                        checked_cast<
                            FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>*>(
                            msg.get()));
                    break;
                }
                case FreeMonMessageType::AsyncMetricsFail: {
                    doAsyncMetricsFail(
                        client,
                        checked_cast<
                            FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>*>(
                            msg.get()));
                    break;
                }
                case FreeMonMessageType::OnTransitionToPrimary: {
                    doOnTransitionToPrimary(client);
                    break;
                }
                case FreeMonMessageType::NotifyOnUpsert: {
                    doNotifyOnUpsert(
                        client,
                        checked_cast<
                            FreeMonMessageWithPayload<FreeMonMessageType::NotifyOnUpsert>*>(
                            msg.get()));
                    break;
                }
                case FreeMonMessageType::NotifyOnDelete: {
                    doNotifyOnDelete(client);
                    break;
                }
                case FreeMonMessageType::NotifyOnRollback: {
                    doNotifyOnRollback(client);
                    break;
                }
                default:
                    // Every message type is expected to be handled above.
                    MONGO_UNREACHABLE;
            }

            // Record that we have finished processing the message for testing purposes
            // (turnCrankForTest() waits on this countdown).
            _countdown.countDown();
        }
    } catch (...) {
        // Stop the queue so no further messages are accepted or processed.
        _queue.stop();

        LOGV2_WARNING(20619,
                      "Uncaught exception in '{error}' in free monitoring subsystem. "
                      "Shutting down the free monitoring subsystem.",
                      "Uncaught exception in free monitoring subsystem. "
                      "Shutting down the free monitoring subsystem.",
                      "error"_attr = exceptionToStatus());
    }
}

/**
 * Reads the persisted free monitoring document and records it as the last-read state.
 *
 * If a document exists, its storage version must equal kStorageVersion. It replaces the in-memory
 * state only when 'updateInMemory' is true. If no document exists, the in-memory state is reset to
 * a disabled default regardless of 'updateInMemory'.
 */
void FreeMonProcessor::readState(OperationContext* opCtx, bool updateInMemory) {
    auto storedState = FreeMonStorage::read(opCtx);

    _lastReadState = storedState;

    if (storedState.is_initialized()) {
        invariant(storedState.get().getVersion() == kStorageVersion);

        if (updateInMemory) {
            _state = storedState.get();
        }
    } else {
        // No document on disk: default the in-memory state to "disabled" with empty fields.
        auto inMemoryState = _state.synchronize();
        inMemoryState->setVersion(kStorageVersion);
        inMemoryState->setState(StorageStateEnum::disabled);
        inMemoryState->setRegistrationId("");
        inMemoryState->setInformationalURL("");
        inMemoryState->setMessage("");
        inMemoryState->setUserReminder("");
    }
}

void FreeMonProcessor::readState(Client* client, bool updateInMemory) {
    // Create an operation context for just this read and delegate to the
    // OperationContext overload.
    const auto opCtxHolder = client->makeOperationContext();
    readState(opCtxHolder.get(), updateInMemory);
}

void FreeMonProcessor::writeState(Client* client) {
    // Compare-and-swap against the on-disk document:
    //  - if the in-memory state still equals what we last read, there is nothing to write;
    //  - otherwise re-read the document, and replace it with the in-memory state only if it still
    //    matches what we last read. If it differs, skip the write and wait for the next round.
    if (_lastReadState == _state.get()) {
        return;
    }

    // The read and the write share one operation context.
    auto opCtx = client->makeOperationContext();

    const auto onDisk = FreeMonStorage::read(opCtx.get());
    if (onDisk != _lastReadState) {
        return;
    }

    FreeMonStorage::replace(opCtx.get(), _state.get());
    _lastReadState = boost::make_optional(_state.get());
}

void FreeMonProcessor::doServerRegister(
    Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::RegisterServer>* msg) {

    // Schedule the first metrics collection up front so the initial registration has something
    // to send.
    enqueue(FreeMonMessage::createNow(FreeMonMessageType::MetricsCollect));

    const auto& payload = msg->getPayload();
    const auto regType = payload.first;

    // Asked to register on start: kick off a registration request right away.
    if (regType == RegistrationType::RegisterOnStart) {
        enqueue(FreeMonRegisterCommandMessage::createNow(payload.second));
        return;
    }

    invariant((regType == RegistrationType::RegisterAfterOnTransitionToPrimary) ||
              (regType == RegistrationType::RegisterAfterOnTransitionToPrimaryIfEnabled));

    // Decide whether to wait for a transition to primary:
    //  - if 'admin.system.version' has our document, do not wait; re-register if it is enabled;
    //  - if there is no document, wait until we become primary. If we become secondary instead,
    //    OpObserver hooks will tell us our registration id.
    auto opCtx = client->makeOperationContext();
    auto storedState = FreeMonStorage::read(opCtx.get());

    if (!storedState.is_initialized()) {
        // Either a replica set member that can only record a registration id once primary, or a
        // standalone that has never been registered.
        _registerOnTransitionToPrimary = regType;
    } else if (storedState.get().getState() == StorageStateEnum::enabled) {
        // Standalone or secondary with an enabled registration: send a registration
        // notification. Otherwise we wait for the user to register us.
        enqueue(FreeMonRegisterCommandMessage::createNow(payload.second));
    }

    // Always read the state once. On a disabled secondary this is what lets the in-memory state
    // know we are disabled.
    readState(opCtx.get());
}

namespace {
/**
 * Chains 'onSuccess' / 'onErrorFunc' onto 'future' and returns the resulting Future<void>.
 *
 * Both callbacks run only while 'proc' is still alive. This is checked through a weak_ptr taken
 * from proc->shared_from_this(). If 'future' resolves with an error, 'onErrorFunc' is invoked and
 * 'onSuccess' is suppressed for the placeholder value substituted on the error path.
 */
template <typename T>
std::unique_ptr<Future<void>> doAsyncCallback(FreeMonProcessor* proc,
                                              Future<T> future,
                                              std::function<void(const T&)> onSuccess,
                                              std::function<void(Status)> onErrorFunc) {

    // Observe the processor weakly: the callbacks may fire after it has been destroyed.
    std::weak_ptr<FreeMonProcessor> weakProc(proc->shared_from_this());

    // Set by the error handler so the success handler can tell a real response from the
    // default-constructed T() returned on the error path.
    auto failed = std::make_shared<bool>(false);

    auto handleError = [failed, weakProc, onErrorFunc](Status s) {
        *failed = true;
        if (auto strongProc = weakProc.lock()) {
            onErrorFunc(s);
        }

        return T();
    };

    auto handleSuccess = [failed, weakProc, onSuccess](const auto& resp) {
        if (*failed) {
            return;
        }

        if (auto strongProc = weakProc.lock()) {
            onSuccess(resp);
        }
    };

    auto chained =
        std::move(future).onError(std::move(handleError)).then(std::move(handleSuccess));
    return std::make_unique<Future<void>>(std::move(chained));
}
}  // namespace

/**
 * Handles a register command, either user-issued or a scheduled retry.
 *
 * If a registration request is already in flight, the message is failed immediately with
 * FreeMonHttpInFlight. Otherwise:
 *  - the message is remembered as a pending waiter;
 *  - a registration request is built from the current state;
 *  - the state is marked pending and persisted;
 *  - the request is sent asynchronously.
 * The outcome comes back to this processor as an AsyncRegisterComplete or AsyncRegisterFail
 * message.
 */
void FreeMonProcessor::doCommandRegister(Client* client,
                                         std::shared_ptr<FreeMonMessage> sharedMsg) {
    auto msg = checked_cast<FreeMonRegisterCommandMessage*>(sharedMsg.get());

    if (_futureRegistrationResponse) {
        msg->setStatus(Status(ErrorCodes::FreeMonHttpInFlight,
                              "Free Monitoring Registration request in-flight already"));
        return;
    }

    _pendingRegisters.push_back(sharedMsg);

    readState(client);

    FreeMonRegistrationRequest req;

    auto regid = _state->getRegistrationId();
    if (!regid.empty()) {
        req.setId(regid);
    }

    req.setVersion(kMaxProtocolVersion);

    req.setLocalTime(client->getServiceContext()->getPreciseClockSource()->now());

    if (!msg->getPayload().empty()) {
        // Cache the tags for subsequent retries
        _tags = msg->getPayload();
    }

    // Send the cached tags, not this message's payload. A message carrying no tags of its own
    // still registers with the most recently supplied ones, which matches the guard above.
    if (!_tags.empty()) {
        req.setTags(transformVector(_tags));
    }

    // Collect the data
    auto collect = _registration.collect(client);

    req.setPayload(std::get<0>(collect));

    // Record that the registration is pending
    _state->setState(StorageStateEnum::pending);
    _registrationStatus = FreeMonRegistrationStatus::kPending;

    writeState(client);

    // Send the async request. Its result is routed back onto our own queue.
    _futureRegistrationResponse = doAsyncCallback<FreeMonRegistrationResponse>(
        this,
        _network->sendRegistrationAsync(req),
        [this](const auto& resp) {
            this->enqueue(
                FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>::createNow(
                    resp));
        },
        [this](Status s) {
            this->enqueue(
                FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>::createNow(s));
        });
}

Status FreeMonProcessor::validateRegistrationResponse(const FreeMonRegistrationResponse& resp) {
    // Every failure is reported as FreeMonHttpPermanentFailure. Any validation failure stops
    // registration from proceeding to upload.
    const auto version = resp.getVersion();
    if (version < kMinProtocolVersion || version > kMaxProtocolVersion) {
        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                      str::stream()
                          << "Unexpected registration response protocol version, expected ("
                          << kMinProtocolVersion << ", " << kMaxProtocolVersion << "), received '"
                          << version << "'");
    }

    // Length limits are exclusive: a field whose length reaches its limit is rejected. The
    // optional user reminder counts as zero length when absent.
    struct FieldLimit {
        const char* name;
        size_t length;
        int maxLength;
    };
    const size_t userReminderLength =
        resp.getUserReminder().is_initialized() ? resp.getUserReminder().get().size() : 0;
    const FieldLimit limits[] = {
        {"Id", resp.getId().size(), kRegistrationIdMaxLength},
        {"InformationURL", resp.getInformationalURL().size(), kInformationalURLMaxLength},
        {"Message", resp.getMessage().size(), kInformationalMessageMaxLength},
        {"UserReminder", userReminderLength, kUserReminderMaxLength},
    };
    for (const auto& limit : limits) {
        if (limit.length >= static_cast<size_t>(limit.maxLength)) {
            return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                          str::stream() << limit.name << " is '" << limit.length
                                        << "' bytes in length, maximum allowed length is '"
                                        << limit.maxLength << "'");
        }
    }

    const auto interval = resp.getReportingInterval();
    if (interval < kReportingIntervalSecondsMin || interval > kReportingIntervalSecondsMax) {
        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                      str::stream() << "Reporting Interval '" << interval
                                    << "' must be in the range [" << kReportingIntervalSecondsMin
                                    << "," << kReportingIntervalSecondsMax << "]");
    }

    // The cloud endpoint may explicitly ask us to stop uploading.
    if (resp.getHaltMetricsUploading()) {
        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                      str::stream() << "Halting metrics upload due to response");
    }

    return Status::OK();
}


void FreeMonProcessor::notifyPendingRegisters(const Status s) {
    // Deliver the same outcome to every waiting register command, then forget all of them.
    for (const auto& waiter : _pendingRegisters) {
        auto registerMsg = checked_cast<FreeMonRegisterCommandMessage*>(waiter.get());
        registerMsg->setStatus(s);
    }
    _pendingRegisters.clear();
}


Status FreeMonProcessor::validateMetricsResponse(const FreeMonMetricsResponse& resp) {
    // Validates a metrics upload response. Every failure is reported as
    // FreeMonHttpPermanentFailure.
    const auto version = resp.getVersion();
    if (version < kMinProtocolVersion || version > kMaxProtocolVersion) {
        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                      str::stream() << "Unexpected metrics response protocol version, expected ("
                                    << kMinProtocolVersion << ", " << kMaxProtocolVersion
                                    << "), received '" << version << "'");
    }

    // Length limits are exclusive: a field whose length reaches its limit is rejected. All of
    // these fields are optional in a metrics response, and an absent field counts as zero length.
    struct FieldLimit {
        const char* name;
        size_t length;
        int maxLength;
    };
    const FieldLimit limits[] = {
        {"Id",
         resp.getId().is_initialized() ? resp.getId().get().size() : 0,
         kRegistrationIdMaxLength},
        {"InformationURL",
         resp.getInformationalURL().is_initialized() ? resp.getInformationalURL().get().size() : 0,
         kInformationalURLMaxLength},
        {"Message",
         resp.getMessage().is_initialized() ? resp.getMessage().get().size() : 0,
         kInformationalMessageMaxLength},
        {"UserReminder",
         resp.getUserReminder().is_initialized() ? resp.getUserReminder().get().size() : 0,
         kUserReminderMaxLength},
    };
    for (const auto& limit : limits) {
        if (limit.length >= static_cast<size_t>(limit.maxLength)) {
            return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                          str::stream() << limit.name << " is '" << limit.length
                                        << "' bytes in length, maximum allowed length is '"
                                        << limit.maxLength << "'");
        }
    }

    const auto interval = resp.getReportingInterval();
    if (interval < kReportingIntervalSecondsMin || interval > kReportingIntervalSecondsMax) {
        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                      str::stream() << "Reporting Interval '" << interval
                                    << "' must be in the range [" << kReportingIntervalSecondsMin
                                    << "," << kReportingIntervalSecondsMax << "]");
    }

    // The cloud endpoint may explicitly ask us to stop uploading.
    if (resp.getHaltMetricsUploading()) {
        return Status(ErrorCodes::FreeMonHttpPermanentFailure,
                      str::stream() << "Halting metrics upload due to response");
    }

    return Status::OK();
}


/**
 * Handles a registration HTTP response that arrived without a transport error.
 *
 * First clears the in-flight marker. Then:
 *  - if the registration is no longer pending (cancelled meanwhile), fails all waiters;
 *  - if the response fails validation, disables free monitoring, persists that, and fails all
 *    waiters without retrying;
 *  - otherwise:
 *      - stores the registration id, user reminder, message and URL;
 *      - marks the state enabled, persists it, and resets the registration retry counter;
 *      - notifies waiters with OK and schedules an immediate metrics upload.
 */
void FreeMonProcessor::doAsyncRegisterComplete(
    Client* client,
    const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterComplete>* msg) {

    // Our request is no longer in-progress so delete it
    _futureRegistrationResponse.reset();

    if (_registrationStatus != FreeMonRegistrationStatus::kPending) {
        notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled"));

        return;
    }

    auto& resp = msg->getPayload();

    Status s = validateRegistrationResponse(resp);
    if (!s.isOK()) {
        LOGV2_WARNING(
            20620, "Free Monitoring registration halted due to {status}", "status"_attr = s);

        // Disable on any error
        _state->setState(StorageStateEnum::disabled);
        _registrationStatus = FreeMonRegistrationStatus::kDisabled;

        // Persist state
        writeState(client);

        notifyPendingRegisters(s);

        // If validation fails, we do not retry
        return;
    }

    // Update in-memory state. The server-provided reporting interval drives both the
    // registration retry minimum and the metrics gather interval.
    _registrationRetry->setMin(Seconds(resp.getReportingInterval()));
    _metricsGatherInterval = Seconds(resp.getReportingInterval());

    {
        auto state = _state.synchronize();
        state->setRegistrationId(resp.getId());

        if (resp.getUserReminder().is_initialized()) {
            state->setUserReminder(resp.getUserReminder().get());
        } else {
            state->setUserReminder("");
        }

        state->setMessage(resp.getMessage());
        state->setInformationalURL(resp.getInformationalURL());

        state->setState(StorageStateEnum::enabled);
    }

    _registrationStatus = FreeMonRegistrationStatus::kEnabled;

    // Persist state
    writeState(client);

    // Reset retry counter
    _registrationRetry->reset();

    // Notify waiters
    notifyPendingRegisters(Status::OK());

    LOGV2(20615,
          "Free Monitoring is Enabled. Frequency: {interval} seconds",
          "Free Monitoring is Enabled",
          "interval"_attr = resp.getReportingInterval());

    // Enqueue next metrics upload immediately to deliver a good experience
    enqueue(FreeMonMessage::createNow(FreeMonMessageType::MetricsSend));
}

/**
 * Handles a failed registration HTTP request.
 *
 * Clears the in-flight marker. If the registration was cancelled, fails all waiters. Otherwise
 * the failure is counted against the registration retry budget:
 *  - within budget, a register retry is scheduled at the next backoff deadline, reusing the
 *    cached tags;
 *  - once the budget is exhausted, registration is abandoned and all waiters receive the last
 *    failure status.
 */
void FreeMonProcessor::doAsyncRegisterFail(
    Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncRegisterFail>* msg) {

    // Our request is no longer in-progress so delete it
    _futureRegistrationResponse.reset();

    if (_registrationStatus != FreeMonRegistrationStatus::kPending) {
        notifyPendingRegisters(Status(ErrorCodes::BadValue, "Registration was canceled"));

        return;
    }

    if (!_registrationRetry->incrementError()) {
        // We have exceeded our retry budget; no further registration attempt is scheduled.
        LOGV2_WARNING(20621, "Free Monitoring is abandoning registration after excess retries");

        // No AsyncRegisterComplete/Fail will arrive for this registration any more. Hand the last
        // failure to the waiters instead of leaving them unanswered in _pendingRegisters.
        notifyPendingRegisters(msg->getPayload());
        return;
    }

    LOGV2_DEBUG(
        20616,
        1,
        "Free Monitoring Registration Failed with status '{status}', retrying in {interval}",
        "Free Monitoring Registration Failed",
        "status"_attr = msg->getPayload(),
        "interval"_attr = _registrationRetry->getNextDuration());

    // Enqueue a register retry
    enqueue(FreeMonRegisterCommandMessage::createWithDeadline(
        _tags, _registrationRetry->getNextDeadline(client)));
}

void FreeMonProcessor::doCommandUnregister(
    Client* client, FreeMonWaitableMessageWithPayload<FreeMonMessageType::UnregisterCommand>* msg) {
    // Unregistering is idempotent: refresh from disk, then force the disabled state both in the
    // stored document and in the in-memory registration status, and persist it.
    readState(client);

    _state->setState(StorageStateEnum::disabled);
    _registrationStatus = FreeMonRegistrationStatus::kDisabled;

    writeState(client);

    LOGV2(20617, "Free Monitoring is Disabled");

    // Release the command waiting on this message.
    msg->setStatus(Status::OK());
}

void FreeMonProcessor::doMetricsCollect(Client* client) {
    // Capture the start time first so the time spent collecting does not push out the schedule.
    const Date_t startedAt = client->getServiceContext()->getPreciseClockSource()->now();

    // Gather one sample and buffer it for the next upload.
    auto collected = _metrics.collect(client);
    _metricsBuffer.push(std::get<0>(collected));

    // The next collection is due one gather interval after this one started.
    const Date_t nextDeadline = startedAt + _metricsGatherInterval;
    enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsCollect, nextDeadline));
}

/**
 * Wraps every buffered metrics sample in a single document whose kMetricsRequestArrayElement
 * field is an array of the samples. Returns the snappy-compressed bytes of that document.
 */
std::string compressMetrics(MetricsBuffer& buffer) {
    BSONObjBuilder builder;

    {
        // Scoped so the array builder is finished before builder.done() is called.
        BSONArrayBuilder samples(builder.subarrayStart(kMetricsRequestArrayElement));
        for (const auto& sample : buffer) {
            samples.append(sample);
        }
    }

    BSONObj document = builder.done();

    std::string compressed;
    snappy::Compress(document.objdata(), document.objsize(), &compressed);
    return compressed;
}

// Uploads the buffered metrics if free monitoring is still enabled. The response (or failure) is
// routed back onto the processor queue as an AsyncMetricsComplete / AsyncMetricsFail message.
void FreeMonProcessor::doMetricsSend(Client* client) {
    // Re-read the on-disk state so a stop request or a deleted local document is noticed, while
    // the in-memory state stays in charge; disk state must not be treated as authoritative on
    // secondaries. (NOTE(review): the exact meaning of the `false` argument lives in readState.)
    readState(client, false);

    // Keep uploading only while BOTH the in-memory registration status and the local storage
    // state say enabled; otherwise we were recently disabled and simply stop here.
    const bool stillEnabled = _registrationStatus == FreeMonRegistrationStatus::kEnabled &&
        _state->getState() == StorageStateEnum::enabled;
    if (!stillEnabled) {
        return;
    }

    FreeMonMetricsRequest request;
    request.setVersion(kMaxProtocolVersion);
    request.setLocalTime(client->getServiceContext()->getPreciseClockSource()->now());
    request.setEncoding(MetricsEncodingEnum::snappy);
    request.setId(_state->getRegistrationId());

    // The compressed payload must stay alive while the request references it.
    auto payload = compressMetrics(_metricsBuffer);
    request.setMetrics(ConstDataRange(payload.data(), payload.size()));

    _lastMetricsSend = Date_t::now();

    // Fire the upload; both outcomes are handed back to the processor thread via the queue.
    doAsyncCallback<FreeMonMetricsResponse>(
        this,
        _network->sendMetricsAsync(request),
        [this](const auto& response) {
            this->enqueue(
                FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>::createNow(
                    response));
        },
        [this](Status failure) {
            this->enqueue(
                FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>::createNow(
                    failure));
        });
}

// Handles a completed metrics upload.
//  - A response that fails validation disables free monitoring and is not retried.
//  - A response with permanentlyDelete erases the persisted state and resets in-memory state.
//  - Otherwise the metrics buffer is cleared, any optional server-provided fields are merged into
//    the stored state and persisted, the server's reporting interval is adopted, and either a
//    re-registration (when the server explicitly asks for one) or the next upload is scheduled.
void FreeMonProcessor::doAsyncMetricsComplete(
    Client* client,
    const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsComplete>* msg) {

    auto& resp = msg->getPayload();

    Status s = validateMetricsResponse(resp);
    if (!s.isOK()) {
        LOGV2_WARNING(
            20622, "Free Monitoring metrics uploading halted due to {status}", "status"_attr = s);

        // Disable free monitoring on validation errors
        _state->setState(StorageStateEnum::disabled);
        _registrationStatus = FreeMonRegistrationStatus::kDisabled;

        writeState(client);

        // If validation fails, we do not retry
        return;
    }

    // The cloud asked us to delete (not merely halt), so erase the persisted state.
    if (resp.getPermanentlyDelete() == true) {
        auto opCtxUnique = client->makeOperationContext();
        FreeMonStorage::deleteState(opCtxUnique.get());

        _state->setState(StorageStateEnum::pending);
        _registrationStatus = FreeMonRegistrationStatus::kDisabled;

        // Clear out the in-memory state
        _lastReadState = boost::none;

        return;
    }

    // Update in-memory state of buffered metrics
    // TODO: do we reset only the metrics we send or all pending on success?
    _metricsBuffer.reset();

    {
        // Merge every optional field the server sent into the stored state under one guard.
        auto state = _state.synchronize();

        if (resp.getId().is_initialized()) {
            state->setRegistrationId(resp.getId().get());
        }

        if (resp.getUserReminder().is_initialized()) {
            state->setUserReminder(resp.getUserReminder().get());
        }

        if (resp.getInformationalURL().is_initialized()) {
            state->setInformationalURL(resp.getInformationalURL().get());
        }

        if (resp.getMessage().is_initialized()) {
            state->setMessage(resp.getMessage().get());
        }
    }

    // Persist state
    writeState(client);

    // Adopt the server-requested reporting interval as the collection cadence and as the minimum
    // of the metrics retry policy, then reset the retry counter after this success.
    const Seconds reportingInterval(resp.getReportingInterval());
    _metricsGatherInterval = reportingInterval;
    _metricsRetry->setMin(reportingInterval);
    _metricsRetry->reset();

    // getResendRegistration() is an optional<bool>. Converting the optional itself to bool only
    // tests whether the field was present, so the value must be read explicitly; otherwise an
    // explicit `resendRegistration: false` would force a needless re-registration.
    if (resp.getResendRegistration().value_or(false)) {
        enqueue(FreeMonRegisterCommandMessage::createNow(_tags));
    } else {
        // Enqueue next metrics upload
        enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend,
                                                   _metricsRetry->getNextDeadline(client)));
    }
}

// Handles a failed metrics upload: counts the error against the retry policy and, unless the
// retry budget is exhausted, schedules another upload at the policy's next deadline.
void FreeMonProcessor::doAsyncMetricsFail(
    Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::AsyncMetricsFail>* msg) {

    // incrementError() returning false means we have exceeded the allowed retries.
    const bool mayRetry = _metricsRetry->incrementError();
    if (!mayRetry) {
        LOGV2_WARNING(20623, "Free Monitoring is abandoning metrics upload after excess retries");
        return;
    }

    LOGV2_DEBUG(
        20618,
        1,
        "Free Monitoring Metrics upload failed with status {status}, retrying in {interval}",
        "Free Monitoring Metrics upload failed",
        "status"_attr = msg->getPayload(),
        "interval"_attr = _metricsRetry->getNextDuration());

    // Try the upload again once the retry policy allows it.
    const auto retryAt = _metricsRetry->getNextDeadline(client);
    enqueue(FreeMonMessage::createWithDeadline(FreeMonMessageType::MetricsSend, retryAt));
}

// Appends free monitoring status fields to `status`.
// Before any state has been read from disk, only `state: "undecided"` is reported. In
// kServerStatus mode the storage state, retry interval, last upload time (when known) and error
// counters are reported; in any other mode the storage state plus the server-provided message,
// URL and user reminder are reported.
void FreeMonProcessor::getStatus(OperationContext* opCtx,
                                 BSONObjBuilder* status,
                                 FreeMonGetStatusEnum mode) {
    // readState() always initializes _state, so _state alone cannot tell "never decided" apart
    // from a default state; _lastReadState is what distinguishes the two.
    if (!_lastReadState.get()) {
        status->append("state", "undecided");
        return;
    }

    if (mode != FreeMonGetStatusEnum::kServerStatus) {
        // User-facing status: read all fields through a single synchronize() guard.
        auto lockedState = _state.synchronize();
        status->append("state", StorageState_serializer(lockedState->getState()));
        status->append("message", lockedState->getMessage());
        status->append("url", lockedState->getInformationalURL());
        status->append("userReminder", lockedState->getUserReminder());
        return;
    }

    // serverStatus section: operational counters and timings.
    status->append("state", StorageState_serializer(_state->getState()));
    status->append("retryIntervalSecs",
                   durationCount<Seconds>(_metricsRetry->getNextDuration()));
    if (auto lastMetricsSend = _lastMetricsSend.get()) {
        status->append("lastRunTime", lastMetricsSend->toString());
    }
    status->append("registerErrors", static_cast<long long>(_registrationRetry->getCount()));
    status->append("metricsErrors", static_cast<long long>(_metricsRetry->getCount()));
}

// Runs when this node becomes primary. Depending on the configured registration policy, it either
// always registers, registers only if the persisted state is enabled, or does nothing. Afterwards
// the policy is cleared so that later transitions never register again.
void FreeMonProcessor::doOnTransitionToPrimary(Client* client) {
    bool shouldRegister = false;

    if (_registerOnTransitionToPrimary == RegistrationType::RegisterAfterOnTransitionToPrimary) {
        shouldRegister = true;
    } else if (_registerOnTransitionToPrimary ==
               RegistrationType::RegisterAfterOnTransitionToPrimaryIfEnabled) {
        // Consult the persisted state before deciding.
        readState(client);
        shouldRegister = _state->getState() == StorageStateEnum::enabled;
    }

    if (shouldRegister) {
        enqueue(FreeMonRegisterCommandMessage::createNow(std::vector<std::string>()));
    }

    // Registration on transition to primary happens at most once.
    _registerOnTransitionToPrimary = RegistrationType::DoNotRegister;
}

// Reacts to a change of the in-memory storage state. Only a transition from any non-enabled
// state into `enabled` needs action here (e.g. a secondary must start registration); other
// transitions are handled by the checks in the register and metrics-send paths.
void FreeMonProcessor::processInMemoryStateChange(const FreeMonStorageState& originalState,
                                                  const FreeMonStorageState& newState) {
    // This condition already implies the two states differ, so no separate inequality test is
    // needed.
    const bool becameEnabled = originalState.getState() != StorageStateEnum::enabled &&
        newState.getState() == StorageStateEnum::enabled;

    if (becameEnabled) {
        enqueue(FreeMonRegisterCommandMessage::createNow(std::vector<std::string>()));
    }
}

// Reacts to an upsert of the free monitoring storage document reported by the op observer.
// The document is parsed and, when it differs from the in-memory state, its storage version is
// checked, any enabling transition is acted on, and the in-memory state is replaced.
// Any exception raised along the way (e.g. by the version uassert, or presumably by parsing)
// stops the processing queue and is logged instead of propagating.
void FreeMonProcessor::doNotifyOnUpsert(
    Client* client, const FreeMonMessageWithPayload<FreeMonMessageType::NotifyOnUpsert>* msg) {
    try {
        const BSONObj& upsertedDoc = msg->getPayload();
        auto incomingState =
            FreeMonStorageState::parse(IDLParserErrorContext("free_mon_storage"), upsertedDoc);

        // An upsert that left the state unchanged requires no action.
        if (!(incomingState != _state)) {
            return;
        }

        uassert(50839,
                str::stream() << "Unexpected free monitoring storage version "
                              << incomingState.getVersion(),
                incomingState.getVersion() == kStorageVersion);

        // Transitions into `enabled` are handled here. Transitions out of it (enabled -> disabled)
        // are picked up implicitly by the register and send-metrics checks once _state is
        // replaced below.
        processInMemoryStateChange(_state.get(), incomingState);

        _state = incomingState;
    } catch (...) {
        // Halt further free monitoring processing before reporting the failure.
        _queue.stop();

        LOGV2_WARNING(20624,
                      "Uncaught exception in '{exception}' in free monitoring op observer. "
                      "Shutting down the free monitoring subsystem.",
                      "exception"_attr = exceptionToStatus());
    }
}

// Reacts to the free monitoring storage document being deleted or its collection being dropped;
// both cases are treated the same. The in-memory storage state goes back to `pending` and the
// registration status becomes disabled, which stops registration and metrics uploads. Metrics
// collection itself keeps running.
void FreeMonProcessor::doNotifyOnDelete(Client* client) {
    _state->setState(StorageStateEnum::pending);
    _registrationStatus = FreeMonRegistrationStatus::kDisabled;

    // Forget the last state read from disk, so status reporting shows "undecided" again.
    _lastReadState = boost::none;
}

// Reacts to a replication rollback. Afterwards the on-disk state is the new reality, so the
// in-memory state is snapshotted, reloaded from disk, and any difference between the two is
// processed like any other in-memory state change.
void FreeMonProcessor::doNotifyOnRollback(Client* client) {
    // Copy of the in-memory state before reloading.
    auto beforeRollback = _state.get();

    readState(client);

    auto afterRollback = _state.get();
    if (afterRollback != beforeRollback) {
        processInMemoryStateChange(beforeRollback, afterRollback);
    }
}


}  // namespace mongo