From abe5428751586d14241f94f06261c7037690557f Mon Sep 17 00:00:00 2001 From: Kaloian Manassiev Date: Tue, 15 Mar 2022 08:38:01 +0000 Subject: SERVER-63759 First implementation of the Router API --- .../db/pipeline/dispatch_shard_pipeline_test.cpp | 22 ++- .../shardsvr_process_interface.cpp | 53 +++---- src/mongo/db/pipeline/sharded_agg_helpers.cpp | 38 +++-- src/mongo/s/SConscript | 3 +- src/mongo/s/query/SConscript | 14 +- src/mongo/s/router.cpp | 165 +++++++++++++++++++++ src/mongo/s/router.h | 125 ++++++++++++++++ 7 files changed, 353 insertions(+), 67 deletions(-) create mode 100644 src/mongo/s/router.cpp create mode 100644 src/mongo/s/router.h diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp index 8c9528e0e09..ebf3d147c8f 100644 --- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -27,12 +27,10 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/s/query/sharded_agg_test_fixture.h" -#include "mongo/s/stale_shard_version_helpers.h" +#include "mongo/s/router.h" namespace mongo { namespace { @@ -196,15 +194,13 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { const bool hasChangeStream = false; auto future = launchAsync([&] { // Shouldn't throw. - auto results = - shardVersionRetry(operationContext(), - Grid::get(getServiceContext())->catalogCache(), - kTestAggregateNss, - "dispatch shard pipeline"_sd, - [&]() { - return sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, pipeline->clone()); - }); + sharding::router::CollectionRouter router(getServiceContext(), kTestAggregateNss); + auto results = router.route(operationContext(), + "dispatch shard pipeline"_sd, + [&](OperationContext* opCtx, const ChunkManager& cm) { + return sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, pipeline->clone()); + }); ASSERT_EQ(results.remoteCursors.size(), 1UL); ASSERT(!bool(results.splitPipeline)); }); @@ -213,7 +209,7 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { return createErrorCursorResponse( - Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); + {ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. 
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 2d51dc92697..204806d839a 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -50,8 +50,8 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/cluster_write.h" -#include "mongo/s/grid.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/router.h" #include "mongo/s/stale_shard_version_helpers.h" namespace mongo { @@ -311,37 +311,40 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx, void ShardServerProcessInterface::createIndexesOnEmptyCollection( OperationContext* opCtx, const NamespaceString& ns, const std::vector& indexSpecs) { - auto cachedDbInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db())); - BSONObjBuilder newCmdBuilder; - newCmdBuilder.append("createIndexes", ns.coll()); - newCmdBuilder.append("indexes", indexSpecs); - newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, - opCtx->getWriteConcern().toBSON()); - auto cmdObj = newCmdBuilder.done(); - - shardVersionRetry( + sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), ns.db()); + router.route( opCtx, - Grid::get(opCtx)->catalogCache(), - ns, "copying index for empty collection {}"_format(ns.ns()), - [&] { - auto response = executeCommandAgainstDatabasePrimary( - opCtx, - ns.db(), - std::move(cachedDbInfo), - cmdObj, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kIdempotent); + [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("createIndexes", ns.coll()); + cmdBuilder.append("indexes", indexSpecs); + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, + opCtx->getWriteConcern().toBSON()); + sharding::router::DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand( + cdb->getPrimary(), cdb->getVersion(), &cmdBuilder); + + auto cmdObj = cmdBuilder.obj(); + + auto response = std::move( + gatherResponses(opCtx, + ns.db(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Shard::RetryPolicy::kIdempotent, + std::vector{ + AsyncRequestsSender::Request(cdb->getPrimary(), cmdObj)}) + .front()); uassertStatusOKWithContext(response.swResponse, - str::stream() << "failed to run command " << cmdObj); - auto result = response.swResponse.getValue().data; + str::stream() << "command was not sent " << cmdObj); + const auto& result = response.swResponse.getValue().data; uassertStatusOKWithContext(getStatusFromCommandResult(result), - str::stream() << "failed while running command " << cmdObj); + str::stream() << "command was sent but failed " << cmdObj); uassertStatusOKWithContext( getWriteConcernStatusFromCommandResult(result), - str::stream() << "write concern failed while running command " << cmdObj); + str::stream() + << "command was sent and succeeded, but failed waiting for write concern " + << cmdObj); }); } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 9c5ea37cbcb..3eaeb1666c8 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -26,11 +26,12 @@ * exception statement from all source files in the program, then also delete * it in the license file. 
*/ + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" -#include "sharded_agg_helpers.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/curop.h" #include "mongo/db/pipeline/aggregate_command_gen.h" @@ -58,11 +59,13 @@ #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" +#include "mongo/s/router.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" #include "mongo/util/visit_helper.h" -namespace mongo::sharded_agg_helpers { +namespace mongo { +namespace sharded_agg_helpers { MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors); @@ -613,7 +616,6 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { } } - } // namespace std::unique_ptr targetShardsAndAddMergeCursors( @@ -1349,19 +1351,22 @@ std::unique_ptr attachCursorToPipeline( ns == NamespaceString::kChangeStreamPreImagesNamespace); }; - auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); - return shardVersionRetry( - expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() { - auto pipelineToTarget = pipeline->clone(); + if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || + shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) { + auto pipelineToTarget = pipeline->clone(); - if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || - shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) { - return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( - pipelineToTarget.release()); - } + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + pipelineToTarget.release()); + } + + sharding::router::CollectionRouter router(expCtx->opCtx->getServiceContext(), expCtx->ns); + return router.route( + expCtx->opCtx, + "targeting pipeline to attach cursors"_sd, + [&](OperationContext* opCtx, const ChunkManager& cm) { + auto pipelineToTarget = pipeline->clone(); - auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); - if (cm.isOK() && !cm.getValue().isSharded()) { + if (!cm.isSharded()) { // If the collection is unsharded and we are on the primary, we should be able to // do a local read. The primary may be moved right after the primary shard check, // but the local read path will do a db version check before it establishes a cursor @@ -1371,7 +1376,7 @@ std::unique_ptr attachCursorToPipeline( expCtx->mongoProcessInterface->setExpectedShardVersion( expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED()); setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion( - expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion()); + expCtx->opCtx, expCtx->ns, cm.dbVersion()); // During 'attachCursorSourceToPipelineForLocalRead', the expected db version // must be set. 
Whether or not that call is succesful, to avoid affecting later @@ -1424,4 +1429,5 @@ std::unique_ptr attachCursorToPipeline( }); } -} // namespace mongo::sharded_agg_helpers +} // namespace sharded_agg_helpers +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index c312f01776f..867de0bdedf 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -48,12 +48,13 @@ env.Library( source=[ 'cluster_commands_helpers.cpp', 'multi_statement_transaction_requests_sender.cpp', - 'transaction_router.cpp', 'router_transactions_metrics.cpp', 'router_transactions_stats.idl', + 'router.cpp', 'session_catalog_router.cpp', 'stale_shard_version_helpers.cpp', 'transaction_router_resource_yielder.cpp', + 'transaction_router.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/txn_cmd_request', diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 3e78801e64f..e0e72ae107f 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -91,17 +91,6 @@ env.Library( ] ) -env.Library( - target="cluster_client_cursor_mock", - source=[ - "cluster_client_cursor_mock.cpp", - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/shared_request_handling', - ], -) - env.Library( target="store_possible_cursor", source=[ @@ -152,6 +141,7 @@ env.CppUnitTest( "async_results_merger_test.cpp", "blocking_results_merger_test.cpp", "cluster_client_cursor_impl_test.cpp", + "cluster_client_cursor_mock.cpp", "cluster_cursor_manager_test.cpp", "cluster_exchange_test.cpp", "establish_cursors_test.cpp", @@ -167,6 +157,7 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/logical_session_id", "$BUILD_DIR/mongo/db/query/query_request", "$BUILD_DIR/mongo/db/query/query_test_service_context", + "$BUILD_DIR/mongo/db/shared_request_handling", "$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture", "$BUILD_DIR/mongo/s/sharding_router_test_fixture", "$BUILD_DIR/mongo/s/vector_clock_mongos", @@ -174,7 +165,6 @@ env.CppUnitTest( "async_results_merger", "cluster_aggregate", "cluster_client_cursor", - "cluster_client_cursor_mock", "cluster_cursor_manager", "router_exec_stage", "store_possible_cursor", diff --git a/src/mongo/s/router.cpp b/src/mongo/s/router.cpp new file mode 100644 index 00000000000..dba40ad1137 --- /dev/null +++ b/src/mongo/s/router.cpp @@ -0,0 +1,165 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/s/router.h" + +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/stale_exception.h" + +namespace mongo { +namespace sharding { +namespace router { + +RouterBase::RouterBase(ServiceContext* service) : _service(service) {} + +DBPrimaryRouter::DBPrimaryRouter(ServiceContext* service, StringData db) + : RouterBase(service), _db(db.toString()) {} + +void DBPrimaryRouter::appendDDLRoutingTokenToCommand(const DatabaseType& dbt, + BSONObjBuilder* builder) { + const auto& dbVersion = dbt.getVersion(); + if (!dbVersion.isFixed()) { + BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField)); + dbVersion.serialize(&dbvBuilder); + } +} + +void DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId, + const DatabaseVersion& dbVersion, + BSONObjBuilder* builder) { + if (!dbVersion.isFixed()) { + BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField)); + dbVersion.serialize(&dbvBuilder); + } + ChunkVersion::UNSHARDED().serializeToBSON(ChunkVersion::kShardVersionField, builder); +} + +CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) const { + auto catalogCache = Grid::get(_service)->catalogCache(); + return uassertStatusOK(catalogCache->getDatabase(opCtx, _db)); +} + +void DBPrimaryRouter::_onException(RouteContext* context, Status s) { + if (++context->numAttempts > kMaxNumStaleVersionRetries) { + uassertStatusOKWithContext( + s, + str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries + << " retries attempting \'" << context->comment << "\'"); + } else { + LOGV2_DEBUG(637590, + 3, + "Retrying {description}. Got error: {status}", + "description"_attr = context->comment, + "status"_attr = s); + } + + auto catalogCache = Grid::get(_service)->catalogCache(); + + if (s == ErrorCodes::StaleDbVersion) { + auto si = s.extraInfo(); + invariant(si); + invariant(si->getDb() == _db, + str::stream() << "StaleDbVersion on unexpected database. 
Expected " << _db + << ", received " << si->getDb()); + + catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); + } else { + uassertStatusOK(s); + } +} + +CollectionRouter::CollectionRouter(ServiceContext* service, NamespaceString nss) + : RouterBase(service), _nss(std::move(nss)) {} + +void CollectionRouter::appendCRUDRoutingTokenToCommand(const ShardId& shardId, + const ChunkManager& cm, + BSONObjBuilder* builder) { + auto chunkVersion(cm.getVersion(shardId)); + + if (chunkVersion == ChunkVersion::UNSHARDED()) { + // Need to add the database version as well + const auto& dbVersion = cm.dbVersion(); + if (!dbVersion.isFixed()) { + BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField)); + dbVersion.serialize(&dbvBuilder); + } + } + chunkVersion.serializeToBSON(ChunkVersion::kShardVersionField, builder); +} + +ChunkManager CollectionRouter::_getRoutingInfo(OperationContext* opCtx) const { + auto catalogCache = Grid::get(_service)->catalogCache(); + return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, _nss)); +} + +void CollectionRouter::_onException(RouteContext* context, Status s) { + if (++context->numAttempts > kMaxNumStaleVersionRetries) { + uassertStatusOKWithContext( + s, + str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries + << " retries attempting \'" << context->comment << "\'"); + } else { + LOGV2_DEBUG(637591, + 3, + "Retrying {description}. Got error: {status}", + "description"_attr = context->comment, + "status"_attr = s); + } + + auto catalogCache = Grid::get(_service)->catalogCache(); + + if (s.isA()) { + if (auto si = s.extraInfo()) { + invariant(si->getNss() == _nss, + str::stream() << "StaleConfig on unexpected namespace. Expected " << _nss + << ", received " << si->getNss()); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + _nss, si->getVersionWanted(), si->getShardId()); + } else { + catalogCache->invalidateCollectionEntry_LINEARIZABLE(_nss); + } + } else if (s == ErrorCodes::StaleDbVersion) { + auto si = s.extraInfo(); + invariant(si); + invariant(si->getDb() == _nss.db(), + str::stream() << "StaleDbVersion on unexpected database. Expected " << _nss.db() + << ", received " << si->getDb()); + + catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); + } else { + uassertStatusOK(s); + } +} + +} // namespace router +} // namespace sharding +} // namespace mongo diff --git a/src/mongo/s/router.h b/src/mongo/s/router.h new file mode 100644 index 00000000000..6cfd1d9e546 --- /dev/null +++ b/src/mongo/s/router.h @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/catalog_cache.h" + +namespace mongo { +namespace sharding { +namespace router { + +class RouterBase { +protected: + RouterBase(ServiceContext* service); + + struct RouteContext { + const std::string comment; + int numAttempts{0}; + }; + + ServiceContext* const _service; +}; + +// Both the router classes below declare the scope in which their 'route' method is executed as a +// router for the relevant database or collection. These classes are the only way to obtain routing +// information for a given entry. + +/** + * This class should mostly be used for routing of DDL operations which need to be coordinated from + * the primary shard of the database. + */ +class DBPrimaryRouter : public RouterBase { +public: + DBPrimaryRouter(ServiceContext* service, StringData db); + + template + auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) { + RouteContext context{comment.toString()}; + while (true) { + auto cdb = _getRoutingInfo(opCtx); + try { + return callbackFn(opCtx, cdb); + } catch (const DBException& ex) { + _onException(&context, ex.toStatus()); + } + } + } + + static void appendDDLRoutingTokenToCommand(const DatabaseType& dbt, BSONObjBuilder* builder); + + static void appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId, + const DatabaseVersion& dbVersion, + BSONObjBuilder* builder); + +private: + CachedDatabaseInfo _getRoutingInfo(OperationContext* opCtx) const; + void _onException(RouteContext* context, Status s); + + std::string _db; +}; + +/** + * This class should mostly be used for routing CRUD operations which need to have a view of the + * entire routing table for a collection. + */ +class CollectionRouter : public RouterBase { +public: + CollectionRouter(ServiceContext* service, NamespaceString nss); + + template + auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) { + RouteContext context{comment.toString()}; + while (true) { + auto cm = _getRoutingInfo(opCtx); + try { + return callbackFn(opCtx, cm); + } catch (const DBException& ex) { + _onException(&context, ex.toStatus()); + } + } + } + + static void appendCRUDRoutingTokenToCommand(const ShardId& shardId, + const ChunkManager& cm, + BSONObjBuilder* builder); + +private: + ChunkManager _getRoutingInfo(OperationContext* opCtx) const; + void _onException(RouteContext* context, Status s); + + NamespaceString _nss; +}; + +} // namespace router +} // namespace sharding +} // namespace mongo -- cgit v1.2.1