diff options
author | David Storch <david.storch@10gen.com> | 2018-11-07 11:04:09 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2018-11-12 10:29:15 -0500 |
commit | 7369fd49c9d0c348406e08a3308d6e12cdcb057a (patch) | |
tree | 4941508e4f61ea73411699161ab25f8bcb129582 /src/mongo/db/exec | |
parent | bc8bfc6b8ad5ebf05090ae49f8fa8bf35d028d28 (diff) | |
download | mongo-7369fd49c9d0c348406e08a3308d6e12cdcb057a.tar.gz |
SERVER-37446 Make UPDATE and DELETE inherit from RequiresMutableCollectionStage.
Also deletes UpdateLifecyle, which was used as part of the
UpdateStage's yield recovery, but is no longer necessary.
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/delete.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/exec/delete.h | 16 | ||||
-rw-r--r-- | src/mongo/db/exec/requires_collection_stage.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/exec/requires_collection_stage.h | 25 | ||||
-rw-r--r-- | src/mongo/db/exec/update.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/exec/update.h | 16 |
6 files changed, 121 insertions, 95 deletions
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index 3b8a6f803c6..a279cdd27f7 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -78,19 +78,15 @@ DeleteStage::DeleteStage(OperationContext* opCtx, WorkingSet* ws, Collection* collection, PlanStage* child) - : PlanStage(kStageType, opCtx), + : RequiresMutableCollectionStage(kStageType, opCtx, collection), _params(params), _ws(ws), - _collection(collection), _idRetrying(WorkingSet::INVALID_ID), _idReturning(WorkingSet::INVALID_ID) { _children.emplace_back(child); } bool DeleteStage::isEOF() { - if (!_collection) { - return true; - } if (!_params.isMulti && _specificStats.docsDeleted > 0) { return true; } @@ -102,7 +98,6 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { if (isEOF()) { return PlanStage::IS_EOF; } - invariant(_collection); // If isEOF() returns false, we must have a collection. // It is possible that after a delete was executed, a WriteConflictException occurred // and prevented us from returning ADVANCED with the old version of the document. @@ -169,7 +164,7 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { bool docStillMatches; try { docStillMatches = write_stage_common::ensureStillMatches( - _collection, getOpCtx(), _ws, id, _params.canonicalQuery); + collection(), getOpCtx(), _ws, id, _params.canonicalQuery); } catch (const WriteConflictException&) { // There was a problem trying to detect if the document still exists, so retry. memberFreer.Dismiss(); @@ -208,14 +203,14 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { if (!_params.isExplain) { try { WriteUnitOfWork wunit(getOpCtx()); - _collection->deleteDocument(getOpCtx(), - _params.stmtId, - recordId, - _params.opDebug, - _params.fromMigrate, - false, - _params.returnDeleted ? Collection::StoreDeletedDoc::On - : Collection::StoreDeletedDoc::Off); + collection()->deleteDocument(getOpCtx(), + _params.stmtId, + recordId, + _params.opDebug, + _params.fromMigrate, + false, + _params.returnDeleted ? Collection::StoreDeletedDoc::On + : Collection::StoreDeletedDoc::Off); wunit.commit(); } catch (const WriteConflictException&) { memberFreer.Dismiss(); // Keep this member around so we can retry deleting it. @@ -263,9 +258,8 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } -void DeleteStage::doRestoreState() { - invariant(_collection); - const NamespaceString& ns(_collection->ns()); +void DeleteStage::restoreState(RequiresCollTag) { + const NamespaceString& ns = collection()->ns(); uassert(ErrorCodes::PrimarySteppedDown, str::stream() << "Demoted from primary while removing from " << ns.ns(), !getOpCtx()->writesAreReplicated() || @@ -287,11 +281,26 @@ const SpecificStats* DeleteStage::getSpecificStats() const { // static long long DeleteStage::getNumDeleted(const PlanExecutor& exec) { invariant(exec.getRootStage()->isEOF()); - invariant(exec.getRootStage()->stageType() == STAGE_DELETE); - DeleteStage* deleteStage = static_cast<DeleteStage*>(exec.getRootStage()); - const DeleteStats* deleteStats = - static_cast<const DeleteStats*>(deleteStage->getSpecificStats()); - return deleteStats->docsDeleted; + + // If we're deleting from a non-existent collection, then the delete plan may have an EOF as the + // root stage. + if (exec.getRootStage()->stageType() == STAGE_EOF) { + return 0LL; + } + + // If the collection exists, the delete plan may either have a delete stage at the root, or (for + // findAndModify) a projection stage wrapping a delete stage. + if (StageType::STAGE_PROJECTION == exec.getRootStage()->stageType()) { + invariant(exec.getRootStage()->getChildren().size() == 1U); + invariant(StageType::STAGE_DELETE == exec.getRootStage()->child()->stageType()); + const SpecificStats* stats = exec.getRootStage()->child()->getSpecificStats(); + return static_cast<const DeleteStats*>(stats)->docsDeleted; + } else { + invariant(StageType::STAGE_DELETE == exec.getRootStage()->stageType()); + const auto* deleteStats = + static_cast<const DeleteStats*>(exec.getRootStage()->getSpecificStats()); + return deleteStats->docsDeleted; + } } PlanStage::StageState DeleteStage::prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out) { diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h index cfd65c8acec..c3fd5ee5cba 100644 --- a/src/mongo/db/exec/delete.h +++ b/src/mongo/db/exec/delete.h @@ -30,7 +30,7 @@ #pragma once -#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/exec/requires_collection_stage.h" #include "mongo/db/jsobj.h" #include "mongo/db/logical_session_id.h" @@ -85,7 +85,7 @@ struct DeleteStageParams { * Callers of work() must be holding a write lock (and, for replicated deletes, callers must have * had the replication coordinator approve the write). */ -class DeleteStage final : public PlanStage { +class DeleteStage final : public RequiresMutableCollectionStage { MONGO_DISALLOW_COPYING(DeleteStage); public: @@ -98,8 +98,6 @@ public: bool isEOF() final; StageState doWork(WorkingSetID* out) final; - void doRestoreState() final; - StageType stageType() const final { return STAGE_DELETE; } @@ -117,6 +115,11 @@ public: */ static long long getNumDeleted(const PlanExecutor& exec); +protected: + void saveState(RequiresCollTag) final {} + + void restoreState(RequiresCollTag) final; + private: /** * Stores 'idToRetry' in '_idRetrying' so the delete can be retried during the next call to @@ -129,11 +132,6 @@ private: // Not owned by us. WorkingSet* _ws; - // Collection to operate on. Not owned by us. Can be NULL (if NULL, isEOF() will always - // return true). If non-NULL, the lifetime of the collection must supersede that of the - // stage. - Collection* _collection; - // If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next. WorkingSetID _idRetrying; diff --git a/src/mongo/db/exec/requires_collection_stage.cpp b/src/mongo/db/exec/requires_collection_stage.cpp index 106767eed73..9424f542839 100644 --- a/src/mongo/db/exec/requires_collection_stage.cpp +++ b/src/mongo/db/exec/requires_collection_stage.cpp @@ -35,14 +35,16 @@ namespace mongo { -void RequiresCollectionStage::doSaveState() { +template <typename CollectionT> +void RequiresCollectionStageBase<CollectionT>::doSaveState() { // A stage may not access storage while in a saved state. _collection = nullptr; saveState(RequiresCollTag{}); } -void RequiresCollectionStage::doRestoreState() { +template <typename CollectionT> +void RequiresCollectionStageBase<CollectionT>::doRestoreState() { invariant(!_collection); const UUIDCatalog& catalog = UUIDCatalog::get(getOpCtx()); @@ -54,4 +56,7 @@ void RequiresCollectionStage::doRestoreState() { restoreState(RequiresCollTag{}); } +template class RequiresCollectionStageBase<const Collection*>; +template class RequiresCollectionStageBase<Collection*>; + } // namespace mongo diff --git a/src/mongo/db/exec/requires_collection_stage.h b/src/mongo/db/exec/requires_collection_stage.h index f6192331374..273a7185ae4 100644 --- a/src/mongo/db/exec/requires_collection_stage.h +++ b/src/mongo/db/exec/requires_collection_stage.h @@ -43,17 +43,24 @@ namespace mongo { * * Subclasses must implement the saveStage() and restoreState() variants tagged with RequiresCollTag * in order to supply custom yield preparation or yield recovery logic. + * + * Templated on 'CollectionT', which may be instantiated using either Collection* or const + * Collection*. This abstracts the implementation of this base class for use by derived classes + * which read (e.g. COLLSCAN and MULTI_ITERATOR) and derived classes that write (e.g. UPDATE and + * DELETE). Derived classes should use the 'RequiresCollectionStage' or + * 'RequiresMutableCollectionStage' aliases provided below. */ -class RequiresCollectionStage : public PlanStage { +template <typename CollectionT> +class RequiresCollectionStageBase : public PlanStage { public: - RequiresCollectionStage(const char* stageType, OperationContext* opCtx, const Collection* coll) + RequiresCollectionStageBase(const char* stageType, OperationContext* opCtx, CollectionT coll) : PlanStage(stageType, opCtx), _collection(coll), _collectionUUID(_collection->uuid().get()) { invariant(_collection); } - virtual ~RequiresCollectionStage() = default; + virtual ~RequiresCollectionStageBase() = default; protected: struct RequiresCollTag {}; @@ -72,17 +79,23 @@ protected: */ virtual void restoreState(RequiresCollTag) = 0; - const Collection* collection() { + CollectionT collection() const { return _collection; } - UUID uuid() { + UUID uuid() const { return _collectionUUID; } private: - const Collection* _collection; + CollectionT _collection; const UUID _collectionUUID; }; +// Type alias for use by PlanStages that read a Collection. +using RequiresCollectionStage = RequiresCollectionStageBase<const Collection*>; + +// Type alias for use by PlanStages that write to a Collection. +using RequiresMutableCollectionStage = RequiresCollectionStageBase<Collection*>; + } // namespace mongo diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index ab14a670f1f..d1f0a3aab77 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -43,7 +43,6 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/exec/write_stage_common.h" #include "mongo/db/op_observer.h" -#include "mongo/db/ops/update_lifecycle.h" #include "mongo/db/query/explain.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" @@ -161,15 +160,16 @@ CollectionUpdateArgs::StoreDocOption getStoreDocMode(const UpdateRequest& update const char* UpdateStage::kStageType = "UPDATE"; +const UpdateStats UpdateStage::kEmptyUpdateStats; + UpdateStage::UpdateStage(OperationContext* opCtx, const UpdateStageParams& params, WorkingSet* ws, Collection* collection, PlanStage* child) - : PlanStage(kStageType, opCtx), + : RequiresMutableCollectionStage(kStageType, opCtx, collection), _params(params), _ws(ws), - _collection(collection), _idRetrying(WorkingSet::INVALID_ID), _idReturning(WorkingSet::INVALID_ID), _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : NULL), @@ -193,7 +193,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco const UpdateRequest* request = _params.request; UpdateDriver* driver = _params.driver; CanonicalQuery* cq = _params.canonicalQuery; - UpdateLifecycle* lifecycle = request->getLifecycle(); // If asked to return new doc, default to the oldObj, in case nothing changes. BSONObj newObj = oldObj.value(); @@ -205,7 +204,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco // only enable in-place mutations if the underlying storage engine offers support for // writing damage events. _doc.reset(oldObj.value(), - (_collection->updateWithDamagesSupported() + (collection()->updateWithDamagesSupported() ? mutablebson::Document::kInPlaceEnabled : mutablebson::Document::kInPlaceDisabled)); @@ -217,13 +216,10 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco const bool validateForStorage = getOpCtx()->writesAreReplicated() && _enforceOkForStorage; FieldRefSet immutablePaths; if (getOpCtx()->writesAreReplicated() && !request->isFromMigration()) { - if (lifecycle) { - auto immutablePathsVector = - getImmutableFields(getOpCtx(), request->getNamespaceString()); - if (immutablePathsVector) { - immutablePaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(*immutablePathsVector)); - } + auto immutablePathsVector = getImmutableFields(getOpCtx(), request->getNamespaceString()); + if (immutablePathsVector) { + immutablePaths.fillFrom( + transitional_tools_do_not_use::unspool_vector(*immutablePathsVector)); } immutablePaths.keepShortest(&idFieldRef); } @@ -253,7 +249,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco // Skip adding _id field if the collection is capped (since capped collection documents can // neither grow nor shrink). - const auto createIdField = !_collection->isCapped(); + const auto createIdField = !collection()->isCapped(); // Ensure if _id exists it is first status = ensureIdFieldIsFirst(&_doc); @@ -288,8 +284,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco RecordId newRecordId; CollectionUpdateArgs args; if (!request->isExplain()) { - invariant(_collection); - auto* css = CollectionShardingState::get(getOpCtx(), _collection->ns()); + auto* css = CollectionShardingState::get(getOpCtx(), collection()->ns()); args.stmtId = request->getStmtId(); args.update = logObj; auto metadata = css->getMetadata(getOpCtx()); @@ -311,7 +306,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco Snapshotted<RecordData> snap(oldObj.snapshotId(), oldRec); - StatusWith<RecordData> newRecStatus = _collection->updateDocumentWithDamages( + StatusWith<RecordData> newRecStatus = collection()->updateDocumentWithDamages( getOpCtx(), recordId, std::move(snap), source, _damages, &args); newObj = uassertStatusOK(std::move(newRecStatus)).releaseToBson(); @@ -328,13 +323,13 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco newObj.objsize() <= BSONObjMaxUserSize); if (!request->isExplain()) { - newRecordId = _collection->updateDocument(getOpCtx(), - recordId, - oldObj, - newObj, - driver->modsAffectIndices(), - _params.opDebug, - &args); + newRecordId = collection()->updateDocument(getOpCtx(), + recordId, + oldObj, + newObj, + driver->modsAffectIndices(), + _params.opDebug, + &args); } } @@ -463,13 +458,12 @@ void UpdateStage::doInsert() { return; } - writeConflictRetry(getOpCtx(), "upsert", _collection->ns().ns(), [&] { + writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] { WriteUnitOfWork wunit(getOpCtx()); - invariant(_collection); - uassertStatusOK(_collection->insertDocument(getOpCtx(), - InsertStatement(request->getStmtId(), newObj), - _params.opDebug, - request->isFromMigration())); + uassertStatusOK(collection()->insertDocument(getOpCtx(), + InsertStatement(request->getStmtId(), newObj), + _params.opDebug, + request->isFromMigration())); // Technically, we should save/restore state here, but since we are going to return // immediately after, it would just be wasted work. @@ -531,10 +525,6 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return PlanStage::IS_EOF; } - // If we're here, then we still have to ask for results from the child and apply - // updates to them. We should only get here if the collection exists. - invariant(_collection); - // It is possible that after an update was applied, a WriteConflictException // occurred and prevented us from returning ADVANCED with the requested version // of the document. @@ -589,7 +579,7 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { bool docStillMatches; try { docStillMatches = write_stage_common::ensureStillMatches( - _collection, getOpCtx(), _ws, id, _params.canonicalQuery); + collection(), getOpCtx(), _ws, id, _params.canonicalQuery); } catch (const WriteConflictException&) { // There was a problem trying to detect if the document still exists, so retry. memberFreer.Dismiss(); @@ -702,7 +692,7 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return status; } -void UpdateStage::doRestoreState() { +void UpdateStage::restoreState(RequiresCollTag) { const UpdateRequest& request = *_params.request; const NamespaceString& nsString(request.getNamespaceString()); @@ -716,16 +706,10 @@ void UpdateStage::doRestoreState() { << nsString.ns()); } - if (request.getLifecycle()) { - UpdateLifecycle* lifecycle = request.getLifecycle(); - lifecycle->setCollection(_collection); - - if (!lifecycle->canContinue()) { - uasserted(17270, "Update aborted due to invalid state transitions after yield."); - } - - _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(getOpCtx())); - } + // The set of indices may have changed during yield. Make sure that the update driver has up to + // date index information. + const auto& updateIndexData = collection()->infoCache()->getIndexKeys(getOpCtx()); + _params.driver->refreshIndexKeys(&updateIndexData); } unique_ptr<PlanStageStats> UpdateStage::getStats() { @@ -742,9 +726,24 @@ const SpecificStats* UpdateStage::getSpecificStats() const { const UpdateStats* UpdateStage::getUpdateStats(const PlanExecutor* exec) { invariant(exec->getRootStage()->isEOF()); - invariant(exec->getRootStage()->stageType() == STAGE_UPDATE); - UpdateStage* updateStage = static_cast<UpdateStage*>(exec->getRootStage()); - return static_cast<const UpdateStats*>(updateStage->getSpecificStats()); + + // If we're updating a non-existent collection, then the delete plan may have an EOF as the root + // stage. + if (exec->getRootStage()->stageType() == STAGE_EOF) { + return &kEmptyUpdateStats; + } + + // If the collection exists, then we expect the root of the plan tree to either be an update + // stage, or (for findAndModify) a projection stage wrapping an update stage. + if (StageType::STAGE_PROJECTION == exec->getRootStage()->stageType()) { + invariant(exec->getRootStage()->getChildren().size() == 1U); + invariant(StageType::STAGE_UPDATE == exec->getRootStage()->child()->stageType()); + const SpecificStats* stats = exec->getRootStage()->child()->getSpecificStats(); + return static_cast<const UpdateStats*>(stats); + } else { + invariant(StageType::STAGE_UPDATE == exec->getRootStage()->stageType()); + return static_cast<const UpdateStats*>(exec->getRootStage()->getSpecificStats()); + } } void UpdateStage::recordUpdateStatsInOpDebug(const UpdateStats* updateStats, OpDebug* opDebug) { diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h index 6a988609162..21ee75e2612 100644 --- a/src/mongo/db/exec/update.h +++ b/src/mongo/db/exec/update.h @@ -32,7 +32,7 @@ #include "mongo/db/catalog/collection.h" -#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/exec/requires_collection_stage.h" #include "mongo/db/jsobj.h" #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/update_result.h" @@ -75,7 +75,7 @@ private: * * Callers of work() must be holding a write lock. */ -class UpdateStage final : public PlanStage { +class UpdateStage final : public RequiresMutableCollectionStage { MONGO_DISALLOW_COPYING(UpdateStage); public: @@ -88,8 +88,6 @@ public: bool isEOF() final; StageState doWork(WorkingSetID* out) final; - void doRestoreState() final; - StageType stageType() const final { return STAGE_UPDATE; } @@ -146,7 +144,14 @@ public: bool enforceOkForStorage, UpdateStats* stats); +protected: + void saveState(RequiresCollTag) final {} + + void restoreState(RequiresCollTag) final; + private: + static const UpdateStats kEmptyUpdateStats; + /** * Computes the result of applying mods to the document 'oldObj' at RecordId 'recordId' in * memory, then commits these changes to the database. Returns a possibly unowned copy @@ -183,9 +188,6 @@ private: // Not owned by us. WorkingSet* _ws; - // Not owned by us. May be NULL. - Collection* _collection; - // If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next. WorkingSetID _idRetrying; |