Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--   src/mongo/db/repl/tenant_collection_cloner.cpp       | 67
-rw-r--r--   src/mongo/db/repl/tenant_collection_cloner.h         | 29
-rw-r--r--   src/mongo/db/repl/tenant_collection_cloner_test.cpp  | 82
3 files changed, 17 insertions, 161 deletions
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index ab3b0508510..834fb288f47 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -99,21 +99,6 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss,
                      kProgressMeterCheckInterval,
                      "documents copied",
                      str::stream() << _sourceNss.toString() << " tenant collection clone progress"),
-      _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) {
-          auto task = [ this, work = std::move(work) ](
-              OperationContext * opCtx,
-              const Status& status) mutable noexcept->TaskRunner::NextAction {
-              try {
-                  work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
-              } catch (const DBException& e) {
-                  setSyncFailedStatus(e.toStatus());
-              }
-              return TaskRunner::NextAction::kDisposeOperationContext;
-          };
-          _dbWorkTaskRunner.schedule(std::move(task));
-          return executor::TaskExecutor::CallbackHandle();
-      }),
-      _dbWorkTaskRunner(dbPool),
       _tenantId(tenantId) {
     invariant(sourceNss.isValid());
     invariant(ClonerUtils::isNamespaceForTenant(sourceNss, tenantId));
@@ -150,10 +135,8 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::TenantCollectionClonerSta
               "namespace"_attr = getCloner()->getSourceNss(),
               "uuid"_attr = getCloner()->getSourceUuid(),
               "tenantId"_attr = getCloner()->getTenantId());
-        getCloner()->waitForDatabaseWorkToComplete();
         return kSkipRemainingStages;
     } catch (const DBException&) {
-        getCloner()->waitForDatabaseWorkToComplete();
         throw;
     }
 }
@@ -469,7 +452,6 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() {
 
     ScopedMetadataWriterAndReader mwr(getClient(), requestMetadataWriter, replyMetadataReader);
     runQuery();
-    waitForDatabaseWorkToComplete();
     return kContinueNormally;
 }
 
@@ -505,24 +487,16 @@ void TenantCollectionCloner::runQuery() {
 }
 
 void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
+    std::vector<BSONObj> docsToInsert;
     {
         stdx::lock_guard<Latch> lk(_mutex);
         _stats.receivedBatches++;
         while (cursor.moreInCurrentBatch()) {
-            _documentsToInsert.emplace_back(cursor.nextSafe());
+            docsToInsert.emplace_back(cursor.nextSafe());
         }
     }
 
