summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2018-11-07 11:04:09 -0500
committerDavid Storch <david.storch@10gen.com>2018-11-12 10:29:15 -0500
commit7369fd49c9d0c348406e08a3308d6e12cdcb057a (patch)
tree4941508e4f61ea73411699161ab25f8bcb129582 /src/mongo/db/exec
parentbc8bfc6b8ad5ebf05090ae49f8fa8bf35d028d28 (diff)
downloadmongo-7369fd49c9d0c348406e08a3308d6e12cdcb057a.tar.gz
SERVER-37446 Make UPDATE and DELETE inherit from RequiresMutableCollectionStage.
Also deletes UpdateLifecyle, which was used as part of the UpdateStage's yield recovery, but is no longer necessary.
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/delete.cpp55
-rw-r--r--src/mongo/db/exec/delete.h16
-rw-r--r--src/mongo/db/exec/requires_collection_stage.cpp9
-rw-r--r--src/mongo/db/exec/requires_collection_stage.h25
-rw-r--r--src/mongo/db/exec/update.cpp95
-rw-r--r--src/mongo/db/exec/update.h16
6 files changed, 121 insertions, 95 deletions
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp
index 3b8a6f803c6..a279cdd27f7 100644
--- a/src/mongo/db/exec/delete.cpp
+++ b/src/mongo/db/exec/delete.cpp
@@ -78,19 +78,15 @@ DeleteStage::DeleteStage(OperationContext* opCtx,
WorkingSet* ws,
Collection* collection,
PlanStage* child)
- : PlanStage(kStageType, opCtx),
+ : RequiresMutableCollectionStage(kStageType, opCtx, collection),
_params(params),
_ws(ws),
- _collection(collection),
_idRetrying(WorkingSet::INVALID_ID),
_idReturning(WorkingSet::INVALID_ID) {
_children.emplace_back(child);
}
bool DeleteStage::isEOF() {
- if (!_collection) {
- return true;
- }
if (!_params.isMulti && _specificStats.docsDeleted > 0) {
return true;
}
@@ -102,7 +98,6 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) {
if (isEOF()) {
return PlanStage::IS_EOF;
}
- invariant(_collection); // If isEOF() returns false, we must have a collection.
// It is possible that after a delete was executed, a WriteConflictException occurred
// and prevented us from returning ADVANCED with the old version of the document.
@@ -169,7 +164,7 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) {
bool docStillMatches;
try {
docStillMatches = write_stage_common::ensureStillMatches(
- _collection, getOpCtx(), _ws, id, _params.canonicalQuery);
+ collection(), getOpCtx(), _ws, id, _params.canonicalQuery);
} catch (const WriteConflictException&) {
// There was a problem trying to detect if the document still exists, so retry.
memberFreer.Dismiss();
@@ -208,14 +203,14 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) {
if (!_params.isExplain) {
try {
WriteUnitOfWork wunit(getOpCtx());
- _collection->deleteDocument(getOpCtx(),
- _params.stmtId,
- recordId,
- _params.opDebug,
- _params.fromMigrate,
- false,
- _params.returnDeleted ? Collection::StoreDeletedDoc::On
- : Collection::StoreDeletedDoc::Off);
+ collection()->deleteDocument(getOpCtx(),
+ _params.stmtId,
+ recordId,
+ _params.opDebug,
+ _params.fromMigrate,
+ false,
+ _params.returnDeleted ? Collection::StoreDeletedDoc::On
+ : Collection::StoreDeletedDoc::Off);
wunit.commit();
} catch (const WriteConflictException&) {
memberFreer.Dismiss(); // Keep this member around so we can retry deleting it.
@@ -263,9 +258,8 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
}
-void DeleteStage::doRestoreState() {
- invariant(_collection);
- const NamespaceString& ns(_collection->ns());
+void DeleteStage::restoreState(RequiresCollTag) {
+ const NamespaceString& ns = collection()->ns();
uassert(ErrorCodes::PrimarySteppedDown,
str::stream() << "Demoted from primary while removing from " << ns.ns(),
!getOpCtx()->writesAreReplicated() ||
@@ -287,11 +281,26 @@ const SpecificStats* DeleteStage::getSpecificStats() const {
// static
long long DeleteStage::getNumDeleted(const PlanExecutor& exec) {
invariant(exec.getRootStage()->isEOF());
- invariant(exec.getRootStage()->stageType() == STAGE_DELETE);
- DeleteStage* deleteStage = static_cast<DeleteStage*>(exec.getRootStage());
- const DeleteStats* deleteStats =
- static_cast<const DeleteStats*>(deleteStage->getSpecificStats());
- return deleteStats->docsDeleted;
+
+ // If we're deleting from a non-existent collection, then the delete plan may have an EOF as the
+ // root stage.
+ if (exec.getRootStage()->stageType() == STAGE_EOF) {
+ return 0LL;
+ }
+
+ // If the collection exists, the delete plan may either have a delete stage at the root, or (for
+ // findAndModify) a projection stage wrapping a delete stage.
+ if (StageType::STAGE_PROJECTION == exec.getRootStage()->stageType()) {
+ invariant(exec.getRootStage()->getChildren().size() == 1U);
+ invariant(StageType::STAGE_DELETE == exec.getRootStage()->child()->stageType());
+ const SpecificStats* stats = exec.getRootStage()->child()->getSpecificStats();
+ return static_cast<const DeleteStats*>(stats)->docsDeleted;
+ } else {
+ invariant(StageType::STAGE_DELETE == exec.getRootStage()->stageType());
+ const auto* deleteStats =
+ static_cast<const DeleteStats*>(exec.getRootStage()->getSpecificStats());
+ return deleteStats->docsDeleted;
+ }
}
PlanStage::StageState DeleteStage::prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out) {
diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h
index cfd65c8acec..c3fd5ee5cba 100644
--- a/src/mongo/db/exec/delete.h
+++ b/src/mongo/db/exec/delete.h
@@ -30,7 +30,7 @@
#pragma once
-#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/requires_collection_stage.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/logical_session_id.h"
@@ -85,7 +85,7 @@ struct DeleteStageParams {
* Callers of work() must be holding a write lock (and, for replicated deletes, callers must have
* had the replication coordinator approve the write).
*/
-class DeleteStage final : public PlanStage {
+class DeleteStage final : public RequiresMutableCollectionStage {
MONGO_DISALLOW_COPYING(DeleteStage);
public:
@@ -98,8 +98,6 @@ public:
bool isEOF() final;
StageState doWork(WorkingSetID* out) final;
- void doRestoreState() final;
-
StageType stageType() const final {
return STAGE_DELETE;
}
@@ -117,6 +115,11 @@ public:
*/
static long long getNumDeleted(const PlanExecutor& exec);
+protected:
+ void saveState(RequiresCollTag) final {}
+
+ void restoreState(RequiresCollTag) final;
+
private:
/**
* Stores 'idToRetry' in '_idRetrying' so the delete can be retried during the next call to
@@ -129,11 +132,6 @@ private:
// Not owned by us.
WorkingSet* _ws;
- // Collection to operate on. Not owned by us. Can be NULL (if NULL, isEOF() will always
- // return true). If non-NULL, the lifetime of the collection must supersede that of the
- // stage.
- Collection* _collection;
-
// If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next.
WorkingSetID _idRetrying;
diff --git a/src/mongo/db/exec/requires_collection_stage.cpp b/src/mongo/db/exec/requires_collection_stage.cpp
index 106767eed73..9424f542839 100644
--- a/src/mongo/db/exec/requires_collection_stage.cpp
+++ b/src/mongo/db/exec/requires_collection_stage.cpp
@@ -35,14 +35,16 @@
namespace mongo {
-void RequiresCollectionStage::doSaveState() {
+template <typename CollectionT>
+void RequiresCollectionStageBase<CollectionT>::doSaveState() {
// A stage may not access storage while in a saved state.
_collection = nullptr;
saveState(RequiresCollTag{});
}
-void RequiresCollectionStage::doRestoreState() {
+template <typename CollectionT>
+void RequiresCollectionStageBase<CollectionT>::doRestoreState() {
invariant(!_collection);
const UUIDCatalog& catalog = UUIDCatalog::get(getOpCtx());
@@ -54,4 +56,7 @@ void RequiresCollectionStage::doRestoreState() {
restoreState(RequiresCollTag{});
}
+template class RequiresCollectionStageBase<const Collection*>;
+template class RequiresCollectionStageBase<Collection*>;
+
} // namespace mongo
diff --git a/src/mongo/db/exec/requires_collection_stage.h b/src/mongo/db/exec/requires_collection_stage.h
index f6192331374..273a7185ae4 100644
--- a/src/mongo/db/exec/requires_collection_stage.h
+++ b/src/mongo/db/exec/requires_collection_stage.h
@@ -43,17 +43,24 @@ namespace mongo {
*
* Subclasses must implement the saveStage() and restoreState() variants tagged with RequiresCollTag
* in order to supply custom yield preparation or yield recovery logic.
+ *
+ * Templated on 'CollectionT', which may be instantiated using either Collection* or const
+ * Collection*. This abstracts the implementation of this base class for use by derived classes
+ * which read (e.g. COLLSCAN and MULTI_ITERATOR) and derived classes that write (e.g. UPDATE and
+ * DELETE). Derived classes should use the 'RequiresCollectionStage' or
+ * 'RequiresMutableCollectionStage' aliases provided below.
*/
-class RequiresCollectionStage : public PlanStage {
+template <typename CollectionT>
+class RequiresCollectionStageBase : public PlanStage {
public:
- RequiresCollectionStage(const char* stageType, OperationContext* opCtx, const Collection* coll)
+ RequiresCollectionStageBase(const char* stageType, OperationContext* opCtx, CollectionT coll)
: PlanStage(stageType, opCtx),
_collection(coll),
_collectionUUID(_collection->uuid().get()) {
invariant(_collection);
}
- virtual ~RequiresCollectionStage() = default;
+ virtual ~RequiresCollectionStageBase() = default;
protected:
struct RequiresCollTag {};
@@ -72,17 +79,23 @@ protected:
*/
virtual void restoreState(RequiresCollTag) = 0;
- const Collection* collection() {
+ CollectionT collection() const {
return _collection;
}
- UUID uuid() {
+ UUID uuid() const {
return _collectionUUID;
}
private:
- const Collection* _collection;
+ CollectionT _collection;
const UUID _collectionUUID;
};
+// Type alias for use by PlanStages that read a Collection.
+using RequiresCollectionStage = RequiresCollectionStageBase<const Collection*>;
+
+// Type alias for use by PlanStages that write to a Collection.
+using RequiresMutableCollectionStage = RequiresCollectionStageBase<Collection*>;
+
} // namespace mongo
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp
index ab14a670f1f..d1f0a3aab77 100644
--- a/src/mongo/db/exec/update.cpp
+++ b/src/mongo/db/exec/update.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/ops/update_lifecycle.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -161,15 +160,16 @@ CollectionUpdateArgs::StoreDocOption getStoreDocMode(const UpdateRequest& update
const char* UpdateStage::kStageType = "UPDATE";
+const UpdateStats UpdateStage::kEmptyUpdateStats;
+
UpdateStage::UpdateStage(OperationContext* opCtx,
const UpdateStageParams& params,
WorkingSet* ws,
Collection* collection,
PlanStage* child)
- : PlanStage(kStageType, opCtx),
+ : RequiresMutableCollectionStage(kStageType, opCtx, collection),
_params(params),
_ws(ws),
- _collection(collection),
_idRetrying(WorkingSet::INVALID_ID),
_idReturning(WorkingSet::INVALID_ID),
_updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : NULL),
@@ -193,7 +193,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
const UpdateRequest* request = _params.request;
UpdateDriver* driver = _params.driver;
CanonicalQuery* cq = _params.canonicalQuery;
- UpdateLifecycle* lifecycle = request->getLifecycle();
// If asked to return new doc, default to the oldObj, in case nothing changes.
BSONObj newObj = oldObj.value();
@@ -205,7 +204,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
// only enable in-place mutations if the underlying storage engine offers support for
// writing damage events.
_doc.reset(oldObj.value(),
- (_collection->updateWithDamagesSupported()
+ (collection()->updateWithDamagesSupported()
? mutablebson::Document::kInPlaceEnabled
: mutablebson::Document::kInPlaceDisabled));
@@ -217,13 +216,10 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
const bool validateForStorage = getOpCtx()->writesAreReplicated() && _enforceOkForStorage;
FieldRefSet immutablePaths;
if (getOpCtx()->writesAreReplicated() && !request->isFromMigration()) {
- if (lifecycle) {
- auto immutablePathsVector =
- getImmutableFields(getOpCtx(), request->getNamespaceString());
- if (immutablePathsVector) {
- immutablePaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(*immutablePathsVector));
- }
+ auto immutablePathsVector = getImmutableFields(getOpCtx(), request->getNamespaceString());
+ if (immutablePathsVector) {
+ immutablePaths.fillFrom(
+ transitional_tools_do_not_use::unspool_vector(*immutablePathsVector));
}
immutablePaths.keepShortest(&idFieldRef);
}
@@ -253,7 +249,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
// Skip adding _id field if the collection is capped (since capped collection documents can
// neither grow nor shrink).
- const auto createIdField = !_collection->isCapped();
+ const auto createIdField = !collection()->isCapped();
// Ensure if _id exists it is first
status = ensureIdFieldIsFirst(&_doc);
@@ -288,8 +284,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
RecordId newRecordId;
CollectionUpdateArgs args;
if (!request->isExplain()) {
- invariant(_collection);
- auto* css = CollectionShardingState::get(getOpCtx(), _collection->ns());
+ auto* css = CollectionShardingState::get(getOpCtx(), collection()->ns());
args.stmtId = request->getStmtId();
args.update = logObj;
auto metadata = css->getMetadata(getOpCtx());
@@ -311,7 +306,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
Snapshotted<RecordData> snap(oldObj.snapshotId(), oldRec);
- StatusWith<RecordData> newRecStatus = _collection->updateDocumentWithDamages(
+ StatusWith<RecordData> newRecStatus = collection()->updateDocumentWithDamages(
getOpCtx(), recordId, std::move(snap), source, _damages, &args);
newObj = uassertStatusOK(std::move(newRecStatus)).releaseToBson();
@@ -328,13 +323,13 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
newObj.objsize() <= BSONObjMaxUserSize);
if (!request->isExplain()) {
- newRecordId = _collection->updateDocument(getOpCtx(),
- recordId,
- oldObj,
- newObj,
- driver->modsAffectIndices(),
- _params.opDebug,
- &args);
+ newRecordId = collection()->updateDocument(getOpCtx(),
+ recordId,
+ oldObj,
+ newObj,
+ driver->modsAffectIndices(),
+ _params.opDebug,
+ &args);
}
}
@@ -463,13 +458,12 @@ void UpdateStage::doInsert() {
return;
}
- writeConflictRetry(getOpCtx(), "upsert", _collection->ns().ns(), [&] {
+ writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] {
WriteUnitOfWork wunit(getOpCtx());
- invariant(_collection);
- uassertStatusOK(_collection->insertDocument(getOpCtx(),
- InsertStatement(request->getStmtId(), newObj),
- _params.opDebug,
- request->isFromMigration()));
+ uassertStatusOK(collection()->insertDocument(getOpCtx(),
+ InsertStatement(request->getStmtId(), newObj),
+ _params.opDebug,
+ request->isFromMigration()));
// Technically, we should save/restore state here, but since we are going to return
// immediately after, it would just be wasted work.
@@ -531,10 +525,6 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
return PlanStage::IS_EOF;
}
- // If we're here, then we still have to ask for results from the child and apply
- // updates to them. We should only get here if the collection exists.
- invariant(_collection);
-
// It is possible that after an update was applied, a WriteConflictException
// occurred and prevented us from returning ADVANCED with the requested version
// of the document.
@@ -589,7 +579,7 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
bool docStillMatches;
try {
docStillMatches = write_stage_common::ensureStillMatches(
- _collection, getOpCtx(), _ws, id, _params.canonicalQuery);
+ collection(), getOpCtx(), _ws, id, _params.canonicalQuery);
} catch (const WriteConflictException&) {
// There was a problem trying to detect if the document still exists, so retry.
memberFreer.Dismiss();
@@ -702,7 +692,7 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
return status;
}
-void UpdateStage::doRestoreState() {
+void UpdateStage::restoreState(RequiresCollTag) {
const UpdateRequest& request = *_params.request;
const NamespaceString& nsString(request.getNamespaceString());
@@ -716,16 +706,10 @@ void UpdateStage::doRestoreState() {
<< nsString.ns());
}
- if (request.getLifecycle()) {
- UpdateLifecycle* lifecycle = request.getLifecycle();
- lifecycle->setCollection(_collection);
-
- if (!lifecycle->canContinue()) {
- uasserted(17270, "Update aborted due to invalid state transitions after yield.");
- }
-
- _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(getOpCtx()));
- }
+ // The set of indices may have changed during yield. Make sure that the update driver has up to
+ // date index information.
+ const auto& updateIndexData = collection()->infoCache()->getIndexKeys(getOpCtx());
+ _params.driver->refreshIndexKeys(&updateIndexData);
}
unique_ptr<PlanStageStats> UpdateStage::getStats() {
@@ -742,9 +726,24 @@ const SpecificStats* UpdateStage::getSpecificStats() const {
const UpdateStats* UpdateStage::getUpdateStats(const PlanExecutor* exec) {
invariant(exec->getRootStage()->isEOF());
- invariant(exec->getRootStage()->stageType() == STAGE_UPDATE);
- UpdateStage* updateStage = static_cast<UpdateStage*>(exec->getRootStage());
- return static_cast<const UpdateStats*>(updateStage->getSpecificStats());
+
+ // If we're updating a non-existent collection, then the delete plan may have an EOF as the root
+ // stage.
+ if (exec->getRootStage()->stageType() == STAGE_EOF) {
+ return &kEmptyUpdateStats;
+ }
+
+ // If the collection exists, then we expect the root of the plan tree to either be an update
+ // stage, or (for findAndModify) a projection stage wrapping an update stage.
+ if (StageType::STAGE_PROJECTION == exec->getRootStage()->stageType()) {
+ invariant(exec->getRootStage()->getChildren().size() == 1U);
+ invariant(StageType::STAGE_UPDATE == exec->getRootStage()->child()->stageType());
+ const SpecificStats* stats = exec->getRootStage()->child()->getSpecificStats();
+ return static_cast<const UpdateStats*>(stats);
+ } else {
+ invariant(StageType::STAGE_UPDATE == exec->getRootStage()->stageType());
+ return static_cast<const UpdateStats*>(exec->getRootStage()->getSpecificStats());
+ }
}
void UpdateStage::recordUpdateStatsInOpDebug(const UpdateStats* updateStats, OpDebug* opDebug) {
diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h
index 6a988609162..21ee75e2612 100644
--- a/src/mongo/db/exec/update.h
+++ b/src/mongo/db/exec/update.h
@@ -32,7 +32,7 @@
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/exec/requires_collection_stage.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/ops/update_result.h"
@@ -75,7 +75,7 @@ private:
*
* Callers of work() must be holding a write lock.
*/
-class UpdateStage final : public PlanStage {
+class UpdateStage final : public RequiresMutableCollectionStage {
MONGO_DISALLOW_COPYING(UpdateStage);
public:
@@ -88,8 +88,6 @@ public:
bool isEOF() final;
StageState doWork(WorkingSetID* out) final;
- void doRestoreState() final;
-
StageType stageType() const final {
return STAGE_UPDATE;
}
@@ -146,7 +144,14 @@ public:
bool enforceOkForStorage,
UpdateStats* stats);
+protected:
+ void saveState(RequiresCollTag) final {}
+
+ void restoreState(RequiresCollTag) final;
+
private:
+ static const UpdateStats kEmptyUpdateStats;
+
/**
* Computes the result of applying mods to the document 'oldObj' at RecordId 'recordId' in
* memory, then commits these changes to the database. Returns a possibly unowned copy
@@ -183,9 +188,6 @@ private:
// Not owned by us.
WorkingSet* _ws;
- // Not owned by us. May be NULL.
- Collection* _collection;
-
// If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next.
WorkingSetID _idRetrying;