diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-11-01 23:48:28 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-02 00:43:35 +0000 |
commit | d3ec2b61a2589c9f35f2e11abaee5f163575c99e (patch) | |
tree | 33422391ca23365f8bdc72fbd7fc734413f2f29a /src | |
parent | 433c0e26e0d4e35bb9b2bf6159ca1556808ab9b9 (diff) | |
download | mongo-d3ec2b61a2589c9f35f2e11abaee5f163575c99e.tar.gz |
SERVER-70753 Make shards support persisting sampled write queries
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/ops/write_ops.idl | 15 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer.cpp | 192 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer_test.cpp | 420 | ||||
-rw-r--r-- | src/mongo/s/analyze_shard_key_common.idl | 9 | ||||
-rw-r--r-- | src/mongo/s/analyze_shard_key_documents.idl | 21 |
6 files changed, 655 insertions, 11 deletions
diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index a0567a957f8..be734dae3a6 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -259,6 +259,11 @@ structs: type: object optional: true stability: stable + sampleId: + description: "The unique sample id for the operation if it has been chosen for sampling." + type: uuid + optional: true + stability: unstable DeleteOpEntry: description: "Parser for the entries in the 'deletes' array of a delete command." @@ -285,6 +290,11 @@ structs: type: object optional: true stability: stable + sampleId: + description: "The unique sample id for the operation if it has been chosen for sampling." + type: uuid + optional: true + stability: unstable FindAndModifyLastError: description: "Contains execution details for the findAndModify command" @@ -543,3 +553,8 @@ commands: description: "Indicates whether the operation is a mirrored read" type: optionalBool stability: unstable + sampleId: + description: "The unique sample id for the operation if it has been chosen for sampling." + type: uuid + optional: true + stability: unstable diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp index bad5c0c0153..3861c48c464 100644 --- a/src/mongo/db/s/query_analysis_writer.cpp +++ b/src/mongo/db/s/query_analysis_writer.cpp @@ -176,6 +176,70 @@ BSONObj executeWriteCommand(OperationContext* opCtx, return {}; } +struct SampledWriteCommandRequest { + UUID sampleId; + NamespaceString nss; + BSONObj cmd; // the BSON for a {Update,Delete,FindAndModify}CommandRequest +}; + +/* + * Returns a sampled update command for the update at 'opIndex' in the given update command. + */ +SampledWriteCommandRequest makeSampledUpdateCommandRequest( + const write_ops::UpdateCommandRequest& originalCmd, int opIndex) { + auto op = originalCmd.getUpdates()[opIndex]; + auto sampleId = op.getSampleId(); + invariant(sampleId); + + write_ops::UpdateCommandRequest sampledCmd(originalCmd.getNamespace(), {std::move(op)}); + sampledCmd.setLet(originalCmd.getLet()); + + return {*sampleId, + sampledCmd.getNamespace(), + sampledCmd.toBSON(BSON("$db" << sampledCmd.getNamespace().db().toString()))}; +} + +/* + * Returns a sampled delete command for the delete at 'opIndex' in the given delete command. + */ +SampledWriteCommandRequest makeSampledDeleteCommandRequest( + const write_ops::DeleteCommandRequest& originalCmd, int opIndex) { + auto op = originalCmd.getDeletes()[opIndex]; + auto sampleId = op.getSampleId(); + invariant(sampleId); + + write_ops::DeleteCommandRequest sampledCmd(originalCmd.getNamespace(), {std::move(op)}); + sampledCmd.setLet(originalCmd.getLet()); + + return {*sampleId, + sampledCmd.getNamespace(), + sampledCmd.toBSON(BSON("$db" << sampledCmd.getNamespace().db().toString()))}; +} + +/* + * Returns a sampled findAndModify command for the given findAndModify command. + */ +SampledWriteCommandRequest makeSampledFindAndModifyCommandRequest( + const write_ops::FindAndModifyCommandRequest& originalCmd) { + invariant(originalCmd.getSampleId()); + + write_ops::FindAndModifyCommandRequest sampledCmd(originalCmd.getNamespace()); + sampledCmd.setQuery(originalCmd.getQuery()); + sampledCmd.setUpdate(originalCmd.getUpdate()); + sampledCmd.setRemove(originalCmd.getRemove()); + sampledCmd.setUpsert(originalCmd.getUpsert()); + sampledCmd.setNew(originalCmd.getNew()); + sampledCmd.setSort(originalCmd.getSort()); + sampledCmd.setCollation(originalCmd.getCollation()); + sampledCmd.setArrayFilters(originalCmd.getArrayFilters()); + sampledCmd.setLet(originalCmd.getLet()); + sampledCmd.setSampleId(originalCmd.getSampleId()); + + return {*sampledCmd.getSampleId(), + sampledCmd.getNamespace(), + sampledCmd.toBSON(BSON("$db" << sampledCmd.getNamespace().db().toString()))}; +} + } // namespace QueryAnalysisWriter& QueryAnalysisWriter::get(OperationContext* opCtx) { @@ -437,5 +501,133 @@ ExecutorFuture<void> QueryAnalysisWriter::_addReadQuery(const UUID& sampleId, }); } +ExecutorFuture<void> QueryAnalysisWriter::addUpdateQuery( + const write_ops::UpdateCommandRequest& updateCmd, int opIndex) { + invariant(updateCmd.getUpdates()[opIndex].getSampleId()); + invariant(_executor); + + return ExecutorFuture<void>(_executor) + .then([this, sampledUpdateCmd = makeSampledUpdateCommandRequest(updateCmd, opIndex)]() { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + auto collUuid = + CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, sampledUpdateCmd.nss); + + if (!collUuid) { + LOGV2_WARNING(7075300, + "Found a sampled update query for a non-existing collection"); + return; + } + + auto doc = SampledWriteQueryDocument{sampledUpdateCmd.sampleId, + sampledUpdateCmd.nss, + *collUuid, + SampledWriteCommandNameEnum::kUpdate, + std::move(sampledUpdateCmd.cmd)}; + stdx::lock_guard<Latch> lk(_mutex); + _queries.add(doc.toBSON()); + }) + .then([this] { + if (_exceedsMaxSizeBytes()) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + _flushQueries(opCtx); + } + }) + .onError([this, nss = updateCmd.getNamespace()](Status status) { + LOGV2(7075301, + "Failed to add update query", + "ns"_attr = nss, + "error"_attr = redact(status)); + }); +} + +ExecutorFuture<void> QueryAnalysisWriter::addDeleteQuery( + const write_ops::DeleteCommandRequest& deleteCmd, int opIndex) { + invariant(deleteCmd.getDeletes()[opIndex].getSampleId()); + invariant(_executor); + + return ExecutorFuture<void>(_executor) + .then([this, sampledDeleteCmd = makeSampledDeleteCommandRequest(deleteCmd, opIndex)]() { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + auto collUuid = + CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, sampledDeleteCmd.nss); + + if (!collUuid) { + LOGV2_WARNING(7075302, + "Found a sampled delete query for a non-existing collection"); + return; + } + + auto doc = SampledWriteQueryDocument{sampledDeleteCmd.sampleId, + sampledDeleteCmd.nss, + *collUuid, + SampledWriteCommandNameEnum::kDelete, + std::move(sampledDeleteCmd.cmd)}; + stdx::lock_guard<Latch> lk(_mutex); + _queries.add(doc.toBSON()); + }) + .then([this] { + if (_exceedsMaxSizeBytes()) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + _flushQueries(opCtx); + } + }) + .onError([this, nss = deleteCmd.getNamespace()](Status status) { + LOGV2(7075303, + "Failed to add delete query", + "ns"_attr = nss, + "error"_attr = redact(status)); + }); +} + +ExecutorFuture<void> QueryAnalysisWriter::addFindAndModifyQuery( + const write_ops::FindAndModifyCommandRequest& findAndModifyCmd) { + invariant(findAndModifyCmd.getSampleId()); + invariant(_executor); + + return ExecutorFuture<void>(_executor) + .then([this, + sampledFindAndModifyCmd = + makeSampledFindAndModifyCommandRequest(findAndModifyCmd)]() { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + auto collUuid = + CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, sampledFindAndModifyCmd.nss); + + if (!collUuid) { + LOGV2_WARNING(7075304, + "Found a sampled findAndModify query for a non-existing collection"); + return; + } + + auto doc = SampledWriteQueryDocument{sampledFindAndModifyCmd.sampleId, + sampledFindAndModifyCmd.nss, + *collUuid, + SampledWriteCommandNameEnum::kFindAndModify, + std::move(sampledFindAndModifyCmd.cmd)}; + stdx::lock_guard<Latch> lk(_mutex); + _queries.add(doc.toBSON()); + }) + .then([this] { + if (_exceedsMaxSizeBytes()) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + _flushQueries(opCtx); + } + }) + .onError([this, nss = findAndModifyCmd.getNamespace()](Status status) { + LOGV2(7075305, + "Failed to add findAndModify query", + "ns"_attr = nss, + "error"_attr = redact(status)); + }); +} + } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/db/s/query_analysis_writer.h b/src/mongo/db/s/query_analysis_writer.h index 8937639985e..76ae137273a 100644 --- a/src/mongo/db/s/query_analysis_writer.h +++ b/src/mongo/db/s/query_analysis_writer.h @@ -34,6 +34,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/s/analyze_shard_key_common_gen.h" #include "mongo/s/analyze_shard_key_util.h" +#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/periodic_runner.h" namespace mongo { @@ -128,6 +129,14 @@ public: const BSONObj& filter, const BSONObj& collation); + ExecutorFuture<void> addUpdateQuery(const write_ops::UpdateCommandRequest& updateCmd, + int opIndex); + ExecutorFuture<void> addDeleteQuery(const write_ops::DeleteCommandRequest& deleteCmd, + int opIndex); + + ExecutorFuture<void> addFindAndModifyQuery( + const write_ops::FindAndModifyCommandRequest& findAndModifyCmd); + int getQueriesCountForTest() const { stdx::lock_guard<Latch> lk(_mutex); return _queries.getCount(); diff --git a/src/mongo/db/s/query_analysis_writer_test.cpp b/src/mongo/db/s/query_analysis_writer_test.cpp index 641cbf16e61..3c78afab971 100644 --- a/src/mongo/db/s/query_analysis_writer_test.cpp +++ b/src/mongo/db/s/query_analysis_writer_test.cpp @@ -190,6 +190,131 @@ protected: << "strength" << strength); } + /* + * Makes an UpdateCommandRequest for the collection 'nss' such that the command contains + * 'numOps' updates and the ones whose indices are in 'markForSampling' are marked for sampling. + * Returns the UpdateCommandRequest and the map storing the expected sampled + * UpdateCommandRequests by sample id, if any. + */ + std::pair<write_ops::UpdateCommandRequest, std::map<UUID, write_ops::UpdateCommandRequest>> + makeUpdateCommandRequest(const NamespaceString& nss, + int numOps, + std::set<int> markForSampling, + std::string filterFieldName = "a") { + write_ops::UpdateCommandRequest originalCmd(nss); + std::vector<write_ops::UpdateOpEntry> updateOps; // populated below. + originalCmd.setLet(let); + originalCmd.getWriteCommandRequestBase().setEncryptionInformation(encryptionInformation); + + std::map<UUID, write_ops::UpdateCommandRequest> expectedSampledCmds; + + for (auto i = 0; i < numOps; i++) { + auto updateOp = write_ops::UpdateOpEntry( + BSON(filterFieldName << i), + write_ops::UpdateModification(BSON("$set" << BSON("b.$[element]" << i)))); + updateOp.setC(BSON("x" << i)); + updateOp.setArrayFilters(std::vector<BSONObj>{BSON("element" << BSON("$gt" << i))}); + updateOp.setMulti(_getRandomBool()); + updateOp.setUpsert(_getRandomBool()); + updateOp.setUpsertSupplied(_getRandomBool()); + updateOp.setCollation(makeNonEmptyCollation()); + + if (markForSampling.find(i) != markForSampling.end()) { + auto sampleId = UUID::gen(); + updateOp.setSampleId(sampleId); + + write_ops::UpdateCommandRequest expectedSampledCmd = originalCmd; + expectedSampledCmd.setUpdates({updateOp}); + expectedSampledCmd.getWriteCommandRequestBase().setEncryptionInformation( + boost::none); + expectedSampledCmds.emplace(sampleId, std::move(expectedSampledCmd)); + } + updateOps.push_back(updateOp); + } + originalCmd.setUpdates(updateOps); + + return {originalCmd, expectedSampledCmds}; + } + + /* + * Makes an DeleteCommandRequest for the collection 'nss' such that the command contains + * 'numOps' deletes and the ones whose indices are in 'markForSampling' are marked for sampling. + * Returns the DeleteCommandRequest and the map storing the expected sampled + * DeleteCommandRequests by sample id, if any. + */ + std::pair<write_ops::DeleteCommandRequest, std::map<UUID, write_ops::DeleteCommandRequest>> + makeDeleteCommandRequest(const NamespaceString& nss, + int numOps, + std::set<int> markForSampling, + std::string filterFieldName = "a") { + write_ops::DeleteCommandRequest originalCmd(nss); + std::vector<write_ops::DeleteOpEntry> deleteOps; // populated and set below. + originalCmd.setLet(let); + originalCmd.getWriteCommandRequestBase().setEncryptionInformation(encryptionInformation); + + std::map<UUID, write_ops::DeleteCommandRequest> expectedSampledCmds; + + for (auto i = 0; i < numOps; i++) { + auto deleteOp = + write_ops::DeleteOpEntry(BSON(filterFieldName << i), _getRandomBool() /* multi */); + deleteOp.setCollation(makeNonEmptyCollation()); + + if (markForSampling.find(i) != markForSampling.end()) { + auto sampleId = UUID::gen(); + deleteOp.setSampleId(sampleId); + + write_ops::DeleteCommandRequest expectedSampledCmd = originalCmd; + expectedSampledCmd.setDeletes({deleteOp}); + expectedSampledCmd.getWriteCommandRequestBase().setEncryptionInformation( + boost::none); + expectedSampledCmds.emplace(sampleId, std::move(expectedSampledCmd)); + } + deleteOps.push_back(deleteOp); + } + originalCmd.setDeletes(deleteOps); + + return {originalCmd, expectedSampledCmds}; + } + + /* + * Makes a FindAndModifyCommandRequest for the collection 'nss'. The findAndModify is an update + * if 'isUpdate' is true, and a remove otherwise. If 'markForSampling' is true, it is marked for + * sampling. Returns the FindAndModifyCommandRequest and the map storing the expected sampled + * FindAndModifyCommandRequests by sample id, if any. + */ + std::pair<write_ops::FindAndModifyCommandRequest, + std::map<UUID, write_ops::FindAndModifyCommandRequest>> + makeFindAndModifyCommandRequest(const NamespaceString& nss, + bool isUpdate, + bool markForSampling, + std::string filterFieldName = "a") { + write_ops::FindAndModifyCommandRequest originalCmd(nss); + originalCmd.setQuery(BSON(filterFieldName << 0)); + originalCmd.setUpdate( + write_ops::UpdateModification(BSON("$set" << BSON("b.$[element]" << 0)))); + originalCmd.setArrayFilters(std::vector<BSONObj>{BSON("element" << BSON("$gt" << 10))}); + originalCmd.setSort(BSON("_id" << 1)); + if (isUpdate) { + originalCmd.setUpsert(_getRandomBool()); + originalCmd.setNew(_getRandomBool()); + } + originalCmd.setCollation(makeNonEmptyCollation()); + originalCmd.setLet(let); + originalCmd.setEncryptionInformation(encryptionInformation); + + std::map<UUID, write_ops::FindAndModifyCommandRequest> expectedSampledCmds; + if (markForSampling) { + auto sampleId = UUID::gen(); + originalCmd.setSampleId(sampleId); + + auto expectedSampledCmd = originalCmd; + expectedSampledCmd.setEncryptionInformation(boost::none); + expectedSampledCmds.emplace(sampleId, std::move(expectedSampledCmd)); + } + + return {originalCmd, expectedSampledCmds}; + } + void deleteSampledQueryDocuments() const { DBDirectClient client(operationContext()); client.remove(NamespaceString::kConfigSampledQueriesNamespace.toString(), BSONObj()); @@ -226,6 +351,28 @@ protected: ASSERT_BSONOBJ_EQ(parsedCmd.getCollation(), collation); } + /* + * Asserts that there is a sampled write query document with the given sample id and that it has + * the given fields. + */ + template <typename CommandRequestType> + void assertSampledWriteQueryDocument(const UUID& sampleId, + const NamespaceString& nss, + SampledWriteCommandNameEnum cmdName, + const CommandRequestType& expectedCmd) { + auto doc = _getConfigDocument(NamespaceString::kConfigSampledQueriesNamespace, sampleId); + auto parsedQueryDoc = + SampledWriteQueryDocument::parse(IDLParserContext("QueryAnalysisWriterTest"), doc); + + ASSERT_EQ(parsedQueryDoc.getNs(), nss); + ASSERT_EQ(parsedQueryDoc.getCollectionUuid(), getCollectionUUID(nss)); + ASSERT_EQ(parsedQueryDoc.getSampleId(), sampleId); + ASSERT(parsedQueryDoc.getCmdName() == cmdName); + auto parsedCmd = CommandRequestType::parse(IDLParserContext("QueryAnalysisWriterTest"), + parsedQueryDoc.getCmd()); + ASSERT_BSONOBJ_EQ(parsedCmd.toBSON({}), expectedCmd.toBSON({})); + } + const NamespaceString nss0{"testDb", "testColl0"}; const NamespaceString nss1{"testDb", "testColl1"}; @@ -234,7 +381,17 @@ protected: const BSONObj emptyFilter{}; const BSONObj emptyCollation{}; + const BSONObj let = BSON("x" << 1); + // Test with EncryptionInformation to verify that QueryAnalysisWriter does not persist the + // WriteCommandRequestBase fields, especially this sensitive field. + const EncryptionInformation encryptionInformation{BSON("foo" + << "bar")}; + private: + bool _getRandomBool() { + return rand() % 2 == 0; + } + /** * Returns the number of the documents for the collection 'collNss' in the config collection * 'configNss'. @@ -379,6 +536,152 @@ TEST_F(QueryAnalysisWriterTest, AggregateQuery) { testAggregateCmdCommon(emptyFilter, emptyCollation); } +DEATH_TEST_F(QueryAnalysisWriterTest, UpdateQueryNotMarkedForSampling, "invariant") { + auto& writer = QueryAnalysisWriter::get(operationContext()); + auto [originalCmd, _] = makeUpdateCommandRequest(nss0, 1, {} /* markForSampling */); + writer.addUpdateQuery(originalCmd, 0).get(); +} + +TEST_F(QueryAnalysisWriterTest, UpdateQueriesMarkedForSampling) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto [originalCmd, expectedSampledCmds] = + makeUpdateCommandRequest(nss0, 3, {0, 2} /* markForSampling */); + ASSERT_EQ(expectedSampledCmds.size(), 2U); + + writer.addUpdateQuery(originalCmd, 0).get(); + writer.addUpdateQuery(originalCmd, 2).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 2); + writer.flushQueriesForTest(operationContext()); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 2); + for (const auto& [sampleId, expectedSampledCmd] : expectedSampledCmds) { + assertSampledWriteQueryDocument(sampleId, + expectedSampledCmd.getNamespace(), + SampledWriteCommandNameEnum::kUpdate, + expectedSampledCmd); + } +} + +DEATH_TEST_F(QueryAnalysisWriterTest, DeleteQueryNotMarkedForSampling, "invariant") { + auto& writer = QueryAnalysisWriter::get(operationContext()); + auto [originalCmd, _] = makeDeleteCommandRequest(nss0, 1, {} /* markForSampling */); + writer.addDeleteQuery(originalCmd, 0).get(); +} + +TEST_F(QueryAnalysisWriterTest, DeleteQueriesMarkedForSampling) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto [originalCmd, expectedSampledCmds] = + makeDeleteCommandRequest(nss0, 3, {1, 2} /* markForSampling */); + ASSERT_EQ(expectedSampledCmds.size(), 2U); + + writer.addDeleteQuery(originalCmd, 1).get(); + writer.addDeleteQuery(originalCmd, 2).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 2); + writer.flushQueriesForTest(operationContext()); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 2); + for (const auto& [sampleId, expectedSampledCmd] : expectedSampledCmds) { + assertSampledWriteQueryDocument(sampleId, + expectedSampledCmd.getNamespace(), + SampledWriteCommandNameEnum::kDelete, + expectedSampledCmd); + } +} + +DEATH_TEST_F(QueryAnalysisWriterTest, FindAndModifyQueryNotMarkedForSampling, "invariant") { + auto& writer = QueryAnalysisWriter::get(operationContext()); + auto [originalCmd, _] = + makeFindAndModifyCommandRequest(nss0, true /* isUpdate */, false /* markForSampling */); + writer.addFindAndModifyQuery(originalCmd).get(); +} + +TEST_F(QueryAnalysisWriterTest, FindAndModifyQueryUpdateMarkedForSampling) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto [originalCmd, expectedSampledCmds] = + makeFindAndModifyCommandRequest(nss0, true /* isUpdate */, true /* markForSampling */); + ASSERT_EQ(expectedSampledCmds.size(), 1U); + auto [sampleId, expectedSampledCmd] = *expectedSampledCmds.begin(); + + writer.addFindAndModifyQuery(originalCmd).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 1); + writer.flushQueriesForTest(operationContext()); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 1); + assertSampledWriteQueryDocument(sampleId, + expectedSampledCmd.getNamespace(), + SampledWriteCommandNameEnum::kFindAndModify, + expectedSampledCmd); +} + +TEST_F(QueryAnalysisWriterTest, FindAndModifyQueryRemoveMarkedForSampling) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto [originalCmd, expectedSampledCmds] = + makeFindAndModifyCommandRequest(nss0, false /* isUpdate */, true /* markForSampling */); + ASSERT_EQ(expectedSampledCmds.size(), 1U); + auto [sampleId, expectedSampledCmd] = *expectedSampledCmds.begin(); + + writer.addFindAndModifyQuery(originalCmd).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 1); + writer.flushQueriesForTest(operationContext()); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 1); + assertSampledWriteQueryDocument(sampleId, + expectedSampledCmd.getNamespace(), + SampledWriteCommandNameEnum::kFindAndModify, + expectedSampledCmd); +} + +TEST_F(QueryAnalysisWriterTest, MultipleQueriesAndCollections) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + // Make nss0 have one query. + auto [originalDeleteCmd, expectedSampledDeleteCmds] = + makeDeleteCommandRequest(nss1, 3, {1} /* markForSampling */); + ASSERT_EQ(expectedSampledDeleteCmds.size(), 1U); + auto [deleteSampleId, expectedSampledDeleteCmd] = *expectedSampledDeleteCmds.begin(); + + // Make nss1 have two queries. + auto [originalUpdateCmd, expectedSampledUpdateCmds] = + makeUpdateCommandRequest(nss0, 1, {0} /* markForSampling */); + ASSERT_EQ(expectedSampledUpdateCmds.size(), 1U); + auto [updateSampleId, expectedSampledUpdateCmd] = *expectedSampledUpdateCmds.begin(); + + auto countSampleId = UUID::gen(); + auto originalCountFilter = makeNonEmptyFilter(); + auto originalCountCollation = makeNonEmptyCollation(); + + writer.addDeleteQuery(originalDeleteCmd, 1).get(); + writer.addUpdateQuery(originalUpdateCmd, 0).get(); + writer.addCountQuery(countSampleId, nss1, originalCountFilter, originalCountCollation).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 3); + writer.flushQueriesForTest(operationContext()); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 1); + assertSampledWriteQueryDocument(deleteSampleId, + expectedSampledDeleteCmd.getNamespace(), + SampledWriteCommandNameEnum::kDelete, + expectedSampledDeleteCmd); + ASSERT_EQ(getSampledQueryDocumentsCount(nss1), 2); + assertSampledWriteQueryDocument(updateSampleId, + expectedSampledUpdateCmd.getNamespace(), + SampledWriteCommandNameEnum::kUpdate, + expectedSampledUpdateCmd); + assertSampledReadQueryDocument(countSampleId, + nss1, + SampledReadCommandNameEnum::kCount, + originalCountFilter, + originalCountCollation); +} + TEST_F(QueryAnalysisWriterTest, DuplicateQueries) { auto& writer = QueryAnalysisWriter::get(operationContext()); @@ -386,9 +689,10 @@ TEST_F(QueryAnalysisWriterTest, DuplicateQueries) { auto originalFindFilter = makeNonEmptyFilter(); auto originalFindCollation = makeNonEmptyCollation(); - auto distinctSampleId = UUID::gen(); - auto originalDistinctFilter = makeNonEmptyFilter(); - auto originalDistinctCollation = makeNonEmptyCollation(); + auto [originalUpdateCmd, expectedSampledUpdateCmds] = + makeUpdateCommandRequest(nss0, 1, {0} /* markForSampling */); + ASSERT_EQ(expectedSampledUpdateCmds.size(), 1U); + auto [updateSampleId, expectedSampledUpdateCmd] = *expectedSampledUpdateCmds.begin(); auto countSampleId = UUID::gen(); auto originalCountFilter = makeNonEmptyFilter(); @@ -406,9 +710,7 @@ TEST_F(QueryAnalysisWriterTest, DuplicateQueries) { originalFindFilter, originalFindCollation); - writer - .addDistinctQuery(distinctSampleId, nss0, originalDistinctFilter, originalDistinctCollation) - .get(); + writer.addUpdateQuery(originalUpdateCmd, 0).get(); writer.addFindQuery(findSampleId, nss0, originalFindFilter, originalFindCollation) .get(); // This is a duplicate. writer.addCountQuery(countSampleId, nss0, originalCountFilter, originalCountCollation).get(); @@ -417,11 +719,10 @@ TEST_F(QueryAnalysisWriterTest, DuplicateQueries) { ASSERT_EQ(writer.getQueriesCountForTest(), 0); ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 3); - assertSampledReadQueryDocument(distinctSampleId, - nss0, - SampledReadCommandNameEnum::kDistinct, - originalDistinctFilter, - originalDistinctCollation); + assertSampledWriteQueryDocument(updateSampleId, + expectedSampledUpdateCmd.getNamespace(), + SampledWriteCommandNameEnum::kUpdate, + expectedSampledUpdateCmd); assertSampledReadQueryDocument(findSampleId, nss0, SampledReadCommandNameEnum::kFind, @@ -511,6 +812,103 @@ TEST_F(QueryAnalysisWriterTest, FlushAfterAddReadIfExceedsSizeLimit) { sampleId1, nss1, SampledReadCommandNameEnum::kAggregate, filter1, collation1); } +TEST_F(QueryAnalysisWriterTest, FlushAfterAddUpdateIfExceedsSizeLimit) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto maxMemoryUsageBytes = 1024; + RAIIServerParameterControllerForTest maxMemoryBytes{"queryAnalysisWriterMaxMemoryUsageBytes", + maxMemoryUsageBytes}; + auto [originalCmd, expectedSampledCmds] = + makeUpdateCommandRequest(nss0, + 3, + {0, 2} /* markForSampling */, + std::string(maxMemoryUsageBytes / 2, 'a') /* filterFieldName */); + ASSERT_EQ(expectedSampledCmds.size(), 2U); + + writer.addUpdateQuery(originalCmd, 0).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 1); + // Adding the next query causes the size to exceed the limit. + writer.addUpdateQuery(originalCmd, 2).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 2); + for (const auto& [sampleId, expectedSampledCmd] : expectedSampledCmds) { + assertSampledWriteQueryDocument(sampleId, + expectedSampledCmd.getNamespace(), + SampledWriteCommandNameEnum::kUpdate, + expectedSampledCmd); + } +} + +TEST_F(QueryAnalysisWriterTest, FlushAfterAddDeleteIfExceedsSizeLimit) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto maxMemoryUsageBytes = 1024; + RAIIServerParameterControllerForTest maxMemoryBytes{"queryAnalysisWriterMaxMemoryUsageBytes", + maxMemoryUsageBytes}; + auto [originalCmd, expectedSampledCmds] = + makeDeleteCommandRequest(nss0, + 3, + {0, 1} /* markForSampling */, + std::string(maxMemoryUsageBytes / 2, 'a') /* filterFieldName */); + ASSERT_EQ(expectedSampledCmds.size(), 2U); + + writer.addDeleteQuery(originalCmd, 0).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 1); + // Adding the next query causes the size to exceed the limit. + writer.addDeleteQuery(originalCmd, 1).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 2); + for (const auto& [sampleId, expectedSampledCmd] : expectedSampledCmds) { + assertSampledWriteQueryDocument(sampleId, + expectedSampledCmd.getNamespace(), + SampledWriteCommandNameEnum::kDelete, + expectedSampledCmd); + } +} + +TEST_F(QueryAnalysisWriterTest, FlushAfterAddFindAndModifyIfExceedsSizeLimit) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto maxMemoryUsageBytes = 1024; + RAIIServerParameterControllerForTest maxMemoryBytes{"queryAnalysisWriterMaxMemoryUsageBytes", + maxMemoryUsageBytes}; + + auto [originalCmd0, expectedSampledCmds0] = makeFindAndModifyCommandRequest( + nss0, + true /* isUpdate */, + true /* markForSampling */, + std::string(maxMemoryUsageBytes / 2, 'a') /* filterFieldName */); + ASSERT_EQ(expectedSampledCmds0.size(), 1U); + auto [sampleId0, expectedSampledCmd0] = *expectedSampledCmds0.begin(); + + auto [originalCmd1, expectedSampledCmds1] = makeFindAndModifyCommandRequest( + nss1, + false /* isUpdate */, + true /* markForSampling */, + std::string(maxMemoryUsageBytes / 2, 'b') /* filterFieldName */); + ASSERT_EQ(expectedSampledCmds0.size(), 1U); + auto [sampleId1, expectedSampledCmd1] = *expectedSampledCmds1.begin(); + + writer.addFindAndModifyQuery(originalCmd0).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 1); + // Adding the next query causes the size to exceed the limit. + writer.addFindAndModifyQuery(originalCmd1).get(); + ASSERT_EQ(writer.getQueriesCountForTest(), 0); + + ASSERT_EQ(getSampledQueryDocumentsCount(nss0), 1); + assertSampledWriteQueryDocument(sampleId0, + expectedSampledCmd0.getNamespace(), + SampledWriteCommandNameEnum::kFindAndModify, + expectedSampledCmd0); + ASSERT_EQ(getSampledQueryDocumentsCount(nss1), 1); + assertSampledWriteQueryDocument(sampleId1, + expectedSampledCmd1.getNamespace(), + SampledWriteCommandNameEnum::kFindAndModify, + expectedSampledCmd1); +} + TEST_F(QueryAnalysisWriterTest, AddQueriesBackAfterWriteError) { auto& writer = QueryAnalysisWriter::get(operationContext()); diff --git a/src/mongo/s/analyze_shard_key_common.idl b/src/mongo/s/analyze_shard_key_common.idl index c89776591e3..af352a8ff1e 100644 --- a/src/mongo/s/analyze_shard_key_common.idl +++ b/src/mongo/s/analyze_shard_key_common.idl @@ -49,6 +49,15 @@ enums: kCount: "count" kDistinct: "distinct" + SampledWriteCommandName: + description: "The command name of a sampled write query." + type: string + values: + kInsert: "insert" + kUpdate: "update" + kDelete: "delete" + kFindAndModify: "findAndModify" + structs: QueryAnalyzerConfiguration: description: "The query analyzer configuration for a collection as configured via the diff --git a/src/mongo/s/analyze_shard_key_documents.idl b/src/mongo/s/analyze_shard_key_documents.idl index 190d67d8cde..a3728d6c1cf 100644 --- a/src/mongo/s/analyze_shard_key_documents.idl +++ b/src/mongo/s/analyze_shard_key_documents.idl @@ -80,3 +80,24 @@ structs: cmd: type: object description: "The command object for the read." + + SampledWriteQueryDocument: + description: "Represents a document storing a sampled write query." + strict: false + fields: + _id: + type: uuid + description: "The unique sample id for the write." + cpp_name: sampleId + ns: + type: namespacestring + description: "The namespace of the collection for the write." + collectionUuid: + type: uuid + description: "The UUID of the collection for the write." + cmdName: + type: SampledWriteCommandName + description: "The command name for the write." + cmd: + type: object + description: "The command object for the write." |