summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp')
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp53
1 files changed, 28 insertions, 25 deletions
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 2d51dc92697..204806d839a 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -50,8 +50,8 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/cluster_write.h"
-#include "mongo/s/grid.h"
#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/router.h"
#include "mongo/s/stale_shard_version_helpers.h"
namespace mongo {
@@ -311,37 +311,40 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx,
void ShardServerProcessInterface::createIndexesOnEmptyCollection(
OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) {
- auto cachedDbInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db()));
- BSONObjBuilder newCmdBuilder;
- newCmdBuilder.append("createIndexes", ns.coll());
- newCmdBuilder.append("indexes", indexSpecs);
- newCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
- opCtx->getWriteConcern().toBSON());
- auto cmdObj = newCmdBuilder.done();
-
- shardVersionRetry(
+ sharding::router::DBPrimaryRouter router(opCtx->getServiceContext(), ns.db());
+ router.route(
opCtx,
- Grid::get(opCtx)->catalogCache(),
- ns,
"copying index for empty collection {}"_format(ns.ns()),
- [&] {
- auto response = executeCommandAgainstDatabasePrimary(
- opCtx,
- ns.db(),
- std::move(cachedDbInfo),
- cmdObj,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotent);
+ [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append("createIndexes", ns.coll());
+ cmdBuilder.append("indexes", indexSpecs);
+ cmdBuilder.append(WriteConcernOptions::kWriteConcernField,
+ opCtx->getWriteConcern().toBSON());
+ sharding::router::DBPrimaryRouter::appendCRUDUnshardedRoutingTokenToCommand(
+ cdb->getPrimary(), cdb->getVersion(), &cmdBuilder);
+
+ auto cmdObj = cmdBuilder.obj();
+
+ auto response = std::move(
+ gatherResponses(opCtx,
+ ns.db(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ Shard::RetryPolicy::kIdempotent,
+ std::vector<AsyncRequestsSender::Request>{
+ AsyncRequestsSender::Request(cdb->getPrimary(), cmdObj)})
+ .front());
uassertStatusOKWithContext(response.swResponse,
- str::stream() << "failed to run command " << cmdObj);
- auto result = response.swResponse.getValue().data;
+ str::stream() << "command was not sent " << cmdObj);
+ const auto& result = response.swResponse.getValue().data;
uassertStatusOKWithContext(getStatusFromCommandResult(result),
- str::stream() << "failed while running command " << cmdObj);
+ str::stream() << "command was sent but failed " << cmdObj);
uassertStatusOKWithContext(
getWriteConcernStatusFromCommandResult(result),
- str::stream() << "write concern failed while running command " << cmdObj);
+ str::stream()
+ << "command was sent and succeeded, but failed waiting for write concern "
+ << cmdObj);
});
}