commit ae9dcbeaba01de7526e3ea45902bb9ae2905d537
tree   e0acefcbe477e9cdad3a37bd45dac0cce88b6931
parent da78632c6843c8e7d79b598cb7d9ff307acf5ada
Author:    Jordi Serra Torrens <jordi.serra-torrens@mongodb.com>  2023-05-15 14:29:47 +0000
Committer: Evergreen Agent <no-reply@evergreen.mongodb.com>       2023-05-15 16:51:56 +0000
SERVER-77068 Use collection acquisitions in Update/Delete stages
 src/mongo/db/exec/delete_stage.cpp            |   2
 src/mongo/db/exec/delete_stage.h              |   2
 src/mongo/db/exec/requires_collection_stage.h |  17
 src/mongo/db/exec/timeseries_modify.cpp       |   6
 src/mongo/db/exec/timeseries_modify.h         |   4
 src/mongo/db/exec/update_stage.cpp            |  61
 src/mongo/db/exec/update_stage.h              |   8
 src/mongo/db/exec/upsert_stage.cpp            |  19
 src/mongo/db/exec/write_stage_common.cpp      |  15
 src/mongo/db/exec/write_stage_common.h        |  18
 src/mongo/db/query/get_executor.cpp           |   4
 11 files changed, 56 insertions(+), 100 deletions(-)
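
The commit's theme: Update/Delete stages already receive a ScopedCollectionAcquisition (the time-series stage now does too), and the new RequiresWritableCollectionStage base class keeps a reference to it so that sharding metadata is read from the acquisition instead of from CollectionShardingState. A minimal sketch of how such an acquisition is produced upstream, assuming the acquireCollection()/CollectionAcquisitionRequest API from shard_role.h of this era (illustrative only; not code from this commit):

    // Sketch only: acquire the collection for write through the shard role
    // API, then hand the acquisition itself to the write stage.
    const auto coll = acquireCollection(
        opCtx,
        CollectionAcquisitionRequest::fromOpCtx(
            opCtx, nss, AcquisitionPrerequisites::kWrite),
        MODE_IX);

    auto updateStage = std::make_unique<UpdateStage>(
        expCtx.get(), updateStageParams, ws.get(), coll);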
diff --git a/src/mongo/db/exec/delete_stage.cpp b/src/mongo/db/exec/delete_stage.cpp
index d8e48013d99..c926be68b3b 100644
--- a/src/mongo/db/exec/delete_stage.cpp
+++ b/src/mongo/db/exec/delete_stage.cpp
@@ -86,7 +86,7 @@ DeleteStage::DeleteStage(const char* stageType,
WorkingSet* ws,
const ScopedCollectionAcquisition& collection,
PlanStage* child)
- : RequiresMutableCollectionStage(stageType, expCtx, collection.getCollectionPtr()),
+ : RequiresWritableCollectionStage(stageType, expCtx, collection),
_params(std::move(params)),
_ws(ws),
_preWriteFilter(opCtx(), collection.nss()),
diff --git a/src/mongo/db/exec/delete_stage.h b/src/mongo/db/exec/delete_stage.h
index eaa91728693..01e44ee5422 100644
--- a/src/mongo/db/exec/delete_stage.h
+++ b/src/mongo/db/exec/delete_stage.h
@@ -102,7 +102,7 @@ struct DeleteStageParams {
* Callers of work() must be holding a write lock (and, for replicated deletes, callers must have
* had the replication coordinator approve the write).
*/
-class DeleteStage : public RequiresMutableCollectionStage {
+class DeleteStage : public RequiresWritableCollectionStage {
DeleteStage(const DeleteStage&) = delete;
DeleteStage& operator=(const DeleteStage&) = delete;
diff --git a/src/mongo/db/exec/requires_collection_stage.h b/src/mongo/db/exec/requires_collection_stage.h
index 8265323458b..43a9c854302 100644
--- a/src/mongo/db/exec/requires_collection_stage.h
+++ b/src/mongo/db/exec/requires_collection_stage.h
@@ -32,6 +32,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/shard_role.h"
#include "mongo/util/uuid.h"
namespace mongo {
@@ -101,6 +102,20 @@ private:
};
// Type alias for use by PlanStages that write to a Collection.
-using RequiresMutableCollectionStage = RequiresCollectionStage;
+class RequiresWritableCollectionStage : public RequiresCollectionStage {
+public:
+ RequiresWritableCollectionStage(const char* stageType,
+ ExpressionContext* expCtx,
+ const ScopedCollectionAcquisition& coll)
+ : RequiresCollectionStage(stageType, expCtx, coll.getCollectionPtr()),
+ _collectionAcquisition(coll) {}
+
+ const ScopedCollectionAcquisition& collectionAcquisition() const {
+ return _collectionAcquisition;
+ }
+
+private:
+ const ScopedCollectionAcquisition& _collectionAcquisition;
+};
} // namespace mongo
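
To make the new base class concrete, here is a hypothetical derived stage (illustrative only; the pure-virtual save/restore hooks inherited from RequiresCollectionStage are omitted): the acquisition is threaded through the constructor, and the stage reaches sharding state via collectionAcquisition().

    // Hypothetical stage, for illustration; not part of the commit.
    class ExampleWriteStage : public RequiresWritableCollectionStage {
    public:
        ExampleWriteStage(ExpressionContext* expCtx,
                          const ScopedCollectionAcquisition& coll)
            : RequiresWritableCollectionStage("EXAMPLE_WRITE", expCtx, coll) {}

    protected:
        void writeOne(const BSONObj& doc) {
            // The acquisition bundles the sharding description with the
            // collection, so there is no separate CollectionShardingState
            // lookup (and no cached description to invalidate on yield).
            const auto& collDesc =
                collectionAcquisition().getShardingDescription();
            if (collDesc.isSharded()) {
                // ... shard-key-aware write path ...
            }
        }
    };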
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index a6429311b81..da91c86fb53 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -44,15 +44,15 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx,
TimeseriesModifyParams&& params,
WorkingSet* ws,
std::unique_ptr<PlanStage> child,
- const CollectionPtr& coll,
+ const ScopedCollectionAcquisition& coll,
BucketUnpacker bucketUnpacker,
std::unique_ptr<MatchExpression> residualPredicate)
- : RequiresCollectionStage(kStageType, expCtx, coll),
+ : RequiresWritableCollectionStage(kStageType, expCtx, coll),
_params(std::move(params)),
_ws(ws),
_bucketUnpacker{std::move(bucketUnpacker)},
_residualPredicate(std::move(residualPredicate)),
- _preWriteFilter(opCtx(), coll->ns()) {
+ _preWriteFilter(opCtx(), coll.nss()) {
tassert(7308200,
"Multi deletes must have a residual predicate",
_isSingletonWrite() || _residualPredicate || _params.isUpdate);
diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h
index 3a36b378ae4..5269c79c6f9 100644
--- a/src/mongo/db/exec/timeseries_modify.h
+++ b/src/mongo/db/exec/timeseries_modify.h
@@ -98,7 +98,7 @@ struct TimeseriesModifyParams {
* The stage processes one bucket at a time, unpacking all the measurements and writing the output
* bucket in a single doWork() call.
*/
-class TimeseriesModifyStage final : public RequiresMutableCollectionStage {
+class TimeseriesModifyStage final : public RequiresWritableCollectionStage {
public:
static const char* kStageType;
@@ -106,7 +106,7 @@ public:
TimeseriesModifyParams&& params,
WorkingSet* ws,
std::unique_ptr<PlanStage> child,
- const CollectionPtr& coll,
+ const ScopedCollectionAcquisition& coll,
BucketUnpacker bucketUnpacker,
std::unique_ptr<MatchExpression> residualPredicate);
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 8b3a192463c..d3068814df0 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -106,11 +106,10 @@ UpdateStage::UpdateStage(ExpressionContext* expCtx,
const UpdateStageParams& params,
WorkingSet* ws,
const ScopedCollectionAcquisition& collection)
- : RequiresMutableCollectionStage(kStageType.rawData(), expCtx, collection.getCollectionPtr()),
+ : RequiresWritableCollectionStage(kStageType.rawData(), expCtx, collection),
_params(params),
_ws(ws),
_doc(params.driver->getDocument()),
- _cachedShardingCollectionDescription(collection.nss()),
_idRetrying(WorkingSet::INVALID_ID),
_idReturning(WorkingSet::INVALID_ID),
_updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : nullptr),
@@ -157,11 +156,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj,
FieldRefSet immutablePaths;
if (_isUserInitiatedWrite) {
- // Documents coming directly from users should be validated for storage. It is safe to
- // access the CollectionShardingState in this write context and to throw SSV if the sharding
- // metadata has not been initialized.
- const auto& collDesc =
- _cachedShardingCollectionDescription.getCollectionDescription(opCtx());
+ // Documents coming directly from users should be validated for storage.
+ const auto& collDesc = collectionAcquisition().getShardingDescription();
if (collDesc.isSharded() && !OperationShardingState::isComingFromRouter(opCtx())) {
immutablePaths.fillFrom(collDesc.getKeyPatternFields());
@@ -238,8 +234,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj,
args.sampleId = request->getSampleId();
args.update = logObj;
if (_isUserInitiatedWrite) {
- const auto& collDesc =
- _cachedShardingCollectionDescription.getCollectionDescription(opCtx());
+ const auto& collDesc = collectionAcquisition().getShardingDescription();
args.criteria = collDesc.extractDocumentKey(oldObjValue);
} else {
const auto docId = oldObjValue[idFieldName];
@@ -600,7 +595,6 @@ void UpdateStage::doRestoreStateRequiresCollection() {
}
_preWriteFilter.restoreState();
- _cachedShardingCollectionDescription.restoreState();
}
std::unique_ptr<PlanStageStats> UpdateStage::getStats() {
@@ -694,9 +688,9 @@ void UpdateStage::_checkRestrictionsOnUpdatingShardKeyAreNotViolated(
void UpdateStage::checkUpdateChangesReshardingKey(const ShardingWriteRouter& shardingWriteRouter,
const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj) {
- const auto& collDesc = shardingWriteRouter.getCollDesc();
+ const auto& collDesc = collectionAcquisition().getShardingDescription();
- auto reshardingKeyPattern = collDesc->getReshardingKeyIfShouldForwardOps();
+ auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps();
if (!reshardingKeyPattern)
return;
@@ -706,8 +700,8 @@ void UpdateStage::checkUpdateChangesReshardingKey(const ShardingWriteRouter& sha
if (newShardKey.binaryEqual(oldShardKey))
return;
- FieldRefSet shardKeyPaths(collDesc->getKeyPatternFields());
- _checkRestrictionsOnUpdatingShardKeyAreNotViolated(*collDesc, shardKeyPaths);
+ FieldRefSet shardKeyPaths(collDesc.getKeyPatternFields());
+ _checkRestrictionsOnUpdatingShardKeyAreNotViolated(collDesc, shardKeyPaths);
auto oldRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(oldObj.value());
auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj);
@@ -721,22 +715,11 @@ void UpdateStage::checkUpdateChangesReshardingKey(const ShardingWriteRouter& sha
void UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj>& newObjCopy,
const Snapshotted<BSONObj>& oldObj) {
- ShardingWriteRouter shardingWriteRouter(
- opCtx(), collection()->ns(), Grid::get(opCtx())->catalogCache());
-
- auto* const css = shardingWriteRouter.getCss();
-
- // css can be null when this is a config server.
- if (css == nullptr) {
- return;
- }
-
- const auto collDesc = css->getCollectionDescription(opCtx());
-
// Calling mutablebson::Document::getObject() renders a full copy of the updated document. This
// can be expensive for larger documents, so we skip calling it when the collection isn't even
// sharded.
- if (!collDesc.isSharded()) {
+ const auto isSharded = collectionAcquisition().getShardingDescription().isSharded();
+ if (!isSharded) {
return;
}
@@ -744,15 +727,16 @@ void UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj
// It is possible that both the existing and new shard keys are being updated, so we do not want
// to short-circuit checking whether either is being modified.
- checkUpdateChangesExistingShardKey(shardingWriteRouter, newObj, oldObj);
+ ShardingWriteRouter shardingWriteRouter(
+ opCtx(), collection()->ns(), Grid::get(opCtx())->catalogCache());
+ checkUpdateChangesExistingShardKey(newObj, oldObj);
checkUpdateChangesReshardingKey(shardingWriteRouter, newObj, oldObj);
}
-void UpdateStage::checkUpdateChangesExistingShardKey(const ShardingWriteRouter& shardingWriteRouter,
- const BSONObj& newObj,
+void UpdateStage::checkUpdateChangesExistingShardKey(const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj) {
- const auto& collDesc = shardingWriteRouter.getCollDesc();
- const auto& shardKeyPattern = collDesc->getShardKeyPattern();
+ const auto& collDesc = collectionAcquisition().getShardingDescription();
+ const auto& shardKeyPattern = collDesc.getShardKeyPattern();
auto oldShardKey = shardKeyPattern.extractShardKeyFromDoc(oldObj.value());
auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj);
@@ -764,25 +748,24 @@ void UpdateStage::checkUpdateChangesExistingShardKey(const ShardingWriteRouter&
return;
}
- FieldRefSet shardKeyPaths(collDesc->getKeyPatternFields());
+ FieldRefSet shardKeyPaths(collDesc.getKeyPatternFields());
// Assert that the updated doc has no arrays or array descendants for the shard key fields.
update::assertPathsNotArray(_doc, shardKeyPaths);
- _checkRestrictionsOnUpdatingShardKeyAreNotViolated(*collDesc, shardKeyPaths);
+ _checkRestrictionsOnUpdatingShardKeyAreNotViolated(collDesc, shardKeyPaths);
    // At this point we have already asserted that the complete shardKey has been specified in the
    // query. This implies that mongos is not doing a broadcast update and that it attached a
    // shardVersion to the command. Thus it is safe to call getOwnershipFilter.
- auto* const css = shardingWriteRouter.getCss();
- const auto collFilter = css->getOwnershipFilter(
- opCtx(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
+ const auto& collFilter = collectionAcquisition().getShardingFilter();
+ invariant(collFilter);
// If the shard key of an orphan document is allowed to change, and the document is allowed to
// become owned by the shard, the global uniqueness assumption for _id values would be violated.
- invariant(collFilter.keyBelongsToMe(oldShardKey));
+ invariant(collFilter->keyBelongsToMe(oldShardKey));
- if (!collFilter.keyBelongsToMe(newShardKey)) {
+ if (!collFilter->keyBelongsToMe(newShardKey)) {
if (MONGO_unlikely(hangBeforeThrowWouldChangeOwningShard.shouldFail())) {
LOGV2(20605, "Hit hangBeforeThrowWouldChangeOwningShard failpoint");
hangBeforeThrowWouldChangeOwningShard.pauseWhileSet(opCtx());
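
The recurring pattern in the update_stage.cpp hunks above, distilled into a sketch (it assumes the acquisition exposes getShardingDescription() and getShardingFilter() as used in the diff; 'someDoc' is a placeholder):

    // Both the description and the ownership filter come from the
    // acquisition; CollectionShardingState is no longer consulted.
    const auto& collDesc = collectionAcquisition().getShardingDescription();
    const auto& collFilter = collectionAcquisition().getShardingFilter();

    if (collDesc.isSharded()) {
        invariant(collFilter);  // a sharded acquisition carries a filter
        auto shardKey =
            collDesc.getShardKeyPattern().extractShardKeyFromDoc(someDoc);
        if (!collFilter->keyBelongsToMe(shardKey)) {
            // The write would move the document to another shard; the code
            // above throws WouldChangeOwningShard in this case.
        }
    }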
diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h
index 9c9bcc483ad..2f7dfd397aa 100644
--- a/src/mongo/db/exec/update_stage.h
+++ b/src/mongo/db/exec/update_stage.h
@@ -85,7 +85,7 @@ private:
*
* Callers of doWork() must be holding a write lock.
*/
-class UpdateStage : public RequiresMutableCollectionStage {
+class UpdateStage : public RequiresWritableCollectionStage {
UpdateStage(const UpdateStage&) = delete;
UpdateStage& operator=(const UpdateStage&) = delete;
@@ -144,9 +144,6 @@ protected:
mutablebson::Document& _doc;
mutablebson::DamageVector _damages;
- // Cached collection sharding description. It is reset when restoring from a yield.
- write_stage_common::CachedShardingDescription _cachedShardingCollectionDescription;
-
private:
/**
* Computes the result of applying mods to the document 'oldObj' at RecordId 'recordId' in
@@ -186,8 +183,7 @@ private:
* been updated to a value belonging to a chunk that is not owned by this shard. We cannot apply
* this update atomically.
*/
- void checkUpdateChangesExistingShardKey(const ShardingWriteRouter& shardingWriteRouter,
- const BSONObj& newObj,
+ void checkUpdateChangesExistingShardKey(const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj);
void checkUpdateChangesReshardingKey(const ShardingWriteRouter& shardingWriteRouter,
diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp
index a17f9266205..ce9f0bad19c 100644
--- a/src/mongo/db/exec/upsert_stage.cpp
+++ b/src/mongo/db/exec/upsert_stage.cpp
@@ -131,18 +131,14 @@ void UpsertStage::_performInsert(BSONObj newDocument) {
// 'q' field belong to this shard, but those in the 'u' field do not. In this case we need to
// throw so that MongoS can target the insert to the correct shard.
if (_isUserInitiatedWrite) {
- const auto& collDesc =
- _cachedShardingCollectionDescription.getCollectionDescription(opCtx());
+ const auto& collDesc = collectionAcquisition().getShardingDescription();
if (collDesc.isSharded()) {
- auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(
- opCtx(), collection()->ns());
- auto collFilter = scopedCss->getOwnershipFilter(
- opCtx(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
- const ShardKeyPattern& shardKeyPattern = collFilter.getShardKeyPattern();
- auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument);
-
- if (!collFilter.keyBelongsToMe(newShardKey)) {
+ const auto& collFilter = collectionAcquisition().getShardingFilter();
+ invariant(collFilter);
+ auto newShardKey = collDesc.getShardKeyPattern().extractShardKeyFromDoc(newDocument);
+
+ if (!collFilter->keyBelongsToMe(newShardKey)) {
// An attempt to upsert a document with a shard key value that belongs on another
// shard must either be a retryable write or inside a transaction.
// An upsert without a transaction number is legal if
@@ -203,8 +199,7 @@ BSONObj UpsertStage::_produceNewDocumentForInsert() {
FieldRefSet shardKeyPaths, immutablePaths;
if (_isUserInitiatedWrite) {
// Obtain the collection description. This will be needed to compute the shardKey paths.
- const auto& collDesc =
- _cachedShardingCollectionDescription.getCollectionDescription(opCtx());
+ const auto& collDesc = collectionAcquisition().getShardingDescription();
// If the collection is sharded, add all fields from the shard key to the 'shardKeyPaths'
// set.
diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp
index 8a6c259e8e2..8d80f16cd06 100644
--- a/src/mongo/db/exec/write_stage_common.cpp
+++ b/src/mongo/db/exec/write_stage_common.cpp
@@ -131,21 +131,6 @@ void PreWriteFilter::logFromMigrate(const Document& doc,
"record"_attr = doc);
}
-void CachedShardingDescription::restoreState() {
- _collectionDescription.reset();
-}
-
-const ScopedCollectionDescription& CachedShardingDescription::getCollectionDescription(
- OperationContext* opCtx) {
- if (!_collectionDescription) {
- const auto scopedCss =
- CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, _nss);
- _collectionDescription = scopedCss->getCollectionDescription(opCtx);
- }
-
- return *_collectionDescription;
-}
-
bool ensureStillMatches(const CollectionPtr& collection,
OperationContext* opCtx,
WorkingSet* ws,
diff --git a/src/mongo/db/exec/write_stage_common.h b/src/mongo/db/exec/write_stage_common.h
index 169ef5edc6c..89e260e69bb 100644
--- a/src/mongo/db/exec/write_stage_common.h
+++ b/src/mongo/db/exec/write_stage_common.h
@@ -35,7 +35,6 @@
#include "mongo/db/exec/shard_filterer.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/s/scoped_collection_metadata.h"
namespace mongo {
@@ -151,23 +150,6 @@ private:
};
/**
- * This class represents a cached sharding collection description. When resuming from a yield, the
- * cache needs to be invalidated.
- */
-class CachedShardingDescription {
-public:
- CachedShardingDescription(const NamespaceString& nss) : _nss(nss) {}
-
- void restoreState();
-
- const ScopedCollectionDescription& getCollectionDescription(OperationContext* opCtx);
-
-private:
- const NamespaceString _nss;
- boost::optional<ScopedCollectionDescription> _collectionDescription;
-};
-
-/**
* Returns true if the document referred to by 'id' still exists and matches the query predicate
* given by 'cq'. Returns true if the document still exists and 'cq' is null. Returns false
* otherwise.
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 887ea96be96..5c234f05dac 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -1929,7 +1929,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele
TimeseriesModifyParams(deleteStageParams.get()),
ws.get(),
std::move(root),
- collectionPtr,
+ coll,
BucketUnpacker(*collectionPtr->getTimeseriesOptions()),
parsedDelete->releaseResidualExpr());
} else if (batchDelete) {
@@ -2119,7 +2119,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpda
TimeseriesModifyParams(&updateStageParams),
ws.get(),
std::move(root),
- collectionPtr,
+ coll,
BucketUnpacker(*collectionPtr->getTimeseriesOptions()),
parsedUpdate->releaseResidualExpr());
} else {
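
For reference, the resulting call shape in getExecutorDelete (condensed from the hunk above, using the variable names that appear there): the stage takes the acquisition 'coll', while the CollectionPtr is still used for plain catalog reads such as the time-series options.

    root = std::make_unique<TimeseriesModifyStage>(
        expCtx.get(),
        TimeseriesModifyParams(deleteStageParams.get()),
        ws.get(),
        std::move(root),
        coll,  // the ScopedCollectionAcquisition
        BucketUnpacker(*collectionPtr->getTimeseriesOptions()),
        parsedDelete->releaseResidualExpr());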