summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/tenant_collection_cloner.cpp67
-rw-r--r--src/mongo/db/repl/tenant_collection_cloner.h29
-rw-r--r--src/mongo/db/repl/tenant_collection_cloner_test.cpp82
3 files changed, 17 insertions, 161 deletions
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index ab3b0508510..834fb288f47 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -99,21 +99,6 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss,
kProgressMeterCheckInterval,
"documents copied",
str::stream() << _sourceNss.toString() << " tenant collection clone progress"),
- _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) {
- auto task = [ this, work = std::move(work) ](
- OperationContext * opCtx,
- const Status& status) mutable noexcept->TaskRunner::NextAction {
- try {
- work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
- } catch (const DBException& e) {
- setSyncFailedStatus(e.toStatus());
- }
- return TaskRunner::NextAction::kDisposeOperationContext;
- };
- _dbWorkTaskRunner.schedule(std::move(task));
- return executor::TaskExecutor::CallbackHandle();
- }),
- _dbWorkTaskRunner(dbPool),
_tenantId(tenantId) {
invariant(sourceNss.isValid());
invariant(ClonerUtils::isNamespaceForTenant(sourceNss, tenantId));
@@ -150,10 +135,8 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::TenantCollectionClonerSta
"namespace"_attr = getCloner()->getSourceNss(),
"uuid"_attr = getCloner()->getSourceUuid(),
"tenantId"_attr = getCloner()->getTenantId());
- getCloner()->waitForDatabaseWorkToComplete();
return kSkipRemainingStages;
} catch (const DBException&) {
- getCloner()->waitForDatabaseWorkToComplete();
throw;
}
}
@@ -469,7 +452,6 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() {
ScopedMetadataWriterAndReader mwr(getClient(), requestMetadataWriter, replyMetadataReader);
runQuery();
- waitForDatabaseWorkToComplete();
return kContinueNormally;
}
@@ -505,24 +487,16 @@ void TenantCollectionCloner::runQuery() {
}
void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
+ std::vector<BSONObj> docsToInsert;
{
stdx::lock_guard<Latch> lk(_mutex);
_stats.receivedBatches++;
while (cursor.moreInCurrentBatch()) {
- _documentsToInsert.emplace_back(cursor.nextSafe());
+ docsToInsert.emplace_back(cursor.nextSafe());
}
}
- // Schedule the next document batch insertion.
- auto&& scheduleResult = _scheduleDbWorkFn(
- [=](const executor::TaskExecutor::CallbackArgs& cbd) { insertDocumentsCallback(cbd); });
-
- if (!scheduleResult.isOK()) {
- Status newStatus = scheduleResult.getStatus().withContext(
- str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'");
- // We must throw an exception to terminate query.
- uassertStatusOK(newStatus);
- }
+ insertDocuments(std::move(docsToInsert));
tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.executeIf(
[&](const BSONObj&) {
@@ -546,37 +520,32 @@ void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
}
-void TenantCollectionCloner::insertDocumentsCallback(
- const executor::TaskExecutor::CallbackArgs& cbd) {
- uassertStatusOK(cbd.status);
- std::vector<BSONObj> docs;
+void TenantCollectionCloner::insertDocuments(std::vector<BSONObj> docsToInsert) {
+ invariant(docsToInsert.size(),
+ "Document size can't be non-zero:: namespace: {}, tenantId: {}"_format(
+ _sourceNss.toString(), _tenantId));
{
stdx::lock_guard<Latch> lk(_mutex);
- if (_documentsToInsert.size() == 0) {
- LOGV2_WARNING(4884507,
- "insertDocumentsCallback, but no documents to insert",
- "namespace"_attr = _sourceNss,
- "tenantId"_attr = _tenantId);
- return;
- }
- _documentsToInsert.swap(docs);
- _stats.documentsCopied += docs.size();
+ _stats.documentsCopied += docsToInsert.size();
_stats.approxTotalBytesCopied = ((long)_stats.documentsCopied) * _stats.avgObjSize;
++_stats.insertedBatches;
- _progressMeter.hit(int(docs.size()));
+ _progressMeter.hit(int(docsToInsert.size()));
}
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
// Disabling the internal document validation for inserts on recipient side as those
// validations should have already been performed on donor's primary during tenant
// collection document insertion.
DisableDocumentValidation documentValidationDisabler(
- cbd.opCtx,
+ opCtx,
DocumentValidationSettings::kDisableSchemaValidation |
DocumentValidationSettings::kDisableInternalValidation);
write_ops::InsertCommandRequest insertOp(_existingNss.value_or(_sourceNss));
- insertOp.setDocuments(std::move(docs));
+ insertOp.setDocuments(std::move(docsToInsert));
insertOp.setWriteCommandRequestBase([] {
write_ops::WriteCommandRequestBase wcb;
wcb.setOrdered(true);
@@ -585,22 +554,18 @@ void TenantCollectionCloner::insertDocumentsCallback(
// Set the recipient info on the opCtx to skip checking user permissions in
// 'write_ops_exec::performInserts()'.
- tenantMigrationInfo(cbd.opCtx) =
+ tenantMigrationInfo(opCtx) =
boost::make_optional<TenantMigrationInfo>(getSharedData()->getMigrationId());
// write_ops_exec::PerformInserts() will handle limiting the batch size
// that gets inserted in a single WUOW.
- auto writeResult = write_ops_exec::performInserts(cbd.opCtx, insertOp);
+ auto writeResult = write_ops_exec::performInserts(opCtx, insertOp);
invariant(!writeResult.results.empty());
// Since the writes are ordered, it's ok to check just the last writeOp result.
uassertStatusOKWithContext(writeResult.results.back(),
"Tenant collection cloner: insert documents");
}
-void TenantCollectionCloner::waitForDatabaseWorkToComplete() {
- _dbWorkTaskRunner.join();
-}
-
bool TenantCollectionCloner::isMyFailPoint(const BSONObj& data) const {
auto nss = data["nss"].str();
return (nss.empty() || nss == _sourceNss.toString()) && BaseCloner::isMyFailPoint(data);
diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h
index 12bd9bbb832..081ca271fc5 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.h
+++ b/src/mongo/db/repl/tenant_collection_cloner.h
@@ -65,14 +65,6 @@ public:
void append(BSONObjBuilder* builder) const;
};
- /**
- * Type of function to schedule storage interface tasks with the executor.
- *
- * Used for testing only.
- */
- using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>(
- executor::TaskExecutor::CallbackFn)>;
-
TenantCollectionCloner(const NamespaceString& ns,
const CollectionOptions& collectionOptions,
TenantMigrationSharedData* sharedData,
@@ -108,15 +100,6 @@ public:
_collectionClonerBatchSize = batchSize;
}
- /**
- * Overrides how executor schedules database work.
- *
- * For testing only.
- */
- void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) {
- _scheduleDbWorkFn = std::move(scheduleDbWorkFn);
- }
-
Timestamp getOperationTime_forTest();
protected:
@@ -217,18 +200,13 @@ private:
/**
* Called whenever there is a new batch of documents ready from the DBClientConnection.
*/
- void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd);
+ void insertDocuments(std::vector<BSONObj> docsToInsert);
/**
* Sends a query command to the source.
*/
void runQuery();
- /**
- * Waits for any database work to finish or fail.
- */
- void waitForDatabaseWorkToComplete();
-
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
@@ -258,14 +236,9 @@ private:
BSONObj _idIndexSpec; // (X)
BSONObj _lastDocId; // (X)
- // Function for scheduling database work using the executor.
- ScheduleDbWorkFn _scheduleDbWorkFn; // (R)
// Documents read from source to insert.
std::vector<BSONObj> _documentsToInsert; // (M)
Stats _stats; // (M)
- // We put _dbWorkTaskRunner after anything the database threads depend on to ensure it is
- // only destroyed after those threads exit.
- TaskRunner _dbWorkTaskRunner; // (R)
// The database name prefix of the tenant associated with this migration.
std::string _tenantId; // (R)
diff --git a/src/mongo/db/repl/tenant_collection_cloner_test.cpp b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
index b98c6f129e6..376c2aa2acf 100644
--- a/src/mongo/db/repl/tenant_collection_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
@@ -462,88 +462,6 @@ TEST_F(TenantCollectionClonerTest, InsertDocumentsMultipleBatches) {
ASSERT_EQUALS(3u, stats.receivedBatches);
}
-TEST_F(TenantCollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) {
- // Set up data for preliminary stages
- _mockServer->setCommandReply("count", createCountResponse(3));
- _mockServer->setCommandReply("listIndexes",
- createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
- _mockServer->setCommandReply("find", createFindResponse());
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
- // Stop before running the query to set up the failure.
- auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- auto timesEntered = collClonerBeforeFailPoint->setMode(
- FailPoint::alwaysOn,
- 0,
- fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- ASSERT_EQUALS(ErrorCodes::UnknownError, cloner->run());
- });
- // Wait for the failpoint to be reached
- collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
- // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
- // getting documents.
- cloner->setScheduleDbWorkFn_forTest([](const executor::TaskExecutor::CallbackFn& workFn) {
- return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
- });
-
- // Continue and finish. Final status is checked in the thread.
- collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
- clonerThread.join();
-}
-
-TEST_F(TenantCollectionClonerTest, InsertDocumentsCallbackCanceled) {
- // Set up data for preliminary stages
- _mockServer->setCommandReply("count", createCountResponse(3));
- _mockServer->setCommandReply("listIndexes",
- createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
- _mockServer->setCommandReply("find", createFindResponse());
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
- // Stop before running the query to set up the failure.
- auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- auto timesEntered = collClonerBeforeFailPoint->setMode(
- FailPoint::alwaysOn,
- 0,
- fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, cloner->run());
- });
- // Wait for the failpoint to be reached
- collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
- // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
- // getting documents.
- cloner->setScheduleDbWorkFn_forTest([&](const executor::TaskExecutor::CallbackFn& workFn) {
- executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
- mongo::executor::TaskExecutor::CallbackArgs args{
- nullptr,
- handle,
- {ErrorCodes::CallbackCanceled, "Never run, but treat like cancelled."}};
- workFn(args);
- return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
- });
-
- // Continue and finish. Final status is checked in the thread.
- collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
- clonerThread.join();
-}
-
TEST_F(TenantCollectionClonerTest, InsertDocumentsFailed) {
// Set up data for preliminary stages
_mockServer->setCommandReply("count", createCountResponse(3));