diff options
27 files changed, 330 insertions, 68 deletions
diff --git a/jstests/core/tailable_skip_limit.js b/jstests/core/tailable_skip_limit.js index d0da0a15b28..1fae7263a8f 100644 --- a/jstests/core/tailable_skip_limit.js +++ b/jstests/core/tailable_skip_limit.js @@ -1,63 +1,82 @@ // Test that tailable cursors work correctly with skip and limit. +(function() { + "use strict"; -// Setup the capped collection. -var collname = "jstests_tailable_skip_limit" -var t = db[collname]; -t.drop(); -db.createCollection(collname, {capped: true, size: 1024}); - -t.save({_id: 1}); -t.save({_id: 2}); - -// Non-tailable with skip -var cursor = t.find().skip(1); -assert.eq(2, cursor.next()["_id"]); -assert(!cursor.hasNext()); -t.save({_id: 3}); -assert(!cursor.hasNext()); - -// Non-tailable with limit -var cursor = t.find().limit(100); -for (var i = 1; i <= 3; i++) { - assert.eq(i, cursor.next()["_id"]); -} -assert(!cursor.hasNext()); -t.save({_id: 4}); -assert(!cursor.hasNext()); - -// Non-tailable with negative limit -var cursor = t.find().limit(-100); -for (var i = 1; i <= 4; i++) { - assert.eq(i, cursor.next()["_id"]); -} -assert(!cursor.hasNext()); -t.save({_id: 5}); -assert(!cursor.hasNext()); - -// Tailable with skip -cursor = t.find().addOption(2).skip(4); -assert.eq(5, cursor.next()["_id"]); -assert(!cursor.hasNext()); -t.save({_id: 6}); -assert(cursor.hasNext()); -assert.eq(6, cursor.next()["_id"]); - -// Tailable with limit -var cursor = t.find().addOption(2).limit(100); -for (var i = 1; i <= 6; i++) { - assert.eq(i, cursor.next()["_id"]); -} -assert(!cursor.hasNext()); -t.save({_id: 7}); -assert(cursor.hasNext()); -assert.eq(7, cursor.next()["_id"]); - -// Tailable with negative limit -var cursor = t.find().addOption(2).limit(-100); -for (var i = 1; i <= 7; i++) { - assert.eq(i, cursor.next()["_id"]); -} -assert(!cursor.hasNext()); -t.save({_id: 8}); -assert(cursor.hasNext()); -assert.eq(8, cursor.next()["_id"]); + // Setup the capped collection. + var collname = "jstests_tailable_skip_limit" + var t = db[collname]; + t.drop(); + db.createCollection(collname, {capped: true, size: 1024}); + + t.save({_id: 1}); + t.save({_id: 2}); + + // Non-tailable with skip + var cursor = t.find().skip(1); + assert.eq(2, cursor.next()["_id"]); + assert(!cursor.hasNext()); + t.save({_id: 3}); + assert(!cursor.hasNext()); + + // Non-tailable with limit + var cursor = t.find().limit(100); + for (var i = 1; i <= 3; i++) { + assert.eq(i, cursor.next()["_id"]); + } + assert(!cursor.hasNext()); + t.save({_id: 4}); + assert(!cursor.hasNext()); + + // Non-tailable with negative limit + var cursor = t.find().limit(-100); + for (var i = 1; i <= 4; i++) { + assert.eq(i, cursor.next()["_id"]); + } + assert(!cursor.hasNext()); + t.save({_id: 5}); + assert(!cursor.hasNext()); + + // Tailable with skip + cursor = t.find().addOption(2).skip(4); + assert.eq(5, cursor.next()["_id"]); + assert(!cursor.hasNext()); + t.save({_id: 6}); + assert(cursor.hasNext()); + assert.eq(6, cursor.next()["_id"]); + + // Tailable with limit + var cursor = t.find().addOption(2).limit(100); + for (var i = 1; i <= 6; i++) { + assert.eq(i, cursor.next()["_id"]); + } + assert(!cursor.hasNext()); + t.save({_id: 7}); + assert(cursor.hasNext()); + assert.eq(7, cursor.next()["_id"]); + + // Tailable with negative limit + var cursor = t.find().addOption(2).limit(-100); + for (var i = 1; i <= 7; i++) { + assert.eq(i, cursor.next()["_id"]); + } + assert(!cursor.hasNext()); + t.save({_id: 8}); + assert(cursor.hasNext()); + assert.eq(8, cursor.next()["_id"]); + + // Tests that a tailable cursor over an empty capped collection produces a dead cursor, intended + // to be run on both mongod and mongos. For SERVER-20720. + t.drop(); + db.createCollection(t.getName(), {capped: true, size: 1024}); + + var cmdRes = db.runCommand({find: t.getName(), tailable: true}); + assert.commandWorked(cmdRes); + assert.eq(cmdRes.cursor.id, NumberLong(0)); + assert.eq(cmdRes.cursor.ns, t.getFullName()); + assert.eq(cmdRes.cursor.firstBatch.length, 0); + + // Test that the cursor works in the shell. + assert.eq(t.find().addOption(2).itcount(), 0); + assert.writeOK(t.insert({a: 1})); + assert.eq(t.find().addOption(2).itcount(), 1); +})(); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index d7e8edf0dcd..d8a47c23eaa 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -62,15 +62,22 @@ AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor, AsyncResultsMerger::~AsyncResultsMerger() { stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(remotesExhausted_inlock() || _lifecycleState == kKillComplete); +} - bool allExhausted = true; +bool AsyncResultsMerger::remotesExhausted() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return remotesExhausted_inlock(); +} + +bool AsyncResultsMerger::remotesExhausted_inlock() { for (const auto& remote : _remotes) { if (!remote.exhausted()) { - allExhausted = false; + return false; } } - invariant(allExhausted || _lifecycleState == kKillComplete); + return true; } bool AsyncResultsMerger::ready() { diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 6b00f94f8aa..4c6cc58f4d1 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -86,6 +86,11 @@ public: virtual ~AsyncResultsMerger(); /** + * Returns true if all of the remote cursors are exhausted. + */ + bool remotesExhausted(); + + /** * Returns true if there is no need to schedule remote work in order to take the next action. * This means that either * --there is a buffered result which we can return, @@ -230,6 +235,11 @@ private: */ Status askForNextBatch_inlock(size_t remoteIndex); + /** + * Checks whether or not the remote cursors are all exhausted. + */ + bool remotesExhausted_inlock(); + // // Helpers for ready(). // diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 70e5d0e69a3..dd1d90dc636 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -190,6 +190,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); // First shard responds. std::vector<CursorResponse> responses; @@ -200,6 +201,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { // Can't return any results until we have a response from all three shards. ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); // Second two shards respond. responses.clear(); @@ -211,6 +213,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { executor->waitForEvent(readyEvent); + ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); @@ -245,6 +248,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); + ASSERT_FALSE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); @@ -261,6 +265,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; @@ -272,6 +277,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); + ASSERT_FALSE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); @@ -284,6 +290,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); responses.clear(); std::vector<BSONObj> batch7 = {fromjson("{_id: 11}")}; @@ -291,6 +298,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); + ASSERT_TRUE(arm->remotesExhausted()); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); @@ -944,6 +952,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { // In the tailable case, we expect boost::none after every batch. ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_FALSE(arm->remotesExhausted()); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); @@ -956,9 +965,11 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_FALSE(arm->remotesExhausted()); auto killedEvent = arm->kill(); executor->waitForEvent(killedEvent); @@ -979,14 +990,38 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); - // After receiving an empty batch, the ARM should return boost::none. + // After receiving an empty batch, the ARM should return boost::none, but remotes should not be + // marked as exhausted. ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_FALSE(arm->remotesExhausted()); auto killedEvent = arm->kill(); executor->waitForEvent(killedEvent); } +TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { + BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); + makeCursorFromFindCmd(findCmd, {_remotes[0]}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // Remote responds with an empty batch and a zero cursor id. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch; + responses.emplace_back(_nss, CursorId(0), batch); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); + executor->waitForEvent(readyEvent); + + // Afterwards, the ARM should return boost::none and remote cursors should be marked as + // exhausted. + ASSERT_TRUE(arm->ready()); + ASSERT(!unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->remotesExhausted()); +} + TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}"); makeCursorFromFindCmd(findCmd, {_remotes[0]}); diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 4375c2a82ae..d0e6d8cf811 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -90,6 +90,11 @@ public: * 'obj' must be owned BSON. */ virtual void queueResult(const BSONObj& obj) = 0; + + /** + * Returns whether or not all the remote cursors underlying this cursor have been exhausted. + */ + virtual bool remotesExhausted() = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index c217c8afceb..8303d84e6c1 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -81,6 +81,10 @@ void ClusterClientCursorImpl::queueResult(const BSONObj& obj) { _stash.push(obj); } +bool ClusterClientCursorImpl::remotesExhausted() { + return _root->remotesExhausted(); +} + std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( executor::TaskExecutor* executor, ClusterClientCursorParams params) { // The first stage is always the one which merges from the remotes. diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 3b2945befbd..d7f628e4945 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -65,6 +65,8 @@ public: void queueResult(const BSONObj& obj) final; + bool remotesExhausted() final; + private: /** * Constructs the pipeline of MergerPlanStages which will be used to answer the query. diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index c159f12c80f..96f5e2944a2 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -100,6 +100,35 @@ TEST(ClusterClientCursorImpl, QueueResult) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 4LL); } +TEST(ClusterClientCursorImpl, RemotesExhausted) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->markRemotesExhausted(); + + ClusterClientCursorImpl cursor(std::move(mockStage)); + ASSERT_TRUE(cursor.remotesExhausted()); + + auto firstResult = cursor.next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT_TRUE(cursor.remotesExhausted()); + + auto secondResult = cursor.next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(secondResult.getValue()); + ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2)); + ASSERT_TRUE(cursor.remotesExhausted()); + + auto thirdResult = cursor.next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(!thirdResult.getValue()); + ASSERT_TRUE(cursor.remotesExhausted()); + + ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 9a4982e5bae..402f2fd7ef8 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -79,6 +79,10 @@ void ClusterClientCursorMock::queueResult(const BSONObj& obj) { _resultsQueue.push({obj}); } +bool ClusterClientCursorMock::remotesExhausted() { + MONGO_UNREACHABLE; +} + void ClusterClientCursorMock::queueError(Status status) { _resultsQueue.push({status}); } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 0f129e96c20..d538e482773 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -54,6 +54,11 @@ public: void queueResult(const BSONObj& obj) final; /** + * Not used. + */ + bool remotesExhausted() final; + + /** * Queues an error response. */ void queueError(Status status); diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 0c0b9a45077..7f09137a7ac 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -143,6 +143,11 @@ void ClusterCursorManager::PinnedCursor::queueResult(const BSONObj& obj) { _cursor->queueResult(obj); } +bool ClusterCursorManager::PinnedCursor::remotesExhausted() { + invariant(_cursor); + return _cursor->remotesExhausted(); +} + void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index f5b790cc2db..089507ec338 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -183,6 +183,12 @@ public: */ void queueResult(const BSONObj& obj); + /** + * Returns whether or not all the remote cursors underlying this cursor have been + * exhausted. Cannot be called after returnCursor() is called. A cursor must be owned. + */ + bool remotesExhausted(); + private: // ClusterCursorManager is a friend so that its methods can call the PinnedCursor // constructor declared below, which is private to prevent clients from calling it directly. diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 847b2dcecc4..ebc04d3103a 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -298,8 +298,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, } if (!next.getValue()) { - // We reached end-of-stream. - if (!pinnedCursor.isTailable()) { + // We reached end-of-stream. If the cursor is not tailable, then we mark it as + // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even + // when we reach end-of-stream. However, if all the remote cursors are exhausted, there + // is no hope of returning data and thus we need to close the mongos cursor as well. + if (!pinnedCursor.isTailable() || pinnedCursor.remotesExhausted()) { cursorState = ClusterCursorManager::CursorState::Exhausted; } break; diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 190729cddce..68e12b7f1f6 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -66,6 +66,11 @@ public: */ virtual void kill() = 0; + /** + * Returns whether or not all the remote cursors are exhausted. + */ + virtual bool remotesExhausted() = 0; + protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index 09f04cf1b35..c2f584f0358 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -59,4 +59,8 @@ void RouterStageLimit::kill() { getChildStage()->kill(); } +bool RouterStageLimit::remotesExhausted() { + return getChildStage()->remotesExhausted(); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 1c252e7fac0..26ced69b24b 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -43,6 +43,8 @@ public: void kill() final; + bool remotesExhausted() final; + private: long long _limit; diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp index fb5875fd625..b85d82bef2d 100644 --- a/src/mongo/s/query/router_stage_limit_test.cpp +++ b/src/mongo/s/query/router_stage_limit_test.cpp @@ -137,6 +137,33 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { ASSERT(!fourthResult.getValue()); } +TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->markRemotesExhausted(); + + auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100); + ASSERT_TRUE(limitStage->remotesExhausted()); + + auto firstResult = limitStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1)); + ASSERT_TRUE(limitStage->remotesExhausted()); + + auto secondResult = limitStage->next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(secondResult.getValue()); + ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2)); + ASSERT_TRUE(limitStage->remotesExhausted()); + + auto thirdResult = limitStage->next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(!thirdResult.getValue()); + ASSERT_TRUE(limitStage->remotesExhausted()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index b861e999385..84f69166f7a 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -60,4 +60,8 @@ void RouterStageMerge::kill() { _executor->waitForEvent(killEvent); } +bool RouterStageMerge::remotesExhausted() { + return _arm.remotesExhausted(); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 738804cc30f..6ed9709f130 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -49,6 +49,8 @@ public: void kill() final; + bool remotesExhausted() final; + private: // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index 69cc780824b..5af0d0d470a 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -46,6 +46,10 @@ void RouterStageMock::queueEOF() { _resultsQueue.push({boost::none}); } +void RouterStageMock::markRemotesExhausted() { + _remotesExhausted = true; +} + StatusWith<boost::optional<BSONObj>> RouterStageMock::next() { if (_resultsQueue.empty()) { return {boost::none}; @@ -60,4 +64,8 @@ void RouterStageMock::kill() { // No child to kill. } +bool RouterStageMock::remotesExhausted() { + return _remotesExhausted; +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index 7f5f4727349..7cb31ce9745 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -36,7 +36,8 @@ namespace mongo { /** - * Passes through the first n results and then returns boost::none. + * Initialized by adding results to its results queue, it then passes through the results in its + * queue until the queue is empty. */ class RouterStageMock final : public RouterExecStage { public: @@ -46,6 +47,8 @@ public: void kill() final; + bool remotesExhausted() final; + /** * Queues a BSONObj to be returned. */ @@ -62,8 +65,14 @@ public: */ void queueEOF(); + /** + * Explicitly marks the remote cursors as all exhausted. + */ + void markRemotesExhausted(); + private: std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue; + bool _remotesExhausted = false; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 7fa343f44c1..8708e603c7b 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -61,4 +61,8 @@ void RouterStageRemoveSortKey::kill() { getChildStage()->kill(); } +bool RouterStageRemoveSortKey::remotesExhausted() { + return getChildStage()->remotesExhausted(); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index c376226f23f..3cdae152db7 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -44,6 +44,8 @@ public: StatusWith<boost::optional<BSONObj>> next() final; void kill() final; + + bool remotesExhausted() final; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp index 90a7a555b07..668aec8e978 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp @@ -122,6 +122,33 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { ASSERT(!fourthResult.getValue()); } +TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1)); + mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2)); + mockStage->markRemotesExhausted(); + + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + ASSERT_TRUE(sortKeyStage->remotesExhausted()); + + auto firstResult = sortKeyStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1 << "b" << 1)); + ASSERT_TRUE(sortKeyStage->remotesExhausted()); + + auto secondResult = sortKeyStage->next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(secondResult.getValue()); + ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2 << "b" << 2)); + ASSERT_TRUE(sortKeyStage->remotesExhausted()); + + auto thirdResult = sortKeyStage->next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(!thirdResult.getValue()); + ASSERT_TRUE(sortKeyStage->remotesExhausted()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 51a2abe06af..5fc866b2db0 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -60,4 +60,8 @@ void RouterStageSkip::kill() { getChildStage()->kill(); } +bool RouterStageSkip::remotesExhausted() { + return getChildStage()->remotesExhausted(); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index 6f78b31ce90..b29e7fb20bd 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -43,6 +43,8 @@ public: void kill() final; + bool remotesExhausted() final; + private: long long _skip; diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp index 84761d7b6ee..6e03e2d3301 100644 --- a/src/mongo/s/query/router_stage_skip_test.cpp +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -172,6 +172,34 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { ASSERT(!thirdResult.getValue()); } +TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + mockStage->markRemotesExhausted(); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1); + ASSERT_TRUE(skipStage->remotesExhausted()); + + auto firstResult = skipStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2)); + ASSERT_TRUE(skipStage->remotesExhausted()); + + auto secondResult = skipStage->next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(secondResult.getValue()); + ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3)); + ASSERT_TRUE(skipStage->remotesExhausted()); + + auto thirdResult = skipStage->next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(!thirdResult.getValue()); + ASSERT_TRUE(skipStage->remotesExhausted()); +} + } // namespace } // namespace mongo |