diff options
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml | 1 | ||||
-rw-r--r-- | jstests/sharding/analyze_shard_key/libs/query_sampling_util.js | 39 | ||||
-rw-r--r-- | jstests/sharding/analyze_shard_key/persist_sampled_diffs.js | 200 | ||||
-rw-r--r-- | jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js | 6 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/ops/update_request.h | 13 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_op_observer.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer.h | 28 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer_test.cpp | 240 | ||||
-rw-r--r-- | src/mongo/s/analyze_shard_key_documents.idl | 18 |
16 files changed, 630 insertions, 4 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index b0d9e24def3..b954796c1ca 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -86,6 +86,7 @@ selector: - jstests/sharding/move_primary_fails_without_database_version.js # Calls the config server primary directly (not through mongos) - jstests/sharding/analyze_shard_key/configure_query_analyzer_basic.js + - jstests/sharding/analyze_shard_key/persist_sampled_diffs.js - jstests/sharding/analyze_shard_key/persist_sampled_read_queries.js - jstests/sharding/analyze_shard_key/persist_sampled_write_queries.js - jstests/sharding/analyze_shard_key/refresh_sample_rates_sharded.js diff --git a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js index 96897bf96e8..9f3e07d8e50 100644 --- a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js +++ b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js @@ -72,12 +72,49 @@ var QuerySamplingUtil = (function() { assert.eq(coll.find({ns}).itcount(), 0); } + /** + * Waits for the config.sampledQueriesDiff collection to have a document with _id equal to + * sampleId, and then asserts that the diff in that document matches one of the diffs in + * 'expectedSampledDiffs'. + */ + function assertSoonSingleSampledDiffDocument( + conn, sampleId, ns, collectionUuid, expectedSampledDiffs) { + const coll = conn.getCollection("config.sampledQueriesDiff"); + + assert.soon(() => { + const doc = coll.findOne({_id: sampleId}); + if (!doc) { + return false; + } + assert.eq(doc.ns, ns, doc); + assert.eq(doc.collectionUuid, collectionUuid, doc); + assert(expectedSampledDiffs.some(diff => { + return bsonUnorderedFieldsCompare(doc.diff, diff) === 0; + }), + doc); + return true; + }); + } + + function assertNoSampledDiffDocuments(conn, ns) { + const coll = conn.getCollection("config.sampledQueriesDiff"); + assert.eq(coll.find({ns: ns}).itcount(), 0); + } + + function clearSampledDiffCollection(primary) { + const coll = primary.getCollection("config.sampledQueriesDiff"); + assert.commandWorked(coll.remove({})); + } + return { getCollectionUuid, generateRandomString, generateRandomCollation, makeCmdObjIgnoreSessionInfo, assertSoonSampledQueryDocuments, - assertNoSampledQueryDocuments + assertNoSampledQueryDocuments, + assertSoonSingleSampledDiffDocument, + assertNoSampledDiffDocuments, + clearSampledDiffCollection }; })(); diff --git a/jstests/sharding/analyze_shard_key/persist_sampled_diffs.js b/jstests/sharding/analyze_shard_key/persist_sampled_diffs.js new file mode 100644 index 00000000000..77b34f52295 --- /dev/null +++ b/jstests/sharding/analyze_shard_key/persist_sampled_diffs.js @@ -0,0 +1,200 @@ +/** + * Tests that shardsvr mongods support persisting diff for sampled write queries and non-shardsvr + * mongods don't support that. Specifically, tests that each write query on a shardsvr mongod + * generates at most one document regardless of the number of documents that it modifies. + * + * @tags: [requires_fcv_62, featureFlagAnalyzeShardKey] + */ +(function() { +"use strict"; + +load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js"); + +const testCases = []; + +// multi=false update. +for (const updateType of ["modifier", "replacement", "pipeline"]) { + const preImageDocs = [{a: 1}]; + const postImageDocs = [{a: 2, b: 0}]; + const updateOp = (() => { + switch (updateType) { + case "modifier": + return {$mul: {a: 2}, $set: {b: 0}}; + case "replacement": + return {a: 2, b: 0}; + case "pipeline": + return [{$set: {a: 2}}, {$set: {b: 0}}]; + default: + throw "Unexpected update type"; + } + })(); + const makeCmdObjFuncs = [ + (collName) => { + const sampleId = UUID(); + const cmdObj = {findAndModify: collName, query: {a: 1}, update: updateOp, sampleId}; + return {sampleId, cmdObj}; + }, + (collName) => { + const sampleId = UUID(); + const cmdObj = { + update: collName, + updates: [{q: {a: 1}, u: updateOp, multi: false, sampleId}] + }; + return {sampleId, cmdObj}; + } + ]; + const expectedDiffs = [{a: 'u', b: 'i'}]; + + testCases.push({preImageDocs, postImageDocs, updateType, makeCmdObjFuncs, expectedDiffs}); +} + +// multi=true update. +for (const updateType of ["modifier", "pipeline"]) { + const preImageDocs = [{a: 0}, {a: 1}]; + const postImageDocs = [{a: 1, b: 0}, {a: 1, b: 0}]; + const updateOp = (() => { + switch (updateType) { + case "modifier": + return {$set: {a: 1, b: 0}}; + case "pipeline": + return [{$set: {a: 1}}, {$set: {b: 0}}]; + default: + throw "Unexpected update type"; + } + })(); + const makeCmdObjFuncs = [(collName) => { + const sampleId = UUID(); + const cmdObj = { + update: collName, + updates: [{q: {a: {$gte: 0}}, u: updateOp, multi: true, sampleId}] + }; + return {sampleId, cmdObj}; + }]; + const expectedDiffs = [{a: 'u', b: 'i'}, {b: 'i'}]; + + testCases.push({preImageDocs, postImageDocs, updateType, makeCmdObjFuncs, expectedDiffs}); +} + +// no diff. +for (const updateType of ["modifier", "replacement", "pipeline"]) { + const preImageDocs = [{a: 0}]; + const postImageDocs = [{a: 0}]; + const updateOp = (() => { + switch (updateType) { + case "modifier": + return {$mul: {a: 0}}; + case "replacement": + return {a: 0}; + case "pipeline": + return [{$set: {a: 0}}]; + default: + throw "Unexpected update type"; + } + })(); + const makeCmdObjFuncs = [(collName) => { + const sampleId = UUID(); + const cmdObj = { + update: collName, + updates: [{q: {a: 0}, u: updateOp, multi: false, sampleId}] + }; + return {sampleId, cmdObj}; + }]; + const expectedDiffs = []; + + testCases.push({preImageDocs, postImageDocs, updateType, makeCmdObjFuncs, expectedDiffs}); +} + +// Make the periodic job for writing sampled queries have a period of 1 second to speed up the test. +const queryAnalysisWriterIntervalSecs = 1; + +function testDiffs(rst, testCase, expectSampling) { + // If running on the config server, use "config" as the database name since it is illegal to + // create a user database on the config server. + const dbName = rst.isConfigRS ? "config" : "testDb"; + const collName = "testColl-" + QuerySamplingUtil.generateRandomString(); + const ns = dbName + "." + collName; + + const primary = rst.getPrimary(); + const db = primary.getDB(dbName); + const coll = db.getCollection(collName); + assert.commandWorked(db.createCollection(collName)); + const collectionUuid = QuerySamplingUtil.getCollectionUuid(db, collName); + + for (const makeCmdObjFunc of testCase.makeCmdObjFuncs) { + assert.commandWorked(coll.insert(testCase.preImageDocs)); + + const {sampleId, cmdObj} = makeCmdObjFunc(collName); + + jsTest.log(`Testing test case ${tojson({ + dbName, + collName, + preImageDocs: testCase.preImageDocs, + postImageDocs: testCase.postImageDocs, + updateType: testCase.updateType, + cmdObj + })}`); + const res = assert.commandWorked(db.runCommand(cmdObj)); + + const cmdName = Object.keys(cmdObj)[0]; + if (cmdName == "update") { + assert.eq(res.n, testCase.postImageDocs.length, res); + } else if (cmdName == "findAndModify") { + assert.eq(res.lastErrorObject.n, testCase.postImageDocs.length, res); + } else { + throw Error("Unknown command " + tojson(cmdObj)); + } + for (const postImageDoc of testCase.postImageDocs) { + assert.neq(coll.findOne(postImageDoc), null, coll.find().toArray()); + } + + if (expectSampling && testCase.expectedDiffs.length > 0) { + QuerySamplingUtil.assertSoonSingleSampledDiffDocument( + primary, sampleId, ns, collectionUuid, testCase.expectedDiffs); + } else { + // Wait for one interval before asserting to verify that the writes did not occur. + sleep(queryAnalysisWriterIntervalSecs * 1000); + QuerySamplingUtil.assertNoSampledDiffDocuments(primary, ns); + } + + assert.commandWorked(coll.remove({})); + QuerySamplingUtil.clearSampledDiffCollection(primary); + } +} + +{ + const st = new ShardingTest({ + shards: 1, + rs: {nodes: 2, setParameter: {queryAnalysisWriterIntervalSecs}}, + // There is no periodic job for writing sample queries on the non-shardsvr mongods but set + // it anyway to verify that no queries are sampled. + other: {configOptions: {setParameter: {queryAnalysisWriterIntervalSecs}}}, + }); + // It is illegal to create a user database on the config server. Set 'isConfigRS' to true to + // allow the test helper to know if it should use "config" as the name for the test database. + st.configRS.isConfigRS = true; + + for (const testCase of testCases) { + testDiffs(st.rs0, testCase, true /* expectSampling */); + testDiffs(st.configRS, testCase, false /* expectSampling */); + } + + st.stop(); +} + +{ + const rst = new ReplSetTest({ + nodes: 2, + // There is no periodic job for writing sample queries on the non-shardsvr mongods but set + // it anyway to verify that no queries are sampled. + setParameter: {queryAnalysisWriterIntervalSecs} + }); + rst.startSet(); + rst.initiate(); + + for (const testCase of testCases) { + testDiffs(rst, testCase, false /* expectSampling */); + } + + rst.stopSet(); +} +})(); diff --git a/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js b/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js index bb60ef3ae76..79f1aceb949 100644 --- a/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js +++ b/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js @@ -21,12 +21,14 @@ function testStepDown(rst) { assert.commandWorked(primaryDB.getCollection(collName).insert({a: 0})); const collectionUuid = QuerySamplingUtil.getCollectionUuid(primaryDB, collName); - const localWriteFp = configureFailPoint(primary, "hangQueryAnalysisWriterBeforeWritingLocally"); + const localWriteFp = + configureFailPoint(primary, "hangQueryAnalysisWriterBeforeWritingLocally", {}, {times: 1}); const originalCmdObj = {findAndModify: collName, query: {a: 0}, update: {a: 1}, sampleId: UUID()}; const expectedSampledQueryDocs = [{sampleId: originalCmdObj.sampleId, cmdName: "findAndModify", cmdObj: originalCmdObj}]; + const expectedDiff = {a: "u"}; assert.commandWorked(primaryDB.getCollection(collName).runCommand(originalCmdObj)); @@ -43,6 +45,8 @@ function testStepDown(rst) { // by stepdown. QuerySamplingUtil.assertSoonSampledQueryDocuments( primary, ns, collectionUuid, expectedSampledQueryDocs); + QuerySamplingUtil.assertSoonSingleSampledDiffDocument( + primary, originalCmdObj.sampleId, ns, collectionUuid, [expectedDiff]); } function testStepUp(rst) { diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 3fac44cff7a..76905a8fe6f 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -67,6 +67,9 @@ struct CollectionUpdateArgs { std::vector<StmtId> stmtIds = {kUninitializedStmtId}; + // The unique sample id for this update if it has been chosen for sampling. + boost::optional<UUID> sampleId; + // The document before modifiers were applied. boost::optional<BSONObj> preImageDoc; diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 6e9bf6d7489..ec70924da40 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -723,6 +723,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( if (opCtx->getTxnNumber()) { updateRequest.setStmtIds({stmtId}); } + updateRequest.setSampleId(req.getSampleId()); const ExtensionsCallbackReal extensionsCallback( opCtx, &updateRequest.getNamespaceString()); diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index ca7d4fb5ef5..a7ed460eb2f 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -239,6 +239,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, if (!request->explain()) { args.stmtIds = request->getStmtIds(); + args.sampleId = request->getSampleId(); args.update = logObj; if (_isUserInitiatedWrite) { auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire( diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 3ef5eecca2b..127f694b9d5 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -1179,6 +1179,8 @@ void setUpObservers(ServiceContext* serviceContext) { std::make_unique<OplogWriterTransactionProxy>(std::make_unique<OplogWriterImpl>()))); opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>()); opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>()); + opObserverRegistry->addObserver( + std::make_unique<analyze_shard_key::QueryAnalysisOpObserver>()); opObserverRegistry->addObserver(std::make_unique<repl::TenantMigrationDonorOpObserver>()); opObserverRegistry->addObserver( std::make_unique<repl::TenantMigrationRecipientOpObserver>()); diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 4820d952398..dab24a3c482 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -198,6 +198,9 @@ const NamespaceString NamespaceString::kConfigQueryAnalyzersNamespace(NamespaceS const NamespaceString NamespaceString::kConfigSampledQueriesNamespace(NamespaceString::kConfigDb, "sampledQueries"); +const NamespaceString NamespaceString::kConfigSampledQueriesDiffNamespace( + NamespaceString::kConfigDb, "sampledQueriesDiff"); + NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) { if (!gMultitenancySupport) { return NamespaceString(boost::none, ns); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index c982ba882bd..3c746ad0ddf 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -276,6 +276,9 @@ public: // Namespace used for storing sampled queries. static const NamespaceString kConfigSampledQueriesNamespace; + // Namespace used for storing the diffs for sampled update queries. + static const NamespaceString kConfigSampledQueriesDiffNamespace; + /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index ed775c9e15e..e635ce7f156 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -76,7 +76,7 @@ public: }; UpdateRequest(const write_ops::UpdateOpEntry& updateOp = write_ops::UpdateOpEntry()) - : _updateOp(updateOp) {} + : _updateOp(updateOp), _sampleId(updateOp.getSampleId()) {} void setNamespaceString(const NamespaceString& nsString) { _nsString = nsString; @@ -257,6 +257,14 @@ public: return _stmtIds; } + void setSampleId(boost::optional<UUID> sampleId) { + _sampleId = sampleId; + } + + const boost::optional<UUID>& getSampleId() const { + return _sampleId; + } + std::string toString() const { StringBuilder builder; builder << " query: " << getQuery(); @@ -314,6 +322,9 @@ private: // The statement ids of this request. std::vector<StmtId> _stmtIds = {kUninitializedStmtId}; + // The unique sample id for this request if it has been chosen for sampling. + boost::optional<UUID> _sampleId; + // Flags controlling the update. // God bypasses _id checking and index generation. It is only used on behalf of system diff --git a/src/mongo/db/s/query_analysis_op_observer.cpp b/src/mongo/db/s/query_analysis_op_observer.cpp index 84606c7f4c7..658c50c992d 100644 --- a/src/mongo/db/s/query_analysis_op_observer.cpp +++ b/src/mongo/db/s/query_analysis_op_observer.cpp @@ -31,6 +31,7 @@ #include "mongo/db/s/query_analysis_coordinator.h" #include "mongo/db/s/query_analysis_op_observer.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/logv2/log.h" #include "mongo/s/analyze_shard_key_util.h" #include "mongo/s/catalog/type_mongos.h" @@ -88,6 +89,17 @@ void QueryAnalysisOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdat }); } } + + if (analyze_shard_key::supportsPersistingSampledQueries() && args.updateArgs->sampleId && + args.updateArgs->preImageDoc && opCtx->writesAreReplicated()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addDiff(*args.updateArgs->sampleId, + args.coll->ns(), + args.coll->uuid(), + *args.updateArgs->preImageDoc, + args.updateArgs->updatedDoc) + .getAsync([](auto) {}); + } } void QueryAnalysisOpObserver::aboutToDelete(OperationContext* opCtx, diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp index 3861c48c464..19b942e1775 100644 --- a/src/mongo/db/s/query_analysis_writer.cpp +++ b/src/mongo/db/s/query_analysis_writer.cpp @@ -35,6 +35,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/server_options.h" +#include "mongo/db/update/document_diff_calculator.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" @@ -274,6 +275,19 @@ void QueryAnalysisWriter::onStartup() { _periodicQueryWriter = periodicRunner->makeJob(std::move(QueryWriterJob)); _periodicQueryWriter.start(); + PeriodicRunner::PeriodicJob diffWriterJob( + "QueryAnalysisDiffWriter", + [this](Client* client) { + if (MONGO_unlikely(disableQueryAnalysisWriter.shouldFail())) { + return; + } + auto opCtx = client->makeOperationContext(); + _flushDiffs(opCtx.get()); + }, + Seconds(gQueryAnalysisWriterIntervalSecs)); + _periodicDiffWriter = periodicRunner->makeJob(std::move(diffWriterJob)); + _periodicDiffWriter.start(); + ThreadPool::Options threadPoolOptions; threadPoolOptions.maxThreads = gQueryAnalysisWriterMaxThreadPoolSize; threadPoolOptions.minThreads = gQueryAnalysisWriterMinThreadPoolSize; @@ -296,6 +310,9 @@ void QueryAnalysisWriter::onShutdown() { if (_periodicQueryWriter.isValid()) { _periodicQueryWriter.stop(); } + if (_periodicDiffWriter.isValid()) { + _periodicDiffWriter.stop(); + } } void QueryAnalysisWriter::_flushQueries(OperationContext* opCtx) { @@ -308,6 +325,16 @@ void QueryAnalysisWriter::_flushQueries(OperationContext* opCtx) { } } +void QueryAnalysisWriter::_flushDiffs(OperationContext* opCtx) { + try { + _flush(opCtx, NamespaceString::kConfigSampledQueriesDiffNamespace, &_diffs); + } catch (DBException& ex) { + LOGV2(7075400, + "Failed to flush diffs, will try again at the next interval", + "error"_attr = redact(ex)); + } +} + void QueryAnalysisWriter::_flush(OperationContext* opCtx, const NamespaceString& ns, Buffer* buffer) { @@ -427,7 +454,7 @@ void QueryAnalysisWriter::Buffer::truncate(size_t index, long long numBytes) { bool QueryAnalysisWriter::_exceedsMaxSizeBytes() { stdx::lock_guard<Latch> lk(_mutex); - return _queries.getSize() >= gQueryAnalysisWriterMaxMemoryUsageBytes.load(); + return _queries.getSize() + _diffs.getSize() >= gQueryAnalysisWriterMaxMemoryUsageBytes.load(); } ExecutorFuture<void> QueryAnalysisWriter::addFindQuery(const UUID& sampleId, @@ -629,5 +656,40 @@ ExecutorFuture<void> QueryAnalysisWriter::addFindAndModifyQuery( }); } +ExecutorFuture<void> QueryAnalysisWriter::addDiff(const UUID& sampleId, + const NamespaceString& nss, + const UUID& collUuid, + const BSONObj& preImage, + const BSONObj& postImage) { + invariant(_executor); + return ExecutorFuture<void>(_executor) + .then([this, + sampleId, + nss, + collUuid, + preImage = preImage.getOwned(), + postImage = postImage.getOwned()]() { + auto diff = doc_diff::computeInlineDiff(preImage, postImage); + + if (!diff || diff->isEmpty()) { + return; + } + + auto doc = SampledQueryDiffDocument{sampleId, nss, collUuid, std::move(*diff)}; + stdx::lock_guard<Latch> lk(_mutex); + _diffs.add(doc.toBSON()); + }) + .then([this] { + if (_exceedsMaxSizeBytes()) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + _flushDiffs(opCtx); + } + }) + .onError([this, nss](Status status) { + LOGV2(7075401, "Failed to add diff", "ns"_attr = nss, "error"_attr = redact(status)); + }); +} + } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/db/s/query_analysis_writer.h b/src/mongo/db/s/query_analysis_writer.h index 76ae137273a..d93f187fede 100644 --- a/src/mongo/db/s/query_analysis_writer.h +++ b/src/mongo/db/s/query_analysis_writer.h @@ -44,10 +44,17 @@ namespace analyze_shard_key { * Owns the machinery for persisting sampled queries. That consists of the following: * - The buffer that stores sampled queries and the periodic background job that inserts those * queries into the local config.sampledQueries collection. + * - The buffer that stores diffs for sampled update queries and the periodic background job that + * inserts those diffs into the local config.sampledQueriesDiff collection. * * Currently, query sampling is only supported on a sharded cluster. So a writer must be a shardsvr * mongod. If the mongod is a primary, it will execute the insert commands locally. If it is a * secondary, it will perform the insert commands against the primary. + * + * The memory usage of the buffers is controlled by the 'queryAnalysisWriterMaxMemoryUsageBytes' + * server parameter. Upon adding a query or a diff that causes the total size of buffers to exceed + * the limit, the writer will flush the corresponding buffer immediately instead of waiting for it + * to get flushed later by the periodic job. */ class QueryAnalysisWriter final : public std::enable_shared_from_this<QueryAnalysisWriter> { QueryAnalysisWriter(const QueryAnalysisWriter&) = delete; @@ -137,6 +144,12 @@ public: ExecutorFuture<void> addFindAndModifyQuery( const write_ops::FindAndModifyCommandRequest& findAndModifyCmd); + ExecutorFuture<void> addDiff(const UUID& sampleId, + const NamespaceString& nss, + const UUID& collUuid, + const BSONObj& preImage, + const BSONObj& postImage); + int getQueriesCountForTest() const { stdx::lock_guard<Latch> lk(_mutex); return _queries.getCount(); @@ -146,6 +159,15 @@ public: _flushQueries(opCtx); } + int getDiffsCountForTest() const { + stdx::lock_guard<Latch> lk(_mutex); + return _diffs.getCount(); + } + + void flushDiffsForTest(OperationContext* opCtx) { + _flushDiffs(opCtx); + } + private: ExecutorFuture<void> _addReadQuery(const UUID& sampleId, const NamespaceString& nss, @@ -154,6 +176,7 @@ private: const BSONObj& collation); void _flushQueries(OperationContext* opCtx); + void _flushDiffs(OperationContext* opCtx); /** * The helper for '_flushQueries'. Inserts the documents in 'buffer' into the collection 'ns' @@ -162,6 +185,8 @@ private: * they are expected for the following reasons: * - For the query buffer, a sampled query that is idempotent (e.g. a read or retryable write) * could get added to the buffer (across nodes) more than once due to retries. + * - For the diff buffer, a sampled multi-update query could end up generating multiple diffs + * and each diff is identified using the sample id of the sampled query that creates it. * * Throws an error if the inserts fail with any other error. */ @@ -178,6 +203,9 @@ private: PeriodicJobAnchor _periodicQueryWriter; Buffer _queries; + PeriodicJobAnchor _periodicDiffWriter; + Buffer _diffs; + // Initialized on startup and joined on shutdown. std::shared_ptr<executor::TaskExecutor> _executor; }; diff --git a/src/mongo/db/s/query_analysis_writer_test.cpp b/src/mongo/db/s/query_analysis_writer_test.cpp index 3c78afab971..b17f39c82df 100644 --- a/src/mongo/db/s/query_analysis_writer_test.cpp +++ b/src/mongo/db/s/query_analysis_writer_test.cpp @@ -157,6 +157,11 @@ DEATH_TEST(QueryAnalysisWriterBufferTest, TruncateInvalidSize_Positive, "invaria buffer.truncate(0, doc.objsize() * 2); } +void assertBsonObjEqualUnordered(const BSONObj& lhs, const BSONObj& rhs) { + UnorderedFieldsBSONObjComparator comparator; + ASSERT_EQ(comparator.compare(lhs, rhs), 0); +} + struct QueryAnalysisWriterTest : public ShardServerTestFixture { public: void setUp() { @@ -373,6 +378,32 @@ protected: ASSERT_BSONOBJ_EQ(parsedCmd.toBSON({}), expectedCmd.toBSON({})); } + /* + * Returns the number of the documents for the collection 'nss' in the config.sampledQueriesDiff + * collection. + */ + int getDiffDocumentsCount(const NamespaceString& nss) { + return _getConfigDocumentsCount(NamespaceString::kConfigSampledQueriesDiffNamespace, nss); + } + + /* + * Asserts that there is a sampled diff document with the given sample id and that it has + * the given fields. + */ + void assertDiffDocument(const UUID& sampleId, + const NamespaceString& nss, + const BSONObj& expectedDiff) { + auto doc = + _getConfigDocument(NamespaceString::kConfigSampledQueriesDiffNamespace, sampleId); + auto parsedDiffDoc = + SampledQueryDiffDocument::parse(IDLParserContext("QueryAnalysisWriterTest"), doc); + + ASSERT_EQ(parsedDiffDoc.getNs(), nss); + ASSERT_EQ(parsedDiffDoc.getCollectionUuid(), getCollectionUUID(nss)); + ASSERT_EQ(parsedDiffDoc.getSampleId(), sampleId); + assertBsonObjEqualUnordered(parsedDiffDoc.getDiff(), expectedDiff); + } + const NamespaceString nss0{"testDb", "testColl0"}; const NamespaceString nss1{"testDb", "testColl1"}; @@ -1036,6 +1067,215 @@ TEST_F(QueryAnalysisWriterTest, RemoveDuplicatesFromBufferAfterWriteError) { } } +TEST_F(QueryAnalysisWriterTest, NoDiffs) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + writer.flushQueriesForTest(operationContext()); +} + +TEST_F(QueryAnalysisWriterTest, DiffsBasic) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto collUuid0 = getCollectionUUID(nss0); + auto sampleId = UUID::gen(); + auto preImage = BSON("a" << 0); + auto postImage = BSON("a" << 1); + + writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 1); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 1); + assertDiffDocument(sampleId, nss0, *doc_diff::computeInlineDiff(preImage, postImage)); +} + +TEST_F(QueryAnalysisWriterTest, DiffsMultipleQueriesAndCollections) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + // Make nss0 have a diff for one query. + auto collUuid0 = getCollectionUUID(nss0); + + auto sampleId0 = UUID::gen(); + auto preImage0 = BSON("a" << 0 << "b" << 0 << "c" << 0); + auto postImage0 = BSON("a" << 0 << "b" << 1 << "d" << 1); + + // Make nss1 have diffs for two queries. + auto collUuid1 = getCollectionUUID(nss1); + + auto sampleId1 = UUID::gen(); + auto preImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1) << "d" << BSON("e" << 1)); + auto postImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1 << 2) << "d" << BSON("e" << 2)); + + auto sampleId2 = UUID::gen(); + auto preImage2 = BSON("a" << BSONObj()); + auto postImage2 = BSON("a" << BSON("b" << 2)); + + writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0).get(); + writer.addDiff(sampleId1, nss1, collUuid1, preImage1, postImage1).get(); + writer.addDiff(sampleId2, nss1, collUuid1, preImage2, postImage2).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 3); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 1); + assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0)); + + ASSERT_EQ(getDiffDocumentsCount(nss1), 2); + assertDiffDocument(sampleId1, nss1, *doc_diff::computeInlineDiff(preImage1, postImage1)); + assertDiffDocument(sampleId2, nss1, *doc_diff::computeInlineDiff(preImage2, postImage2)); +} + +TEST_F(QueryAnalysisWriterTest, DuplicateDiffs) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto collUuid0 = getCollectionUUID(nss0); + + auto sampleId0 = UUID::gen(); + auto preImage0 = BSON("a" << 0); + auto postImage0 = BSON("a" << 1); + + auto sampleId1 = UUID::gen(); + auto preImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1)); + auto postImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1 << 2)); + + auto sampleId2 = UUID::gen(); + auto preImage2 = BSON("a" << BSONObj()); + auto postImage2 = BSON("a" << BSON("b" << 2)); + + writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 1); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 1); + assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0)); + + writer.addDiff(sampleId1, nss0, collUuid0, preImage1, postImage1).get(); + writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0) + .get(); // This is a duplicate. + writer.addDiff(sampleId2, nss0, collUuid0, preImage2, postImage2).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 3); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 3); + assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0)); + assertDiffDocument(sampleId1, nss0, *doc_diff::computeInlineDiff(preImage1, postImage1)); + assertDiffDocument(sampleId2, nss0, *doc_diff::computeInlineDiff(preImage2, postImage2)); +} + +TEST_F(QueryAnalysisWriterTest, DiffsMultipleBatches_MaxBatchSize) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + RAIIServerParameterControllerForTest maxBatchSize{"queryAnalysisWriterMaxBatchSize", 2}; + auto numDiffs = 5; + auto collUuid0 = getCollectionUUID(nss0); + + std::vector<std::pair<UUID, BSONObj>> expectedSampledDiffs; + for (auto i = 0; i < numDiffs; i++) { + auto sampleId = UUID::gen(); + auto preImage = BSON("a" << 0); + auto postImage = BSON(("a" + std::to_string(i)) << 1); + writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get(); + expectedSampledDiffs.push_back( + {sampleId, *doc_diff::computeInlineDiff(preImage, postImage)}); + } + ASSERT_EQ(writer.getDiffsCountForTest(), numDiffs); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), numDiffs); + for (const auto& [sampleId, diff] : expectedSampledDiffs) { + assertDiffDocument(sampleId, nss0, diff); + } +} + +TEST_F(QueryAnalysisWriterTest, DiffsMultipleBatches_MaxBSONObjSize) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto numDiffs = 3; + auto collUuid0 = getCollectionUUID(nss0); + + std::vector<std::pair<UUID, BSONObj>> expectedSampledDiffs; + for (auto i = 0; i < numDiffs; i++) { + auto sampleId = UUID::gen(); + auto preImage = BSON("a" << 0); + auto postImage = BSON(std::string(BSONObjMaxUserSize / 2, 'a') << 1); + writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get(); + expectedSampledDiffs.push_back( + {sampleId, *doc_diff::computeInlineDiff(preImage, postImage)}); + } + ASSERT_EQ(writer.getDiffsCountForTest(), numDiffs); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), numDiffs); + for (const auto& [sampleId, diff] : expectedSampledDiffs) { + assertDiffDocument(sampleId, nss0, diff); + } +} + +TEST_F(QueryAnalysisWriterTest, FlushAfterAddDiffIfExceedsSizeLimit) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto maxMemoryUsageBytes = 1024; + RAIIServerParameterControllerForTest maxMemoryBytes{"queryAnalysisWriterMaxMemoryUsageBytes", + maxMemoryUsageBytes}; + + auto collUuid0 = getCollectionUUID(nss0); + auto sampleId0 = UUID::gen(); + auto preImage0 = BSON("a" << 0); + auto postImage0 = BSON(std::string(maxMemoryUsageBytes / 2, 'a') << 1); + + auto collUuid1 = getCollectionUUID(nss1); + auto sampleId1 = UUID::gen(); + auto preImage1 = BSON("a" << 0); + auto postImage1 = BSON(std::string(maxMemoryUsageBytes / 2, 'b') << 1); + + writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 1); + // Adding the next diff causes the size to exceed the limit. + writer.addDiff(sampleId1, nss1, collUuid1, preImage1, postImage1).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 1); + assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0)); + ASSERT_EQ(getDiffDocumentsCount(nss1), 1); + assertDiffDocument(sampleId1, nss1, *doc_diff::computeInlineDiff(preImage1, postImage1)); +} + +TEST_F(QueryAnalysisWriterTest, DiffEmpty) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto collUuid0 = getCollectionUUID(nss0); + auto sampleId = UUID::gen(); + auto preImage = BSON("a" << 1); + auto postImage = preImage; + + writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 0); +} + +TEST_F(QueryAnalysisWriterTest, DiffExceedsSizeLimit) { + auto& writer = QueryAnalysisWriter::get(operationContext()); + + auto collUuid0 = getCollectionUUID(nss0); + auto sampleId = UUID::gen(); + auto preImage = BSON(std::string(BSONObjMaxUserSize, 'a') << 1); + auto postImage = BSONObj(); + + writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get(); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + writer.flushDiffsForTest(operationContext()); + ASSERT_EQ(writer.getDiffsCountForTest(), 0); + + ASSERT_EQ(getDiffDocumentsCount(nss0), 0); +} + } // namespace } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/s/analyze_shard_key_documents.idl b/src/mongo/s/analyze_shard_key_documents.idl index a3728d6c1cf..b1275858bd3 100644 --- a/src/mongo/s/analyze_shard_key_documents.idl +++ b/src/mongo/s/analyze_shard_key_documents.idl @@ -101,3 +101,21 @@ structs: cmd: type: object description: "The command object for the write." + + SampledQueryDiffDocument: + description: "Represents a document storing the diff for a sampled write query." + strict: false + fields: + _id: + type: uuid + description: "The unique sample id for the write." + cpp_name: sampleId + ns: + type: namespacestring + description: "The namespace of the collection for the write." + collectionUuid: + type: uuid + description: "The UUID of the collection for the write." + diff: + type: object + description: "The diff for the write." |