-    // Schedule the next document batch insertion.
-    auto&& scheduleResult = _scheduleDbWorkFn(
-        [=](const executor::TaskExecutor::CallbackArgs& cbd) { insertDocumentsCallback(cbd); });
-
-    if (!scheduleResult.isOK()) {
-        Status newStatus = scheduleResult.getStatus().withContext(
-            str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'");
-        // We must throw an exception to terminate query.
-        uassertStatusOK(newStatus);
-    }
+    insertDocuments(std::move(docsToInsert));
 
     tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.executeIf(
         [&](const BSONObj&) {
@@ -546,37 +520,32 @@ void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
 }
 
-void TenantCollectionCloner::insertDocumentsCallback(
-    const executor::TaskExecutor::CallbackArgs& cbd) {
-    uassertStatusOK(cbd.status);
-    std::vector<BSONObj> docs;
+void TenantCollectionCloner::insertDocuments(std::vector<BSONObj> docsToInsert) {
+    invariant(docsToInsert.size(),
+              "Document size can't be non-zero:: namespace: {}, tenantId: {}"_format(
+                  _sourceNss.toString(), _tenantId));
     {
         stdx::lock_guard<Latch> lk(_mutex);
-        if (_documentsToInsert.size() == 0) {
-            LOGV2_WARNING(4884507,
-                          "insertDocumentsCallback, but no documents to insert",
-                          "namespace"_attr = _sourceNss,
-                          "tenantId"_attr = _tenantId);
-            return;
-        }
-        _documentsToInsert.swap(docs);
-        _stats.documentsCopied += docs.size();
+        _stats.documentsCopied += docsToInsert.size();
         _stats.approxTotalBytesCopied = ((long)_stats.documentsCopied) * _stats.avgObjSize;
         ++_stats.insertedBatches;
-        _progressMeter.hit(int(docs.size()));
+        _progressMeter.hit(int(docsToInsert.size()));
     }
 
+    auto opCtxHolder = cc().makeOperationContext();
+    auto opCtx = opCtxHolder.get();
+
     // Disabling the internal document validation for inserts on recipient side as those
     // validations should have already been performed on donor's primary during tenant
     // collection document insertion.
     DisableDocumentValidation documentValidationDisabler(
-        cbd.opCtx,
+        opCtx,
         DocumentValidationSettings::kDisableSchemaValidation |
             DocumentValidationSettings::kDisableInternalValidation);
 
     write_ops::InsertCommandRequest insertOp(_existingNss.value_or(_sourceNss));
-    insertOp.setDocuments(std::move(docs));
+    insertOp.setDocuments(std::move(docsToInsert));
     insertOp.setWriteCommandRequestBase([] {
         write_ops::WriteCommandRequestBase wcb;
         wcb.setOrdered(true);
@@ -585,22 +554,18 @@ void TenantCollectionCloner::insertDocumentsCallback(
 
     // Set the recipient info on the opCtx to skip checking user permissions in
     // 'write_ops_exec::performInserts()'.
-    tenantMigrationInfo(cbd.opCtx) =
+    tenantMigrationInfo(opCtx) =
         boost::make_optional<TenantMigrationInfo>(getSharedData()->getMigrationId());
 
     // write_ops_exec::PerformInserts() will handle limiting the batch size
     // that gets inserted in a single WUOW.
-    auto writeResult = write_ops_exec::performInserts(cbd.opCtx, insertOp);
+    auto writeResult = write_ops_exec::performInserts(opCtx, insertOp);
     invariant(!writeResult.results.empty());
     // Since the writes are ordered, it's ok to check just the last writeOp result.
     uassertStatusOKWithContext(writeResult.results.back(),
                                "Tenant collection cloner: insert documents");
 }
 
-void TenantCollectionCloner::waitForDatabaseWorkToComplete() {
-    _dbWorkTaskRunner.join();
-}
-
 bool TenantCollectionCloner::isMyFailPoint(const BSONObj& data) const {
     auto nss = data["nss"].str();
     return (nss.empty() || nss == _sourceNss.toString()) && BaseCloner::isMyFailPoint(data);
diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h
index 12bd9bbb832..081ca271fc5 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.h
+++ b/src/mongo/db/repl/tenant_collection_cloner.h
@@ -65,14 +65,6 @@ public:
         void append(BSONObjBuilder* builder) const;
     };
 
-    /**
-     * Type of function to schedule storage interface tasks with the executor.
-     *
-     * Used for testing only.
-     */
-    using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>(
-        executor::TaskExecutor::CallbackFn)>;
-
     TenantCollectionCloner(const NamespaceString& ns,
                            const CollectionOptions& collectionOptions,
                            TenantMigrationSharedData* sharedData,
@@ -108,15 +100,6 @@ public:
         _collectionClonerBatchSize = batchSize;
     }
 
-    /**
-     * Overrides how executor schedules database work.
-     *
-     * For testing only.
-     */
-    void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) {
-        _scheduleDbWorkFn = std::move(scheduleDbWorkFn);
-    }
-
     Timestamp getOperationTime_forTest();
 
 protected:
@@ -217,18 +200,13 @@ private:
     /**
      * Called whenever there is a new batch of documents ready from the DBClientConnection.
      */
-    void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd);
+    void insertDocuments(std::vector<BSONObj> docsToInsert);
 
     /**
      * Sends a query command to the source.
     */
     void runQuery();
 
-    /**
-     * Waits for any database work to finish or fail.
-     */
-    void waitForDatabaseWorkToComplete();
-
     // All member variables are labeled with one of the following codes indicating the
     // synchronization rules for accessing them.
     //
@@ -258,14 +236,9 @@ private:
     BSONObj _idIndexSpec;  // (X)
     BSONObj _lastDocId;    // (X)
 
-    // Function for scheduling database work using the executor.
-    ScheduleDbWorkFn _scheduleDbWorkFn;  // (R)
     // Documents read from source to insert.
     std::vector<BSONObj> _documentsToInsert;  // (M)
     Stats _stats;                             // (M)
-    // We put _dbWorkTaskRunner after anything the database threads depend on to ensure it is
-    // only destroyed after those threads exit.
-    TaskRunner _dbWorkTaskRunner;  // (R)
     // The database name prefix of the tenant associated with this migration.
     std::string _tenantId;  // (R)
 
diff --git a/src/mongo/db/repl/tenant_collection_cloner_test.cpp b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
index b98c6f129e6..376c2aa2acf 100644
--- a/src/mongo/db/repl/tenant_collection_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
@@ -462,88 +462,6 @@ TEST_F(TenantCollectionClonerTest, InsertDocumentsMultipleBatches) {
     ASSERT_EQUALS(3u, stats.receivedBatches);
 }
 
-TEST_F(TenantCollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) {
-    // Set up data for preliminary stages
-    _mockServer->setCommandReply("count", createCountResponse(3));
-    _mockServer->setCommandReply("listIndexes",
-                                 createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
-    _mockServer->setCommandReply("find", createFindResponse());
-
-    // Set up documents to be returned from upstream node.
-    _mockServer->insert(_nss.ns(), BSON("_id" << 1));
-    _mockServer->insert(_nss.ns(), BSON("_id" << 2));
-    _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
-    auto cloner = makeCollectionCloner();
-    // Stop before running the query to set up the failure.
-    auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
-    auto timesEntered = collClonerBeforeFailPoint->setMode(
-        FailPoint::alwaysOn,
-        0,
-        fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
-
-    // Run the cloner in a separate thread.
-    stdx::thread clonerThread([&] {
-        Client::initThread("ClonerRunner");
-        ASSERT_EQUALS(ErrorCodes::UnknownError, cloner->run());
-    });
-    // Wait for the failpoint to be reached
-    collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
-    // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
-    // getting documents.
-    cloner->setScheduleDbWorkFn_forTest([](const executor::TaskExecutor::CallbackFn& workFn) {
-        return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
-    });
-
-    // Continue and finish. Final status is checked in the thread.
-    collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
-    clonerThread.join();
-}
-
-TEST_F(TenantCollectionClonerTest, InsertDocumentsCallbackCanceled) {
-    // Set up data for preliminary stages
-    _mockServer->setCommandReply("count", createCountResponse(3));
-    _mockServer->setCommandReply("listIndexes",
-                                 createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
-    _mockServer->setCommandReply("find", createFindResponse());
-
-    // Set up documents to be returned from upstream node.
-    _mockServer->insert(_nss.ns(), BSON("_id" << 1));
-    _mockServer->insert(_nss.ns(), BSON("_id" << 2));
-    _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
-    auto cloner = makeCollectionCloner();
-    // Stop before running the query to set up the failure.
-    auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
-    auto timesEntered = collClonerBeforeFailPoint->setMode(
-        FailPoint::alwaysOn,
-        0,
-        fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
-
-    // Run the cloner in a separate thread.
-    stdx::thread clonerThread([&] {
-        Client::initThread("ClonerRunner");
-        ASSERT_EQUALS(ErrorCodes::CallbackCanceled, cloner->run());
-    });
-    // Wait for the failpoint to be reached
-    collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
-    // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
-    // getting documents.
-    cloner->setScheduleDbWorkFn_forTest([&](const executor::TaskExecutor::CallbackFn& workFn) {
-        executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
-        mongo::executor::TaskExecutor::CallbackArgs args{
-            nullptr,
-            handle,
-            {ErrorCodes::CallbackCanceled, "Never run, but treat like cancelled."}};
-        workFn(args);
-        return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
-    });
-
-    // Continue and finish. Final status is checked in the thread.
-    collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
-    clonerThread.join();
-}
-
 TEST_F(TenantCollectionClonerTest, InsertDocumentsFailed) {
     // Set up data for preliminary stages
     _mockServer->setCommandReply("count", createCountResponse(3));