diff options
author | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
---|---|---|
committer | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
commit | e74d2910bbe76790ad131d53fee277829cd95982 (patch) | |
tree | cabe148764529c9623652374fbc36323a550cd44 /src/mongo/db/ops/write_ops_exec.cpp | |
parent | 280145e9940729480bb8a35453d4056afac87641 (diff) | |
parent | ba467f46cc1bc49965e1d72b541eff0cf1d7b22e (diff) | |
download | mongo-jenniferpeshansky/SERVER-70854.tar.gz |
Merge branch 'master' into jenniferpeshansky/SERVER-70854jenniferpeshansky/SERVER-70854
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index f3a37117b00..e98a5931dd8 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -67,6 +67,7 @@ #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/server_write_concern_metrics.h" #include "mongo/db/stats/top.h" @@ -677,7 +678,7 @@ WriteResult performInserts(OperationContext* opCtx, bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); - size_t stmtIdIndex = 0; + size_t nextOpIndex = 0; size_t bytesInBatch = 0; std::vector<InsertStatement> batch; const size_t maxBatchSize = internalInsertMaxBatchSize.load(); @@ -685,10 +686,11 @@ WriteResult performInserts(OperationContext* opCtx, batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize)); for (auto&& doc : wholeOp.getDocuments()) { + const auto currentOpIndex = nextOpIndex++; const bool isLastDoc = (&doc == &wholeOp.getDocuments().back()); bool containsDotsAndDollarsField = false; auto fixedDoc = fixDocumentForInsert(opCtx, doc, &containsDotsAndDollarsField); - const StmtId stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + const StmtId stmtId = getStmtIdForWriteOp(opCtx, wholeOp, currentOpIndex); const bool wasAlreadyExecuted = opCtx->isRetryableWrite() && txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId); @@ -1041,7 +1043,7 @@ WriteResult performUpdates(OperationContext* opCtx, bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); - size_t stmtIdIndex = 0; + size_t nextOpIndex = 0; WriteResult out; out.results.reserve(wholeOp.getUpdates().size()); @@ -1054,7 +1056,8 @@ WriteResult performUpdates(OperationContext* opCtx, // updates. bool forgoOpCounterIncrements = false; for (auto&& singleOp : wholeOp.getUpdates()) { - const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + const auto currentOpIndex = nextOpIndex++; + const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, currentOpIndex); if (opCtx->isRetryableWrite()) { if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) { containsRetry = true; @@ -1081,6 +1084,13 @@ WriteResult performUpdates(OperationContext* opCtx, finishCurOp(opCtx, &*curOp); } }); + + if (analyze_shard_key::supportsPersistingSampledQueries() && singleOp.getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addUpdateQuery(wholeOp, currentOpIndex) + .getAsync([](auto) {}); + } + try { lastOpFixer.startingOp(); @@ -1276,7 +1286,7 @@ WriteResult performDeletes(OperationContext* opCtx, bool containsRetry = false; ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); }); - size_t stmtIdIndex = 0; + size_t nextOpIndex = 0; WriteResult out; out.results.reserve(wholeOp.getDeletes().size()); @@ -1286,7 +1296,8 @@ WriteResult performDeletes(OperationContext* opCtx, wholeOp.getLegacyRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx)); for (auto&& singleOp : wholeOp.getDeletes()) { - const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + const auto currentOpIndex = nextOpIndex++; + const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, currentOpIndex); if (opCtx->isRetryableWrite() && txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId)) { containsRetry = true; @@ -1316,6 +1327,13 @@ WriteResult performDeletes(OperationContext* opCtx, &hangBeforeChildRemoveOpIsPopped, opCtx, "hangBeforeChildRemoveOpIsPopped"); } }); + + if (analyze_shard_key::supportsPersistingSampledQueries() && singleOp.getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addDeleteQuery(wholeOp, currentOpIndex) + .getAsync([](auto) {}); + } + try { lastOpFixer.startingOp(); out.results.push_back(performSingleDeleteOp(opCtx, |