Diffstat (limited to 'src/mongo/db/repl/tenant_collection_cloner.cpp')
-rw-r--r--  src/mongo/db/repl/tenant_collection_cloner.cpp | 67
1 file changed, 16 insertions(+), 51 deletions(-)
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index ab3b0508510..834fb288f47 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -99,21 +99,6 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss,
kProgressMeterCheckInterval,
"documents copied",
str::stream() << _sourceNss.toString() << " tenant collection clone progress"),
- _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) {
- auto task = [ this, work = std::move(work) ](
- OperationContext * opCtx,
- const Status& status) mutable noexcept->TaskRunner::NextAction {
- try {
- work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
- } catch (const DBException& e) {
- setSyncFailedStatus(e.toStatus());
- }
- return TaskRunner::NextAction::kDisposeOperationContext;
- };
- _dbWorkTaskRunner.schedule(std::move(task));
- return executor::TaskExecutor::CallbackHandle();
- }),
- _dbWorkTaskRunner(dbPool),
_tenantId(tenantId) {
invariant(sourceNss.isValid());
invariant(ClonerUtils::isNamespaceForTenant(sourceNss, tenantId));
@@ -150,10 +135,8 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::TenantCollectionClonerSta
"namespace"_attr = getCloner()->getSourceNss(),
"uuid"_attr = getCloner()->getSourceUuid(),
"tenantId"_attr = getCloner()->getTenantId());
- getCloner()->waitForDatabaseWorkToComplete();
return kSkipRemainingStages;
} catch (const DBException&) {
- getCloner()->waitForDatabaseWorkToComplete();
throw;
}
}
@@ -469,7 +452,6 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() {
ScopedMetadataWriterAndReader mwr(getClient(), requestMetadataWriter, replyMetadataReader);
runQuery();
- waitForDatabaseWorkToComplete();
return kContinueNormally;
}
@@ -505,24 +487,16 @@ void TenantCollectionCloner::runQuery() {
}
void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
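+ // Drain the cursor's current batch into a local vector so the mutex is released before inserting.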
+ std::vector<BSONObj> docsToInsert;
{
stdx::lock_guard<Latch> lk(_mutex);
_stats.receivedBatches++;
while (cursor.moreInCurrentBatch()) {
- _documentsToInsert.emplace_back(cursor.nextSafe());
+ docsToInsert.emplace_back(cursor.nextSafe());
}
}
- // Schedule the next document batch insertion.
- auto&& scheduleResult = _scheduleDbWorkFn(
- [=](const executor::TaskExecutor::CallbackArgs& cbd) { insertDocumentsCallback(cbd); });
-
- if (!scheduleResult.isOK()) {
- Status newStatus = scheduleResult.getStatus().withContext(
- str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'");
- // We must throw an exception to terminate query.
- uassertStatusOK(newStatus);
- }
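+ // Insert the batch synchronously on the current thread.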
+ insertDocuments(std::move(docsToInsert));
tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.executeIf(
[&](const BSONObj&) {
@@ -546,37 +520,32 @@ void TenantCollectionCloner::handleNextBatch(DBClientCursor& cursor) {
}
-void TenantCollectionCloner::insertDocumentsCallback(
- const executor::TaskExecutor::CallbackArgs& cbd) {
- uassertStatusOK(cbd.status);
- std::vector<BSONObj> docs;
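+// Inserts a batch of cloned documents on the calling thread; the batch must be non-empty.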
+void TenantCollectionCloner::insertDocuments(std::vector<BSONObj> docsToInsert) {
+ invariant(docsToInsert.size(),
+ "Batch of documents to insert can't be empty: namespace: {}, tenantId: {}"_format(
+ _sourceNss.toString(), _tenantId));
{
stdx::lock_guard<Latch> lk(_mutex);
- if (_documentsToInsert.size() == 0) {
- LOGV2_WARNING(4884507,
- "insertDocumentsCallback, but no documents to insert",
- "namespace"_attr = _sourceNss,
- "tenantId"_attr = _tenantId);
- return;
- }
- _documentsToInsert.swap(docs);
- _stats.documentsCopied += docs.size();
+ _stats.documentsCopied += docsToInsert.size();
_stats.approxTotalBytesCopied = ((long)_stats.documentsCopied) * _stats.avgObjSize;
++_stats.insertedBatches;
- _progressMeter.hit(int(docs.size()));
+ _progressMeter.hit(int(docsToInsert.size()));
}
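+ // Create an operation context on the current client to perform the inserts.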
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
// Disable internal document validation for inserts on the recipient side, as those
// validations should have already been performed on the donor's primary when the
// documents were originally inserted.
DisableDocumentValidation documentValidationDisabler(
- cbd.opCtx,
+ opCtx,
DocumentValidationSettings::kDisableSchemaValidation |
DocumentValidationSettings::kDisableInternalValidation);
write_ops::InsertCommandRequest insertOp(_existingNss.value_or(_sourceNss));
- insertOp.setDocuments(std::move(docs));
+ insertOp.setDocuments(std::move(docsToInsert));
insertOp.setWriteCommandRequestBase([] {
write_ops::WriteCommandRequestBase wcb;
wcb.setOrdered(true);
@@ -585,22 +554,18 @@ void TenantCollectionCloner::insertDocumentsCallback(
// Set the recipient info on the opCtx to skip checking user permissions in
// 'write_ops_exec::performInserts()'.
- tenantMigrationInfo(cbd.opCtx) =
+ tenantMigrationInfo(opCtx) =
boost::make_optional<TenantMigrationInfo>(getSharedData()->getMigrationId());
// write_ops_exec::performInserts() will handle limiting the batch size
// that gets inserted in a single WUOW.
- auto writeResult = write_ops_exec::performInserts(cbd.opCtx, insertOp);
+ auto writeResult = write_ops_exec::performInserts(opCtx, insertOp);
invariant(!writeResult.results.empty());
// Since the writes are ordered, it's ok to check just the last writeOp result.
uassertStatusOKWithContext(writeResult.results.back(),
"Tenant collection cloner: insert documents");
}
-void TenantCollectionCloner::waitForDatabaseWorkToComplete() {
- _dbWorkTaskRunner.join();
-}
-
bool TenantCollectionCloner::isMyFailPoint(const BSONObj& data) const {
auto nss = data["nss"].str();
return (nss.empty() || nss == _sourceNss.toString()) && BaseCloner::isMyFailPoint(data);