summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-11-01 17:35:26 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-11-21 14:56:27 -0500
commitcce280f98a8badf8aef4ed960e82e61e61d3fe5e (patch)
tree4ddfbd74d4acea1135c9ab15ecdd8803abb4ca6d /src/mongo/db/pipeline
parent756f8e070b5fbf6728ecefbd625601f3dd7e75a0 (diff)
downloadmongo-cce280f98a8badf8aef4ed960e82e61e61d3fe5e.tar.gz
SERVER-37871 Enforce agreement on shard key across cluster for $out
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp158
-rw-r--r--src/mongo/db/pipeline/document_source_out.h43
-rw-r--r--src/mongo/db/pipeline/document_source_out.idl25
-rw-r--r--src/mongo/db/pipeline/document_source_out_in_place.h2
-rw-r--r--src/mongo/db/pipeline/document_source_out_test.cpp31
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.cpp5
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.h2
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h12
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp8
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h6
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h11
14 files changed, 232 insertions, 82 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 28f833e2e67..2f1c2e27cca 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -575,6 +575,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/storage/key_string',
'$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/s/common_s',
'document_value',
],
)
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 9d8e31da131..13538d6f20c 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -177,6 +177,17 @@ BSONObj extractUniqueKeyFromDoc(const Document& doc, const std::set<FieldPath>&
}
return result.freeze().toBson();
}
+
+void ensureUniqueKeyHasSupportingIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& outputNs,
+ const std::set<FieldPath>& uniqueKey,
+ const BSONObj& userSpecifiedUniqueKey) {
+ uassert(
+ 50938,
+ str::stream() << "Cannot find index to verify that $out's unique key will be unique: "
+ << userSpecifiedUniqueKey,
+ expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(expCtx, outputNs, uniqueKey));
+}
} // namespace
DocumentSource::GetNextResult DocumentSourceOut::getNext() {
@@ -258,7 +269,7 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
const intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
std::set<FieldPath> uniqueKey,
- boost::optional<OID> targetEpoch) {
+ boost::optional<ChunkVersion> targetCollectionVersion) {
// TODO (SERVER-36832): Allow this combination.
uassert(
@@ -297,13 +308,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
switch (mode) {
case WriteModeEnum::kModeReplaceCollection:
return new DocumentSourceOutReplaceColl(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion);
case WriteModeEnum::kModeInsertDocuments:
return new DocumentSourceOutInPlace(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion);
case WriteModeEnum::kModeReplaceDocuments:
return new DocumentSourceOutInPlaceReplace(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion);
default:
MONGO_UNREACHABLE;
}
@@ -313,11 +324,11 @@ DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
const intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
std::set<FieldPath> uniqueKey,
- boost::optional<OID> targetEpoch)
+ boost::optional<ChunkVersion> targetCollectionVersion)
: DocumentSource(expCtx),
_writeConcern(expCtx->opCtx->getWriteConcern()),
_outputNs(std::move(outputNs)),
- _targetEpoch(targetEpoch),
+ _targetCollectionVersion(targetCollectionVersion),
_done(false),
_mode(mode),
_uniqueKeyFields(std::move(uniqueKey)),
@@ -329,75 +340,104 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
auto mode = WriteModeEnum::kModeReplaceCollection;
std::set<FieldPath> uniqueKey;
NamespaceString outputNs;
- boost::optional<OID> targetEpoch;
+ boost::optional<ChunkVersion> targetCollectionVersion;
if (elem.type() == BSONType::String) {
outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
uniqueKey.emplace("_id");
} else if (elem.type() == BSONType::Object) {
auto spec =
DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
-
mode = spec.getMode();
- targetEpoch = spec.getTargetEpoch();
- uassert(50984,
- "$out received unexpected 'targetEpoch' on mongos",
- !(expCtx->inMongos && bool(targetEpoch)));
// Retrieve the target database from the user command, otherwise use the namespace from the
// expression context.
- if (auto targetDb = spec.getTargetDb()) {
- outputNs = NamespaceString(*targetDb, spec.getTargetCollection());
- } else {
- outputNs = NamespaceString(expCtx->ns.db(), spec.getTargetCollection());
- }
+ auto dbName = spec.getTargetDb() ? *spec.getTargetDb() : expCtx->ns.db();
+ outputNs = NamespaceString(dbName, spec.getTargetCollection());
- // Convert unique key object to a vector of FieldPaths.
- if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) {
- uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
-
- // Make sure the uniqueKey has a supporting index. Skip this check if the command is
- // sent from mongos since the uniqueKey check would've happened already.
- uassert(50938,
- str::stream()
- << "Cannot find index to verify that $out's unique key will be unique: "
- << userSpecifiedUniqueKey,
- expCtx->fromMongos ||
- expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(
- expCtx, outputNs, uniqueKey));
- } else {
- uassert(51009, "Expected uniqueKey to be provided from mongos", !expCtx->fromMongos);
- if (expCtx->inMongos && mode != WriteModeEnum::kModeReplaceCollection) {
- // In case there are multiple shards which will perform this $out in parallel, we
- // need to figure out and attach the collection's epoch to ensure each shard is
- // talking about the same version of the collection. This mongos will coordinate
- // that. We force a catalog refresh to do so because there is no shard versioning
- // protocol on this namespace. We will also figure out and attach the uniqueKey to
- // send to the shards. We don't need to do this for 'replaceCollection' mode since
- // that mode cannot currently target a sharded collection.
-
- // There are cases where the aggregation could fail if the collection is dropped or
- // re-created during or near the time of the aggregation. This is okay - we are
- // mostly paranoid that this mongos is very stale and want to prevent returning an
- // error if the collection was dropped a long time ago. Because of this, we are okay
- // with piggy-backing off another thread's request to refresh the cache, simply
- // waiting for that request to return instead of forcing another refresh.
- targetEpoch = expCtx->mongoProcessInterface->refreshAndGetEpoch(expCtx, outputNs);
- }
- // Even if we're not on mongos, we're still acting as a router here - the targeted
- // collection may not be completely on our shard.
- auto docKeyPaths =
- expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx,
- outputNs);
- uniqueKey = std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
- std::make_move_iterator(docKeyPaths.end()));
- }
+ std::tie(uniqueKey, targetCollectionVersion) = expCtx->inMongos
+ ? resolveUniqueKeyOnMongoS(expCtx, spec, outputNs)
+ : resolveUniqueKeyOnMongoD(expCtx, spec, outputNs);
} else {
uasserted(16990,
str::stream() << "$out only supports a string or object argument, not "
<< typeName(elem.type()));
}
- return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
+ return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion);
+}
+
+std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+DocumentSourceOut::resolveUniqueKeyOnMongoD(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceOutSpec& spec,
+ const NamespaceString& outputNs) {
+ invariant(!expCtx->inMongos);
+ auto targetCollectionVersion = spec.getTargetCollectionVersion();
+ if (targetCollectionVersion) {
+ uassert(51018, "Unexpected target chunk version specified", expCtx->fromMongos);
+ // If mongos has sent us a target shard version, we need to be sure we are prepared to
+ // act as a router which is at least as recent as that mongos.
+ expCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow(
+ expCtx, outputNs, *targetCollectionVersion);
+ }
+
+ auto userSpecifiedUniqueKey = spec.getUniqueKey();
+ if (!userSpecifiedUniqueKey) {
+ uassert(51017, "Expected uniqueKey to be provided from mongos", !expCtx->fromMongos);
+ return {std::set<FieldPath>{"_id"}, targetCollectionVersion};
+ }
+
+ // Make sure the uniqueKey has a supporting index. Skip this check if the command is sent
+ // from mongos since the uniqueKey check would've happened already.
+ auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
+ if (!expCtx->fromMongos) {
+ ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey);
+ }
+ return {uniqueKey, targetCollectionVersion};
+}
+
+std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>>
+DocumentSourceOut::resolveUniqueKeyOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceOutSpec& spec,
+ const NamespaceString& outputNs) {
+ invariant(expCtx->inMongos);
+ uassert(50984,
+ "$out received unexpected 'targetCollectionVersion' on mongos",
+ !spec.getTargetCollectionVersion());
+
+ if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) {
+ // Convert unique key object to a vector of FieldPaths.
+ auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
+ ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey);
+
+ // If the user supplies the uniqueKey we don't need to attach a ChunkVersion for the shards
+ // since we are not at risk of 'guessing' the wrong shard key.
+ return {uniqueKey, boost::none};
+ }
+
+ // In case there are multiple shards which will perform this $out in parallel, we need to figure
+ // out and attach the collection's shard version to ensure each shard is talking about the same
+ // version of the collection. This mongos will coordinate that. We force a catalog refresh to do
+ // so because there is no shard versioning protocol on this namespace and so we otherwise could
+ not be sure this node is (or will become) at all recent. We will also figure out and attach
+ // the uniqueKey to send to the shards. We don't need to do this for 'replaceCollection' mode
+ // since that mode cannot currently target a sharded collection.
+
+ // There are cases where the aggregation could fail if the collection is dropped or re-created
+ // during or near the time of the aggregation. This is okay - we are mostly paranoid that this
+ // mongos is very stale and want to prevent returning an error if the collection was dropped a
+ // long time ago. Because of this, we are okay with piggy-backing off another thread's request
+ // to refresh the cache, simply waiting for that request to return instead of forcing another
+ // refresh.
+ boost::optional<ChunkVersion> targetCollectionVersion =
+ spec.getMode() == WriteModeEnum::kModeReplaceCollection
+ ? boost::none
+ : expCtx->mongoProcessInterface->refreshAndGetCollectionVersion(expCtx, outputNs);
+
+ auto docKeyPaths = expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter(
+ expCtx->opCtx, outputNs);
+ return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
+ std::make_move_iterator(docKeyPaths.end())),
+ targetCollectionVersion};
}
Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
@@ -412,7 +452,7 @@ Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> ex
}
return uniqueKeyBob.obj();
}());
- spec.setTargetEpoch(_targetEpoch);
+ spec.setTargetCollectionVersion(_targetCollectionVersion);
return Value(Document{{getSourceName(), spec.toBSON()}});
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 3220c73db6f..906a30f10ee 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -33,6 +33,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_out_gen.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -94,11 +95,17 @@ public:
bool _allowShardedOutNss;
};
+ /**
+ * Builds a new $out stage which will spill all documents into 'outputNs' as inserts. If
+ * 'targetCollectionVersion' is provided then processing will stop with an error if the
+ * collection's epoch changes during the course of execution. This is used as a mechanism to
+ * prevent the shard key from changing.
+ */
DocumentSourceOut(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
std::set<FieldPath> uniqueKey,
- boost::optional<OID> targetEpoch);
+ boost::optional<ChunkVersion> targetCollectionVersion);
virtual ~DocumentSourceOut() = default;
@@ -195,7 +202,7 @@ public:
LocalReadConcernBlock readLocal(pExpCtx->opCtx);
pExpCtx->mongoProcessInterface->insert(
- pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch);
+ pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch());
};
/**
@@ -211,7 +218,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum,
std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"},
- boost::optional<OID> targetEpoch = boost::none);
+ boost::optional<ChunkVersion> targetCollectionVersion = boost::none);
/**
* Parses a $out stage from the user-supplied BSON.
@@ -228,9 +235,37 @@ protected:
WriteConcernOptions _writeConcern;
const NamespaceString _outputNs;
- boost::optional<OID> _targetEpoch;
+ boost::optional<ChunkVersion> _targetCollectionVersion;
+
+ boost::optional<OID> _targetEpoch() {
+ return _targetCollectionVersion ? boost::optional<OID>(_targetCollectionVersion->epoch())
+ : boost::none;
+ }
private:
+ /**
+ * If 'spec' does not specify a uniqueKey, uses the sharding catalog to pick a default key of
+ * the shard key + _id. Returns a pair of the uniqueKey (either from the spec or generated), and
+ * an optional ChunkVersion, populated with the version stored in the sharding catalog when we
+ * asked for the shard key.
+ */
+ static std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveUniqueKeyOnMongoS(
+ const boost::intrusive_ptr<ExpressionContext>&,
+ const DocumentSourceOutSpec& spec,
+ const NamespaceString& outputNs);
+
+ /**
+ * Ensures that 'spec' contains a uniqueKey which has a supporting index - either because the
+ * uniqueKey was sent from mongos or because there is a corresponding unique index. Returns the
+ * target ChunkVersion already attached to 'spec', but verifies that this node's cached routing
+ * table agrees on the epoch for that version before returning. Throws a StaleConfigException if
+ * not.
+ */
+ static std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> resolveUniqueKeyOnMongoD(
+ const boost::intrusive_ptr<ExpressionContext>&,
+ const DocumentSourceOutSpec& spec,
+ const NamespaceString& outputNs);
+
bool _initialized = false;
bool _done = false;
diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl
index 006dfe665ab..f82e5796421 100644
--- a/src/mongo/db/pipeline/document_source_out.idl
+++ b/src/mongo/db/pipeline/document_source_out.idl
@@ -30,6 +30,8 @@
global:
cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/s/chunk_version.h"
imports:
- "mongo/idl/basic_types.idl"
@@ -43,6 +45,14 @@ enums:
kModeInsertDocuments: "insertDocuments"
kModeReplaceDocuments: "replaceDocuments"
+types:
+ ChunkVersion:
+ bson_serialization_type: object
+ description: An object representing a chunk version for a collection.
+ cpp_type: ChunkVersion
+ serializer: ChunkVersion::toBSON
+ deserializer: ChunkVersion::fromBSONThrowing
+
structs:
DocumentSourceOutSpec:
description: A document used to specify the $out stage of an aggregation pipeline.
@@ -71,12 +81,11 @@ structs:
optional: true
description: Document of fields representing the unique key.
- epoch:
- cpp_name: targetEpoch
- type: objectid
+ targetCollectionVersion:
+ type: ChunkVersion
optional: true
- description: If set, the epoch found when parsed on mongos. Can be used to check if
- a collection has since been dropped and re-created, in which case the
- shard key may have changed. As of this writing, this also can be used
- to detect if the collection has gone from unsharded to sharded, and
- thus now has a shard key.
+ description: If set, the collection's ChunkVersion found when parsed on mongos. Can
+ be used to check if a collection has since been dropped and re-created,
+ in which case the shard key may have changed. As of this writing, this
+ also can be used to detect if the collection has gone from unsharded to
+ sharded, and thus now has a shard key.
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 1dd54ff7e59..b30e6b29931 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -78,7 +78,7 @@ public:
_writeConcern,
upsert,
multi,
- _targetEpoch);
+ _targetEpoch());
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$out failed to update the matching document, did you "
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index 44256d1fb4b..60530547165 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -67,6 +67,12 @@ public:
OperationContext* opCtx, const NamespaceString& nss) const override {
return {"_id"};
}
+
+ void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString&,
+ ChunkVersion) const override {
+ return; // Pretend it always matches for our tests here.
+ }
};
class DocumentSourceOutTest : public AggregationContextFixture {
@@ -359,21 +365,38 @@ TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) {
ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
}
-TEST_F(DocumentSourceOutTest, FailsToParseIfTargetEpochIsSpecifiedOnMongos) {
+TEST_F(DocumentSourceOutTest, FailsToParseIfTargetCollectionVersionIsSpecifiedOnMongos) {
BSONObj spec = BSON("$out" << BSON("to"
<< "test"
<< "mode"
<< kDefaultMode
<< "uniqueKey"
<< BSON("_id" << 1)
- << "epoch"
- << OID::gen()));
+ << "targetCollectionVersion"
+ << ChunkVersion(0, 0, OID::gen()).toBSON()));
getExpCtx()->inMongos = true;
ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984);
- // Test that 'targetEpoch' is accepted if not in mongos.
+ // Test that 'targetCollectionVersion' is accepted if _from_ mongos.
getExpCtx()->inMongos = false;
+ getExpCtx()->fromMongos = true;
ASSERT(createOutStage(spec) != nullptr);
+
+ // Test that 'targetCollectionVersion' is not accepted if on mongod but not from mongos.
+ getExpCtx()->inMongos = false;
+ getExpCtx()->fromMongos = false;
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51018);
+}
+
+TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyIsNotSentFromMongos) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "test"
+ << "mode"
+ << kDefaultMode
+ << "targetCollectionVersion"
+ << ChunkVersion(0, 0, OID::gen()).toBSON()));
+ getExpCtx()->fromMongos = true;
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51017);
}
TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
index 3ab1d1a2bfc..4348baf3940 100644
--- a/src/mongo/db/pipeline/mongo_process_common.cpp
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -149,7 +149,7 @@ bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern,
return nFieldsMatched == uniqueKeyPaths.size();
}
-boost::optional<OID> MongoProcessCommon::refreshAndGetEpoch(
+boost::optional<ChunkVersion> MongoProcessCommon::refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const {
const bool forceRefreshFromThisThread = false;
auto routingInfo = uassertStatusOK(
@@ -157,7 +157,7 @@ boost::optional<OID> MongoProcessCommon::refreshAndGetEpoch(
->catalogCache()
->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread));
if (auto chunkManager = routingInfo.cm()) {
- return chunkManager->getVersion().epoch();
+ return chunkManager->getVersion();
}
return boost::none;
}
@@ -175,4 +175,5 @@ std::vector<FieldPath> MongoProcessCommon::_shardKeyToDocumentKeyFields(
}
return result;
}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index ff2a7a5595a..e1158b50d15 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -63,7 +63,7 @@ public:
virtual std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override;
- virtual boost::optional<OID> refreshAndGetEpoch(
+ boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss) const final;
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 947bc5e6972..5f092107c1a 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/storage/backup_cursor_state.h"
+#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -299,9 +300,18 @@ public:
* request to be sent to the config servers. If another thread has already requested a refresh,
* it will instead wait for that response.
*/
- virtual boost::optional<OID> refreshAndGetEpoch(
+ virtual boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss) const = 0;
+
+ /**
+ * Consults the CatalogCache to determine if this node has routing information for the
+ * collection given by 'nss' which reports the same epoch as given by 'targetCollectionVersion'.
+ * Major and minor versions in 'targetCollectionVersion' are ignored.
+ */
+ virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ ChunkVersion targetCollectionVersion) const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 2cd8b80f2be..f2806baf001 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -173,6 +173,12 @@ public:
const NamespaceString&,
const std::set<FieldPath>& uniqueKeyPaths) const final;
+ void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>&,
+ const NamespaceString&,
+ ChunkVersion) const final {
+ MONGO_UNREACHABLE;
+ }
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index 47bd71bd753..76949f4348d 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -73,6 +73,14 @@ void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOption
} // namespace
+void MongoInterfaceShardServer::checkRoutingInfoEpochOrThrow(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ ChunkVersion targetCollectionVersion) const {
+ auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
+ return catalogCache->checkEpochOrThrow(nss, targetCollectionVersion);
+}
+
std::pair<std::vector<FieldPath>, bool>
MongoInterfaceShardServer::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index fba537a2e1a..22c54a8a6cf 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -43,6 +43,10 @@ class MongoInterfaceShardServer final : public MongoInterfaceStandalone {
public:
using MongoInterfaceStandalone::MongoInterfaceStandalone;
+ void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ ChunkVersion targetCollectionVersion) const final;
+
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString&, UUID) const final;
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 6d36a1fd5f2..12627655cd5 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -120,6 +120,12 @@ public:
const NamespaceString& nss,
const std::set<FieldPath>& uniqueKeyPaths) const final;
+ virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ ChunkVersion targetCollectionVersion) const override {
+ uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr");
+ }
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index fd733087b18..dda5b238c84 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -191,9 +191,16 @@ public:
return true;
}
- boost::optional<OID> refreshAndGetEpoch(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss) const override {
+ boost::optional<ChunkVersion> refreshAndGetCollectionVersion(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const override {
return boost::none;
}
+
+ void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString&,
+ ChunkVersion) const override {
+ uasserted(51019, "Unexpected check of routing table");
+ }
};
} // namespace mongo