summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcos Jose Grillo Ramirez <marcos.grillo@mongodb.com>2021-11-10 09:11:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-15 13:11:12 +0000
commit4da342b293978544e1f17da5dcccb47211c298bb (patch)
tree92b12c65210060ff0de7a2abbe6c51b8b0278fc1
parent810151fafee6c4b648a19c24c5ab9416a6cc5d17 (diff)
downloadmongo-4da342b293978544e1f17da5dcccb47211c298bb.tar.gz
SERVER-59806 Change upsert for insert when writing chunks in create sharded collection path
(cherry picked from commit bdc2e9b2ed299f4dfbf6183eed94707afbde8478)
-rw-r--r--src/mongo/db/s/create_collection_coordinator.cpp63
1 files changed, 30 insertions, 33 deletions
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index dca61f1644e..44571b18cf8 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/create_gen.h"
@@ -274,46 +275,42 @@ void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
str::stream() << "Error removing chunks matching uuid " << uuid);
}
-void upsertChunks(OperationContext* opCtx,
+void insertChunks(OperationContext* opCtx,
std::vector<ChunkType>& chunks,
const OperationSessionInfo& osi) {
- BatchedCommandRequest updateRequest([&]() {
- write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
- std::vector<write_ops::UpdateOpEntry> entries;
+ BatchedCommandRequest insertRequest([&]() {
+ write_ops::InsertCommandRequest insertOp(ChunkType::ConfigNS);
+ std::vector<BSONObj> entries;
entries.reserve(chunks.size());
for (const auto& chunk : chunks) {
- write_ops::UpdateOpEntry entry(
- BSON(ChunkType::collectionUUID << chunk.getCollectionUUID() << ChunkType::shard
- << chunk.getShard() << ChunkType::min
- << chunk.getMin()),
- write_ops::UpdateModification::parseFromClassicUpdate(chunk.toConfigBSON()));
- entry.setUpsert(true);
- entry.setMulti(false);
- entries.push_back(entry);
+ entries.push_back(chunk.toConfigBSON());
}
- updateOp.setUpdates(entries);
- return updateOp;
+ insertOp.setDocuments(entries);
+ return insertOp;
}());
- updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
-
- const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
+ insertRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ {
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("CreateCollectionCoordinator::insertChunks");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillableByStepdown(lk);
+ }
- BatchedCommandResponse batchResponse;
- const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- const auto response =
- configShard->runCommand(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- ChunkType::ConfigNS.db().toString(),
- cmdObj,
- Shard::kDefaultConfigCommandTimeout,
- Shard::RetryPolicy::kIdempotent);
-
- const auto writeStatus =
- Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
-
- uassertStatusOK(batchResponse.toStatus());
- uassertStatusOK(writeStatus);
+ AlternativeClientRegion acr(newClient);
+ auto executor =
+ Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto newOpCtx = CancelableOperationContext(
+ cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
+ newOpCtx->setLogicalSessionId(*osi.getSessionId());
+ newOpCtx->setTxnNumber(*osi.getTxnNumber());
+
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ cluster::write(newOpCtx.get(), insertRequest, &stats, &response);
+ uassertStatusOK(response.toStatus());
+ }
}
void updateCatalogEntry(OperationContext* opCtx,
@@ -829,7 +826,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
// Upsert Chunks.
_doc = _updateSession(opCtx, _doc);
- upsertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
+ insertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
CollectionType coll(nss(),
_initialChunks.collVersion().epoch(),