diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2020-12-24 12:13:16 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-24 12:40:28 +0000 |
commit | 21418579095fa5ff44a851c2feb62ea4773ac3a6 (patch) | |
tree | 14dd7ba3bd5954e18c6371991fafdca04f1fcd3d /src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp | |
parent | ac9e2bcb4346a4aa5347e070d974f9bd7ab7c57d (diff) | |
download | mongo-21418579095fa5ff44a851c2feb62ea4773ac3a6.tar.gz |
SERVER-53108 Move batching logic into ReshardingDonorOplogIterator.
Raises the default value of the reshardingBatchLimitOperations server
parameter to 5,000 oplog entries to match that of the
replBatchLimitOperations server parameter.
Also introduces a reshardingBatchLimitBytes server parameter.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp | 91 |
1 files changed, 55 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index 3c185d49858..f1860457ef4 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -102,25 +102,30 @@ ReshardingOplogFetcher::~ReshardingOplogFetcher() { } Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& lastSeen) { - // `lastSeen` is the _id of the document ReshardingDonorOplogIterator::getNext() has last read - // from the oplog buffer collection. + // `lastSeen` is the _id of the document ReshardingDonorOplogIterator::getNextBatch() has last + // read from the oplog buffer collection. // // `_startAt` is updated after each insert into the oplog buffer collection by // ReshardingOplogFetcher to reflect the newer resume point if a new aggregation request was // being issued. stdx::lock_guard lk(_mutex); + if (_interruptStatus) { + return Future<void>::makeReady(*_interruptStatus); + } + if (lastSeen < _startAt) { // `lastSeen < _startAt` means there's at least one document which has been inserted by // ReshardingOplogFetcher and hasn't been returned by - // ReshardingDonorOplogIterator::getNext(). The caller has no reason to wait until yet + // ReshardingDonorOplogIterator::getNextBatch(). The caller has no reason to wait until yet // another document has been inserted before reading from the oplog buffer collection. return Future<void>::makeReady(); } // `lastSeen == _startAt` means the last document inserted by ReshardingOplogFetcher has already - // been returned by ReshardingDonorOplogIterator::getNext() and so ReshardingDonorOplogIterator - // would want to wait until ReshardingOplogFetcher does another insert. + // been returned by ReshardingDonorOplogIterator::getNextBatch() and so + // ReshardingDonorOplogIterator would want to wait until ReshardingOplogFetcher does another + // insert. // // `lastSeen > _startAt` isn't expected to happen in practice because // ReshardingDonorOplogIterator only uses _id's from documents that it actually read from the @@ -128,42 +133,56 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l return std::move(_onInsertFuture); } -Future<void> ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) { - auto pf = makePromiseFuture<void>(); - _fetchedFinishPromise = std::move(pf.promise); +void ReshardingOplogFetcher::interrupt(Status status) { + invariant(!status.isOK()); - _reschedule(executor); - - return std::move(pf.future); + // We replace the promise/future pair with a fresh one because consume() won't know an error has + // already been set and would otherwise attempt to fulfill the promise again. Later calls to + // awaitInsert() won't ever look at `_onInsertFuture` though. + auto [p, f] = makePromiseFuture<void>(); + stdx::lock_guard lk(_mutex); + _interruptStatus = status; + _onInsertPromise.setError(*_interruptStatus); + _onInsertPromise = std::move(p); + _onInsertFuture = std::move(f); } -void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) { - executor->schedule([this, executor](Status status) { - // The callback function is invoked in the execution context of the calling code when - // OutOfLineExecutor::schedule() is called with an error post-shutdown. This means that the - // ThreadClient in the outer context is still alive on the stack. We therefore delay - // constructing a new ThreadClient until after checking the status. - if (!status.isOK()) { - LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status); - _fetchedFinishPromise.setError(status); - return; - } - - ThreadClient client( - fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()), - getGlobalServiceContext()); +ExecutorFuture<void> ReshardingOplogFetcher::schedule( + std::shared_ptr<executor::TaskExecutor> executor, const CancelationToken& cancelToken) { + return ExecutorFuture(executor) + .then( + [this, executor, cancelToken] { return _reschedule(std::move(executor), cancelToken); }) + .onError([](Status status) { + LOGV2_INFO(5192101, "Resharding oplog fetcher aborting", "reason"_attr = status); + return status; + }); +} - try { - if (iterate(client.get())) { - _reschedule(executor); - } else { - _fetchedFinishPromise.emplaceValue(); +ExecutorFuture<void> ReshardingOplogFetcher::_reschedule( + std::shared_ptr<executor::TaskExecutor> executor, const CancelationToken& cancelToken) { + return ExecutorFuture(executor) + .then([this, executor, cancelToken] { + ThreadClient client(fmt::format("OplogFetcher-{}-{}", + _reshardingUUID.toString(), + _donorShard.toString()), + getGlobalServiceContext()); + + return iterate(client.get()); + }) + .then([executor, cancelToken](bool moreToCome) { + // Wait a little before re-running the aggregation pipeline on the donor's oplog. The + // 1-second value was chosen to match the default awaitData timeout that would have been + // used if the aggregation cursor was TailableModeEnum::kTailableAndAwaitData. + return executor->sleepFor(Seconds{1}, cancelToken).then([moreToCome] { + return moreToCome; + }); + }) + .then([this, executor, cancelToken](bool moreToCome) { + if (!moreToCome) { + return ExecutorFuture(std::move(executor)); } - } catch (...) { - LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus()); - _fetchedFinishPromise.setError(exceptionToStatus()); - } - }); + return _reschedule(std::move(executor), cancelToken); + }); } bool ReshardingOplogFetcher::iterate(Client* client) { |