summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2015-10-30 13:15:39 -0400
committerDavid Storch <david.storch@10gen.com>2015-11-05 11:26:56 -0500
commit5b34689a84b969affc822e014e44816959ca460b (patch)
tree29f16d4988fbad4f5c62892fd253cbe90d02b3fa /src/mongo/s/query
parent5edafdbf6ca1effcb18d62c8e53b37544afecfcc (diff)
downloadmongo-5b34689a84b969affc822e014e44816959ca460b.tar.gz
SERVER-21218 make mongos forward maxTimeMS on getMore command to mongod
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp22
-rw-r--r--src/mongo/s/query/async_results_merger.h12
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp60
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h10
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp13
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp6
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h3
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h9
-rw-r--r--src/mongo/s/query/cluster_find.cpp8
-rw-r--r--src/mongo/s/query/router_exec_stage.h10
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp4
-rw-r--r--src/mongo/s/query/router_stage_limit.h2
-rw-r--r--src/mongo/s/query/router_stage_limit_test.cpp13
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp4
-rw-r--r--src/mongo/s/query/router_stage_merge.h2
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp13
-rw-r--r--src/mongo/s/query/router_stage_mock.h8
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp4
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h2
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey_test.cpp13
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp4
-rw-r--r--src/mongo/s/query/router_stage_skip.h2
-rw-r--r--src/mongo/s/query/router_stage_skip_test.cpp13
27 files changed, 246 insertions, 4 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index cdada8c55f8..eaaf759156e 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -111,6 +111,18 @@ bool AsyncResultsMerger::remotesExhausted_inlock() {
return true;
}
+Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (!_params.isTailable || !_params.isAwaitData) {
+ return Status(ErrorCodes::BadValue,
+ "maxTimeMS can only be used with getMore for tailable, awaitData cursors");
+ }
+
+ _awaitDataTimeout = awaitDataTimeout;
+ return Status::OK();
+}
+
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return ready_inlock();
@@ -268,10 +280,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
adjustedBatchSize = *_params.batchSize - remote.fetchedCount;
}
- cmdObj =
- GetMoreRequest(
- _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none)
- .toBSON();
+ cmdObj = GetMoreRequest(_params.nsString,
+ *remote.cursorId,
+ adjustedBatchSize,
+ _awaitDataTimeout,
+ boost::none,
+ boost::none).toBSON();
} else {
// Do the first time shard host resolution.
invariant(_params.readPreference);
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 9fc9b36960b..40e309e2273 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -40,6 +40,7 @@
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -91,6 +92,15 @@ public:
bool remotesExhausted();
/**
+ * Sets the maxTimeMS value that the ARM should forward with any internally issued getMore
+ * requests.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
+ * the cursor is not tailable + awaitData).
+ */
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
+
+ /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
@@ -352,6 +362,8 @@ private:
// boost::none.
bool _eofNext = false;
+ boost::optional<Milliseconds> _awaitDataTimeout;
+
//
// Killing
//
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 7dd3d6c2e2f..7886f92a713 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -117,6 +117,7 @@ protected:
params.batchSize = getMoreBatchSize ? getMoreBatchSize : lpq->getBatchSize();
params.skip = lpq->getSkip();
params.isTailable = lpq->isTailable();
+ params.isAwaitData = lpq->isAwaitData();
params.isAllowPartialResults = lpq->isAllowPartialResults();
for (const auto& shardId : shardIds) {
@@ -1307,6 +1308,65 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
ASSERT_TRUE(arm->ready());
}
+TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
+ responses.emplace_back(_nss, CursorId(123), batch1);
+ scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+
+ ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
+
+ ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // Pending getMore request should include maxTimeMS.
+ BSONObj expectedCmdObj = BSON("getMore" << CursorId(123) << "collection"
+ << "testcoll"
+ << "maxTimeMS" << 789);
+ ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);
+
+ responses.clear();
+ std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
+ responses.emplace_back(_nss, CursorId(0), batch2);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+}
+
+TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
+ BSONObj findCmd = fromjson("{find: 'testcoll'}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
+ auto killEvent = arm->kill();
+ executor->waitForEvent(killEvent);
+}
+
+TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
+ auto killEvent = arm->kill();
+ executor->waitForEvent(killEvent);
+}
} // namespace
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index d0e6d8cf811..55f2412f7c8 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
#include "mongo/db/jsobj.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -95,6 +96,15 @@ public:
* Returns whether or not all the remote cursors underlying this cursor have been exhausted.
*/
virtual bool remotesExhausted() = 0;
+
+ /**
+ * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore
+ * requests.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
+ * the cursor is not tailable + awaitData).
+ */
+ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index bf6d301d0d2..a80562042dc 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -119,6 +119,10 @@ bool ClusterClientCursorImpl::remotesExhausted() {
return _root->remotesExhausted();
}
+Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return _root->setAwaitDataTimeout(awaitDataTimeout);
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
executor::TaskExecutor* executor, ClusterClientCursorParams&& params) {
const auto skip = params.skip;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 76c3f4c6708..1a8766f9005 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -108,6 +108,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
/**
* Constructs a cluster client cursor.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 96f5e2944a2..50b47f47817 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -129,6 +129,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL);
}
+TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ ClusterClientCursorImpl cursor(std::move(mockStage));
+ ASSERT_OK(cursor.setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 97e999c6939..4cf9418fc7f 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -32,6 +32,8 @@
#include "mongo/s/query/cluster_client_cursor_mock.h"
+#include "mongo/util/assert_util.h"
+
namespace mongo {
ClusterClientCursorMock::ClusterClientCursorMock(stdx::function<void(void)> killCallback)
@@ -91,4 +93,8 @@ void ClusterClientCursorMock::queueError(Status status) {
_resultsQueue.push({status});
}
+Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index a8515ba61ce..67efae2181a 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -53,6 +53,8 @@ public:
void queueResult(const BSONObj& obj) final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
/**
* Returns true unless marked as having non-exhausted remote cursors via
* markRemotesNotExhausted().
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 0e497fd105a..5de1b912b72 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -132,6 +132,9 @@ struct ClusterClientCursorParams {
// Whether this cursor is tailing a capped collection.
bool isTailable = false;
+ // Whether this cursor has the awaitData option set.
+ bool isAwaitData = false;
+
// Read preference for where to target the query. This value is only set if initial shard host
// targeting is necessary and not used if using externally prepared cursor ids.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 641bb29c30d..96ab2eb88dc 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -148,6 +148,11 @@ bool ClusterCursorManager::PinnedCursor::remotesExhausted() {
return _cursor->remotesExhausted();
}
+Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ invariant(_cursor);
+ return _cursor->setAwaitDataTimeout(awaitDataTimeout);
+}
+
void ClusterCursorManager::PinnedCursor::returnAndKillCursor() {
invariant(_cursor);
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index b26796628c2..7c04414f15c 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -189,6 +189,15 @@ public:
*/
bool remotesExhausted();
+ /**
+ * Sets the maxTimeMS value that the cursor should forward with any internally issued
+ * getMore requests. A cursor must be owned.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e.
+ * if the cursor is not tailable + awaitData).
+ */
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
+
private:
// ClusterCursorManager is a friend so that its methods can call the PinnedCursor
// constructor declared below, which is private to prevent clients from calling it directly.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 01562d8e835..34374af7a64 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -220,6 +220,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
params.batchSize = query.getParsed().getEffectiveBatchSize();
params.skip = query.getParsed().getSkip();
params.isTailable = query.getParsed().isTailable();
+ params.isAwaitData = query.getParsed().isAwaitData();
params.isAllowPartialResults = query.getParsed().isAllowPartialResults();
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
@@ -406,6 +407,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
}
invariant(request.cursorid == pinnedCursor.getValue().getCursorId());
+ if (request.awaitDataTimeout) {
+ auto status = pinnedCursor.getValue().setAwaitDataTimeout(*request.awaitDataTimeout);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
std::vector<BSONObj> batch;
int bytesBuffered = 0;
long long batchSize = request.batchSize.value_or(0);
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 68e12b7f1f6..deb4bf34f9f 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -33,6 +33,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -71,6 +72,15 @@ public:
*/
virtual bool remotesExhausted() = 0;
+ /**
+ * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore
+ * requests.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
+ * the cursor is not tailable + awaitData).
+ */
+ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+
protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index c2f584f0358..9a9f77fbf00 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -63,4 +63,8 @@ bool RouterStageLimit::remotesExhausted() {
return getChildStage()->remotesExhausted();
}
+Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 26ced69b24b..0db06c30c3b 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -45,6 +45,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index b85d82bef2d..fd8fa335e7e 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -164,6 +164,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
ASSERT_TRUE(limitStage->remotesExhausted());
}
+TEST(RouterStageLimitTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
+ ASSERT_OK(limitStage->setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 2e75954d434..527bc0f0063 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -64,4 +64,8 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
+Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return _arm.setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index d84b7172e47..a146c66f346 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -51,6 +51,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index 5af0d0d470a..179635bbb08 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -68,4 +68,17 @@ bool RouterStageMock::remotesExhausted() {
return _remotesExhausted;
}
+Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ _awaitDataTimeout = awaitDataTimeout;
+ return Status::OK();
+}
+
+StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() {
+ if (!_awaitDataTimeout) {
+ return Status(ErrorCodes::BadValue, "no awaitData timeout set");
+ }
+
+ return *_awaitDataTimeout;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index 7cb31ce9745..b83e2879096 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -49,6 +49,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
/**
* Queues a BSONObj to be returned.
*/
@@ -70,9 +72,15 @@ public:
*/
void markRemotesExhausted();
+ /**
+ * Gets the timeout for awaitData, or an error if none was set.
+ */
+ StatusWith<Milliseconds> getAwaitDataTimeout();
+
private:
std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue;
bool _remotesExhausted = false;
+ boost::optional<Milliseconds> _awaitDataTimeout;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 8708e603c7b..16e6f9407a4 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -65,4 +65,8 @@ bool RouterStageRemoveSortKey::remotesExhausted() {
return getChildStage()->remotesExhausted();
}
+Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index 3cdae152db7..6ef60012a4d 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -46,6 +46,8 @@ public:
void kill() final;
bool remotesExhausted() final;
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index 668aec8e978..255bcb3cba4 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -149,6 +149,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
ASSERT_TRUE(sortKeyStage->remotesExhausted());
}
+TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 5fc866b2db0..536c3d173a2 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -64,4 +64,8 @@ bool RouterStageSkip::remotesExhausted() {
return getChildStage()->remotesExhausted();
}
+Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index b29e7fb20bd..35994d31e3e 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -45,6 +45,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
long long _skip;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index 6e03e2d3301..7aca1f600bd 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -200,6 +200,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
ASSERT_TRUE(skipStage->remotesExhausted());
}
+TEST(RouterStageSkipTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
+ ASSERT_OK(skipStage->setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo