summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_data_replication.cpp
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2021-05-12 18:20:15 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-12 18:58:31 +0000
commitdbde3c8b9995d744f38051f6c7fba59e83e03a86 (patch)
tree50e98aab0d3bb616905e4f4188a01992c5323844 /src/mongo/db/s/resharding/resharding_data_replication.cpp
parent270908b05b9f98709a85faffe624aaf175707ecc (diff)
downloadmongo-dbde3c8b9995d744f38051f6c7fba59e83e03a86.tar.gz
SERVER-56438 Add ReshardingDonorOplogIterator::dispose() method.
Guarantees that Pipeline underlying ReshardingDonorOplogIterator is always disposed by ReshardingOplogApplier, even upon cancellation.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_data_replication.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.cpp12
1 files changed, 9 insertions, 3 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index 1a39281774e..dcb344b7502 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -288,7 +288,8 @@ SemiFuture<void> ReshardingDataReplication::runUntilStrictlyConsistent(
// Calling _runOplogAppliers() won't actually immediately start performing oplog application.
// Only after the _startOplogApplication promise is fulfilled will oplog application begin.
- auto oplogApplierFutures = _runOplogAppliers(executor, errorSource.token(), opCtxFactory);
+ auto oplogApplierFutures =
+ _runOplogAppliers(executor, cleanupExecutor, errorSource.token(), opCtxFactory);
// We must additionally wait for fulfillCloningDoneFuture to become ready to ensure their
// corresponding promises aren't being fulfilled while the .onCompletion() is running.
@@ -380,6 +381,7 @@ std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runOplogFetchers
std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runOplogAppliers(
std::shared_ptr<executor::TaskExecutor> executor,
+ std::shared_ptr<executor::TaskExecutor> cleanupExecutor,
CancellationToken cancelToken,
CancelableOperationContextFactory opCtxFactory) {
std::vector<SharedSemiFuture<void>> oplogApplierFutures;
@@ -391,8 +393,12 @@ std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runOplogAppliers
oplogApplierFutures.emplace_back(
future_util::withCancellation(_startOplogApplication.getFuture(), cancelToken)
.thenRunOn(executor)
- .then([applier = applier.get(), executor, cancelToken, opCtxFactory] {
- return applier->run(executor, cancelToken, opCtxFactory);
+ .then([applier = applier.get(),
+ executor,
+ cleanupExecutor,
+ cancelToken,
+ opCtxFactory] {
+ return applier->run(executor, cleanupExecutor, cancelToken, opCtxFactory);
})
.share());
}