summaryrefslogtreecommitdiff
path: root/src/mongo/db/ops/write_ops_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp30
1 files changed, 24 insertions, 6 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index f3a37117b00..e98a5931dd8 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -67,6 +67,7 @@
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/server_write_concern_metrics.h"
#include "mongo/db/stats/top.h"
@@ -677,7 +678,7 @@ WriteResult performInserts(OperationContext* opCtx,
bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });
- size_t stmtIdIndex = 0;
+ size_t nextOpIndex = 0;
size_t bytesInBatch = 0;
std::vector<InsertStatement> batch;
const size_t maxBatchSize = internalInsertMaxBatchSize.load();
@@ -685,10 +686,11 @@ WriteResult performInserts(OperationContext* opCtx,
batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize));
for (auto&& doc : wholeOp.getDocuments()) {
+ const auto currentOpIndex = nextOpIndex++;
const bool isLastDoc = (&doc == &wholeOp.getDocuments().back());
bool containsDotsAndDollarsField = false;
auto fixedDoc = fixDocumentForInsert(opCtx, doc, &containsDotsAndDollarsField);
- const StmtId stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
+ const StmtId stmtId = getStmtIdForWriteOp(opCtx, wholeOp, currentOpIndex);
const bool wasAlreadyExecuted = opCtx->isRetryableWrite() &&
txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId);
@@ -1041,7 +1043,7 @@ WriteResult performUpdates(OperationContext* opCtx,
bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });
- size_t stmtIdIndex = 0;
+ size_t nextOpIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getUpdates().size());
@@ -1054,7 +1056,8 @@ WriteResult performUpdates(OperationContext* opCtx,
// updates.
bool forgoOpCounterIncrements = false;
for (auto&& singleOp : wholeOp.getUpdates()) {
- const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
+ const auto currentOpIndex = nextOpIndex++;
+ const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, currentOpIndex);
if (opCtx->isRetryableWrite()) {
if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
containsRetry = true;
@@ -1081,6 +1084,13 @@ WriteResult performUpdates(OperationContext* opCtx,
finishCurOp(opCtx, &*curOp);
}
});
+
+ if (analyze_shard_key::supportsPersistingSampledQueries() && singleOp.getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addUpdateQuery(wholeOp, currentOpIndex)
+ .getAsync([](auto) {});
+ }
+
try {
lastOpFixer.startingOp();
@@ -1276,7 +1286,7 @@ WriteResult performDeletes(OperationContext* opCtx,
bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });
- size_t stmtIdIndex = 0;
+ size_t nextOpIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getDeletes().size());
@@ -1286,7 +1296,8 @@ WriteResult performDeletes(OperationContext* opCtx,
wholeOp.getLegacyRuntimeConstants().value_or(Variables::generateRuntimeConstants(opCtx));
for (auto&& singleOp : wholeOp.getDeletes()) {
- const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
+ const auto currentOpIndex = nextOpIndex++;
+ const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, currentOpIndex);
if (opCtx->isRetryableWrite() &&
txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId)) {
containsRetry = true;
@@ -1316,6 +1327,13 @@ WriteResult performDeletes(OperationContext* opCtx,
&hangBeforeChildRemoveOpIsPopped, opCtx, "hangBeforeChildRemoveOpIsPopped");
}
});
+
+ if (analyze_shard_key::supportsPersistingSampledQueries() && singleOp.getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addDeleteQuery(wholeOp, currentOpIndex)
+ .getAsync([](auto) {});
+ }
+
try {
lastOpFixer.startingOp();
out.results.push_back(performSingleDeleteOp(opCtx,