diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-05-12 18:20:15 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-12 18:58:31 +0000 |
commit | dbde3c8b9995d744f38051f6c7fba59e83e03a86 (patch) | |
tree | 50e98aab0d3bb616905e4f4188a01992c5323844 /src/mongo/db/s/resharding/resharding_data_replication.cpp | |
parent | 270908b05b9f98709a85faffe624aaf175707ecc (diff) | |
download | mongo-dbde3c8b9995d744f38051f6c7fba59e83e03a86.tar.gz |
SERVER-56438 Add ReshardingDonorOplogIterator::dispose() method.
Guarantees that Pipeline underlying ReshardingDonorOplogIterator is
always disposed by ReshardingOplogApplier, even upon cancellation.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_data_replication.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_data_replication.cpp | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index 1a39281774e..dcb344b7502 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -288,7 +288,8 @@ SemiFuture<void> ReshardingDataReplication::runUntilStrictlyConsistent( // Calling _runOplogAppliers() won't actually immediately start performing oplog application. // Only after the _startOplogApplication promise is fulfilled will oplog application begin. - auto oplogApplierFutures = _runOplogAppliers(executor, errorSource.token(), opCtxFactory); + auto oplogApplierFutures = + _runOplogAppliers(executor, cleanupExecutor, errorSource.token(), opCtxFactory); // We must additionally wait for fulfillCloningDoneFuture to become ready to ensure their // corresponding promises aren't being fulfilled while the .onCompletion() is running. @@ -380,6 +381,7 @@ std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runOplogFetchers std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runOplogAppliers( std::shared_ptr<executor::TaskExecutor> executor, + std::shared_ptr<executor::TaskExecutor> cleanupExecutor, CancellationToken cancelToken, CancelableOperationContextFactory opCtxFactory) { std::vector<SharedSemiFuture<void>> oplogApplierFutures; @@ -391,8 +393,12 @@ std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runOplogAppliers oplogApplierFutures.emplace_back( future_util::withCancellation(_startOplogApplication.getFuture(), cancelToken) .thenRunOn(executor) - .then([applier = applier.get(), executor, cancelToken, opCtxFactory] { - return applier->run(executor, cancelToken, opCtxFactory); + .then([applier = applier.get(), + executor, + cleanupExecutor, + cancelToken, + opCtxFactory] { + return applier->run(executor, cleanupExecutor, cancelToken, opCtxFactory); }) .share()); } |