summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcos Jose Grillo Ramirez <marcos.grillo@mongodb.com>2021-11-10 09:11:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-15 13:35:56 +0000
commit52e6fb7e647fbad710ec78bd641e0074a4b895a5 (patch)
tree0c8d39fc12aa1ee26c2677dfc42730b6b285e8a5
parentee15ecf8cd75a13a862984ed9c58693fb30b9aaa (diff)
downloadmongo-52e6fb7e647fbad710ec78bd641e0074a4b895a5.tar.gz
SERVER-59806 Change upsert for insert when writing chunks in create sharded collection path
(cherry picked from commit bdc2e9b2ed299f4dfbf6183eed94707afbde8478)
-rw-r--r--src/mongo/db/s/create_collection_coordinator.cpp63
1 files changed, 30 insertions, 33 deletions
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index f9892052a46..603fd762ebf 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/create_gen.h"
@@ -274,46 +275,42 @@ void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
str::stream() << "Error removing chunks matching uuid " << uuid);
}
-void upsertChunks(OperationContext* opCtx,
+void insertChunks(OperationContext* opCtx,
std::vector<ChunkType>& chunks,
const OperationSessionInfo& osi) {
- BatchedCommandRequest updateRequest([&]() {
- write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
- std::vector<write_ops::UpdateOpEntry> entries;
+ BatchedCommandRequest insertRequest([&]() {
+ write_ops::InsertCommandRequest insertOp(ChunkType::ConfigNS);
+ std::vector<BSONObj> entries;
entries.reserve(chunks.size());
for (const auto& chunk : chunks) {
- write_ops::UpdateOpEntry entry(
- BSON(ChunkType::collectionUUID << chunk.getCollectionUUID() << ChunkType::shard
- << chunk.getShard() << ChunkType::min
- << chunk.getMin()),
- write_ops::UpdateModification::parseFromClassicUpdate(chunk.toConfigBSON()));
- entry.setUpsert(true);
- entry.setMulti(false);
- entries.push_back(entry);
+ entries.push_back(chunk.toConfigBSON());
}
- updateOp.setUpdates(entries);
- return updateOp;
+ insertOp.setDocuments(entries);
+ return insertOp;
}());
- updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
-
- const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
+ insertRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ {
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("CreateCollectionCoordinator::insertChunks");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillableByStepdown(lk);
+ }
- BatchedCommandResponse batchResponse;
- const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- const auto response =
- configShard->runCommand(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- ChunkType::ConfigNS.db().toString(),
- cmdObj,
- Shard::kDefaultConfigCommandTimeout,
- Shard::RetryPolicy::kIdempotent);
-
- const auto writeStatus =
- Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
-
- uassertStatusOK(batchResponse.toStatus());
- uassertStatusOK(writeStatus);
+ AlternativeClientRegion acr(newClient);
+ auto executor =
+ Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto newOpCtx = CancelableOperationContext(
+ cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
+ newOpCtx->setLogicalSessionId(*osi.getSessionId());
+ newOpCtx->setTxnNumber(*osi.getTxnNumber());
+
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ cluster::write(newOpCtx.get(), insertRequest, &stats, &response);
+ uassertStatusOK(response.toStatus());
+ }
}
void updateCatalogEntry(OperationContext* opCtx,
@@ -823,7 +820,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
// Upsert Chunks.
_doc = _updateSession(opCtx, _doc);
- upsertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
+ insertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
CollectionType coll(nss(),
_initialChunks.collVersion().epoch(),