author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2022-03-15 08:38:01 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-03-15 09:06:07 +0000
commit     abe5428751586d14241f94f06261c7037690557f (patch)
tree       eb38abc66f9cf45566df6426778d89a7cb7a31fa
parent     2572165d03ab73a8969dd60f5c0fbda20ad41aef (diff)
download   mongo-abe5428751586d14241f94f06261c7037690557f.tar.gz
SERVER-63759 First implementation of the Router API
-rw-r--r--  src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp                   22
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp   53
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                             38
-rw-r--r--  src/mongo/s/SConscript                                                     3
-rw-r--r--  src/mongo/s/query/SConscript                                              14
-rw-r--r--  src/mongo/s/router.cpp                                                   165
-rw-r--r--  src/mongo/s/router.h                                                     125
7 files changed, 353 insertions, 67 deletions
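
Since the commit message is terse, a hedged usage sketch may help before reading the diff. The snippet below illustrates how a caller is expected to drive the new CollectionRouter, modeled on the dispatch_shard_pipeline_test.cpp change in this commit; the runExampleAggregation() wrapper and its body are hypothetical placeholders, not part of the commit.

    #include "mongo/s/router.h"

    // Sketch only: route() invokes the callback with fresh routing info and retries it
    // if a stale shard/database version error is thrown (see router.h further down).
    bool runExampleAggregation(OperationContext* opCtx, const NamespaceString& nss) {
        sharding::router::CollectionRouter router(opCtx->getServiceContext(), nss);
        return router.route(opCtx,
                            "example aggregation"_sd,
                            [&](OperationContext* opCtx, const ChunkManager& cm) {
                                // Target shards using 'cm'; any StaleConfig thrown here
                                // makes route() refresh the routing table and retry.
                                return cm.isSharded();
                            });
    }
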
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
index 8c9528e0e09..ebf3d147c8f 100644
--- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -27,12 +27,10 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
-
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/s/query/sharded_agg_test_fixture.h"
-#include "mongo/s/stale_shard_version_helpers.h"
+#include "mongo/s/router.h"
namespace mongo {
namespace {
@@ -196,15 +194,13 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
const bool hasChangeStream = false;
auto future = launchAsync([&] {
// Shouldn't throw.
- auto results =
- shardVersionRetry(operationContext(),
- Grid::get(getServiceContext())->catalogCache(),
- kTestAggregateNss,
- "dispatch shard pipeline"_sd,
- [&]() {
- return sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, pipeline->clone());
- });
+ sharding::router::CollectionRouter router(getServiceContext(), kTestAggregateNss);
+ auto results = router.route(operationContext(),
+ "dispatch shard pipeline"_sd,
+ [&](OperationContext* opCtx, const ChunkManager& cm) {
+ return sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, pipeline->clone());
+ });
ASSERT_EQ(results.remoteCursors.size(), 1UL);
ASSERT(!bool(results.splitPipeline));
});
@@ -213,7 +209,7 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
return createErrorCursorResponse(
- Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
+ {ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries.
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 2d51dc92697..204806d839a 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -50,8 +50,8 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/cluster_write.h"
-#include "mongo/s/grid.h"
#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/router.h"
#include "mongo/s/stale_shard_version_helpers.h"
namespace mongo {
@@ -311,37 +311,40 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx,
void ShardServerProcessInterface::createIndexesOnEmptyCollection(
OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) {
- auto cachedDbInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db()));
- BSONObjBuilder newCmdBuilder;
- newCmdBuilder.append("createIndexes", ns.coll());
- newCmdBuilder.append("indexes", indexSpecs);
- newCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
- opCtx->getWriteConcern().toBSON());
- auto cmdObj = newCmdBuilder.done();
-
- shardVersionRetry(
+ sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), ns.db());
+ router.route(
opCtx,
- Grid::get(opCtx)->catalogCache(),
- ns,
"copying index for empty collection {}"_format(ns.ns()),
- [&] {
- auto response = executeCommandAgainstDatabasePrimary(
- opCtx,
- ns.db(),
- std::move(cachedDbInfo),
- cmdObj,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotent);
+ [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append("createIndexes", ns.coll());
+ cmdBuilder.append("indexes", indexSpecs);
+ cmdBuilder.append(WriteConcernOptions::kWriteConcernField,
+ opCtx->getWriteConcern().toBSON());
+ sharding::router::DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(
+ cdb->getPrimary(), cdb->getVersion(), &cmdBuilder);
+
+ auto cmdObj = cmdBuilder.obj();
+
+ auto response = std::move(
+ gatherResponses(opCtx,
+ ns.db(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ Shard::RetryPolicy::kIdempotent,
+ std::vector<AsyncRequestsSender::Request>{
+ AsyncRequestsSender::Request(cdb->getPrimary(), cmdObj)})
+ .front());
uassertStatusOKWithContext(response.swResponse,
- str::stream() << "failed to run command " << cmdObj);
- auto result = response.swResponse.getValue().data;
+ str::stream() << "command was not sent " << cmdObj);
+ const auto& result = response.swResponse.getValue().data;
uassertStatusOKWithContext(getStatusFromCommandResult(result),
- str::stream() << "failed while running command " << cmdObj);
+ str::stream() << "command was sent but failed " << cmdObj);
uassertStatusOKWithContext(
getWriteConcernStatusFromCommandResult(result),
- str::stream() << "write concern failed while running command " << cmdObj);
+ str::stream()
+ << "command was sent and succeeded, but failed waiting for write concern "
+ << cmdObj);
});
}
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 9c5ea37cbcb..3eaeb1666c8 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -26,11 +26,12 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
-#include "sharded_agg_helpers.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
@@ -58,11 +59,13 @@
#include "mongo/s/query/cluster_query_knobs_gen.h"
#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/router.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/visit_helper.h"
-namespace mongo::sharded_agg_helpers {
+namespace mongo {
+namespace sharded_agg_helpers {
MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors);
@@ -613,7 +616,6 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) {
}
}
-
} // namespace
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
@@ -1349,19 +1351,22 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
ns == NamespaceString::kChangeStreamPreImagesNamespace);
};
- auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
- return shardVersionRetry(
- expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() {
- auto pipelineToTarget = pipeline->clone();
+ if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
+ shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) {
+ auto pipelineToTarget = pipeline->clone();
- if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
- shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) {
- return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
- pipelineToTarget.release());
- }
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ pipelineToTarget.release());
+ }
+
+ sharding::router::CollectionRouter router(expCtx->opCtx->getServiceContext(), expCtx->ns);
+ return router.route(
+ expCtx->opCtx,
+ "targeting pipeline to attach cursors"_sd,
+ [&](OperationContext* opCtx, const ChunkManager& cm) {
+ auto pipelineToTarget = pipeline->clone();
- auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns);
- if (cm.isOK() && !cm.getValue().isSharded()) {
+ if (!cm.isSharded()) {
// If the collection is unsharded and we are on the primary, we should be able to
// do a local read. The primary may be moved right after the primary shard check,
// but the local read path will do a db version check before it establishes a cursor
@@ -1371,7 +1376,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
expCtx->mongoProcessInterface->setExpectedShardVersion(
expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED());
setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion(
- expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion());
+ expCtx->opCtx, expCtx->ns, cm.dbVersion());
// During 'attachCursorSourceToPipelineForLocalRead', the expected db version
// must be set. Whether or not that call is succesful, to avoid affecting later
@@ -1424,4 +1429,5 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
});
}
-} // namespace mongo::sharded_agg_helpers
+} // namespace sharded_agg_helpers
+} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index c312f01776f..867de0bdedf 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -48,12 +48,13 @@ env.Library(
source=[
'cluster_commands_helpers.cpp',
'multi_statement_transaction_requests_sender.cpp',
- 'transaction_router.cpp',
'router_transactions_metrics.cpp',
'router_transactions_stats.idl',
+ 'router.cpp',
'session_catalog_router.cpp',
'stale_shard_version_helpers.cpp',
'transaction_router_resource_yielder.cpp',
+ 'transaction_router.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 3e78801e64f..e0e72ae107f 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -92,17 +92,6 @@ env.Library(
)
env.Library(
- target="cluster_client_cursor_mock",
- source=[
- "cluster_client_cursor_mock.cpp",
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/shared_request_handling',
- ],
-)
-
-env.Library(
target="store_possible_cursor",
source=[
"store_possible_cursor.cpp"
@@ -152,6 +141,7 @@ env.CppUnitTest(
"async_results_merger_test.cpp",
"blocking_results_merger_test.cpp",
"cluster_client_cursor_impl_test.cpp",
+ "cluster_client_cursor_mock.cpp",
"cluster_cursor_manager_test.cpp",
"cluster_exchange_test.cpp",
"establish_cursors_test.cpp",
@@ -167,6 +157,7 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/db/logical_session_id",
"$BUILD_DIR/mongo/db/query/query_request",
"$BUILD_DIR/mongo/db/query/query_test_service_context",
+ "$BUILD_DIR/mongo/db/shared_request_handling",
"$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture",
"$BUILD_DIR/mongo/s/sharding_router_test_fixture",
"$BUILD_DIR/mongo/s/vector_clock_mongos",
@@ -174,7 +165,6 @@ env.CppUnitTest(
"async_results_merger",
"cluster_aggregate",
"cluster_client_cursor",
- "cluster_client_cursor_mock",
"cluster_cursor_manager",
"router_exec_stage",
"store_possible_cursor",
diff --git a/src/mongo/s/router.cpp b/src/mongo/s/router.cpp
new file mode 100644
index 00000000000..dba40ad1137
--- /dev/null
+++ b/src/mongo/s/router.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/s/router.h"
+
+#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/stale_exception.h"
+
+namespace mongo {
+namespace sharding {
+namespace router {
+
+RouterBase::RouterBase(ServiceContext* service) : _service(service) {}
+
+DBPrimaryRouter::DBPrimaryRouter(ServiceContext* service, StringData db)
+ : RouterBase(service), _db(db.toString()) {}
+
+void DBPrimaryRouter::appendDDLRoutingTokenToCommand(const DatabaseType& dbt,
+ BSONObjBuilder* builder) {
+ const auto& dbVersion = dbt.getVersion();
+ if (!dbVersion.isFixed()) {
+ BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField));
+ dbVersion.serialize(&dbvBuilder);
+ }
+}
+
+void DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId,
+ const DatabaseVersion& dbVersion,
+ BSONObjBuilder* builder) {
+ if (!dbVersion.isFixed()) {
+ BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField));
+ dbVersion.serialize(&dbvBuilder);
+ }
+ ChunkVersion::UNSHARDED().serializeToBSON(ChunkVersion::kShardVersionField, builder);
+}
+
+CachedDatabaseInfo DBPrimaryRouter::_getRoutingInfo(OperationContext* opCtx) const {
+ auto catalogCache = Grid::get(_service)->catalogCache();
+ return uassertStatusOK(catalogCache->getDatabase(opCtx, _db));
+}
+
+void DBPrimaryRouter::_onException(RouteContext* context, Status s) {
+ if (++context->numAttempts > kMaxNumStaleVersionRetries) {
+ uassertStatusOKWithContext(
+ s,
+ str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries
+ << " retries attempting \'" << context->comment << "\'");
+ } else {
+ LOGV2_DEBUG(637590,
+ 3,
+ "Retrying {description}. Got error: {status}",
+ "description"_attr = context->comment,
+ "status"_attr = s);
+ }
+
+ auto catalogCache = Grid::get(_service)->catalogCache();
+
+ if (s == ErrorCodes::StaleDbVersion) {
+ auto si = s.extraInfo<StaleDbRoutingVersion>();
+ invariant(si);
+ invariant(si->getDb() == _db,
+ str::stream() << "StaleDbVersion on unexpected database. Expected " << _db
+ << ", received " << si->getDb());
+
+ catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted());
+ } else {
+ uassertStatusOK(s);
+ }
+}
+
+CollectionRouter::CollectionRouter(ServiceContext* service, NamespaceString nss)
+ : RouterBase(service), _nss(std::move(nss)) {}
+
+void CollectionRouter::appendCRUDRoutingTokenToCommand(const ShardId& shardId,
+ const ChunkManager& cm,
+ BSONObjBuilder* builder) {
+ auto chunkVersion(cm.getVersion(shardId));
+
+ if (chunkVersion == ChunkVersion::UNSHARDED()) {
+ // Need to add the database version as well
+ const auto& dbVersion = cm.dbVersion();
+ if (!dbVersion.isFixed()) {
+ BSONObjBuilder dbvBuilder(builder->subobjStart(DatabaseVersion::kDatabaseVersionField));
+ dbVersion.serialize(&dbvBuilder);
+ }
+ }
+ chunkVersion.serializeToBSON(ChunkVersion::kShardVersionField, builder);
+}
+
+ChunkManager CollectionRouter::_getRoutingInfo(OperationContext* opCtx) const {
+ auto catalogCache = Grid::get(_service)->catalogCache();
+ return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, _nss));
+}
+
+void CollectionRouter::_onException(RouteContext* context, Status s) {
+ if (++context->numAttempts > kMaxNumStaleVersionRetries) {
+ uassertStatusOKWithContext(
+ s,
+ str::stream() << "Exceeded maximum number of " << kMaxNumStaleVersionRetries
+ << " retries attempting \'" << context->comment << "\'");
+ } else {
+ LOGV2_DEBUG(637591,
+ 3,
+ "Retrying {description}. Got error: {status}",
+ "description"_attr = context->comment,
+ "status"_attr = s);
+ }
+
+ auto catalogCache = Grid::get(_service)->catalogCache();
+
+ if (s.isA<ErrorCategory::StaleShardVersionError>()) {
+ if (auto si = s.extraInfo<StaleConfigInfo>()) {
+ invariant(si->getNss() == _nss,
+ str::stream() << "StaleConfig on unexpected namespace. Expected " << _nss
+ << ", received " << si->getNss());
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ _nss, si->getVersionWanted(), si->getShardId());
+ } else {
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(_nss);
+ }
+ } else if (s == ErrorCodes::StaleDbVersion) {
+ auto si = s.extraInfo<StaleDbRoutingVersion>();
+ invariant(si);
+ invariant(si->getDb() == _nss.db(),
+ str::stream() << "StaleDbVersion on unexpected database. Expected " << _nss.db()
+ << ", received " << si->getDb());
+
+ catalogCache->onStaleDatabaseVersion(si->getDb(), si->getVersionWanted());
+ } else {
+ uassertStatusOK(s);
+ }
+}
+
+} // namespace router
+} // namespace sharding
+} // namespace mongo
diff --git a/src/mongo/s/router.h b/src/mongo/s/router.h
new file mode 100644
index 00000000000..6cfd1d9e546
--- /dev/null
+++ b/src/mongo/s/router.h
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/catalog_cache.h"
+
+namespace mongo {
+namespace sharding {
+namespace router {
+
+class RouterBase {
+protected:
+ RouterBase(ServiceContext* service);
+
+ struct RouteContext {
+ const std::string comment;
+ int numAttempts{0};
+ };
+
+ ServiceContext* const _service;
+};
+
+// Both the router classes below declare the scope in which their 'route' method is executed as a
+// router for the relevant database or collection. These classes are the only way to obtain routing
+// information for a given entry.
+
+/**
+ * This class should mostly be used for routing of DDL operations which need to be coordinated from
+ * the primary shard of the database.
+ */
+class DBPrimaryRouter : public RouterBase {
+public:
+ DBPrimaryRouter(ServiceContext* service, StringData db);
+
+ template <typename F>
+ auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
+ RouteContext context{comment.toString()};
+ while (true) {
+ auto cdb = _getRoutingInfo(opCtx);
+ try {
+ return callbackFn(opCtx, cdb);
+ } catch (const DBException& ex) {
+ _onException(&context, ex.toStatus());
+ }
+ }
+ }
+
+ static void appendDDLRoutingTokenToCommand(const DatabaseType& dbt, BSONObjBuilder* builder);
+
+ static void appendCRUDUnshardedRoutingTokenToCommand(const ShardId& shardId,
+ const DatabaseVersion& dbVersion,
+ BSONObjBuilder* builder);
+
+private:
+ CachedDatabaseInfo _getRoutingInfo(OperationContext* opCtx) const;
+ void _onException(RouteContext* context, Status s);
+
+ std::string _db;
+};
+
+/**
+ * This class should mostly be used for routing CRUD operations which need to have a view of the
+ * entire routing table for a collection.
+ */
+class CollectionRouter : public RouterBase {
+public:
+ CollectionRouter(ServiceContext* service, NamespaceString nss);
+
+ template <typename F>
+ auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
+ RouteContext context{comment.toString()};
+ while (true) {
+ auto cm = _getRoutingInfo(opCtx);
+ try {
+ return callbackFn(opCtx, cm);
+ } catch (const DBException& ex) {
+ _onException(&context, ex.toStatus());
+ }
+ }
+ }
+
+ static void appendCRUDRoutingTokenToCommand(const ShardId& shardId,
+ const ChunkManager& cm,
+ BSONObjBuilder* builder);
+
+private:
+ ChunkManager _getRoutingInfo(OperationContext* opCtx) const;
+ void _onException(RouteContext* context, Status s);
+
+ NamespaceString _nss;
+};
+
+} // namespace router
+} // namespace sharding
+} // namespace mongo
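
For completeness, a second hedged sketch covers the DBPrimaryRouter path, patterned on the createIndexesOnEmptyCollection() change above; sendToDbPrimary() is a hypothetical stand-in for the real dispatch helper (gatherResponses() in the actual diff).

    #include "mongo/s/router.h"

    // Sketch only: the router resolves the primary shard of 'dbName' and retries the
    // callback if the cached database version turns out to be stale.
    void runOnDbPrimary(OperationContext* opCtx, StringData dbName, const BSONObj& cmd) {
        sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), dbName);
        router.route(opCtx,
                     "example DDL command"_sd,
                     [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
                         BSONObjBuilder bob;
                         bob.appendElements(cmd);
                         // Attach the database version plus the UNSHARDED shard version so
                         // the shard can detect staleness and throw back to route().
                         sharding::router::DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(
                             cdb->getPrimary(), cdb->getVersion(), &bob);
                         sendToDbPrimary(opCtx, cdb->getPrimary(), bob.obj());  // hypothetical helper
                     });
    }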