Diffstat (limited to 'src/mongo/db/s/query_analysis_writer.cpp')
-rw-r--r--  src/mongo/db/s/query_analysis_writer.cpp | 695
1 file changed, 695 insertions, 0 deletions
diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp
new file mode 100644
index 00000000000..19b942e1775
--- /dev/null
+++ b/src/mongo/db/s/query_analysis_writer.cpp
@@ -0,0 +1,695 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/query_analysis_writer.h"
+
+#include "mongo/client/connpool.h"
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/update/document_diff_calculator.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/analyze_shard_key_documents_gen.h"
+#include "mongo/s/analyze_shard_key_server_parameters_gen.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
+namespace mongo {
+namespace analyze_shard_key {
+
+namespace {
+
+MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisWriter);
+MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingLocally);
+MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingRemotely);
+
+const auto getQueryAnalysisWriter = ServiceContext::declareDecoration<QueryAnalysisWriter>();
+
+constexpr int kMaxRetriesOnRetryableErrors = 5;
+const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
+                                                WriteConcernOptions::SyncMode::UNSET,
+                                                WriteConcernOptions::kWriteConcernTimeoutSystem};
+
+// The size limit for the documents in a single insert batch. Leave some padding for the other
+// fields in the insert command.
+constexpr int kMaxBSONObjSizeForInsert = BSONObjMaxUserSize - 500 * 1024;
+
+/*
+ * Returns true if this mongod can accept writes to the given collection. Unless the collection is
+ * in the "local" database, this will only return true if this mongod is a primary (or a
+ * standalone).
+ */
+bool canAcceptWrites(OperationContext* opCtx, const NamespaceString& ns) {
+    ShouldNotConflictWithSecondaryBatchApplicationBlock noPBWMBlock(opCtx->lockState());
+    Lock::DBLock lk(opCtx, ns.dbName(), MODE_IS);
+    Lock::CollectionLock lock(opCtx, ns, MODE_IS);
+    return mongo::repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx,
+                                                                                       ns.db());
+}
+
+/*
+ * Runs the given write command against the given database locally, asserts that the top-level
+ * command is OK, then asserts the write status using the given 'uassertWriteStatusCb' callback.
+ * Returns the command response.
+ */
+BSONObj executeWriteCommandLocal(OperationContext* opCtx,
+                                 const std::string dbName,
+                                 const BSONObj& cmdObj,
+                                 const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
+    DBDirectClient client(opCtx);
+    BSONObj resObj;
+
+    if (!client.runCommand(dbName, cmdObj, resObj)) {
+        uassertStatusOK(getStatusFromCommandResult(resObj));
+    }
+    uassertWriteStatusCb(resObj);
+
+    return resObj;
+}
+
+/*
+ * Runs the given write command against the given database on the (remote) primary, asserts that
+ * the top-level command is OK, then asserts the write status using the given
+ * 'uassertWriteStatusCb' callback. Throws a PrimarySteppedDown error if no primary is found.
+ * Returns the command response.
+ */
+BSONObj executeWriteCommandRemote(OperationContext* opCtx,
+                                  const std::string dbName,
+                                  const BSONObj& cmdObj,
+                                  const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
+    auto hostAndPort = repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort();
+
+    if (hostAndPort.empty()) {
+        uasserted(ErrorCodes::PrimarySteppedDown, "No primary exists currently");
+    }
+
+    auto conn = std::make_unique<ScopedDbConnection>(hostAndPort.toString());
+
+    if (auth::isInternalAuthSet()) {
+        uassertStatusOK(conn->get()->authenticateInternalUser());
+    }
+
+    DBClientBase* client = conn->get();
+    ScopeGuard guard([&] { conn->done(); });
+    try {
+        BSONObj resObj;
+
+        if (!client->runCommand(dbName, cmdObj, resObj)) {
+            uassertStatusOK(getStatusFromCommandResult(resObj));
+        }
+        uassertWriteStatusCb(resObj);
+
+        return resObj;
+    } catch (...) {
+        guard.dismiss();
+        conn->kill();
+        throw;
+    }
+}
+
+/*
+ * Runs the given write command against the given collection. If this mongod is currently the
+ * primary, runs the write command locally. Otherwise, runs the command on the remote primary.
+ * Internally asserts that the top-level command is OK, then asserts the write status using the
+ * given 'uassertWriteStatusCb' callback. Internally retries the write command on retryable errors
+ * (up to kMaxRetriesOnRetryableErrors times), so the writes must be idempotent. Returns the
+ * command response.
+ */
+BSONObj executeWriteCommand(OperationContext* opCtx,
+                            const NamespaceString& ns,
+                            const BSONObj& cmdObj,
+                            const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
+    const auto dbName = ns.db().toString();
+    auto numRetries = 0;
+
+    while (true) {
+        try {
+            if (canAcceptWrites(opCtx, ns)) {
+                // There is a window here where this mongod may step down after the check above.
+                // In that case, a NotWritablePrimary error would be thrown. However, this is
+                // preferable to running the command while holding locks.
+                hangQueryAnalysisWriterBeforeWritingLocally.pauseWhileSet(opCtx);
+                return executeWriteCommandLocal(opCtx, dbName, cmdObj, uassertWriteStatusCb);
+            }
+
+            hangQueryAnalysisWriterBeforeWritingRemotely.pauseWhileSet(opCtx);
+            return executeWriteCommandRemote(opCtx, dbName, cmdObj, uassertWriteStatusCb);
+        } catch (DBException& ex) {
+            if (ErrorCodes::isRetriableError(ex) && numRetries < kMaxRetriesOnRetryableErrors) {
+                numRetries++;
+                continue;
+            }
+            throw;
+        }
+    }
+
+    return {};
+}
+
+struct SampledWriteCommandRequest {
+    UUID sampleId;
+    NamespaceString nss;
+    BSONObj cmd;  // the BSON for a {Update,Delete,FindAndModify}CommandRequest
+};
+
+/*
+ * Returns a sampled update command for the update at 'opIndex' in the given update command.
+ */
+SampledWriteCommandRequest makeSampledUpdateCommandRequest(
+    const write_ops::UpdateCommandRequest& originalCmd, int opIndex) {
+    auto op = originalCmd.getUpdates()[opIndex];
+    auto sampleId = op.getSampleId();
+    invariant(sampleId);
+
+    write_ops::UpdateCommandRequest sampledCmd(originalCmd.getNamespace(), {std::move(op)});
+    sampledCmd.setLet(originalCmd.getLet());
+
+    return {*sampleId,
+            sampledCmd.getNamespace(),
+            sampledCmd.toBSON(BSON("$db" << sampledCmd.getNamespace().db().toString()))};
+}
+
+/*
+ * Returns a sampled delete command for the delete at 'opIndex' in the given delete command.
+ */
+SampledWriteCommandRequest makeSampledDeleteCommandRequest(
+    const write_ops::DeleteCommandRequest& originalCmd, int opIndex) {
+    auto op = originalCmd.getDeletes()[opIndex];
+    auto sampleId = op.getSampleId();
+    invariant(sampleId);
+
+    write_ops::DeleteCommandRequest sampledCmd(originalCmd.getNamespace(), {std::move(op)});
+    sampledCmd.setLet(originalCmd.getLet());
+
+    return {*sampleId,
+            sampledCmd.getNamespace(),
+            sampledCmd.toBSON(BSON("$db" << sampledCmd.getNamespace().db().toString()))};
+}
+
+/*
+ * Returns a sampled findAndModify command for the given findAndModify command.
+ */
+SampledWriteCommandRequest makeSampledFindAndModifyCommandRequest(
+    const write_ops::FindAndModifyCommandRequest& originalCmd) {
+    invariant(originalCmd.getSampleId());
+
+    write_ops::FindAndModifyCommandRequest sampledCmd(originalCmd.getNamespace());
+    sampledCmd.setQuery(originalCmd.getQuery());
+    sampledCmd.setUpdate(originalCmd.getUpdate());
+    sampledCmd.setRemove(originalCmd.getRemove());
+    sampledCmd.setUpsert(originalCmd.getUpsert());
+    sampledCmd.setNew(originalCmd.getNew());
+    sampledCmd.setSort(originalCmd.getSort());
+    sampledCmd.setCollation(originalCmd.getCollation());
+    sampledCmd.setArrayFilters(originalCmd.getArrayFilters());
+    sampledCmd.setLet(originalCmd.getLet());
+    sampledCmd.setSampleId(originalCmd.getSampleId());
+
+    return {*sampledCmd.getSampleId(),
+            sampledCmd.getNamespace(),
+            sampledCmd.toBSON(BSON("$db" << sampledCmd.getNamespace().db().toString()))};
+}
+
+}  // namespace
+
+QueryAnalysisWriter& QueryAnalysisWriter::get(OperationContext* opCtx) {
+    return get(opCtx->getServiceContext());
+}
+
+QueryAnalysisWriter& QueryAnalysisWriter::get(ServiceContext* serviceContext) {
+    invariant(analyze_shard_key::isFeatureFlagEnabledIgnoreFCV(),
+              "Only support analyzing queries when the feature flag is enabled");
+    invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer,
+              "Only support analyzing queries on a sharded cluster");
+    return getQueryAnalysisWriter(serviceContext);
+}
+
+void QueryAnalysisWriter::onStartup() {
+    auto serviceContext = getQueryAnalysisWriter.owner(this);
+    auto periodicRunner = serviceContext->getPeriodicRunner();
+    invariant(periodicRunner);
+
+    stdx::lock_guard<Latch> lk(_mutex);
+
+    PeriodicRunner::PeriodicJob QueryWriterJob(
+        "QueryAnalysisQueryWriter",
+        [this](Client* client) {
+            if (MONGO_unlikely(disableQueryAnalysisWriter.shouldFail())) {
+                return;
+            }
+            auto opCtx = client->makeOperationContext();
+            _flushQueries(opCtx.get());
+        },
+        Seconds(gQueryAnalysisWriterIntervalSecs));
+    _periodicQueryWriter = periodicRunner->makeJob(std::move(QueryWriterJob));
+    _periodicQueryWriter.start();
+
+    PeriodicRunner::PeriodicJob diffWriterJob(
+        "QueryAnalysisDiffWriter",
+        [this](Client* client) {
+            if (MONGO_unlikely(disableQueryAnalysisWriter.shouldFail())) {
+                return;
+            }
+            auto opCtx = client->makeOperationContext();
+            _flushDiffs(opCtx.get());
+        },
+        Seconds(gQueryAnalysisWriterIntervalSecs));
+    _periodicDiffWriter = periodicRunner->makeJob(std::move(diffWriterJob));
+    _periodicDiffWriter.start();
+
+    ThreadPool::Options threadPoolOptions;
+    threadPoolOptions.maxThreads = gQueryAnalysisWriterMaxThreadPoolSize;
+    threadPoolOptions.minThreads = gQueryAnalysisWriterMinThreadPoolSize;
+    threadPoolOptions.threadNamePrefix = "QueryAnalysisWriter-";
+    threadPoolOptions.poolName = "QueryAnalysisWriterThreadPool";
+    threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+        Client::initThread(threadName.c_str());
+    };
+    _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+        std::make_unique<ThreadPool>(threadPoolOptions),
+        executor::makeNetworkInterface("QueryAnalysisWriterNetwork"));
+    _executor->startup();
+}
+
+void QueryAnalysisWriter::onShutdown() {
+    if (_executor) {
+        _executor->shutdown();
+        _executor->join();
+    }
+    if (_periodicQueryWriter.isValid()) {
+        _periodicQueryWriter.stop();
+    }
+    if (_periodicDiffWriter.isValid()) {
+        _periodicDiffWriter.stop();
+    }
+}
+
+void QueryAnalysisWriter::_flushQueries(OperationContext* opCtx) {
+    try {
+        _flush(opCtx,
+               NamespaceString::kConfigSampledQueriesNamespace, &_queries);
+    } catch (DBException& ex) {
+        LOGV2(7047300,
+              "Failed to flush queries, will try again at the next interval",
+              "error"_attr = redact(ex));
+    }
+}
+
+void QueryAnalysisWriter::_flushDiffs(OperationContext* opCtx) {
+    try {
+        _flush(opCtx, NamespaceString::kConfigSampledQueriesDiffNamespace, &_diffs);
+    } catch (DBException& ex) {
+        LOGV2(7075400,
+              "Failed to flush diffs, will try again at the next interval",
+              "error"_attr = redact(ex));
+    }
+}
+
+void QueryAnalysisWriter::_flush(OperationContext* opCtx,
+                                 const NamespaceString& ns,
+                                 Buffer* buffer) {
+    Buffer tmpBuffer;
+    // The indices of invalid documents, e.g. documents that fail to insert with DuplicateKey
+    // errors (i.e. duplicates) and BadValue errors. Such documents should not get added back to
+    // the buffer when the inserts below fail.
+    std::set<int> invalid;
+    {
+        stdx::lock_guard<Latch> lk(_mutex);
+        if (buffer->isEmpty()) {
+            return;
+        }
+        std::swap(tmpBuffer, *buffer);
+    }
+    ScopeGuard backSwapper([&] {
+        stdx::lock_guard<Latch> lk(_mutex);
+        for (int i = 0; i < tmpBuffer.getCount(); i++) {
+            if (invalid.find(i) == invalid.end()) {
+                buffer->add(tmpBuffer.at(i));
+            }
+        }
+    });
+
+    // Insert the documents in batches from the back of the buffer so that we don't need to move
+    // the documents forward after each batch.
+    size_t baseIndex = tmpBuffer.getCount() - 1;
+    size_t maxBatchSize = gQueryAnalysisWriterMaxBatchSize.load();
+
+    while (!tmpBuffer.isEmpty()) {
+        std::vector<BSONObj> docsToInsert;
+        long long objSize = 0;
+
+        size_t lastIndex = tmpBuffer.getCount();  // inclusive
+        while (lastIndex > 0 && docsToInsert.size() < maxBatchSize) {
+            // Check if the next document can fit in the batch.
+            auto doc = tmpBuffer.at(lastIndex - 1);
+            if (doc.objsize() + objSize >= kMaxBSONObjSizeForInsert) {
+                break;
+            }
+            lastIndex--;
+            objSize += doc.objsize();
+            docsToInsert.push_back(std::move(doc));
+        }
+        // We don't add a document that is above the size limit to the buffer, so we should have
+        // added at least one document to 'docsToInsert'.
+        invariant(!docsToInsert.empty());
+
+        write_ops::InsertCommandRequest insertCmd(ns);
+        insertCmd.setDocuments(std::move(docsToInsert));
+        insertCmd.setWriteCommandRequestBase([&] {
+            write_ops::WriteCommandRequestBase wcb;
+            wcb.setOrdered(false);
+            wcb.setBypassDocumentValidation(false);
+            return wcb;
+        }());
+        auto insertCmdBson = insertCmd.toBSON(
+            {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())});
+
+        executeWriteCommand(opCtx, ns, std::move(insertCmdBson), [&](const BSONObj& resObj) {
+            BatchedCommandResponse response;
+            std::string errMsg;
+
+            if (!response.parseBSON(resObj, &errMsg)) {
+                uasserted(ErrorCodes::FailedToParse, errMsg);
+            }
+
+            if (response.isErrDetailsSet() && response.sizeErrDetails() > 0) {
+                boost::optional<write_ops::WriteError> firstWriteErr;
+
+                for (const auto& err : response.getErrDetails()) {
+                    if (err.getStatus() == ErrorCodes::DuplicateKey ||
+                        err.getStatus() == ErrorCodes::BadValue) {
+                        LOGV2(7075402,
+                              "Ignoring insert error",
+                              "error"_attr = redact(err.getStatus()));
+                        invalid.insert(baseIndex - err.getIndex());
+                        continue;
+                    }
+                    if (!firstWriteErr) {
+                        // Save the error for later. Go through the rest of the errors to see if
+                        // there are any invalid documents so they can be discarded from the
+                        // buffer.
+                        firstWriteErr.emplace(err);
+                    }
+                }
+                if (firstWriteErr) {
+                    uassertStatusOK(firstWriteErr->getStatus());
+                }
+            } else {
+                uassertStatusOK(response.toStatus());
+            }
+        });
+
+        tmpBuffer.truncate(lastIndex, objSize);
+        baseIndex -= lastIndex;
+    }
+
+    backSwapper.dismiss();
+}
+
+void QueryAnalysisWriter::Buffer::add(BSONObj doc) {
+    if (doc.objsize() > kMaxBSONObjSizeForInsert) {
+        return;
+    }
+    _docs.push_back(std::move(doc));
+    _numBytes += _docs.back().objsize();
+}
+
+void QueryAnalysisWriter::Buffer::truncate(size_t index, long long numBytes) {
+    invariant(index >= 0);
+    invariant(index < _docs.size());
+    invariant(numBytes > 0);
+    invariant(numBytes <= _numBytes);
+    _docs.resize(index);
+    _numBytes -= numBytes;
+}
+
+bool QueryAnalysisWriter::_exceedsMaxSizeBytes() {
+    stdx::lock_guard<Latch> lk(_mutex);
+    return _queries.getSize() + _diffs.getSize() >= gQueryAnalysisWriterMaxMemoryUsageBytes.load();
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addFindQuery(const UUID& sampleId,
+                                                       const NamespaceString& nss,
+                                                       const BSONObj& filter,
+                                                       const BSONObj& collation) {
+    return _addReadQuery(sampleId, nss, SampledReadCommandNameEnum::kFind, filter, collation);
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addCountQuery(const UUID& sampleId,
+                                                        const NamespaceString& nss,
+                                                        const BSONObj& filter,
+                                                        const BSONObj& collation) {
+    return _addReadQuery(sampleId, nss, SampledReadCommandNameEnum::kCount, filter, collation);
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addDistinctQuery(const UUID& sampleId,
+                                                           const NamespaceString& nss,
+                                                           const BSONObj& filter,
+                                                           const BSONObj& collation) {
+    return _addReadQuery(sampleId, nss, SampledReadCommandNameEnum::kDistinct, filter, collation);
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addAggregateQuery(const UUID& sampleId,
+                                                            const NamespaceString& nss,
+                                                            const BSONObj& filter,
+                                                            const BSONObj& collation) {
+    return _addReadQuery(sampleId, nss, SampledReadCommandNameEnum::kAggregate, filter, collation);
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::_addReadQuery(const UUID& sampleId,
+                                                        const NamespaceString& nss,
+                                                        SampledReadCommandNameEnum cmdName,
+                                                        const BSONObj& filter,
+                                                        const BSONObj& collation) {
+    invariant(_executor);
+    return ExecutorFuture<void>(_executor)
+        .then([this,
+               sampleId,
+               nss,
+               cmdName,
+               filter = filter.getOwned(),
+               collation = collation.getOwned()] {
+            auto opCtxHolder = cc().makeOperationContext();
+            auto opCtx = opCtxHolder.get();
+
+            auto collUuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss);
+
+            if (!collUuid) {
+                LOGV2(7047301, "Found a sampled read query for non-existing collection");
+                return;
+            }
+
+            auto cmd = SampledReadCommand{filter.getOwned(), collation.getOwned()};
+            auto doc = SampledReadQueryDocument{sampleId, nss, *collUuid, cmdName, cmd.toBSON()};
+            stdx::lock_guard<Latch> lk(_mutex);
+            _queries.add(doc.toBSON());
+        })
+        .then([this] {
+            if (_exceedsMaxSizeBytes()) {
+                auto opCtxHolder = cc().makeOperationContext();
+                auto opCtx = opCtxHolder.get();
+                _flushQueries(opCtx);
+            }
+        })
+        .onError([this, nss](Status status) {
+            LOGV2(7047302,
+                  "Failed to add read query",
+                  "ns"_attr = nss,
+                  "error"_attr = redact(status));
+        });
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addUpdateQuery(
+    const write_ops::UpdateCommandRequest& updateCmd, int opIndex) {
+    invariant(updateCmd.getUpdates()[opIndex].getSampleId());
+    invariant(_executor);
+
+    return ExecutorFuture<void>(_executor)
+        .then([this, sampledUpdateCmd = makeSampledUpdateCommandRequest(updateCmd, opIndex)]() {
+            auto opCtxHolder = cc().makeOperationContext();
+            auto opCtx = opCtxHolder.get();
+
+            auto collUuid =
+                CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, sampledUpdateCmd.nss);
+
+            if (!collUuid) {
+                LOGV2_WARNING(7075300,
+                              "Found a sampled update query for a non-existing collection");
+                return;
+            }
+
+            auto doc = SampledWriteQueryDocument{sampledUpdateCmd.sampleId,
+                                                 sampledUpdateCmd.nss,
+                                                 *collUuid,
+                                                 SampledWriteCommandNameEnum::kUpdate,
+                                                 std::move(sampledUpdateCmd.cmd)};
+            stdx::lock_guard<Latch> lk(_mutex);
+            _queries.add(doc.toBSON());
+        })
+        .then([this] {
+            if (_exceedsMaxSizeBytes()) {
+                auto opCtxHolder = cc().makeOperationContext();
+                auto opCtx = opCtxHolder.get();
+                _flushQueries(opCtx);
+            }
+        })
+        .onError([this, nss = updateCmd.getNamespace()](Status status) {
+            LOGV2(7075301,
+                  "Failed to add update query",
+                  "ns"_attr = nss,
+                  "error"_attr = redact(status));
+        });
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addDeleteQuery(
+    const write_ops::DeleteCommandRequest& deleteCmd, int opIndex) {
+    invariant(deleteCmd.getDeletes()[opIndex].getSampleId());
+    invariant(_executor);
+
+    return ExecutorFuture<void>(_executor)
+        .then([this, sampledDeleteCmd = makeSampledDeleteCommandRequest(deleteCmd, opIndex)]() {
+            auto opCtxHolder = cc().makeOperationContext();
+            auto opCtx = opCtxHolder.get();
+
+            auto collUuid =
+                CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, sampledDeleteCmd.nss);
+
+            if (!collUuid) {
+                LOGV2_WARNING(7075302,
+                              "Found a sampled delete query for a non-existing collection");
+                return;
+            }
+
+            auto doc = SampledWriteQueryDocument{sampledDeleteCmd.sampleId,
+                                                 sampledDeleteCmd.nss,
+                                                 *collUuid,
+                                                 SampledWriteCommandNameEnum::kDelete,
+                                                 std::move(sampledDeleteCmd.cmd)};
+            stdx::lock_guard<Latch> lk(_mutex);
+            _queries.add(doc.toBSON());
+        })
+        .then([this] {
+            if (_exceedsMaxSizeBytes()) {
+                auto opCtxHolder = cc().makeOperationContext();
+                auto opCtx = opCtxHolder.get();
+                _flushQueries(opCtx);
+            }
+        })
+        .onError([this, nss = deleteCmd.getNamespace()](Status status) {
+            LOGV2(7075303,
+                  "Failed to add delete query",
+                  "ns"_attr = nss,
+                  "error"_attr = redact(status));
+        });
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addFindAndModifyQuery(
+    const write_ops::FindAndModifyCommandRequest& findAndModifyCmd) {
+    invariant(findAndModifyCmd.getSampleId());
+    invariant(_executor);
+
+    return ExecutorFuture<void>(_executor)
+        .then([this,
+               sampledFindAndModifyCmd =
+                   makeSampledFindAndModifyCommandRequest(findAndModifyCmd)]() {
+            auto opCtxHolder = cc().makeOperationContext();
+            auto opCtx = opCtxHolder.get();
+
+            auto collUuid =
+                CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, sampledFindAndModifyCmd.nss);
+
+            if (!collUuid) {
+                LOGV2_WARNING(7075304,
+                              "Found a sampled findAndModify query for a non-existing collection");
+                return;
+            }
+
+            auto doc = SampledWriteQueryDocument{sampledFindAndModifyCmd.sampleId,
+                                                 sampledFindAndModifyCmd.nss,
+                                                 *collUuid,
+                                                 SampledWriteCommandNameEnum::kFindAndModify,
+                                                 std::move(sampledFindAndModifyCmd.cmd)};
+            stdx::lock_guard<Latch> lk(_mutex);
+            _queries.add(doc.toBSON());
+        })
+        .then([this] {
+            if (_exceedsMaxSizeBytes()) {
+                auto opCtxHolder = cc().makeOperationContext();
+                auto opCtx = opCtxHolder.get();
+                _flushQueries(opCtx);
+            }
+        })
+        .onError([this, nss = findAndModifyCmd.getNamespace()](Status status) {
+            LOGV2(7075305,
+                  "Failed to add findAndModify query",
+                  "ns"_attr = nss,
+                  "error"_attr = redact(status));
+        });
+}
+
+ExecutorFuture<void> QueryAnalysisWriter::addDiff(const UUID& sampleId,
+                                                  const NamespaceString& nss,
+                                                  const UUID& collUuid,
+                                                  const BSONObj& preImage,
+                                                  const BSONObj& postImage) {
+    invariant(_executor);
+    return ExecutorFuture<void>(_executor)
+        .then([this,
+               sampleId,
+               nss,
+               collUuid,
+               preImage = preImage.getOwned(),
+               postImage = postImage.getOwned()]() {
+            auto diff = doc_diff::computeInlineDiff(preImage, postImage);
+
+            if (!diff || diff->isEmpty()) {
+                return;
+            }
+
+            auto doc = SampledQueryDiffDocument{sampleId, nss, collUuid, std::move(*diff)};
+            stdx::lock_guard<Latch> lk(_mutex);
+            _diffs.add(doc.toBSON());
+        })
+        .then([this] {
+            if (_exceedsMaxSizeBytes()) {
+                auto opCtxHolder = cc().makeOperationContext();
+                auto opCtx = opCtxHolder.get();
+                _flushDiffs(opCtx);
+            }
+        })
+        .onError([this, nss](Status status) {
+            LOGV2(7075401, "Failed to add diff", "ns"_attr = nss, "error"_attr = redact(status));
+        });
+}
+
+}  // namespace analyze_shard_key
+}  // namespace mongo
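
For context, the sketch below shows how a call site might hand a sampled find query to the writer added by this patch. It is illustrative only and not part of the commit: maybeRecordSampledFind is a hypothetical helper, and the fire-and-forget getAsync() call assumes the ExecutorFuture API available elsewhere in the server. QueryAnalysisWriter::get() itself enforces that the feature flag is enabled and that the node is a shard server.

// Hypothetical call site (not part of this commit). The writer buffers the
// sampled query on its own executor; failures are logged by the onError()
// continuation inside addFindQuery(), so the caller can ignore the result
// of the returned future.
void maybeRecordSampledFind(OperationContext* opCtx,
                            const boost::optional<UUID>& sampleId,
                            const NamespaceString& nss,
                            const BSONObj& filter,
                            const BSONObj& collation) {
    if (!sampleId) {
        return;  // This query was not chosen for sampling.
    }
    analyze_shard_key::QueryAnalysisWriter::get(opCtx)
        .addFindQuery(*sampleId, nss, filter, collation)
        .getAsync([](auto) {});  // Fire and forget.
}

The buffered documents are persisted by the periodic jobs registered in onStartup(), or earlier if the combined size of the query and diff buffers exceeds gQueryAnalysisWriterMaxMemoryUsageBytes.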