From ae9dcbeaba01de7526e3ea45902bb9ae2905d537 Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Mon, 15 May 2023 14:29:47 +0000 Subject: SERVER-77068 Use collection acquisitions in Update/Delete stages --- src/mongo/db/exec/delete_stage.cpp | 2 +- src/mongo/db/exec/delete_stage.h | 2 +- src/mongo/db/exec/requires_collection_stage.h | 17 +++++++- src/mongo/db/exec/timeseries_modify.cpp | 6 +-- src/mongo/db/exec/timeseries_modify.h | 4 +- src/mongo/db/exec/update_stage.cpp | 61 ++++++++++----------------- src/mongo/db/exec/update_stage.h | 8 +--- src/mongo/db/exec/upsert_stage.cpp | 19 +++------ src/mongo/db/exec/write_stage_common.cpp | 15 ------- src/mongo/db/exec/write_stage_common.h | 18 -------- src/mongo/db/query/get_executor.cpp | 4 +- 11 files changed, 56 insertions(+), 100 deletions(-) diff --git a/src/mongo/db/exec/delete_stage.cpp b/src/mongo/db/exec/delete_stage.cpp index d8e48013d99..c926be68b3b 100644 --- a/src/mongo/db/exec/delete_stage.cpp +++ b/src/mongo/db/exec/delete_stage.cpp @@ -86,7 +86,7 @@ DeleteStage::DeleteStage(const char* stageType, WorkingSet* ws, const ScopedCollectionAcquisition& collection, PlanStage* child) - : RequiresMutableCollectionStage(stageType, expCtx, collection.getCollectionPtr()), + : RequiresWritableCollectionStage(stageType, expCtx, collection), _params(std::move(params)), _ws(ws), _preWriteFilter(opCtx(), collection.nss()), diff --git a/src/mongo/db/exec/delete_stage.h b/src/mongo/db/exec/delete_stage.h index eaa91728693..01e44ee5422 100644 --- a/src/mongo/db/exec/delete_stage.h +++ b/src/mongo/db/exec/delete_stage.h @@ -102,7 +102,7 @@ struct DeleteStageParams { * Callers of work() must be holding a write lock (and, for replicated deletes, callers must have * had the replication coordinator approve the write). */ -class DeleteStage : public RequiresMutableCollectionStage { +class DeleteStage : public RequiresWritableCollectionStage { DeleteStage(const DeleteStage&) = delete; DeleteStage& operator=(const DeleteStage&) = delete; diff --git a/src/mongo/db/exec/requires_collection_stage.h b/src/mongo/db/exec/requires_collection_stage.h index 8265323458b..43a9c854302 100644 --- a/src/mongo/db/exec/requires_collection_stage.h +++ b/src/mongo/db/exec/requires_collection_stage.h @@ -32,6 +32,7 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/exec/plan_stage.h" +#include "mongo/db/shard_role.h" #include "mongo/util/uuid.h" namespace mongo { @@ -101,6 +102,20 @@ private: }; // Type alias for use by PlanStages that write to a Collection. -using RequiresMutableCollectionStage = RequiresCollectionStage; +class RequiresWritableCollectionStage : public RequiresCollectionStage { +public: + RequiresWritableCollectionStage(const char* stageType, + ExpressionContext* expCtx, + const ScopedCollectionAcquisition& coll) + : RequiresCollectionStage(stageType, expCtx, coll.getCollectionPtr()), + _collectionAcquisition(coll) {} + + const ScopedCollectionAcquisition& collectionAcquisition() const { + return _collectionAcquisition; + } + +private: + const ScopedCollectionAcquisition& _collectionAcquisition; +}; } // namespace mongo diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp index a6429311b81..da91c86fb53 100644 --- a/src/mongo/db/exec/timeseries_modify.cpp +++ b/src/mongo/db/exec/timeseries_modify.cpp @@ -44,15 +44,15 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx, TimeseriesModifyParams&& params, WorkingSet* ws, std::unique_ptr child, - const CollectionPtr& coll, + const ScopedCollectionAcquisition& coll, BucketUnpacker bucketUnpacker, std::unique_ptr residualPredicate) - : RequiresCollectionStage(kStageType, expCtx, coll), + : RequiresWritableCollectionStage(kStageType, expCtx, coll), _params(std::move(params)), _ws(ws), _bucketUnpacker{std::move(bucketUnpacker)}, _residualPredicate(std::move(residualPredicate)), - _preWriteFilter(opCtx(), coll->ns()) { + _preWriteFilter(opCtx(), coll.nss()) { tassert(7308200, "Multi deletes must have a residual predicate", _isSingletonWrite() || _residualPredicate || _params.isUpdate); diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h index 3a36b378ae4..5269c79c6f9 100644 --- a/src/mongo/db/exec/timeseries_modify.h +++ b/src/mongo/db/exec/timeseries_modify.h @@ -98,7 +98,7 @@ struct TimeseriesModifyParams { * The stage processes one bucket at a time, unpacking all the measurements and writing the output * bucket in a single doWork() call. */ -class TimeseriesModifyStage final : public RequiresMutableCollectionStage { +class TimeseriesModifyStage final : public RequiresWritableCollectionStage { public: static const char* kStageType; @@ -106,7 +106,7 @@ public: TimeseriesModifyParams&& params, WorkingSet* ws, std::unique_ptr child, - const CollectionPtr& coll, + const ScopedCollectionAcquisition& coll, BucketUnpacker bucketUnpacker, std::unique_ptr residualPredicate); diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 8b3a192463c..d3068814df0 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -106,11 +106,10 @@ UpdateStage::UpdateStage(ExpressionContext* expCtx, const UpdateStageParams& params, WorkingSet* ws, const ScopedCollectionAcquisition& collection) - : RequiresMutableCollectionStage(kStageType.rawData(), expCtx, collection.getCollectionPtr()), + : RequiresWritableCollectionStage(kStageType.rawData(), expCtx, collection), _params(params), _ws(ws), _doc(params.driver->getDocument()), - _cachedShardingCollectionDescription(collection.nss()), _idRetrying(WorkingSet::INVALID_ID), _idReturning(WorkingSet::INVALID_ID), _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : nullptr), @@ -157,11 +156,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted& oldObj, FieldRefSet immutablePaths; if (_isUserInitiatedWrite) { - // Documents coming directly from users should be validated for storage. It is safe to - // access the CollectionShardingState in this write context and to throw SSV if the sharding - // metadata has not been initialized. - const auto& collDesc = - _cachedShardingCollectionDescription.getCollectionDescription(opCtx()); + // Documents coming directly from users should be validated for storage. + const auto& collDesc = collectionAcquisition().getShardingDescription(); if (collDesc.isSharded() && !OperationShardingState::isComingFromRouter(opCtx())) { immutablePaths.fillFrom(collDesc.getKeyPatternFields()); @@ -238,8 +234,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted& oldObj, args.sampleId = request->getSampleId(); args.update = logObj; if (_isUserInitiatedWrite) { - const auto& collDesc = - _cachedShardingCollectionDescription.getCollectionDescription(opCtx()); + const auto& collDesc = collectionAcquisition().getShardingDescription(); args.criteria = collDesc.extractDocumentKey(oldObjValue); } else { const auto docId = oldObjValue[idFieldName]; @@ -600,7 +595,6 @@ void UpdateStage::doRestoreStateRequiresCollection() { } _preWriteFilter.restoreState(); - _cachedShardingCollectionDescription.restoreState(); } std::unique_ptr UpdateStage::getStats() { @@ -694,9 +688,9 @@ void UpdateStage::_checkRestrictionsOnUpdatingShardKeyAreNotViolated( void UpdateStage::checkUpdateChangesReshardingKey(const ShardingWriteRouter& shardingWriteRouter, const BSONObj& newObj, const Snapshotted& oldObj) { - const auto& collDesc = shardingWriteRouter.getCollDesc(); + const auto& collDesc = collectionAcquisition().getShardingDescription(); - auto reshardingKeyPattern = collDesc->getReshardingKeyIfShouldForwardOps(); + auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); if (!reshardingKeyPattern) return; @@ -706,8 +700,8 @@ void UpdateStage::checkUpdateChangesReshardingKey(const ShardingWriteRouter& sha if (newShardKey.binaryEqual(oldShardKey)) return; - FieldRefSet shardKeyPaths(collDesc->getKeyPatternFields()); - _checkRestrictionsOnUpdatingShardKeyAreNotViolated(*collDesc, shardKeyPaths); + FieldRefSet shardKeyPaths(collDesc.getKeyPatternFields()); + _checkRestrictionsOnUpdatingShardKeyAreNotViolated(collDesc, shardKeyPaths); auto oldRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(oldObj.value()); auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj); @@ -721,22 +715,11 @@ void UpdateStage::checkUpdateChangesReshardingKey(const ShardingWriteRouter& sha void UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional& newObjCopy, const Snapshotted& oldObj) { - ShardingWriteRouter shardingWriteRouter( - opCtx(), collection()->ns(), Grid::get(opCtx())->catalogCache()); - - auto* const css = shardingWriteRouter.getCss(); - - // css can be null when this is a config server. - if (css == nullptr) { - return; - } - - const auto collDesc = css->getCollectionDescription(opCtx()); - // Calling mutablebson::Document::getObject() renders a full copy of the updated document. This // can be expensive for larger documents, so we skip calling it when the collection isn't even // sharded. - if (!collDesc.isSharded()) { + const auto isSharded = collectionAcquisition().getShardingDescription().isSharded(); + if (!isSharded) { return; } @@ -744,15 +727,16 @@ void UpdateStage::checkUpdateChangesShardKeyFields(const boost::optionalns(), Grid::get(opCtx())->catalogCache()); + checkUpdateChangesExistingShardKey(newObj, oldObj); checkUpdateChangesReshardingKey(shardingWriteRouter, newObj, oldObj); } -void UpdateStage::checkUpdateChangesExistingShardKey(const ShardingWriteRouter& shardingWriteRouter, - const BSONObj& newObj, +void UpdateStage::checkUpdateChangesExistingShardKey(const BSONObj& newObj, const Snapshotted& oldObj) { - const auto& collDesc = shardingWriteRouter.getCollDesc(); - const auto& shardKeyPattern = collDesc->getShardKeyPattern(); + const auto& collDesc = collectionAcquisition().getShardingDescription(); + const auto& shardKeyPattern = collDesc.getShardKeyPattern(); auto oldShardKey = shardKeyPattern.extractShardKeyFromDoc(oldObj.value()); auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj); @@ -764,25 +748,24 @@ void UpdateStage::checkUpdateChangesExistingShardKey(const ShardingWriteRouter& return; } - FieldRefSet shardKeyPaths(collDesc->getKeyPatternFields()); + FieldRefSet shardKeyPaths(collDesc.getKeyPatternFields()); // Assert that the updated doc has no arrays or array descendants for the shard key fields. update::assertPathsNotArray(_doc, shardKeyPaths); - _checkRestrictionsOnUpdatingShardKeyAreNotViolated(*collDesc, shardKeyPaths); + _checkRestrictionsOnUpdatingShardKeyAreNotViolated(collDesc, shardKeyPaths); // At this point we already asserted that the complete shardKey have been specified in the // query, this implies that mongos is not doing a broadcast update and that it attached a // shardVersion to the command. Thus it is safe to call getOwnershipFilter - auto* const css = shardingWriteRouter.getCss(); - const auto collFilter = css->getOwnershipFilter( - opCtx(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); + const auto& collFilter = collectionAcquisition().getShardingFilter(); + invariant(collFilter); // If the shard key of an orphan document is allowed to change, and the document is allowed to // become owned by the shard, the global uniqueness assumption for _id values would be violated. - invariant(collFilter.keyBelongsToMe(oldShardKey)); + invariant(collFilter->keyBelongsToMe(oldShardKey)); - if (!collFilter.keyBelongsToMe(newShardKey)) { + if (!collFilter->keyBelongsToMe(newShardKey)) { if (MONGO_unlikely(hangBeforeThrowWouldChangeOwningShard.shouldFail())) { LOGV2(20605, "Hit hangBeforeThrowWouldChangeOwningShard failpoint"); hangBeforeThrowWouldChangeOwningShard.pauseWhileSet(opCtx()); diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h index 9c9bcc483ad..2f7dfd397aa 100644 --- a/src/mongo/db/exec/update_stage.h +++ b/src/mongo/db/exec/update_stage.h @@ -85,7 +85,7 @@ private: * * Callers of doWork() must be holding a write lock. */ -class UpdateStage : public RequiresMutableCollectionStage { +class UpdateStage : public RequiresWritableCollectionStage { UpdateStage(const UpdateStage&) = delete; UpdateStage& operator=(const UpdateStage&) = delete; @@ -144,9 +144,6 @@ protected: mutablebson::Document& _doc; mutablebson::DamageVector _damages; - // Cached collection sharding description. It is reset when restoring from a yield. - write_stage_common::CachedShardingDescription _cachedShardingCollectionDescription; - private: /** * Computes the result of applying mods to the document 'oldObj' at RecordId 'recordId' in @@ -186,8 +183,7 @@ private: * been updated to a value belonging to a chunk that is not owned by this shard. We cannot apply * this update atomically. */ - void checkUpdateChangesExistingShardKey(const ShardingWriteRouter& shardingWriteRouter, - const BSONObj& newObj, + void checkUpdateChangesExistingShardKey(const BSONObj& newObj, const Snapshotted& oldObj); void checkUpdateChangesReshardingKey(const ShardingWriteRouter& shardingWriteRouter, diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index a17f9266205..ce9f0bad19c 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -131,18 +131,14 @@ void UpsertStage::_performInsert(BSONObj newDocument) { // 'q' field belong to this shard, but those in the 'u' field do not. In this case we need to // throw so that MongoS can target the insert to the correct shard. if (_isUserInitiatedWrite) { - const auto& collDesc = - _cachedShardingCollectionDescription.getCollectionDescription(opCtx()); + const auto& collDesc = collectionAcquisition().getShardingDescription(); if (collDesc.isSharded()) { - auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire( - opCtx(), collection()->ns()); - auto collFilter = scopedCss->getOwnershipFilter( - opCtx(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); - const ShardKeyPattern& shardKeyPattern = collFilter.getShardKeyPattern(); - auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument); - - if (!collFilter.keyBelongsToMe(newShardKey)) { + const auto& collFilter = collectionAcquisition().getShardingFilter(); + invariant(collFilter); + auto newShardKey = collDesc.getShardKeyPattern().extractShardKeyFromDoc(newDocument); + + if (!collFilter->keyBelongsToMe(newShardKey)) { // An attempt to upsert a document with a shard key value that belongs on another // shard must either be a retryable write or inside a transaction. // An upsert without a transaction number is legal if @@ -203,8 +199,7 @@ BSONObj UpsertStage::_produceNewDocumentForInsert() { FieldRefSet shardKeyPaths, immutablePaths; if (_isUserInitiatedWrite) { // Obtain the collection description. This will be needed to compute the shardKey paths. - const auto& collDesc = - _cachedShardingCollectionDescription.getCollectionDescription(opCtx()); + const auto& collDesc = collectionAcquisition().getShardingDescription(); // If the collection is sharded, add all fields from the shard key to the 'shardKeyPaths' // set. diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp index 8a6c259e8e2..8d80f16cd06 100644 --- a/src/mongo/db/exec/write_stage_common.cpp +++ b/src/mongo/db/exec/write_stage_common.cpp @@ -131,21 +131,6 @@ void PreWriteFilter::logFromMigrate(const Document& doc, "record"_attr = doc); } -void CachedShardingDescription::restoreState() { - _collectionDescription.reset(); -} - -const ScopedCollectionDescription& CachedShardingDescription::getCollectionDescription( - OperationContext* opCtx) { - if (!_collectionDescription) { - const auto scopedCss = - CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, _nss); - _collectionDescription = scopedCss->getCollectionDescription(opCtx); - } - - return *_collectionDescription; -} - bool ensureStillMatches(const CollectionPtr& collection, OperationContext* opCtx, WorkingSet* ws, diff --git a/src/mongo/db/exec/write_stage_common.h b/src/mongo/db/exec/write_stage_common.h index 169ef5edc6c..89e260e69bb 100644 --- a/src/mongo/db/exec/write_stage_common.h +++ b/src/mongo/db/exec/write_stage_common.h @@ -35,7 +35,6 @@ #include "mongo/db/exec/shard_filterer.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/s/scoped_collection_metadata.h" namespace mongo { @@ -150,23 +149,6 @@ private: std::unique_ptr _shardFilterer; }; -/** - * This class represents a cached sharding collection description. When resuming from a yield, the - * cache needs to be invalidated. - */ -class CachedShardingDescription { -public: - CachedShardingDescription(const NamespaceString& nss) : _nss(nss) {} - - void restoreState(); - - const ScopedCollectionDescription& getCollectionDescription(OperationContext* opCtx); - -private: - const NamespaceString _nss; - boost::optional _collectionDescription; -}; - /** * Returns true if the document referred to by 'id' still exists and matches the query predicate * given by 'cq'. Returns true if the document still exists and 'cq' is null. Returns false diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 887ea96be96..5c234f05dac 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1929,7 +1929,7 @@ StatusWith> getExecutorDele TimeseriesModifyParams(deleteStageParams.get()), ws.get(), std::move(root), - collectionPtr, + coll, BucketUnpacker(*collectionPtr->getTimeseriesOptions()), parsedDelete->releaseResidualExpr()); } else if (batchDelete) { @@ -2119,7 +2119,7 @@ StatusWith> getExecutorUpda TimeseriesModifyParams(&updateStageParams), ws.get(), std::move(root), - collectionPtr, + coll, BucketUnpacker(*collectionPtr->getTimeseriesOptions()), parsedUpdate->releaseResidualExpr()); } else { -- cgit v1.2.1