diff options
author | jannaerin <golden.janna@gmail.com> | 2021-07-15 14:05:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-29 18:51:17 +0000 |
commit | 298d02f7432b84df2a955addc5bcdf6a366c6645 (patch) | |
tree | cd63849097905aee03c691c1fa400c6ba801b987 /src/mongo | |
parent | 68f6c01d0f0452a9aef96e2da6289ce40fac2e41 (diff) | |
download | mongo-298d02f7432b84df2a955addc5bcdf6a366c6645.tar.gz |
SERVER-49897 Insert no-op entries into oplog buffer collections for resharding so resuming is less wasteful
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 14 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 1 | ||||
-rw-r--r-- | src/mongo/db/rs_local_client.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/rs_local_client.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 1 | ||||
-rw-r--r-- | src/mongo/db/s/shard_local.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/shard_local.h | 8 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/client/shard.h | 11 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.h | 8 |
15 files changed, 234 insertions, 33 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index cf0f72cb113..5c77365f8ed 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -58,6 +58,7 @@ const char* kNamespaceFieldName = "ns"; const char* kFirstBatchFieldName = "firstBatch"; const char* kNextBatchFieldName = "nextBatch"; +const char* kPostBatchResumeTokenFieldName = "postBatchResumeToken"; /** * Parses cursor response in command result for cursor ID, namespace and documents. @@ -144,6 +145,19 @@ Status parseCursorResponse(const BSONObj& obj, doc.shareOwnershipWith(obj); } + BSONElement postBatchResumeToken = cursorObj.getField(kPostBatchResumeTokenFieldName); + if (!postBatchResumeToken.eoo()) { + if (postBatchResumeToken.type() != BSONType::Object) { + return Status(ErrorCodes::FailedToParse, + str::stream() + << "'" << kCursorFieldName << "." << kPostBatchResumeTokenFieldName + << "' field must be of type object " << obj); + } + + batchData->otherFields.postBatchResumeToken.emplace(postBatchResumeToken.Obj().getOwned()); + } + + return Status::OK(); } diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 4ce57087c20..6e2c4a825e8 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -69,6 +69,7 @@ public: Documents documents; struct OtherFields { BSONObj metadata; + boost::optional<BSONObj> postBatchResumeToken = boost::none; } otherFields; Microseconds elapsed = Microseconds(0); bool first = false; diff --git a/src/mongo/db/rs_local_client.cpp b/src/mongo/db/rs_local_client.cpp index 738edc3becd..f4a668bed7c 100644 --- a/src/mongo/db/rs_local_client.cpp +++ b/src/mongo/db/rs_local_client.cpp @@ -159,7 +159,8 @@ StatusWith<Shard::QueryResponse> RSLocalClient::queryOnce( Status RSLocalClient::runAggregation( OperationContext* opCtx, const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback) { + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback) { DBDirectClient client(opCtx); auto cursor = uassertStatusOKWithContext( DBClientCursor::fromAggregationRequest( @@ -174,7 +175,8 @@ Status RSLocalClient::runAggregation( } try { - if (!callback(batchDocs)) { + // TODO SERVER-58938 pass DBClientCursor::_postBatchResumeToken to callback + if (!callback(batchDocs, boost::none)) { break; } } catch (const DBException& ex) { diff --git a/src/mongo/db/rs_local_client.h b/src/mongo/db/rs_local_client.h index 8fb1dd544ca..46f523e0e37 100644 --- a/src/mongo/db/rs_local_client.h +++ b/src/mongo/db/rs_local_client.h @@ -70,9 +70,11 @@ public: boost::optional<long long> limit, const boost::optional<BSONObj>& hint = boost::none); - Status runAggregation(OperationContext* opCtx, - const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback); + Status runAggregation( + OperationContext* opCtx, + const AggregateCommandRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback); private: /** diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index e1ca728d8f0..f98dae2426f 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -135,6 +135,14 @@ public: return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId); } + repl::MutableOplogEntry makeProgressMarkOplogEntry(Timestamp ts) { + ReshardingDonorOplogId oplogId(ts, ts); + const BSONObj oField(BSON("msg" + << "Latest oplog ts from donor's cursor response")); + const BSONObj o2Field(BSON("type" << kReshardProgressMark)); + return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId); + } + const NamespaceString& oplogNss() const { return _oplogNss; } @@ -412,5 +420,34 @@ TEST_F(ReshardingDonorOplogIterTest, FillsInPostImageOplogEntry) { ASSERT_TRUE(next.empty()); } +TEST_F(ReshardingDonorOplogIterTest, BatchIncludesProgressMarkEntries) { + const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); + const auto progressMarkOplog1 = makeProgressMarkOplogEntry(Timestamp(15, 3)); + const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); + + DBDirectClient client(operationContext()); + const auto ns = oplogNss().ns(); + client.insert(ns, oplog1.toBSON()); + client.insert(ns, progressMarkOplog1.toBSON()); + client.insert(ns, finalOplog.toBSON()); + + ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); + auto executor = makeTaskExecutorForIterator(); + auto factory = makeCancelableOpCtx(); + auto altClient = makeKillableClient(); + AlternativeClientRegion acr(altClient); + + auto next = getNextBatch(&iter, executor, factory); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); + + next = getNextBatch(&iter, executor, factory); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(progressMarkOplog1), getId(next[0])); + + next = getNextBatch(&iter, executor, factory); + ASSERT_TRUE(next.empty()); +} + } // anonymous namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index 434e5c9d0a0..6749ef949cf 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -107,8 +107,8 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l // // `_startAt` is updated after each insert into the oplog buffer collection by // ReshardingOplogFetcher to reflect the newer resume point if a new aggregation request was - // being issued. - + // being issued. It is also updated with the latest oplog timestamp from donor's cursor response + // after we finish inserting the entire batch. stdx::lock_guard lk(_mutex); if (lastSeen < _startAt) { // `lastSeen < _startAt` means there's at least one document which has been inserted by @@ -289,7 +289,9 @@ bool ReshardingOplogFetcher::consume(Client* client, uassertStatusOK(shard->runAggregation( opCtxRaii.get(), aggRequest, - [this, &batchesProcessed, &moreToCome, factory](const std::vector<BSONObj>& batch) { + [this, &batchesProcessed, &moreToCome, factory]( + const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken) { ThreadClient client(fmt::format("ReshardingFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()), @@ -332,6 +334,49 @@ bool ReshardingOplogFetcher::consume(Client* client, } } + if (postBatchResumeToken) { + // Insert a noop entry with the latest oplog timestamp from the donor's cursor + // response. This will allow the fetcher to resume reading from the last oplog entry + // it fetched even if that entry is for a different collection, making resuming less + // wasteful. + try { + auto lastOplogTs = postBatchResumeToken->getField("ts").timestamp(); + auto startAt = ReshardingDonorOplogId(lastOplogTs, lastOplogTs); + + WriteUnitOfWork wuow(opCtx); + + repl::MutableOplogEntry oplog; + oplog.setNss(_toWriteInto); + oplog.setOpType(repl::OpTypeEnum::kNoop); + oplog.setUuid(_collUUID); + oplog.set_id(Value(startAt.toBSON())); + oplog.setObject(BSON("msg" + << "Latest oplog ts from donor's cursor response")); + oplog.setObject2(BSON("type" << kReshardProgressMark)); + oplog.setOpTime(OplogSlot()); + oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); + + uassertStatusOK( + toWriteTo->insertDocument(opCtx, InsertStatement{oplog.toBSON()}, nullptr)); + wuow.commit(); + + _env->metrics()->onOplogEntriesFetched(1); + + auto [p, f] = makePromiseFuture<void>(); + { + stdx::lock_guard lk(_mutex); + _startAt = startAt; + _onInsertPromise.emplaceValue(); + _onInsertPromise = std::move(p); + _onInsertFuture = std::move(f); + } + } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) { + // It's possible that the donor shard has not generated new oplog entries since + // the previous getMore. In this case the latest oplog timestamp the donor + // returns will be the same, so it's safe to ignore this error. + } + } + if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) { return false; } diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index 40ce2dcf21e..559d9cbe630 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -478,8 +478,8 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { auto hasSeenStartAtFuture = fetcher.awaitInsert(startAt); ASSERT_FALSE(hasSeenStartAtFuture.isReady()); - // iterate() won't lead to any documents being inserted into the output collection (because no - // writes have happened to the data collection) so `hasSeenStartAtFuture` still won't be ready. + // Because no writes have happened to the data collection, the fetcher will insert a no-op entry + // with the latestOplogTimestamp, so `hasSeenStartAtFuture` will be ready. auto fetcherJob = launchAsync([&, this] { ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); fetcher.useReadConcernForTest(false); @@ -487,12 +487,12 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { auto factory = makeCancelableOpCtx(); return fetcher.iterate(&cc(), factory); }); + ASSERT_TRUE(requestPassthroughHandler(fetcherJob)); - ASSERT_FALSE(hasSeenStartAtFuture.isReady()); + ASSERT_TRUE(hasSeenStartAtFuture.isReady()); // Insert a document into the data collection and have it generate an oplog entry with a - // "destinedRecipient" field. Only after iterate() is called again and inserts a record into the - // output collection will `hasSeenStartAtFuture` have become ready. + // "destinedRecipient" field. auto dataWriteTimestamp = [&] { FailPointEnableBlock fp("addDestinedRecipient", BSON("destinedRecipient" << _destinationShard.toString())); @@ -505,7 +505,6 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); }(); - ASSERT_FALSE(hasSeenStartAtFuture.isReady()); fetcherJob = launchAsync([&, this] { ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); @@ -521,9 +520,99 @@ TEST_F(ReshardingOplogFetcherTest, TestAwaitInsert) { ASSERT_TRUE(fetcher.awaitInsert(startAt).isReady()); // However, asking for `dataWriteTimestamp` wouldn't become ready until the next record is - // insert into the output collection. + // inserted into the output collection. ASSERT_FALSE(fetcher.awaitInsert({dataWriteTimestamp, dataWriteTimestamp}).isReady()); } +TEST_F(ReshardingOplogFetcherTest, TestStartAtUpdatedWithProgressMarkOplogTs) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + const NamespaceString otherCollection("dbtests.collectionNotBeingResharded"); + + create(outputCollectionNss); + create(dataCollectionNss); + create(otherCollection); + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + const auto& collectionUUID = [&] { + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + return dataColl->uuid(); + }(); + + ReshardingDonorOplogId startAt{_fetchTimestamp, _fetchTimestamp}; + ReshardingOplogFetcher fetcher(makeFetcherEnv(), + _reshardingUUID, + collectionUUID, + startAt, + _donorShard, + _destinationShard, + outputCollectionNss); + + // Insert a document into the data collection and have it generate an oplog entry with a + // "destinedRecipient" field. + auto writeToDataCollectionTs = [&] { + FailPointEnableBlock fp("addDestinedRecipient", + BSON("destinedRecipient" << _destinationShard.toString())); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1))); + wuow.commit(); + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + }(); + + auto fetcherJob = launchAsync([&, this] { + ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); + fetcher.useReadConcernForTest(false); + fetcher.setInitialBatchSizeForTest(2); + auto factory = makeCancelableOpCtx(); + return fetcher.iterate(&cc(), factory); + }); + ASSERT_TRUE(requestPassthroughHandler(fetcherJob)); + + // The fetcher's lastSeenTimestamp should be equal to `writeToDataCollectionTs`. + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToDataCollectionTs); + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToDataCollectionTs); + ASSERT_EQ(1, metricsFetchedCount()) << " Verify reported metrics"; + + // Now, insert a document into a different collection that is not involved in resharding. + auto writeToOtherCollectionTs = [&] { + AutoGetCollection dataColl(_opCtx, otherCollection, LockMode::MODE_IX); + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), InsertStatement(BSON("_id" << 1 << "a" << 1))); + wuow.commit(); + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + return repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + }(); + + fetcherJob = launchAsync([&, this] { + ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); + fetcher.useReadConcernForTest(false); + fetcher.setInitialBatchSizeForTest(2); + auto factory = makeCancelableOpCtx(); + return fetcher.iterate(&cc(), factory); + }); + ASSERT_TRUE(requestPassthroughHandler(fetcherJob)); + + // The fetcher's lastSeenTimestamp should now be equal to `writeToOtherCollectionTs` + // because the lastSeenTimestamp will be updated with the latest oplog timestamp from the + // donor's cursor response. + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getClusterTime() == writeToOtherCollectionTs); + ASSERT_TRUE(fetcher.getLastSeenTimestamp().getTs() == writeToOtherCollectionTs); + ASSERT_EQ(2, metricsFetchedCount()) << " Verify reported metrics"; + + // The last document returned by ReshardingDonorOplogIterator::getNextBatch() would be + // `writeToDataCollectionTs`, but ReshardingOplogFetcher would have inserted a doc with + // `writeToOtherCollectionTs` after this so `awaitInsert` should be immediately ready when + // passed `writeToDataCollectionTs`. + ASSERT_TRUE(fetcher.awaitInsert({writeToDataCollectionTs, writeToDataCollectionTs}).isReady()); + + // `awaitInsert` should not be ready if passed `writeToOtherCollectionTs`. + ASSERT_FALSE( + fetcher.awaitInsert({writeToOtherCollectionTs, writeToOtherCollectionTs}).isReady()); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 5856e4c1f0f..5aa9473991a 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -382,7 +382,6 @@ bool isFinalOplog(const repl::OplogEntry& oplog, UUID reshardingUUID) { reshardingUUID; } - NamespaceString getLocalOplogBufferNamespace(UUID existingUUID, ShardId donorShardId) { return NamespaceString("config.localReshardingOplogBuffer.{}.{}"_format( existingUUID.toString(), donorShardId.toString())); diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 48a626272b4..0e8646a0f88 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -52,6 +52,7 @@ namespace mongo { constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd; +constexpr auto kReshardProgressMark = "reshardProgressMark"_sd; static const auto kReshardErrorMaxBytes = 2000; /** diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp index 8edbdcd95ef..aad3d286b07 100644 --- a/src/mongo/db/s/shard_local.cpp +++ b/src/mongo/db/s/shard_local.cpp @@ -212,9 +212,11 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx, MONGO_UNREACHABLE; } -Status ShardLocal::runAggregation(OperationContext* opCtx, - const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback) { +Status ShardLocal::runAggregation( + OperationContext* opCtx, + const AggregateCommandRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback) { return _rsLocalClient.runAggregation(opCtx, aggRequest, callback); } diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h index a9ec0d82a22..ed88dfeb10c 100644 --- a/src/mongo/db/s/shard_local.h +++ b/src/mongo/db/s/shard_local.h @@ -70,9 +70,11 @@ public: const std::string& dbName, const BSONObj& cmdObj) override; - Status runAggregation(OperationContext* opCtx, - const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback); + Status runAggregation( + OperationContext* opCtx, + const AggregateCommandRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback); private: StatusWith<Shard::CommandResponse> _runCommand(OperationContext* opCtx, diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index b84adc3319e..7b0c7cc57ad 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -738,7 +738,8 @@ std::pair<CollectionType, std::vector<ChunkType>> ShardingCatalogClientImpl::get // Run the aggregation std::vector<BSONObj> aggResult; - auto callback = [&aggResult](const std::vector<BSONObj>& batch) { + auto callback = [&aggResult](const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken) { aggResult.insert(aggResult.end(), std::make_move_iterator(batch.begin()), std::make_move_iterator(batch.end())); diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h index be61ec54877..9c28d5139c8 100644 --- a/src/mongo/s/client/shard.h +++ b/src/mongo/s/client/shard.h @@ -210,15 +210,16 @@ public: /** * Synchronously run the aggregation request, with a best effort honoring of request - * options. `callback` will be called with the batch contained in each response. `callback` - * should return `true` to execute another getmore. Returning `false` will send a - * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to - * `callback`. + * options. `callback` will be called with the batch and resume token contained in each + * response. `callback` should return `true` to execute another getmore. Returning `false` will + * send a `killCursors`. If the aggregation results are exhausted, there will be no additional + * calls to `callback`. */ virtual Status runAggregation( OperationContext* opCtx, const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0; + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback) = 0; /** * Runs a write command against a shard. This is separate from runCommand, because write diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 9087cb5d244..b1acef8f962 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -424,7 +424,8 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx, Status ShardRemote::runAggregation( OperationContext* opCtx, const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback) { + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback) { BSONObj readPrefMetadata; @@ -467,7 +468,9 @@ Status ShardRemote::runAggregation( } try { - if (!callback(data.documents)) { + boost::optional<BSONObj> postBatchResumeToken = + data.documents.empty() ? data.otherFields.postBatchResumeToken : boost::none; + if (!callback(data.documents, postBatchResumeToken)) { *nextAction = Fetcher::NextAction::kNoAction; } } catch (...) { diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h index 69a7c2233bf..6c99a8a5247 100644 --- a/src/mongo/s/client/shard_remote.h +++ b/src/mongo/s/client/shard_remote.h @@ -85,9 +85,11 @@ public: const std::string& dbName, const BSONObj& cmdObj) final; - Status runAggregation(OperationContext* opCtx, - const AggregateCommandRequest& aggRequest, - std::function<bool(const std::vector<BSONObj>& batch)> callback); + Status runAggregation( + OperationContext* opCtx, + const AggregateCommandRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch, + const boost::optional<BSONObj>& postBatchResumeToken)> callback); private: struct AsyncCmdHandle { |