diff options
Diffstat (limited to 'src/mongo/db/s/global_index/global_index_inserter.cpp')
-rw-r--r-- | src/mongo/db/s/global_index/global_index_inserter.cpp | 76 |
1 files changed, 39 insertions, 37 deletions
diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp index 673810360cd..93cbbb20049 100644 --- a/src/mongo/db/s/global_index/global_index_inserter.cpp +++ b/src/mongo/db/s/global_index/global_index_inserter.cpp @@ -64,43 +64,45 @@ NamespaceString GlobalIndexInserter::_skipIdNss() { void GlobalIndexInserter::processDoc(OperationContext* opCtx, const BSONObj& indexKeyValues, const BSONObj& documentKey) { - auto insertToGlobalIndexFn = - [this, service = opCtx->getServiceContext(), indexKeyValues, documentKey]( - const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - FindCommandRequest skipIdQuery(_skipIdNss()); - skipIdQuery.setFilter(BSON("_id" << documentKey)); - skipIdQuery.setLimit(1); - - return txnClient.exhaustiveFind(skipIdQuery) - .thenRunOn(txnExec) - .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec]( - const auto& skipIdDocResults) { - auto client = service->makeClient("globalIndexInserter"); - auto opCtx = service->makeOperationContext(client.get()); - globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get()); - - if (!skipIdDocResults.empty()) { - return SemiFuture<void>::makeReady(); - } - - InsertGlobalIndexKey globalIndexEntryInsert(_indexUUID); - // Note: dbName is unused by command but required by idl. - globalIndexEntryInsert.setDbName(DatabaseName::kAdmin); - globalIndexEntryInsert.setGlobalIndexKeyEntry( - GlobalIndexKeyEntry(indexKeyValues, documentKey)); - - return txnClient.runCommand(_nss.dbName(), globalIndexEntryInsert.toBSON({})) - .thenRunOn(txnExec) - .then([this, documentKey, &txnClient](const auto& commandResponse) { - write_ops::InsertCommandRequest skipIdInsert(_skipIdNss()); - - skipIdInsert.setDocuments({BSON("_id" << documentKey)}); - return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue(); - }) - .semi(); - }) - .semi(); - }; + auto insertToGlobalIndexFn = [this, + service = opCtx->getServiceContext(), + indexKeyValues, + documentKey](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + FindCommandRequest skipIdQuery(_skipIdNss()); + skipIdQuery.setFilter(BSON("_id" << documentKey)); + skipIdQuery.setLimit(1); + + return txnClient.exhaustiveFind(skipIdQuery) + .thenRunOn(txnExec) + .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec]( + const auto& skipIdDocResults) { + auto client = service->makeClient("globalIndexInserter"); + auto opCtx = service->makeOperationContext(client.get()); + globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get()); + + if (!skipIdDocResults.empty()) { + return SemiFuture<void>::makeReady(); + } + + InsertGlobalIndexKey globalIndexEntryInsert(_indexUUID); + // Note: dbName is unused by command but required by idl. + globalIndexEntryInsert.setDbName(DatabaseName::kAdmin); + globalIndexEntryInsert.setGlobalIndexKeyEntry( + GlobalIndexKeyEntry(indexKeyValues, documentKey)); + + return txnClient.runCommandChecked(_nss.dbName(), globalIndexEntryInsert.toBSON({})) + .thenRunOn(txnExec) + .then([this, documentKey, &txnClient](const auto& commandResponse) { + write_ops::InsertCommandRequest skipIdInsert(_skipIdNss()); + + skipIdInsert.setDocuments({BSON("_id" << documentKey)}); + return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue(); + }) + .semi(); + }) + .semi(); + }; auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr, inlineExecutor); |