summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_telemetry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_telemetry.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_telemetry.cpp215
1 files changed, 215 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_telemetry.cpp b/src/mongo/db/pipeline/document_source_telemetry.cpp
new file mode 100644
index 00000000000..b037515796f
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_telemetry.cpp
@@ -0,0 +1,215 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/document_source_telemetry.h"
+
+#include "mongo/bson/bsontypes.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/debug_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+namespace mongo {
+namespace {
+CounterMetric telemetryHmacApplicationErrors("telemetry.numHmacApplicationErrors");
+}
+
+REGISTER_DOCUMENT_SOURCE_WITH_FEATURE_FLAG(telemetry,
+ DocumentSourceTelemetry::LiteParsed::parse,
+ DocumentSourceTelemetry::createFromBson,
+ AllowedWithApiStrict::kNeverInVersion1,
+ feature_flags::gFeatureFlagTelemetry);
+
+namespace {
+/**
+ * Try to parse the applyHmacToIdentifiers property from the element.
+ */
+boost::optional<bool> parseApplyHmacToIdentifiers(const BSONElement& el) {
+ if (el.fieldNameStringData() == "applyHmacToIdentifiers"_sd) {
+ auto type = el.type();
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << DocumentSourceTelemetry::kStageName
+ << " applyHmacToIdentifiers parameter must be boolean. Found type: "
+ << typeName(type),
+ type == BSONType::Bool);
+ return el.trueValue();
+ }
+ return boost::none;
+}
+
+/**
+ * Try to parse the `hmacKey' property from the element.
+ */
+boost::optional<std::string> parseHmacKey(const BSONElement& el) {
+ if (el.fieldNameStringData() == "hmacKey"_sd) {
+ auto type = el.type();
+ if (el.isBinData(BinDataType::BinDataGeneral)) {
+ int len;
+ auto data = el.binData(len);
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << DocumentSourceTelemetry::kStageName
+ << "hmacKey must be greater than or equal to 32 bytes",
+ len >= 32);
+ return {{data, (size_t)len}};
+ }
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream()
+ << DocumentSourceTelemetry::kStageName
+ << " hmacKey parameter must be bindata of length 32 or greater. Found type: "
+ << typeName(type));
+ }
+ return boost::none;
+}
+
+/**
+ * Parse the spec object calling the `ctor` with the bool applyHmacToIdentifiers and std::string
+ * hmacKey arguments.
+ */
+template <typename Ctor>
+auto parseSpec(const BSONElement& spec, const Ctor& ctor) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << DocumentSourceTelemetry::kStageName
+ << " value must be an object. Found: " << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ bool applyHmacToIdentifiers = false;
+ std::string hmacKey;
+ for (auto&& el : spec.embeddedObject()) {
+ if (auto maybeApplyHmacToIdentifiers = parseApplyHmacToIdentifiers(el);
+ maybeApplyHmacToIdentifiers) {
+ applyHmacToIdentifiers = *maybeApplyHmacToIdentifiers;
+ } else if (auto maybeHmacKey = parseHmacKey(el); maybeHmacKey) {
+ hmacKey = *maybeHmacKey;
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream()
+ << DocumentSourceTelemetry::kStageName
+ << " parameters object may only contain 'applyHmacToIdentifiers' or "
+ "'hmacKey' options. Found: "
+ << el.fieldName());
+ }
+ }
+
+ return ctor(applyHmacToIdentifiers, hmacKey);
+}
+
+} // namespace
+
+std::unique_ptr<DocumentSourceTelemetry::LiteParsed> DocumentSourceTelemetry::LiteParsed::parse(
+ const NamespaceString& nss, const BSONElement& spec) {
+ return parseSpec(spec, [&](bool applyHmacToIdentifiers, std::string hmacKey) {
+ return std::make_unique<DocumentSourceTelemetry::LiteParsed>(
+ spec.fieldName(), applyHmacToIdentifiers, hmacKey);
+ });
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceTelemetry::createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+ const NamespaceString& nss = pExpCtx->ns;
+
+ uassert(ErrorCodes::InvalidNamespace,
+ "$telemetry must be run against the 'admin' database with {aggregate: 1}",
+ nss.db() == DatabaseName::kAdmin.db() && nss.isCollectionlessAggregateNS());
+
+ return parseSpec(spec, [&](bool applyHmacToIdentifiers, std::string hmacKey) {
+ return new DocumentSourceTelemetry(pExpCtx, applyHmacToIdentifiers, hmacKey);
+ });
+}
+
+Value DocumentSourceTelemetry::serialize(SerializationOptions opts) const {
+ // This document source never contains any user information, so no need for any work when
+ // applying hmac.
+ return Value{Document{{kStageName, Document{}}}};
+}
+
+DocumentSource::GetNextResult DocumentSourceTelemetry::doGetNext() {
+ /**
+ * We maintain nested iterators:
+ * - Outer one over the set of partitions.
+ * - Inner one over the set of entries in a "materialized" partition.
+ *
+ * When an inner iterator is present and contains more elements, we can return the next element.
+ * When the inner iterator is exhausted, we move to the next element in the outer iterator and
+ * create a new inner iterator. When the outer iterator is exhausted, we have finished iterating
+ * over the telemetry store entries.
+ *
+ * The inner iterator iterates over a materialized container of all entries in the partition.
+ * This is done to reduce the time under which the partition lock is held.
+ */
+ while (true) {
+ // First, attempt to exhaust all elements in the materialized partition.
+ if (!_materializedPartition.empty()) {
+ // Move out of the container reference.
+ auto doc = std::move(_materializedPartition.front());
+ _materializedPartition.pop_front();
+ return {std::move(doc)};
+ }
+
+ TelemetryStore& _telemetryStore = getTelemetryStore(getContext()->opCtx);
+
+ // Materialized partition is exhausted, move to the next.
+ _currentPartition++;
+ if (_currentPartition >= _telemetryStore.numPartitions()) {
+ return DocumentSource::GetNextResult::makeEOF();
+ }
+
+ // We only keep the partition (which holds a lock) for the time needed to materialize it to
+ // a set of Document instances.
+ auto&& partition = _telemetryStore.getPartition(_currentPartition);
+
+ // Capture the time at which reading the partition begins to indicate to the caller
+ // when the snapshot began.
+ const auto partitionReadTime =
+ Timestamp{Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0)};
+ for (auto&& [key, metrics] : *partition) {
+ try {
+ auto telemetryKey =
+ metrics->computeTelemetryKey(pExpCtx->opCtx, _applyHmacToIdentifiers, _hmacKey);
+ _materializedPartition.push_back({{"key", std::move(telemetryKey)},
+ {"metrics", metrics->toBSON()},
+ {"asOf", partitionReadTime}});
+ } catch (const DBException& ex) {
+ telemetryHmacApplicationErrors.increment();
+ LOGV2_DEBUG(7349403,
+ 3,
+ "Error encountered when applying hmac to query shape, will not publish "
+ "telemetry for this entry.",
+ "status"_attr = ex.toStatus(),
+ "hash"_attr = key);
+ if (kDebugBuild) {
+ tasserted(7349401,
+ "Was not able to re-parse telemetry key when reading telemetry.");
+ }
+ }
+ }
+ }
+}
+
+} // namespace mongo