summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2022-03-15 16:28:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-15 17:30:57 +0000
commit83cbc410d85ff65a38abb2b6e60ad9af3107f455 (patch)
treebd76768f11778e8f827663dc6e9a6a5561c3fd8b
parentc51987df9a8b60a98e9f91ff0328208df60427f8 (diff)
downloadmongo-83cbc410d85ff65a38abb2b6e60ad9af3107f455.tar.gz
SERVER-64459 Use ScopedSetShardRole in ShardvrProcessInterface
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp23
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp4
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h34
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h17
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h23
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp47
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h13
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h23
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp25
-rw-r--r--src/mongo/db/s/operation_sharding_state.cpp5
-rw-r--r--src/mongo/db/s/operation_sharding_state.h5
12 files changed, 71 insertions, 157 deletions
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index e77137c922c..8634d4d86bd 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -229,12 +229,17 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
long long depth = 0;
bool shouldPerformAnotherQuery;
do {
+ std::unique_ptr<MongoProcessInterface::ScopedExpectUnshardedCollection>
+ expectUnshardedCollectionInScope;
+
const auto allowForeignSharded = foreignShardedGraphLookupAllowed();
if (!allowForeignSharded) {
// Enforce that the foreign collection must be unsharded for $graphLookup.
- _fromExpCtx->mongoProcessInterface->setExpectedShardVersion(
- _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED());
+ expectUnshardedCollectionInScope =
+ _fromExpCtx->mongoProcessInterface->expectUnshardedCollectionInScope(
+ _fromExpCtx->opCtx, _fromExpCtx->ns, boost::none);
}
+
shouldPerformAnotherQuery = false;
// Check whether each key in the frontier exists in the cache or needs to be queried.
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 95fd6cfc291..718c823e269 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -28,13 +28,8 @@
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-#include "mongo/db/commands/feature_compatibility_version_parser.h"
-#include "mongo/platform/basic.h"
-
#include "mongo/db/pipeline/document_source_lookup.h"
-#include <memory>
-
#include "mongo/base/init.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
@@ -56,10 +51,6 @@
#include "mongo/util/fail_point.h"
namespace mongo {
-
-using boost::intrusive_ptr;
-using std::vector;
-
namespace {
/**
@@ -287,7 +278,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceLookUp::clone() const {
return make_intrusive<DocumentSourceLookUp>(*this);
}
-void validateLookupCollectionlessPipeline(const vector<BSONObj>& pipeline) {
+void validateLookupCollectionlessPipeline(const std::vector<BSONObj>& pipeline) {
uassert(ErrorCodes::FailedToParse,
"$lookup stage without explicit collection must have a pipeline with $documents as "
"first stage",
@@ -552,11 +543,15 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
// Resolve the 'let' variables to values per the given input document.
resolveLetVariables(inputDoc, &_fromExpCtx->variables);
+ std::unique_ptr<MongoProcessInterface::ScopedExpectUnshardedCollection>
+ expectUnshardedCollectionInScope;
+
const auto allowForeignShardedColl = foreignShardedLookupAllowed();
if (!allowForeignShardedColl) {
// Enforce that the foreign collection must be unsharded for lookup.
- _fromExpCtx->mongoProcessInterface->setExpectedShardVersion(
- _fromExpCtx->opCtx, _fromExpCtx->ns, ChunkVersion::UNSHARDED());
+ expectUnshardedCollectionInScope =
+ _fromExpCtx->mongoProcessInterface->expectUnshardedCollectionInScope(
+ _fromExpCtx->opCtx, _fromExpCtx->ns, boost::none);
}
// If we don't have a cache, build and return the pipeline immediately.
@@ -1169,7 +1164,7 @@ void DocumentSourceLookUp::reattachToOperationContext(OperationContext* opCtx) {
}
}
-intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
+boost::intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(ErrorCodes::FailedToParse,
"the $lookup specification must be an Object",
@@ -1244,7 +1239,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
}
uassert(ErrorCodes::FailedToParse, "must specify 'as' field for a $lookup", !as.empty());
- intrusive_ptr<DocumentSourceLookUp> lookupStage = nullptr;
+ boost::intrusive_ptr<DocumentSourceLookUp> lookupStage = nullptr;
if (hasPipeline) {
if (localField.empty() && foreignField.empty()) {
// $lookup specified with only pipeline syntax.
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index c8d74ddd550..c327385db78 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-#include "mongo/platform/basic.h"
-
#include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h"
#include <algorithm>
@@ -60,7 +58,6 @@
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/query/sbe_plan_cache.h"
#include "mongo/db/repl/primary_only_service.h"
-#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/transaction_coordinator_curop.h"
#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
@@ -77,7 +74,6 @@
#include "mongo/s/query/document_source_merge_cursors.h"
namespace mongo {
-
namespace {
class MongoDResourceYielder : public ResourceYielder {
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 218f222a771..4bf20c07c68 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -434,32 +434,16 @@ public:
ChunkVersion targetCollectionVersion) const = 0;
/**
- * Sets the expected shard version for the given namespace. Invariants if the caller attempts to
- * change an existing shard version, or if the shard version for this namespace has already been
- * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the
- * constraint that the foreign collection must be unsharded if featureFlagShardedLookup is
- * turned off. Also used to enforce that the catalog cache is up-to-date when doing a local
- * read.
+ * Used to enforce the constraint that the foreign collection must be unsharded.
*/
- virtual void setExpectedShardVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<ChunkVersion> chunkVersion) = 0;
-
- /**
- * Sets the expected db version for the given namespace. Return true if the db version is set by
- * this method. Throws an IllegalOperation if the caller attempts to change an existing db
- * version. If the parent operation is unversioned, does nothing and returns false. Used to
- * enforce that the catalog cache is up-to-date when doing a local read.
- */
- virtual bool setExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- DatabaseVersion dbVersion) = 0;
-
- /**
- * Unsets the expected db version for the given namespace. Used as a pair with
- * 'setExpectedDbVersion()' to reset state after a local read is attempted.
- */
- virtual void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) = 0;
+ class ScopedExpectUnshardedCollection {
+ public:
+ virtual ~ScopedExpectUnshardedCollection() = default;
+ };
+ virtual std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<DatabaseVersion>& dbVersion) = 0;
/**
* Checks if this process is on the primary shard for db specified by the given namespace.
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 055a9f1533b..c467e616078 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -214,19 +214,10 @@ public:
MONGO_UNREACHABLE;
}
- void setExpectedShardVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<ChunkVersion> chunkVersion) override {
- MONGO_UNREACHABLE;
- }
-
- bool setExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- DatabaseVersion dbVersion) override {
- MONGO_UNREACHABLE;
- }
-
- void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override {
+ std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<DatabaseVersion>& dbVersion) override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index b2b4f731f4e..ccbe90205c9 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -117,21 +117,16 @@ public:
const NamespaceString& ns,
const std::vector<BSONObj>& indexSpecs) override;
- void setExpectedShardVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<ChunkVersion> chunkVersion) override {
- // Do nothing on a non-shardsvr mongoD.
- }
-
- bool setExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- DatabaseVersion dbVersion) override {
- // Do nothing on a non-shardsvr mongoD.
- return false;
- }
+ std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<DatabaseVersion>& dbVersion) override {
+ class ScopedExpectUnshardedCollectionNoop : public ScopedExpectUnshardedCollection {
+ public:
+ ScopedExpectUnshardedCollectionNoop() = default;
+ };
- void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override {
- // Do nothing on a non-shardsvr mongoD.
+ return std::make_unique<ScopedExpectUnshardedCollectionNoop>();
}
void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override {
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 204806d839a..69b5a111e2b 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -384,43 +384,28 @@ ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipelin
ownedPipeline, shardTargetingPolicy, std::move(readConcern));
}
-void ShardServerProcessInterface::unsetExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss) {
- auto& oss = OperationShardingState::get(opCtx);
- oss.unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(nss.db());
-}
-
-bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- DatabaseVersion dbVersion) {
- auto& oss = OperationShardingState::get(opCtx);
-
- if (auto knownDBVersion = oss.getDbVersion(nss.db())) {
- uassert(ErrorCodes::IllegalOperation,
- "Expected db version must match known db version",
- knownDBVersion == dbVersion);
- } else {
- OperationShardingState::setShardRole(opCtx, nss, boost::none /* shardVersion */, dbVersion);
- }
-
- return false;
-}
-
-void ShardServerProcessInterface::setExpectedShardVersion(
+std::unique_ptr<MongoProcessInterface::ScopedExpectUnshardedCollection>
+ShardServerProcessInterface::expectUnshardedCollectionInScope(
OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> chunkVersion) {
- auto& oss = OperationShardingState::get(opCtx);
- if (oss.hasShardVersion(nss)) {
- invariant(oss.getShardVersion(nss) == chunkVersion);
- } else {
- OperationShardingState::setShardRole(
- opCtx, nss, chunkVersion, boost::none /* databaseVersion */);
- }
+ const boost::optional<DatabaseVersion>& dbVersion) {
+ class ScopedExpectUnshardedCollectionImpl : public ScopedExpectUnshardedCollection {
+ public:
+ ScopedExpectUnshardedCollectionImpl(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<DatabaseVersion>& dbVersion)
+ : _expectUnsharded(opCtx, nss, ChunkVersion::UNSHARDED(), dbVersion) {}
+
+ private:
+ ScopedSetShardRole _expectUnsharded;
+ };
+
+ return std::make_unique<ScopedExpectUnshardedCollectionImpl>(opCtx, nss, dbVersion);
}
void ShardServerProcessInterface::checkOnPrimaryShardForDb(OperationContext* opCtx,
const NamespaceString& nss) {
DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db());
}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index d16ae72c444..f6026f6ef3a 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -128,15 +128,10 @@ public:
ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
boost::optional<BSONObj> readConcern = boost::none) final;
- void setExpectedShardVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<ChunkVersion> chunkVersion) final;
-
- bool setExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- DatabaseVersion dbVersion) final;
-
- void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) final;
+ std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<DatabaseVersion>& dbVersion) override;
void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final;
};
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index e3d83322111..fb30be686fb 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
-
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -267,20 +266,16 @@ public:
return {*fieldPaths, targetCollectionVersion};
}
- void setExpectedShardVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<ChunkVersion> chunkVersion) override {
- // Do nothing.
- }
-
- void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override {
- // Do nothing.
- }
+ std::unique_ptr<ScopedExpectUnshardedCollection> expectUnshardedCollectionInScope(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<DatabaseVersion>& dbVersion) override {
+ class ScopedExpectUnshardedCollectionNoop : public ScopedExpectUnshardedCollection {
+ public:
+ ScopedExpectUnshardedCollectionNoop() = default;
+ };
- bool setExpectedDbVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- DatabaseVersion dbVersion) override {
- return false;
+ return std::make_unique<ScopedExpectUnshardedCollectionNoop>();
}
void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override {
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 3eaeb1666c8..f64257eedf9 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-#include "mongo/platform/basic.h"
-
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/curop.h"
@@ -54,7 +52,6 @@
#include "mongo/db/vector_clock.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/query/cluster_query_knobs_gen.h"
#include "mongo/s/query/document_source_merge_cursors.h"
@@ -66,11 +63,10 @@
namespace mongo {
namespace sharded_agg_helpers {
+namespace {
MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors);
-namespace {
-
/**
* Given a document representing an aggregation command such as
* {aggregate: "myCollection", pipeline: [], ...},
@@ -1371,23 +1367,10 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
// do a local read. The primary may be moved right after the primary shard check,
// but the local read path will do a db version check before it establishes a cursor
// to catch this case and ensure we fail to read locally.
- auto setDbVersion = false;
try {
- expCtx->mongoProcessInterface->setExpectedShardVersion(
- expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED());
- setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion(
- expCtx->opCtx, expCtx->ns, cm.dbVersion());
-
- // During 'attachCursorSourceToPipelineForLocalRead', the expected db version
- // must be set. Whether or not that call is succesful, to avoid affecting later
- // operations on this db, we should unset the expected db version on the opCtx,
- // if we set it above.
- ScopeGuard dbVersionUnsetter([&]() {
- if (setDbVersion) {
- expCtx->mongoProcessInterface->unsetExpectedDbVersion(expCtx->opCtx,
- expCtx->ns);
- }
- });
+ auto expectUnshardedCollection(
+ expCtx->mongoProcessInterface->expectUnshardedCollectionInScope(
+ expCtx->opCtx, expCtx->ns, cm.dbVersion()));
expCtx->mongoProcessInterface->checkOnPrimaryShardForDb(expCtx->opCtx,
expCtx->ns);
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 256924b4665..1e371ab0d60 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -86,11 +86,6 @@ void OperationShardingState::setShardRole(OperationContext* opCtx,
}
}
-void OperationShardingState::unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(
- const StringData& dbName) {
- _databaseVersions.erase(dbName);
-}
-
bool OperationShardingState::hasShardVersion(const NamespaceString& nss) const {
return _shardVersions.find(nss.ns()) != _shardVersions.end();
}
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 7a9a20012bb..471016de760 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -117,11 +117,6 @@ public:
const boost::optional<DatabaseVersion>& dbVersion);
/**
- * Removes the databaseVersion stored for the given namespace.
- */
- void unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(const StringData& dbName);
-
- /**
* Returns whether or not there is a shard version for the namespace associated with this
* operation.
*/