author    jannaerin <golden.janna@gmail.com>    2021-07-15 14:05:34 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-07-29 18:51:17 +0000
commit    298d02f7432b84df2a955addc5bcdf6a366c6645 (patch)
tree      cd63849097905aee03c691c1fa400c6ba801b987 /src
parent    68f6c01d0f0452a9aef96e2da6289ce40fac2e41 (diff)
SERVER-49897 Insert no-op entries into oplog buffer collections for resharding so resuming is less wasteful
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/fetcher.cpp14
-rw-r--r--src/mongo/client/fetcher.h1
-rw-r--r--src/mongo/db/rs_local_client.cpp6
-rw-r--r--src/mongo/db/rs_local_client.h8
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp37
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp51
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp103
-rw-r--r--src/mongo/db/s/resharding_util.cpp1
-rw-r--r--src/mongo/db/s/resharding_util.h1
-rw-r--r--src/mongo/db/s/shard_local.cpp8
-rw-r--r--src/mongo/db/s/shard_local.h8
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.cpp3
-rw-r--r--src/mongo/s/client/shard.h11
-rw-r--r--src/mongo/s/client/shard_remote.cpp7
-rw-r--r--src/mongo/s/client/shard_remote.h8
15 files changed, 234 insertions, 33 deletions
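Background for the diff below: ReshardingOplogFetcher resumes from `_startAt`, the `_id` of the last entry written into the oplog buffer collection, and `awaitInsert(lastSeen)` returns a future that becomes ready once `_startAt` has advanced past `lastSeen`. With this commit, when a remote donor's cursor response carries a `postBatchResumeToken` but no documents, the fetcher writes a `reshardProgressMark` no-op at the donor's latest oplog timestamp, so `_startAt` keeps advancing even while recent donor oplog entries belong to unrelated collections. The standalone sketch below models only that `awaitInsert`/`_startAt` contract with standard-library primitives; the names `DonorOplogId`, `FetcherModel`, and `onInsert` are illustrative stand-ins, not the MongoDB classes touched in this diff.

#include <future>
#include <mutex>
#include <tuple>

// Illustrative stand-in (not the MongoDB type) for ReshardingDonorOplogId.
struct DonorOplogId {
    unsigned long long clusterTime;
    unsigned long long ts;

    bool operator<(const DonorOplogId& other) const {
        return std::tie(clusterTime, ts) < std::tie(other.clusterTime, other.ts);
    }
};

// Minimal model of the awaitInsert()/_startAt contract described in
// resharding_oplog_fetcher.cpp; class and member names are hypothetical.
class FetcherModel {
public:
    explicit FetcherModel(DonorOplogId startAt)
        : _startAt(startAt), _onInsertFuture(_onInsertPromise.get_future().share()) {}

    // Ready immediately if something newer than `lastSeen` was already written to the
    // buffer collection; otherwise ready on the next insert (a fetched entry or a
    // reshardProgressMark no-op).
    std::shared_future<void> awaitInsert(const DonorOplogId& lastSeen) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (lastSeen < _startAt) {
            std::promise<void> alreadyDone;
            alreadyDone.set_value();
            return alreadyDone.get_future().share();
        }
        return _onInsertFuture;
    }

    // Called after a buffer insert commits; a progress-mark no-op advances the resume
    // point to the donor's latest oplog timestamp even when no documents were fetched.
    void onInsert(DonorOplogId newStartAt) {
        std::lock_guard<std::mutex> lk(_mutex);
        _startAt = newStartAt;
        _onInsertPromise.set_value();
        _onInsertPromise = std::promise<void>();
        _onInsertFuture = _onInsertPromise.get_future().share();
    }

private:
    std::mutex _mutex;
    DonorOplogId _startAt;
    std::promise<void> _onInsertPromise;
    std::shared_future<void> _onInsertFuture;
};

int main() {
    FetcherModel fetcher({5, 5});
    auto covered = fetcher.awaitInsert({3, 3});  // already behind _startAt: ready now
    auto pending = fetcher.awaitInsert({5, 5});  // waits for the next insert
    fetcher.onInsert({9, 9});                    // e.g. a progress-mark no-op at ts 9
    covered.wait();
    pending.wait();
    return 0;
}

In the commit itself the equivalent of onInsert() runs inside the aggregation callback in resharding_oplog_fetcher.cpp: after the reshardProgressMark document is inserted, `_startAt` is updated and `_onInsertPromise` is fulfilled under `_mutex`, exactly as the fetched-entry path does.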
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index cf0f72cb113..5c77365f8ed 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -58,6 +58,7 @@ const char* kNamespaceFieldName = "ns";
const char* kFirstBatchFieldName = "firstBatch";
const char* kNextBatchFieldName = "nextBatch";
+const char* kPostBatchResumeTokenFieldName = "postBatchResumeToken";
/**
* Parses cursor response in command result for cursor ID, namespace and documents.
@@ -144,6 +145,19 @@ Status parseCursorResponse(const BSONObj& obj,
doc.shareOwnershipWith(obj);
}
+ BSONElement postBatchResumeToken = cursorObj.getField(kPostBatchResumeTokenFieldName);
+ if (!postBatchResumeToken.eoo()) {
+ if (postBatchResumeToken.type() != BSONType::Object) {
+ return Status(ErrorCodes::FailedToParse,
+ str::stream()
+ << "'" << kCursorFieldName << "." << kPostBatchResumeTokenFieldName
+ << "' field must be of type object " << obj);
+ }
+
+ batchData->otherFields.postBatchResumeToken.emplace(postBatchResumeToken.Obj().getOwned());
+ }
+
+
return Status::OK();
}
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 4ce57087c20..6e2c4a825e8 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -69,6 +69,7 @@ public:
Documents documents;
struct OtherFields {
BSONObj metadata;
+ boost::optional<BSONObj> postBatchResumeToken = boost::none;
} otherFields;
Microseconds elapsed = Microseconds(0);
bool first = false;
diff --git a/src/mongo/db/rs_local_client.cpp b/src/mongo/db/rs_local_client.cpp
index 738edc3becd..f4a668bed7c 100644
--- a/src/mongo/db/rs_local_client.cpp
+++ b/src/mongo/db/rs_local_client.cpp
@@ -159,7 +159,8 @@ StatusWith<Shard::QueryResponse> RSLocalClient::queryOnce(
Status RSLocalClient::runAggregation(
OperationContext* opCtx,
const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback) {
DBDirectClient client(opCtx);
auto cursor = uassertStatusOKWithContext(
DBClientCursor::fromAggregationRequest(
@@ -174,7 +175,8 @@ Status RSLocalClient::runAggregation(
}
try {
- if (!callback(batchDocs)) {
+ // TODO SERVER-58938 pass DBClientCursor::_postBatchResumeToken to callback
+ if (!callback(batchDocs, boost::none)) {
break;
}
} catch (const DBException& ex) {
diff --git a/src/mongo/db/rs_local_client.h b/src/mongo/db/rs_local_client.h
index 8fb1dd544ca..46f523e0e37 100644
--- a/src/mongo/db/rs_local_client.h
+++ b/src/mongo/db/rs_local_client.h
@@ -70,9 +70,11 @@ public:
boost::optional<long long> limit,
const boost::optional<BSONObj>& hint = boost::none);
- Status runAggregation(OperationContext* opCtx,
- const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback);
+ Status runAggregation(
+ OperationContext* opCtx,
+ const AggregateCommandRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback);
private:
/**
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index e1ca728d8f0..f98dae2426f 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -135,6 +135,14 @@ public:
return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId);
}
+ repl::MutableOplogEntry makeProgressMarkOplogEntry(Timestamp ts) {
+ ReshardingDonorOplogId oplogId(ts, ts);
+ const BSONObj oField(BSON("msg"
+ << "Latest oplog ts from donor's cursor response"));
+ const BSONObj o2Field(BSON("type" << kReshardProgressMark));
+ return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId);
+ }
+
const NamespaceString& oplogNss() const {
return _oplogNss;
}
@@ -412,5 +420,34 @@ TEST_F(ReshardingDonorOplogIterTest, FillsInPostImageOplogEntry) {
ASSERT_TRUE(next.empty());
}
+TEST_F(ReshardingDonorOplogIterTest, BatchIncludesProgressMarkEntries) {
+ const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
+ const auto progressMarkOplog1 = makeProgressMarkOplogEntry(Timestamp(15, 3));
+ const auto finalOplog = makeFinalOplog(Timestamp(43, 24));
+
+ DBDirectClient client(operationContext());
+ const auto ns = oplogNss().ns();
+ client.insert(ns, oplog1.toBSON());
+ client.insert(ns, progressMarkOplog1.toBSON());
+ client.insert(ns, finalOplog.toBSON());
+
+ ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);
+ auto executor = makeTaskExecutorForIterator();
+ auto factory = makeCancelableOpCtx();
+ auto altClient = makeKillableClient();
+ AlternativeClientRegion acr(altClient);
+
+ auto next = getNextBatch(&iter, executor, factory);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0]));
+
+ next = getNextBatch(&iter, executor, factory);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(progressMarkOplog1), getId(next[0]));
+
+ next = getNextBatch(&iter, executor, factory);
+ ASSERT_TRUE(next.empty());
+}
+
} // anonymous namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 434e5c9d0a0..6749ef949cf 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -107,8 +107,8 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l
//
// `_startAt` is updated after each insert into the oplog buffer collection by
// ReshardingOplogFetcher to reflect the newer resume point if a new aggregation request was
- // being issued.
-
+ // being issued. It is also updated with the latest oplog timestamp from donor's cursor response
+ // after we finish inserting the entire batch.
stdx::lock_guard lk(_mutex);
if (lastSeen < _startAt) {
// `lastSeen < _startAt` means there's at least one document which has been inserted by
@@ -289,7 +289,9 @@ bool ReshardingOplogFetcher::consume(Client* client,
uassertStatusOK(shard->runAggregation(
opCtxRaii.get(),
aggRequest,
- [this, &batchesProcessed, &moreToCome, factory](const std::vector<BSONObj>& batch) {
+ [this, &batchesProcessed, &moreToCome, factory](
+ const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken) {
ThreadClient client(fmt::format("ReshardingFetcher-{}-{}",
_reshardingUUID.toString(),
_donorShard.toString()),
@@ -332,6 +334,49 @@ bool ReshardingOplogFetcher::consume(Client* client,
}
}
+ if (postBatchResumeToken) {
+ // Insert a noop entry with the latest oplog timestamp from the donor's cursor
+ // response. This will allow the fetcher to resume reading from the last oplog entry
+ // it fetched even if that entry is for a different collection, making resuming less
+ // wasteful.
+ try {
+ auto lastOplogTs = postBatchResumeToken->getField("ts").timestamp();
+ auto startAt = ReshardingDonorOplogId(lastOplogTs, lastOplogTs);
+
+ WriteUnitOfWork wuow(opCtx);
+
+ repl::MutableOplogEntry oplog;
+ oplog.setNss(_toWriteInto);
+ oplog.setOpType(repl::OpTypeEnum::kNoop);
+ oplog.setUuid(_collUUID);
+ oplog.set_id(Value(startAt.toBSON()));
+ oplog.setObject(BSON("msg"
+ << "Latest oplog ts from donor's cursor response"));
+ oplog.setObject2(BSON("type" << kReshardProgressMark));
+ oplog.setOpTime(OplogSlot());
+ oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
+
+ uassertStatusOK(
+ toWriteTo->insertDocument(opCtx, InsertStatement{oplog.toBSON()}, nullptr));
+ wuow.commit();
+
+ _env->metrics()->onOplogEntriesFetched(1);
+
+ auto [p, f] = makePromiseFuture<void>();
+ {
+ stdx::lock_guard lk(_mutex);
+ _startAt = startAt;
+ _onInsertPromise.emplaceValue();
+ _onInsertPromise = std::move(p);
+ _onInsertFuture = std::move(f);
+ }
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
+ // It's possible that the donor shard has not generated new oplog entries since
+ // the previous getMore. In this case the latest oplog timestamp the donor
+ // returns will be the same, so it's safe to ignore this error.
+ }
+ }
+
if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) {
return false;
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index 40ce2dcf21e..559d9cbe630 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -478,8 +478,8 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
auto hasSeenStartAtFuture = fetcher.awaitInsert(startAt);
ASSERT_FALSE(hasSeenStartAtFuture.isReady());
- // iterate() won't lead to any documents being inserted into the output collection (because no
- // writes have happened to the data collection) so `hasSeenStartAtFuture` still won't be ready.
+ // Because no writes have happened to the data collection, the fetcher will insert a no-op entry
+ // with the latestOplogTimestamp, so `hasSeenStartAtFuture` will be ready.
auto fetcherJob = launchAsync([&, this] {
ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
fetcher.useReadConcernForTest(false);
@@ -487,12 +487,12 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
auto factory = makeCancelableOpCtx();
return fetcher.iterate(&cc(), factory);
});
+
ASSERT_TRUE(requestPassthroughHandler(fetcherJob));
- ASSERT_FALSE(hasSeenStartAtFuture.isReady());
+ ASSERT_TRUE(hasSeenStartAtFuture.isReady());
// Insert a document into the data collection and have it generate an oplog entry with a
- // "destinedRecipient" field. Only after iterate() is called again and inserts a record into the
- // output collection will `hasSeenStartAtFuture` have become ready.
+ // "destinedRecipient" field.
auto dataWriteTimestamp = [&] {
FailPointEnableBlock fp("addDestinedRecipient",
BSON("destinedRecipient" << _destinationShard.toString()));
@@ -505,7 +505,6 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
}();
- ASSERT_FALSE(hasSeenStartAtFuture.isReady());
fetcherJob = launchAsync([&, this] {
ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
@@ -521,9 +520,99 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) {
ASSERT_TRUE(fetcher.awaitInsert(startAt).isReady());
// However, asking for `dataWriteTimestamp` wouldn't become ready until the next record is
- // insert into the output collection.
+ // inserted into the output collection.
ASSERT_FALSE(fetcher.awaitInsert({dataWriteTimestamp, dataWriteTimestamp}).isReady());
}
+TEST_F(ReshardingOplogFetcherTest, TestStartAtUpdatedWithProgressMarkOplogTs) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+ const NamespaceString otherCollection("dbtests.collectionNotBeingResharded");
+
+ create(outputCollectionNss);
+ create(dataCollectionNss);
+ create(otherCollection);
+ _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+
+ const auto& collectionUUID = [&] {
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ return dataColl->uuid();
+ }();
+
+ ReshardingDonorOplogId startAt{_fetchTimestamp, _fetchTimestamp};
+ ReshardingOplogFetcher fetcher(makeFetcherEnv(),
+ _reshardingUUID,
+ collectionUUID,
+ startAt,
+ _donorShard,
+ _destinationShard,
+ outputCollectionNss);
+
+ // Insert a document into the data collection and have it generate an oplog entry with a
+ // "destinedRecipient" field.
+ auto writeToDataCollectionTs = [&] {
+ FailPointEnableBlock fp("addDestinedRecipient",
+ BSON("destinedRecipient" << _destinationShard.toString()));
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1)));
+ wuow.commit();
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+ }();
+
+ auto fetcherJob = launchAsync([&, this] {
+ ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
+ fetcher.useReadConcernForTest(false);
+ fetcher.setInitialBatchSizeForTest(2);
+ auto factory = makeCancelableOpCtx();
+ return fetcher.iterate(&cc(), factory);
+ });
+ ASSERT_TRUE(requestPassthroughHandler(fetcherJob));
+
+ // The fetcher's lastSeenTimestamp should be equal to `writeToDataCollectionTs`.
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToDataCollectionTs);
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToDataCollectionTs);
+ ASSERT_EQ(1, metricsFetchedCount()) << " Verify reported metrics";
+
+ // Now, insert a document into a different collection that is not involved in resharding.
+ auto writeToOtherCollectionTs = [&] {
+ AutoGetCollection dataColl(_opCtx, otherCollection, LockMode::MODE_IX);
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1)));
+ wuow.commit();
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+ }();
+
+ fetcherJob = launchAsync([&, this] {
+ ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
+ fetcher.useReadConcernForTest(false);
+ fetcher.setInitialBatchSizeForTest(2);
+ auto factory = makeCancelableOpCtx();
+ return fetcher.iterate(&cc(), factory);
+ });
+ ASSERT_TRUE(requestPassthroughHandler(fetcherJob));
+
+ // The fetcher's lastSeenTimestamp should now be equal to `writeToOtherCollectionTs`
+ // because the lastSeenTimestamp will be updated with the latest oplog timestamp from the
+ // donor's cursor response.
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToOtherCollectionTs);
+ ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToOtherCollectionTs);
+ ASSERT_EQ(2, metricsFetchedCount()) << " Verify reported metrics";
+
+ // The last document returned by ReshardingDonorOplogIterator::getNextBatch() would be
+ // `writeToDataCollectionTs`, but ReshardingOplogFetcher would have inserted a doc with
+ // `writeToOtherCollectionTs` after this so `awaitInsert` should be immediately ready when
+ // passed `writeToDataCollectionTs`.
+ ASSERT_TRUE(fetcher.awaitInsert({writeToDataCollectionTs, writeToDataCollectionTs}).isReady());
+
+ // `awaitInsert` should not be ready if passed `writeToOtherCollectionTs`.
+ ASSERT_FALSE(
+ fetcher.awaitInsert({writeToOtherCollectionTs, writeToOtherCollectionTs}).isReady());
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 5856e4c1f0f..5aa9473991a 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -382,7 +382,6 @@ bool isFinalOplog(const repl::OplogEntry& oplog, UUID reshardingUUID) {
reshardingUUID;
}
-
NamespaceString getLocalOplogBufferNamespace(UUID existingUUID, ShardId donorShardId) {
return NamespaceString("config.localReshardingOplogBuffer.{}.{}"_format(
existingUUID.toString(), donorShardId.toString()));
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 48a626272b4..0e8646a0f88 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -52,6 +52,7 @@
namespace mongo {
constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd;
+constexpr auto kReshardProgressMark = "reshardProgressMark"_sd;
static const auto kReshardErrorMaxBytes = 2000;
/**
diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp
index 8edbdcd95ef..aad3d286b07 100644
--- a/src/mongo/db/s/shard_local.cpp
+++ b/src/mongo/db/s/shard_local.cpp
@@ -212,9 +212,11 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-Status ShardLocal::runAggregation(OperationContext* opCtx,
- const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+Status ShardLocal::runAggregation(
+ OperationContext* opCtx,
+ const AggregateCommandRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback) {
return _rsLocalClient.runAggregation(opCtx, aggRequest, callback);
}
diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h
index a9ec0d82a22..ed88dfeb10c 100644
--- a/src/mongo/db/s/shard_local.h
+++ b/src/mongo/db/s/shard_local.h
@@ -70,9 +70,11 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) override;
- Status runAggregation(OperationContext* opCtx,
- const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback);
+ Status runAggregation(
+ OperationContext* opCtx,
+ const AggregateCommandRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback);
private:
StatusWith<Shard::CommandResponse> _runCommand(OperationContext* opCtx,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index b84adc3319e..7b0c7cc57ad 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -738,7 +738,8 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get
// Run the aggregation
std::vector<BSONObj> aggResult;
- auto callback = [&aggResult](const std::vector<BSONObj>& batch) {
+ auto callback = [&aggResult](const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken) {
aggResult.insert(aggResult.end(),
std::make_move_iterator(batch.begin()),
std::make_move_iterator(batch.end()));
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index be61ec54877..9c28d5139c8 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -210,15 +210,16 @@ public:
/**
* Synchronously run the aggregation request, with a best effort honoring of request
- * options. `callback` will be called with the batch contained in each response. `callback`
- * should return `true` to execute another getmore. Returning `false` will send a
- * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to
- * `callback`.
+ * options. `callback` will be called with the batch and resume token contained in each
+ * response. `callback` should return `true` to execute another getmore. Returning `false` will
+ * send a `killCursors`. If the aggregation results are exhausted, there will be no additional
+ * calls to `callback`.
*/
virtual Status runAggregation(
OperationContext* opCtx,
const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0;
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback) = 0;
/**
* Runs a write command against a shard. This is separate from runCommand, because write
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 9087cb5d244..b1acef8f962 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -424,7 +424,8 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx,
Status ShardRemote::runAggregation(
OperationContext* opCtx,
const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback) {
BSONObj readPrefMetadata;
@@ -467,7 +468,9 @@ Status ShardRemote::runAggregation(
}
try {
- if (!callback(data.documents)) {
+ boost::optional<BSONObj> postBatchResumeToken =
+ data.documents.empty() ? data.otherFields.postBatchResumeToken : boost::none;
+ if (!callback(data.documents, postBatchResumeToken)) {
*nextAction = Fetcher::NextAction::kNoAction;
}
} catch (...) {
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index 69a7c2233bf..6c99a8a5247 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -85,9 +85,11 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) final;
- Status runAggregation(OperationContext* opCtx,
- const AggregateCommandRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback);
+ Status runAggregation(
+ OperationContext* opCtx,
+ const AggregateCommandRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch,
+ const boost::optional<BSONObj>& postBatchResumeToken)> callback);
private:
struct AsyncCmdHandle {