diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-03-01 14:05:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-01 14:48:56 +0000 |
commit | 21459dd651094d3771bd74e3da5f9ae679832f77 (patch) | |
tree | 05f3be2d589670344ae808a7ae498bc92a6182c2 /src/mongo/s | |
parent | 76b2bc3d9ebcb35796a0cb8eae666a5692056bac (diff) | |
download | mongo-21459dd651094d3771bd74e3da5f9ae679832f77.tar.gz |
SERVER-64035 Ensure all UNSHARDED code paths include DBVersion
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp | 134 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 170 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.h | 45 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_and_modify_cmd.cpp | 12 |
5 files changed, 76 insertions, 286 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 22dab3091e2..23285dd6fa6 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -598,7 +598,6 @@ env.CppUnitTest( source=[ 'append_raw_responses_test.cpp', 'balancer_configuration_test.cpp', - 'build_versioned_requests_for_targeted_shards_test.cpp', 'catalog_cache_refresh_test.cpp', 'catalog_cache_test.cpp', 'catalog/sharding_catalog_client_test.cpp', diff --git a/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp b/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp deleted file mode 100644 index c3f1ff02d4d..00000000000 --- a/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/platform/basic.h" - -#include "mongo/unittest/unittest.h" - -#include "mongo/s/async_requests_sender.h" -#include "mongo/s/catalog_cache_test_fixture.h" -#include "mongo/s/cluster_commands_helpers.h" -#include "mongo/s/database_version.h" - -namespace mongo { -namespace { - -const NamespaceString kNss("TestDB", "TestColl"); - -} // namespace - -class BuildVersionedRequestsForTargetedShardsTest : public CatalogCacheTestFixture { -protected: - /** - * Runs 'buildVersionedRequestsForTargetedShards' and asserts that the returned vector matches - * the expected vector. - */ - void runBuildVersionedRequestsExpect( - const ChunkManager& cm, - const std::set<ShardId>& shardsToSkip, - const BSONObj& cmdObj, - const BSONObj& query, - const BSONObj& collation, - const std::vector<AsyncRequestsSender::Request>& expectedRequests) { - - const auto actualRequests = buildVersionedRequestsForTargetedShards( - operationContext(), kNss, cm, shardsToSkip, cmdObj, query, collation); - - ASSERT_EQ(expectedRequests.size(), actualRequests.size()); - _assertShardIdsMatch(expectedRequests, actualRequests); - } - - void setUp() override { - CatalogCacheTestFixture::setUp(); - - _shards = setupNShards(2); - } - - void expectGetDatabaseUnsharded() { - expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { - DatabaseType db( - kNss.db().toString(), {"0"}, false, DatabaseVersion(UUID::gen(), Timestamp(1, 1))); - return std::vector<BSONObj>{db.toBSON()}; - }()); - } - - void expectGetCollectionUnsharded() { - expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { return std::vector<BSONObj>{}; }()); - } - - std::vector<ShardType> _shards; - -private: - static void _assertShardIdsMatch( - const std::vector<AsyncRequestsSender::Request>& expectedShardIdsFromRequest, - const std::vector<AsyncRequestsSender::Request>& actualShardIdsFromRequest) { - BSONArrayBuilder expectedBuilder; - for (const auto& [shardId, cmdObj] : expectedShardIdsFromRequest) { - expectedBuilder << shardId; - } - - BSONArrayBuilder actualBuilder; - for (const auto& [shardId, cmdObj] : actualShardIdsFromRequest) { - actualBuilder << shardId; - } - - ASSERT_BSONOBJ_EQ(expectedBuilder.arr(), actualBuilder.arr()); - } -}; - -// -// Database is not sharded -// - -TEST_F(BuildVersionedRequestsForTargetedShardsTest, ReturnPrimaryShardForUnshardedDatabase) { - auto future = scheduleRoutingInfoUnforcedRefresh(kNss); - - expectGetDatabaseUnsharded(); - expectGetCollectionUnsharded(); - - auto cm = future.default_timed_get(); - - AsyncRequestsSender::Request expectedRequest{ShardId(_shards[0].getName()), {}}; - runBuildVersionedRequestsExpect(*cm, {}, {}, {}, {}, {expectedRequest}); -} - -TEST_F(BuildVersionedRequestsForTargetedShardsTest, - ReturnNothingForUnshardedDatabaseIfPrimaryShardIsSkipped) { - auto future = scheduleRoutingInfoUnforcedRefresh(kNss); - - expectGetDatabaseUnsharded(); - expectGetCollectionUnsharded(); - - auto cm = future.default_timed_get(); - - runBuildVersionedRequestsExpect(*cm, {ShardId(_shards[0].getName())}, {}, {}, {}, {}); -} - -} // namespace mongo diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 7a2ac3adca7..22a9238fbd2 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -123,25 +123,63 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTarg namespace { /** - * Constructs a requests vector targeting each of the specified shard ids. Each request contains the - * same cmdObj combined with the default sharding parameters. + * Consults the routing info to build requests for: + * 1. If sharded, shards that own chunks for the namespace, or + * 2. If unsharded, the primary shard for the database. + * + * If a shard is included in shardsToSkip, it will be excluded from the list returned to the + * caller. */ -std::vector<AsyncRequestsSender::Request> buildUnshardedRequestsForAllShards( - OperationContext* opCtx, std::vector<ShardId> shardIds, const BSONObj& cmdObj) { +std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkManager& cm, + const std::set<ShardId>& shardsToSkip, + const BSONObj& cmdObj, + const BSONObj& query, + const BSONObj& collation) { + auto cmdToSend = cmdObj; - appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED()); + + if (!cm.isSharded()) { + // The collection is unsharded. Target only the primary shard for the database. + + auto primaryShardId = cm.dbPrimary(); + + if (shardsToSkip.find(primaryShardId) != shardsToSkip.end()) { + return {}; + } + + // Attach shardVersion "UNSHARDED", unless targeting the config server. + const auto cmdObjWithShardVersion = (primaryShardId != ShardId::kConfigServerId) + ? appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED()) + : cmdToSend; + + return std::vector<AsyncRequestsSender::Request>{AsyncRequestsSender::Request( + std::move(primaryShardId), + appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion()))}; + } std::vector<AsyncRequestsSender::Request> requests; - for (auto&& shardId : shardIds) - requests.emplace_back(std::move(shardId), cmdToSend); - return requests; -} + // The collection is sharded. Target all shards that own chunks that match the query. + std::set<ShardId> shardIds; + std::unique_ptr<CollatorInterface> collator; + if (!collation.isEmpty()) { + collator = uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); + } -std::vector<AsyncRequestsSender::Request> buildUnversionedRequestsForAllShards( - OperationContext* opCtx, const BSONObj& cmdObj) { - auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(); - return buildUnshardedRequestsForAllShards(opCtx, std::move(shardIds), cmdObj); + auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(collator), nss); + cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); + + for (const ShardId& shardId : shardIds) { + if (shardsToSkip.find(shardId) == shardsToSkip.end()) { + requests.emplace_back(shardId, appendShardVersion(cmdToSend, cm.getVersion(shardId))); + } + } + + return requests; } std::vector<AsyncRequestsSender::Response> gatherResponsesImpl( @@ -214,6 +252,7 @@ std::vector<AsyncRequestsSender::Response> gatherResponsesImpl( return responses; } + } // namespace std::vector<AsyncRequestsSender::Response> gatherResponses( @@ -340,79 +379,17 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx, cmdObj); } -BSONObj stripWriteConcern(const BSONObj& cmdObj) { - BSONObjBuilder output; - for (const auto& elem : cmdObj) { - const auto name = elem.fieldNameStringData(); - if (name == WriteConcernOptions::kWriteConcernField) { - continue; - } - output.append(elem); - } - return output.obj(); -} - -std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( - OperationContext* opCtx, - const NamespaceString& nss, - const ChunkManager& cm, - const std::set<ShardId>& shardsToSkip, - const BSONObj& cmdObj, - const BSONObj& query, - const BSONObj& collation) { - - auto cmdToSend = cmdObj; - - if (!cm.isSharded()) { - // The collection is unsharded. Target only the primary shard for the database. - - const auto primaryShardId = cm.dbPrimary(); - - if (shardsToSkip.find(primaryShardId) != shardsToSkip.end()) { - return {}; - } - - // Attach shardVersion "UNSHARDED", unless targeting the config server. - const auto cmdObjWithShardVersion = (primaryShardId != ShardId::kConfigServerId) - ? appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED()) - : cmdToSend; - - return buildUnshardedRequestsForAllShards( - opCtx, - {primaryShardId}, - appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())); - } - - std::vector<AsyncRequestsSender::Request> requests; - - // The collection is sharded. Target all shards that own chunks that match the query. - std::set<ShardId> shardIds; - std::unique_ptr<CollatorInterface> collator; - if (!collation.isEmpty()) { - collator = uassertStatusOK( - CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); - } - - auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(collator), nss); - cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); - - for (const ShardId& shardId : shardIds) { - if (shardsToSkip.find(shardId) == shardsToSkip.end()) { - requests.emplace_back(shardId, appendShardVersion(cmdToSend, cm.getVersion(shardId))); - } - } - - return requests; -} - std::vector<AsyncRequestsSender::Response> scatterGatherUnversionedTargetAllShards( OperationContext* opCtx, StringData dbName, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy) { - return gatherResponses( - opCtx, dbName, readPref, retryPolicy, buildUnversionedRequestsForAllShards(opCtx, cmdObj)); + std::vector<AsyncRequestsSender::Request> requests; + for (auto shardId : Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload()) + requests.emplace_back(std::move(shardId), cmdObj); + + return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests); } std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRoutingTable( @@ -450,30 +427,6 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( opCtx, dbName, readPref, retryPolicy, requests); } -std::vector<AsyncRequestsSender::Response> scatterGatherOnlyVersionIfUnsharded( - OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - Shard::RetryPolicy retryPolicy, - const std::set<ErrorCodes::Error>& ignorableErrors) { - auto cm = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - - std::vector<AsyncRequestsSender::Request> requests; - if (cm.isSharded()) { - // An unversioned request on a sharded collection can cause a shard that has not owned data - // for the collection yet to implicitly create the collection without all the collection - // options. So, we signal to shards that they should not implicitly create the collection. - requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj); - } else { - requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, BSONObj(), BSONObj()); - } - - return gatherResponses(opCtx, nss.db(), readPref, retryPolicy, requests); -} - AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( OperationContext* opCtx, StringData dbName, @@ -481,13 +434,18 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy) { + // Attach shardVersion "UNSHARDED", unless targeting the config server. + const auto cmdObjWithShardVersion = (dbInfo.primaryId() != ShardId::kConfigServerId) + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj; + auto responses = gatherResponses(opCtx, dbName, readPref, retryPolicy, - buildUnshardedRequestsForAllShards( - opCtx, {dbInfo.primaryId()}, appendDbVersionIfPresent(cmdObj, dbInfo))); + std::vector<AsyncRequestsSender::Request>{AsyncRequestsSender::Request( + dbInfo.primaryId(), appendDbVersionIfPresent(cmdObj, dbInfo))}); return std::move(responses.front()); } diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 46cc2be72f5..1a9428c5547 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -83,23 +83,6 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTarg const boost::optional<LegacyRuntimeConstants>& runtimeConstants); /** - * Consults the routing info to build requests for: - * 1. If sharded, shards that own chunks for the namespace, or - * 2. If unsharded, the primary shard for the database. - * - * If a shard is included in shardsToSkip, it will be excluded from the list returned to the - * caller. - */ -std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( - OperationContext* opCtx, - const NamespaceString& nss, - const ChunkManager& cm, - const std::set<ShardId>& shardsToSkip, - const BSONObj& cmdObj, - const BSONObj& query, - const BSONObj& collation); - -/** * Dispatches all the specified requests in parallel and waits until all complete, returning a * vector of the same size and positions as that of 'requests'. * @@ -160,11 +143,6 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx, const BSONObj& cmdObj); /** - * Returns a copy of 'cmdObj' with the writeConcern removed. - */ -BSONObj stripWriteConcern(const BSONObj& cmdObj); - -/** * Utility for dispatching unversioned commands to all shards in a cluster. * * Returns a non-OK status if a failure occurs on *this* node during execution. Otherwise, returns @@ -189,8 +167,6 @@ std::vector<AsyncRequestsSender::Response> scatterGatherUnversionedTargetAllShar * target by applying the passed-in query and collation to the local routing table cache. * * Does not retry on StaleConfigException. - * - * Return value is the same as scatterGatherUnversionedTargetAllShards(). */ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRoutingTable( OperationContext* opCtx, @@ -211,8 +187,6 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting * Callers can specify shards to skip, even if these shards would be otherwise targeted. * * Allows StaleConfigException errors to append to the response list. - * - * Return value is the same as scatterGatherUnversionedTargetAllShards(). */ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( @@ -228,25 +202,6 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( const BSONObj& collation); /** - * Utility for dispatching commands on a namespace, but with special hybrid versioning: - * - If the namespace is unsharded, a version is attached (so this node can find out if its routing - * table was stale, and the namespace is actually sharded), and only the primary shard is targeted. - * - If the namespace is sharded, no version is attached, and the request is broadcast to all - * shards. - * - * Does not retry on StaleConfigException. - * - * Return value is the same as scatterGatherUnversionedTargetAllShards(). - */ -std::vector<AsyncRequestsSender::Response> scatterGatherOnlyVersionIfUnsharded( - OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - Shard::RetryPolicy retryPolicy, - const std::set<ErrorCodes::Error>& ignorableErrors = {}); - -/** * Utility for dispatching commands against the primary of a database and attaching the appropriate * database version. Also attaches UNSHARDED to the command. Does not retry on stale version. */ diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 2a6e4c6ef13..66a7b120b47 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -81,6 +81,18 @@ BSONObj appendLegacyRuntimeConstantsToCommandObject(OperationContext* opCtx, return origCmdObj.addField(rtcBSON.getField(kLegacyRuntimeConstantsField)); } +BSONObj stripWriteConcern(const BSONObj& cmdObj) { + BSONObjBuilder output; + for (const auto& elem : cmdObj) { + const auto name = elem.fieldNameStringData(); + if (name == WriteConcernOptions::kWriteConcernField) { + continue; + } + output.append(elem); + } + return output.obj(); +} + BSONObj getCollation(const BSONObj& cmdObj) { BSONElement collationElement; auto status = bsonExtractTypedField(cmdObj, "collation", BSONType::Object, &collationElement); |