summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorHana Pearlman <hana.pearlman@mongodb.com>2021-09-30 15:50:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-30 16:35:28 +0000
commit6c4a4bb88aa29b8edb42690bf123a43d72a4c621 (patch)
treec7ac8b237ca7a39719415fe2dec101d627305ea8 /src/mongo/db/pipeline
parent85a5fe32e3932fd834993cbf32d2115fffae4e01 (diff)
downloadmongo-6c4a4bb88aa29b8edb42690bf123a43d72a4c621.tar.gz
SERVER-58376: Perform local reads when on the primary reading from an unsharded coll
Co-authored-by: Alya Carina Berciu alya.berciu@mongodb.com
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h28
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h14
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h15
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp29
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h8
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h14
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp57
-rw-r--r--src/mongo/db/pipeline/sharded_union_test.cpp48
8 files changed, 200 insertions, 13 deletions
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 1be99ee1826..64d23c976a4 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -430,13 +430,37 @@ public:
* Sets the expected shard version for the given namespace. Invariants if the caller attempts to
* change an existing shard version, or if the shard version for this namespace has already been
* checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the
- * constraint that the foreign collection must be unsharded. If the parent operation is
- * unversioned, this method does nothing.
+ * constraint that the foreign collection must be unsharded if featureFlagShardedLookup is
+ * turned off. Also used to enforce that the catalog cache is up-to-date when doing a local
+ * read.If the parent operation is unversioned, this method does nothing.
*/
virtual void setExpectedShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional<ChunkVersion> chunkVersion) = 0;
+ /**
+ * Sets the expected db version for the given namespace. Return true if the db version is set by
+ * this method. Throws an IllegalOperation if the caller attempts to change an existing db
+ * version. If the parent operation is unversioned, does nothing and returns false. Used to
+ * enforce that the catalog cache is up-to-date when doing a local read.
+ */
+ virtual bool setExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DatabaseVersion dbVersion) = 0;
+
+ /**
+ * Unsets the expected db version for the given namespace. Used as a pair with
+ * 'setExpectedDbVersion()' to reset state after a local read is attempted.
+ */
+ virtual void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) = 0;
+
+ /**
+ * Checks if this process is on the primary shard for db specified by the given namespace.
+ * Throws an IllegalOperation exception otherwise. Assumes the operation context has a db
+ * version attached to it for db name specified by the namespace.
+ */
+ virtual void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) = 0;
+
virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0;
/**
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 83998c935a1..f1dd0eab9b8 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -216,6 +216,20 @@ public:
MONGO_UNREACHABLE;
}
+ bool setExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DatabaseVersion dbVersion) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override {
+ MONGO_UNREACHABLE;
+ }
+
std::unique_ptr<ResourceYielder> getResourceYielder() const override {
return nullptr;
}
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index e402d549494..663fa4531bf 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -125,6 +125,21 @@ public:
// Do nothing on a non-shardsvr mongoD.
}
+ bool setExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DatabaseVersion dbVersion) override {
+ // Do nothing on a non-shardsvr mongoD.
+ return false;
+ }
+
+ void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override {
+ // Do nothing on a non-shardsvr mongoD.
+ }
+
+ void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override {
+ // Do nothing on a non-shardsvr mongoD.
+ }
+
protected:
// This constructor is marked as protected in order to prevent instantiation since this
// interface is designed to have a concrete process interface for each possible
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 628446554d5..67bd2f2d4f9 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -406,6 +407,29 @@ ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipelin
ownedPipeline, shardTargetingPolicy, std::move(readConcern));
}
+void ShardServerProcessInterface::unsetExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto& oss = OperationShardingState::get(opCtx);
+ oss.unsetExpectedDbVersion_Only_For_Aggregation_Local_Reads(nss.db());
+}
+
+bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DatabaseVersion dbVersion) {
+ auto& oss = OperationShardingState::get(opCtx);
+
+ if (auto knownDBVersion = oss.getDbVersion(nss.db())) {
+ uassert(ErrorCodes::IllegalOperation,
+ "Expected db version must match known db version",
+ knownDBVersion == dbVersion);
+ } else if (_opIsVersioned) {
+ oss.initializeClientRoutingVersions(nss, boost::none, dbVersion);
+ return true;
+ }
+
+ return false;
+}
+
void ShardServerProcessInterface::setExpectedShardVersion(
OperationContext* opCtx,
const NamespaceString& nss,
@@ -418,6 +442,11 @@ void ShardServerProcessInterface::setExpectedShardVersion(
}
}
+void ShardServerProcessInterface::checkOnPrimaryShardForDb(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db());
+}
+
BSONObj ShardServerProcessInterface::_versionCommandIfAppropriate(
BSONObj cmdObj,
const CachedDatabaseInfo& cachedDbInfo,
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index b223907c792..59309fddcd4 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -139,6 +139,14 @@ public:
const NamespaceString& nss,
boost::optional<ChunkVersion> chunkVersion) final;
+ bool setExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DatabaseVersion dbVersion) final;
+
+ void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) final;
+
+ void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final;
+
private:
// If the current operation is versioned, then we attach the DB version to the command object;
// otherwise, it is returned unmodified. Used when running internal commands, as the parent
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 4a3973ab030..51adcf96f6e 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -267,6 +267,20 @@ public:
// Do nothing.
}
+ void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) override {
+ // Do nothing.
+ }
+
+ bool setExpectedDbVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ DatabaseVersion dbVersion) override {
+ return false;
+ }
+
+ void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) override {
+ // Do nothing.
+ }
+
std::unique_ptr<TemporaryRecordStore> createTemporaryRecordStore(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 21bb1d87f37..288c59814bf 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1296,6 +1296,63 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
pipelineToTarget.release());
}
+
+ auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns);
+ if (cm.isOK() && !cm.getValue().isSharded()) {
+ // If the collection is unsharded and we are on the primary, we should be able to
+ // do a local read. The primary may be moved right after the primary shard check,
+ // but the local read path will do a db version check before it establishes a cursor
+ // to catch this case and ensure we fail to read locally.
+ auto setDbVersion = false;
+ try {
+ expCtx->mongoProcessInterface->setExpectedShardVersion(
+ expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED());
+ setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion(
+ expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion());
+
+ // During 'attachCursorSourceToPipelineForLocalRead', the expected db version
+ // must be set. Whether or not that call is succesful, to avoid affecting later
+ // operations on this db, we should unset the expected db version on the opCtx,
+ // if we set it above.
+ ScopeGuard dbVersionUnsetter([&]() {
+ if (setDbVersion) {
+ expCtx->mongoProcessInterface->unsetExpectedDbVersion(expCtx->opCtx,
+ expCtx->ns);
+ }
+ });
+
+ expCtx->mongoProcessInterface->checkOnPrimaryShardForDb(expCtx->opCtx,
+ expCtx->ns);
+
+ LOGV2_DEBUG(5837600,
+ 3,
+ "Performing local read",
+ "ns"_attr = expCtx->ns,
+ "pipeline"_attr = pipelineToTarget->serializeToBson(),
+ "comment"_attr = expCtx->opCtx->getComment());
+
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ pipelineToTarget.release());
+ } catch (ExceptionFor<ErrorCodes::IllegalOperation>&) {
+ // The current node isn't the primary for or has stale information about this
+ // collection, proceed with shard targeting.
+ } catch (ExceptionFor<ErrorCodes::StaleDbVersion>&) {
+ // The current node has stale information about this collection, proceed with
+ // shard targeting, which has logic to handle refreshing that may be needed.
+ } catch (ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
+ // The current node has stale information about this collection, proceed with
+ // shard targeting, which has logic to handle refreshing that may be needed.
+ } catch (ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // The current node may be trying to run a pipeline on a namespace which is an
+ // unresolved view, proceed with shard targeting,
+ }
+
+ // The local read failed. Recreate 'pipelineToTarget' if it was released above.
+ if (!pipelineToTarget) {
+ pipelineToTarget = pipeline->clone();
+ }
+ }
+
return targetShardsAndAddMergeCursors(expCtx,
std::move(pipelineToTarget),
boost::none,
diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp
index e27be5c5477..1f480e61f99 100644
--- a/src/mongo/db/pipeline/sharded_union_test.cpp
+++ b/src/mongo/db/pipeline/sharded_union_test.cpp
@@ -365,7 +365,7 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo
TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceived) {
// Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
auto shards = setupNShards(2);
- loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+ auto cm = loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
NamespaceString nsToUnionWith(expCtx()->ns.db(), "view");
// Mock out the view namespace as emtpy for now - this is what it would be when parsing in a
@@ -392,18 +392,44 @@ TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceiv
ASSERT(unionWith->getNext().isEOF());
});
- // Mock out one error response, then expect a refresh of the sharding catalog for that
- // namespace, then mock out a successful response.
+ // Mock the expected config server queries.
+ const OID epoch = OID::gen();
+ const UUID uuid = UUID::gen();
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+
+ const Timestamp timestamp;
+ ChunkVersion version(1, 0, epoch, timestamp);
+
+ ChunkType chunk1(*cm.getUUID(),
+ {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
+ version,
+ {"0"});
+ chunk1.setName(OID::gen());
+ version.incMinor();
+
+ ChunkType chunk2(*cm.getUUID(),
+ {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"1"});
+ chunk2.setName(OID::gen());
+ version.incMinor();
+
+ expectCollectionAndChunksAggregation(
+ kTestAggregateNss, epoch, timestamp, uuid, shardKeyPattern, {chunk1, chunk2});
+
+ // Mock out the sharded view error responses from both shards.
+ std::vector<BSONObj> viewPipeline = {fromjson("{$group: {_id: '$groupKey'}}"),
+ // Prevent the $match from being pushed into the shards
+ // where it would not execute in this mocked environment.
+ fromjson("{$_internalInhibitOptimization: {}}"),
+ fromjson("{$match: {_id: 'unionResult'}}")};
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return createErrorCursorResponse(
+ Status{ResolvedView{expectedBackingNs, viewPipeline, BSONObj()}, "It was a view!"_sd});
+ });
onCommand([&](const executor::RemoteCommandRequest& request) {
return createErrorCursorResponse(
- Status{ResolvedView{expectedBackingNs,
- {fromjson("{$group: {_id: '$groupKey'}}"),
- // Prevent the $match from being pushed into the shards where it
- // would not execute in this mocked environment.
- fromjson("{$_internalInhibitOptimization: {}}"),
- fromjson("{$match: {_id: 'unionResult'}}")},
- BSONObj()},
- "It was a view!"_sd});
+ Status{ResolvedView{expectedBackingNs, viewPipeline, BSONObj()}, "It was a view!"_sd});
});
// That error should be incorporated, then we should target both shards. The results should be