summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-11-02 01:40:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-02 02:34:26 +0000
commit64c4305a9b1f4149c08c3eb91d31e749d24d2e56 (patch)
tree55eb46039d4584d9eec4ffe52f43e267bf3251f3
parent35e593da151e7bc08bb5409b9d770942431d64ae (diff)
downloadmongo-64c4305a9b1f4149c08c3eb91d31e749d24d2e56.tar.gz
SERVER-70754 Make shards support persisting diffs for sampled update queries
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml1
-rw-r--r--jstests/sharding/analyze_shard_key/libs/query_sampling_util.js39
-rw-r--r--jstests/sharding/analyze_shard_key/persist_sampled_diffs.js200
-rw-r--r--jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js6
-rw-r--r--src/mongo/db/catalog/collection.h3
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp1
-rw-r--r--src/mongo/db/exec/update_stage.cpp1
-rw-r--r--src/mongo/db/mongod_main.cpp2
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/ops/update_request.h13
-rw-r--r--src/mongo/db/s/query_analysis_op_observer.cpp12
-rw-r--r--src/mongo/db/s/query_analysis_writer.cpp64
-rw-r--r--src/mongo/db/s/query_analysis_writer.h28
-rw-r--r--src/mongo/db/s/query_analysis_writer_test.cpp240
-rw-r--r--src/mongo/s/analyze_shard_key_documents.idl18
16 files changed, 630 insertions, 4 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index b0d9e24def3..b954796c1ca 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -86,6 +86,7 @@ selector:
- jstests/sharding/move_primary_fails_without_database_version.js
# Calls the config server primary directly (not through mongos)
- jstests/sharding/analyze_shard_key/configure_query_analyzer_basic.js
+ - jstests/sharding/analyze_shard_key/persist_sampled_diffs.js
- jstests/sharding/analyze_shard_key/persist_sampled_read_queries.js
- jstests/sharding/analyze_shard_key/persist_sampled_write_queries.js
- jstests/sharding/analyze_shard_key/refresh_sample_rates_sharded.js
diff --git a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js
index 96897bf96e8..9f3e07d8e50 100644
--- a/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js
+++ b/jstests/sharding/analyze_shard_key/libs/query_sampling_util.js
@@ -72,12 +72,49 @@ var QuerySamplingUtil = (function() {
assert.eq(coll.find({ns}).itcount(), 0);
}
+ /**
+ * Waits for the config.sampledQueriesDiff collection to have a document with _id equal to
+ * sampleId, and then asserts that the diff in that document matches one of the diffs in
+ * 'expectedSampledDiffs'.
+ */
+ function assertSoonSingleSampledDiffDocument(
+ conn, sampleId, ns, collectionUuid, expectedSampledDiffs) {
+ const coll = conn.getCollection("config.sampledQueriesDiff");
+
+ assert.soon(() => {
+ const doc = coll.findOne({_id: sampleId});
+ if (!doc) {
+ return false;
+ }
+ assert.eq(doc.ns, ns, doc);
+ assert.eq(doc.collectionUuid, collectionUuid, doc);
+ assert(expectedSampledDiffs.some(diff => {
+ return bsonUnorderedFieldsCompare(doc.diff, diff) === 0;
+ }),
+ doc);
+ return true;
+ });
+ }
+
+ function assertNoSampledDiffDocuments(conn, ns) {
+ const coll = conn.getCollection("config.sampledQueriesDiff");
+ assert.eq(coll.find({ns: ns}).itcount(), 0);
+ }
+
+ function clearSampledDiffCollection(primary) {
+ const coll = primary.getCollection("config.sampledQueriesDiff");
+ assert.commandWorked(coll.remove({}));
+ }
+
return {
getCollectionUuid,
generateRandomString,
generateRandomCollation,
makeCmdObjIgnoreSessionInfo,
assertSoonSampledQueryDocuments,
- assertNoSampledQueryDocuments
+ assertNoSampledQueryDocuments,
+ assertSoonSingleSampledDiffDocument,
+ assertNoSampledDiffDocuments,
+ clearSampledDiffCollection
};
})();
diff --git a/jstests/sharding/analyze_shard_key/persist_sampled_diffs.js b/jstests/sharding/analyze_shard_key/persist_sampled_diffs.js
new file mode 100644
index 00000000000..77b34f52295
--- /dev/null
+++ b/jstests/sharding/analyze_shard_key/persist_sampled_diffs.js
@@ -0,0 +1,200 @@
+/**
+ * Tests that shardsvr mongods support persisting diff for sampled write queries and non-shardsvr
+ * mongods don't support that. Specifically, tests that each write query on a shardsvr mongod
+ * generates at most one document regardless of the number of documents that it modifies.
+ *
+ * @tags: [requires_fcv_62, featureFlagAnalyzeShardKey]
+ */
+(function() {
+"use strict";
+
+load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js");
+
+const testCases = [];
+
+// multi=false update.
+for (const updateType of ["modifier", "replacement", "pipeline"]) {
+ const preImageDocs = [{a: 1}];
+ const postImageDocs = [{a: 2, b: 0}];
+ const updateOp = (() => {
+ switch (updateType) {
+ case "modifier":
+ return {$mul: {a: 2}, $set: {b: 0}};
+ case "replacement":
+ return {a: 2, b: 0};
+ case "pipeline":
+ return [{$set: {a: 2}}, {$set: {b: 0}}];
+ default:
+ throw "Unexpected update type";
+ }
+ })();
+ const makeCmdObjFuncs = [
+ (collName) => {
+ const sampleId = UUID();
+ const cmdObj = {findAndModify: collName, query: {a: 1}, update: updateOp, sampleId};
+ return {sampleId, cmdObj};
+ },
+ (collName) => {
+ const sampleId = UUID();
+ const cmdObj = {
+ update: collName,
+ updates: [{q: {a: 1}, u: updateOp, multi: false, sampleId}]
+ };
+ return {sampleId, cmdObj};
+ }
+ ];
+ const expectedDiffs = [{a: 'u', b: 'i'}];
+
+ testCases.push({preImageDocs, postImageDocs, updateType, makeCmdObjFuncs, expectedDiffs});
+}
+
+// multi=true update.
+for (const updateType of ["modifier", "pipeline"]) {
+ const preImageDocs = [{a: 0}, {a: 1}];
+ const postImageDocs = [{a: 1, b: 0}, {a: 1, b: 0}];
+ const updateOp = (() => {
+ switch (updateType) {
+ case "modifier":
+ return {$set: {a: 1, b: 0}};
+ case "pipeline":
+ return [{$set: {a: 1}}, {$set: {b: 0}}];
+ default:
+ throw "Unexpected update type";
+ }
+ })();
+ const makeCmdObjFuncs = [(collName) => {
+ const sampleId = UUID();
+ const cmdObj = {
+ update: collName,
+ updates: [{q: {a: {$gte: 0}}, u: updateOp, multi: true, sampleId}]
+ };
+ return {sampleId, cmdObj};
+ }];
+ const expectedDiffs = [{a: 'u', b: 'i'}, {b: 'i'}];
+
+ testCases.push({preImageDocs, postImageDocs, updateType, makeCmdObjFuncs, expectedDiffs});
+}
+
+// no diff.
+for (const updateType of ["modifier", "replacement", "pipeline"]) {
+ const preImageDocs = [{a: 0}];
+ const postImageDocs = [{a: 0}];
+ const updateOp = (() => {
+ switch (updateType) {
+ case "modifier":
+ return {$mul: {a: 0}};
+ case "replacement":
+ return {a: 0};
+ case "pipeline":
+ return [{$set: {a: 0}}];
+ default:
+ throw "Unexpected update type";
+ }
+ })();
+ const makeCmdObjFuncs = [(collName) => {
+ const sampleId = UUID();
+ const cmdObj = {
+ update: collName,
+ updates: [{q: {a: 0}, u: updateOp, multi: false, sampleId}]
+ };
+ return {sampleId, cmdObj};
+ }];
+ const expectedDiffs = [];
+
+ testCases.push({preImageDocs, postImageDocs, updateType, makeCmdObjFuncs, expectedDiffs});
+}
+
+// Make the periodic job for writing sampled queries have a period of 1 second to speed up the test.
+const queryAnalysisWriterIntervalSecs = 1;
+
+function testDiffs(rst, testCase, expectSampling) {
+ // If running on the config server, use "config" as the database name since it is illegal to
+ // create a user database on the config server.
+ const dbName = rst.isConfigRS ? "config" : "testDb";
+ const collName = "testColl-" + QuerySamplingUtil.generateRandomString();
+ const ns = dbName + "." + collName;
+
+ const primary = rst.getPrimary();
+ const db = primary.getDB(dbName);
+ const coll = db.getCollection(collName);
+ assert.commandWorked(db.createCollection(collName));
+ const collectionUuid = QuerySamplingUtil.getCollectionUuid(db, collName);
+
+ for (const makeCmdObjFunc of testCase.makeCmdObjFuncs) {
+ assert.commandWorked(coll.insert(testCase.preImageDocs));
+
+ const {sampleId, cmdObj} = makeCmdObjFunc(collName);
+
+ jsTest.log(`Testing test case ${tojson({
+ dbName,
+ collName,
+ preImageDocs: testCase.preImageDocs,
+ postImageDocs: testCase.postImageDocs,
+ updateType: testCase.updateType,
+ cmdObj
+ })}`);
+ const res = assert.commandWorked(db.runCommand(cmdObj));
+
+ const cmdName = Object.keys(cmdObj)[0];
+ if (cmdName == "update") {
+ assert.eq(res.n, testCase.postImageDocs.length, res);
+ } else if (cmdName == "findAndModify") {
+ assert.eq(res.lastErrorObject.n, testCase.postImageDocs.length, res);
+ } else {
+ throw Error("Unknown command " + tojson(cmdObj));
+ }
+ for (const postImageDoc of testCase.postImageDocs) {
+ assert.neq(coll.findOne(postImageDoc), null, coll.find().toArray());
+ }
+
+ if (expectSampling && testCase.expectedDiffs.length > 0) {
+ QuerySamplingUtil.assertSoonSingleSampledDiffDocument(
+ primary, sampleId, ns, collectionUuid, testCase.expectedDiffs);
+ } else {
+ // Wait for one interval before asserting to verify that the writes did not occur.
+ sleep(queryAnalysisWriterIntervalSecs * 1000);
+ QuerySamplingUtil.assertNoSampledDiffDocuments(primary, ns);
+ }
+
+ assert.commandWorked(coll.remove({}));
+ QuerySamplingUtil.clearSampledDiffCollection(primary);
+ }
+}
+
+{
+ const st = new ShardingTest({
+ shards: 1,
+ rs: {nodes: 2, setParameter: {queryAnalysisWriterIntervalSecs}},
+ // There is no periodic job for writing sample queries on the non-shardsvr mongods but set
+ // it anyway to verify that no queries are sampled.
+ other: {configOptions: {setParameter: {queryAnalysisWriterIntervalSecs}}},
+ });
+ // It is illegal to create a user database on the config server. Set 'isConfigRS' to true to
+ // allow the test helper to know if it should use "config" as the name for the test database.
+ st.configRS.isConfigRS = true;
+
+ for (const testCase of testCases) {
+ testDiffs(st.rs0, testCase, true /* expectSampling */);
+ testDiffs(st.configRS, testCase, false /* expectSampling */);
+ }
+
+ st.stop();
+}
+
+{
+ const rst = new ReplSetTest({
+ nodes: 2,
+ // There is no periodic job for writing sample queries on the non-shardsvr mongods but set
+ // it anyway to verify that no queries are sampled.
+ setParameter: {queryAnalysisWriterIntervalSecs}
+ });
+ rst.startSet();
+ rst.initiate();
+
+ for (const testCase of testCases) {
+ testDiffs(rst, testCase, false /* expectSampling */);
+ }
+
+ rst.stopSet();
+}
+})();
diff --git a/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js b/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js
index bb60ef3ae76..79f1aceb949 100644
--- a/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js
+++ b/jstests/sharding/analyze_shard_key/persist_sampled_queries_failover.js
@@ -21,12 +21,14 @@ function testStepDown(rst) {
assert.commandWorked(primaryDB.getCollection(collName).insert({a: 0}));
const collectionUuid = QuerySamplingUtil.getCollectionUuid(primaryDB, collName);
- const localWriteFp = configureFailPoint(primary, "hangQueryAnalysisWriterBeforeWritingLocally");
+ const localWriteFp =
+ configureFailPoint(primary, "hangQueryAnalysisWriterBeforeWritingLocally", {}, {times: 1});
const originalCmdObj =
{findAndModify: collName, query: {a: 0}, update: {a: 1}, sampleId: UUID()};
const expectedSampledQueryDocs =
[{sampleId: originalCmdObj.sampleId, cmdName: "findAndModify", cmdObj: originalCmdObj}];
+ const expectedDiff = {a: "u"};
assert.commandWorked(primaryDB.getCollection(collName).runCommand(originalCmdObj));
@@ -43,6 +45,8 @@ function testStepDown(rst) {
// by stepdown.
QuerySamplingUtil.assertSoonSampledQueryDocuments(
primary, ns, collectionUuid, expectedSampledQueryDocs);
+ QuerySamplingUtil.assertSoonSingleSampledDiffDocument(
+ primary, originalCmdObj.sampleId, ns, collectionUuid, [expectedDiff]);
}
function testStepUp(rst) {
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 3fac44cff7a..76905a8fe6f 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -67,6 +67,9 @@ struct CollectionUpdateArgs {
std::vector<StmtId> stmtIds = {kUninitializedStmtId};
+ // The unique sample id for this update if it has been chosen for sampling.
+ boost::optional<UUID> sampleId;
+
// The document before modifiers were applied.
boost::optional<BSONObj> preImageDoc;
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 6e9bf6d7489..ec70924da40 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -723,6 +723,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun(
if (opCtx->getTxnNumber()) {
updateRequest.setStmtIds({stmtId});
}
+ updateRequest.setSampleId(req.getSampleId());
const ExtensionsCallbackReal extensionsCallback(
opCtx, &updateRequest.getNamespaceString());
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index ca7d4fb5ef5..a7ed460eb2f 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -239,6 +239,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj,
if (!request->explain()) {
args.stmtIds = request->getStmtIds();
+ args.sampleId = request->getSampleId();
args.update = logObj;
if (_isUserInitiatedWrite) {
auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 3ef5eecca2b..127f694b9d5 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -1179,6 +1179,8 @@ void setUpObservers(ServiceContext* serviceContext) {
std::make_unique<OplogWriterTransactionProxy>(std::make_unique<OplogWriterImpl>())));
opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>());
opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
+ opObserverRegistry->addObserver(
+ std::make_unique<analyze_shard_key::QueryAnalysisOpObserver>());
opObserverRegistry->addObserver(std::make_unique<repl::TenantMigrationDonorOpObserver>());
opObserverRegistry->addObserver(
std::make_unique<repl::TenantMigrationRecipientOpObserver>());
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 4820d952398..dab24a3c482 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -198,6 +198,9 @@ const NamespaceString NamespaceString::kConfigQueryAnalyzersNamespace(NamespaceS
const NamespaceString NamespaceString::kConfigSampledQueriesNamespace(NamespaceString::kConfigDb,
"sampledQueries");
+const NamespaceString NamespaceString::kConfigSampledQueriesDiffNamespace(
+ NamespaceString::kConfigDb, "sampledQueriesDiff");
+
NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) {
if (!gMultitenancySupport) {
return NamespaceString(boost::none, ns);
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index c982ba882bd..3c746ad0ddf 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -276,6 +276,9 @@ public:
// Namespace used for storing sampled queries.
static const NamespaceString kConfigSampledQueriesNamespace;
+ // Namespace used for storing the diffs for sampled update queries.
+ static const NamespaceString kConfigSampledQueriesDiffNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h
index ed775c9e15e..e635ce7f156 100644
--- a/src/mongo/db/ops/update_request.h
+++ b/src/mongo/db/ops/update_request.h
@@ -76,7 +76,7 @@ public:
};
UpdateRequest(const write_ops::UpdateOpEntry& updateOp = write_ops::UpdateOpEntry())
- : _updateOp(updateOp) {}
+ : _updateOp(updateOp), _sampleId(updateOp.getSampleId()) {}
void setNamespaceString(const NamespaceString& nsString) {
_nsString = nsString;
@@ -257,6 +257,14 @@ public:
return _stmtIds;
}
+ void setSampleId(boost::optional<UUID> sampleId) {
+ _sampleId = sampleId;
+ }
+
+ const boost::optional<UUID>& getSampleId() const {
+ return _sampleId;
+ }
+
std::string toString() const {
StringBuilder builder;
builder << " query: " << getQuery();
@@ -314,6 +322,9 @@ private:
// The statement ids of this request.
std::vector<StmtId> _stmtIds = {kUninitializedStmtId};
+ // The unique sample id for this request if it has been chosen for sampling.
+ boost::optional<UUID> _sampleId;
+
// Flags controlling the update.
// God bypasses _id checking and index generation. It is only used on behalf of system
diff --git a/src/mongo/db/s/query_analysis_op_observer.cpp b/src/mongo/db/s/query_analysis_op_observer.cpp
index 84606c7f4c7..658c50c992d 100644
--- a/src/mongo/db/s/query_analysis_op_observer.cpp
+++ b/src/mongo/db/s/query_analysis_op_observer.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/s/query_analysis_coordinator.h"
#include "mongo/db/s/query_analysis_op_observer.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/logv2/log.h"
#include "mongo/s/analyze_shard_key_util.h"
#include "mongo/s/catalog/type_mongos.h"
@@ -88,6 +89,17 @@ void QueryAnalysisOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdat
});
}
}
+
+ if (analyze_shard_key::supportsPersistingSampledQueries() && args.updateArgs->sampleId &&
+ args.updateArgs->preImageDoc && opCtx->writesAreReplicated()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addDiff(*args.updateArgs->sampleId,
+ args.coll->ns(),
+ args.coll->uuid(),
+ *args.updateArgs->preImageDoc,
+ args.updateArgs->updatedDoc)
+ .getAsync([](auto) {});
+ }
}
void QueryAnalysisOpObserver::aboutToDelete(OperationContext* opCtx,
diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp
index 3861c48c464..19b942e1775 100644
--- a/src/mongo/db/s/query_analysis_writer.cpp
+++ b/src/mongo/db/s/query_analysis_writer.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/update/document_diff_calculator.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
@@ -274,6 +275,19 @@ void QueryAnalysisWriter::onStartup() {
_periodicQueryWriter = periodicRunner->makeJob(std::move(QueryWriterJob));
_periodicQueryWriter.start();
+ PeriodicRunner::PeriodicJob diffWriterJob(
+ "QueryAnalysisDiffWriter",
+ [this](Client* client) {
+ if (MONGO_unlikely(disableQueryAnalysisWriter.shouldFail())) {
+ return;
+ }
+ auto opCtx = client->makeOperationContext();
+ _flushDiffs(opCtx.get());
+ },
+ Seconds(gQueryAnalysisWriterIntervalSecs));
+ _periodicDiffWriter = periodicRunner->makeJob(std::move(diffWriterJob));
+ _periodicDiffWriter.start();
+
ThreadPool::Options threadPoolOptions;
threadPoolOptions.maxThreads = gQueryAnalysisWriterMaxThreadPoolSize;
threadPoolOptions.minThreads = gQueryAnalysisWriterMinThreadPoolSize;
@@ -296,6 +310,9 @@ void QueryAnalysisWriter::onShutdown() {
if (_periodicQueryWriter.isValid()) {
_periodicQueryWriter.stop();
}
+ if (_periodicDiffWriter.isValid()) {
+ _periodicDiffWriter.stop();
+ }
}
void QueryAnalysisWriter::_flushQueries(OperationContext* opCtx) {
@@ -308,6 +325,16 @@ void QueryAnalysisWriter::_flushQueries(OperationContext* opCtx) {
}
}
+void QueryAnalysisWriter::_flushDiffs(OperationContext* opCtx) {
+ try {
+ _flush(opCtx, NamespaceString::kConfigSampledQueriesDiffNamespace, &_diffs);
+ } catch (DBException& ex) {
+ LOGV2(7075400,
+ "Failed to flush diffs, will try again at the next interval",
+ "error"_attr = redact(ex));
+ }
+}
+
void QueryAnalysisWriter::_flush(OperationContext* opCtx,
const NamespaceString& ns,
Buffer* buffer) {
@@ -427,7 +454,7 @@ void QueryAnalysisWriter::Buffer::truncate(size_t index, long long numBytes) {
bool QueryAnalysisWriter::_exceedsMaxSizeBytes() {
stdx::lock_guard<Latch> lk(_mutex);
- return _queries.getSize() >= gQueryAnalysisWriterMaxMemoryUsageBytes.load();
+ return _queries.getSize() + _diffs.getSize() >= gQueryAnalysisWriterMaxMemoryUsageBytes.load();
}
ExecutorFuture<void> QueryAnalysisWriter::addFindQuery(const UUID& sampleId,
@@ -629,5 +656,40 @@ ExecutorFuture<void> QueryAnalysisWriter::addFindAndModifyQuery(
});
}
+ExecutorFuture<void> QueryAnalysisWriter::addDiff(const UUID& sampleId,
+ const NamespaceString& nss,
+ const UUID& collUuid,
+ const BSONObj& preImage,
+ const BSONObj& postImage) {
+ invariant(_executor);
+ return ExecutorFuture<void>(_executor)
+ .then([this,
+ sampleId,
+ nss,
+ collUuid,
+ preImage = preImage.getOwned(),
+ postImage = postImage.getOwned()]() {
+ auto diff = doc_diff::computeInlineDiff(preImage, postImage);
+
+ if (!diff || diff->isEmpty()) {
+ return;
+ }
+
+ auto doc = SampledQueryDiffDocument{sampleId, nss, collUuid, std::move(*diff)};
+ stdx::lock_guard<Latch> lk(_mutex);
+ _diffs.add(doc.toBSON());
+ })
+ .then([this] {
+ if (_exceedsMaxSizeBytes()) {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ _flushDiffs(opCtx);
+ }
+ })
+ .onError([this, nss](Status status) {
+ LOGV2(7075401, "Failed to add diff", "ns"_attr = nss, "error"_attr = redact(status));
+ });
+}
+
} // namespace analyze_shard_key
} // namespace mongo
diff --git a/src/mongo/db/s/query_analysis_writer.h b/src/mongo/db/s/query_analysis_writer.h
index 76ae137273a..d93f187fede 100644
--- a/src/mongo/db/s/query_analysis_writer.h
+++ b/src/mongo/db/s/query_analysis_writer.h
@@ -44,10 +44,17 @@ namespace analyze_shard_key {
* Owns the machinery for persisting sampled queries. That consists of the following:
* - The buffer that stores sampled queries and the periodic background job that inserts those
* queries into the local config.sampledQueries collection.
+ * - The buffer that stores diffs for sampled update queries and the periodic background job that
+ * inserts those diffs into the local config.sampledQueriesDiff collection.
*
* Currently, query sampling is only supported on a sharded cluster. So a writer must be a shardsvr
* mongod. If the mongod is a primary, it will execute the insert commands locally. If it is a
* secondary, it will perform the insert commands against the primary.
+ *
+ * The memory usage of the buffers is controlled by the 'queryAnalysisWriterMaxMemoryUsageBytes'
+ * server parameter. Upon adding a query or a diff that causes the total size of buffers to exceed
+ * the limit, the writer will flush the corresponding buffer immediately instead of waiting for it
+ * to get flushed later by the periodic job.
*/
class QueryAnalysisWriter final : public std::enable_shared_from_this<QueryAnalysisWriter> {
QueryAnalysisWriter(const QueryAnalysisWriter&) = delete;
@@ -137,6 +144,12 @@ public:
ExecutorFuture<void> addFindAndModifyQuery(
const write_ops::FindAndModifyCommandRequest& findAndModifyCmd);
+ ExecutorFuture<void> addDiff(const UUID& sampleId,
+ const NamespaceString& nss,
+ const UUID& collUuid,
+ const BSONObj& preImage,
+ const BSONObj& postImage);
+
int getQueriesCountForTest() const {
stdx::lock_guard<Latch> lk(_mutex);
return _queries.getCount();
@@ -146,6 +159,15 @@ public:
_flushQueries(opCtx);
}
+ int getDiffsCountForTest() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _diffs.getCount();
+ }
+
+ void flushDiffsForTest(OperationContext* opCtx) {
+ _flushDiffs(opCtx);
+ }
+
private:
ExecutorFuture<void> _addReadQuery(const UUID& sampleId,
const NamespaceString& nss,
@@ -154,6 +176,7 @@ private:
const BSONObj& collation);
void _flushQueries(OperationContext* opCtx);
+ void _flushDiffs(OperationContext* opCtx);
/**
* The helper for '_flushQueries'. Inserts the documents in 'buffer' into the collection 'ns'
@@ -162,6 +185,8 @@ private:
* they are expected for the following reasons:
* - For the query buffer, a sampled query that is idempotent (e.g. a read or retryable write)
* could get added to the buffer (across nodes) more than once due to retries.
+ * - For the diff buffer, a sampled multi-update query could end up generating multiple diffs
+ * and each diff is identified using the sample id of the sampled query that creates it.
*
* Throws an error if the inserts fail with any other error.
*/
@@ -178,6 +203,9 @@ private:
PeriodicJobAnchor _periodicQueryWriter;
Buffer _queries;
+ PeriodicJobAnchor _periodicDiffWriter;
+ Buffer _diffs;
+
// Initialized on startup and joined on shutdown.
std::shared_ptr<executor::TaskExecutor> _executor;
};
diff --git a/src/mongo/db/s/query_analysis_writer_test.cpp b/src/mongo/db/s/query_analysis_writer_test.cpp
index 3c78afab971..b17f39c82df 100644
--- a/src/mongo/db/s/query_analysis_writer_test.cpp
+++ b/src/mongo/db/s/query_analysis_writer_test.cpp
@@ -157,6 +157,11 @@ DEATH_TEST(QueryAnalysisWriterBufferTest, TruncateInvalidSize_Positive, "invaria
buffer.truncate(0, doc.objsize() * 2);
}
+void assertBsonObjEqualUnordered(const BSONObj& lhs, const BSONObj& rhs) {
+ UnorderedFieldsBSONObjComparator comparator;
+ ASSERT_EQ(comparator.compare(lhs, rhs), 0);
+}
+
struct QueryAnalysisWriterTest : public ShardServerTestFixture {
public:
void setUp() {
@@ -373,6 +378,32 @@ protected:
ASSERT_BSONOBJ_EQ(parsedCmd.toBSON({}), expectedCmd.toBSON({}));
}
+ /*
+ * Returns the number of the documents for the collection 'nss' in the config.sampledQueriesDiff
+ * collection.
+ */
+ int getDiffDocumentsCount(const NamespaceString& nss) {
+ return _getConfigDocumentsCount(NamespaceString::kConfigSampledQueriesDiffNamespace, nss);
+ }
+
+ /*
+ * Asserts that there is a sampled diff document with the given sample id and that it has
+ * the given fields.
+ */
+ void assertDiffDocument(const UUID& sampleId,
+ const NamespaceString& nss,
+ const BSONObj& expectedDiff) {
+ auto doc =
+ _getConfigDocument(NamespaceString::kConfigSampledQueriesDiffNamespace, sampleId);
+ auto parsedDiffDoc =
+ SampledQueryDiffDocument::parse(IDLParserContext("QueryAnalysisWriterTest"), doc);
+
+ ASSERT_EQ(parsedDiffDoc.getNs(), nss);
+ ASSERT_EQ(parsedDiffDoc.getCollectionUuid(), getCollectionUUID(nss));
+ ASSERT_EQ(parsedDiffDoc.getSampleId(), sampleId);
+ assertBsonObjEqualUnordered(parsedDiffDoc.getDiff(), expectedDiff);
+ }
+
const NamespaceString nss0{"testDb", "testColl0"};
const NamespaceString nss1{"testDb", "testColl1"};
@@ -1036,6 +1067,215 @@ TEST_F(QueryAnalysisWriterTest, RemoveDuplicatesFromBufferAfterWriteError) {
}
}
+TEST_F(QueryAnalysisWriterTest, NoDiffs) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+ writer.flushQueriesForTest(operationContext());
+}
+
+TEST_F(QueryAnalysisWriterTest, DiffsBasic) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ auto collUuid0 = getCollectionUUID(nss0);
+ auto sampleId = UUID::gen();
+ auto preImage = BSON("a" << 0);
+ auto postImage = BSON("a" << 1);
+
+ writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 1);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 1);
+ assertDiffDocument(sampleId, nss0, *doc_diff::computeInlineDiff(preImage, postImage));
+}
+
+TEST_F(QueryAnalysisWriterTest, DiffsMultipleQueriesAndCollections) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ // Make nss0 have a diff for one query.
+ auto collUuid0 = getCollectionUUID(nss0);
+
+ auto sampleId0 = UUID::gen();
+ auto preImage0 = BSON("a" << 0 << "b" << 0 << "c" << 0);
+ auto postImage0 = BSON("a" << 0 << "b" << 1 << "d" << 1);
+
+ // Make nss1 have diffs for two queries.
+ auto collUuid1 = getCollectionUUID(nss1);
+
+ auto sampleId1 = UUID::gen();
+ auto preImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1) << "d" << BSON("e" << 1));
+ auto postImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1 << 2) << "d" << BSON("e" << 2));
+
+ auto sampleId2 = UUID::gen();
+ auto preImage2 = BSON("a" << BSONObj());
+ auto postImage2 = BSON("a" << BSON("b" << 2));
+
+ writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0).get();
+ writer.addDiff(sampleId1, nss1, collUuid1, preImage1, postImage1).get();
+ writer.addDiff(sampleId2, nss1, collUuid1, preImage2, postImage2).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 3);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 1);
+ assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0));
+
+ ASSERT_EQ(getDiffDocumentsCount(nss1), 2);
+ assertDiffDocument(sampleId1, nss1, *doc_diff::computeInlineDiff(preImage1, postImage1));
+ assertDiffDocument(sampleId2, nss1, *doc_diff::computeInlineDiff(preImage2, postImage2));
+}
+
+TEST_F(QueryAnalysisWriterTest, DuplicateDiffs) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ auto collUuid0 = getCollectionUUID(nss0);
+
+ auto sampleId0 = UUID::gen();
+ auto preImage0 = BSON("a" << 0);
+ auto postImage0 = BSON("a" << 1);
+
+ auto sampleId1 = UUID::gen();
+ auto preImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1));
+ auto postImage1 = BSON("a" << 1 << "b" << BSON_ARRAY(1 << 2));
+
+ auto sampleId2 = UUID::gen();
+ auto preImage2 = BSON("a" << BSONObj());
+ auto postImage2 = BSON("a" << BSON("b" << 2));
+
+ writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 1);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 1);
+ assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0));
+
+ writer.addDiff(sampleId1, nss0, collUuid0, preImage1, postImage1).get();
+ writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0)
+ .get(); // This is a duplicate.
+ writer.addDiff(sampleId2, nss0, collUuid0, preImage2, postImage2).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 3);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 3);
+ assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0));
+ assertDiffDocument(sampleId1, nss0, *doc_diff::computeInlineDiff(preImage1, postImage1));
+ assertDiffDocument(sampleId2, nss0, *doc_diff::computeInlineDiff(preImage2, postImage2));
+}
+
+TEST_F(QueryAnalysisWriterTest, DiffsMultipleBatches_MaxBatchSize) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ RAIIServerParameterControllerForTest maxBatchSize{"queryAnalysisWriterMaxBatchSize", 2};
+ auto numDiffs = 5;
+ auto collUuid0 = getCollectionUUID(nss0);
+
+ std::vector<std::pair<UUID, BSONObj>> expectedSampledDiffs;
+ for (auto i = 0; i < numDiffs; i++) {
+ auto sampleId = UUID::gen();
+ auto preImage = BSON("a" << 0);
+ auto postImage = BSON(("a" + std::to_string(i)) << 1);
+ writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get();
+ expectedSampledDiffs.push_back(
+ {sampleId, *doc_diff::computeInlineDiff(preImage, postImage)});
+ }
+ ASSERT_EQ(writer.getDiffsCountForTest(), numDiffs);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), numDiffs);
+ for (const auto& [sampleId, diff] : expectedSampledDiffs) {
+ assertDiffDocument(sampleId, nss0, diff);
+ }
+}
+
+TEST_F(QueryAnalysisWriterTest, DiffsMultipleBatches_MaxBSONObjSize) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ auto numDiffs = 3;
+ auto collUuid0 = getCollectionUUID(nss0);
+
+ std::vector<std::pair<UUID, BSONObj>> expectedSampledDiffs;
+ for (auto i = 0; i < numDiffs; i++) {
+ auto sampleId = UUID::gen();
+ auto preImage = BSON("a" << 0);
+ auto postImage = BSON(std::string(BSONObjMaxUserSize / 2, 'a') << 1);
+ writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get();
+ expectedSampledDiffs.push_back(
+ {sampleId, *doc_diff::computeInlineDiff(preImage, postImage)});
+ }
+ ASSERT_EQ(writer.getDiffsCountForTest(), numDiffs);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), numDiffs);
+ for (const auto& [sampleId, diff] : expectedSampledDiffs) {
+ assertDiffDocument(sampleId, nss0, diff);
+ }
+}
+
+TEST_F(QueryAnalysisWriterTest, FlushAfterAddDiffIfExceedsSizeLimit) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ auto maxMemoryUsageBytes = 1024;
+ RAIIServerParameterControllerForTest maxMemoryBytes{"queryAnalysisWriterMaxMemoryUsageBytes",
+ maxMemoryUsageBytes};
+
+ auto collUuid0 = getCollectionUUID(nss0);
+ auto sampleId0 = UUID::gen();
+ auto preImage0 = BSON("a" << 0);
+ auto postImage0 = BSON(std::string(maxMemoryUsageBytes / 2, 'a') << 1);
+
+ auto collUuid1 = getCollectionUUID(nss1);
+ auto sampleId1 = UUID::gen();
+ auto preImage1 = BSON("a" << 0);
+ auto postImage1 = BSON(std::string(maxMemoryUsageBytes / 2, 'b') << 1);
+
+ writer.addDiff(sampleId0, nss0, collUuid0, preImage0, postImage0).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 1);
+ // Adding the next diff causes the size to exceed the limit.
+ writer.addDiff(sampleId1, nss1, collUuid1, preImage1, postImage1).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 1);
+ assertDiffDocument(sampleId0, nss0, *doc_diff::computeInlineDiff(preImage0, postImage0));
+ ASSERT_EQ(getDiffDocumentsCount(nss1), 1);
+ assertDiffDocument(sampleId1, nss1, *doc_diff::computeInlineDiff(preImage1, postImage1));
+}
+
+TEST_F(QueryAnalysisWriterTest, DiffEmpty) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ auto collUuid0 = getCollectionUUID(nss0);
+ auto sampleId = UUID::gen();
+ auto preImage = BSON("a" << 1);
+ auto postImage = preImage;
+
+ writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 0);
+}
+
+TEST_F(QueryAnalysisWriterTest, DiffExceedsSizeLimit) {
+ auto& writer = QueryAnalysisWriter::get(operationContext());
+
+ auto collUuid0 = getCollectionUUID(nss0);
+ auto sampleId = UUID::gen();
+ auto preImage = BSON(std::string(BSONObjMaxUserSize, 'a') << 1);
+ auto postImage = BSONObj();
+
+ writer.addDiff(sampleId, nss0, collUuid0, preImage, postImage).get();
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+ writer.flushDiffsForTest(operationContext());
+ ASSERT_EQ(writer.getDiffsCountForTest(), 0);
+
+ ASSERT_EQ(getDiffDocumentsCount(nss0), 0);
+}
+
} // namespace
} // namespace analyze_shard_key
} // namespace mongo
diff --git a/src/mongo/s/analyze_shard_key_documents.idl b/src/mongo/s/analyze_shard_key_documents.idl
index a3728d6c1cf..b1275858bd3 100644
--- a/src/mongo/s/analyze_shard_key_documents.idl
+++ b/src/mongo/s/analyze_shard_key_documents.idl
@@ -101,3 +101,21 @@ structs:
cmd:
type: object
description: "The command object for the write."
+
+ SampledQueryDiffDocument:
+ description: "Represents a document storing the diff for a sampled write query."
+ strict: false
+ fields:
+ _id:
+ type: uuid
+ description: "The unique sample id for the write."
+ cpp_name: sampleId
+ ns:
+ type: namespacestring
+ description: "The namespace of the collection for the write."
+ collectionUuid:
+ type: uuid
+ description: "The UUID of the collection for the write."
+ diff:
+ type: object
+ description: "The diff for the write."