diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-04-01 14:23:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-01 15:39:51 +0000 |
commit | c829cc9b78f8beffebf272b3157f557bd475571b (patch) | |
tree | f245d463e69334d6a6f9e737178d473ab1cb2de6 | |
parent | 98ddeb44e4b0f97c92c1c4f22012c0b62142d06a (diff) | |
download | mongo-c829cc9b78f8beffebf272b3157f557bd475571b.tar.gz |
SERVER-64475 Change isOperationVersioned to isComingFromRouter
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/exec/upsert_stage.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/exec/write_stage_common.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner_params.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.h | 16 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 18 |
12 files changed, 41 insertions, 63 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 7c793764ba3..5a273dc2d89 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -184,11 +184,13 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, // metadata has not been initialized. const auto collDesc = CollectionShardingState::get(opCtx(), collection()->ns()) ->getCollectionDescription(opCtx()); - if (collDesc.isSharded() && !OperationShardingState::isOperationVersioned(opCtx())) { + if (collDesc.isSharded() && !OperationShardingState::isComingFromRouter(opCtx())) { immutablePaths.fillFrom(collDesc.getKeyPatternFields()); } + immutablePaths.keepShortest(&idFieldRef); } + if (!driver->needMatchDetails()) { // If we don't need match details, avoid doing the rematch status = driver->update(opCtx(), diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index 66a4f823190..4170e9a6028 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -187,25 +187,26 @@ BSONObj UpsertStage::_produceNewDocumentForInsert() { // Obtain the collection description. This will be needed to compute the shardKey paths. // The collection description must remain in scope since it owns the pointers used by // 'shardKeyPaths' and 'immutablePaths'. - boost::optional<ScopedCollectionDescription> optCollDesc; FieldRefSet shardKeyPaths, immutablePaths; if (_isUserInitiatedWrite) { - optCollDesc.emplace( + const auto collDesc = CollectionShardingState::get(opCtx(), _params.request->getNamespaceString()) - ->getCollectionDescription(opCtx())); + ->getCollectionDescription(opCtx()); // If the collection is sharded, add all fields from the shard key to the 'shardKeyPaths' // set. - if (optCollDesc->isSharded()) { - shardKeyPaths.fillFrom(optCollDesc->getKeyPatternFields()); + if (collDesc.isSharded()) { + shardKeyPaths.fillFrom(collDesc.getKeyPatternFields()); } + // An unversioned request cannot update the shard key, so all shardKey paths are immutable. - if (!OperationShardingState::isOperationVersioned(opCtx())) { + if (!OperationShardingState::isComingFromRouter(opCtx())) { for (auto&& shardKeyPath : shardKeyPaths) { immutablePaths.insert(shardKeyPath); } } + // The _id field is always immutable to user requests, even if the shard key is mutable. immutablePaths.keepShortest(&idFieldRef); } diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp index e341a158cba..886be780f87 100644 --- a/src/mongo/db/exec/write_stage_common.cpp +++ b/src/mongo/db/exec/write_stage_common.cpp @@ -80,8 +80,8 @@ PreWriteFilter::Action PreWriteFilter::computeAction(const Document& doc) { if (docBelongsToMe) return Action::kWrite; else - return OperationShardingState::isOperationVersioned(_opCtx) ? Action::kSkip - : Action::kWriteAsFromMigrate; + return OperationShardingState::isComingFromRouter(_opCtx) ? Action::kSkip + : Action::kWriteAsFromMigrate; } bool PreWriteFilter::_documentBelongsToMe(const BSONObj& doc) { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 8cefa6efded..ca46dc0963e 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1433,7 +1433,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind ? PlanYieldPolicy::YieldPolicy::YIELD_AUTO : PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY; - if (OperationShardingState::isOperationVersioned(opCtx)) { + if (OperationShardingState::isComingFromRouter(opCtx)) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } @@ -2182,7 +2182,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCoun } size_t plannerOptions = QueryPlannerParams::IS_COUNT; - if (OperationShardingState::isOperationVersioned(opCtx)) { + + if (OperationShardingState::isComingFromRouter(opCtx)) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index d119679a053..c7ddb2c928a 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -79,9 +79,9 @@ struct QueryPlannerParams { // Set this if you're running on a sharded cluster. We'll add a "drop all docs that // shouldn't be on this shard" stage before projection. // - // In order to set this, you must check OperationShardingState::isOperationVersioned() in - // the same lock that you use to build the query executor. You must also wrap the - // PlanExecutor in a ClientCursor within the same lock. + // In order to set this, you must check OperationShardingState::isComingFromRouter() in the + // same lock that you use to build the query executor. You must also wrap the PlanExecutor + // in a ClientCursor within the same lock. // // See the comment on ShardFilterStage for details. INCLUDE_SHARD_FILTER = 1 << 2, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index f8bf232917c..b4a1d1e88d4 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -540,9 +540,8 @@ std::vector<OpTime> logInsertOps( oplogLink.prevOpTime = opTimes[i - 1]; // Direct inserts to shards of orphan documents should not generate change stream events. - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && - (!oplogEntry.getFromMigrate() || !*oplogEntry.getFromMigrate()) && - !OperationShardingState::isOperationVersioned(opCtx) && + if (!oplogEntry.getFromMigrate().value_or(false) && + !OperationShardingState::isComingFromRouter(opCtx) && preWriteFilter.computeAction(Document(begin[i].doc)) == write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) { LOGV2_DEBUG(6258100, diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index e4cd8c2ffef..a39d3ea92e9 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -61,7 +61,7 @@ const auto kUnshardedCollection = std::make_shared<UnshardedCollection>(); boost::optional<ChunkVersion> getOperationReceivedVersion(OperationContext* opCtx, const NamespaceString& nss) { // If there is a version attached to the OperationContext, use it as the received version. - if (OperationShardingState::isOperationVersioned(opCtx)) { + if (OperationShardingState::isComingFromRouter(opCtx)) { return OperationShardingState::get(opCtx).getShardVersion(nss); } @@ -128,8 +128,7 @@ ScopedCollectionDescription CollectionShardingRuntime::getCollectionDescription( // If the server has been started with --shardsvr, but hasn't been added to a cluster we should // consider all collections as unsharded. Also, return unsharded if no shard version or db // version is present on the context. - if (!ShardingState::get(_serviceContext)->enabled() || - (!OperationShardingState::isOperationVersioned(opCtx) && !oss.hasDbVersion())) { + if (!OperationShardingState::isComingFromRouter(opCtx)) { return {kUnshardedCollection}; } @@ -343,14 +342,10 @@ CollectionShardingRuntime::_getMetadataWithVersionCheckAt( if (!optReceivedShardVersion && !supportNonVersionedOperations) return kUnshardedCollection; - // Assume that the received shard version was IGNORED if the current operation wasn't verioned. + // Assume that the received shard version was IGNORED if the current operation wasn't versioned const auto& receivedShardVersion = optReceivedShardVersion ? *optReceivedShardVersion : ChunkVersion::IGNORED(); - // An operation with read concern 'available' should never have shardVersion set. - invariant(repl::ReadConcernArgs::get(opCtx).getLevel() != - repl::ReadConcernLevel::kAvailableReadConcern); - auto csrLock = CSRLock::lockShared(opCtx, this); { diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index feb4e0aaa87..6792234c9fd 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -105,8 +105,8 @@ void mergeChunks(OperationContext* opCtx, uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk( opCtx, nss, ChunkRange(minKey, maxKey)))); - const bool isVersioned = OperationShardingState::isOperationVersioned(opCtx); - if (!isVersioned) { + auto& oss = OperationShardingState::get(opCtx); + if (!oss.getShardVersion(nss)) { onShardVersionMismatch(opCtx, nss, boost::none); } @@ -114,7 +114,7 @@ void mergeChunks(OperationContext* opCtx, AutoGetCollection autoColl(opCtx, nss, MODE_IS); auto csr = CollectionShardingRuntime::get(opCtx, nss); // If there is a version attached to the OperationContext, validate it - if (isVersioned) { + if (oss.getShardVersion(nss)) { csr->checkShardVersionOrThrow(opCtx); } return csr->getCurrentMetadataIfKnown(); diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 1e371ab0d60..0419fb1e895 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -49,9 +49,9 @@ OperationShardingState& OperationShardingState::get(OperationContext* opCtx) { return shardingMetadataDecoration(opCtx); } -bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) { +bool OperationShardingState::isComingFromRouter(OperationContext* opCtx) { const auto& oss = get(opCtx); - return !oss._shardVersions.empty(); + return !oss._databaseVersions.empty() || !oss._shardVersions.empty(); } void OperationShardingState::setShardRole(OperationContext* opCtx, @@ -86,10 +86,6 @@ void OperationShardingState::setShardRole(OperationContext* opCtx, } } -bool OperationShardingState::hasShardVersion(const NamespaceString& nss) const { - return _shardVersions.find(nss.ns()) != _shardVersions.end(); -} - boost::optional<ChunkVersion> OperationShardingState::getShardVersion(const NamespaceString& nss) { const auto it = _shardVersions.find(nss.ns()); if (it != _shardVersions.end()) { diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 471016de760..99bd516295e 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -84,11 +84,11 @@ public: static OperationShardingState& get(OperationContext* opCtx); /** - * Returns true if the the current operation was sent by the caller with shard version - * information attached, meaning that it must perform shard version checking and orphan - * filtering. + * Returns true if the the current operation was sent from an upstream router, rather than it + * being a direct connection against the shard. The way this decision is made is based on + * whether there is shard version declared for any namespace. */ - static bool isOperationVersioned(OperationContext* opCtx); + static bool isComingFromRouter(OperationContext* opCtx); /** * NOTE: DO NOT ADD any new usages of this class without including someone from the Sharding @@ -117,12 +117,6 @@ public: const boost::optional<DatabaseVersion>& dbVersion); /** - * Returns whether or not there is a shard version for the namespace associated with this - * operation. - */ - bool hasShardVersion(const NamespaceString& nss) const; - - /** * Returns the shard version (i.e. maximum chunk version) of a namespace being used by the * operation. Documents in chunks which did not belong on this shard at this shard version * will be filtered out. @@ -179,6 +173,7 @@ private: ShardVersionTracker(ChunkVersion v) : v(v) {} ShardVersionTracker(ShardVersionTracker&&) = default; ShardVersionTracker(const ShardVersionTracker&) = delete; + ShardVersionTracker& operator=(const ShardVersionTracker&) = delete; ChunkVersion v; int recursion{0}; }; @@ -189,6 +184,7 @@ private: DatabaseVersionTracker(DatabaseVersion v) : v(v) {} DatabaseVersionTracker(DatabaseVersionTracker&&) = default; DatabaseVersionTracker(const DatabaseVersionTracker&) = delete; + DatabaseVersionTracker& operator=(const DatabaseVersionTracker&) = delete; DatabaseVersion v; int recursion{0}; }; diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index f3c2abb8ac7..06905f20ecc 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -129,10 +129,8 @@ protected: {std::move(chunk)})), boost::none); - if (!OperationShardingState::isOperationVersioned(opCtx)) { - OperationShardingState::setShardRole( - opCtx, nss, cm.getVersion(kThisShard.getShardId()), boost::none); - } + OperationShardingState::setShardRole( + opCtx, nss, cm.getVersion(kThisShard.getShardId()), boost::none); return CollectionMetadata(std::move(cm), kThisShard.getShardId()); } @@ -575,6 +573,6 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); } } -} // namespace +} // namespace } // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 53ed2f413e0..1e8c6e9cbd6 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1620,22 +1620,12 @@ Future<void> ExecCommandDatabase::_commandExec() { _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request); _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get()); - const auto dbname = request.getDatabase().toString(); - const bool iAmPrimary = - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (!opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && - (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - auto& oss = OperationShardingState::get(opCtx); - auto const shardingState = ShardingState::get(opCtx); - if (OperationShardingState::isOperationVersioned(opCtx) || oss.hasDbVersion()) { - uassertStatusOK(shardingState->canAcceptShardedCommands()); - } - } - _execContext->getReplyBuilder()->reset(); + if (OperationShardingState::isComingFromRouter(opCtx)) { + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); + } + auto runCommand = [&] { if (getInvocation()->supportsWriteConcern() || getInvocation()->definition()->getLogicalOp() == LogicalOp::opGetMore) { |