summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2020-08-25 18:34:54 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-31 21:53:09 +0000
commitc70016c56f4f34ba09f4975fbf5bb3ac5129b5f7 (patch)
treec68f8e2c539246ef00a9abc4dfee16e667024dac
parent5360e4bd5724440676017d17f6878852753e4bac (diff)
downloadmongo-c70016c56f4f34ba09f4975fbf5bb3ac5129b5f7.tar.gz
SERVER-48477 move CRUD interface into PlanExecutor
This allows classic PlanStages to be removed from the PlanExecutor interface. It also positions the code base well for adding SBE support for all of the CRUD commands.
-rw-r--r--src/mongo/db/commands/count_cmd.cpp9
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp23
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp8
-rw-r--r--src/mongo/db/exec/delete.cpp35
-rw-r--r--src/mongo/db/exec/delete.h11
-rw-r--r--src/mongo/db/exec/update_stage.cpp123
-rw-r--r--src/mongo/db/exec/update_stage.h37
-rw-r--r--src/mongo/db/ops/delete.cpp4
-rw-r--r--src/mongo/db/ops/find_and_modify_result.cpp10
-rw-r--r--src/mongo/db/ops/find_and_modify_result.h4
-rw-r--r--src/mongo/db/ops/update.cpp6
-rw-r--r--src/mongo/db/ops/update_result.cpp26
-rw-r--r--src/mongo/db/ops/update_result.h23
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp124
-rw-r--r--src/mongo/db/ops/write_ops_exec.h20
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp4
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp42
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp6
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.h19
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp6
-rw-r--r--src/mongo/db/query/explain.cpp17
-rw-r--r--src/mongo/db/query/explain.h12
-rw-r--r--src/mongo/db/query/plan_executor.h39
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp78
-rw-r--r--src/mongo/db/query/plan_executor_impl.h17
-rw-r--r--src/mongo/db/query/plan_executor_sbe.h21
-rw-r--r--src/mongo/db/repl/oplog.cpp6
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp9
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp2
-rw-r--r--src/mongo/db/s/sharding_state_recovery.cpp2
-rw-r--r--src/mongo/db/service_entry_point_common.cpp6
-rw-r--r--src/mongo/db/ttl.cpp8
-rw-r--r--src/mongo/dbtests/query_stage_multiplan.cpp18
-rw-r--r--src/mongo/dbtests/query_stage_sort.cpp17
35 files changed, 433 insertions, 361 deletions
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 45b51b5913e..918167e4739 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -263,7 +263,7 @@ public:
curOp->setPlanSummary_inlock(exec->getPlanSummary());
}
- exec->executePlan();
+ auto countResult = exec->executeCount();
PlanSummaryStats summaryStats;
exec->getSummaryStats(&summaryStats);
@@ -276,12 +276,7 @@ public:
curOp->debug().execStats = exec->getStats();
}
- // Plan is done executing. We just need to pull the count out of the root stage.
- invariant(STAGE_COUNT == exec->getRootStage()->stageType() ||
- STAGE_RECORD_STORE_FAST_COUNT == exec->getRootStage()->stageType());
- auto* countStats = static_cast<const CountStats*>(exec->getRootStage()->getSpecificStats());
-
- result.appendNumber("n", countStats->nCounted);
+ result.appendNumber("n", countResult);
return true;
}
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 9785b5b51e2..f944274c695 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/query/explain.h"
@@ -165,17 +166,19 @@ void appendCommandResponse(const PlanExecutor* exec,
const boost::optional<BSONObj>& value,
BSONObjBuilder* result) {
if (isRemove) {
- find_and_modify::serializeRemove(DeleteStage::getNumDeleted(*exec), value, result);
+ find_and_modify::serializeRemove(value, result);
} else {
- const auto updateStats = UpdateStage::getUpdateStats(exec);
+ const auto updateResult = exec->getUpdateResult();
// Note we have to use the objInserted from the stats here, rather than 'value' because the
// _id field could have been excluded by a projection.
- find_and_modify::serializeUpsert(updateStats->inserted ? 1 : updateStats->nMatched,
- value,
- updateStats->nMatched > 0,
- updateStats->objInserted,
- result);
+ find_and_modify::serializeUpsert(
+ !updateResult.upsertedId.isEmpty() ? 1 : updateResult.numMatched,
+ value,
+ updateResult.numMatched > 0,
+ updateResult.upsertedId.isEmpty() ? BSONElement{}
+ : updateResult.upsertedId.firstElement(),
+ result);
}
}
@@ -414,7 +417,7 @@ public:
uassertStatusOK(parsedUpdate.parseQueryToCQ());
}
- if (!UpdateStage::shouldRetryDuplicateKeyException(
+ if (!write_ops_exec::shouldRetryDuplicateKeyException(
parsedUpdate, *ex.extraInfo<DuplicateKeyErrorInfo>())) {
throw;
}
@@ -489,7 +492,7 @@ public:
opDebug->setPlanSummaryMetrics(summaryStats);
// Fill out OpDebug with the number of deleted docs.
- opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec);
+ opDebug->additiveMetrics.ndeleted = docFound ? 1 : 0;
if (curOp->shouldDBProfile()) {
curOp->debug().execStats = exec->getStats();
@@ -565,7 +568,7 @@ public:
if (collection) {
CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats);
}
- UpdateStage::recordUpdateStatsInOpDebug(UpdateStage::getUpdateStats(exec.get()), opDebug);
+ write_ops_exec::recordUpdateResultInOpDebug(exec->getUpdateResult(), opDebug);
opDebug->setPlanSummaryMetrics(summaryStats);
if (curOp->shouldDBProfile()) {
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index e79e9ce5581..bacfbd5dfd8 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -90,7 +90,7 @@ void serializeReply(OperationContext* opCtx,
ReplyStyle replyStyle,
bool continueOnError,
size_t opsInBatch,
- WriteResult result,
+ write_ops_exec::WriteResult result,
BSONObjBuilder* out) {
if (shouldSkipOutput(opCtx))
return;
@@ -307,7 +307,7 @@ private:
}
void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
- auto reply = performInserts(opCtx, _batch);
+ auto reply = write_ops_exec::performInserts(opCtx, _batch);
serializeReply(opCtx,
ReplyStyle::kNotUpdate,
!_batch.getWriteCommandBase().getOrdered(),
@@ -405,7 +405,7 @@ private:
}
void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
- auto reply = performUpdates(opCtx, _batch);
+ auto reply = write_ops_exec::performUpdates(opCtx, _batch);
serializeReply(opCtx,
ReplyStyle::kUpdate,
!_batch.getWriteCommandBase().getOrdered(),
@@ -516,7 +516,7 @@ private:
}
void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
- auto reply = performDeletes(opCtx, _batch);
+ auto reply = write_ops_exec::performDeletes(opCtx, _batch);
serializeReply(opCtx,
ReplyStyle::kNotUpdate,
!_batch.getWriteCommandBase().getOrdered(),
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp
index a53447e9f03..78528e74b0e 100644
--- a/src/mongo/db/exec/delete.cpp
+++ b/src/mongo/db/exec/delete.cpp
@@ -68,15 +68,12 @@ bool shouldRestartDeleteIfNoLongerMatches(const DeleteStageParams* params) {
} // namespace
-// static
-const char* DeleteStage::kStageType = "DELETE";
-
DeleteStage::DeleteStage(ExpressionContext* expCtx,
std::unique_ptr<DeleteStageParams> params,
WorkingSet* ws,
Collection* collection,
PlanStage* child)
- : RequiresMutableCollectionStage(kStageType, expCtx, collection),
+ : RequiresMutableCollectionStage(kStageType.rawData(), expCtx, collection),
_params(std::move(params)),
_ws(ws),
_idRetrying(WorkingSet::INVALID_ID),
@@ -270,36 +267,6 @@ const SpecificStats* DeleteStage::getSpecificStats() const {
return &_specificStats;
}
-// static
-long long DeleteStage::getNumDeleted(const PlanExecutor& exec) {
- invariant(exec.getRootStage()->isEOF());
-
- // If we're deleting from a non-existent collection, then the delete plan may have an EOF as the
- // root stage.
- if (exec.getRootStage()->stageType() == STAGE_EOF) {
- return 0LL;
- }
-
- // If the collection exists, the delete plan may either have a delete stage at the root, or (for
- // findAndModify) a projection stage wrapping a delete stage.
- switch (exec.getRootStage()->stageType()) {
- case StageType::STAGE_PROJECTION_DEFAULT:
- case StageType::STAGE_PROJECTION_COVERED:
- case StageType::STAGE_PROJECTION_SIMPLE: {
- invariant(exec.getRootStage()->getChildren().size() == 1U);
- invariant(StageType::STAGE_DELETE == exec.getRootStage()->child()->stageType());
- const SpecificStats* stats = exec.getRootStage()->child()->getSpecificStats();
- return static_cast<const DeleteStats*>(stats)->docsDeleted;
- }
- default: {
- invariant(StageType::STAGE_DELETE == exec.getRootStage()->stageType());
- const auto* deleteStats =
- static_cast<const DeleteStats*>(exec.getRootStage()->getSpecificStats());
- return deleteStats->docsDeleted;
- }
- }
-}
-
PlanStage::StageState DeleteStage::prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out) {
_idRetrying = idToRetry;
*out = WorkingSet::INVALID_ID;
diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h
index a0f9f056d92..813ff1683fb 100644
--- a/src/mongo/db/exec/delete.h
+++ b/src/mongo/db/exec/delete.h
@@ -99,6 +99,8 @@ class DeleteStage final : public RequiresMutableCollectionStage {
DeleteStage& operator=(const DeleteStage&) = delete;
public:
+ static constexpr StringData kStageType = "DELETE"_sd;
+
DeleteStage(ExpressionContext* expCtx,
std::unique_ptr<DeleteStageParams> params,
WorkingSet* ws,
@@ -116,15 +118,6 @@ public:
const SpecificStats* getSpecificStats() const final;
- static const char* kStageType;
-
- /**
- * Extracts the number of documents deleted by the delete plan 'exec'.
- *
- * Should only be called if the root plan stage of 'exec' is DELETE and if 'exec' is EOF.
- */
- static long long getNumDeleted(const PlanExecutor& exec);
-
protected:
void doSaveStateRequiresCollection() final {}
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 51e0abfae2c..776f6aec134 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -105,10 +105,6 @@ CollectionUpdateArgs::StoreDocOption getStoreDocMode(const UpdateRequest& update
}
} // namespace
-const char* UpdateStage::kStageType = "UPDATE";
-
-const UpdateStats UpdateStage::kEmptyUpdateStats;
-
// Public constructor.
UpdateStage::UpdateStage(ExpressionContext* expCtx,
const UpdateStageParams& params,
@@ -126,7 +122,7 @@ UpdateStage::UpdateStage(ExpressionContext* expCtx,
const UpdateStageParams& params,
WorkingSet* ws,
Collection* collection)
- : RequiresMutableCollectionStage(kStageType, expCtx, collection),
+ : RequiresMutableCollectionStage(kStageType.rawData(), expCtx, collection),
_params(params),
_ws(ws),
_doc(params.driver->getDocument()),
@@ -354,82 +350,6 @@ void UpdateStage::_assertPathsNotArray(const mb::Document& document, const Field
}
}
-bool UpdateStage::matchContainsOnlyAndedEqualityNodes(const MatchExpression& root) {
- if (root.matchType() == MatchExpression::EQ) {
- return true;
- }
-
- if (root.matchType() == MatchExpression::AND) {
- for (size_t i = 0; i < root.numChildren(); ++i) {
- if (root.getChild(i)->matchType() != MatchExpression::EQ) {
- return false;
- }
- }
-
- return true;
- }
-
- return false;
-}
-
-bool UpdateStage::shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpdate,
- const DuplicateKeyErrorInfo& errorInfo) {
- invariant(parsedUpdate.hasParsedQuery());
-
- const auto updateRequest = parsedUpdate.getRequest();
-
- // In order to be retryable, the update must be an upsert with multi:false.
- if (!updateRequest->isUpsert() || updateRequest->isMulti()) {
- return false;
- }
-
- auto matchExpr = parsedUpdate.getParsedQuery()->root();
- invariant(matchExpr);
-
- // In order to be retryable, the update query must contain no expressions other than AND and EQ.
- if (!matchContainsOnlyAndedEqualityNodes(*matchExpr)) {
- return false;
- }
-
- // In order to be retryable, the update equality field paths must be identical to the unique
- // index key field paths. Also, the values that triggered the DuplicateKey error must match the
- // values used in the upsert query predicate.
- pathsupport::EqualityMatches equalities;
- auto status = pathsupport::extractEqualityMatches(*matchExpr, &equalities);
- if (!status.isOK()) {
- return false;
- }
-
- auto keyPattern = errorInfo.getKeyPattern();
- if (equalities.size() != static_cast<size_t>(keyPattern.nFields())) {
- return false;
- }
-
- auto keyValue = errorInfo.getDuplicatedKeyValue();
-
- BSONObjIterator keyPatternIter(keyPattern);
- BSONObjIterator keyValueIter(keyValue);
- while (keyPatternIter.more() && keyValueIter.more()) {
- auto keyPatternElem = keyPatternIter.next();
- auto keyValueElem = keyValueIter.next();
-
- auto keyName = keyPatternElem.fieldNameStringData();
- if (!equalities.count(keyName)) {
- return false;
- }
-
- // Comparison which obeys field ordering but ignores field name.
- BSONElementComparator cmp{BSONElementComparator::FieldNamesMode::kIgnore, nullptr};
- if (cmp.evaluate(equalities[keyName]->getData() != keyValueElem)) {
- return false;
- }
- }
- invariant(!keyPatternIter.more());
- invariant(!keyValueIter.more());
-
- return true;
-}
-
bool UpdateStage::isEOF() {
// We're done updating if either the child has no more results to give us, or we've
// already gotten a result back and we're not a multi-update.
@@ -641,47 +561,6 @@ const SpecificStats* UpdateStage::getSpecificStats() const {
return &_specificStats;
}
-const UpdateStats* UpdateStage::getUpdateStats(const PlanExecutor* exec) {
- invariant(exec->getRootStage()->isEOF());
-
- // If we're updating a non-existent collection, then the delete plan may have an EOF as the root
- // stage.
- if (exec->getRootStage()->stageType() == STAGE_EOF) {
- return &kEmptyUpdateStats;
- }
-
- // If the collection exists, then we expect the root of the plan tree to either be an update
- // stage, or (for findAndModify) a projection stage wrapping an update stage.
- switch (exec->getRootStage()->stageType()) {
- case StageType::STAGE_PROJECTION_DEFAULT:
- case StageType::STAGE_PROJECTION_COVERED:
- case StageType::STAGE_PROJECTION_SIMPLE: {
- invariant(exec->getRootStage()->getChildren().size() == 1U);
- invariant(StageType::STAGE_UPDATE == exec->getRootStage()->child()->stageType());
- const SpecificStats* stats = exec->getRootStage()->child()->getSpecificStats();
- return static_cast<const UpdateStats*>(stats);
- }
- default:
- invariant(StageType::STAGE_UPDATE == exec->getRootStage()->stageType());
- return static_cast<const UpdateStats*>(exec->getRootStage()->getSpecificStats());
- }
-}
-
-void UpdateStage::recordUpdateStatsInOpDebug(const UpdateStats* updateStats, OpDebug* opDebug) {
- invariant(opDebug);
- opDebug->additiveMetrics.nMatched = updateStats->nMatched;
- opDebug->additiveMetrics.nModified = updateStats->nModified;
- opDebug->upsert = updateStats->inserted;
-}
-
-UpdateResult UpdateStage::makeUpdateResult(const UpdateStats* updateStats) {
- return UpdateResult(updateStats->nMatched > 0 /* Did we update at least one obj? */,
- updateStats->isModUpdate /* Is this a $mod update? */,
- updateStats->nModified /* number of modified docs, no no-ops */,
- updateStats->nMatched /* # of docs matched/updated, even no-ops */,
- updateStats->objInserted);
-};
-
PlanStage::StageState UpdateStage::prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out) {
_idRetrying = idToRetry;
*out = WorkingSet::INVALID_ID;
diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h
index 3677f509a6f..0eff229ce7d 100644
--- a/src/mongo/db/exec/update_stage.h
+++ b/src/mongo/db/exec/update_stage.h
@@ -81,6 +81,8 @@ class UpdateStage : public RequiresMutableCollectionStage {
UpdateStage& operator=(const UpdateStage&) = delete;
public:
+ static constexpr StringData kStageType = "UPDATE"_sd;
+
UpdateStage(ExpressionContext* expCtx,
const UpdateStageParams& params,
WorkingSet* ws,
@@ -98,33 +100,6 @@ public:
const SpecificStats* getSpecificStats() const final;
- static const char* kStageType;
-
- /**
- * Gets a pointer to the UpdateStats inside 'exec'.
- *
- * The 'exec' must have an UPDATE stage as its root stage, and the plan must be EOF before
- * calling this method.
- */
- static const UpdateStats* getUpdateStats(const PlanExecutor* exec);
-
- /**
- * Populate 'opDebug' with stats from 'updateStats' describing the execution of this update.
- */
- static void recordUpdateStatsInOpDebug(const UpdateStats* updateStats, OpDebug* opDebug);
-
- /**
- * Converts 'updateStats' into an UpdateResult.
- */
- static UpdateResult makeUpdateResult(const UpdateStats* updateStats);
-
- /**
- * Returns true if an update failure due to a given DuplicateKey error is eligible for retry.
- * Requires that parsedUpdate.hasParsedQuery() is true.
- */
- static bool shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpdate,
- const DuplicateKeyErrorInfo& errorInfo);
-
protected:
UpdateStage(ExpressionContext* expCtx,
const UpdateStageParams& params,
@@ -156,14 +131,6 @@ protected:
mutablebson::DamageVector _damages;
private:
- static const UpdateStats kEmptyUpdateStats;
-
- /**
- * Returns whether a given MatchExpression contains is a MatchType::EQ or a MatchType::AND node
- * with only MatchType::EQ children.
- */
- static bool matchContainsOnlyAndedEqualityNodes(const MatchExpression& root);
-
/**
* Computes the result of applying mods to the document 'oldObj' at RecordId 'recordId' in
* memory, then commits these changes to the database. Returns a possibly unowned copy
diff --git a/src/mongo/db/ops/delete.cpp b/src/mongo/db/ops/delete.cpp
index 9b2720e681f..ea988e53c6c 100644
--- a/src/mongo/db/ops/delete.cpp
+++ b/src/mongo/db/ops/delete.cpp
@@ -60,9 +60,7 @@ long long deleteObjects(OperationContext* opCtx,
auto exec = uassertStatusOK(getExecutorDelete(
&CurOp::get(opCtx)->debug(), collection, &parsedDelete, boost::none /* verbosity */));
- exec->executePlan();
-
- return DeleteStage::getNumDeleted(*exec);
+ return exec->executeDelete();
}
} // namespace mongo
diff --git a/src/mongo/db/ops/find_and_modify_result.cpp b/src/mongo/db/ops/find_and_modify_result.cpp
index 734fb637942..686fc2c753f 100644
--- a/src/mongo/db/ops/find_and_modify_result.cpp
+++ b/src/mongo/db/ops/find_and_modify_result.cpp
@@ -50,9 +50,9 @@ void appendValue(const boost::optional<BSONObj>& value, BSONObjBuilder* builder)
} // namespace
-void serializeRemove(size_t n, const boost::optional<BSONObj>& value, BSONObjBuilder* builder) {
+void serializeRemove(const boost::optional<BSONObj>& value, BSONObjBuilder* builder) {
BSONObjBuilder lastErrorObjBuilder(builder->subobjStart("lastErrorObject"));
- builder->appendNumber("n", n);
+ builder->appendNumber("n", value ? 1 : 0);
lastErrorObjBuilder.doneFast();
appendValue(value, builder);
@@ -61,13 +61,13 @@ void serializeRemove(size_t n, const boost::optional<BSONObj>& value, BSONObjBui
void serializeUpsert(size_t n,
const boost::optional<BSONObj>& value,
bool updatedExisting,
- const BSONObj& objInserted,
+ BSONElement idInserted,
BSONObjBuilder* builder) {
BSONObjBuilder lastErrorObjBuilder(builder->subobjStart("lastErrorObject"));
lastErrorObjBuilder.appendNumber("n", n);
lastErrorObjBuilder.appendBool("updatedExisting", updatedExisting);
- if (!objInserted.isEmpty()) {
- lastErrorObjBuilder.appendAs(objInserted["_id"], kUpsertedFieldName);
+ if (idInserted) {
+ lastErrorObjBuilder.appendAs(idInserted, kUpsertedFieldName);
}
lastErrorObjBuilder.doneFast();
diff --git a/src/mongo/db/ops/find_and_modify_result.h b/src/mongo/db/ops/find_and_modify_result.h
index fd4e28e0b4b..30da07faa9f 100644
--- a/src/mongo/db/ops/find_and_modify_result.h
+++ b/src/mongo/db/ops/find_and_modify_result.h
@@ -39,12 +39,12 @@ class BSONObjBuilder;
namespace find_and_modify {
-void serializeRemove(size_t n, const boost::optional<BSONObj>& value, BSONObjBuilder* builder);
+void serializeRemove(const boost::optional<BSONObj>& value, BSONObjBuilder* builder);
void serializeUpsert(size_t n,
const boost::optional<BSONObj>& value,
bool updatedExisting,
- const BSONObj& objInserted,
+ BSONElement idInserted,
BSONObjBuilder* builder);
} // namespace find_and_modify
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index e55608644b5..56c69a9c662 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -95,10 +95,6 @@ UpdateResult update(OperationContext* opCtx, Database* db, const UpdateRequest&
auto exec = uassertStatusOK(
getExecutorUpdate(nullOpDebug, collection, &parsedUpdate, boost::none /* verbosity */));
- exec->executePlan();
-
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
-
- return UpdateStage::makeUpdateResult(updateStats);
+ return exec->executeUpdate();
}
} // namespace mongo
diff --git a/src/mongo/db/ops/update_result.cpp b/src/mongo/db/ops/update_result.cpp
index a47fd3067ae..84a29f3108b 100644
--- a/src/mongo/db/ops/update_result.cpp
+++ b/src/mongo/db/ops/update_result.cpp
@@ -40,18 +40,18 @@
namespace mongo {
-UpdateResult::UpdateResult(bool existing_,
- bool modifiers_,
- unsigned long long numDocsModified_,
- unsigned long long numMatched_,
- const BSONObj& upsertedObject_)
- : existing(existing_),
- modifiers(modifiers_),
- numDocsModified(numDocsModified_),
- numMatched(numMatched_) {
- BSONElement id = upsertedObject_["_id"];
+UpdateResult::UpdateResult(bool existing,
+ bool modifiers,
+ unsigned long long numDocsModified,
+ unsigned long long numMatched,
+ const BSONObj& upsertedObject)
+ : existing(existing),
+ modifiers(modifiers),
+ numDocsModified(numDocsModified),
+ numMatched(numMatched) {
+ BSONElement id = upsertedObject["_id"];
if (!existing && numMatched == 0 && !id.eoo()) {
- upserted = id.wrap(kUpsertedFieldName);
+ upsertedId = id.wrap(kUpsertedFieldName);
}
LOGV2_DEBUG(20885,
4,
@@ -60,13 +60,13 @@ UpdateResult::UpdateResult(bool existing_,
"UpdateResult",
"numMatched"_attr = numMatched,
"numModified"_attr = numDocsModified,
- "upserted"_attr = redact(upserted),
+ "upsertedId"_attr = redact(upsertedId),
"modifiers"_attr = modifiers,
"existing"_attr = existing);
}
std::string UpdateResult::toString() const {
- return str::stream() << " upserted: " << upserted << " modifiers: " << modifiers
+ return str::stream() << " upsertedId: " << upsertedId << " modifiers: " << modifiers
<< " existing: " << existing << " numDocsModified: " << numDocsModified
<< " numMatched: " << numMatched;
}
diff --git a/src/mongo/db/ops/update_result.h b/src/mongo/db/ops/update_result.h
index f50be92037d..7e714b78dc7 100644
--- a/src/mongo/db/ops/update_result.h
+++ b/src/mongo/db/ops/update_result.h
@@ -34,28 +34,29 @@
namespace mongo {
struct UpdateResult {
- UpdateResult(bool existing_,
- bool modifiers_,
- unsigned long long numDocsModified_,
- unsigned long long numMatched_,
- const BSONObj& upsertedObject_);
+ UpdateResult(bool existing,
+ bool modifiers,
+ unsigned long long numDocsModified,
+ unsigned long long numMatched,
+ const BSONObj& upsertedObject);
std::string toString() const;
- // if existing objects were modified
+ // True if at least one pre-existing document was modified.
const bool existing;
- // was this a $ mod
+ // True if this was a modifier-style update, not a replacement update or a pipeline-style
+ // update.
const bool modifiers;
- // how many docs updated
const long long numDocsModified;
- // how many docs seen by update
const long long numMatched;
- // if something was upserted, the new _id of the object
- BSONObj upserted;
+ // If either the operation was not an upsert, or the upsert did not result in an insert, then
+ // this is the empty object. If an insert occurred as the result of an upsert operation, then
+ // this is a single-element object containing the _id of the document inserted.
+ BSONObj upsertedId;
};
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index d79fe941414..62401613f07 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/transaction_participant.h"
+#include "mongo/db/update/path_support.h"
#include "mongo/db/write_concern.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -81,7 +82,7 @@
#include "mongo/util/log_and_backoff.h"
#include "mongo/util/scopeguard.h"
-namespace mongo {
+namespace mongo::write_ops_exec {
// Convention in this file: generic helpers go in the anonymous namespace. Helpers that are for a
// single type of operation are static functions defined above their caller.
@@ -672,7 +673,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
- exec->executePlan();
+ auto updateResult = exec->executeUpdate();
PlanSummaryStats summary;
exec->getSummaryStats(&summary);
@@ -684,19 +685,18 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
curOp.debug().execStats = exec->getStats();
}
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- UpdateStage::recordUpdateStatsInOpDebug(updateStats, &curOp.debug());
+ recordUpdateResultInOpDebug(updateResult, &curOp.debug());
curOp.debug().setPlanSummaryMetrics(summary);
- UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
- const bool didInsert = !res.upserted.isEmpty();
- const long long nMatchedOrInserted = didInsert ? 1 : res.numMatched;
- LastError::get(opCtx->getClient()).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);
+ const bool didInsert = !updateResult.upsertedId.isEmpty();
+ const long long nMatchedOrInserted = didInsert ? 1 : updateResult.numMatched;
+ LastError::get(opCtx->getClient())
+ .recordUpdate(updateResult.existing, nMatchedOrInserted, updateResult.upsertedId);
SingleWriteResult result;
result.setN(nMatchedOrInserted);
- result.setNModified(res.numDocsModified);
- result.setUpsertedId(res.upserted);
+ result.setNModified(updateResult.numDocsModified);
+ result.setUpsertedId(updateResult.upsertedId);
return result;
}
@@ -753,8 +753,8 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
uassertStatusOK(parsedUpdate.parseQueryToCQ());
}
- if (!UpdateStage::shouldRetryDuplicateKeyException(
- parsedUpdate, *ex.extraInfo<DuplicateKeyErrorInfo>())) {
+ if (!shouldRetryDuplicateKeyException(parsedUpdate,
+ *ex.extraInfo<DuplicateKeyErrorInfo>())) {
throw;
}
@@ -910,9 +910,8 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
- exec->executePlan();
- long long n = DeleteStage::getNumDeleted(*exec);
- curOp.debug().additiveMetrics.ndeleted = n;
+ auto nDeleted = exec->executeDelete();
+ curOp.debug().additiveMetrics.ndeleted = nDeleted;
PlanSummaryStats summary;
exec->getSummaryStats(&summary);
@@ -925,10 +924,10 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
curOp.debug().execStats = exec->getStats();
}
- LastError::get(opCtx->getClient()).recordDelete(n);
+ LastError::get(opCtx->getClient()).recordDelete(nDeleted);
SingleWriteResult result;
- result.setN(n);
+ result.setN(nDeleted);
return result;
}
@@ -1013,4 +1012,93 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
return out;
}
-} // namespace mongo
+void recordUpdateResultInOpDebug(const UpdateResult& updateResult, OpDebug* opDebug) {
+ invariant(opDebug);
+ opDebug->additiveMetrics.nMatched = updateResult.numMatched;
+ opDebug->additiveMetrics.nModified = updateResult.numDocsModified;
+ opDebug->upsert = !updateResult.upsertedId.isEmpty();
+}
+
+namespace {
+/**
+ * Returns whether a given MatchExpression is a MatchType::EQ or a MatchType::AND node with
+ * only MatchType::EQ children.
+ */
+bool matchContainsOnlyAndedEqualityNodes(const MatchExpression& root) {
+ if (root.matchType() == MatchExpression::EQ) {
+ return true;
+ }
+
+ if (root.matchType() == MatchExpression::AND) {
+ for (size_t i = 0; i < root.numChildren(); ++i) {
+ if (root.getChild(i)->matchType() != MatchExpression::EQ) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+} // namespace
+
+bool shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpdate,
+ const DuplicateKeyErrorInfo& errorInfo) {
+ invariant(parsedUpdate.hasParsedQuery());
+
+ const auto updateRequest = parsedUpdate.getRequest();
+
+ // In order to be retryable, the update must be an upsert with multi:false.
+ if (!updateRequest->isUpsert() || updateRequest->isMulti()) {
+ return false;
+ }
+
+ auto matchExpr = parsedUpdate.getParsedQuery()->root();
+ invariant(matchExpr);
+
+ // In order to be retryable, the update query must contain no expressions other than AND and EQ.
+ if (!matchContainsOnlyAndedEqualityNodes(*matchExpr)) {
+ return false;
+ }
+
+ // In order to be retryable, the update equality field paths must be identical to the unique
+ // index key field paths. Also, the values that triggered the DuplicateKey error must match the
+ // values used in the upsert query predicate.
+ pathsupport::EqualityMatches equalities;
+ auto status = pathsupport::extractEqualityMatches(*matchExpr, &equalities);
+ if (!status.isOK()) {
+ return false;
+ }
+
+ auto keyPattern = errorInfo.getKeyPattern();
+ if (equalities.size() != static_cast<size_t>(keyPattern.nFields())) {
+ return false;
+ }
+
+ auto keyValue = errorInfo.getDuplicatedKeyValue();
+
+ BSONObjIterator keyPatternIter(keyPattern);
+ BSONObjIterator keyValueIter(keyValue);
+ while (keyPatternIter.more() && keyValueIter.more()) {
+ auto keyPatternElem = keyPatternIter.next();
+ auto keyValueElem = keyValueIter.next();
+
+ auto keyName = keyPatternElem.fieldNameStringData();
+ if (!equalities.count(keyName)) {
+ return false;
+ }
+
+ // Comparison which obeys field ordering but ignores field name.
+ BSONElementComparator cmp{BSONElementComparator::FieldNamesMode::kIgnore, nullptr};
+ if (cmp.evaluate(equalities[keyName]->getData() != keyValueElem)) {
+ return false;
+ }
+ }
+ invariant(!keyPatternIter.more());
+ invariant(!keyValueIter.more());
+
+ return true;
+}
+
+} // namespace mongo::write_ops_exec
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index 246c4226880..c6f03535198 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -35,11 +35,17 @@
#include "mongo/base/status_with.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/single_write_result_gen.h"
+#include "mongo/db/ops/update_result.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
+class OpDebug;
+class ParsedUpdate;
+
+namespace write_ops_exec {
+
/**
* The result of performing a single write, possibly within a batch.
*/
@@ -70,4 +76,18 @@ WriteResult performInserts(OperationContext* opCtx,
WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op);
WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op);
+/**
+ * Populate 'opDebug' with stats describing the execution of an update operation. Illegal to call
+ * with a null OpDebug pointer.
+ */
+void recordUpdateResultInOpDebug(const UpdateResult& updateResult, OpDebug* opDebug);
+
+/**
+ * Returns true if an update failure due to a given DuplicateKey error is eligible for retry.
+ * Requires that parsedUpdate.hasParsedQuery() is true.
+ */
+bool shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpdate,
+ const DuplicateKeyErrorInfo& errorInfo);
+
+} // namespace write_ops_exec
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index d39dc6f78c5..252b417d92f 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -139,14 +139,14 @@ void parseOplogEntryForFindAndModify(OperationContext* opCtx,
1,
request.shouldReturnNew() ? oplogEntry.getObject() : boost::optional<BSONObj>(),
false,
- oplogEntry.getObject(),
+ oplogEntry.getObject()["_id"],
builder);
case repl::OpTypeEnum::kUpdate:
return find_and_modify::serializeUpsert(
1, extractPreOrPostImage(opCtx, oplogWithCorrectLinks), true, {}, builder);
case repl::OpTypeEnum::kDelete:
return find_and_modify::serializeRemove(
- 1, extractPreOrPostImage(opCtx, oplogWithCorrectLinks), builder);
+ extractPreOrPostImage(opCtx, oplogWithCorrectLinks), builder);
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f6cbf689508..88429000f86 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -103,12 +103,17 @@ namespace {
* Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
* if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
* percentage of the collection.
+ *
+ * If needed, adds DocumentSourceSampleFromRandomCursor to the front of the pipeline, replacing the
+ * $sample stage. This is needed if we select an optimized plan for $sample taking advantage of
+ * storage engine support for random cursors.
*/
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor(
const Collection* coll,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
long long sampleSize,
- long long numRecords) {
+ long long numRecords,
+ Pipeline* pipeline) {
OperationContext* opCtx = expCtx->opCtx;
// Verify that we are already under a collection lock. We avoid taking locks ourselves in this
@@ -140,6 +145,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
->getOwnershipFilter(
opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup);
+ TrialStage* trialStage = nullptr;
+
// Because 'numRecords' includes orphan documents, our initial decision to optimize the $sample
// cursor may have been mistaken. For sharded collections, build a TRIAL plan that will switch
// to a collection scan if the ratio of orphaned to owned documents encountered over the first
@@ -168,10 +175,24 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
std::move(collScanPlan),
kMaxPresampleSize,
minWorkAdvancedRatio);
+ trialStage = static_cast<TrialStage*>(root.get());
}
- return plan_executor_factory::make(
+ auto exec = plan_executor_factory::make(
expCtx, std::move(ws), std::move(root), coll, PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
+
+ // For sharded collections, the root of the plan tree is a TrialStage that may have chosen
+ // either a random-sampling cursor trial plan or a COLLSCAN backup plan. We can only optimize
+ // the $sample aggregation stage if the trial plan was chosen.
+ if (!trialStage || !trialStage->pickedBackupPlan()) {
+ // Replace $sample stage with $sampleFromRandomCursor stage.
+ pipeline->popFront();
+ std::string idString = coll->ns().isOplog() ? "ts" : "_id";
+ pipeline->addInitialSource(
+ DocumentSourceSampleFromRandomCursor::create(expCtx, sampleSize, idString, numRecords));
+ }
+
+ return exec;
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
@@ -325,28 +346,13 @@ PipelineD::buildInnerQueryExecutor(const Collection* collection,
const long long sampleSize = sampleStage->getSampleSize();
const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
auto exec = uassertStatusOK(
- createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords));
+ createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords, pipeline));
if (exec) {
- // For sharded collections, the root of the plan tree is a TrialStage that may have
- // chosen either a random-sampling cursor trial plan or a COLLSCAN backup plan. We
- // can only optimize the $sample aggregation stage if the trial plan was chosen.
- auto* trialStage = (exec->getRootStage()->stageType() == StageType::STAGE_TRIAL
- ? static_cast<TrialStage*>(exec->getRootStage())
- : nullptr);
- if (!trialStage || !trialStage->pickedBackupPlan()) {
- // Replace $sample stage with $sampleFromRandomCursor stage.
- pipeline->popFront();
- std::string idString = collection->ns().isOplog() ? "ts" : "_id";
- pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create(
- expCtx, sampleSize, idString, numRecords));
- }
-
// The order in which we evaluate these arguments is significant. We'd like to be
// sure that the DocumentSourceCursor is created _last_, because if we run into a
// case where a DocumentSourceCursor has been created (yet hasn't been put into a
// Pipeline) and an exception is thrown, an invariant will trigger in the
// DocumentSourceCursor. This is a design flaw in DocumentSourceCursor.
-
auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata);
const auto cursorType = deps.hasNoRequirements()
? DocumentSourceCursor::CursorType::kEmptyDocuments
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 7a9f0524446..0556aacac06 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -148,12 +148,6 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
}
}
-void PlanExecutorPipeline::executePlan() {
- while (_getNext()) {
- // Run the pipeline to completion, but discard the results.
- }
-}
-
std::string PlanExecutorPipeline::getPlanSummary() const {
return PipelineD::getPlanSummaryStr(_pipeline.get());
}
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
index f6ad2a97341..54a66979bfc 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.h
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -46,10 +46,6 @@ public:
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
bool isChangeStream);
- PlanStage* getRootStage() const override {
- MONGO_UNREACHABLE;
- }
-
CanonicalQuery* getCanonicalQuery() const override {
return nullptr;
}
@@ -81,7 +77,20 @@ public:
bool isEOF() override;
- void executePlan() override;
+ // DocumentSource execution is only used for executing aggregation commands, so the interfaces
+ // for executing other CRUD operations are not supported.
+ long long executeCount() override {
+ MONGO_UNREACHABLE;
+ }
+ UpdateResult executeUpdate() override {
+ MONGO_UNREACHABLE;
+ }
+ UpdateResult getUpdateResult() const override {
+ MONGO_UNREACHABLE;
+ }
+ long long executeDelete() override {
+ MONGO_UNREACHABLE;
+ }
void dispose(OperationContext* opCtx) override {
_pipeline->dispose(opCtx);
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index a8137949f57..3cae0550907 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -71,7 +71,7 @@ Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<Express
std::vector<BSONObj>&& objs,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
- auto writeResults = performInserts(
+ auto writeResults = write_ops_exec::performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
// Need to check each result in the batch since the writes are unordered.
@@ -91,8 +91,8 @@ StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::
UpsertType upsert,
bool multi,
boost::optional<OID> targetEpoch) {
- auto writeResults =
- performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+ auto writeResults = write_ops_exec::performUpdates(
+ expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
// Need to check each result in the batch since the writes are unordered.
UpdateResult updateResult;
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index d80405b8aaf..55a0af03740 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -184,6 +184,19 @@ unique_ptr<PlanStageStats> getWinningPlanStatsTree(const PlanExecutorImpl* exec)
: std::move(exec->getRootStage()->getStats());
}
+/**
+ * Executes the given plan executor, discarding the resulting documents, until it reaches EOF. If a
+ * runtime error occurs or execution is killed, throws a DBException.
+ *
+ * If 'exec' is configured for yielding, then a call to this helper could result in a yield.
+ */
+void executePlan(PlanExecutor* exec) {
+ BSONObj obj;
+ while (exec->getNext(&obj, nullptr) == PlanExecutor::ADVANCED) {
+ // Discard the resulting documents.
+ }
+}
+
} // namespace
namespace mongo {
@@ -811,7 +824,7 @@ void Explain::explainPipelineExecutor(PlanExecutorPipeline* exec,
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
// TODO SERVER-32732: An execution error should be reported in explain, but should not
// cause the explain itself to fail.
- exec->executePlan();
+ executePlan(exec);
}
*out << "stages" << Value(exec->writeExplainOps(verbosity));
@@ -835,7 +848,7 @@ void Explain::explainStages(PlanExecutor* exec,
// If we need execution stats, then run the plan in order to gather the stats.
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
try {
- exec->executePlan();
+ executePlan(exec);
} catch (const DBException&) {
executePlanStatus = exceptionToStatus();
}
diff --git a/src/mongo/db/query/explain.h b/src/mongo/db/query/explain.h
index 3c692082a03..666959883b2 100644
--- a/src/mongo/db/query/explain.h
+++ b/src/mongo/db/query/explain.h
@@ -145,8 +145,9 @@ public:
* If exec's root stage is a MultiPlanStage, returns the stats for the trial period of of the
* winning plan. Otherwise, returns nullptr.
*
- * Must be called _before_ calling PlanExecutor::executePlan() or PlanExecutor::getNext().
- **/
+ * Must be called _before_ executing the plan with PlanExecutor::getNext()
+ * or the PlanExecutor::execute*() methods.
+ */
static std::unique_ptr<PlanStageStats> getWinningPlanTrialStats(PlanExecutor* exec);
/**
@@ -220,12 +221,13 @@ private:
BSONObjBuilder* topLevelBob);
/**
- * Adds the "executionStats" field to out. Assumes PlanExecutor::executePlan() has been called
- * and that verbosity >= kExecStats.
+ * Adds the "executionStats" field to out. Assumes that the PlanExecutor has already been
+ * executed to the point of reaching EOF. Also assumes that verbosity >= kExecStats.
*
* If verbosity >= kExecAllPlans, it will include the "allPlansExecution" array.
*
- * - 'execPlanStatus' is the value returned after executing the query.
+ * - 'execPlanStatus' is OK if the query was executed successfully, or a non-OK status if there
+ * was a runtime error.
* - 'winningPlanTrialStats' may be nullptr.
**/
static void generateExecutionInfo(PlanExecutor* exec,
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index b271902e162..3fe99fcad2d 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -32,6 +32,7 @@
#include "mongo/base/status.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/update_result.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/plan_yield_policy.h"
@@ -148,11 +149,6 @@ public:
virtual ~PlanExecutor() = default;
/**
- * Get the stage tree wrapped by this executor, without transferring ownership.
- */
- virtual PlanStage* getRootStage() const = 0;
-
- /**
* Get the query that this executor is executing, without transferring ownership.
*/
virtual CanonicalQuery* getCanonicalQuery() const = 0;
@@ -246,14 +242,33 @@ public:
virtual bool isEOF() = 0;
/**
- * Execute the plan to completion, throwing out the results. Used when you want to work the
- * underlying tree without getting results back.
- *
- * If a YIELD_AUTO policy is set on this executor, then this will automatically yield.
- *
- * Throws an exception if this plan results in a runtime error or is killed.
+ * If this plan executor was constructed to execute a count implementation, e.g. it was obtained
+ * by calling 'getExecutorCount()', then executes the count operation and returns the result.
+ * Illegal to call on other plan executors.
+ */
+ virtual long long executeCount() = 0;
+
+ /**
+ * If this plan executor was constructed to execute an update, e.g. it was obtained by calling
+ * 'getExecutorUpdate()', then executes the update operation and returns an 'UpdateResult'
+ * describing the outcome. Illegal to call on other plan executors.
+ */
+ virtual UpdateResult executeUpdate() = 0;
+
+ /**
+ * If this plan executor has already executed an update operation, returns an 'UpdateResult'
+ * describing the outcome of the update. Illegal to call if either 1) the PlanExecutor is not
+ * an update PlanExecutor, or 2) the PlanExecutor has not yet been executed either with
+ * 'executeUpdate()' or by calling 'getNext()' until end-of-stream.
+ */
+ virtual UpdateResult getUpdateResult() const = 0;
+
+ /**
+ * If this plan executor was constructed to execute a delete, e.g. it was obtained by calling
+ * 'getExecutorDelete()', then executes the delete operation and returns the number of documents
+ * that were deleted. Illegal to call on other plan executors.
*/
- virtual void executePlan() = 0;
+ virtual long long executeDelete() = 0;
//
// Concurrency-related methods.
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index c8f5efe8eea..7be8aa4ef87 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -474,7 +474,7 @@ void PlanExecutorImpl::dispose(OperationContext* opCtx) {
_currentState = kDisposed;
}
-void PlanExecutorImpl::executePlan() {
+void PlanExecutorImpl::_executePlan() {
invariant(_currentState == kUsable);
Document obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
@@ -490,6 +490,82 @@ void PlanExecutorImpl::executePlan() {
invariant(PlanExecutor::IS_EOF == state);
}
+long long PlanExecutorImpl::executeCount() {
+ invariant(_root->stageType() == StageType::STAGE_COUNT ||
+ _root->stageType() == StageType::STAGE_RECORD_STORE_FAST_COUNT);
+
+ _executePlan();
+ auto countStats = static_cast<const CountStats*>(_root->getSpecificStats());
+ return countStats->nCounted;
+}
+
+UpdateResult PlanExecutorImpl::executeUpdate() {
+ _executePlan();
+ return getUpdateResult();
+}
+
+UpdateResult PlanExecutorImpl::getUpdateResult() const {
+ auto updateStatsToResult = [](const UpdateStats& updateStats) -> UpdateResult {
+ return UpdateResult(updateStats.nMatched > 0 /* Did we update at least one obj? */,
+ updateStats.isModUpdate /* Is this a $mod update? */,
+ updateStats.nModified /* number of modified docs, no no-ops */,
+ updateStats.nMatched /* # of docs matched/updated, even no-ops */,
+ updateStats.objInserted);
+ };
+
+ // If we're updating a non-existent collection, then the update plan may have an EOF as the root
+ // stage.
+ if (_root->stageType() == STAGE_EOF) {
+ const auto stats = std::make_unique<UpdateStats>();
+ return updateStatsToResult(static_cast<const UpdateStats&>(*stats));
+ }
+
+ // If the collection exists, then we expect the root of the plan tree to either be an update
+ // stage, or (for findAndModify) a projection stage wrapping an update stage.
+ switch (_root->stageType()) {
+ case StageType::STAGE_PROJECTION_DEFAULT:
+ case StageType::STAGE_PROJECTION_COVERED:
+ case StageType::STAGE_PROJECTION_SIMPLE: {
+ invariant(_root->getChildren().size() == 1U);
+ invariant(StageType::STAGE_UPDATE == _root->child()->stageType());
+ const SpecificStats* stats = _root->child()->getSpecificStats();
+ return updateStatsToResult(static_cast<const UpdateStats&>(*stats));
+ }
+ default:
+ invariant(StageType::STAGE_UPDATE == _root->stageType());
+ const auto stats = _root->getSpecificStats();
+ return updateStatsToResult(static_cast<const UpdateStats&>(*stats));
+ }
+}
+
+long long PlanExecutorImpl::executeDelete() {
+ _executePlan();
+
+ // If we're deleting from a non-existent collection, then the delete plan may have an EOF as the
+ // root stage.
+ if (_root->stageType() == STAGE_EOF) {
+ return 0LL;
+ }
+
+ // If the collection exists, the delete plan may either have a delete stage at the root, or (for
+ // findAndModify) a projection stage wrapping a delete stage.
+ switch (_root->stageType()) {
+ case StageType::STAGE_PROJECTION_DEFAULT:
+ case StageType::STAGE_PROJECTION_COVERED:
+ case StageType::STAGE_PROJECTION_SIMPLE: {
+ invariant(_root->getChildren().size() == 1U);
+ invariant(StageType::STAGE_DELETE == _root->child()->stageType());
+ const SpecificStats* stats = _root->child()->getSpecificStats();
+ return static_cast<const DeleteStats*>(stats)->docsDeleted;
+ }
+ default: {
+ invariant(StageType::STAGE_DELETE == _root->stageType());
+ const auto* deleteStats = static_cast<const DeleteStats*>(_root->getSpecificStats());
+ return deleteStats->docsDeleted;
+ }
+ }
+}
+
void PlanExecutorImpl::enqueue(const BSONObj& obj) {
_stash.push(Document{obj.getOwned()});
}
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
index f4efb60b8af..865a20fc515 100644
--- a/src/mongo/db/query/plan_executor_impl.h
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -64,7 +64,6 @@ public:
PlanYieldPolicy::YieldPolicy yieldPolicy);
virtual ~PlanExecutorImpl();
- PlanStage* getRootStage() const final;
CanonicalQuery* getCanonicalQuery() const final;
const NamespaceString& nss() const final;
OperationContext* getOpCtx() const final;
@@ -75,7 +74,10 @@ public:
ExecState getNextDocument(Document* objOut, RecordId* dlOut) final;
ExecState getNext(BSONObj* out, RecordId* dlOut) final;
bool isEOF() final;
- void executePlan() final;
+ long long executeCount() override;
+ UpdateResult executeUpdate() override;
+ UpdateResult getUpdateResult() const override;
+ long long executeDelete() override;
void markAsKilled(Status killStatus) final;
void dispose(OperationContext* opCtx) final;
void enqueue(const BSONObj& obj) final;
@@ -101,8 +103,19 @@ public:
*/
MultiPlanStage* getMultiPlanStage() const;
+ PlanStage* getRootStage() const;
+
private:
/**
+ * Executes the underlying PlanStage tree until it indicates EOF. Throws an exception if the
+ * plan results in an error.
+ *
+ * Useful for cases where the caller wishes to execute the plan and extract stats from it (e.g.
+ * the result of a count or update) rather than returning a set of resulting documents.
+ */
+ void _executePlan();
+
+ /**
* Called on construction in order to ensure that when callers receive a new instance of a
* 'PlanExecutorImpl', plan selection has already been completed.
*
diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h
index de16b910fcb..680f76c578e 100644
--- a/src/mongo/db/query/plan_executor_sbe.h
+++ b/src/mongo/db/query/plan_executor_sbe.h
@@ -49,10 +49,6 @@ public:
boost::optional<std::queue<std::pair<BSONObj, boost::optional<RecordId>>>> stash,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy);
- PlanStage* getRootStage() const override {
- return nullptr;
- }
-
CanonicalQuery* getCanonicalQuery() const override {
return _cq.get();
}
@@ -78,7 +74,22 @@ public:
return _state == State::kClosed;
}
- void executePlan() override {
+ long long executeCount() override {
+ // Using SBE to execute a count command is not yet supported.
+ MONGO_UNREACHABLE;
+ }
+
+ UpdateResult executeUpdate() override {
+ // Using SBE to execute an update command is not yet supported.
+ MONGO_UNREACHABLE;
+ }
+ UpdateResult getUpdateResult() const override {
+ // Using SBE to execute an update command is not yet supported.
+ MONGO_UNREACHABLE;
+ }
+
+ long long executeDelete() override {
+ // Using SBE to execute a delete command is not yet supported.
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 58133d54a59..05fa6ecb4b8 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1230,7 +1230,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
UpdateResult res = update(opCtx, db, request);
- if (res.numMatched == 0 && res.upserted.isEmpty()) {
+ if (res.numMatched == 0 && res.upsertedId.isEmpty()) {
LOGV2_ERROR(21257,
"No document was updated even though we got a DuplicateKey "
"error when inserting");
@@ -1326,7 +1326,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
UpdateResult ur = update(opCtx, db, request);
- if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
+ if (ur.numMatched == 0 && ur.upsertedId.isEmpty()) {
if (collection && collection->isCapped() &&
mode == OplogApplication::Mode::kSecondary) {
// We can't assume there was a problem when the collection is capped,
@@ -1377,7 +1377,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
}
} else if (mode == OplogApplication::Mode::kSecondary && !upsertOplogEntry &&
- !ur.upserted.isEmpty() && !(collection && collection->isCapped())) {
+ !ur.upsertedId.isEmpty() && !(collection && collection->isCapped())) {
// This indicates we upconverted an update to an upsert, and it did indeed
// upsert. In steady state mode this is unexpected.
LOGV2_WARNING(2170001,
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 9a4abeee1bc..2936fff1198 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -937,7 +937,8 @@ Status _updateWithQuery(OperationContext* opCtx,
auto planExecutor = std::move(planExecutorResult.getValue());
try {
- planExecutor->executePlan();
+ // The update result is ignored.
+ [[maybe_unused]] auto updateResult = planExecutor->executeUpdate();
} catch (const WriteConflictException&) {
// Re-throw the WCE, since it will get caught and retried at a higher level.
throw;
@@ -1010,7 +1011,8 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx,
parsedUpdate.yieldPolicy());
try {
- planExecutor->executePlan();
+ // The update result is ignored.
+ [[maybe_unused]] auto updateResult = planExecutor->executeUpdate();
} catch (const WriteConflictException&) {
// Re-throw the WCE, since it will get caught and retried at a higher level.
throw;
@@ -1088,7 +1090,8 @@ Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx,
auto planExecutor = std::move(planExecutorResult.getValue());
try {
- planExecutor->executePlan();
+ // The count of deleted documents is ignored.
+ [[maybe_unused]] auto nDeleted = planExecutor->executeDelete();
} catch (const WriteConflictException&) {
// Re-throw the WCE, since it will get caught and retried at a higher level.
throw;
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 1eef517ab29..bbda14ffb22 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1013,7 +1013,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return toInsert;
}());
- const WriteResult reply = performInserts(opCtx, insertOp, true);
+ const auto reply = write_ops_exec::performInserts(opCtx, insertOp, true);
for (unsigned long i = 0; i < reply.results.size(); ++i) {
uassertStatusOKWithContext(
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index deffdfe9731..353e06e58c9 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -248,7 +248,7 @@ public:
txnParticipant.beginOrContinue(
innerOpCtx.get(), *sessionInfo.getTxnNumber(), boost::none, boost::none);
- const auto reply = performInserts(innerOpCtx.get(), insertRequest);
+ const auto reply = write_ops_exec::performInserts(innerOpCtx.get(), insertRequest);
ASSERT(reply.results.size() == 1);
ASSERT(reply.results[0].isOK());
});
diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp
index b2628aa3334..682b4b14c8b 100644
--- a/src/mongo/db/s/sharding_state_recovery.cpp
+++ b/src/mongo/db/s/sharding_state_recovery.cpp
@@ -165,7 +165,7 @@ Status modifyRecoveryDocument(OperationContext* opCtx,
updateReq.setUpsert();
UpdateResult result = update(opCtx, autoGetOrCreateDb->getDb(), updateReq);
- invariant(result.numDocsModified == 1 || !result.upserted.isEmpty());
+ invariant(result.numDocsModified == 1 || !result.upsertedId.isEmpty());
invariant(result.numMatched <= 1);
// Wait until the majority write concern has been satisfied, but do it outside of lock
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index c075e9cd315..7493feef5b0 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -1544,7 +1544,7 @@ void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, co
audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code());
uassertStatusOK(status);
}
- performInserts(opCtx, insertOp);
+ write_ops_exec::performInserts(opCtx, insertOp);
}
void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
@@ -1567,7 +1567,7 @@ void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, co
status.code());
uassertStatusOK(status);
- performUpdates(opCtx, updateOp);
+ write_ops_exec::performUpdates(opCtx, updateOp);
}
void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
@@ -1580,7 +1580,7 @@ void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, co
audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.getQ(), status.code());
uassertStatusOK(status);
- performDeletes(opCtx, deleteOp);
+ write_ops_exec::performDeletes(opCtx, deleteOp);
}
DbResponse receivedGetMore(OperationContext* opCtx,
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index 126b9dcd0d3..f99a991da20 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -385,7 +385,9 @@ private:
direction);
try {
- exec->executePlan();
+ const auto numDeleted = exec->executeDelete();
+ ttlDeletedDocuments.increment(numDeleted);
+ LOGV2_DEBUG(22536, 1, "deleted: {numDeleted}", "numDeleted"_attr = numDeleted);
} catch (const ExceptionFor<ErrorCodes::QueryPlanKilled>&) {
// It is expected that a collection drop can kill a query plan while the TTL monitor is
// deleting an old document, so ignore this error.
@@ -398,10 +400,6 @@ private:
"error"_attr = redact(exception.toStatus()));
return;
}
-
- const long long numDeleted = DeleteStage::getNumDeleted(*exec);
- ttlDeletedDocuments.increment(numDeleted);
- LOGV2_DEBUG(22536, 1, "deleted: {numDeleted}", "numDeleted"_attr = numDeleted);
}
// Protects the state below.
diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp
index 0f946280824..4729de695ea 100644
--- a/src/mongo/dbtests/query_stage_multiplan.cpp
+++ b/src/mongo/dbtests/query_stage_multiplan.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_executor_factory.h"
+#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/query/query_planner.h"
@@ -502,7 +503,9 @@ TEST_F(QueryStageMultiPlanTest, MPSExplainAllPlans) {
ctx.getCollection(),
PlanYieldPolicy::YieldPolicy::NO_YIELD));
- auto root = static_cast<MultiPlanStage*>(exec->getRootStage());
+ auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+ ASSERT(execImpl);
+ auto root = static_cast<MultiPlanStage*>(execImpl->getRootStage());
ASSERT_TRUE(root->bestPlanChosen());
// The first candidate plan should have won.
ASSERT_EQ(root->bestPlanIdx(), 0);
@@ -551,9 +554,18 @@ TEST_F(QueryStageMultiPlanTest, MPSSummaryStats) {
auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr)));
auto exec = uassertStatusOK(
getExecutor(opCtx(), coll, std::move(cq), PlanYieldPolicy::YieldPolicy::NO_YIELD, 0));
- ASSERT_EQ(exec->getRootStage()->stageType(), STAGE_MULTI_PLAN);
- exec->executePlan();
+ auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+ ASSERT(execImpl);
+ ASSERT_EQ(execImpl->getRootStage()->stageType(), StageType::STAGE_MULTI_PLAN);
+
+ // Execute the plan executor until EOF, discarding the results.
+ {
+ BSONObj obj;
+ while (exec->getNext(&obj, nullptr) == PlanExecutor::ADVANCED) {
+ // Do nothing with the documents produced by the executor.
+ }
+ }
PlanSummaryStats stats;
exec->getSummaryStats(&stats);
diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp
index 4e14c38fec9..bf3353d698b 100644
--- a/src/mongo/dbtests/query_stage_sort.cpp
+++ b/src/mongo/dbtests/query_stage_sort.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/exec/sort.h"
#include "mongo/db/json.h"
#include "mongo/db/query/plan_executor_factory.h"
+#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/dbtests/dbtests.h"
/**
@@ -354,7 +355,13 @@ public:
getRecordIds(&recordIds, coll);
auto exec = makePlanExecutorWithSortStage(coll);
- SortStage* ss = static_cast<SortStageDefault*>(exec->getRootStage());
+
+ // This test is specifically for the classic PlanStage execution engine, so assert that we
+ // have the right kind of PlanExecutor.
+ auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+ ASSERT(execImpl);
+
+ SortStage* ss = static_cast<SortStageDefault*>(execImpl->getRootStage());
SortKeyGeneratorStage* keyGenStage =
static_cast<SortKeyGeneratorStage*>(ss->getChildren()[0].get());
QueuedDataStage* queuedDataStage =
@@ -463,7 +470,13 @@ public:
getRecordIds(&recordIds, coll);
auto exec = makePlanExecutorWithSortStage(coll);
- SortStage* ss = static_cast<SortStageDefault*>(exec->getRootStage());
+
+ // This test is specifically for the classic PlanStage execution engine, so assert that we
+ // have the right kind of PlanExecutor.
+ auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+ ASSERT(execImpl);
+
+ SortStage* ss = static_cast<SortStageDefault*>(execImpl->getRootStage());
SortKeyGeneratorStage* keyGenStage =
static_cast<SortKeyGeneratorStage*>(ss->getChildren()[0].get());
QueuedDataStage* queuedDataStage =