diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_collection_cloner.cpp | 67 |
1 files changed, 16 insertions, 51 deletions
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp index ab3b0508510..834fb288f47 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.cpp +++ b/src/mongo/db/repl/tenant_collection_cloner.cpp @@ -99,21 +99,6 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss, kProgressMeterCheckInterval, "documents copied", str::stream() << _sourceNss.toString() << " tenant collection clone progress"), - _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) { - auto task = [ this, work = std::move(work) ]( - OperationContext * opCtx, - const Status& status) mutable noexcept->TaskRunner::NextAction { - try { - work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx)); - } catch (const DBException& e) { - setSyncFailedStatus(e.toStatus()); - } - return TaskRunner::NextAction::kDisposeOperationContext; - }; - _dbWorkTaskRunner.schedule(std::move(task)); - return executor::TaskExecutor::CallbackHandle(); - }), - _dbWorkTaskRunner(dbPool), _tenantId(tenantId) { invariant(sourceNss.isValid()); invariant(ClonerUtils::isNamespaceForTenant(sourceNss, tenantId)); @@ -150,10 +135,8 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::TenantCollectionClonerSta "namespace"_attr = getCloner()->getSourceNss(), "uuid"_attr = getCloner()->getSourceUuid(), "tenantId"_attr = getCloner()->getTenantId()); - getCloner()->waitForDatabaseWorkToComplete(); return kSkipRemainingStages; } catch (const DBException&) { - getCloner()->waitForDatabaseWorkToComplete(); throw; } } @@ -469,7 +452,6 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() { ScopedMetadataWriterAndReader mwr(getClient(), requestMetadataWriter, replyMetadataReader); runQuery(); - waitForDatabaseWorkToComplete(); return kContinueNormally; } @@ -505,24 +487,16 @@ void TenantCollectionCloner::runQuery() { } void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) { + std::vector<BSONObj> docsToInsert; { stdx::lock_guard<Latch> lk(_mutex); _stats.receivedBatches++; while (cursor.moreInCurrentBatch()) { - _documentsToInsert.emplace_back(cursor.nextSafe()); + docsToInsert.emplace_back(cursor.nextSafe()); } } - // Schedule the next document batch insertion. - auto&& scheduleResult = _scheduleDbWorkFn( - [=](const executor::TaskExecutor::CallbackArgs& cbd) { insertDocumentsCallback(cbd); }); - - if (!scheduleResult.isOK()) { - Status newStatus = scheduleResult.getStatus().withContext( - str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'"); - // We must throw an exception to terminate query. - uassertStatusOK(newStatus); - } + insertDocuments(std::move(docsToInsert)); tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.executeIf( [&](const BSONObj&) { @@ -546,37 +520,32 @@ void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) { } -void TenantCollectionCloner::insertDocumentsCallback( - const executor::TaskExecutor::CallbackArgs& cbd) { - uassertStatusOK(cbd.status); - std::vector<BSONObj> docs; +void TenantCollectionCloner::insertDocuments(std::vector<BSONObj> docsToInsert) { + invariant(docsToInsert.size(), + "Document size can't be non-zero:: namespace: {}, tenantId: {}"_format( + _sourceNss.toString(), _tenantId)); { stdx::lock_guard<Latch> lk(_mutex); - if (_documentsToInsert.size() == 0) { - LOGV2_WARNING(4884507, - "insertDocumentsCallback, but no documents to insert", - "namespace"_attr = _sourceNss, - "tenantId"_attr = _tenantId); - return; - } - _documentsToInsert.swap(docs); - _stats.documentsCopied += docs.size(); + _stats.documentsCopied += docsToInsert.size(); _stats.approxTotalBytesCopied = ((long)_stats.documentsCopied) * _stats.avgObjSize; ++_stats.insertedBatches; - _progressMeter.hit(int(docs.size())); + _progressMeter.hit(int(docsToInsert.size())); } + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + // Disabling the internal document validation for inserts on recipient side as those // validations should have already been performed on donor's primary during tenant // collection document insertion. DisableDocumentValidation documentValidationDisabler( - cbd.opCtx, + opCtx, DocumentValidationSettings::kDisableSchemaValidation | DocumentValidationSettings::kDisableInternalValidation); write_ops::InsertCommandRequest insertOp(_existingNss.value_or(_sourceNss)); - insertOp.setDocuments(std::move(docs)); + insertOp.setDocuments(std::move(docsToInsert)); insertOp.setWriteCommandRequestBase([] { write_ops::WriteCommandRequestBase wcb; wcb.setOrdered(true); @@ -585,22 +554,18 @@ void TenantCollectionCloner::insertDocumentsCallback( // Set the recipient info on the opCtx to skip checking user permissions in // 'write_ops_exec::performInserts()'. - tenantMigrationInfo(cbd.opCtx) = + tenantMigrationInfo(opCtx) = boost::make_optional<TenantMigrationInfo>(getSharedData()->getMigrationId()); // write_ops_exec::PerformInserts() will handle limiting the batch size // that gets inserted in a single WUOW. - auto writeResult = write_ops_exec::performInserts(cbd.opCtx, insertOp); + auto writeResult = write_ops_exec::performInserts(opCtx, insertOp); invariant(!writeResult.results.empty()); // Since the writes are ordered, it's ok to check just the last writeOp result. uassertStatusOKWithContext(writeResult.results.back(), "Tenant collection cloner: insert documents"); } -void TenantCollectionCloner::waitForDatabaseWorkToComplete() { - _dbWorkTaskRunner.join(); -} - bool TenantCollectionCloner::isMyFailPoint(const BSONObj& data) const { auto nss = data["nss"].str(); return (nss.empty() || nss == _sourceNss.toString()) && BaseCloner::isMyFailPoint(data); |