diff options
author | Marcos Jose Grillo Ramirez <marcos.grillo@mongodb.com> | 2021-11-10 09:11:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-15 13:11:12 +0000 |
commit | 4da342b293978544e1f17da5dcccb47211c298bb (patch) | |
tree | 92b12c65210060ff0de7a2abbe6c51b8b0278fc1 | |
parent | 810151fafee6c4b648a19c24c5ab9416a6cc5d17 (diff) | |
download | mongo-4da342b293978544e1f17da5dcccb47211c298bb.tar.gz |
SERVER-59806 Change upsert for insert when writing chunks in create sharded collection path
(cherry picked from commit bdc2e9b2ed299f4dfbf6183eed94707afbde8478)
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 63 |
1 files changed, 30 insertions, 33 deletions
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index dca61f1644e..44571b18cf8 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -33,6 +33,7 @@ #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/commands/create_gen.h" @@ -274,46 +275,42 @@ void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx, str::stream() << "Error removing chunks matching uuid " << uuid); } -void upsertChunks(OperationContext* opCtx, +void insertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks, const OperationSessionInfo& osi) { - BatchedCommandRequest updateRequest([&]() { - write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS); - std::vector<write_ops::UpdateOpEntry> entries; + BatchedCommandRequest insertRequest([&]() { + write_ops::InsertCommandRequest insertOp(ChunkType::ConfigNS); + std::vector<BSONObj> entries; entries.reserve(chunks.size()); for (const auto& chunk : chunks) { - write_ops::UpdateOpEntry entry( - BSON(ChunkType::collectionUUID << chunk.getCollectionUUID() << ChunkType::shard - << chunk.getShard() << ChunkType::min - << chunk.getMin()), - write_ops::UpdateModification::parseFromClassicUpdate(chunk.toConfigBSON())); - entry.setUpsert(true); - entry.setMulti(false); - entries.push_back(entry); + entries.push_back(chunk.toConfigBSON()); } - updateOp.setUpdates(entries); - return updateOp; + insertOp.setDocuments(entries); + return insertOp; }()); - updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); - - const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON()); + insertRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + { + auto newClient = + opCtx->getServiceContext()->makeClient("CreateCollectionCoordinator::insertChunks"); + { + stdx::lock_guard<Client> lk(*newClient.get()); + newClient->setSystemOperationKillableByStepdown(lk); + } - BatchedCommandResponse batchResponse; - const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - const auto response = - configShard->runCommand(opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - ChunkType::ConfigNS.db().toString(), - cmdObj, - Shard::kDefaultConfigCommandTimeout, - Shard::RetryPolicy::kIdempotent); - - const auto writeStatus = - Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse); - - uassertStatusOK(batchResponse.toStatus()); - uassertStatusOK(writeStatus); + AlternativeClientRegion acr(newClient); + auto executor = + Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + auto newOpCtx = CancelableOperationContext( + cc().makeOperationContext(), opCtx->getCancellationToken(), executor); + newOpCtx->setLogicalSessionId(*osi.getSessionId()); + newOpCtx->setTxnNumber(*osi.getTxnNumber()); + + BatchedCommandResponse response; + BatchWriteExecStats stats; + cluster::write(newOpCtx.get(), insertRequest, &stats, &response); + uassertStatusOK(response.toStatus()); + } } void updateCatalogEntry(OperationContext* opCtx, @@ -829,7 +826,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { // Upsert Chunks. _doc = _updateSession(opCtx, _doc); - upsertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc)); + insertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc)); CollectionType coll(nss(), _initialChunks.collVersion().epoch(), |