diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-03-15 08:38:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-15 09:06:07 +0000 |
commit | abe5428751586d14241f94f06261c7037690557f (patch) | |
tree | eb38abc66f9cf45566df6426778d89a7cb7a31fa /src/mongo/s | |
parent | 2572165d03ab73a8969dd60f5c0fbda20ad41aef (diff) | |
download | mongo-abe5428751586d14241f94f06261c7037690557f.tar.gz |
SERVER-63759 First implementation of the Router API
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/query/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/s/router.cpp | 165 | ||||
-rw-r--r-- | src/mongo/s/router.h | 125 |
4 files changed, 294 insertions, 13 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index c312f01776f..867de0bdedf 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -48,12 +48,13 @@ env.Library( source=[ 'cluster_commands_helpers.cpp', 'multi_statement_transaction_requests_sender.cpp', - 'transaction_router.cpp', 'router_transactions_metrics.cpp', 'router_transactions_stats.idl', + 'router.cpp', 'session_catalog_router.cpp', 'stale_shard_version_helpers.cpp', 'transaction_router_resource_yielder.cpp', + 'transaction_router.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/txn_cmd_request', diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 3e78801e64f..e0e72ae107f 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -92,17 +92,6 @@ env.Library( ) env.Library( - target="cluster_client_cursor_mock", - source=[ - "cluster_client_cursor_mock.cpp", - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/shared_request_handling', - ], -) - -env.Library( target="store_possible_cursor", source=[ "store_possible_cursor.cpp" @@ -152,6 +141,7 @@ env.CppUnitTest( "async_results_merger_test.cpp", "blocking_results_merger_test.cpp", "cluster_client_cursor_impl_test.cpp", + "cluster_client_cursor_mock.cpp", "cluster_cursor_manager_test.cpp", "cluster_exchange_test.cpp", "establish_cursors_test.cpp", @@ -167,6 +157,7 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/logical_session_id", "$BUILD_DIR/mongo/db/query/query_request", "$BUILD_DIR/mongo/db/query/query_test_service_context", + "$BUILD_DIR/mongo/db/shared_request_handling", "$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture", "$BUILD_DIR/mongo/s/sharding_router_test_fixture", "$BUILD_DIR/mongo/s/vector_clock_mongos", @@ -174,7 +165,6 @@ env.CppUnitTest( "async_results_merger", "cluster_aggregate", "cluster_client_cursor", - "cluster_client_cursor_mock", "cluster_cursor_manager", "router_exec_stage", "store_possible_cursor", diff --git a/src/mongo/s/router.cpp b/src/mongo/s/router.cpp new file mode 100644 index 00000000000..dba40ad1137 --- /dev/null +++ b/src/mongo/s/router.cpp @@ -0,0 +1,165 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/s/router.h" + +#include "mongo/logv2/log.h" +#include "mongo/s/grid.h" +#include "mongo/s/stale_exception.h" + +namespace mongo { +namespace sharding { +namespace router { + +RouterBase::RouterBase(ServiceContext* service) : _service(service) {} + +DBPrimaryRouter::DBPrimaryRouter(ServiceContext* service, StringData db) + : RouterBase(service), _db(db.toString()) {} + +void DBPrimaryRouter::appendDDLRoutingTokenToCommand(const DatabaseType& dbt, + BSONObjBuilder* builder) { + const auto& dbVersion = dbt.getVersion(); + if (!dbVersion.isFixed()) { + BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField)); + dbVersion.serialize(&dbvBuilder); + } +} + +void DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId, + const DatabaseVersion& dbVersion, + BSONObjBuilder* builder) { + if (!dbVersion.isFixed()) { + BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField)); + dbVersion.serialize(&dbvBuilder); + } + ChunkVersion::UNSHARDED().serializeToBSON(ChunkVersion::kShardVersionField, builder); +} + +CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) const { + auto catalogCache = Grid::get(_service)->catalogCache(); + return uassertStatusOK(catalogCache->getDatabase(opCtx, _db)); +} + +void DBPrimaryRouter::_onException(RouteContext* context, Status s) { + if (++context->numAttempts > kMaxNumStaleVersionRetries) { + uassertStatusOKWithContext( + s, + str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries + << " retries attempting \'" << context->comment << "\'"); + } else { + LOGV2_DEBUG(637590, + 3, + "Retrying {description}. Got error: {status}", + "description"_attr = context->comment, + "status"_attr = s); + } + + auto catalogCache = Grid::get(_service)->catalogCache(); + + if (s == ErrorCodes::StaleDbVersion) { + auto si = s.extraInfo<StaleDbRoutingVersion>(); + invariant(si); + invariant(si->getDb() == _db, + str::stream() << "StaleDbVersion on unexpected database. Expected " << _db + << ", received " << si->getDb()); + + catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); + } else { + uassertStatusOK(s); + } +} + +CollectionRouter::CollectionRouter(ServiceContext* service, NamespaceString nss) + : RouterBase(service), _nss(std::move(nss)) {} + +void CollectionRouter::appendCRUDRoutingTokenToCommand(const ShardId& shardId, + const ChunkManager& cm, + BSONObjBuilder* builder) { + auto chunkVersion(cm.getVersion(shardId)); + + if (chunkVersion == ChunkVersion::UNSHARDED()) { + // Need to add the database version as well + const auto& dbVersion = cm.dbVersion(); + if (!dbVersion.isFixed()) { + BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField)); + dbVersion.serialize(&dbvBuilder); + } + } + chunkVersion.serializeToBSON(ChunkVersion::kShardVersionField, builder); +} + +ChunkManager CollectionRouter::_getRoutingInfo(OperationContext* opCtx) const { + auto catalogCache = Grid::get(_service)->catalogCache(); + return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, _nss)); +} + +void CollectionRouter::_onException(RouteContext* context, Status s) { + if (++context->numAttempts > kMaxNumStaleVersionRetries) { + uassertStatusOKWithContext( + s, + str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries + << " retries attempting \'" << context->comment << "\'"); + } else { + LOGV2_DEBUG(637591, + 3, + "Retrying {description}. Got error: {status}", + "description"_attr = context->comment, + "status"_attr = s); + } + + auto catalogCache = Grid::get(_service)->catalogCache(); + + if (s.isA<ErrorCategory::StaleShardVersionError>()) { + if (auto si = s.extraInfo<StaleConfigInfo>()) { + invariant(si->getNss() == _nss, + str::stream() << "StaleConfig on unexpected namespace. Expected " << _nss + << ", received " << si->getNss()); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + _nss, si->getVersionWanted(), si->getShardId()); + } else { + catalogCache->invalidateCollectionEntry_LINEARIZABLE(_nss); + } + } else if (s == ErrorCodes::StaleDbVersion) { + auto si = s.extraInfo<StaleDbRoutingVersion>(); + invariant(si); + invariant(si->getDb() == _nss.db(), + str::stream() << "StaleDbVersion on unexpected database. Expected " << _nss.db() + << ", received " << si->getDb()); + + catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted()); + } else { + uassertStatusOK(s); + } +} + +} // namespace router +} // namespace sharding +} // namespace mongo diff --git a/src/mongo/s/router.h b/src/mongo/s/router.h new file mode 100644 index 00000000000..6cfd1d9e546 --- /dev/null +++ b/src/mongo/s/router.h @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/catalog_cache.h" + +namespace mongo { +namespace sharding { +namespace router { + +class RouterBase { +protected: + RouterBase(ServiceContext* service); + + struct RouteContext { + const std::string comment; + int numAttempts{0}; + }; + + ServiceContext* const _service; +}; + +// Both the router classes below declare the scope in which their 'route' method is executed as a +// router for the relevant database or collection. These classes are the only way to obtain routing +// information for a given entry. + +/** + * This class should mostly be used for routing of DDL operations which need to be coordinated from + * the primary shard of the database. + */ +class DBPrimaryRouter : public RouterBase { +public: + DBPrimaryRouter(ServiceContext* service, StringData db); + + template <typename F> + auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) { + RouteContext context{comment.toString()}; + while (true) { + auto cdb = _getRoutingInfo(opCtx); + try { + return callbackFn(opCtx, cdb); + } catch (const DBException& ex) { + _onException(&context, ex.toStatus()); + } + } + } + + static void appendDDLRoutingTokenToCommand(const DatabaseType& dbt, BSONObjBuilder* builder); + + static void appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId, + const DatabaseVersion& dbVersion, + BSONObjBuilder* builder); + +private: + CachedDatabaseInfo _getRoutingInfo(OperationContext* opCtx) const; + void _onException(RouteContext* context, Status s); + + std::string _db; +}; + +/** + * This class should mostly be used for routing CRUD operations which need to have a view of the + * entire routing table for a collection. + */ +class CollectionRouter : public RouterBase { +public: + CollectionRouter(ServiceContext* service, NamespaceString nss); + + template <typename F> + auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) { + RouteContext context{comment.toString()}; + while (true) { + auto cm = _getRoutingInfo(opCtx); + try { + return callbackFn(opCtx, cm); + } catch (const DBException& ex) { + _onException(&context, ex.toStatus()); + } + } + } + + static void appendCRUDRoutingTokenToCommand(const ShardId& shardId, + const ChunkManager& cm, + BSONObjBuilder* builder); + +private: + ChunkManager _getRoutingInfo(OperationContext* opCtx) const; + void _onException(RouteContext* context, Status s); + + NamespaceString _nss; +}; + +} // namespace router +} // namespace sharding +} // namespace mongo |