From cce280f98a8badf8aef4ed960e82e61e61d3fe5e Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Thu, 1 Nov 2018 17:35:26 -0400 Subject: SERVER-37871 Enforce agreement on shard key across cluster for $out --- src/mongo/db/pipeline/SConscript | 1 + src/mongo/db/pipeline/document_source_out.cpp | 158 +++++++++++++-------- src/mongo/db/pipeline/document_source_out.h | 43 +++++- src/mongo/db/pipeline/document_source_out.idl | 25 ++-- .../db/pipeline/document_source_out_in_place.h | 2 +- src/mongo/db/pipeline/document_source_out_test.cpp | 31 +++- src/mongo/db/pipeline/mongo_process_common.cpp | 5 +- src/mongo/db/pipeline/mongo_process_common.h | 2 +- src/mongo/db/pipeline/mongo_process_interface.h | 12 +- src/mongo/db/pipeline/mongos_process_interface.h | 6 + .../db/pipeline/process_interface_shardsvr.cpp | 8 ++ src/mongo/db/pipeline/process_interface_shardsvr.h | 4 + .../db/pipeline/process_interface_standalone.h | 6 + .../db/pipeline/stub_mongo_process_interface.h | 11 +- src/mongo/s/catalog_cache.cpp | 32 +++++ src/mongo/s/catalog_cache.h | 9 ++ src/mongo/s/chunk_version.cpp | 6 +- src/mongo/s/chunk_version.h | 13 ++ 18 files changed, 291 insertions(+), 83 deletions(-) (limited to 'src/mongo') diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 28f833e2e67..2f1c2e27cca 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -575,6 +575,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/storage/key_string', '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/mongo/s/common_s', 'document_value', ], ) diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 9d8e31da131..13538d6f20c 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -177,6 +177,17 @@ BSONObj extractUniqueKeyFromDoc(const Document& doc, const std::set& } return result.freeze().toBson(); } + +void 
ensureUniqueKeyHasSupportingIndex(const boost::intrusive_ptr& expCtx, + const NamespaceString& outputNs, + const std::set& uniqueKey, + const BSONObj& userSpecifiedUniqueKey) { + uassert( + 50938, + str::stream() << "Cannot find index to verify that $out's unique key will be unique: " + << userSpecifiedUniqueKey, + expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(expCtx, outputNs, uniqueKey)); +} } // namespace DocumentSource::GetNextResult DocumentSourceOut::getNext() { @@ -258,7 +269,7 @@ intrusive_ptr DocumentSourceOut::create( const intrusive_ptr& expCtx, WriteModeEnum mode, std::set uniqueKey, - boost::optional targetEpoch) { + boost::optional targetCollectionVersion) { // TODO (SERVER-36832): Allow this combination. uassert( @@ -297,13 +308,13 @@ intrusive_ptr DocumentSourceOut::create( switch (mode) { case WriteModeEnum::kModeReplaceCollection: return new DocumentSourceOutReplaceColl( - std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch); + std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion); case WriteModeEnum::kModeInsertDocuments: return new DocumentSourceOutInPlace( - std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch); + std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion); case WriteModeEnum::kModeReplaceDocuments: return new DocumentSourceOutInPlaceReplace( - std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch); + std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion); default: MONGO_UNREACHABLE; } @@ -313,11 +324,11 @@ DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs, const intrusive_ptr& expCtx, WriteModeEnum mode, std::set uniqueKey, - boost::optional targetEpoch) + boost::optional targetCollectionVersion) : DocumentSource(expCtx), _writeConcern(expCtx->opCtx->getWriteConcern()), _outputNs(std::move(outputNs)), - _targetEpoch(targetEpoch), + 
_targetCollectionVersion(targetCollectionVersion), _done(false), _mode(mode), _uniqueKeyFields(std::move(uniqueKey)), @@ -329,75 +340,104 @@ intrusive_ptr DocumentSourceOut::createFromBson( auto mode = WriteModeEnum::kModeReplaceCollection; std::set uniqueKey; NamespaceString outputNs; - boost::optional targetEpoch; + boost::optional targetCollectionVersion; if (elem.type() == BSONType::String) { outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str()); uniqueKey.emplace("_id"); } else if (elem.type() == BSONType::Object) { auto spec = DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject()); - mode = spec.getMode(); - targetEpoch = spec.getTargetEpoch(); - uassert(50984, - "$out received unexpected 'targetEpoch' on mongos", - !(expCtx->inMongos && bool(targetEpoch))); // Retrieve the target database from the user command, otherwise use the namespace from the // expression context. - if (auto targetDb = spec.getTargetDb()) { - outputNs = NamespaceString(*targetDb, spec.getTargetCollection()); - } else { - outputNs = NamespaceString(expCtx->ns.db(), spec.getTargetCollection()); - } + auto dbName = spec.getTargetDb() ? *spec.getTargetDb() : expCtx->ns.db(); + outputNs = NamespaceString(dbName, spec.getTargetCollection()); - // Convert unique key object to a vector of FieldPaths. - if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) { - uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get()); - - // Make sure the uniqueKey has a supporting index. Skip this check if the command is - // sent from mongos since the uniqueKey check would've happened already. 
- uassert(50938, - str::stream() - << "Cannot find index to verify that $out's unique key will be unique: " - << userSpecifiedUniqueKey, - expCtx->fromMongos || - expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex( - expCtx, outputNs, uniqueKey)); - } else { - uassert(51009, "Expected uniqueKey to be provided from mongos", !expCtx->fromMongos); - if (expCtx->inMongos && mode != WriteModeEnum::kModeReplaceCollection) { - // In case there are multiple shards which will perform this $out in parallel, we - // need to figure out and attach the collection's epoch to ensure each shard is - // talking about the same version of the collection. This mongos will coordinate - // that. We force a catalog refresh to do so because there is no shard versioning - // protocol on this namespace. We will also figure out and attach the uniqueKey to - // send to the shards. We don't need to do this for 'replaceCollection' mode since - // that mode cannot currently target a sharded collection. - - // There are cases where the aggregation could fail if the collection is dropped or - // re-created during or near the time of the aggregation. This is okay - we are - // mostly paranoid that this mongos is very stale and want to prevent returning an - // error if the collection was dropped a long time ago. Because of this, we are okay - // with piggy-backing off another thread's request to refresh the cache, simply - // waiting for that request to return instead of forcing another refresh. - targetEpoch = expCtx->mongoProcessInterface->refreshAndGetEpoch(expCtx, outputNs); - } - // Even if we're not on mongos, we're still acting as a router here - the targeted - // collection may not be completely on our shard. 
- auto docKeyPaths = - expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, - outputNs); - uniqueKey = std::set(std::make_move_iterator(docKeyPaths.begin()), - std::make_move_iterator(docKeyPaths.end())); - } + std::tie(uniqueKey, targetCollectionVersion) = expCtx->inMongos + ? resolveUniqueKeyOnMongoS(expCtx, spec, outputNs) + : resolveUniqueKeyOnMongoD(expCtx, spec, outputNs); } else { uasserted(16990, str::stream() << "$out only supports a string or object argument, not " << typeName(elem.type())); } - return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch); + return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetCollectionVersion); +} + +std::pair, boost::optional> +DocumentSourceOut::resolveUniqueKeyOnMongoD(const boost::intrusive_ptr& expCtx, + const DocumentSourceOutSpec& spec, + const NamespaceString& outputNs) { + invariant(!expCtx->inMongos); + auto targetCollectionVersion = spec.getTargetCollectionVersion(); + if (targetCollectionVersion) { + uassert(51018, "Unexpected target chunk version specified", expCtx->fromMongos); + // If mongos has sent us a target shard version, we need to be sure we are prepared to + // act as a router which is at least as recent as that mongos. + expCtx->mongoProcessInterface->checkRoutingInfoEpochOrThrow( + expCtx, outputNs, *targetCollectionVersion); + } + + auto userSpecifiedUniqueKey = spec.getUniqueKey(); + if (!userSpecifiedUniqueKey) { + uassert(51017, "Expected uniqueKey to be provided from mongos", !expCtx->fromMongos); + return {std::set{"_id"}, targetCollectionVersion}; + } + + // Make sure the uniqueKey has a supporting index. Skip this check if the command is sent + // from mongos since the uniqueKey check would've happened already. 
+ auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get()); + if (!expCtx->fromMongos) { + ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey); + } + return {uniqueKey, targetCollectionVersion}; +} + +std::pair, boost::optional> +DocumentSourceOut::resolveUniqueKeyOnMongoS(const boost::intrusive_ptr& expCtx, + const DocumentSourceOutSpec& spec, + const NamespaceString& outputNs) { + invariant(expCtx->inMongos); + uassert(50984, + "$out received unexpected 'targetCollectionVersion' on mongos", + !spec.getTargetCollectionVersion()); + + if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) { + // Convert unique key object to a vector of FieldPaths. + auto uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get()); + ensureUniqueKeyHasSupportingIndex(expCtx, outputNs, uniqueKey, *userSpecifiedUniqueKey); + + // If the user supplies the uniqueKey we don't need to attach a ChunkVersion for the shards + // since we are not at risk of 'guessing' the wrong shard key. + return {uniqueKey, boost::none}; + } + + // In case there are multiple shards which will perform this $out in parallel, we need to figure + // out and attach the collection's shard version to ensure each shard is talking about the same + // version of the collection. This mongos will coordinate that. We force a catalog refresh to do + // so because there is no shard versioning protocol on this namespace and so we otherwise could + // not be sure this node is (or will become) at all recent. We will also figure out and attach + // the uniqueKey to send to the shards. We don't need to do this for 'replaceCollection' mode + // since that mode cannot currently target a sharded collection. + + // There are cases where the aggregation could fail if the collection is dropped or re-created + // during or near the time of the aggregation. 
This is okay - we are mostly paranoid that this + // mongos is very stale and want to prevent returning an error if the collection was dropped a + // long time ago. Because of this, we are okay with piggy-backing off another thread's request + // to refresh the cache, simply waiting for that request to return instead of forcing another + // refresh. + boost::optional targetCollectionVersion = + spec.getMode() == WriteModeEnum::kModeReplaceCollection + ? boost::none + : expCtx->mongoProcessInterface->refreshAndGetCollectionVersion(expCtx, outputNs); + + auto docKeyPaths = expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter( + expCtx->opCtx, outputNs); + return {std::set(std::make_move_iterator(docKeyPaths.begin()), + std::make_move_iterator(docKeyPaths.end())), + targetCollectionVersion}; } Value DocumentSourceOut::serialize(boost::optional explain) const { @@ -412,7 +452,7 @@ Value DocumentSourceOut::serialize(boost::optional ex } return uniqueKeyBob.obj(); }()); - spec.setTargetEpoch(_targetEpoch); + spec.setTargetCollectionVersion(_targetCollectionVersion); return Value(Document{{getSourceName(), spec.toBSON()}}); } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 3220c73db6f..906a30f10ee 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -33,6 +33,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_out_gen.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/chunk_version.h" namespace mongo { @@ -94,11 +95,17 @@ public: bool _allowShardedOutNss; }; + /** + * Builds a new $out stage which will spill all documents into 'outputNs' as inserts. If + * 'targetCollectionVersion' is provided then processing will stop with an error if the + * collection's epoch changes during the course of execution. This is used as a mechanism to + * prevent the shard key from changing. 
+ */ DocumentSourceOut(NamespaceString outputNs, const boost::intrusive_ptr& expCtx, WriteModeEnum mode, std::set uniqueKey, - boost::optional targetEpoch); + boost::optional targetCollectionVersion); virtual ~DocumentSourceOut() = default; @@ -195,7 +202,7 @@ public: LocalReadConcernBlock readLocal(pExpCtx->opCtx); pExpCtx->mongoProcessInterface->insert( - pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch); + pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch()); }; /** @@ -211,7 +218,7 @@ public: const boost::intrusive_ptr& expCtx, WriteModeEnum, std::set uniqueKey = std::set{"_id"}, - boost::optional targetEpoch = boost::none); + boost::optional targetCollectionVersion = boost::none); /** * Parses a $out stage from the user-supplied BSON. @@ -228,9 +235,37 @@ protected: WriteConcernOptions _writeConcern; const NamespaceString _outputNs; - boost::optional _targetEpoch; + boost::optional _targetCollectionVersion; + + boost::optional _targetEpoch() { + return _targetCollectionVersion ? boost::optional(_targetCollectionVersion->epoch()) + : boost::none; + } private: + /** + * If 'spec' does not specify a uniqueKey, uses the sharding catalog to pick a default key of + * the shard key + _id. Returns a pair of the uniqueKey (either from the spec or generated), and + * an optional ChunkVersion, populated with the version stored in the sharding catalog when we + * asked for the shard key. + */ + static std::pair, boost::optional> resolveUniqueKeyOnMongoS( + const boost::intrusive_ptr&, + const DocumentSourceOutSpec& spec, + const NamespaceString& outputNs); + + /** + * Ensures that 'spec' contains a uniqueKey which has a supporting index - either because the + * uniqueKey was sent from mongos or because there is a corresponding unique index. Returns the + * target ChunkVersion already attached to 'spec', but verifies that this node's cached routing + * table agrees on the epoch for that version before returning. 
Throws a StaleConfigException if + * not. + */ + static std::pair, boost::optional> resolveUniqueKeyOnMongoD( + const boost::intrusive_ptr&, + const DocumentSourceOutSpec& spec, + const NamespaceString& outputNs); + bool _initialized = false; bool _done = false; diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl index 006dfe665ab..f82e5796421 100644 --- a/src/mongo/db/pipeline/document_source_out.idl +++ b/src/mongo/db/pipeline/document_source_out.idl @@ -30,6 +30,8 @@ global: cpp_namespace: "mongo" + cpp_includes: + - "mongo/s/chunk_version.h" imports: - "mongo/idl/basic_types.idl" @@ -43,6 +45,14 @@ enums: kModeInsertDocuments: "insertDocuments" kModeReplaceDocuments: "replaceDocuments" +types: + ChunkVersion: + bson_serialization_type: object + description: An object representing a chunk version for a collection. + cpp_type: ChunkVersion + serializer: ChunkVersion::toBSON + deserializer: ChunkVersion::fromBSONThrowing + structs: DocumentSourceOutSpec: description: A document used to specify the $out stage of an aggregation pipeline. @@ -71,12 +81,11 @@ structs: optional: true description: Document of fields representing the unique key. - epoch: - cpp_name: targetEpoch - type: objectid + targetCollectionVersion: + type: ChunkVersion optional: true - description: If set, the epoch found when parsed on mongos. Can be used to check if - a collection has since been dropped and re-created, in which case the - shard key may have changed. As of this writing, this also can be used - to detect if the collection has gone from unsharded to sharded, and - thus now has a shard key. + description: If set, the collection's ChunkVersion found when parsed on mongos. Can + be used to check if a collection has since been dropped and re-created, + in which case the shard key may have changed. 
As of this writing, this + also can be used to detect if the collection has gone from unsharded to + sharded, and thus now has a shard key. diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h index 1dd54ff7e59..b30e6b29931 100644 --- a/src/mongo/db/pipeline/document_source_out_in_place.h +++ b/src/mongo/db/pipeline/document_source_out_in_place.h @@ -78,7 +78,7 @@ public: _writeConcern, upsert, multi, - _targetEpoch); + _targetEpoch()); } catch (const ExceptionFor& ex) { uassertStatusOKWithContext(ex.toStatus(), "$out failed to update the matching document, did you " diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp index 44256d1fb4b..60530547165 100644 --- a/src/mongo/db/pipeline/document_source_out_test.cpp +++ b/src/mongo/db/pipeline/document_source_out_test.cpp @@ -67,6 +67,12 @@ public: OperationContext* opCtx, const NamespaceString& nss) const override { return {"_id"}; } + + void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr& expCtx, + const NamespaceString&, + ChunkVersion) const override { + return; // Pretend it always matches for our tests here. 
+ } }; class DocumentSourceOutTest : public AggregationContextFixture { @@ -359,21 +365,38 @@ TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) { ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue); } -TEST_F(DocumentSourceOutTest, FailsToParseIfTargetEpochIsSpecifiedOnMongos) { +TEST_F(DocumentSourceOutTest, FailsToParseIfTargetCollectionVersionIsSpecifiedOnMongos) { BSONObj spec = BSON("$out" << BSON("to" << "test" << "mode" << kDefaultMode << "uniqueKey" << BSON("_id" << 1) - << "epoch" - << OID::gen())); + << "targetCollectionVersion" + << ChunkVersion(0, 0, OID::gen()).toBSON())); getExpCtx()->inMongos = true; ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984); - // Test that 'targetEpoch' is accepted if not in mongos. + // Test that 'targetCollectionVersion' is accepted if _from_ mongos. getExpCtx()->inMongos = false; + getExpCtx()->fromMongos = true; ASSERT(createOutStage(spec) != nullptr); + + // Test that 'targetCollectionVersion' is not accepted if on mongod but not from mongos. 
+ getExpCtx()->inMongos = false; + getExpCtx()->fromMongos = false; + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51018); +} + +TEST_F(DocumentSourceOutTest, FailsToParseifUniqueKeyIsNotSentFromMongos) { + BSONObj spec = BSON("$out" << BSON("to" + << "test" + << "mode" + << kDefaultMode + << "targetCollectionVersion" + << ChunkVersion(0, 0, OID::gen()).toBSON())); + getExpCtx()->fromMongos = true; + ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 51017); } TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) { diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp index 3ab1d1a2bfc..4348baf3940 100644 --- a/src/mongo/db/pipeline/mongo_process_common.cpp +++ b/src/mongo/db/pipeline/mongo_process_common.cpp @@ -149,7 +149,7 @@ bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern, return nFieldsMatched == uniqueKeyPaths.size(); } -boost::optional MongoProcessCommon::refreshAndGetEpoch( +boost::optional MongoProcessCommon::refreshAndGetCollectionVersion( const boost::intrusive_ptr& expCtx, const NamespaceString& nss) const { const bool forceRefreshFromThisThread = false; auto routingInfo = uassertStatusOK( @@ -157,7 +157,7 @@ boost::optional MongoProcessCommon::refreshAndGetEpoch( ->catalogCache() ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread)); if (auto chunkManager = routingInfo.cm()) { - return chunkManager->getVersion().epoch(); + return chunkManager->getVersion(); } return boost::none; } @@ -175,4 +175,5 @@ std::vector MongoProcessCommon::_shardKeyToDocumentKeyFields( } return result; } + } // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h index ff2a7a5595a..e1158b50d15 100644 --- a/src/mongo/db/pipeline/mongo_process_common.h +++ b/src/mongo/db/pipeline/mongo_process_common.h @@ -63,7 +63,7 @@ public: virtual std::vector 
collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const override; - virtual boost::optional refreshAndGetEpoch( + boost::optional refreshAndGetCollectionVersion( const boost::intrusive_ptr& expCtx, const NamespaceString& nss) const final; diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 947bc5e6972..5f092107c1a 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/storage/backup_cursor_state.h" +#include "mongo/s/chunk_version.h" namespace mongo { @@ -299,9 +300,18 @@ public: * request to be sent to the config servers. If another thread has already requested a refresh, * it will instead wait for that response. */ - virtual boost::optional refreshAndGetEpoch( + virtual boost::optional refreshAndGetCollectionVersion( const boost::intrusive_ptr& expCtx, const NamespaceString& nss) const = 0; + + /** + * Consults the CatalogCache to determine if this node has routing information for the + * collection given by 'nss' which reports the same epoch as given by 'targetCollectionVersion'. + * Major and minor versions in 'targetCollectionVersion' are ignored. 
+ */ + virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + ChunkVersion targetCollectionVersion) const = 0; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 2cd8b80f2be..f2806baf001 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -173,6 +173,12 @@ public: const NamespaceString&, const std::set& uniqueKeyPaths) const final; + void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr&, + const NamespaceString&, + ChunkVersion) const final { + MONGO_UNREACHABLE; + } + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index 47bd71bd753..76949f4348d 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -73,6 +73,14 @@ void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOption } // namespace +void MongoInterfaceShardServer::checkRoutingInfoEpochOrThrow( + const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + ChunkVersion targetCollectionVersion) const { + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + return catalogCache->checkEpochOrThrow(nss, targetCollectionVersion); +} + std::pair, bool> MongoInterfaceShardServer::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h index fba537a2e1a..22c54a8a6cf 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -43,6 +43,10 @@ class MongoInterfaceShardServer final : public MongoInterfaceStandalone { 
public: using MongoInterfaceStandalone::MongoInterfaceStandalone; + void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + ChunkVersion targetCollectionVersion) const final; + std::pair, bool> collectDocumentKeyFieldsForHostedCollection( OperationContext* opCtx, const NamespaceString&, UUID) const final; diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 6d36a1fd5f2..12627655cd5 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -120,6 +120,12 @@ public: const NamespaceString& nss, const std::set& uniqueKeyPaths) const final; + virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + ChunkVersion targetCollectionVersion) const override { + uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr"); + } + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index fd733087b18..dda5b238c84 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -191,9 +191,16 @@ public: return true; } - boost::optional refreshAndGetEpoch(const boost::intrusive_ptr& expCtx, - const NamespaceString& nss) const override { + boost::optional refreshAndGetCollectionVersion( + const boost::intrusive_ptr& expCtx, + const NamespaceString& nss) const override { return boost::none; } + + void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr& expCtx, + const NamespaceString&, + ChunkVersion) const override { + uasserted(51019, "Unexpected check of routing table"); + } }; } // namespace mongo diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 14e90ddb2f9..741cec85fd2 
100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -43,6 +43,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/database_version_helpers.h" #include "mongo/s/grid.h" +#include "mongo/s/stale_exception.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -367,6 +368,37 @@ void CatalogCache::onStaleShardVersion(CachedCollectionRoutingInfo&& ccriToInval } } +void CatalogCache::checkEpochOrThrow(const NamespaceString& nss, + ChunkVersion targetCollectionVersion) const { + stdx::lock_guard lg(_mutex); + const auto itDb = _collectionsByDb.find(nss.db()); + uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none), + str::stream() << "could not act as router for " << nss.ns() + << ", no entry for database " + << nss.db(), + itDb != _collectionsByDb.end()); + + auto itColl = itDb->second.find(nss.ns()); + uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none), + str::stream() << "could not act as router for " << nss.ns() + << ", no entry for collection.", + itColl != itDb->second.end()); + + uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none), + str::stream() << "could not act as router for " << nss.ns() << ", wanted " + << targetCollectionVersion.toString() + << ", but found the collection was unsharded", + itColl->second->routingInfo); + + auto foundVersion = itColl->second->routingInfo->getVersion(); + uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion), + str::stream() << "could not act as router for " << nss.ns() << ", wanted " + << targetCollectionVersion.toString() + << ", but found " + << foundVersion.toString(), + foundVersion.epoch() == targetCollectionVersion.epoch()); +} + void CatalogCache::invalidateDatabaseEntry(const StringData dbName) { stdx::lock_guard lg(_mutex); auto itDbEntry = _databases.find(dbName); diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index 
24dc8fa2666..c7f53a2ab27 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -206,6 +206,15 @@ public: */ void onStaleShardVersion(CachedCollectionRoutingInfo&&); + /** + * Throws a StaleConfigException if this catalog cache does not have an entry for the given + * namespace, or if the entry for the given namespace does not have the same epoch as + * 'targetCollectionVersion'. Does not perform any refresh logic. Ignores everything except the + * epoch of 'targetCollectionVersion' when performing the check, but needs the entire target + * version to throw a StaleConfigException. + */ + void checkEpochOrThrow(const NamespaceString& nss, ChunkVersion targetCollectionVersion) const; + /** * Non-blocking method, which indiscriminately causes the database entry for the specified * database to be refreshed the next time getDatabase is called. diff --git a/src/mongo/s/chunk_version.cpp b/src/mongo/s/chunk_version.cpp index 7074472584f..0de8f233312 100644 --- a/src/mongo/s/chunk_version.cpp +++ b/src/mongo/s/chunk_version.cpp @@ -49,7 +49,11 @@ StatusWith ChunkVersion::parseWithField(const BSONObj& obj, String str::stream() << "Invalid type " << versionElem.type() << " for shardVersion element. Expected an array"}; - BSONObjIterator it(versionElem.Obj()); + return fromBSON(versionElem.Obj()); +} + +StatusWith ChunkVersion::fromBSON(const BSONObj& obj) { + BSONObjIterator it(obj); if (!it.more()) return {ErrorCodes::BadValue, "Unexpected empty version array"}; diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h index 67d9a64d62c..754fa9faff6 100644 --- a/src/mongo/s/chunk_version.h +++ b/src/mongo/s/chunk_version.h @@ -73,6 +73,19 @@ public: */ static StatusWith parseWithField(const BSONObj& obj, StringData field); + /** + * Parses 'obj', which is expected to have two elements: the timestamp and the object id. The + * field names don't matter, so 'obj' can be a BSONArray. 
+ */ + static StatusWith fromBSON(const BSONObj& obj); + + /** + * A throwing version of 'fromBSON'. + */ + static ChunkVersion fromBSONThrowing(const BSONObj& obj) { + return uassertStatusOK(fromBSON(obj)); + } + /** * NOTE: This format is being phased out. Use parseWithField instead. * -- cgit v1.2.1