summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2022-03-15 08:38:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-15 09:06:07 +0000
commitabe5428751586d14241f94f06261c7037690557f (patch)
treeeb38abc66f9cf45566df6426778d89a7cb7a31fa /src/mongo/s
parent2572165d03ab73a8969dd60f5c0fbda20ad41aef (diff)
downloadmongo-abe5428751586d14241f94f06261c7037690557f.tar.gz
SERVER-63759 First implementation of the Router API
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/SConscript3
-rw-r--r--src/mongo/s/query/SConscript14
-rw-r--r--src/mongo/s/router.cpp165
-rw-r--r--src/mongo/s/router.h125
4 files changed, 294 insertions, 13 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index c312f01776f..867de0bdedf 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -48,12 +48,13 @@ env.Library(
source=[
'cluster_commands_helpers.cpp',
'multi_statement_transaction_requests_sender.cpp',
- 'transaction_router.cpp',
'router_transactions_metrics.cpp',
'router_transactions_stats.idl',
+ 'router.cpp',
'session_catalog_router.cpp',
'stale_shard_version_helpers.cpp',
'transaction_router_resource_yielder.cpp',
+ 'transaction_router.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 3e78801e64f..e0e72ae107f 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -92,17 +92,6 @@ env.Library(
)
env.Library(
- target="cluster_client_cursor_mock",
- source=[
- "cluster_client_cursor_mock.cpp",
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/shared_request_handling',
- ],
-)
-
-env.Library(
target="store_possible_cursor",
source=[
"store_possible_cursor.cpp"
@@ -152,6 +141,7 @@ env.CppUnitTest(
"async_results_merger_test.cpp",
"blocking_results_merger_test.cpp",
"cluster_client_cursor_impl_test.cpp",
+ "cluster_client_cursor_mock.cpp",
"cluster_cursor_manager_test.cpp",
"cluster_exchange_test.cpp",
"establish_cursors_test.cpp",
@@ -167,6 +157,7 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/db/logical_session_id",
"$BUILD_DIR/mongo/db/query/query_request",
"$BUILD_DIR/mongo/db/query/query_test_service_context",
+ "$BUILD_DIR/mongo/db/shared_request_handling",
"$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture",
"$BUILD_DIR/mongo/s/sharding_router_test_fixture",
"$BUILD_DIR/mongo/s/vector_clock_mongos",
@@ -174,7 +165,6 @@ env.CppUnitTest(
"async_results_merger",
"cluster_aggregate",
"cluster_client_cursor",
- "cluster_client_cursor_mock",
"cluster_cursor_manager",
"router_exec_stage",
"store_possible_cursor",
diff --git a/src/mongo/s/router.cpp b/src/mongo/s/router.cpp
new file mode 100644
index 00000000000..dba40ad1137
--- /dev/null
+++ b/src/mongo/s/router.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/s/router.h"
+
+#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/stale_exception.h"
+
+namespace mongo {
+namespace sharding {
+namespace router {
+
+RouterBase::RouterBase(ServiceContext* service) : _service(service) {}
+
+DBPrimaryRouter::DBPrimaryRouter(ServiceContext* service, StringData db)
+ : RouterBase(service), _db(db.toString()) {}
+
+void DBPrimaryRouter::appendDDLRoutingTokenToCommand(const DatabaseType& dbt,
+ BSONObjBuilder* builder) {
+ const auto& dbVersion = dbt.getVersion();
+ if (!dbVersion.isFixed()) {
+ BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField));
+ dbVersion.serialize(&dbvBuilder);
+ }
+}
+
+void DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId,
+ const DatabaseVersion& dbVersion,
+ BSONObjBuilder* builder) {
+ if (!dbVersion.isFixed()) {
+ BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField));
+ dbVersion.serialize(&dbvBuilder);
+ }
+ ChunkVersion::UNSHARDED().serializeToBSON(ChunkVersion::kShardVersionField, builder);
+}
+
+CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) const {
+ auto catalogCache = Grid::get(_service)->catalogCache();
+ return uassertStatusOK(catalogCache->getDatabase(opCtx, _db));
+}
+
+void DBPrimaryRouter::_onException(RouteContext* context, Status s) {
+ if (++context->numAttempts > kMaxNumStaleVersionRetries) {
+ uassertStatusOKWithContext(
+ s,
+ str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries
+ << " retries attempting \'" << context->comment << "\'");
+ } else {
+ LOGV2_DEBUG(637590,
+ 3,
+ "Retrying {description}. Got error: {status}",
+ "description"_attr = context->comment,
+ "status"_attr = s);
+ }
+
+ auto catalogCache = Grid::get(_service)->catalogCache();
+
+ if (s == ErrorCodes::StaleDbVersion) {
+ auto si = s.extraInfo<StaleDbRoutingVersion>();
+ invariant(si);
+ invariant(si->getDb() == _db,
+ str::stream() << "StaleDbVersion on unexpected database. Expected " << _db
+ << ", received " << si->getDb());
+
+ catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted());
+ } else {
+ uassertStatusOK(s);
+ }
+}
+
+CollectionRouter::CollectionRouter(ServiceContext* service, NamespaceString nss)
+ : RouterBase(service), _nss(std::move(nss)) {}
+
+void CollectionRouter::appendCRUDRoutingTokenToCommand(const ShardId& shardId,
+ const ChunkManager& cm,
+ BSONObjBuilder* builder) {
+ auto chunkVersion(cm.getVersion(shardId));
+
+ if (chunkVersion == ChunkVersion::UNSHARDED()) {
+ // Need to add the database version as well
+ const auto& dbVersion = cm.dbVersion();
+ if (!dbVersion.isFixed()) {
+ BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField));
+ dbVersion.serialize(&dbvBuilder);
+ }
+ }
+ chunkVersion.serializeToBSON(ChunkVersion::kShardVersionField, builder);
+}
+
+ChunkManager CollectionRouter::_getRoutingInfo(OperationContext* opCtx) const {
+ auto catalogCache = Grid::get(_service)->catalogCache();
+ return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, _nss));
+}
+
+void CollectionRouter::_onException(RouteContext* context, Status s) {
+ if (++context->numAttempts > kMaxNumStaleVersionRetries) {
+ uassertStatusOKWithContext(
+ s,
+ str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries
+ << " retries attempting \'" << context->comment << "\'");
+ } else {
+ LOGV2_DEBUG(637591,
+ 3,
+ "Retrying {description}. Got error: {status}",
+ "description"_attr = context->comment,
+ "status"_attr = s);
+ }
+
+ auto catalogCache = Grid::get(_service)->catalogCache();
+
+ if (s.isA<ErrorCategory::StaleShardVersionError>()) {
+ if (auto si = s.extraInfo<StaleConfigInfo>()) {
+ invariant(si->getNss() == _nss,
+ str::stream() << "StaleConfig on unexpected namespace. Expected " << _nss
+ << ", received " << si->getNss());
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ _nss, si->getVersionWanted(), si->getShardId());
+ } else {
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(_nss);
+ }
+ } else if (s == ErrorCodes::StaleDbVersion) {
+ auto si = s.extraInfo<StaleDbRoutingVersion>();
+ invariant(si);
+ invariant(si->getDb() == _nss.db(),
+ str::stream() << "StaleDbVersion on unexpected database. Expected " << _nss.db()
+ << ", received " << si->getDb());
+
+ catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted());
+ } else {
+ uassertStatusOK(s);
+ }
+}
+
+} // namespace router
+} // namespace sharding
+} // namespace mongo
diff --git a/src/mongo/s/router.h b/src/mongo/s/router.h
new file mode 100644
index 00000000000..6cfd1d9e546
--- /dev/null
+++ b/src/mongo/s/router.h
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/catalog_cache.h"
+
+namespace mongo {
+namespace sharding {
+namespace router {
+
+class RouterBase {
+protected:
+ RouterBase(ServiceContext* service);
+
+ struct RouteContext {
+ const std::string comment;
+ int numAttempts{0};
+ };
+
+ ServiceContext* const _service;
+};
+
+// Both the router classes below declare the scope in which their 'route' method is executed as a
+// router for the relevant database or collection. These classes are the only way to obtain routing
+// information for a given entry.
+
+/**
+ * This class should mostly be used for routing of DDL operations which need to be coordinated from
+ * the primary shard of the database.
+ */
+class DBPrimaryRouter : public RouterBase {
+public:
+ DBPrimaryRouter(ServiceContext* service, StringData db);
+
+ template <typename F>
+ auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
+ RouteContext context{comment.toString()};
+ while (true) {
+ auto cdb = _getRoutingInfo(opCtx);
+ try {
+ return callbackFn(opCtx, cdb);
+ } catch (const DBException& ex) {
+ _onException(&context, ex.toStatus());
+ }
+ }
+ }
+
+ static void appendDDLRoutingTokenToCommand(const DatabaseType& dbt, BSONObjBuilder* builder);
+
+ static void appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId,
+ const DatabaseVersion& dbVersion,
+ BSONObjBuilder* builder);
+
+private:
+ CachedDatabaseInfo _getRoutingInfo(OperationContext* opCtx) const;
+ void _onException(RouteContext* context, Status s);
+
+ std::string _db;
+};
+
+/**
+ * This class should mostly be used for routing CRUD operations which need to have a view of the
+ * entire routing table for a collection.
+ */
+class CollectionRouter : public RouterBase {
+public:
+ CollectionRouter(ServiceContext* service, NamespaceString nss);
+
+ template <typename F>
+ auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
+ RouteContext context{comment.toString()};
+ while (true) {
+ auto cm = _getRoutingInfo(opCtx);
+ try {
+ return callbackFn(opCtx, cm);
+ } catch (const DBException& ex) {
+ _onException(&context, ex.toStatus());
+ }
+ }
+ }
+
+ static void appendCRUDRoutingTokenToCommand(const ShardId& shardId,
+ const ChunkManager& cm,
+ BSONObjBuilder* builder);
+
+private:
+ ChunkManager _getRoutingInfo(OperationContext* opCtx) const;
+ void _onException(RouteContext* context, Status s);
+
+ NamespaceString _nss;
+};
+
+} // namespace router
+} // namespace sharding
+} // namespace mongo