author     Max Hirschhorn <max.hirschhorn@mongodb.com>  2020-12-24 12:13:16 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-12-24 12:40:28 +0000
commit     21418579095fa5ff44a851c2feb62ea4773ac3a6 (patch)
tree       14dd7ba3bd5954e18c6371991fafdca04f1fcd3d /src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
parent     ac9e2bcb4346a4aa5347e070d974f9bd7ab7c57d (diff)
SERVER-53108 Move batching logic into ReshardingDonorOplogIterator.
Raises the default value of the reshardingBatchLimitOperations server parameter to 5,000 oplog entries to match that of the replBatchLimitOperations server parameter. Also introduces a reshardingBatchLimitBytes server parameter.
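As a rough illustration of the dual limit (a sketch, not code from this patch), a batch-building loop bounded by both parameters could look like the following, where getNextDocument() and the limit arguments are hypothetical stand-ins for the server parameters and the aggregation cursor:

#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a cursor over serialized oplog entries.
std::optional<std::string> getNextDocument();

// Cut a batch once EITHER the operation-count limit (cf.
// reshardingBatchLimitOperations, default 5,000) or the byte limit
// (cf. reshardingBatchLimitBytes) is reached, whichever comes first.
std::vector<std::string> getNextBatch(std::size_t limitOps, std::size_t limitBytes) {
    std::vector<std::string> batch;
    std::size_t numBytes = 0;
    while (batch.size() < limitOps && numBytes < limitBytes) {
        auto doc = getNextDocument();
        if (!doc) {
            break;  // nothing more to read right now
        }
        numBytes += doc->size();
        batch.push_back(std::move(*doc));
    }
    return batch;
}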
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp')
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp | 91
1 file changed, 55 insertions(+), 36 deletions(-)
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 3c185d49858..f1860457ef4 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -102,25 +102,30 @@ ReshardingOplogFetcher::~ReshardingOplogFetcher() {
}
Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& lastSeen) {
- // `lastSeen` is the _id of the document ReshardingDonorOplogIterator::getNext() has last read
- // from the oplog buffer collection.
+ // `lastSeen` is the _id of the document ReshardingDonorOplogIterator::getNextBatch() has last
+ // read from the oplog buffer collection.
//
// `_startAt` is updated after each insert into the oplog buffer collection by
// ReshardingOplogFetcher to reflect the newer resume point if a new aggregation request was
// being issued.
stdx::lock_guard lk(_mutex);
+ if (_interruptStatus) {
+ return Future<void>::makeReady(*_interruptStatus);
+ }
+
if (lastSeen < _startAt) {
// `lastSeen < _startAt` means there's at least one document which has been inserted by
// ReshardingOplogFetcher and hasn't been returned by
- // ReshardingDonorOplogIterator::getNext(). The caller has no reason to wait until yet
+ // ReshardingDonorOplogIterator::getNextBatch(). The caller has no reason to wait until yet
// another document has been inserted before reading from the oplog buffer collection.
return Future<void>::makeReady();
}
// `lastSeen == _startAt` means the last document inserted by ReshardingOplogFetcher has already
- // been returned by ReshardingDonorOplogIterator::getNext() and so ReshardingDonorOplogIterator
- // would want to wait until ReshardingOplogFetcher does another insert.
+ // been returned by ReshardingDonorOplogIterator::getNextBatch() and so
+ // ReshardingDonorOplogIterator would want to wait until ReshardingOplogFetcher does another
+ // insert.
//
// `lastSeen > _startAt` isn't expected to happen in practice because
// ReshardingDonorOplogIterator only uses _id's from documents that it actually read from the
@@ -128,42 +133,56 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l
return std::move(_onInsertFuture);
}
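// Illustrative sketch, not part of this patch: the consumer-side loop implied
// by the lastSeen/_startAt contract above. The iterator and process() are
// hypothetical stand-ins; only awaitInsert() comes from this file.
//
//     while (auto batch = iterator.getNextBatch(opCtx)) {   // hypothetical
//         process(batch);
//         lastSeen = batch.back().get_id();   // resume point of last doc read
//         fetcher.awaitInsert(lastSeen).get();  // ready immediately if
//                                               // lastSeen < _startAt
//     }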
-Future<void> ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
- auto pf = makePromiseFuture<void>();
- _fetchedFinishPromise = std::move(pf.promise);
+void ReshardingOplogFetcher::interrupt(Status status) {
+ invariant(!status.isOK());
- _reschedule(executor);
-
- return std::move(pf.future);
+ // We replace the promise/future pair with a fresh one because consume() won't know an error has
+ // already been set and would otherwise attempt to fulfill the promise again. Later calls to
+ // awaitInsert() won't ever look at `_onInsertFuture` though.
+ auto [p, f] = makePromiseFuture<void>();
+ stdx::lock_guard lk(_mutex);
+ _interruptStatus = status;
+ _onInsertPromise.setError(*_interruptStatus);
+ _onInsertPromise = std::move(p);
+ _onInsertFuture = std::move(f);
}
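// Illustrative sketch, not part of this patch: why the pair must be swapped.
// Using std::promise for illustration, fulfilling an already-satisfied
// promise fails, so the producer needs a fresh promise it can fulfill
// harmlessly after the error has been delivered:
//
//     #include <future>
//     #include <stdexcept>
//
//     std::promise<void> p;
//     p.set_exception(std::make_exception_ptr(std::runtime_error("interrupted")));
//     // p.set_value();  // would throw std::future_error
//                        // (promise_already_satisfied)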
-void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) {
- executor->schedule([this, executor](Status status) {
- // The callback function is invoked in the execution context of the calling code when
- // OutOfLineExecutor::schedule() is called with an error post-shutdown. This means that the
- // ThreadClient in the outer context is still alive on the stack. We therefore delay
- // constructing a new ThreadClient until after checking the status.
- if (!status.isOK()) {
- LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status);
- _fetchedFinishPromise.setError(status);
- return;
- }
-
- ThreadClient client(
- fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()),
- getGlobalServiceContext());
+ExecutorFuture<void> ReshardingOplogFetcher::schedule(
+ std::shared_ptr<executor::TaskExecutor> executor, const CancelationToken& cancelToken) {
+ return ExecutorFuture(executor)
+ .then(
+ [this, executor, cancelToken] { return _reschedule(std::move(executor), cancelToken); })
+ .onError([](Status status) {
+ LOGV2_INFO(5192101, "Resharding oplog fetcher aborting", "reason"_attr = status);
+ return status;
+ });
+}
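// Illustrative note, not part of this patch: returning the incoming Status
// from onError() keeps the chain in the failed state, so the error is logged
// here but still propagates to whoever waits on the returned ExecutorFuture.
//
//     .onError([](Status status) {
//         logFailure(status);  // hypothetical: observe the failure...
//         return status;       // ...then re-raise it unchanged
//     });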
- try {
- if (iterate(client.get())) {
- _reschedule(executor);
- } else {
- _fetchedFinishPromise.emplaceValue();
+ExecutorFuture<void> ReshardingOplogFetcher::_reschedule(
+ std::shared_ptr<executor::TaskExecutor> executor, const CancelationToken& cancelToken) {
+ return ExecutorFuture(executor)
+ .then([this, executor, cancelToken] {
+ ThreadClient client(fmt::format("OplogFetcher-{}-{}",
+ _reshardingUUID.toString(),
+ _donorShard.toString()),
+ getGlobalServiceContext());
+
+ return iterate(client.get());
+ })
+ .then([executor, cancelToken](bool moreToCome) {
+ // Wait a little before re-running the aggregation pipeline on the donor's oplog. The
+ // 1-second value was chosen to match the default awaitData timeout that would have been
+ // used if the aggregation cursor was TailableModeEnum::kTailableAndAwaitData.
+ return executor->sleepFor(Seconds{1}, cancelToken).then([moreToCome] {
+ return moreToCome;
+ });
+ })
+ .then([this, executor, cancelToken](bool moreToCome) {
+ if (!moreToCome) {
+ return ExecutorFuture(std::move(executor));
}
- } catch (...) {
- LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus());
- _fetchedFinishPromise.setError(exceptionToStatus());
- }
- });
+ return _reschedule(std::move(executor), cancelToken);
+ });
}
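// Illustrative sketch, not part of this patch: the recursive polling shape of
// _reschedule() reduced to its essentials. doOneIteration() is hypothetical;
// returning an ExecutorFuture from a continuation flattens into the chain.
//
//     ExecutorFuture<void> pollLoop(std::shared_ptr<executor::TaskExecutor> exec,
//                                   const CancelationToken& token) {
//         return ExecutorFuture(exec)
//             .then([] { return doOneIteration(); })  // bool: more to come?
//             .then([exec, token](bool more) {
//                 // Pause between passes; the token can cut the sleep short.
//                 return exec->sleepFor(Seconds{1}, token).then(
//                     [more] { return more; });
//             })
//             .then([exec, token](bool more) {
//                 return more ? pollLoop(exec, token) : ExecutorFuture(exec);
//             });
//     }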
bool ReshardingOplogFetcher::iterate(Client* client) {