summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYunhe (John) Wang <yunhe.wang@mongodb.com>2015-10-06 13:41:59 -0400
committerYunhe (John) Wang <yunhe.wang@mongodb.com>2015-10-08 13:31:20 -0400
commitaaf2969861e882624132c5d6b6141acfafc15aa7 (patch)
treef411c1634fdd7af47dde1a9cdb0bfe60335e571d
parent9b984cdc4b58be9002c692cd9ba8af2a3731b748 (diff)
downloadmongo-aaf2969861e882624132c5d6b6141acfafc15aa7.tar.gz
SERVER-20720 mongos now returns dead tailable cursor over an empty capped collection
-rw-r--r--jstests/core/tailable_skip_limit.js141
-rw-r--r--src/mongo/s/query/async_results_merger.cpp13
-rw-r--r--src/mongo/s/query/async_results_merger.h10
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp37
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h5
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp29
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h6
-rw-r--r--src/mongo/s/query/cluster_find.cpp7
-rw-r--r--src/mongo/s/query/router_exec_stage.h5
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp4
-rw-r--r--src/mongo/s/query/router_stage_limit.h2
-rw-r--r--src/mongo/s/query/router_stage_limit_test.cpp27
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp4
-rw-r--r--src/mongo/s/query/router_stage_merge.h2
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp8
-rw-r--r--src/mongo/s/query/router_stage_mock.h11
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp4
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h2
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey_test.cpp27
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp4
-rw-r--r--src/mongo/s/query/router_stage_skip.h2
-rw-r--r--src/mongo/s/query/router_stage_skip_test.cpp28
27 files changed, 330 insertions, 68 deletions
diff --git a/jstests/core/tailable_skip_limit.js b/jstests/core/tailable_skip_limit.js
index d0da0a15b28..1fae7263a8f 100644
--- a/jstests/core/tailable_skip_limit.js
+++ b/jstests/core/tailable_skip_limit.js
@@ -1,63 +1,82 @@
// Test that tailable cursors work correctly with skip and limit.
+(function() {
+ "use strict";
-// Setup the capped collection.
-var collname = "jstests_tailable_skip_limit"
-var t = db[collname];
-t.drop();
-db.createCollection(collname, {capped: true, size: 1024});
-
-t.save({_id: 1});
-t.save({_id: 2});
-
-// Non-tailable with skip
-var cursor = t.find().skip(1);
-assert.eq(2, cursor.next()["_id"]);
-assert(!cursor.hasNext());
-t.save({_id: 3});
-assert(!cursor.hasNext());
-
-// Non-tailable with limit
-var cursor = t.find().limit(100);
-for (var i = 1; i <= 3; i++) {
- assert.eq(i, cursor.next()["_id"]);
-}
-assert(!cursor.hasNext());
-t.save({_id: 4});
-assert(!cursor.hasNext());
-
-// Non-tailable with negative limit
-var cursor = t.find().limit(-100);
-for (var i = 1; i <= 4; i++) {
- assert.eq(i, cursor.next()["_id"]);
-}
-assert(!cursor.hasNext());
-t.save({_id: 5});
-assert(!cursor.hasNext());
-
-// Tailable with skip
-cursor = t.find().addOption(2).skip(4);
-assert.eq(5, cursor.next()["_id"]);
-assert(!cursor.hasNext());
-t.save({_id: 6});
-assert(cursor.hasNext());
-assert.eq(6, cursor.next()["_id"]);
-
-// Tailable with limit
-var cursor = t.find().addOption(2).limit(100);
-for (var i = 1; i <= 6; i++) {
- assert.eq(i, cursor.next()["_id"]);
-}
-assert(!cursor.hasNext());
-t.save({_id: 7});
-assert(cursor.hasNext());
-assert.eq(7, cursor.next()["_id"]);
-
-// Tailable with negative limit
-var cursor = t.find().addOption(2).limit(-100);
-for (var i = 1; i <= 7; i++) {
- assert.eq(i, cursor.next()["_id"]);
-}
-assert(!cursor.hasNext());
-t.save({_id: 8});
-assert(cursor.hasNext());
-assert.eq(8, cursor.next()["_id"]);
+ // Setup the capped collection.
+ var collname = "jstests_tailable_skip_limit"
+ var t = db[collname];
+ t.drop();
+ db.createCollection(collname, {capped: true, size: 1024});
+
+ t.save({_id: 1});
+ t.save({_id: 2});
+
+ // Non-tailable with skip
+ var cursor = t.find().skip(1);
+ assert.eq(2, cursor.next()["_id"]);
+ assert(!cursor.hasNext());
+ t.save({_id: 3});
+ assert(!cursor.hasNext());
+
+ // Non-tailable with limit
+ var cursor = t.find().limit(100);
+ for (var i = 1; i <= 3; i++) {
+ assert.eq(i, cursor.next()["_id"]);
+ }
+ assert(!cursor.hasNext());
+ t.save({_id: 4});
+ assert(!cursor.hasNext());
+
+ // Non-tailable with negative limit
+ var cursor = t.find().limit(-100);
+ for (var i = 1; i <= 4; i++) {
+ assert.eq(i, cursor.next()["_id"]);
+ }
+ assert(!cursor.hasNext());
+ t.save({_id: 5});
+ assert(!cursor.hasNext());
+
+ // Tailable with skip
+ cursor = t.find().addOption(2).skip(4);
+ assert.eq(5, cursor.next()["_id"]);
+ assert(!cursor.hasNext());
+ t.save({_id: 6});
+ assert(cursor.hasNext());
+ assert.eq(6, cursor.next()["_id"]);
+
+ // Tailable with limit
+ var cursor = t.find().addOption(2).limit(100);
+ for (var i = 1; i <= 6; i++) {
+ assert.eq(i, cursor.next()["_id"]);
+ }
+ assert(!cursor.hasNext());
+ t.save({_id: 7});
+ assert(cursor.hasNext());
+ assert.eq(7, cursor.next()["_id"]);
+
+ // Tailable with negative limit
+ var cursor = t.find().addOption(2).limit(-100);
+ for (var i = 1; i <= 7; i++) {
+ assert.eq(i, cursor.next()["_id"]);
+ }
+ assert(!cursor.hasNext());
+ t.save({_id: 8});
+ assert(cursor.hasNext());
+ assert.eq(8, cursor.next()["_id"]);
+
+ // Tests that a tailable cursor over an empty capped collection produces a dead cursor, intended
+ // to be run on both mongod and mongos. For SERVER-20720.
+ t.drop();
+ db.createCollection(t.getName(), {capped: true, size: 1024});
+
+ var cmdRes = db.runCommand({find: t.getName(), tailable: true});
+ assert.commandWorked(cmdRes);
+ assert.eq(cmdRes.cursor.id, NumberLong(0));
+ assert.eq(cmdRes.cursor.ns, t.getFullName());
+ assert.eq(cmdRes.cursor.firstBatch.length, 0);
+
+ // Test that the cursor works in the shell.
+ assert.eq(t.find().addOption(2).itcount(), 0);
+ assert.writeOK(t.insert({a: 1}));
+ assert.eq(t.find().addOption(2).itcount(), 1);
+})();
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index d7e8edf0dcd..d8a47c23eaa 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -62,15 +62,22 @@ AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
AsyncResultsMerger::~AsyncResultsMerger() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(remotesExhausted_inlock() || _lifecycleState == kKillComplete);
+}
- bool allExhausted = true;
+bool AsyncResultsMerger::remotesExhausted() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return remotesExhausted_inlock();
+}
+
+bool AsyncResultsMerger::remotesExhausted_inlock() {
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
- allExhausted = false;
+ return false;
}
}
- invariant(allExhausted || _lifecycleState == kKillComplete);
+ return true;
}
bool AsyncResultsMerger::ready() {
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 6b00f94f8aa..4c6cc58f4d1 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -86,6 +86,11 @@ public:
virtual ~AsyncResultsMerger();
/**
+ * Returns true if all of the remote cursors are exhausted.
+ */
+ bool remotesExhausted();
+
+ /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
@@ -230,6 +235,11 @@ private:
*/
Status askForNextBatch_inlock(size_t remoteIndex);
+ /**
+ * Checks whether or not the remote cursors are all exhausted.
+ */
+ bool remotesExhausted_inlock();
+
//
// Helpers for ready().
//
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 70e5d0e69a3..dd1d90dc636 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -190,6 +190,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
ASSERT_FALSE(arm->ready());
auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
// First shard responds.
std::vector<CursorResponse> responses;
@@ -200,6 +201,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
// Can't return any results until we have a response from all three shards.
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
// Second two shards respond.
responses.clear();
@@ -211,6 +213,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
executor->waitForEvent(readyEvent);
+ ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
ASSERT_TRUE(arm->ready());
@@ -245,6 +248,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
+ ASSERT_FALSE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
ASSERT_TRUE(arm->ready());
@@ -261,6 +265,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
@@ -272,6 +277,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
+ ASSERT_FALSE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady()));
ASSERT_TRUE(arm->ready());
@@ -284,6 +290,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
responses.clear();
std::vector<BSONObj> batch7 = {fromjson("{_id: 11}")};
@@ -291,6 +298,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
+ ASSERT_TRUE(arm->remotesExhausted());
ASSERT_TRUE(arm->ready());
ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady()));
ASSERT_TRUE(arm->ready());
@@ -944,6 +952,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
// In the tailable case, we expect boost::none after every batch.
ASSERT_TRUE(arm->ready());
ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_FALSE(arm->remotesExhausted());
ASSERT_FALSE(arm->ready());
readyEvent = unittest::assertGet(arm->nextEvent());
@@ -956,9 +965,11 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
+ ASSERT_FALSE(arm->remotesExhausted());
ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
ASSERT_TRUE(arm->ready());
ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_FALSE(arm->remotesExhausted());
auto killedEvent = arm->kill();
executor->waitForEvent(killedEvent);
@@ -979,14 +990,38 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
- // After receiving an empty batch, the ARM should return boost::none.
+ // After receiving an empty batch, the ARM should return boost::none, but remotes should not be
+ // marked as exhausted.
ASSERT_TRUE(arm->ready());
ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_FALSE(arm->remotesExhausted());
auto killedEvent = arm->kill();
executor->waitForEvent(killedEvent);
}
+TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
+ makeCursorFromFindCmd(findCmd, {_remotes[0]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // Remote responds with an empty batch and a zero cursor id.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch;
+ responses.emplace_back(_nss, CursorId(0), batch);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+ executor->waitForEvent(readyEvent);
+
+ // Afterwards, the ARM should return boost::none and remote cursors should be marked as
+ // exhausted.
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->remotesExhausted());
+}
+
TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}");
makeCursorFromFindCmd(findCmd, {_remotes[0]});
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 4375c2a82ae..d0e6d8cf811 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -90,6 +90,11 @@ public:
* 'obj' must be owned BSON.
*/
virtual void queueResult(const BSONObj& obj) = 0;
+
+ /**
+ * Returns whether or not all the remote cursors underlying this cursor have been exhausted.
+ */
+ virtual bool remotesExhausted() = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index c217c8afceb..8303d84e6c1 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -81,6 +81,10 @@ void ClusterClientCursorImpl::queueResult(const BSONObj& obj) {
_stash.push(obj);
}
+bool ClusterClientCursorImpl::remotesExhausted() {
+ return _root->remotesExhausted();
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
executor::TaskExecutor* executor, ClusterClientCursorParams params) {
// The first stage is always the one which merges from the remotes.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 3b2945befbd..d7f628e4945 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -65,6 +65,8 @@ public:
void queueResult(const BSONObj& obj) final;
+ bool remotesExhausted() final;
+
private:
/**
* Constructs the pipeline of MergerPlanStages which will be used to answer the query.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index c159f12c80f..96f5e2944a2 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -100,6 +100,35 @@ TEST(ClusterClientCursorImpl, QueueResult) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 4LL);
}
+TEST(ClusterClientCursorImpl, RemotesExhausted) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ mockStage->queueResult(BSON("a" << 1));
+ mockStage->queueResult(BSON("a" << 2));
+ mockStage->markRemotesExhausted();
+
+ ClusterClientCursorImpl cursor(std::move(mockStage));
+ ASSERT_TRUE(cursor.remotesExhausted());
+
+ auto firstResult = cursor.next();
+ ASSERT_OK(firstResult.getStatus());
+ ASSERT(firstResult.getValue());
+ ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT_TRUE(cursor.remotesExhausted());
+
+ auto secondResult = cursor.next();
+ ASSERT_OK(secondResult.getStatus());
+ ASSERT(secondResult.getValue());
+ ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2));
+ ASSERT_TRUE(cursor.remotesExhausted());
+
+ auto thirdResult = cursor.next();
+ ASSERT_OK(thirdResult.getStatus());
+ ASSERT(!thirdResult.getValue());
+ ASSERT_TRUE(cursor.remotesExhausted());
+
+ ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 9a4982e5bae..402f2fd7ef8 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -79,6 +79,10 @@ void ClusterClientCursorMock::queueResult(const BSONObj& obj) {
_resultsQueue.push({obj});
}
+bool ClusterClientCursorMock::remotesExhausted() {
+ MONGO_UNREACHABLE;
+}
+
void ClusterClientCursorMock::queueError(Status status) {
_resultsQueue.push({status});
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 0f129e96c20..d538e482773 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -54,6 +54,11 @@ public:
void queueResult(const BSONObj& obj) final;
/**
+ * Not used.
+ */
+ bool remotesExhausted() final;
+
+ /**
* Queues an error response.
*/
void queueError(Status status);
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 0c0b9a45077..7f09137a7ac 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -143,6 +143,11 @@ void ClusterCursorManager::PinnedCursor::queueResult(const BSONObj& obj) {
_cursor->queueResult(obj);
}
+bool ClusterCursorManager::PinnedCursor::remotesExhausted() {
+ invariant(_cursor);
+ return _cursor->remotesExhausted();
+}
+
void ClusterCursorManager::PinnedCursor::returnAndKillCursor() {
invariant(_cursor);
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index f5b790cc2db..089507ec338 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -183,6 +183,12 @@ public:
*/
void queueResult(const BSONObj& obj);
+ /**
+ * Returns whether or not all the remote cursors underlying this cursor have been
+ * exhausted. Cannot be called after returnCursor() is called. A cursor must be owned.
+ */
+ bool remotesExhausted();
+
private:
// ClusterCursorManager is a friend so that its methods can call the PinnedCursor
// constructor declared below, which is private to prevent clients from calling it directly.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 847b2dcecc4..ebc04d3103a 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -298,8 +298,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
}
if (!next.getValue()) {
- // We reached end-of-stream.
- if (!pinnedCursor.isTailable()) {
+ // We reached end-of-stream. If the cursor is not tailable, then we mark it as
+ // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even
+ // when we reach end-of-stream. However, if all the remote cursors are exhausted, there
+ // is no hope of returning data and thus we need to close the mongos cursor as well.
+ if (!pinnedCursor.isTailable() || pinnedCursor.remotesExhausted()) {
cursorState = ClusterCursorManager::CursorState::Exhausted;
}
break;
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 190729cddce..68e12b7f1f6 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -66,6 +66,11 @@ public:
*/
virtual void kill() = 0;
+ /**
+ * Returns whether or not all the remote cursors are exhausted.
+ */
+ virtual bool remotesExhausted() = 0;
+
protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index 09f04cf1b35..c2f584f0358 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -59,4 +59,8 @@ void RouterStageLimit::kill() {
getChildStage()->kill();
}
+bool RouterStageLimit::remotesExhausted() {
+ return getChildStage()->remotesExhausted();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 1c252e7fac0..26ced69b24b 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -43,6 +43,8 @@ public:
void kill() final;
+ bool remotesExhausted() final;
+
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index fb5875fd625..b85d82bef2d 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -137,6 +137,33 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
ASSERT(!fourthResult.getValue());
}
+TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ mockStage->queueResult(BSON("a" << 1));
+ mockStage->queueResult(BSON("a" << 2));
+ mockStage->markRemotesExhausted();
+
+ auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
+ ASSERT_TRUE(limitStage->remotesExhausted());
+
+ auto firstResult = limitStage->next();
+ ASSERT_OK(firstResult.getStatus());
+ ASSERT(firstResult.getValue());
+ ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1));
+ ASSERT_TRUE(limitStage->remotesExhausted());
+
+ auto secondResult = limitStage->next();
+ ASSERT_OK(secondResult.getStatus());
+ ASSERT(secondResult.getValue());
+ ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2));
+ ASSERT_TRUE(limitStage->remotesExhausted());
+
+ auto thirdResult = limitStage->next();
+ ASSERT_OK(thirdResult.getStatus());
+ ASSERT(!thirdResult.getValue());
+ ASSERT_TRUE(limitStage->remotesExhausted());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index b861e999385..84f69166f7a 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -60,4 +60,8 @@ void RouterStageMerge::kill() {
_executor->waitForEvent(killEvent);
}
+bool RouterStageMerge::remotesExhausted() {
+ return _arm.remotesExhausted();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 738804cc30f..6ed9709f130 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -49,6 +49,8 @@ public:
void kill() final;
+ bool remotesExhausted() final;
+
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index 69cc780824b..5af0d0d470a 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -46,6 +46,10 @@ void RouterStageMock::queueEOF() {
_resultsQueue.push({boost::none});
}
+void RouterStageMock::markRemotesExhausted() {
+ _remotesExhausted = true;
+}
+
StatusWith<boost::optional<BSONObj>> RouterStageMock::next() {
if (_resultsQueue.empty()) {
return {boost::none};
@@ -60,4 +64,8 @@ void RouterStageMock::kill() {
// No child to kill.
}
+bool RouterStageMock::remotesExhausted() {
+ return _remotesExhausted;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index 7f5f4727349..7cb31ce9745 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -36,7 +36,8 @@
namespace mongo {
/**
- * Passes through the first n results and then returns boost::none.
+ * Initialized by adding results to its results queue, it then passes through the results in its
+ * queue until the queue is empty.
*/
class RouterStageMock final : public RouterExecStage {
public:
@@ -46,6 +47,8 @@ public:
void kill() final;
+ bool remotesExhausted() final;
+
/**
* Queues a BSONObj to be returned.
*/
@@ -62,8 +65,14 @@ public:
*/
void queueEOF();
+ /**
+ * Explicitly marks the remote cursors as all exhausted.
+ */
+ void markRemotesExhausted();
+
private:
std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue;
+ bool _remotesExhausted = false;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 7fa343f44c1..8708e603c7b 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -61,4 +61,8 @@ void RouterStageRemoveSortKey::kill() {
getChildStage()->kill();
}
+bool RouterStageRemoveSortKey::remotesExhausted() {
+ return getChildStage()->remotesExhausted();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index c376226f23f..3cdae152db7 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -44,6 +44,8 @@ public:
StatusWith<boost::optional<BSONObj>> next() final;
void kill() final;
+
+ bool remotesExhausted() final;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index 90a7a555b07..668aec8e978 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -122,6 +122,33 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
ASSERT(!fourthResult.getValue());
}
+TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
+ mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
+ mockStage->markRemotesExhausted();
+
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ ASSERT_TRUE(sortKeyStage->remotesExhausted());
+
+ auto firstResult = sortKeyStage->next();
+ ASSERT_OK(firstResult.getStatus());
+ ASSERT(firstResult.getValue());
+ ASSERT_EQ(*firstResult.getValue(), BSON("a" << 1 << "b" << 1));
+ ASSERT_TRUE(sortKeyStage->remotesExhausted());
+
+ auto secondResult = sortKeyStage->next();
+ ASSERT_OK(secondResult.getStatus());
+ ASSERT(secondResult.getValue());
+ ASSERT_EQ(*secondResult.getValue(), BSON("a" << 2 << "b" << 2));
+ ASSERT_TRUE(sortKeyStage->remotesExhausted());
+
+ auto thirdResult = sortKeyStage->next();
+ ASSERT_OK(thirdResult.getStatus());
+ ASSERT(!thirdResult.getValue());
+ ASSERT_TRUE(sortKeyStage->remotesExhausted());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 51a2abe06af..5fc866b2db0 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -60,4 +60,8 @@ void RouterStageSkip::kill() {
getChildStage()->kill();
}
+bool RouterStageSkip::remotesExhausted() {
+ return getChildStage()->remotesExhausted();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 6f78b31ce90..b29e7fb20bd 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -43,6 +43,8 @@ public:
void kill() final;
+ bool remotesExhausted() final;
+
private:
long long _skip;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index 84761d7b6ee..6e03e2d3301 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -172,6 +172,34 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
ASSERT(!thirdResult.getValue());
}
+TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ mockStage->queueResult(BSON("a" << 1));
+ mockStage->queueResult(BSON("a" << 2));
+ mockStage->queueResult(BSON("a" << 3));
+ mockStage->markRemotesExhausted();
+
+ auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
+ ASSERT_TRUE(skipStage->remotesExhausted());
+
+ auto firstResult = skipStage->next();
+ ASSERT_OK(firstResult.getStatus());
+ ASSERT(firstResult.getValue());
+ ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2));
+ ASSERT_TRUE(skipStage->remotesExhausted());
+
+ auto secondResult = skipStage->next();
+ ASSERT_OK(secondResult.getStatus());
+ ASSERT(secondResult.getValue());
+ ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3));
+ ASSERT_TRUE(skipStage->remotesExhausted());
+
+ auto thirdResult = skipStage->next();
+ ASSERT_OK(thirdResult.getStatus());
+ ASSERT(!thirdResult.getValue());
+ ASSERT_TRUE(skipStage->remotesExhausted());
+}
+
} // namespace
} // namespace mongo